From 71c75a13e7dffea68616706971838256b6561fad Mon Sep 17 00:00:00 2001
From: sidhdirenge
Date: Fri, 29 Nov 2024 15:40:30 +0530
Subject: [PATCH 01/18] Create new module.
---
cdap-master/pom.xml | 31 ++++-
cdap-messaging-ext-spanner/pom.xml | 110 ++++++++++++++++++
.../spanner/SpannerMessagingService.java | 106 +++++++++++++++++
...o.cdap.cdap.messaging.spi.MessagingService | 17 +++
.../client/DelegatingMessagingService.java | 9 +-
.../guice/MessagingServiceModule.java | 11 +-
pom.xml | 3 +
7 files changed, 278 insertions(+), 9 deletions(-)
create mode 100644 cdap-messaging-ext-spanner/pom.xml
create mode 100644 cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
create mode 100644 cdap-messaging-ext-spanner/src/main/resources/META-INF/services/io.cdap.cdap.messaging.spi.MessagingService
diff --git a/cdap-master/pom.xml b/cdap-master/pom.xml
index 986669b68f0..98bd874cf0d 100644
--- a/cdap-master/pom.xml
+++ b/cdap-master/pom.xml
@@ -108,6 +108,11 @@
cdap-tms
${project.version}
+
+ io.cdap.cdap
+ cdap-messaging-ext-spanner
+ ${project.version}
+
org.apache.tephra
tephra-api
@@ -245,6 +250,7 @@
${stage.opt.dir}/ext/storageproviders
${stage.opt.dir}/ext/authenticators
${stage.opt.dir}/ext/credentialproviders
+ ${stage.opt.dir}/ext/messagingproviders
${stage.opt.dir}/ext/metricswriters/google_cloud_monitoring_writer
${stage.opt.dir}/ext/eventwriters/google_cloud_pubsub_writer
@@ -705,6 +711,28 @@
+
+
+ copy-messaging-ext-spanner
+ process-resources
+
+ copy-resources
+
+
+ ${stage.messaging.ext.dir}/gcp-spanner
+
+
+
+ ${project.parent.basedir}/cdap-messaging-ext-spanner/target/libexec/
+
+
+ *.jar
+
+
+
+
+
+
copy-authenticator-ext-gcp
@@ -713,7 +741,8 @@
copy-resources
- ${stage.authenticators.ext.dir}/gcp-remote-authenticator
+ ${stage.authenticators.ext.dir}/gcp-remote-authenticator
+
diff --git a/cdap-messaging-ext-spanner/pom.xml b/cdap-messaging-ext-spanner/pom.xml
new file mode 100644
index 00000000000..be934b30d2c
--- /dev/null
+++ b/cdap-messaging-ext-spanner/pom.xml
@@ -0,0 +1,110 @@
+
+
+
+
+ 4.0.0
+
+
+ io.cdap.cdap
+ cdap
+ 6.11.0-SNAPSHOT
+
+
+ cdap-messaging-ext-spanner
+ CDAP Google Cloud Spanner Messaging Extension
+ jar
+
+
+ 30.1.1-jre
+ 6.12.2
+
+
+
+
+
+ com.google.cloud
+ libraries-bom
+ 22.0.0
+ pom
+ import
+
+
+
+
+
+
+ io.cdap.cdap
+ cdap-messaging-spi
+ ${project.version}
+ compile
+
+
+ com.google.cloud
+ google-cloud-spanner
+
+
+
+
+
+ dist
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 2.8
+
+
+ copy-dependencies
+ prepare-package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/libexec
+ false
+ false
+ true
+ true
+ true
+ runtime
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 2.4
+
+
+ jar
+ prepare-package
+
+ ${project.build.directory}/libexec
+ ${project.groupId}.${project.build.finalName}
+
+
+ jar
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
new file mode 100644
index 00000000000..4fa5a9b4db8
--- /dev/null
+++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright © 2016 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.cdap.messaging.spanner;
+
+import io.cdap.cdap.api.dataset.lib.CloseableIterator;
+import io.cdap.cdap.api.messaging.TopicAlreadyExistsException;
+import io.cdap.cdap.api.messaging.TopicNotFoundException;
+import io.cdap.cdap.messaging.spi.MessageFetchRequest;
+import io.cdap.cdap.messaging.spi.MessagingService;
+import io.cdap.cdap.messaging.spi.MessagingServiceContext;
+import io.cdap.cdap.messaging.spi.RawMessage;
+import io.cdap.cdap.messaging.spi.RollbackDetail;
+import io.cdap.cdap.messaging.spi.StoreRequest;
+import io.cdap.cdap.messaging.spi.TopicMetadata;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.proto.id.TopicId;
+import io.cdap.cdap.security.spi.authorization.UnauthorizedException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SpannerMessagingService implements MessagingService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SpannerMessagingService.class);
+
+ @Override
+ public void initialize(MessagingServiceContext context) throws IOException {
+ LOG.info("We are here Sidhdi!");
+ }
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public void createTopic(TopicMetadata topicMetadata)
+ throws TopicAlreadyExistsException, IOException, UnauthorizedException {
+
+ }
+
+ @Override
+ public void updateTopic(TopicMetadata topicMetadata)
+ throws TopicNotFoundException, IOException, UnauthorizedException {
+
+ }
+
+ @Override
+ public void deleteTopic(TopicId topicId)
+ throws TopicNotFoundException, IOException, UnauthorizedException {
+
+ }
+
+ @Override
+ public Map getTopicMetadataProperties(TopicId topicId)
+ throws TopicNotFoundException, IOException, UnauthorizedException {
+ return null;
+ }
+
+ @Override
+ public List listTopics(NamespaceId namespaceId)
+ throws IOException, UnauthorizedException {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public RollbackDetail publish(StoreRequest request)
+ throws TopicNotFoundException, IOException, UnauthorizedException {
+ return null;
+ }
+
+ @Override
+ public void storePayload(StoreRequest request)
+ throws TopicNotFoundException, IOException, UnauthorizedException {
+
+ }
+
+ @Override
+ public void rollback(TopicId topicId, RollbackDetail rollbackDetail)
+ throws TopicNotFoundException, IOException, UnauthorizedException {
+
+ }
+
+ @Override
+ public CloseableIterator fetch(MessageFetchRequest messageFetchRequest)
+ throws TopicNotFoundException, IOException {
+ return null;
+ }
+}
diff --git a/cdap-messaging-ext-spanner/src/main/resources/META-INF/services/io.cdap.cdap.messaging.spi.MessagingService b/cdap-messaging-ext-spanner/src/main/resources/META-INF/services/io.cdap.cdap.messaging.spi.MessagingService
new file mode 100644
index 00000000000..5b3cec2ec99
--- /dev/null
+++ b/cdap-messaging-ext-spanner/src/main/resources/META-INF/services/io.cdap.cdap.messaging.spi.MessagingService
@@ -0,0 +1,17 @@
+#
+# Copyright © 2022 Cask Data, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+
+io.cdap.cdap.messaging.spanner.SpannerMessagingService
\ No newline at end of file
diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java
index c7969e947eb..93f12a761ba 100644
--- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java
+++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java
@@ -21,10 +21,9 @@
import io.cdap.cdap.api.messaging.TopicAlreadyExistsException;
import io.cdap.cdap.api.messaging.TopicNotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
-import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.messaging.spi.MessageFetchRequest;
-import io.cdap.cdap.messaging.spi.MessagingServiceContext;
import io.cdap.cdap.messaging.spi.MessagingService;
+import io.cdap.cdap.messaging.spi.MessagingServiceContext;
import io.cdap.cdap.messaging.spi.RawMessage;
import io.cdap.cdap.messaging.spi.RollbackDetail;
import io.cdap.cdap.messaging.spi.StoreRequest;
@@ -52,6 +51,7 @@ public class DelegatingMessagingService implements MessagingService {
@Inject
public DelegatingMessagingService(
CConfiguration cConf, MessagingServiceExtensionLoader extensionLoader) {
+ LOG.info("Delegating service is initialised");
this.cConf = cConf;
this.extensionLoader = extensionLoader;
}
@@ -80,7 +80,7 @@ public void initialize(MessagingServiceContext context) throws IOException {
@Override
public String getName() {
- return cConf.get(MessagingSystem.MESSAGING_SERVICE_NAME);
+ return "SpannerMessagingService";
}
@Override
@@ -121,6 +121,7 @@ public void rollback(TopicId topicId, RollbackDetail rollbackDetail)
}
private MessagingService getDelegate() {
+ LOG.info("getDelegate() is called.");
MessagingService messagingService = this.delegate;
if (messagingService != null) {
return messagingService;
@@ -138,6 +139,8 @@ private MessagingService getDelegate() {
}
LOG.info("Messaging service {} is loaded", messagingService.getName());
try {
+ LOG.info("Messaging service {} is loaded, now initializing with conf {}",
+ messagingService.getName(), this.cConf);
messagingService.initialize(new DefaultMessagingServiceContext(this.cConf));
} catch (IOException e) {
throw new RuntimeException(e);
diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java
index f358b202eb7..cc7417cae5f 100644
--- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java
+++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java
@@ -43,10 +43,11 @@ public MessagingServiceModule(CConfiguration cConf) {
@Override
protected void configure() {
- if (messagingService.equals(DEFAULT_MESSAGING_SERVICE_NAME)) {
- bind(MessagingService.class).to(ClientMessagingService.class).in(Scopes.SINGLETON);
- } else {
- bind(MessagingService.class).to(DelegatingMessagingService.class).in(Scopes.SINGLETON);
- }
+// bind(MessagingService.class).to(SpannerMessagingService.class).in(Scopes.SINGLETON);
+// if (messagingService.equals(DEFAULT_MESSAGING_SERVICE_NAME)) {
+// bind(MessagingService.class).to(ClientMessagingService.class).in(Scopes.SINGLETON);
+// } else {
+ bind(MessagingService.class).to(DelegatingMessagingService.class).in(Scopes.SINGLETON);
+// }
}
}
diff --git a/pom.xml b/pom.xml
index d170b7c2376..2b119aca828 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,9 @@
http://www.cdap.io
+
+ cdap-messaging-ext-spanner
+
scm:git:https://github.com/cdapio/cdap.git
From 2dd2c38551a7789aeee434558c04278878b56cad Mon Sep 17 00:00:00 2001
From: sidhdirenge
Date: Fri, 29 Nov 2024 16:14:02 +0530
Subject: [PATCH 02/18] Implement SpannerMessagingService
---
cdap-messaging-ext-spanner/pom.xml | 4 +
.../spanner/SpannerMessagingService.java | 212 +++++++++++++++++-
.../cdap/messaging/spanner/SpannerUtil.java | 56 +++++
.../cdap/messaging/spi/MessagingService.java | 2 +-
.../client/ClientMessagingService.java | 2 +-
.../client/DelegatingMessagingService.java | 26 ++-
.../LeaderElectionMessagingService.java | 2 +-
.../service/CoreMessagingService.java | 2 +-
8 files changed, 289 insertions(+), 17 deletions(-)
create mode 100644 cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java
diff --git a/cdap-messaging-ext-spanner/pom.xml b/cdap-messaging-ext-spanner/pom.xml
index be934b30d2c..82a036f2e85 100644
--- a/cdap-messaging-ext-spanner/pom.xml
+++ b/cdap-messaging-ext-spanner/pom.xml
@@ -55,6 +55,10 @@
com.google.cloud
google-cloud-spanner
+
+ com.google.api
+ gax
+
diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
index 4fa5a9b4db8..6f7676087ce 100644
--- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
+++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
@@ -15,6 +15,22 @@
*/
package io.cdap.cdap.messaging.spanner;
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.ByteArray;
+import com.google.cloud.spanner.DatabaseAdminClient;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.Value;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.messaging.TopicAlreadyExistsException;
import io.cdap.cdap.api.messaging.TopicNotFoundException;
@@ -29,8 +45,13 @@
import io.cdap.cdap.proto.id.TopicId;
import io.cdap.cdap.security.spi.authorization.UnauthorizedException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +60,40 @@ public class SpannerMessagingService implements MessagingService {
private static final Logger LOG = LoggerFactory.getLogger(SpannerMessagingService.class);
+ private Map cConf;
+
+ private Spanner spanner;
+
+ private DatabaseClient client;
+
+ private DatabaseAdminClient adminClient;
+
+ private String instanceId;
+
+ private String databaseId;
+
+ private final ConcurrentLinkedQueue batch = new ConcurrentLinkedQueue<>();
+
+ public static final String PAYLOAD_FIELD = "payload";
+ public static final String PUBLISH_TS_FIELD = "publish_ts";
+ public static final String PAYLOAD_SEQUENCE_ID = "payload_sequence_id";
+ public static final String SEQUENCE_ID_FIELD = "sequence_id";
+ public static final String TOPIC_METADATA_TABLE = "topic_metadata";
+ public static final String TOPIC_ID_FIELD = "topic_id";
+ public static final String PROPERTIES_FIELD = "properties";
+ public static final String NAMESPACE_FIELD = "namespace";
+
@Override
- public void initialize(MessagingServiceContext context) throws IOException {
- LOG.info("We are here Sidhdi!");
+ public void initialize(MessagingServiceContext context) {
+ this.cConf = context.getProperties();
+ this.databaseId = SpannerUtil.getDatabaseID(cConf);
+ this.instanceId = SpannerUtil.getInstanceID(cConf);
+
+ String projectID = SpannerUtil.getProjectID(cConf);
+ this.spanner = SpannerUtil.getSpannerService(projectID);
+ this.client = SpannerUtil.getSpannerDbClient(projectID, instanceId, databaseId, spanner);
+ this.adminClient = SpannerUtil.getSpannerDbAdminClient(spanner);
+ LOG.info("Spanner messaging service started.");
}
@Override
@@ -52,55 +104,201 @@ public String getName() {
@Override
public void createTopic(TopicMetadata topicMetadata)
throws TopicAlreadyExistsException, IOException, UnauthorizedException {
+ LOG.info("Create topic started {}", topicMetadata.getTopicId().getTopic());
+ List ddlStatements = new ArrayList<>();
+ ddlStatements.add(getCreateTopicMetadataDDLStatement());
+ ddlStatements.add(getCreateTopicDDLStatement(topicMetadata.getTopicId()));
+ OperationFuture future = adminClient.updateDatabaseDdl(
+ this.instanceId, this.databaseId, ddlStatements, null);
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error when executing DDL statements", e);
+ throw new IOException(e);
+ }
+
+ Gson gson = new Gson();
+ String jsonString = gson.toJson(topicMetadata.getProperties());
+ Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE).set(TOPIC_ID_FIELD)
+ .to(topicMetadata.getTopicId().getTopic()).set(PROPERTIES_FIELD).to(Value.json(jsonString))
+ .set(NAMESPACE_FIELD).to(topicMetadata.getTopicId().getNamespace()).build();
+ try {
+ client.write(Collections.singleton(mutation));
+ } catch (SpannerException e) {
+ LOG.error("Cannot commit mutations ", e);
+ throw new IOException(e);
+ }
+ LOG.info("Create topic started {}", topicMetadata.getTopicId().getTopic());
+ }
+
+ private static String getCreateTopicMetadataDDLStatement() {
+ return String.format(
+ "CREATE TABLE IF NOT EXISTS %s ( %s STRING(MAX) NOT NULL, %s JSON ) PRIMARY KEY(%s)",
+ TOPIC_METADATA_TABLE, TOPIC_ID_FIELD, PROPERTIES_FIELD, TOPIC_ID_FIELD);
+ }
+
+ private static String getCreateTopicDDLStatement(TopicId topicId) {
+ return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s"
+ + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )"
+ + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY"
+ + " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD,
+ PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID,
+ PUBLISH_TS_FIELD, PAYLOAD_FIELD);
+ }
+
+ public static String getTableName(TopicId topicId) {
+ return topicId.getNamespace() + topicId.getTopic();
}
@Override
public void updateTopic(TopicMetadata topicMetadata)
throws TopicNotFoundException, IOException, UnauthorizedException {
+ String topicId = topicMetadata.getTopicId().getTopic();
+ Gson gson = new Gson();
+ String jsonString = gson.toJson(topicMetadata.getProperties());
+
+ // Update the topic properties in the TopicMetadata table
+ Mutation mutation = Mutation.newUpdateBuilder(TOPIC_METADATA_TABLE).set(TOPIC_ID_FIELD)
+ .to(topicId).set(PROPERTIES_FIELD).to(Value.json(jsonString)).build();
+
+ try {
+ client.write(Collections.singleton(mutation));
+ } catch (SpannerException e) {
+ if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
+ throw new TopicNotFoundException(topicMetadata.getTopicId().getNamespace(),
+ topicMetadata.getTopicId().getTopic());
+ }
+ LOG.error("Failed to update topic {}", topicId, e);
+ throw new IOException(e);
+ }
}
@Override
public void deleteTopic(TopicId topicId)
throws TopicNotFoundException, IOException, UnauthorizedException {
+ String topicTableName = getTableName(topicId);
+ String deleteTopicTableSQL = String.format("DROP TABLE IF EXISTS %s", topicTableName);
+ OperationFuture future = adminClient.updateDatabaseDdl(
+ this.instanceId, this.databaseId, Collections.singleton(deleteTopicTableSQL), null);
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error when executing DDL statements", e);
+ throw new IOException(e);
+ }
+
+ Mutation mutation = Mutation.delete(TOPIC_METADATA_TABLE, Key.of(topicId.getTopic()));
+ try {
+ client.write(Collections.singletonList(mutation));
+ } catch (SpannerException e) {
+ LOG.error("Unable to delete {} from topic metadata table", topicId.getTopic());
+ throw new IOException(e);
+ }
}
@Override
public Map getTopicMetadataProperties(TopicId topicId)
throws TopicNotFoundException, IOException, UnauthorizedException {
- return null;
+
+ try (ResultSet resultSet = client.singleUse()
+ .read(TOPIC_METADATA_TABLE, KeySet.singleKey(Key.of(topicId.getTopic())),
+ Collections.singletonList(PROPERTIES_FIELD))) {
+ if (resultSet.next()) {
+ String propertiesJson = resultSet.getString(PROPERTIES_FIELD);
+ Gson gson = new Gson();
+ return gson.fromJson(propertiesJson, new TypeToken
+ */
+ private String getCreateTopicDDLStatement(TopicId topicId) {
return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s"
+ " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )"
+ " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + " (OLDER_THAN(%s, INTERVAL 7 DAY))",
@@ -269,7 +289,19 @@ public List listTopics(NamespaceId namespaceId)
return ImmutableList.copyOf(topics);
}
-
+ /**
+ * Please refer {@link #getCreateTopicDDLStatement(TopicId)} for schema details. Following table
+ * shows how messages would be persisted in the topic tables.
+ *
+ *
+ * TXN sequence_id payload_sequence_id publish_ts payload
+ * TXN1 0 0 ts1 msg1
+ * TXN1 1 0 ts1 msg2_p0
+ * TXN1 1 1 ts1 msg2_p1
+ * TXN2 0 0 ts2 msg3
+ * TXN3 0 0 ts3 msg4
+ *
+ */
@Nullable
@Override
public RollbackDetail publish(StoreRequest request)
@@ -310,6 +342,7 @@ public RollbackDetail publish(StoreRequest request)
}
}
}
+ //TODO: Add a RollbackDetail implementation that throws exceptions if any of the methods is called.
return null;
}
@@ -328,83 +361,6 @@ public void rollback(TopicId topicId, RollbackDetail rollbackDetail)
@Override
public CloseableIterator fetch(MessageFetchRequest messageFetchRequest)
throws TopicNotFoundException, IOException {
- LOG.info("Message Fetch Request {} : {}", messageFetchRequest.getTopicId().getTopic(),
- messageFetchRequest.getStartOffset());
- Long startTime = 0L;
- if (messageFetchRequest.getStartTime() != null) {
- startTime = messageFetchRequest.getStartTime();
- }
- short sequenceId = -1;
- byte[] id = messageFetchRequest.getStartOffset();
- if (id != null) {
- int offset = 0;
- startTime = Bytes.toLong(id, offset);
- offset += Bytes.SIZEOF_LONG;
- sequenceId = Bytes.toShort(id, offset);
- LOG.info("start time : {} sequenceId : {}", startTime, sequenceId);
- }
-
- String sqlStatement = String.format(
- "SELECT %s, %s, UNIX_MICROS(%s), %s FROM %s where (payload_sequence_id>-1"
- + " and publish_ts > TIMESTAMP_MICROS(%s)) or"
- + " (payload_sequence_id>-1 and publish_ts = TIMESTAMP_MICROS(%s) and sequence_id > %s) order by"
- + " publish_ts,sequence_id LIMIT %s", SpannerMessagingService.SEQUENCE_ID_FIELD,
- SpannerMessagingService.PAYLOAD_SEQUENCE_ID, SpannerMessagingService.PUBLISH_TS_FIELD,
- SpannerMessagingService.PAYLOAD_FIELD,
- SpannerMessagingService.getTableName(messageFetchRequest.getTopicId()), startTime,
- startTime, sequenceId, messageFetchRequest.getLimit());
-
- LOG.info("Fetch sql {}", sqlStatement);
- try {
- ResultSet resultSet = client.singleUse().executeQuery(Statement.of(sqlStatement));
- LOG.info("executeQuery called");
- return new SpannerResultSetClosableIterator<>(resultSet);
- } catch (Exception ex) {
- LOG.error("Error when fetching {}", sqlStatement, ex);
- throw new IOException(ex);
- }
- }
-
- public static class SpannerResultSetClosableIterator extends
- AbstractCloseableIterator {
-
- private final ResultSet resultSet;
-
- public SpannerResultSetClosableIterator(ResultSet resultSet) {
- this.resultSet = resultSet;
- }
-
- @Override
- protected io.cdap.cdap.messaging.spi.RawMessage computeNext() {
- if (!resultSet.next()) {
- return endOfData();
- }
-
- byte[] id = getMessageId(resultSet.getLong(0), resultSet.getLong(1), resultSet.getLong(2));
- byte[] payload = resultSet.getBytes(3).toByteArray();
-
- LOG.info("computeNext called");
- return new io.cdap.cdap.messaging.spi.RawMessage.Builder().setId(id).setPayload(payload)
- .build();
- }
-
- @Override
- public void close() {
- resultSet.close();
- }
- }
-
- public static byte[] getMessageId(long sequenceId, long messageSequenceId, long timestamp) {
- LOG.info("sequenceId {} messageSequenceId {} timestamp {}", sequenceId, messageSequenceId,
- timestamp);
- byte[] result = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_SHORT + Bytes.SIZEOF_LONG
- + Bytes.SIZEOF_SHORT];
- int offset = 0;
- // Implementation copied from MessageId
- offset = Bytes.putLong(result, offset, timestamp);
- offset = Bytes.putShort(result, offset, (short) sequenceId);
- offset = Bytes.putLong(result, offset, 0);
- Bytes.putShort(result, offset, (short) messageSequenceId);
- return result;
+ throw new IOException("NOT IMPLEMENTED");
}
}
diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java
index b4a931e1a3b..307645fbc4c 100644
--- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java
+++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java
@@ -44,7 +44,7 @@ public class DefaultMessagingServiceContext implements MessagingServiceContext {
@Override
public Map getProperties() {
- // TODO: cdap-tms module refactoring will remove this dependency on spanner.
+ // TODO: [CDAP-21090] cdap-tms module refactoring will remove this dependency on spanner.
String spannerStoragePropertiesPrefix =
Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + ".";
String spannerMessagingPropertiesPrefix = Constants.MessagingSystem.SPANNER_EXTENSION_PROPERTY_PREFIX;
From 86685fbc49e15757e421a9f16348b6d0e7c74eb5 Mon Sep 17 00:00:00 2001
From: sidhdirenge
Date: Tue, 10 Dec 2024 15:29:58 +0530
Subject: [PATCH 17/18] Update property names
---
cdap-common/src/main/resources/cdap-default.xml | 2 +-
.../cdap/cdap/messaging/spanner/SpannerMessagingService.java | 2 +-
.../main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java | 2 +-
.../cdap/messaging/spanner/SpannerMessagingServiceTest.java | 2 +-
pom.xml | 4 +---
5 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml
index 0c40530d7d7..19d514e6cb2 100644
--- a/cdap-common/src/main/resources/cdap-default.xml
+++ b/cdap-common/src/main/resources/cdap-default.xml
@@ -2678,7 +2678,7 @@
- messaging.spanner.properties.publish.poll.millis
+ messaging.spanner.properties.publish.batch.poll.millis
5
The default polling frequency while either max batch size or batch timeout criterion is met.
diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
index 33240336d2b..344190f016b 100644
--- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
+++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
@@ -97,7 +97,7 @@ public void initialize(MessagingServiceContext context) throws IOException {
this.publishBatchSize = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_BATCH_SIZE));
this.publishBatchTimeoutMillis = Integer.parseInt(
cConf.get(SpannerUtil.PUBLISH_BATCH_TIMEOUT_MILLIS));
- this.publishDelayMillis = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_DELAY_MILLIS));
+ this.publishDelayMillis = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_BATCH_POLL_MILLIS));
Spanner spanner = SpannerUtil.getSpannerService(projectID, credentials);
this.client = SpannerUtil.getSpannerDbClient(projectID, instanceId, databaseId, spanner);
diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java
index bd9bc72e851..669d2e00c7f 100644
--- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java
+++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java
@@ -40,7 +40,7 @@ class SpannerUtil {
static final String CREDENTIALS_PATH = "credentials.path";
static final String PUBLISH_BATCH_SIZE = "publish.batch.size";
static final String PUBLISH_BATCH_TIMEOUT_MILLIS = "publish.batch.timeout.millis";
- static final String PUBLISH_DELAY_MILLIS = "publish.delay.millis";
+ static final String PUBLISH_BATCH_POLL_MILLIS = "publish.batch.poll.millis";
static DatabaseClient getSpannerDbClient(String projectID, String instanceID,
String databaseID, Spanner spanner) {
diff --git a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java
index 0ab2bc01879..947da79d345 100644
--- a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java
+++ b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java
@@ -84,7 +84,7 @@ public static void createSpannerMessagingService() throws Exception {
configs.put(SpannerUtil.CREDENTIALS_PATH, credentialsPath);
}
- configs.put(SpannerUtil.PUBLISH_DELAY_MILLIS, "5");
+ configs.put(SpannerUtil.PUBLISH_BATCH_POLL_MILLIS, "5");
configs.put(SpannerUtil.PUBLISH_BATCH_SIZE, "10");
configs.put(SpannerUtil.PUBLISH_BATCH_TIMEOUT_MILLIS, "10");
MessagingServiceContext context = new MockMessagingServiceContext(configs);
diff --git a/pom.xml b/pom.xml
index 2b119aca828..6b027880504 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,9 +48,6 @@
http://www.cdap.io
-
- cdap-messaging-ext-spanner
-
scm:git:https://github.com/cdapio/cdap.git
@@ -2607,6 +2604,7 @@
cdap-system-app-unit-test
cdap-tms
cdap-messaging-spi
+ cdap-messaging-ext-spanner
cdap-gateway
cdap-unit-test
cdap-unit-test-spark3_2.12
From fbbbb9fa38f18bc15724cca90ded28a1e864c025 Mon Sep 17 00:00:00 2001
From: sidhdirenge
Date: Thu, 12 Dec 2024 14:33:11 +0530
Subject: [PATCH 18/18] Address review comments
---
.../cdap/messaging/spanner/SpannerMessagingService.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
index 344190f016b..34b038caf5c 100644
--- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
+++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
@@ -120,7 +120,7 @@ public void createTopic(TopicMetadata topicMetadata)
ddlStatements.add(getCreateTopicDDLStatement(topicMetadata.getTopicId()));
executeCreateDDLStatements(ddlStatements);
updateTopicMetadataTable(Collections.singletonList(topicMetadata));
- LOG.info("Created topic : {}", topicMetadata.getTopicId().getTopic());
+ LOG.trace("Created topic : {}", topicMetadata.getTopicId().getTopic());
}
private void createTopics(List topics) throws IOException {
@@ -310,7 +310,7 @@ public RollbackDetail publish(StoreRequest request)
batch.add(request);
if (!batch.isEmpty()) {
- int i = 0;
+ int sequenceId = 0;
List batchCopy = new ArrayList<>(batch.size());
// We need to batch less than fetch limit since we read for publish_ts >= last_message.publish_ts
// see fetch for explanation of why we read for publish_ts >= last_message.publish_ts and
@@ -318,14 +318,15 @@ public RollbackDetail publish(StoreRequest request)
while (!batch.isEmpty()) {
StoreRequest headRequest = batch.poll();
for (byte[] payload : headRequest) {
+ //TODO: [CDAP-21094] Breakdown messages with larger payload into parts.
Mutation mutation = Mutation.newInsertBuilder(getTableName(headRequest.getTopicId()))
- .set(SEQUENCE_ID_FIELD).to(i++).set(PAYLOAD_SEQUENCE_ID).to(0).set(PUBLISH_TS_FIELD)
+ .set(SEQUENCE_ID_FIELD).to(sequenceId++).set(PAYLOAD_SEQUENCE_ID).to(0).set(PUBLISH_TS_FIELD)
.to("spanner.commit_timestamp()").set(PAYLOAD_FIELD).to(ByteArray.copyFrom(payload))
.build();
batchCopy.add(mutation);
}
- if (batch.isEmpty() && (i < publishBatchSize
+ if (batch.isEmpty() && (sequenceId < publishBatchSize
|| System.currentTimeMillis() - start < publishBatchTimeoutMillis)) {
try {
Thread.sleep(publishDelayMillis);