Skip to content

Commit

Permalink
ros2: support pub-sub message tracking using rmw layer only (eclipse-…
Browse files Browse the repository at this point in the history
…tracecompass-incubator#24)

Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard authored and adel-belkhiri committed Oct 15, 2024
1 parent f4a033b commit 426d83b
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.eclipse.tracecompass.incubator.internal.ros2.core.model.HostProcessPointer;
import org.eclipse.tracecompass.incubator.internal.ros2.core.model.HostThread;
import org.eclipse.tracecompass.incubator.internal.ros2.core.model.objects.Ros2ObjectHandle;
import org.eclipse.tracecompass.incubator.internal.ros2.core.trace.Ros2Trace;
import org.eclipse.tracecompass.incubator.internal.ros2.core.trace.layout.IRos2EventLayout;
import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
import org.eclipse.tracecompass.tmf.core.statesystem.AbstractTmfStateProvider;
import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
import org.eclipse.tracecompass.tmf.ctf.core.trace.CtfTmfTrace;
import org.osgi.framework.Version;

/**
* Abstract ROS 2 state provider with some common utilities.
Expand All @@ -34,9 +36,18 @@
*/
public abstract class AbstractRos2StateProvider extends AbstractTmfStateProvider {

/**
* First tracetools version that includes the source_timestamp for a
* published message in the rmw_publish tracepoint. See:
* https://github.com/ros2/ros2_tracing/pull/74.
*/
private static final @NonNull Version RMW_SOURCE_TIMESTAMP_MINIMUM_VERSION = new Version("8.0.0"); //$NON-NLS-1$

/** The event layout */
protected static final IRos2EventLayout LAYOUT = IRos2EventLayout.getDefault();

private final boolean fIsPubSourceTimestampAvailableFromRmw;

/**
* Constructor
*
Expand All @@ -47,6 +58,16 @@ public abstract class AbstractRos2StateProvider extends AbstractTmfStateProvider
*/
protected AbstractRos2StateProvider(ITmfTrace trace, String id) {
super(Objects.requireNonNull(trace), Objects.requireNonNull(id));
// Version in trace needs to be >= the minimum version
fIsPubSourceTimestampAvailableFromRmw = ((Ros2Trace) getTrace()).getTracetoolsVersion().compareTo(RMW_SOURCE_TIMESTAMP_MINIMUM_VERSION) >= 0;
}

/**
* @return whether the source_timestamp value on the publication side is
* available from the rmw layer; if not, it is available from DDS
*/
protected boolean isPubSourceTimestampAvailableFromRmw() {
return fIsPubSourceTimestampAvailableFromRmw;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class Ros2MessagesStateProvider extends AbstractRos2StateProvider {

// Publications
private Collection<HostProcessPointer> fKnownDdsWriters = new ArrayList<>();
private Collection<Ros2ObjectHandle> fKnownRmwPublishers = new ArrayList<>();
private Map<HostProcessPointer, ITmfEvent> fRclcppPublishEvents = Maps.newHashMap();
private Map<HostProcessPointer, ITmfEvent> fRclPublishEvents = Maps.newHashMap();
private Map<HostProcessPointer, ITmfEvent> fDdsWritePreEvents = Maps.newHashMap();
Expand Down Expand Up @@ -116,62 +117,80 @@ protected void eventHandle(@NonNull ITmfEvent event) {
}

eventHandleHelpers(event);
// If we can get the source_timestamp from rmw, don't process DDS events
if (!isPubSourceTimestampAvailableFromRmw()) {
eventHandlePublishDds(event, ss, timestamp);
}
eventHandlePublish(event, ss, timestamp);
eventHandleTake(ss, event, timestamp);
eventHandleCallback(event, ss, timestamp);
}

private void eventHandleHelpers(@NonNull ITmfEvent event) {
// dds:create_writer
if (isEvent(event, LAYOUT.eventDdsCreateWriter())) {
/**
* Many DDS writers are internal to:
*
* <pre>
* (1) the RMW implementation; and
* (2) the DDS implementation.
* </pre>
*
* To simplify processing of DDS write-related events (i.e.,
* dds:write), we keep a list of known/"valid" DDS writers and only
* consider events concerning those DDS writers.
*
* For (1), we can filter out DDS writers using the topic name. Most
* DDS writer topic names have prefixes. These prefixes are
* implementation details and are not part of the actual ROS 2 topic
* name:
*
* <pre>
* * "rt" for publication/subscription topics
* * "rq" and "rr" for service request and service reply topics, respectively
* </pre>
*
* Some DDS writers with a topic name not starting with those
* prefixes are internal to RMW:
*
* <pre>
* * "ros_discovery_info" for internal discovery in RMW (per RMW context)
* </pre>
*
* For (2), we do not actually get dds:create_writer events for
* those DDS writers (at least currently), so just going with an
* allow-list allows us to filter those out.
*/
String topicName = (String) getField(event, LAYOUT.fieldTopicName());
if (!topicName.equals("ros_discovery_info")) { //$NON-NLS-1$
HostProcessPointer writer = hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldWriter()));
fKnownDdsWriters.add(writer);
/**
* Many DDS writers and rmw publishers are internal to:
*
* <pre>
* (1) the rmw implementation; and
* (2) the DDS implementation.
* </pre>
*
* To simplify processing of DDS write-related and rmw publish-related
* events (i.e., dds:write, ros2:rmw_publish), we keep a list of
* known/"valid" DDS writers and rmw publishers and only consider events
* concerning those DDS writers or rmw publishers.
*/
if (!isPubSourceTimestampAvailableFromRmw()) {
// dds:create_writer
if (isEvent(event, LAYOUT.eventDdsCreateWriter())) {
/**
* For DDS:
*
* For (1), we can filter out DDS writers using the topic name.
* Most DDS writer topic names have prefixes. These prefixes are
* implementation details and are not part of the actual ROS 2
* topic name:
*
* <pre>
* * "rt" for publication/subscription topics
* * "rq" and "rr" for service request and service reply topics, respectively
* </pre>
*
* Some DDS writers with a topic name not starting with those
* prefixes are internal to rmw:
*
* <pre>
* * "ros_discovery_info" for internal discovery in rmw (per rmw context)
* </pre>
*
* For (2), we do not actually get dds:create_writer events for
* those DDS writers (at least currently), so just going with an
* allow-list allows us to filter those out.
*/
String topicName = (String) getField(event, LAYOUT.fieldTopicName());
if (!topicName.equals("ros_discovery_info")) { //$NON-NLS-1$
HostProcessPointer writer = hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldWriter()));
fKnownDdsWriters.add(writer);
}
}
} else {
// rcl_publisher_init
if (isEvent(event, LAYOUT.eventRclPublisherInit())) {
/**
* For rmw:
*
* Build an allow-list of rmw publishers that belong to an rcl
* publisher by looking at ros2:rcl_publisher_init events,
* because then that means they're not implementation details of
* rmw or DDS.
*/
Ros2ObjectHandle rmwPublisher = handleFrom(event, (long) getField(event, LAYOUT.fieldRmwPublisherHandle()));
fKnownRmwPublishers.add(rmwPublisher);
}
}
}

private void eventHandlePublish(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long timestamp) {
eventHandlePublishRos2(event);

eventHandlePublishDds(event, ss, timestamp);
}

private void eventHandlePublishRos2(@NonNull ITmfEvent event) {
// rclcpp_publish
if (isEvent(event, LAYOUT.eventRclcppPublish())) {
HostProcessPointer message = hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldMessage()));
Expand All @@ -186,7 +205,11 @@ else if (isEvent(event, LAYOUT.eventRclPublish())) {
// Add to temporary map
fRclPublishEvents.put(message, event);
}
// TODO rmw_publish, use rmw-level timestamp
// rmw_publish
// We only need these events if we can get the source_timestamp
else if (isPubSourceTimestampAvailableFromRmw() && isEvent(event, LAYOUT.eventRmwPublish())) {
handleRmwPublish(event, ss, timestamp);
}
}

private void eventHandlePublishDds(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long timestamp) {
Expand Down Expand Up @@ -214,7 +237,38 @@ private void eventHandlePublishDdsWrite(@NonNull ITmfEvent event, ITmfStateSyste
if (null == message) {
return;
}
handlePublish(event, ss, timestamp, message);
}

private HostProcessPointer getDdsMessage(@NonNull ITmfEvent event, HostProcessPointer writer) {
/**
* Currently, with the Fast DDS instrumentation, the dds:write event
* does not contain the message/data pointer. We need to get it from a
* separate previous event, dds:write_pre.
*/
if (hasField(event, LAYOUT.fieldData())) {
return hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldData()));
}
ITmfEvent ddsWritePre = fDdsWritePreEvents.remove(writer);
if (null == ddsWritePre) {
Activator.getInstance().logError("could not get corresponding dds:write_pre event for writer=0x" + Long.toHexString(writer.getPointer())); //$NON-NLS-1$
return null;
}
return hostProcessPointerFrom(ddsWritePre, (long) getField(ddsWritePre, LAYOUT.fieldData()));
}

private void handleRmwPublish(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long timestamp) {
// First check if we know the rmw publisher
Ros2ObjectHandle rmwPublisher = handleFrom(event, (long) getField(event, LAYOUT.fieldRmwPublisherHandle()));
if (!fKnownRmwPublishers.contains(rmwPublisher)) {
return;
}

HostProcessPointer message = hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldMessage()));
handlePublish(event, ss, timestamp, message);
}

private void handlePublish(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long endPubTimestamp, @NonNull HostProcessPointer message) {
/**
* Get corresponding rcl_publish event, since it's the main r*_publish
* event.
Expand All @@ -225,7 +279,7 @@ private void eventHandlePublishDdsWrite(@NonNull ITmfEvent event, ITmfStateSyste
return;
}
Ros2ObjectHandle publisherHandle = handleFrom(rclPublish, (long) getField(rclPublish, LAYOUT.fieldPublisherHandle()));
Ros2PublisherObject publisherObject = Ros2ObjectsUtil.getPublisherObjectFromHandle(fObjectsSs, timestamp, publisherHandle);
Ros2PublisherObject publisherObject = Ros2ObjectsUtil.getPublisherObjectFromHandle(fObjectsSs, endPubTimestamp, publisherHandle);
if (null == publisherObject) {
/**
* FIXME this happens with publishers for /rosout for some reason.
Expand All @@ -237,31 +291,6 @@ private void eventHandlePublishDdsWrite(@NonNull ITmfEvent event, ITmfStateSyste

// Get corresponding rclcpp_publish event
ITmfEvent rclcppPublish = fRclcppPublishEvents.remove(message);

addPublicationInstance(event, ss, timestamp, publisherObject, message, rclcppPublish);
}

private HostProcessPointer getDdsMessage(@NonNull ITmfEvent event, HostProcessPointer writer) {
/**
* Currently, with the Fast DDS instrumentation, the dds:write event
* does not contain the message/data pointer. We need to get it from a
* separate previous event, dds:write_pre.
*/
if (hasField(event, LAYOUT.fieldData())) {
return hostProcessPointerFrom(event, (long) getField(event, LAYOUT.fieldData()));
}
ITmfEvent ddsWritePre = fDdsWritePreEvents.remove(writer);
if (null == ddsWritePre) {
Activator.getInstance().logError("could not get corresponding dds:write_pre event for writer=0x" + Long.toHexString(writer.getPointer())); //$NON-NLS-1$
return null;
}
return hostProcessPointerFrom(ddsWritePre, (long) getField(ddsWritePre, LAYOUT.fieldData()));
}

private void addPublicationInstance(@NonNull ITmfEvent event, ITmfStateSystemBuilder ss, long timestamp, Ros2PublisherObject publisherObject, @NonNull HostProcessPointer message, ITmfEvent rclcppPublish) {
long sourceTimestamp = (long) getField(event, LAYOUT.fieldTimestamp());
long tid = getTid(event);

/**
* Some rcl_publish events (e.g., for internal rcl-level publisher) do
* not have a corresponding rclcpp_publish event.
Expand All @@ -270,27 +299,34 @@ private void addPublicationInstance(@NonNull ITmfEvent event, ITmfStateSystemBui
// TODO support this, using the rcl_publish timestamp instead
return;
}

long pubTimestamp = rclcppPublish.getTimestamp().toNanos();
long sourceTimestamp = (long) getField(event, LAYOUT.fieldTimestamp());
HostThread thread = hostThreadFrom(event);
Ros2PubInstance pubInstance = new Ros2PubInstance(publisherObject.getHandle(), thread.getTid(), message, sourceTimestamp);
addPublicationInstance(ss, pubTimestamp, endPubTimestamp, thread, publisherObject, pubInstance);
}

Integer pubQuark = Ros2MessagesUtil.getPublisherQuarkAndAdd(ss, fObjectsSs, timestamp, publisherObject.getHandle());
private void addPublicationInstance(ITmfStateSystemBuilder ss, long pubTimestamp, long endPubTimestamp, @NonNull HostThread thread, Ros2PublisherObject publisherObject, @NonNull Ros2PubInstance pubInstance) {
Integer pubQuark = Ros2MessagesUtil.getPublisherQuarkAndAdd(ss, fObjectsSs, endPubTimestamp, publisherObject.getHandle());
if (null == pubQuark) {
return;
}
// Mark publication event using state from rclcpp_pub->dds:write
Ros2PubInstance pubInstance = new Ros2PubInstance(publisherObject.getHandle(), tid, message, sourceTimestamp);
// Mark publication event using state from rclcpp_pub->(dds:write or
// rmw_publish)
ss.modifyAttribute(pubTimestamp, pubInstance, pubQuark);
ss.modifyAttribute(timestamp, null, pubQuark);
ss.modifyAttribute(endPubTimestamp, null, pubQuark);

// Keep pub event for pub-sub links
/**
* TODO match using publisher GID when rmw_cyclonedds correctly supports
* it.
*/
Ros2MessageTimestamp messageSourceTimestamp = new Ros2MessageTimestamp(sourceTimestamp, publisherObject.getTopicName());
fPublications.put(messageSourceTimestamp, new Pair<>(publisherObject.getHandle(), timestamp));
Ros2MessageTimestamp messageSourceTimestamp = new Ros2MessageTimestamp(pubInstance.getSourceTimestamp(), publisherObject.getTopicName());
fPublications.put(messageSourceTimestamp, new Pair<>(publisherObject.getHandle(), endPubTimestamp));

// Add publication to multimap for in-callback links
fCallbackPublications.put(hostThreadFrom(event), new Pair<>(publisherObject.getHandle(), pubTimestamp));
fCallbackPublications.put(thread, new Pair<>(publisherObject.getHandle(), pubTimestamp));
}

private void eventHandleTake(@NonNull ITmfStateSystemBuilder ss, @NonNull ITmfEvent event, long timestamp) {
Expand Down
Loading

0 comments on commit 426d83b

Please sign in to comment.