From 4bace63646ec7618389b28ab6cf9a93444d70168 Mon Sep 17 00:00:00 2001 From: Svenja Meyer Date: Tue, 6 Feb 2024 17:00:17 +0100 Subject: [PATCH] notifications: Add BillingMessageSerializerVisitor Motivation: At the moment there are different MessageSerializer in notifications to serialize different messages. One of them is BillingMessageSerializer which implies to be able to serialize all possible BillingMessages. In fact, it's just able to serialize MoverInfoMessages. Further, I was looking for an easy way to convert different kind of InfoMessages to JSON format for GH #7421. Modification: Refactoring of BillingMessageSerializer to MoverInfoMessage. Creation of BillingMessageSerializerVisitor to be able to serialize all billing messages into JSON format easiliy. This serializer uses the available visitor pattern. Missing message serializers was added. Result: BillingMessageSerializerVisitor can be used for all kind of BillingMessages and BillingMessageSerializer is no longer misleading. Target: master Patch: https://rb.dcache.org/r/14220/ Requires-notes: no Requires-book: no Acked-by: Dmitry --- .../BillingMessageSerializerVisitor.java | 60 +++++++++++++++++++ .../DoorRequestMessageSerializer.java | 2 +- ...r.java => MoverInfoMessageSerializer.java} | 5 +- .../PoolHitInfoMessageSerializer.java | 56 +++++++++++++++++ .../WarningPnfsFileInfoMessageSerializer.java | 45 ++++++++++++++ .../org/dcache/pool/classic/pool.xml | 2 +- 6 files changed, 166 insertions(+), 4 deletions(-) create mode 100644 modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializerVisitor.java rename modules/dcache/src/main/java/org/dcache/notification/{BillingMessageSerializer.java => MoverInfoMessageSerializer.java} (95%) create mode 100644 modules/dcache/src/main/java/org/dcache/notification/PoolHitInfoMessageSerializer.java create mode 100644 modules/dcache/src/main/java/org/dcache/notification/WarningPnfsFileInfoMessageSerializer.java diff --git a/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializerVisitor.java b/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializerVisitor.java new file mode 100644 index 00000000000..7e1398f16e9 --- /dev/null +++ b/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializerVisitor.java @@ -0,0 +1,60 @@ +package org.dcache.notification; + +import diskCacheV111.vehicles.DoorRequestInfoMessage; +import diskCacheV111.vehicles.InfoMessageVisitor; +import diskCacheV111.vehicles.MoverInfoMessage; +import diskCacheV111.vehicles.PoolHitInfoMessage; +import diskCacheV111.vehicles.RemoveFileInfoMessage; +import diskCacheV111.vehicles.StorageInfoMessage; +import diskCacheV111.vehicles.WarningPnfsFileInfoMessage; + +public class BillingMessageSerializerVisitor implements InfoMessageVisitor { + + private byte[] data; + + @Override + public void visit(DoorRequestInfoMessage message) { + try (DoorRequestMessageSerializer serializer = new DoorRequestMessageSerializer()) { + data = serializer.serialize("", message); + } + } + + @Override + public void visit(MoverInfoMessage message) { + try (MoverInfoMessageSerializer serializer = new MoverInfoMessageSerializer()) { + data = serializer.serialize("", message); + } + } + + @Override + public void visit(PoolHitInfoMessage message) { + try (PoolHitInfoMessageSerializer serializer = new PoolHitInfoMessageSerializer()) { + data = serializer.serialize("", message); + } + } + + @Override + public void visit(RemoveFileInfoMessage message) { + try (RemoveFileInfoMessageSerializer serializer = new RemoveFileInfoMessageSerializer()) { + data = serializer.serialize("", message); + } + } + + @Override + public void visit(StorageInfoMessage message) { + try (StorageInfoMessageSerializer serializer = new StorageInfoMessageSerializer()) { + data = serializer.serialize("", message); + } + } + + @Override + public void visit(WarningPnfsFileInfoMessage message) { + try (WarningPnfsFileInfoMessageSerializer serializer = new WarningPnfsFileInfoMessageSerializer()) { + data = serializer.serialize("", message); + } + } + + public byte[] getData() { + return data; + } +} diff --git a/modules/dcache/src/main/java/org/dcache/notification/DoorRequestMessageSerializer.java b/modules/dcache/src/main/java/org/dcache/notification/DoorRequestMessageSerializer.java index 1c2b296060c..b390d96cc9f 100644 --- a/modules/dcache/src/main/java/org/dcache/notification/DoorRequestMessageSerializer.java +++ b/modules/dcache/src/main/java/org/dcache/notification/DoorRequestMessageSerializer.java @@ -69,7 +69,7 @@ public byte[] serialize(String topic, DoorRequestInfoMessage data) { MoverInfoMessage moverInfoMessage = data.getMoverInfo(); if (moverInfoMessage != null) { - JSONObject moverInfo = BillingMessageSerializer.transform(moverInfoMessage); + JSONObject moverInfo = MoverInfoMessageSerializer.transform(moverInfoMessage); Arrays.stream(REDUNDANT_MOVER_DATA_KEYS).forEach(moverInfo::remove); o.put("moverInfo", moverInfo); } diff --git a/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializer.java b/modules/dcache/src/main/java/org/dcache/notification/MoverInfoMessageSerializer.java similarity index 95% rename from modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializer.java rename to modules/dcache/src/main/java/org/dcache/notification/MoverInfoMessageSerializer.java index efcffa6fa58..b07ff6d42b2 100644 --- a/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializer.java +++ b/modules/dcache/src/main/java/org/dcache/notification/MoverInfoMessageSerializer.java @@ -15,7 +15,7 @@ import org.json.JSONArray; import org.json.JSONObject; -public class BillingMessageSerializer implements Serializer { +public class MoverInfoMessageSerializer implements Serializer { static JSONObject transform(MoverInfoMessage data) { JSONObject o = new JSONObject(); @@ -41,7 +41,8 @@ static JSONObject transform(MoverInfoMessage data) { data.getWriteIdle().ifPresent(d -> o.put("writeIdle", d.toString())); data.getReadActive().ifPresent(d -> o.put("readActive", d.toString())); data.getWriteActive().ifPresent(d -> o.put("writeActive", d.toString())); - data.getLocalEndpoint().ifPresent(d -> o.put("localEndpoint", InetAddresses.toUriString(d.getAddress()) + ":" + d.getPort())); + data.getLocalEndpoint().ifPresent(d -> o.put("localEndpoint", + InetAddresses.toUriString(d.getAddress()) + ":" + d.getPort())); JSONObject status = new JSONObject(); status.put("code", data.getResultCode()); diff --git a/modules/dcache/src/main/java/org/dcache/notification/PoolHitInfoMessageSerializer.java b/modules/dcache/src/main/java/org/dcache/notification/PoolHitInfoMessageSerializer.java new file mode 100644 index 00000000000..db176612167 --- /dev/null +++ b/modules/dcache/src/main/java/org/dcache/notification/PoolHitInfoMessageSerializer.java @@ -0,0 +1,56 @@ +package org.dcache.notification; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import diskCacheV111.vehicles.PoolHitInfoMessage; +import diskCacheV111.vehicles.StorageInfo; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import org.apache.kafka.common.serialization.Serializer; +import org.json.JSONObject; + +public class PoolHitInfoMessageSerializer implements Serializer { + + @Override + public byte[] serialize(String topic, PoolHitInfoMessage data) { + JSONObject o = new JSONObject(); + + o.put("version", "1.0"); + o.put("msgType", data.getMessageType()); + o.put("date", DateTimeFormatter.ISO_OFFSET_DATE_TIME + .format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(data.getTimestamp()), + ZoneId.systemDefault()))); + o.put("cellName", data.getCellAddress().getCellName()); + o.put("cellType", data.getCellType()); + o.put("cellDomain", data.getCellAddress().getCellDomainName()); + + JSONObject status = new JSONObject(); + status.put("code", data.getResultCode()); + status.put("msg", data.getMessage()); + o.put("status", status); + + o.put("pnfsid", data.getPnfsId()); + o.put("filesize", data.getFileSize()); + o.put("billingPath", data.getBillingPath()); + o.put("protocol", data.getProtocolInfo()); + o.put("cached", data.getFileCached()); + + StorageInfo info = data.getStorageInfo(); + if (info != null) { + o.put("storageInfo", info.getStorageClass() + "@" + info.getHsm()); + } + + return o.toString().getBytes(UTF_8); + } + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public void close() { + } +} diff --git a/modules/dcache/src/main/java/org/dcache/notification/WarningPnfsFileInfoMessageSerializer.java b/modules/dcache/src/main/java/org/dcache/notification/WarningPnfsFileInfoMessageSerializer.java new file mode 100644 index 00000000000..b6b63d357ec --- /dev/null +++ b/modules/dcache/src/main/java/org/dcache/notification/WarningPnfsFileInfoMessageSerializer.java @@ -0,0 +1,45 @@ +package org.dcache.notification; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import diskCacheV111.vehicles.WarningPnfsFileInfoMessage; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import org.apache.kafka.common.serialization.Serializer; +import org.json.JSONObject; + +public class WarningPnfsFileInfoMessageSerializer implements + Serializer { + + @Override + public byte[] serialize(String topic, WarningPnfsFileInfoMessage data) { + JSONObject o = new JSONObject(); + + o.put("version", "1.0"); + o.put("msgType", data.getMessageType()); + o.put("date", DateTimeFormatter.ISO_OFFSET_DATE_TIME + .format(ZonedDateTime.ofInstant(Instant.ofEpochMilli(data.getTimestamp()), + ZoneId.systemDefault()))); + o.put("cellName", data.getCellAddress().getCellName()); + o.put("cellType", data.getCellType()); + o.put("cellDomain", data.getCellAddress().getCellDomainName()); + + JSONObject status = new JSONObject(); + status.put("code", data.getResultCode()); + status.put("msg", data.getMessage()); + o.put("status", status); + + return o.toString().getBytes(UTF_8); + } + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public void close() { + } +} diff --git a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml index 5d69a55cabc..30b503c1ac6 100644 --- a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml +++ b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml @@ -708,7 +708,7 @@ - +