Skip to content

Commit

Permalink
Address review comments + add unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
sidhdirenge committed Dec 3, 2024
1 parent cf371b0 commit 0032a5b
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2155,6 +2155,8 @@ public static final class MessagingSystem {

// TMS HBase table attribute that indicates the number of prefix bytes used for the row key
public static final String HBASE_MESSAGING_TABLE_PREFIX_NUM_BYTES = "cdap.messaging.table.prefix.num.bytes";

public static final String SPANNER_EXTENSION_PROPERTY_PREFIX = "messaging.spanner.properties.";
}

/**
Expand Down
36 changes: 33 additions & 3 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@

<property>
<name>twill.jvm.gc.opts</name>
<value>-XX:+UseG1GC -verbose:gc -Xloggc:&lt;LOG_DIR&gt;/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M</value>
<value>-XX:+UseG1GC -verbose:gc -Xloggc:&lt;LOG_DIR&gt;/gc.log -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=1M
</value>
<activation>
<jdk>(,9)</jdk>
</activation>
Expand Down Expand Up @@ -2661,6 +2664,31 @@
</description>
</property>

<property>
<name>messaging.spanner.properties.publish.batch.size</name>
<value>50</value>
<description>
The default max number of messages in each publish batch for spanner messaging service.
</description>
</property>

<property>
<name>messaging.spanner.properties.publish.batch.timeout.millis</name>
<value>50</value>
<description>
The default maximum waiting time before a batch is published by spanner messaging service.
</description>
</property>

<property>
<name>messaging.spanner.properties.publish.delay.millis</name>
<value>5</value>
<description>
The default waiting time for receiving new publish calls that ensures efficient batching and
timely publishing of messages.
</description>
</property>

<property>
<!-- Use lower heap memory ratio for the messaging service, since it uses non-heap memory for the connections -->
<name>messaging.twill.java.heap.memory.ratio</name>
Expand Down Expand Up @@ -5384,7 +5412,8 @@
<value>true</value>
<description>
Whether user code isolation is enabled in task worker. When enabled, task workers will ensure
only 1 request is accepted at max and later after the task worker is restarted it can take next request.
only 1 request is accepted at max and later after the task worker is restarted it can take
next request.
</description>
</property>

Expand Down Expand Up @@ -6515,7 +6544,8 @@
<name>auditlog.messaging.fetch.size</name>
<value>20</value>
<description>
Number of messages to fetch from messaging system for each batch to consume by Audit log consumer.
Number of messages to fetch from messaging system for each batch to consume by Audit log
consumer.
</description>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@
public class SpannerMessagingService implements MessagingService {

private static final Logger LOG = LoggerFactory.getLogger(SpannerMessagingService.class);
public static final String PAYLOAD_FIELD = "payload";
public static final String PUBLISH_TS_FIELD = "publish_ts";
public static final String PAYLOAD_SEQUENCE_ID = "payload_sequence_id";
public static final String SEQUENCE_ID_FIELD = "sequence_id";
public static final String TOPIC_METADATA_TABLE = "topic_metadata";
public static final String TOPIC_ID_FIELD = "topic_id";
public static final String PROPERTIES_FIELD = "properties";
public static final String NAMESPACE_FIELD = "namespace";
public static final String TOPIC_TABLE_PREFIX = "messaging";

private DatabaseClient client;

Expand All @@ -69,16 +78,13 @@ public class SpannerMessagingService implements MessagingService {

private String databaseId;

private final ConcurrentLinkedQueue<StoreRequest> batch = new ConcurrentLinkedQueue<>();
private int publishBatchSize;

public static final String PAYLOAD_FIELD = "payload";
public static final String PUBLISH_TS_FIELD = "publish_ts";
public static final String PAYLOAD_SEQUENCE_ID = "payload_sequence_id";
public static final String SEQUENCE_ID_FIELD = "sequence_id";
public static final String TOPIC_METADATA_TABLE = "topic_metadata";
public static final String TOPIC_ID_FIELD = "topic_id";
public static final String PROPERTIES_FIELD = "properties";
public static final String NAMESPACE_FIELD = "namespace";
private int publishBatchTimeoutMillis;

private int publishDelayMillis;

private final ConcurrentLinkedQueue<StoreRequest> batch = new ConcurrentLinkedQueue<>();

@Override
public void initialize(MessagingServiceContext context) throws IOException {
Expand All @@ -88,6 +94,11 @@ public void initialize(MessagingServiceContext context) throws IOException {
String projectID = SpannerUtil.getProjectID(cConf);
Credentials credentials = SpannerUtil.getCredentials(cConf);

this.publishBatchSize = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_BATCH_SIZE));
this.publishBatchTimeoutMillis = Integer.parseInt(
cConf.get(SpannerUtil.PUBLISH_BATCH_TIMEOUT_MILLIS));
this.publishDelayMillis = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_DELAY_MILLIS));

Spanner spanner = SpannerUtil.getSpannerService(projectID, credentials);
this.client = SpannerUtil.getSpannerDbClient(projectID, instanceId, databaseId, spanner);
this.adminClient = SpannerUtil.getSpannerDbAdminClient(spanner);
Expand All @@ -111,7 +122,6 @@ public void createTopic(TopicMetadata topicMetadata)
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Failed to create topic {}", topicMetadata.getTopicId().getTopic(), e);
throw new IOException(e);
}

Expand All @@ -124,8 +134,6 @@ public void createTopic(TopicMetadata topicMetadata)
try {
client.write(Collections.singleton(mutation));
} catch (SpannerException e) {
LOG.error("Failed to update topic metadata table for {}",
topicMetadata.getTopicId().getTopic(), e);
throw new IOException(e);
}
LOG.info("Created topic : {}", topicMetadata.getTopicId().getTopic());
Expand All @@ -141,13 +149,13 @@ private static String getCreateTopicDDLStatement(TopicId topicId) {
return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s"
+ " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )"
+ " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY"
+ " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD,
+ " (OLDER_THAN(%s, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD,
PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PAYLOAD_FIELD, SEQUENCE_ID_FIELD,
PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD);
PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PUBLISH_TS_FIELD);
}

public static String getTableName(TopicId topicId) {
return topicId.getNamespace() + topicId.getTopic();
return String.join("-", TOPIC_TABLE_PREFIX, topicId.getNamespace(), topicId.getTopic());
}

@Override
Expand All @@ -169,7 +177,6 @@ public void updateTopic(TopicMetadata topicMetadata)
throw new TopicNotFoundException(topicMetadata.getTopicId().getNamespace(),
topicMetadata.getTopicId().getTopic());
}
LOG.error("Failed to update topic {}", topicId, e);
throw new IOException(e);
}
}
Expand All @@ -185,15 +192,13 @@ public void deleteTopic(TopicId topicId)
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error when executing DDL statements", e);
throw new IOException(e);
}

Mutation mutation = Mutation.delete(TOPIC_METADATA_TABLE, Key.of(topicId.getTopic()));
try {
client.write(Collections.singletonList(mutation));
} catch (SpannerException e) {
LOG.error("Unable to delete {} from topic metadata table", topicId.getTopic());
throw new IOException(e);
}
}
Expand Down Expand Up @@ -222,13 +227,10 @@ public List<TopicId> listTopics(NamespaceId namespaceId)

List<TopicId> topics = new ArrayList<>();
String namespace = namespaceId.getNamespace();
String topicSQL = String.format(
"SELECT %s FROM %s WHERE %s = '%s'",
TOPIC_ID_FIELD, TOPIC_METADATA_TABLE, NAMESPACE_FIELD, namespace
);
String topicSQL = String.format("SELECT %s FROM %s WHERE %s = '%s'", TOPIC_ID_FIELD,
TOPIC_METADATA_TABLE, NAMESPACE_FIELD, namespace);

try (ResultSet resultSet = client.singleUse().executeQuery(
Statement.of(topicSQL))) {
try (ResultSet resultSet = client.singleUse().executeQuery(Statement.of(topicSQL))) {

while (resultSet.next()) {
String topicId = resultSet.getString(TOPIC_ID_FIELD);
Expand Down Expand Up @@ -262,11 +264,11 @@ public RollbackDetail publish(StoreRequest request)
batchCopy.add(mutation);
}

if (batch.isEmpty() && (i < 50 || System.currentTimeMillis() - start < 50)) {
if (batch.isEmpty() && (i < publishBatchSize
|| System.currentTimeMillis() - start < publishBatchTimeoutMillis)) {
try {
Thread.sleep(5);
Thread.sleep(publishDelayMillis);
} catch (InterruptedException e) {
LOG.error("error during sleep", e);
throw new IOException(e);
}
}
Expand All @@ -275,7 +277,6 @@ public RollbackDetail publish(StoreRequest request)
try {
client.write(batchCopy);
} catch (SpannerException e) {
LOG.error("failed to publish message ", e);
throw new IOException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class SpannerUtil {
static final String INSTANCE = "instance";
static final String DATABASE = "database";
static final String CREDENTIALS_PATH = "credentials.path";
static final String PUBLISH_BATCH_SIZE = "publish.batch.size";
static final String PUBLISH_BATCH_TIMEOUT_MILLIS = "publish.batch.timeout.millis";
static final String PUBLISH_DELAY_MILLIS = "publish.delay.millis";

static DatabaseClient getSpannerDbClient(String projectID, String instanceID,
String databaseID, Spanner spanner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ public class DefaultMessagingServiceContext implements MessagingServiceContext {
@Override
public Map<String, String> getProperties() {
// TODO: cdap-tms module refactoring will remove this dependency on spanner.
String spannerPropertiesPrefix =
String spannerStoragePropertiesPrefix =
Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + ".";
Map<String, String> propertiesMap = new HashMap<>(
cConf.getPropsWithPrefix(spannerPropertiesPrefix));
String spannerMessagingPropertiesPrefix = Constants.MessagingSystem.SPANNER_EXTENSION_PROPERTY_PREFIX;
Map<String, String> propertiesMap = new HashMap<>();
propertiesMap.putAll(cConf.getPropsWithPrefix(spannerStoragePropertiesPrefix));
propertiesMap.putAll(cConf.getPropsWithPrefix(spannerMessagingPropertiesPrefix));
return Collections.unmodifiableMap(propertiesMap);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,4 +707,4 @@ public void close() {
closeQuietly(messageTable);
}
}
}
}

0 comments on commit 0032a5b

Please sign in to comment.