Skip to content

Commit

Permalink
deduped messages, aded stats etc
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed May 11, 2024
1 parent a447548 commit db0ad0c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,10 @@ private static void addMinimumProducerDetails(
if (producer.getAccessMode() != null) {
traceDetails.put("accessMode", producer.getAccessMode().name());
}
traceDetails.put("clientHost",
TracingUtils.hostNameOf(producer.getClientAddress(), producer.getCnx().clientSourceAddressAndPort()));
traceDetails.put(
"clientHost",
TracingUtils.hostNameOf(
producer.getClientAddress(), producer.getCnx().clientSourceAddressAndPort()));

if (producer.getTopic() != null) {
traceDetails.put(
Expand All @@ -389,8 +391,10 @@ private static void addMinimumConsumerSubscriptionDetails(
if (consumer != null) {
traceDetails.put("consumerName", consumer.consumerName());
traceDetails.put("consumerId", consumer.consumerId());
traceDetails.put("clientHost",
TracingUtils.hostNameOf(consumer.getClientAddress(), consumer.cnx().clientSourceAddressAndPort()));
traceDetails.put(
"clientHost",
TracingUtils.hostNameOf(
consumer.getClientAddress(), consumer.cnx().clientSourceAddressAndPort()));
traceDetails.put("authRole", consumer.cnx().getAuthRole());
}

Expand Down Expand Up @@ -451,13 +455,16 @@ public void producerClosed(ServerCnx cnx, Producer producer, Map<String, String>
traceDetails.put("metadata", metadata);
traceDetails.put("brokerUrl", cnx.getBrokerService().getPulsar().getBrokerServiceUrl());

Map<String, Object> statsTrace = new TreeMap<>();
PublisherStatsImpl stats = producer.getStats();
traceDetails.put("connectedSince", stats.getConnectedSince());
traceDetails.put("closedAt", DateFormatter.now());
traceDetails.put("averageMsgSize", stats.getAverageMsgSize());
traceDetails.put("msgRateIn", stats.getMsgRateIn());
traceDetails.put("msgThroughputIn", stats.getMsgThroughputIn());

statsTrace.put("connectedSince", stats.getConnectedSince());
statsTrace.put("closedAt", DateFormatter.now());
statsTrace.put("averageMsgSize", stats.getAverageMsgSize());
statsTrace.put("msgRateIn", stats.getMsgRateIn());
statsTrace.put("msgThroughputIn", stats.getMsgThroughputIn());
// no message count in stats? stats.getCount() is not it
traceDetails.put("stats", statsTrace);

trace(EventCategory.PROD, EventSubCategory.CLOSED, traceDetails);
}
Expand Down Expand Up @@ -490,16 +497,24 @@ public void consumerClosed(ServerCnx cnx, Consumer consumer, Map<String, String>
traceDetails.put("brokerUrl", cnx.getBrokerService().getPulsar().getBrokerServiceUrl());

ConsumerStatsImpl stats = consumer.getStats();
traceDetails.put("connectedSince", stats.getConnectedSince());
traceDetails.put("closedAt", DateFormatter.now());
traceDetails.put("averageMsgSize", stats.getAvgMessagesPerEntry());
traceDetails.put("msgRateOut", stats.getMsgRateOut());
traceDetails.put("msgThroughputOut", stats.getMsgThroughputOut());
traceDetails.put("msgOutCounter", stats.getMsgOutCounter());
traceDetails.put("bytesOutCounter", stats.getBytesOutCounter());
traceDetails.put("unackedMessages", stats.getUnackedMessages());
traceDetails.put("messageAckRate", stats.getMessageAckRate());
traceDetails.put("msgRateRedeliver", stats.getMsgRateRedeliver());
Map<String, Object> statsTrace = new TreeMap<>();
statsTrace.put("connectedSince", stats.getConnectedSince());
statsTrace.put("closedAt", DateFormatter.now());
statsTrace.put("averageMsgSize", stats.getAvgMessagesPerEntry());
statsTrace.put("msgRateOut", stats.getMsgRateOut());
statsTrace.put("msgThroughputOut", stats.getMsgThroughputOut());
statsTrace.put("msgOutCounter", stats.getMsgOutCounter());
statsTrace.put("bytesOutCounter", stats.getBytesOutCounter());
statsTrace.put("unackedMessages", stats.getUnackedMessages());
statsTrace.put("messageAckRate", stats.getMessageAckRate());
statsTrace.put("msgRateRedeliver", stats.getMsgRateRedeliver());
statsTrace.put("readPositionWhenJoining", stats.getReadPositionWhenJoining());
Subscription sub = consumer.getSubscription();
if (sub != null) {
statsTrace.put("subscriptionApproxBacklog", sub.getNumberOfEntriesInBacklog(false));
statsTrace.put("subscriptionMsgRateExpired", sub.getExpiredMessageRate());
}
traceDetails.put("stats", statsTrace);

trace(EventCategory.CONS, EventSubCategory.CLOSED, traceDetails);
}
Expand All @@ -524,11 +539,11 @@ public void onMessagePublish(
if (TraceLevel.PAYLOAD == level && headersAndPayload != null) {
Map<String, Object> headersAndPayloadDetails = new TreeMap<>();
traceMetadataAndPayload(
"headersAndPayload",
headersAndPayload.slice(),
"payload",
headersAndPayload.retainedDuplicate(),
headersAndPayloadDetails,
maxPayloadLength);
traceDetails.put("payload", headersAndPayloadDetails);
traceDetails.put("headersAndPayload", headersAndPayloadDetails);
}

trace(EventCategory.MSG, EventSubCategory.PRODUCED, traceDetails);
Expand Down Expand Up @@ -569,7 +584,9 @@ public void beforeSendMessage(

addMinimumConsumerSubscriptionDetails(consumer, subscription, traceDetails);

traceDetails.put("entry", getEntryDetails(level, entry, maxPayloadLength));
traceDetails.put("messageId", entry.getLedgerId() + ":" + entry.getEntryId());

traceDetails.put("headersAndPayload", getEntryDetails(level, entry, maxPayloadLength));

trace(EventCategory.MSG, EventSubCategory.READ, traceDetails);
}
Expand All @@ -588,11 +605,11 @@ public void messageDispatched(
if (TraceLevel.PAYLOAD == level && headersAndPayload != null) {
Map<String, Object> headersAndPayloadDetails = new TreeMap<>();
traceMetadataAndPayload(
"headersAndPayload",
headersAndPayload.slice(),
"payload",
headersAndPayload.retainedDuplicate(),
headersAndPayloadDetails,
maxPayloadLength);
traceDetails.put("payload", headersAndPayloadDetails);
traceDetails.put("headersAndPayload", headersAndPayloadDetails);
}

trace(EventCategory.MSG, EventSubCategory.DISPATCHED, traceDetails);
Expand All @@ -604,35 +621,14 @@ public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) {
TraceLevel level = getTracingLevel(consumer);
if (consumer != null && level == TraceLevel.OFF) return;

EventSubCategory subcategory = EventSubCategory.ACKED;

Map<String, Object> traceDetails = new TreeMap<>();

addMinimumConsumerSubscriptionDetails(consumer, traceDetails);

if (consumer == null) {
// ack with empty consumer == message filtered by JMSFilter
traceDetails.put("reason", "filtered by JMSFilter");
subcategory = EventSubCategory.FILTERED;
} else {
// todo: am I right that unacked/nacked messages never go through broker interceptor?
// in this case we need consumer interceptor to track nacks
traceDetails.put("reason", "acked");
}

if (consumer != null && consumer.getSubscription() != null) {
Subscription sub = consumer.getSubscription();
traceDetails.put("subscriptionName", sub.getName());
traceDetails.put("topicName", TopicName.get(sub.getTopicName()).getPartitionedTopicName());
traceDetails.put("subscriptionType", sub.getType().name());
}
Map<String, Object> ackDetails = new TreeMap<>();
if (ackCmd.hasAckType()) {
ackDetails.put("type", ackCmd.getAckType().name());
}
if (ackCmd.hasConsumerId()) {
ackDetails.put("ackConsumerId", ackCmd.getConsumerId());
}
ackDetails.put("numAckedMessages", ackCmd.getMessageIdsCount());
ackDetails.put(
"messageIds",
Expand All @@ -652,6 +648,9 @@ public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) {

traceDetails.put("ack", ackDetails);

EventSubCategory subcategory =
consumer == null ? EventSubCategory.FILTERED : EventSubCategory.ACKED;

trace(EventCategory.MSG, subcategory, traceDetails);
}

Expand Down Expand Up @@ -708,7 +707,7 @@ public void onWebserviceResponse(ServletRequest request, ServletResponse respons

Map<String, Object> traceDetails = new TreeMap<>();

traceDetails.put("remoteHost", hostNameOf(request.getRemoteHost(), request.getRemotePort()));
traceDetails.put("host", hostNameOf(request.getRemoteHost(), request.getRemotePort()));
traceDetails.put("protocol", request.getProtocol());
traceDetails.put("scheme", request.getScheme());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ public String load(String clientAddress) {
});

public static String hostNameOf(String clientAddress, String clientSourceAddressAndPort) {
if (clientAddress == null || clientAddress.isEmpty()
|| clientSourceAddressAndPort == null || !clientSourceAddressAndPort.contains(":")) {
if (clientAddress == null
|| clientAddress.isEmpty()
|| clientSourceAddressAndPort == null
|| !clientSourceAddressAndPort.contains(":")) {
return "unknown/null";
}

Expand Down Expand Up @@ -207,7 +209,8 @@ private static void populateConnectionDetails(ServerCnx cnx, Map<String, Object>
if (cnx == null) {
return;
}
traceDetails.put("clientHost", hostNameOf(cnx.clientSourceAddress(), cnx.clientSourceAddressAndPort()));
traceDetails.put(
"clientHost", hostNameOf(cnx.clientSourceAddress(), cnx.clientSourceAddressAndPort()));
traceDetails.put("authRole", cnx.getAuthRole());
traceDetails.put("clientVersion", cnx.getClientVersion());
traceDetails.put("clientSourceAddressAndPort", cnx.clientSourceAddressAndPort());
Expand Down Expand Up @@ -300,15 +303,12 @@ private static void populateConsumerDetails(Consumer consumer, Map<String, Objec

traceDetails.put("name", consumer.consumerName());
traceDetails.put("consumerId", consumer.consumerId());
Subscription sub = consumer.getSubscription();
if (sub != null) {
traceDetails.put("subscriptionName", sub.getName());
traceDetails.put("topicName", TopicName.get(sub.getTopicName()).getPartitionedTopicName());
}

traceDetails.put("priorityLevel", consumer.getPriorityLevel());
traceDetails.put("subType", consumer.subType() == null ? null : consumer.subType().name());
traceDetails.put("clientHost", hostNameOf(consumer.getClientAddress(), consumer.cnx().clientSourceAddressAndPort()));
traceDetails.put(
"clientHost",
hostNameOf(consumer.getClientAddress(), consumer.cnx().clientSourceAddressAndPort()));

traceDetails.put("metadata", consumer.getMetadata());
traceDetails.put("unackedMessages", consumer.getUnackedMessages());
Expand Down Expand Up @@ -340,7 +340,9 @@ private static void populateProducerDetails(
"topicName", TopicName.get(producer.getTopic().getName()).getPartitionedTopicName());
}

traceDetails.put("clientHost", hostNameOf(producer.getClientAddress(), producer.getCnx().clientSourceAddressAndPort()));
traceDetails.put(
"clientHost",
hostNameOf(producer.getClientAddress(), producer.getCnx().clientSourceAddressAndPort()));

traceDetails.put("metadata", producer.getMetadata());

Expand Down Expand Up @@ -421,17 +423,16 @@ private static void populateEntryDetails(
return;
}

traceDetails.put("messageId", entry.getLedgerId() + ":" + entry.getEntryId());

traceDetails.put("length", entry.getLength());

if (TraceLevel.PAYLOAD == level && entry.getDataBuffer() != null) {
traceMetadataAndPayload(
"payload", entry.getDataBuffer().slice(), traceDetails, maxBinaryDataLength);
"payload", entry.getDataBuffer().retainedDuplicate(), traceDetails, maxBinaryDataLength);
}
}

public static Map<String, Object> getPublishContextDetails(TraceLevel level, Topic.PublishContext publishContext) {
public static Map<String, Object> getPublishContextDetails(
TraceLevel level, Topic.PublishContext publishContext) {
if (publishContext == null) {
return null;
}
Expand All @@ -441,8 +442,8 @@ public static Map<String, Object> getPublishContextDetails(TraceLevel level, Top
return details;
}

private static void populatePublishContext(TraceLevel level,
Topic.PublishContext publishContext, Map<String, Object> traceDetails) {
private static void populatePublishContext(
TraceLevel level, Topic.PublishContext publishContext, Map<String, Object> traceDetails) {
traceDetails.put("sequenceId", publishContext.getSequenceId());
traceDetails.put("entryTimestamp", publishContext.getEntryTimestamp());
traceDetails.put("msgSize", publishContext.getMsgSize());
Expand Down

0 comments on commit db0ad0c

Please sign in to comment.