Skip to content

Commit

Permalink
notifications: Add BillingMessageSerializerVisitor
Browse files Browse the repository at this point in the history
Motivation:

At the moment there are different MessageSerializer in notifications to
serialize different messages. One of them is BillingMessageSerializer which
implies to be able to serialize all possible BillingMessages. In fact, it's
just able to serialize MoverInfoMessages. Further, I was looking for
an easy way to convert different kind of InfoMessages to JSON format for GH dCache#7421.

Modification:

Refactoring of BillingMessageSerializer to MoverInfoMessage.
Creation of BillingMessageSerializerVisitor to be able to serialize all billing
messages into JSON format easiliy. This serializer uses the available visitor
pattern.
Missing message serializers was added.

Result:

BillingMessageSerializerVisitor can be used for all kind of BillingMessages and
BillingMessageSerializer is no longer misleading.

Target: master
Patch: https://rb.dcache.org/r/14220/
Requires-notes: no
Requires-book: no
Acked-by: Dmitry
  • Loading branch information
svemeyer committed Feb 28, 2024
1 parent 00ba8c7 commit 4bace63
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.dcache.notification;

import diskCacheV111.vehicles.DoorRequestInfoMessage;
import diskCacheV111.vehicles.InfoMessageVisitor;
import diskCacheV111.vehicles.MoverInfoMessage;
import diskCacheV111.vehicles.PoolHitInfoMessage;
import diskCacheV111.vehicles.RemoveFileInfoMessage;
import diskCacheV111.vehicles.StorageInfoMessage;
import diskCacheV111.vehicles.WarningPnfsFileInfoMessage;

public class BillingMessageSerializerVisitor implements InfoMessageVisitor {

private byte[] data;

@Override
public void visit(DoorRequestInfoMessage message) {
try (DoorRequestMessageSerializer serializer = new DoorRequestMessageSerializer()) {
data = serializer.serialize("", message);
}
}

@Override
public void visit(MoverInfoMessage message) {
try (MoverInfoMessageSerializer serializer = new MoverInfoMessageSerializer()) {
data = serializer.serialize("", message);
}
}

@Override
public void visit(PoolHitInfoMessage message) {
try (PoolHitInfoMessageSerializer serializer = new PoolHitInfoMessageSerializer()) {
data = serializer.serialize("", message);
}
}

@Override
public void visit(RemoveFileInfoMessage message) {
try (RemoveFileInfoMessageSerializer serializer = new RemoveFileInfoMessageSerializer()) {
data = serializer.serialize("", message);
}
}

@Override
public void visit(StorageInfoMessage message) {
try (StorageInfoMessageSerializer serializer = new StorageInfoMessageSerializer()) {
data = serializer.serialize("", message);
}
}

@Override
public void visit(WarningPnfsFileInfoMessage message) {
try (WarningPnfsFileInfoMessageSerializer serializer = new WarningPnfsFileInfoMessageSerializer()) {
data = serializer.serialize("", message);
}
}

public byte[] getData() {
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public byte[] serialize(String topic, DoorRequestInfoMessage data) {
MoverInfoMessage moverInfoMessage = data.getMoverInfo();

if (moverInfoMessage != null) {
JSONObject moverInfo = BillingMessageSerializer.transform(moverInfoMessage);
JSONObject moverInfo = MoverInfoMessageSerializer.transform(moverInfoMessage);
Arrays.stream(REDUNDANT_MOVER_DATA_KEYS).forEach(moverInfo::remove);
o.put("moverInfo", moverInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.json.JSONArray;
import org.json.JSONObject;

public class BillingMessageSerializer implements Serializer<MoverInfoMessage> {
public class MoverInfoMessageSerializer implements Serializer<MoverInfoMessage> {

static JSONObject transform(MoverInfoMessage data) {
JSONObject o = new JSONObject();
Expand All @@ -41,7 +41,8 @@ static JSONObject transform(MoverInfoMessage data) {
data.getWriteIdle().ifPresent(d -> o.put("writeIdle", d.toString()));
data.getReadActive().ifPresent(d -> o.put("readActive", d.toString()));
data.getWriteActive().ifPresent(d -> o.put("writeActive", d.toString()));
data.getLocalEndpoint().ifPresent(d -> o.put("localEndpoint", InetAddresses.toUriString(d.getAddress()) + ":" + d.getPort()));
data.getLocalEndpoint().ifPresent(d -> o.put("localEndpoint",
InetAddresses.toUriString(d.getAddress()) + ":" + d.getPort()));

JSONObject status = new JSONObject();
status.put("code", data.getResultCode());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.dcache.notification;

import static java.nio.charset.StandardCharsets.UTF_8;

import diskCacheV111.vehicles.PoolHitInfoMessage;
import diskCacheV111.vehicles.StorageInfo;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.json.JSONObject;

public class PoolHitInfoMessageSerializer implements Serializer<PoolHitInfoMessage> {

@Override
public byte[] serialize(String topic, PoolHitInfoMessage data) {
JSONObject o = new JSONObject();

o.put("version", "1.0");
o.put("msgType", data.getMessageType());
o.put("date", DateTimeFormatter.ISO_OFFSET_DATE_TIME
.format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(data.getTimestamp()),
ZoneId.systemDefault())));
o.put("cellName", data.getCellAddress().getCellName());
o.put("cellType", data.getCellType());
o.put("cellDomain", data.getCellAddress().getCellDomainName());

JSONObject status = new JSONObject();
status.put("code", data.getResultCode());
status.put("msg", data.getMessage());
o.put("status", status);

o.put("pnfsid", data.getPnfsId());
o.put("filesize", data.getFileSize());
o.put("billingPath", data.getBillingPath());
o.put("protocol", data.getProtocolInfo());
o.put("cached", data.getFileCached());

StorageInfo info = data.getStorageInfo();
if (info != null) {
o.put("storageInfo", info.getStorageClass() + "@" + info.getHsm());
}

return o.toString().getBytes(UTF_8);
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.dcache.notification;

import static java.nio.charset.StandardCharsets.UTF_8;

import diskCacheV111.vehicles.WarningPnfsFileInfoMessage;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.json.JSONObject;

public class WarningPnfsFileInfoMessageSerializer implements
Serializer<diskCacheV111.vehicles.WarningPnfsFileInfoMessage> {

@Override
public byte[] serialize(String topic, WarningPnfsFileInfoMessage data) {
JSONObject o = new JSONObject();

o.put("version", "1.0");
o.put("msgType", data.getMessageType());
o.put("date", DateTimeFormatter.ISO_OFFSET_DATE_TIME
.format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(data.getTimestamp()),
ZoneId.systemDefault())));
o.put("cellName", data.getCellAddress().getCellName());
o.put("cellType", data.getCellType());
o.put("cellDomain", data.getCellAddress().getCellDomainName());

JSONObject status = new JSONObject();
status.put("code", data.getResultCode());
status.put("msg", data.getMessage());
o.put("status", status);

return o.toString().getBytes(UTF_8);
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@
<map>
<entry key="bootstrap.servers" value="${pool.kafka.producer.bootstrap.servers}"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="value.serializer" value="org.dcache.notification.BillingMessageSerializer"/>
<entry key="value.serializer" value="org.dcache.notification.MoverInfoMessageSerializer"/>
<entry key="client.id" value="${pool.cell.name}@${dcache.domain.name}-transfer"/>
</map>
</property>
Expand Down

0 comments on commit 4bace63

Please sign in to comment.