From 71c75a13e7dffea68616706971838256b6561fad Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Fri, 29 Nov 2024 15:40:30 +0530 Subject: [PATCH 01/18] Create new module. --- cdap-master/pom.xml | 31 ++++- cdap-messaging-ext-spanner/pom.xml | 110 ++++++++++++++++++ .../spanner/SpannerMessagingService.java | 106 +++++++++++++++++ ...o.cdap.cdap.messaging.spi.MessagingService | 17 +++ .../client/DelegatingMessagingService.java | 9 +- .../guice/MessagingServiceModule.java | 11 +- pom.xml | 3 + 7 files changed, 278 insertions(+), 9 deletions(-) create mode 100644 cdap-messaging-ext-spanner/pom.xml create mode 100644 cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java create mode 100644 cdap-messaging-ext-spanner/src/main/resources/META-INF/services/io.cdap.cdap.messaging.spi.MessagingService diff --git a/cdap-master/pom.xml b/cdap-master/pom.xml index 986669b68f0..98bd874cf0d 100644 --- a/cdap-master/pom.xml +++ b/cdap-master/pom.xml @@ -108,6 +108,11 @@ cdap-tms ${project.version} + + io.cdap.cdap + cdap-messaging-ext-spanner + ${project.version} + org.apache.tephra tephra-api @@ -245,6 +250,7 @@ ${stage.opt.dir}/ext/storageproviders ${stage.opt.dir}/ext/authenticators ${stage.opt.dir}/ext/credentialproviders + ${stage.opt.dir}/ext/messagingproviders ${stage.opt.dir}/ext/metricswriters/google_cloud_monitoring_writer ${stage.opt.dir}/ext/eventwriters/google_cloud_pubsub_writer @@ -705,6 +711,28 @@ + + + copy-messaging-ext-spanner + process-resources + + copy-resources + + + ${stage.messaging.ext.dir}/gcp-spanner + + + + ${project.parent.basedir}/cdap-messaging-ext-spanner/target/libexec/ + + + *.jar + + + + + + copy-authenticator-ext-gcp @@ -713,7 +741,8 @@ copy-resources - ${stage.authenticators.ext.dir}/gcp-remote-authenticator + ${stage.authenticators.ext.dir}/gcp-remote-authenticator + diff --git a/cdap-messaging-ext-spanner/pom.xml b/cdap-messaging-ext-spanner/pom.xml new file mode 100644 index 00000000000..be934b30d2c --- /dev/null +++ b/cdap-messaging-ext-spanner/pom.xml @@ -0,0 +1,110 @@ + + + + + 4.0.0 + + + io.cdap.cdap + cdap + 6.11.0-SNAPSHOT + + + cdap-messaging-ext-spanner + CDAP Google Cloud Spanner Messaging Extension + jar + + + 30.1.1-jre + 6.12.2 + + + + + + com.google.cloud + libraries-bom + 22.0.0 + pom + import + + + + + + + io.cdap.cdap + cdap-messaging-spi + ${project.version} + compile + + + com.google.cloud + google-cloud-spanner + + + + + + dist + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.8 + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/libexec + false + false + true + true + true + runtime + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + jar + prepare-package + + ${project.build.directory}/libexec + ${project.groupId}.${project.build.finalName} + + + jar + + + + + + + + + \ No newline at end of file diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java new file mode 100644 index 00000000000..4fa5a9b4db8 --- /dev/null +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.cdap.messaging.spanner; + +import io.cdap.cdap.api.dataset.lib.CloseableIterator; +import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; +import io.cdap.cdap.api.messaging.TopicNotFoundException; +import io.cdap.cdap.messaging.spi.MessageFetchRequest; +import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.messaging.spi.MessagingServiceContext; +import io.cdap.cdap.messaging.spi.RawMessage; +import io.cdap.cdap.messaging.spi.RollbackDetail; +import io.cdap.cdap.messaging.spi.StoreRequest; +import io.cdap.cdap.messaging.spi.TopicMetadata; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.TopicId; +import io.cdap.cdap.security.spi.authorization.UnauthorizedException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SpannerMessagingService implements MessagingService { + + private static final Logger LOG = LoggerFactory.getLogger(SpannerMessagingService.class); + + @Override + public void initialize(MessagingServiceContext context) throws IOException { + LOG.info("We are here Sidhdi!"); + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public void createTopic(TopicMetadata topicMetadata) + throws TopicAlreadyExistsException, IOException, UnauthorizedException { + + } + + @Override + public void updateTopic(TopicMetadata topicMetadata) + throws TopicNotFoundException, IOException, UnauthorizedException { + + } + + @Override + public void deleteTopic(TopicId topicId) + throws TopicNotFoundException, IOException, UnauthorizedException { + + } + + @Override + public Map getTopicMetadataProperties(TopicId topicId) + throws TopicNotFoundException, IOException, UnauthorizedException { + return null; + } + + @Override + public List listTopics(NamespaceId namespaceId) + throws IOException, UnauthorizedException { + return null; + } + + @Nullable + @Override + public RollbackDetail publish(StoreRequest request) + throws TopicNotFoundException, IOException, UnauthorizedException { + return null; + } + + @Override + public void storePayload(StoreRequest request) + throws TopicNotFoundException, IOException, UnauthorizedException { + + } + + @Override + public void rollback(TopicId topicId, RollbackDetail rollbackDetail) + throws TopicNotFoundException, IOException, UnauthorizedException { + + } + + @Override + public CloseableIterator fetch(MessageFetchRequest messageFetchRequest) + throws TopicNotFoundException, IOException { + return null; + } +} diff --git a/cdap-messaging-ext-spanner/src/main/resources/META-INF/services/io.cdap.cdap.messaging.spi.MessagingService b/cdap-messaging-ext-spanner/src/main/resources/META-INF/services/io.cdap.cdap.messaging.spi.MessagingService new file mode 100644 index 00000000000..5b3cec2ec99 --- /dev/null +++ b/cdap-messaging-ext-spanner/src/main/resources/META-INF/services/io.cdap.cdap.messaging.spi.MessagingService @@ -0,0 +1,17 @@ +# +# Copyright © 2022 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +# + +io.cdap.cdap.messaging.spanner.SpannerMessagingService \ No newline at end of file diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java index c7969e947eb..93f12a761ba 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java @@ -21,10 +21,9 @@ import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.messaging.TopicNotFoundException; import io.cdap.cdap.common.conf.CConfiguration; -import io.cdap.cdap.common.conf.Constants.MessagingSystem; import io.cdap.cdap.messaging.spi.MessageFetchRequest; -import io.cdap.cdap.messaging.spi.MessagingServiceContext; import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.messaging.spi.MessagingServiceContext; import io.cdap.cdap.messaging.spi.RawMessage; import io.cdap.cdap.messaging.spi.RollbackDetail; import io.cdap.cdap.messaging.spi.StoreRequest; @@ -52,6 +51,7 @@ public class DelegatingMessagingService implements MessagingService { @Inject public DelegatingMessagingService( CConfiguration cConf, MessagingServiceExtensionLoader extensionLoader) { + LOG.info("Delegating service is initialised"); this.cConf = cConf; this.extensionLoader = extensionLoader; } @@ -80,7 +80,7 @@ public void initialize(MessagingServiceContext context) throws IOException { @Override public String getName() { - return cConf.get(MessagingSystem.MESSAGING_SERVICE_NAME); + return "SpannerMessagingService"; } @Override @@ -121,6 +121,7 @@ public void rollback(TopicId topicId, RollbackDetail rollbackDetail) } private MessagingService getDelegate() { + LOG.info("getDelegate() is called."); MessagingService messagingService = this.delegate; if (messagingService != null) { return messagingService; @@ -138,6 +139,8 @@ private MessagingService getDelegate() { } LOG.info("Messaging service {} is loaded", messagingService.getName()); try { + LOG.info("Messaging service {} is loaded, now initializing with conf {}", + messagingService.getName(), this.cConf); messagingService.initialize(new DefaultMessagingServiceContext(this.cConf)); } catch (IOException e) { throw new RuntimeException(e); diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java index f358b202eb7..cc7417cae5f 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java @@ -43,10 +43,11 @@ public MessagingServiceModule(CConfiguration cConf) { @Override protected void configure() { - if (messagingService.equals(DEFAULT_MESSAGING_SERVICE_NAME)) { - bind(MessagingService.class).to(ClientMessagingService.class).in(Scopes.SINGLETON); - } else { - bind(MessagingService.class).to(DelegatingMessagingService.class).in(Scopes.SINGLETON); - } +// bind(MessagingService.class).to(SpannerMessagingService.class).in(Scopes.SINGLETON); +// if (messagingService.equals(DEFAULT_MESSAGING_SERVICE_NAME)) { +// bind(MessagingService.class).to(ClientMessagingService.class).in(Scopes.SINGLETON); +// } else { + bind(MessagingService.class).to(DelegatingMessagingService.class).in(Scopes.SINGLETON); +// } } } diff --git a/pom.xml b/pom.xml index d170b7c2376..2b119aca828 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,9 @@ http://www.cdap.io + + cdap-messaging-ext-spanner + scm:git:https://github.com/cdapio/cdap.git From 2dd2c38551a7789aeee434558c04278878b56cad Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Fri, 29 Nov 2024 16:14:02 +0530 Subject: [PATCH 02/18] Implement SpannerMessagingService --- cdap-messaging-ext-spanner/pom.xml | 4 + .../spanner/SpannerMessagingService.java | 212 +++++++++++++++++- .../cdap/messaging/spanner/SpannerUtil.java | 56 +++++ .../cdap/messaging/spi/MessagingService.java | 2 +- .../client/ClientMessagingService.java | 2 +- .../client/DelegatingMessagingService.java | 26 ++- .../LeaderElectionMessagingService.java | 2 +- .../service/CoreMessagingService.java | 2 +- 8 files changed, 289 insertions(+), 17 deletions(-) create mode 100644 cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java diff --git a/cdap-messaging-ext-spanner/pom.xml b/cdap-messaging-ext-spanner/pom.xml index be934b30d2c..82a036f2e85 100644 --- a/cdap-messaging-ext-spanner/pom.xml +++ b/cdap-messaging-ext-spanner/pom.xml @@ -55,6 +55,10 @@ com.google.cloud google-cloud-spanner + + com.google.api + gax + diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 4fa5a9b4db8..6f7676087ce 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -15,6 +15,22 @@ */ package io.cdap.cdap.messaging.spanner; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.ByteArray; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.Value; +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; import io.cdap.cdap.api.dataset.lib.CloseableIterator; import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.messaging.TopicNotFoundException; @@ -29,8 +45,13 @@ import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.security.spi.authorization.UnauthorizedException; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +60,40 @@ public class SpannerMessagingService implements MessagingService { private static final Logger LOG = LoggerFactory.getLogger(SpannerMessagingService.class); + private Map cConf; + + private Spanner spanner; + + private DatabaseClient client; + + private DatabaseAdminClient adminClient; + + private String instanceId; + + private String databaseId; + + private final ConcurrentLinkedQueue batch = new ConcurrentLinkedQueue<>(); + + public static final String PAYLOAD_FIELD = "payload"; + public static final String PUBLISH_TS_FIELD = "publish_ts"; + public static final String PAYLOAD_SEQUENCE_ID = "payload_sequence_id"; + public static final String SEQUENCE_ID_FIELD = "sequence_id"; + public static final String TOPIC_METADATA_TABLE = "topic_metadata"; + public static final String TOPIC_ID_FIELD = "topic_id"; + public static final String PROPERTIES_FIELD = "properties"; + public static final String NAMESPACE_FIELD = "namespace"; + @Override - public void initialize(MessagingServiceContext context) throws IOException { - LOG.info("We are here Sidhdi!"); + public void initialize(MessagingServiceContext context) { + this.cConf = context.getProperties(); + this.databaseId = SpannerUtil.getDatabaseID(cConf); + this.instanceId = SpannerUtil.getInstanceID(cConf); + + String projectID = SpannerUtil.getProjectID(cConf); + this.spanner = SpannerUtil.getSpannerService(projectID); + this.client = SpannerUtil.getSpannerDbClient(projectID, instanceId, databaseId, spanner); + this.adminClient = SpannerUtil.getSpannerDbAdminClient(spanner); + LOG.info("Spanner messaging service started."); } @Override @@ -52,55 +104,201 @@ public String getName() { @Override public void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException, UnauthorizedException { + LOG.info("Create topic started {}", topicMetadata.getTopicId().getTopic()); + List ddlStatements = new ArrayList<>(); + ddlStatements.add(getCreateTopicMetadataDDLStatement()); + ddlStatements.add(getCreateTopicDDLStatement(topicMetadata.getTopicId())); + OperationFuture future = adminClient.updateDatabaseDdl( + this.instanceId, this.databaseId, ddlStatements, null); + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error when executing DDL statements", e); + throw new IOException(e); + } + + Gson gson = new Gson(); + String jsonString = gson.toJson(topicMetadata.getProperties()); + Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE).set(TOPIC_ID_FIELD) + .to(topicMetadata.getTopicId().getTopic()).set(PROPERTIES_FIELD).to(Value.json(jsonString)) + .set(NAMESPACE_FIELD).to(topicMetadata.getTopicId().getNamespace()).build(); + try { + client.write(Collections.singleton(mutation)); + } catch (SpannerException e) { + LOG.error("Cannot commit mutations ", e); + throw new IOException(e); + } + LOG.info("Create topic started {}", topicMetadata.getTopicId().getTopic()); + } + + private static String getCreateTopicMetadataDDLStatement() { + return String.format( + "CREATE TABLE IF NOT EXISTS %s ( %s STRING(MAX) NOT NULL, %s JSON ) PRIMARY KEY(%s)", + TOPIC_METADATA_TABLE, TOPIC_ID_FIELD, PROPERTIES_FIELD, TOPIC_ID_FIELD); + } + + private static String getCreateTopicDDLStatement(TopicId topicId) { + return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s" + + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )" + + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + + " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD, + PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID, + PUBLISH_TS_FIELD, PAYLOAD_FIELD); + } + + public static String getTableName(TopicId topicId) { + return topicId.getNamespace() + topicId.getTopic(); } @Override public void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException, UnauthorizedException { + String topicId = topicMetadata.getTopicId().getTopic(); + Gson gson = new Gson(); + String jsonString = gson.toJson(topicMetadata.getProperties()); + + // Update the topic properties in the TopicMetadata table + Mutation mutation = Mutation.newUpdateBuilder(TOPIC_METADATA_TABLE).set(TOPIC_ID_FIELD) + .to(topicId).set(PROPERTIES_FIELD).to(Value.json(jsonString)).build(); + + try { + client.write(Collections.singleton(mutation)); + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.NOT_FOUND) { + throw new TopicNotFoundException(topicMetadata.getTopicId().getNamespace(), + topicMetadata.getTopicId().getTopic()); + } + LOG.error("Failed to update topic {}", topicId, e); + throw new IOException(e); + } } @Override public void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException, UnauthorizedException { + String topicTableName = getTableName(topicId); + String deleteTopicTableSQL = String.format("DROP TABLE IF EXISTS %s", topicTableName); + OperationFuture future = adminClient.updateDatabaseDdl( + this.instanceId, this.databaseId, Collections.singleton(deleteTopicTableSQL), null); + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error when executing DDL statements", e); + throw new IOException(e); + } + + Mutation mutation = Mutation.delete(TOPIC_METADATA_TABLE, Key.of(topicId.getTopic())); + try { + client.write(Collections.singletonList(mutation)); + } catch (SpannerException e) { + LOG.error("Unable to delete {} from topic metadata table", topicId.getTopic()); + throw new IOException(e); + } } @Override public Map getTopicMetadataProperties(TopicId topicId) throws TopicNotFoundException, IOException, UnauthorizedException { - return null; + + try (ResultSet resultSet = client.singleUse() + .read(TOPIC_METADATA_TABLE, KeySet.singleKey(Key.of(topicId.getTopic())), + Collections.singletonList(PROPERTIES_FIELD))) { + if (resultSet.next()) { + String propertiesJson = resultSet.getString(PROPERTIES_FIELD); + Gson gson = new Gson(); + return gson.fromJson(propertiesJson, new TypeToken>() { + }.getType()); + } else { + throw new TopicNotFoundException(topicId.getNamespace(), topicId.getTopic()); + } + } } @Override public List listTopics(NamespaceId namespaceId) throws IOException, UnauthorizedException { - return null; + + List topics = new ArrayList<>(); + + try (ResultSet resultSet = client.singleUse() + .read(TOPIC_METADATA_TABLE, KeySet.all(), Arrays.asList(TOPIC_ID_FIELD, NAMESPACE_FIELD))) { + while (resultSet.next()) { + String topicIdString = resultSet.getString(TOPIC_ID_FIELD); + String namespace = resultSet.getString(NAMESPACE_FIELD); + + if (namespace.equals(namespaceId.getNamespace())) { + topics.add(TopicId.fromString(topicIdString)); + } + } + } + return ImmutableList.copyOf(topics); } + @Nullable @Override public RollbackDetail publish(StoreRequest request) throws TopicNotFoundException, IOException, UnauthorizedException { + long start = System.currentTimeMillis(); + + batch.add(request); + if (!batch.isEmpty()) { + int i = 0; + List batchCopy = new ArrayList<>(batch.size()); + // We need to batch less than fetch limit since we read for publish_ts >= last_message.publish_ts + // see fetch for explanation of why we read for publish_ts >= last_message.publish_ts and + // not publish_ts > last_message.publish_ts + while (!batch.isEmpty()) { + StoreRequest headRequest = batch.poll(); + for (byte[] payload : headRequest) { + Mutation mutation = Mutation.newInsertBuilder(getTableName(headRequest.getTopicId())) + .set(SEQUENCE_ID_FIELD).to(i++).set(PAYLOAD_SEQUENCE_ID).to(0).set(PUBLISH_TS_FIELD) + .to("spanner.commit_timestamp()").set(PAYLOAD_FIELD).to(ByteArray.copyFrom(payload)) + .build(); + batchCopy.add(mutation); + } + + if (batch.isEmpty() && (i < 50 || System.currentTimeMillis() - start < 50)) { + try { + Thread.sleep(5); + } catch (InterruptedException e) { + LOG.error("error during sleep", e); + throw new IOException(e); + } + } + } + if (!batchCopy.isEmpty()) { + try { + client.write(batchCopy); + } catch (SpannerException e) { + LOG.error("Cannot commit mutations ", e); + throw new IOException(e); + } + } + + } + return null; } @Override public void storePayload(StoreRequest request) throws TopicNotFoundException, IOException, UnauthorizedException { - + throw new IOException("NOT IMPLEMENTED"); } @Override public void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException, UnauthorizedException { - + throw new IOException("NOT IMPLEMENTED"); } @Override public CloseableIterator fetch(MessageFetchRequest messageFetchRequest) throws TopicNotFoundException, IOException { - return null; + throw new IOException("NOT IMPLEMENTED"); } } diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java new file mode 100644 index 00000000000..b1f104347e5 --- /dev/null +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java @@ -0,0 +1,56 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.messaging.spanner; + +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import java.util.Map; + +/** + * Utility class for spanner messaging service. + */ +class SpannerUtil { + + static DatabaseClient getSpannerDbClient(String projectID, String instanceID, + String databaseID, Spanner spanner) { + DatabaseId db = DatabaseId.of(projectID, instanceID, databaseID); + return spanner.getDatabaseClient(db); + } + + static DatabaseAdminClient getSpannerDbAdminClient(Spanner spanner) { + return spanner.getDatabaseAdminClient(); + } + + static String getInstanceID(Map cConf) { + return cConf.get("instance"); + } + + static String getDatabaseID(Map cConf) { + return cConf.get("database"); + } + + static String getProjectID(Map cConf) { + return cConf.get("database"); + } + + public static Spanner getSpannerService(String projectID) { + return SpannerOptions.newBuilder().setProjectId(projectID).build().getService(); + } +} \ No newline at end of file diff --git a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java index f72866ab3a2..5663903ceaf 100644 --- a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java +++ b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java @@ -41,7 +41,7 @@ public interface MessagingService { * * @param context the context that can be used to initialize the messaging service. */ - void initialize(MessagingServiceContext context) throws IOException; + void initialize(MessagingServiceContext context); /** * Returns the name of this MessagingService. The name needs to match with the configuration diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java index 270baa84ce1..aabfbec4428 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java @@ -119,7 +119,7 @@ public ClientMessagingService(RemoteClientFactory remoteClientFactory, boolean c } @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context) { } @Override diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java index 93f12a761ba..22e01eeafb4 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java @@ -21,6 +21,8 @@ import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.messaging.TopicNotFoundException; import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.messaging.DefaultTopicMetadata; +import io.cdap.cdap.messaging.MessagingServiceUtils; import io.cdap.cdap.messaging.spi.MessageFetchRequest; import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.messaging.spi.MessagingServiceContext; @@ -34,6 +36,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +78,7 @@ public void deleteTopic(TopicId topicId) } @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context) { } @Override @@ -138,17 +141,28 @@ private MessagingService getDelegate() { "Unsupported messaging service implementation " + getName()); } LOG.info("Messaging service {} is loaded", messagingService.getName()); + messagingService.initialize(new DefaultMessagingServiceContext(this.cConf)); + LOG.info("Messaging service {} is initialized", messagingService.getName()); try { - LOG.info("Messaging service {} is loaded, now initializing with conf {}", - messagingService.getName(), this.cConf); - messagingService.initialize(new DefaultMessagingServiceContext(this.cConf)); - } catch (IOException e) { + createTopics(cConf, messagingService); + } catch (IOException | TopicAlreadyExistsException e) { throw new RuntimeException(e); } - LOG.info("Messaging service {} is initialized", messagingService.getName()); this.delegate = messagingService; return messagingService; } } + + private void createTopics(CConfiguration cConf, MessagingService messagingService) + throws IOException, TopicAlreadyExistsException { + LOG.info("createAllTopics started."); + // If we implement this at some other place, + // we will need to introduce cdap-tms dependency in that package, which is not recommended. + Set systemTopics = MessagingServiceUtils.getSystemTopics(cConf, true); + for (TopicId topic : systemTopics) { + messagingService.createTopic(new DefaultTopicMetadata(topic)); + } + LOG.info("System topic creation done"); + } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java index 6e1a8b8f84e..eccbe834b19 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java @@ -83,7 +83,7 @@ public class LeaderElectionMessagingService extends AbstractIdleService implemen } @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context) { } @Override diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java index 16fa1735ed8..f50ffaaaf74 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java @@ -132,7 +132,7 @@ protected CoreMessagingService( } @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context) { } @Override From 8c542a52505cba4ca127a1fcbd533d8e9da5fece Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Fri, 29 Nov 2024 17:20:39 +0530 Subject: [PATCH 03/18] Correct sql statement. --- .../spanner/SpannerMessagingService.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 6f7676087ce..86cd4aa6cfc 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -108,6 +108,7 @@ public void createTopic(TopicMetadata topicMetadata) List ddlStatements = new ArrayList<>(); ddlStatements.add(getCreateTopicMetadataDDLStatement()); ddlStatements.add(getCreateTopicDDLStatement(topicMetadata.getTopicId())); + LOG.info("Executing {}", ddlStatements); OperationFuture future = adminClient.updateDatabaseDdl( this.instanceId, this.databaseId, ddlStatements, null); @@ -120,9 +121,11 @@ public void createTopic(TopicMetadata topicMetadata) Gson gson = new Gson(); String jsonString = gson.toJson(topicMetadata.getProperties()); - Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE).set(TOPIC_ID_FIELD) - .to(topicMetadata.getTopicId().getTopic()).set(PROPERTIES_FIELD).to(Value.json(jsonString)) + Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE) + .set(TOPIC_ID_FIELD).to(getTableName(topicMetadata.getTopicId())) + .set(PROPERTIES_FIELD).to(Value.json(jsonString)) .set(NAMESPACE_FIELD).to(topicMetadata.getTopicId().getNamespace()).build(); + LOG.info("Insert into table {}", mutation); try { client.write(Collections.singleton(mutation)); } catch (SpannerException e) { @@ -134,8 +137,8 @@ public void createTopic(TopicMetadata topicMetadata) private static String getCreateTopicMetadataDDLStatement() { return String.format( - "CREATE TABLE IF NOT EXISTS %s ( %s STRING(MAX) NOT NULL, %s JSON ) PRIMARY KEY(%s)", - TOPIC_METADATA_TABLE, TOPIC_ID_FIELD, PROPERTIES_FIELD, TOPIC_ID_FIELD); + "CREATE TABLE IF NOT EXISTS %s ( %s STRING(MAX) NOT NULL, %s STRING(MAX), %s JSON ) PRIMARY KEY(%s)", + TOPIC_METADATA_TABLE, TOPIC_ID_FIELD, NAMESPACE_FIELD, PROPERTIES_FIELD, TOPIC_ID_FIELD); } private static String getCreateTopicDDLStatement(TopicId topicId) { @@ -143,8 +146,9 @@ private static String getCreateTopicDDLStatement(TopicId topicId) { + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )" + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD, - PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID, - PUBLISH_TS_FIELD, PAYLOAD_FIELD); + PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PAYLOAD_FIELD, SEQUENCE_ID_FIELD, + PAYLOAD_SEQUENCE_ID, + PUBLISH_TS_FIELD); } public static String getTableName(TopicId topicId) { From 803ec72d22225a2ad9e4132a5891644ba9fa2a83 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Fri, 29 Nov 2024 18:44:19 +0530 Subject: [PATCH 04/18] Try individual create statements --- .../spanner/SpannerMessagingService.java | 81 ++++++++++++++++--- 1 file changed, 69 insertions(+), 12 deletions(-) diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 86cd4aa6cfc..cb38b1e0492 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -105,6 +105,27 @@ public String getName() { public void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException, UnauthorizedException { LOG.info("Create topic started {}", topicMetadata.getTopicId().getTopic()); + createTopic(topicMetadata.getTopicId()); + LOG.info("Now try other one individually"); + createMetdataTable(); + LOG.info("second execution done too."); + + LOG.info("Trying insert into table"); + Gson gson = new Gson(); + String jsonString = gson.toJson(topicMetadata.getProperties()); + Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE) + .set(TOPIC_ID_FIELD).to(getTableName(topicMetadata.getTopicId())) + .set(PROPERTIES_FIELD).to(Value.json(jsonString)) + .set(NAMESPACE_FIELD).to(topicMetadata.getTopicId().getNamespace()).build(); + LOG.info("Insert into table {}", mutation); + try { + client.write(Collections.singleton(mutation)); + } catch (SpannerException e) { + LOG.error("Cannot commit mutations ", e); + throw new IOException(e); + } + + LOG.info("Now trying batching"); List ddlStatements = new ArrayList<>(); ddlStatements.add(getCreateTopicMetadataDDLStatement()); ddlStatements.add(getCreateTopicDDLStatement(topicMetadata.getTopicId())); @@ -119,20 +140,56 @@ public void createTopic(TopicMetadata topicMetadata) throw new IOException(e); } - Gson gson = new Gson(); - String jsonString = gson.toJson(topicMetadata.getProperties()); - Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE) - .set(TOPIC_ID_FIELD).to(getTableName(topicMetadata.getTopicId())) - .set(PROPERTIES_FIELD).to(Value.json(jsonString)) - .set(NAMESPACE_FIELD).to(topicMetadata.getTopicId().getNamespace()).build(); - LOG.info("Insert into table {}", mutation); +// Gson gson = new Gson(); +// String jsonString = gson.toJson(topicMetadata.getProperties()); +// Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE) +// .set(TOPIC_ID_FIELD).to(getTableName(topicMetadata.getTopicId())) +// .set(PROPERTIES_FIELD).to(Value.json(jsonString)) +// .set(NAMESPACE_FIELD).to(topicMetadata.getTopicId().getNamespace()).build(); +// LOG.info("Insert into table {}", mutation); +// try { +// client.write(Collections.singleton(mutation)); +// } catch (SpannerException e) { +// LOG.error("Cannot commit mutations ", e); +// throw new IOException(e); +// } + LOG.info("Create topic done {}", topicMetadata.getTopicId().getTopic()); + } + + private void createTopic(TopicId topicId) { + String topicSQL = + String.format( + "CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s" + + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )" + + " PRIMARY KEY (sequence_id, payload_sequence_id, publish_ts), ROW DELETION POLICY" + + " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", + getTableName(topicId), + SEQUENCE_ID_FIELD, + PAYLOAD_SEQUENCE_ID, + PUBLISH_TS_FIELD, + PAYLOAD_FIELD); + OperationFuture future = + adminClient.updateDatabaseDdl( + instanceId, databaseId, Collections.singletonList(topicSQL), null); try { - client.write(Collections.singleton(mutation)); - } catch (SpannerException e) { - LOG.error("Cannot commit mutations ", e); - throw new IOException(e); + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error when executing {}", topicSQL, e); + } + } + + private void createMetdataTable() { + String sql = String.format( + "CREATE TABLE IF NOT EXISTS %s ( %s STRING(MAX) NOT NULL, %s STRING(MAX), %s JSON ) PRIMARY KEY(%s)", + TOPIC_METADATA_TABLE, TOPIC_ID_FIELD, NAMESPACE_FIELD, PROPERTIES_FIELD, TOPIC_ID_FIELD); + OperationFuture future = + adminClient.updateDatabaseDdl( + instanceId, databaseId, Collections.singletonList(sql), null); + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error when executing {}", sql, e); } - LOG.info("Create topic started {}", topicMetadata.getTopicId().getTopic()); } private static String getCreateTopicMetadataDDLStatement() { From 56fb3e81ed755d1b786b329acbbadf58e2552a47 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Fri, 29 Nov 2024 19:17:16 +0530 Subject: [PATCH 05/18] Correct project id property name --- .../main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java index b1f104347e5..d96e8cd93b3 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java @@ -47,7 +47,7 @@ static String getDatabaseID(Map cConf) { } static String getProjectID(Map cConf) { - return cConf.get("database"); + return cConf.get("project"); } public static Spanner getSpannerService(String projectID) { From 3270a991e327cd03b3d16e3fb2cd11cd8cadd867 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Mon, 2 Dec 2024 20:35:02 +0530 Subject: [PATCH 06/18] Add junits --- cdap-messaging-ext-spanner/pom.xml | 4 + .../spanner/SpannerMessagingService.java | 128 ++++--------- .../cdap/messaging/spanner/SpannerUtil.java | 48 ++++- .../spanner/SpannerMessagingServiceTest.java | 173 ++++++++++++++++++ .../src/test/resources/logback-test.xml | 37 ++++ .../cdap/messaging/spi/MessagingService.java | 2 +- .../client/ClientMessagingService.java | 2 +- .../client/DelegatingMessagingService.java | 12 +- .../LeaderElectionMessagingService.java | 2 +- .../guice/MessagingServiceModule.java | 11 +- .../service/CoreMessagingService.java | 45 +++-- 11 files changed, 331 insertions(+), 133 deletions(-) create mode 100644 cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java create mode 100644 cdap-messaging-ext-spanner/src/test/resources/logback-test.xml diff --git a/cdap-messaging-ext-spanner/pom.xml b/cdap-messaging-ext-spanner/pom.xml index 82a036f2e85..662ba97c961 100644 --- a/cdap-messaging-ext-spanner/pom.xml +++ b/cdap-messaging-ext-spanner/pom.xml @@ -59,6 +59,10 @@ com.google.api gax + + junit + junit + diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index cb38b1e0492..e5dfccadb69 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -16,6 +16,7 @@ package io.cdap.cdap.messaging.spanner; import com.google.api.gax.longrunning.OperationFuture; +import com.google.auth.Credentials; import com.google.cloud.ByteArray; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; @@ -26,6 +27,7 @@ import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Value; import com.google.common.collect.ImmutableList; import com.google.gson.Gson; @@ -46,7 +48,6 @@ import io.cdap.cdap.security.spi.authorization.UnauthorizedException; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -60,10 +61,6 @@ public class SpannerMessagingService implements MessagingService { private static final Logger LOG = LoggerFactory.getLogger(SpannerMessagingService.class); - private Map cConf; - - private Spanner spanner; - private DatabaseClient client; private DatabaseAdminClient adminClient; @@ -84,13 +81,14 @@ public class SpannerMessagingService implements MessagingService { public static final String NAMESPACE_FIELD = "namespace"; @Override - public void initialize(MessagingServiceContext context) { - this.cConf = context.getProperties(); + public void initialize(MessagingServiceContext context) throws IOException { + Map cConf = context.getProperties(); this.databaseId = SpannerUtil.getDatabaseID(cConf); this.instanceId = SpannerUtil.getInstanceID(cConf); - String projectID = SpannerUtil.getProjectID(cConf); - this.spanner = SpannerUtil.getSpannerService(projectID); + Credentials credentials = SpannerUtil.getCredentials(cConf); + + Spanner spanner = SpannerUtil.getSpannerService(projectID, credentials); this.client = SpannerUtil.getSpannerDbClient(projectID, instanceId, databaseId, spanner); this.adminClient = SpannerUtil.getSpannerDbAdminClient(spanner); LOG.info("Spanner messaging service started."); @@ -104,92 +102,33 @@ public String getName() { @Override public void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException, UnauthorizedException { - LOG.info("Create topic started {}", topicMetadata.getTopicId().getTopic()); - createTopic(topicMetadata.getTopicId()); - LOG.info("Now try other one individually"); - createMetdataTable(); - LOG.info("second execution done too."); - - LOG.info("Trying insert into table"); - Gson gson = new Gson(); - String jsonString = gson.toJson(topicMetadata.getProperties()); - Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE) - .set(TOPIC_ID_FIELD).to(getTableName(topicMetadata.getTopicId())) - .set(PROPERTIES_FIELD).to(Value.json(jsonString)) - .set(NAMESPACE_FIELD).to(topicMetadata.getTopicId().getNamespace()).build(); - LOG.info("Insert into table {}", mutation); - try { - client.write(Collections.singleton(mutation)); - } catch (SpannerException e) { - LOG.error("Cannot commit mutations ", e); - throw new IOException(e); - } - - LOG.info("Now trying batching"); List ddlStatements = new ArrayList<>(); ddlStatements.add(getCreateTopicMetadataDDLStatement()); ddlStatements.add(getCreateTopicDDLStatement(topicMetadata.getTopicId())); - LOG.info("Executing {}", ddlStatements); OperationFuture future = adminClient.updateDatabaseDdl( this.instanceId, this.databaseId, ddlStatements, null); try { future.get(); } catch (InterruptedException | ExecutionException e) { - LOG.error("Error when executing DDL statements", e); + LOG.error("Failed to create topic {}", topicMetadata.getTopicId().getTopic(), e); throw new IOException(e); } -// Gson gson = new Gson(); -// String jsonString = gson.toJson(topicMetadata.getProperties()); -// Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE) -// .set(TOPIC_ID_FIELD).to(getTableName(topicMetadata.getTopicId())) -// .set(PROPERTIES_FIELD).to(Value.json(jsonString)) -// .set(NAMESPACE_FIELD).to(topicMetadata.getTopicId().getNamespace()).build(); -// LOG.info("Insert into table {}", mutation); -// try { -// client.write(Collections.singleton(mutation)); -// } catch (SpannerException e) { -// LOG.error("Cannot commit mutations ", e); -// throw new IOException(e); -// } - LOG.info("Create topic done {}", topicMetadata.getTopicId().getTopic()); - } - - private void createTopic(TopicId topicId) { - String topicSQL = - String.format( - "CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s" - + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )" - + " PRIMARY KEY (sequence_id, payload_sequence_id, publish_ts), ROW DELETION POLICY" - + " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", - getTableName(topicId), - SEQUENCE_ID_FIELD, - PAYLOAD_SEQUENCE_ID, - PUBLISH_TS_FIELD, - PAYLOAD_FIELD); - OperationFuture future = - adminClient.updateDatabaseDdl( - instanceId, databaseId, Collections.singletonList(topicSQL), null); - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Error when executing {}", topicSQL, e); - } - } - - private void createMetdataTable() { - String sql = String.format( - "CREATE TABLE IF NOT EXISTS %s ( %s STRING(MAX) NOT NULL, %s STRING(MAX), %s JSON ) PRIMARY KEY(%s)", - TOPIC_METADATA_TABLE, TOPIC_ID_FIELD, NAMESPACE_FIELD, PROPERTIES_FIELD, TOPIC_ID_FIELD); - OperationFuture future = - adminClient.updateDatabaseDdl( - instanceId, databaseId, Collections.singletonList(sql), null); + Gson gson = new Gson(); + String jsonString = gson.toJson(topicMetadata.getProperties()); + Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE).set(TOPIC_ID_FIELD) + .to(getTableName(topicMetadata.getTopicId())).set(PROPERTIES_FIELD) + .to(Value.json(jsonString)).set(NAMESPACE_FIELD) + .to(topicMetadata.getTopicId().getNamespace()).build(); try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Error when executing {}", sql, e); + client.write(Collections.singleton(mutation)); + } catch (SpannerException e) { + LOG.error("Failed to update topic metadata table for {}", + topicMetadata.getTopicId().getTopic(), e); + throw new IOException(e); } + LOG.info("Created topic : {}", topicMetadata.getTopicId().getTopic()); } private static String getCreateTopicMetadataDDLStatement() { @@ -204,8 +143,7 @@ private static String getCreateTopicDDLStatement(TopicId topicId) { + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PAYLOAD_FIELD, SEQUENCE_ID_FIELD, - PAYLOAD_SEQUENCE_ID, - PUBLISH_TS_FIELD); + PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD); } public static String getTableName(TopicId topicId) { @@ -265,10 +203,10 @@ public Map getTopicMetadataProperties(TopicId topicId) throws TopicNotFoundException, IOException, UnauthorizedException { try (ResultSet resultSet = client.singleUse() - .read(TOPIC_METADATA_TABLE, KeySet.singleKey(Key.of(topicId.getTopic())), + .read(TOPIC_METADATA_TABLE, KeySet.singleKey(Key.of(getTableName(topicId))), Collections.singletonList(PROPERTIES_FIELD))) { if (resultSet.next()) { - String propertiesJson = resultSet.getString(PROPERTIES_FIELD); + String propertiesJson = resultSet.getJson(PROPERTIES_FIELD); Gson gson = new Gson(); return gson.fromJson(propertiesJson, new TypeToken>() { }.getType()); @@ -283,16 +221,18 @@ public List listTopics(NamespaceId namespaceId) throws IOException, UnauthorizedException { List topics = new ArrayList<>(); + String namespace = namespaceId.getNamespace(); + String topicSQL = String.format( + "SELECT %s FROM %s WHERE %s = '%s'", + TOPIC_ID_FIELD, TOPIC_METADATA_TABLE, NAMESPACE_FIELD, namespace + ); - try (ResultSet resultSet = client.singleUse() - .read(TOPIC_METADATA_TABLE, KeySet.all(), Arrays.asList(TOPIC_ID_FIELD, NAMESPACE_FIELD))) { - while (resultSet.next()) { - String topicIdString = resultSet.getString(TOPIC_ID_FIELD); - String namespace = resultSet.getString(NAMESPACE_FIELD); + try (ResultSet resultSet = client.singleUse().executeQuery( + Statement.of(topicSQL))) { - if (namespace.equals(namespaceId.getNamespace())) { - topics.add(TopicId.fromString(topicIdString)); - } + while (resultSet.next()) { + String topicId = resultSet.getString(TOPIC_ID_FIELD); + topics.add(new TopicId(namespace, topicId)); } } return ImmutableList.copyOf(topics); @@ -339,9 +279,7 @@ public RollbackDetail publish(StoreRequest request) throw new IOException(e); } } - } - return null; } diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java index d96e8cd93b3..c9e6810ea6a 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java @@ -16,11 +16,17 @@ package io.cdap.cdap.messaging.spanner; +import com.google.auth.Credentials; +import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Map; /** @@ -28,6 +34,11 @@ */ class SpannerUtil { + static final String PROJECT = "project"; + static final String INSTANCE = "instance"; + static final String DATABASE = "database"; + static final String CREDENTIALS_PATH = "credentials.path"; + static DatabaseClient getSpannerDbClient(String projectID, String instanceID, String databaseID, Spanner spanner) { DatabaseId db = DatabaseId.of(projectID, instanceID, databaseID); @@ -39,18 +50,45 @@ static DatabaseAdminClient getSpannerDbAdminClient(Spanner spanner) { } static String getInstanceID(Map cConf) { - return cConf.get("instance"); + String instance = cConf.get(INSTANCE); + if (instance == null) { + throw new IllegalArgumentException("Missing configuration " + PROJECT); + } + return instance; } static String getDatabaseID(Map cConf) { - return cConf.get("database"); + String instance = cConf.get(DATABASE); + if (instance == null) { + throw new IllegalArgumentException("Missing configuration " + DATABASE); + } + return cConf.get(DATABASE); } static String getProjectID(Map cConf) { - return cConf.get("project"); + String instance = cConf.get(PROJECT); + if (instance == null) { + throw new IllegalArgumentException("Missing configuration " + PROJECT); + } + return cConf.get(PROJECT); + } + + static Credentials getCredentials(Map cConf) throws IOException { + String credentialsPath = cConf.get(CREDENTIALS_PATH); + if (credentialsPath != null) { + try (InputStream is = Files.newInputStream(Paths.get(credentialsPath))) { + return ServiceAccountCredentials.fromStream(is); + } + } + return null; } - public static Spanner getSpannerService(String projectID) { - return SpannerOptions.newBuilder().setProjectId(projectID).build().getService(); + public static Spanner getSpannerService(String projectID, Credentials credentials) { + SpannerOptions.Builder builder = SpannerOptions.newBuilder().setProjectId(projectID); + if (credentials != null) { + builder.setCredentials(credentials); + } + + return builder.build().getService(); } } \ No newline at end of file diff --git a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java new file mode 100644 index 00000000000..de0cbdccf3b --- /dev/null +++ b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java @@ -0,0 +1,173 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.cdap.messaging.spanner; + + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.messaging.TopicNotFoundException; +import io.cdap.cdap.messaging.spi.MessagingServiceContext; +import io.cdap.cdap.messaging.spi.TopicMetadata; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.TopicId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +public class SpannerMessagingServiceTest { + + private static SpannerMessagingService service; + + private static SpannerTopicMetadata SIMPLE_TOPIC = new SpannerTopicMetadata( + new TopicId("system", "topic1"), new HashMap<>()); + + private static SpannerTopicMetadata TOPIC_WITH_PROPERTIES = new SpannerTopicMetadata( + new TopicId("system", "topic2"), new HashMap() { + { + put("key1", "value1"); + put("key2", "value2"); + } + }); + + @BeforeClass + public static void createSpannerMessagingService() throws Exception { + String project = System.getProperty("gcp.project"); + String instance = System.getProperty("gcp.spanner.instance"); + String database = System.getProperty("gcp.spanner.database"); + String credentialsPath = System.getProperty("gcp.credentials.path"); + + // GCP project, instance, and database must be provided + Assume.assumeNotNull(project, instance, database); + + Map configs = new HashMap<>(); + configs.put(SpannerUtil.PROJECT, project); + configs.put(SpannerUtil.INSTANCE, instance); + configs.put(SpannerUtil.DATABASE, database); + + if (credentialsPath != null) { + configs.put(SpannerUtil.CREDENTIALS_PATH, credentialsPath); + } + + MessagingServiceContext context = new MockMessagingServiceContext(configs); + + service = new SpannerMessagingService(); + service.initialize(context); + } + + @After + public void cleanUp() throws Exception { + try { + service.deleteTopic(SIMPLE_TOPIC.getTopicId()); + service.deleteTopic(TOPIC_WITH_PROPERTIES.getTopicId()); + } catch (TopicNotFoundException e) { + // no-op + } + } + + @Test + public void testCreateTopic() throws Exception { + service.createTopic(SIMPLE_TOPIC); + Assert.assertEquals(SIMPLE_TOPIC.getProperties(), + service.getTopicMetadataProperties(SIMPLE_TOPIC.getTopicId())); + } + + @Test + public void testCreateTopicWithProperties() throws Exception { + service.createTopic(TOPIC_WITH_PROPERTIES); + Assert.assertEquals(TOPIC_WITH_PROPERTIES.getProperties(), + service.getTopicMetadataProperties(TOPIC_WITH_PROPERTIES.getTopicId())); + } + + @Test(expected = TopicNotFoundException.class) + public void testGetMetadataProperties() throws Exception { + TopicId topicId = new TopicId("system", "invalid"); + service.getTopicMetadataProperties(topicId); + } + + @Test + public void testListTopics() throws Exception { + service.createTopic(SIMPLE_TOPIC); + service.createTopic(TOPIC_WITH_PROPERTIES); + List topics = service.listTopics(new NamespaceId("system")); + Assert.assertEquals(new ArrayList<>(Arrays.asList( + new TopicId("system", SpannerMessagingService.getTableName(SIMPLE_TOPIC.getTopicId())), + new TopicId("system", + SpannerMessagingService.getTableName(TOPIC_WITH_PROPERTIES.getTopicId())))), topics); + } + + @Test + public void testListTopicsEmptyNamespace() throws Exception { + List topics = service.listTopics(new NamespaceId("namespace")); + Assert.assertEquals(new ArrayList<>(), topics); + } + + private static final class MockMessagingServiceContext implements MessagingServiceContext { + + private final Map config; + + MockMessagingServiceContext(Map config) { + this.config = config; + } + + @Override + public Map getProperties() { + return config; + } + } + + private static final class SpannerTopicMetadata implements TopicMetadata { + + private final TopicId topicId; + private final Map properties; + + public SpannerTopicMetadata(TopicId topicId, Map properties) { + this.topicId = topicId; + this.properties = ImmutableMap.copyOf(properties); + } + + @Override + public TopicId getTopicId() { + return topicId; + } + + @Override + public Map getProperties() { + return properties; + } + + @Override + public int getGeneration() { + return 0; + } + + @Override + public boolean exists() { + return false; + } + + @Override + public long getTTL() { + return 0; + } + } + +} \ No newline at end of file diff --git a/cdap-messaging-ext-spanner/src/test/resources/logback-test.xml b/cdap-messaging-ext-spanner/src/test/resources/logback-test.xml new file mode 100644 index 00000000000..f62176a7d9a --- /dev/null +++ b/cdap-messaging-ext-spanner/src/test/resources/logback-test.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + %d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n + + + + + + + + diff --git a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java index 5663903ceaf..f72866ab3a2 100644 --- a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java +++ b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java @@ -41,7 +41,7 @@ public interface MessagingService { * * @param context the context that can be used to initialize the messaging service. */ - void initialize(MessagingServiceContext context); + void initialize(MessagingServiceContext context) throws IOException; /** * Returns the name of this MessagingService. The name needs to match with the configuration diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java index aabfbec4428..270baa84ce1 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java @@ -119,7 +119,7 @@ public ClientMessagingService(RemoteClientFactory remoteClientFactory, boolean c } @Override - public void initialize(MessagingServiceContext context) { + public void initialize(MessagingServiceContext context) throws IOException { } @Override diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java index 22e01eeafb4..c179da2bc4d 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java @@ -21,6 +21,7 @@ import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.messaging.TopicNotFoundException; import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.MessagingSystem; import io.cdap.cdap.messaging.DefaultTopicMetadata; import io.cdap.cdap.messaging.MessagingServiceUtils; import io.cdap.cdap.messaging.spi.MessageFetchRequest; @@ -78,12 +79,12 @@ public void deleteTopic(TopicId topicId) } @Override - public void initialize(MessagingServiceContext context) { + public void initialize(MessagingServiceContext context) throws IOException { } @Override public String getName() { - return "SpannerMessagingService"; + return cConf.get(MessagingSystem.MESSAGING_SERVICE_NAME); } @Override @@ -141,13 +142,14 @@ private MessagingService getDelegate() { "Unsupported messaging service implementation " + getName()); } LOG.info("Messaging service {} is loaded", messagingService.getName()); - messagingService.initialize(new DefaultMessagingServiceContext(this.cConf)); - LOG.info("Messaging service {} is initialized", messagingService.getName()); try { + messagingService.initialize(new DefaultMessagingServiceContext(this.cConf)); createTopics(cConf, messagingService); } catch (IOException | TopicAlreadyExistsException e) { throw new RuntimeException(e); } + LOG.info("Messaging service {} is initialized and system topics are created.", + messagingService.getName()); this.delegate = messagingService; return messagingService; @@ -156,13 +158,11 @@ private MessagingService getDelegate() { private void createTopics(CConfiguration cConf, MessagingService messagingService) throws IOException, TopicAlreadyExistsException { - LOG.info("createAllTopics started."); // If we implement this at some other place, // we will need to introduce cdap-tms dependency in that package, which is not recommended. Set systemTopics = MessagingServiceUtils.getSystemTopics(cConf, true); for (TopicId topic : systemTopics) { messagingService.createTopic(new DefaultTopicMetadata(topic)); } - LOG.info("System topic creation done"); } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java index eccbe834b19..6e1a8b8f84e 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java @@ -83,7 +83,7 @@ public class LeaderElectionMessagingService extends AbstractIdleService implemen } @Override - public void initialize(MessagingServiceContext context) { + public void initialize(MessagingServiceContext context) throws IOException { } @Override diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java index cc7417cae5f..f358b202eb7 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/guice/MessagingServiceModule.java @@ -43,11 +43,10 @@ public MessagingServiceModule(CConfiguration cConf) { @Override protected void configure() { -// bind(MessagingService.class).to(SpannerMessagingService.class).in(Scopes.SINGLETON); -// if (messagingService.equals(DEFAULT_MESSAGING_SERVICE_NAME)) { -// bind(MessagingService.class).to(ClientMessagingService.class).in(Scopes.SINGLETON); -// } else { - bind(MessagingService.class).to(DelegatingMessagingService.class).in(Scopes.SINGLETON); -// } + if (messagingService.equals(DEFAULT_MESSAGING_SERVICE_NAME)) { + bind(MessagingService.class).to(ClientMessagingService.class).in(Scopes.SINGLETON); + } else { + bind(MessagingService.class).to(DelegatingMessagingService.class).in(Scopes.SINGLETON); + } } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java index f50ffaaaf74..29bcc8aa647 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java @@ -132,7 +132,7 @@ protected CoreMessagingService( } @Override - public void initialize(MessagingServiceContext context) { + public void initialize(MessagingServiceContext context) throws IOException { } @Override @@ -249,8 +249,8 @@ protected void startUp() throws Exception { } /** - * Creates the given topic if it is not yet created. Adds a topic to the {@code - * creationFailureTopics} if creation fails. + * Creates the given topic if it is not yet created. Adds a topic to the + * {@code creationFailureTopics} if creation fails. */ private void createSystemTopic(TopicId topicId, Queue creationFailureTopics) { try { @@ -321,7 +321,9 @@ public void run() { unit); } - /** Creates the given topic if it is not yet created. */ + /** + * Creates the given topic if it is not yet created. + */ private void createTopicIfNotExists(TopicId topicId) throws IOException { try { createTopic(new DefaultTopicMetadata(topicId)); @@ -332,7 +334,9 @@ private void createTopicIfNotExists(TopicId topicId) throws IOException { } } - /** Creates a loading cache for {@link TopicMetadata}. */ + /** + * Creates a loading cache for {@link TopicMetadata}. + */ private LoadingCache createTopicCache() { return CacheBuilder.newBuilder() .build( @@ -347,12 +351,12 @@ public TopicMetadata load(TopicId topicId) throws Exception { } /** - * Creates a {@link LoadingCache} for {@link ConcurrentMessageWriter} for writing to {@link - * MessageTable} or {@link PayloadTable}. + * Creates a {@link LoadingCache} for {@link ConcurrentMessageWriter} for writing to + * {@link MessageTable} or {@link PayloadTable}. * - * @param messageTable {@code true} for building a cache for the {@link MessageTable}; {@code - * false} for the {@link PayloadTable} - * @param cConf the system configuration + * @param messageTable {@code true} for building a cache for the {@link MessageTable}; + * {@code false} for the {@link PayloadTable} + * @param cConf the system configuration * @return a {@link LoadingCache} for */ private LoadingCache createTableWriterCache( @@ -388,7 +392,7 @@ public ConcurrentMessageWriter load(TopicId topicId) throws Exception { StoreRequestWriter messagesWriter = messageTable ? new MessageTableStoreRequestWriter( - createMessageTable(metadata), timeProvider) + createMessageTable(metadata), timeProvider) : new PayloadTableStoreRequestWriter( createPayloadTable(metadata), timeProvider); @@ -409,7 +413,9 @@ public ConcurrentMessageWriter load(TopicId topicId) throws Exception { }); } - /** Creates a new instance of {@link MetadataTable}. */ + /** + * Creates a new instance of {@link MetadataTable}. + */ private MetadataTable createMetadataTable() throws IOException { return tableFactory.createMetadataTable(); } @@ -424,7 +430,9 @@ private PayloadTable createPayloadTable(@SuppressWarnings("unused") TopicMetadat return tableFactory.createPayloadTable(topicMetadata); } - /** Creates default topic properties based on {@link CConfiguration}. */ + /** + * Creates default topic properties based on {@link CConfiguration}. + */ private Map createDefaultProperties() { Map properties = new HashMap<>(); @@ -475,12 +483,12 @@ private MessageId createMessageTableMessageId(MessageId messageId) { } /** - * Creates a raw message id from the given {@link MessageTable.Entry} and {@link - * PayloadTable.Entry}. + * Creates a raw message id from the given {@link MessageTable.Entry} and + * {@link PayloadTable.Entry}. * * @param messageEntry entry in the message table representing a message * @param payloadEntry an optional entry in the payload table if the message payload is stored in - * the Payload Table + * the Payload Table * @return a byte array representing the raw message id. */ private byte[] createMessageId( @@ -494,9 +502,10 @@ private byte[] createMessageId( * Creates a raw message id from the given {@link MessageTable.Entry} and the payload write * timestamp and sequence id. * - * @param messageEntry entry in the message table representing a message + * @param messageEntry entry in the message table representing a message * @param payloadWriteTimestamp the timestamp that the entry was written to the payload table. - * @param payloadSeqId the sequence id generated when the entry was written to the payload table. + * @param payloadSeqId the sequence id generated when the entry was written to the + * payload table. * @return a byte array representing the raw message id. */ private byte[] createMessageId( From 099cc9800427d8460ce6ffa1d633e92115776923 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Mon, 2 Dec 2024 21:48:50 +0530 Subject: [PATCH 07/18] Update code comments --- .../messaging/spanner/SpannerMessagingService.java | 2 +- .../spanner/SpannerMessagingServiceTest.java | 14 +++++++++++++- .../client/DelegatingMessagingService.java | 2 -- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index e5dfccadb69..33578a054d2 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -275,7 +275,7 @@ public RollbackDetail publish(StoreRequest request) try { client.write(batchCopy); } catch (SpannerException e) { - LOG.error("Cannot commit mutations ", e); + LOG.error("failed to publish message ", e); throw new IOException(e); } } diff --git a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java index de0cbdccf3b..94999a9d5d3 100644 --- a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java +++ b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java @@ -15,7 +15,6 @@ */ package io.cdap.cdap.messaging.spanner; - import com.google.common.collect.ImmutableMap; import io.cdap.cdap.api.messaging.TopicNotFoundException; import io.cdap.cdap.messaging.spi.MessagingServiceContext; @@ -33,6 +32,19 @@ import org.junit.BeforeClass; import org.junit.Test; +/** + * Unit tests for Cloud spanner implementation of the + * {@link io.cdap.cdap.messaging.spi.MessagingService}. This test needs the following Java + * properties to run. If they are not provided, tests will be ignored. + * + *
    + *
  • gcp.project - GCP project name
  • + *
  • gcp.spanner.instance - GCP spanner instance name
  • + *
  • gcp.spanner.database - GCP spanner database name
  • + *
  • (optional) gcp.credentials.path - Local file path to the service account + * json that has the "Cloud Spanner Database User" role
  • + *
+ */ public class SpannerMessagingServiceTest { private static SpannerMessagingService service; diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java index c179da2bc4d..feefd6a7d73 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java @@ -55,7 +55,6 @@ public class DelegatingMessagingService implements MessagingService { @Inject public DelegatingMessagingService( CConfiguration cConf, MessagingServiceExtensionLoader extensionLoader) { - LOG.info("Delegating service is initialised"); this.cConf = cConf; this.extensionLoader = extensionLoader; } @@ -125,7 +124,6 @@ public void rollback(TopicId topicId, RollbackDetail rollbackDetail) } private MessagingService getDelegate() { - LOG.info("getDelegate() is called."); MessagingService messagingService = this.delegate; if (messagingService != null) { return messagingService; From cf371b066d30db5f276ea96c2442285d857dc523 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Mon, 2 Dec 2024 22:03:03 +0530 Subject: [PATCH 08/18] Revert changes in CoreMessagingService due to formatting issues. --- .../service/CoreMessagingService.java | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java index 29bcc8aa647..db018a14532 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java @@ -334,9 +334,7 @@ private void createTopicIfNotExists(TopicId topicId) throws IOException { } } - /** - * Creates a loading cache for {@link TopicMetadata}. - */ + /** Creates a loading cache for {@link TopicMetadata}. */ private LoadingCache createTopicCache() { return CacheBuilder.newBuilder() .build( @@ -351,12 +349,12 @@ public TopicMetadata load(TopicId topicId) throws Exception { } /** - * Creates a {@link LoadingCache} for {@link ConcurrentMessageWriter} for writing to - * {@link MessageTable} or {@link PayloadTable}. + * Creates a {@link LoadingCache} for {@link ConcurrentMessageWriter} for writing to {@link + * MessageTable} or {@link PayloadTable}. * - * @param messageTable {@code true} for building a cache for the {@link MessageTable}; - * {@code false} for the {@link PayloadTable} - * @param cConf the system configuration + * @param messageTable {@code true} for building a cache for the {@link MessageTable}; {@code + * false} for the {@link PayloadTable} + * @param cConf the system configuration * @return a {@link LoadingCache} for */ private LoadingCache createTableWriterCache( @@ -413,9 +411,7 @@ public ConcurrentMessageWriter load(TopicId topicId) throws Exception { }); } - /** - * Creates a new instance of {@link MetadataTable}. - */ + /** Creates a new instance of {@link MetadataTable}. */ private MetadataTable createMetadataTable() throws IOException { return tableFactory.createMetadataTable(); } @@ -430,9 +426,7 @@ private PayloadTable createPayloadTable(@SuppressWarnings("unused") TopicMetadat return tableFactory.createPayloadTable(topicMetadata); } - /** - * Creates default topic properties based on {@link CConfiguration}. - */ + /** Creates default topic properties based on {@link CConfiguration}. */ private Map createDefaultProperties() { Map properties = new HashMap<>(); @@ -483,12 +477,12 @@ private MessageId createMessageTableMessageId(MessageId messageId) { } /** - * Creates a raw message id from the given {@link MessageTable.Entry} and - * {@link PayloadTable.Entry}. + * Creates a raw message id from the given {@link MessageTable.Entry} and {@link + * PayloadTable.Entry}. * * @param messageEntry entry in the message table representing a message * @param payloadEntry an optional entry in the payload table if the message payload is stored in - * the Payload Table + * the Payload Table * @return a byte array representing the raw message id. */ private byte[] createMessageId( @@ -502,10 +496,9 @@ private byte[] createMessageId( * Creates a raw message id from the given {@link MessageTable.Entry} and the payload write * timestamp and sequence id. * - * @param messageEntry entry in the message table representing a message + * @param messageEntry entry in the message table representing a message * @param payloadWriteTimestamp the timestamp that the entry was written to the payload table. - * @param payloadSeqId the sequence id generated when the entry was written to the - * payload table. + * @param payloadSeqId the sequence id generated when the entry was written to the payload table. * @return a byte array representing the raw message id. */ private byte[] createMessageId( @@ -714,4 +707,4 @@ public void close() { closeQuietly(messageTable); } } -} +} \ No newline at end of file From 0032a5b137cfc117b5b0c213321b17bb007903f2 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Tue, 3 Dec 2024 14:49:10 +0530 Subject: [PATCH 09/18] Address review comments + add unit test. --- .../io/cdap/cdap/common/conf/Constants.java | 2 + .../src/main/resources/cdap-default.xml | 36 +++++++++++- .../spanner/SpannerMessagingService.java | 57 ++++++++++--------- .../cdap/messaging/spanner/SpannerUtil.java | 3 + .../DefaultMessagingServiceContext.java | 8 ++- .../service/CoreMessagingService.java | 2 +- 6 files changed, 73 insertions(+), 35 deletions(-) diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index e4a6fcdebe5..19962c983a3 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2155,6 +2155,8 @@ public static final class MessagingSystem { // TMS HBase table attribute that indicates the number of prefix bytes used for the row key public static final String HBASE_MESSAGING_TABLE_PREFIX_NUM_BYTES = "cdap.messaging.table.prefix.num.bytes"; + + public static final String SPANNER_EXTENSION_PROPERTY_PREFIX = "messaging.spanner.properties."; } /** diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 0702ad438ae..c4242064269 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -247,7 +247,10 @@ twill.jvm.gc.opts - -XX:+UseG1GC -verbose:gc -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M + -XX:+UseG1GC -verbose:gc -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails + -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 + -XX:GCLogFileSize=1M + (,9) @@ -2661,6 +2664,31 @@ + + messaging.spanner.properties.publish.batch.size + 50 + + The default max number of messages in each publish batch for spanner messaging service. + + + + + messaging.spanner.properties.publish.batch.timeout.millis + 50 + + The default maximum waiting time before a batch is published by spanner messaging service. + + + + + messaging.spanner.properties.publish.delay.millis + 5 + + The default waiting time for receiving new publish calls that ensures efficient batching and + timely publishing of messages. + + + messaging.twill.java.heap.memory.ratio @@ -5384,7 +5412,8 @@ true Whether user code isolation is enabled in task worker. When enabled, task workers will ensure - only 1 request is accepted at max and later after the task worker is restarted it can take next request. + only 1 request is accepted at max and later after the task worker is restarted it can take + next request. @@ -6515,7 +6544,8 @@ auditlog.messaging.fetch.size 20 - Number of messages to fetch from messaging system for each batch to consume by Audit log consumer. + Number of messages to fetch from messaging system for each batch to consume by Audit log + consumer. diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 33578a054d2..14e2b5c5eac 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -60,6 +60,15 @@ public class SpannerMessagingService implements MessagingService { private static final Logger LOG = LoggerFactory.getLogger(SpannerMessagingService.class); + public static final String PAYLOAD_FIELD = "payload"; + public static final String PUBLISH_TS_FIELD = "publish_ts"; + public static final String PAYLOAD_SEQUENCE_ID = "payload_sequence_id"; + public static final String SEQUENCE_ID_FIELD = "sequence_id"; + public static final String TOPIC_METADATA_TABLE = "topic_metadata"; + public static final String TOPIC_ID_FIELD = "topic_id"; + public static final String PROPERTIES_FIELD = "properties"; + public static final String NAMESPACE_FIELD = "namespace"; + public static final String TOPIC_TABLE_PREFIX = "messaging"; private DatabaseClient client; @@ -69,16 +78,13 @@ public class SpannerMessagingService implements MessagingService { private String databaseId; - private final ConcurrentLinkedQueue batch = new ConcurrentLinkedQueue<>(); + private int publishBatchSize; - public static final String PAYLOAD_FIELD = "payload"; - public static final String PUBLISH_TS_FIELD = "publish_ts"; - public static final String PAYLOAD_SEQUENCE_ID = "payload_sequence_id"; - public static final String SEQUENCE_ID_FIELD = "sequence_id"; - public static final String TOPIC_METADATA_TABLE = "topic_metadata"; - public static final String TOPIC_ID_FIELD = "topic_id"; - public static final String PROPERTIES_FIELD = "properties"; - public static final String NAMESPACE_FIELD = "namespace"; + private int publishBatchTimeoutMillis; + + private int publishDelayMillis; + + private final ConcurrentLinkedQueue batch = new ConcurrentLinkedQueue<>(); @Override public void initialize(MessagingServiceContext context) throws IOException { @@ -88,6 +94,11 @@ public void initialize(MessagingServiceContext context) throws IOException { String projectID = SpannerUtil.getProjectID(cConf); Credentials credentials = SpannerUtil.getCredentials(cConf); + this.publishBatchSize = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_BATCH_SIZE)); + this.publishBatchTimeoutMillis = Integer.parseInt( + cConf.get(SpannerUtil.PUBLISH_BATCH_TIMEOUT_MILLIS)); + this.publishDelayMillis = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_DELAY_MILLIS)); + Spanner spanner = SpannerUtil.getSpannerService(projectID, credentials); this.client = SpannerUtil.getSpannerDbClient(projectID, instanceId, databaseId, spanner); this.adminClient = SpannerUtil.getSpannerDbAdminClient(spanner); @@ -111,7 +122,6 @@ public void createTopic(TopicMetadata topicMetadata) try { future.get(); } catch (InterruptedException | ExecutionException e) { - LOG.error("Failed to create topic {}", topicMetadata.getTopicId().getTopic(), e); throw new IOException(e); } @@ -124,8 +134,6 @@ public void createTopic(TopicMetadata topicMetadata) try { client.write(Collections.singleton(mutation)); } catch (SpannerException e) { - LOG.error("Failed to update topic metadata table for {}", - topicMetadata.getTopicId().getTopic(), e); throw new IOException(e); } LOG.info("Created topic : {}", topicMetadata.getTopicId().getTopic()); @@ -141,13 +149,13 @@ private static String getCreateTopicDDLStatement(TopicId topicId) { return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s" + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )" + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" - + " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD, + + " (OLDER_THAN(%s, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PAYLOAD_FIELD, SEQUENCE_ID_FIELD, - PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD); + PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PUBLISH_TS_FIELD); } public static String getTableName(TopicId topicId) { - return topicId.getNamespace() + topicId.getTopic(); + return String.join("-", TOPIC_TABLE_PREFIX, topicId.getNamespace(), topicId.getTopic()); } @Override @@ -169,7 +177,6 @@ public void updateTopic(TopicMetadata topicMetadata) throw new TopicNotFoundException(topicMetadata.getTopicId().getNamespace(), topicMetadata.getTopicId().getTopic()); } - LOG.error("Failed to update topic {}", topicId, e); throw new IOException(e); } } @@ -185,7 +192,6 @@ public void deleteTopic(TopicId topicId) try { future.get(); } catch (InterruptedException | ExecutionException e) { - LOG.error("Error when executing DDL statements", e); throw new IOException(e); } @@ -193,7 +199,6 @@ public void deleteTopic(TopicId topicId) try { client.write(Collections.singletonList(mutation)); } catch (SpannerException e) { - LOG.error("Unable to delete {} from topic metadata table", topicId.getTopic()); throw new IOException(e); } } @@ -222,13 +227,10 @@ public List listTopics(NamespaceId namespaceId) List topics = new ArrayList<>(); String namespace = namespaceId.getNamespace(); - String topicSQL = String.format( - "SELECT %s FROM %s WHERE %s = '%s'", - TOPIC_ID_FIELD, TOPIC_METADATA_TABLE, NAMESPACE_FIELD, namespace - ); + String topicSQL = String.format("SELECT %s FROM %s WHERE %s = '%s'", TOPIC_ID_FIELD, + TOPIC_METADATA_TABLE, NAMESPACE_FIELD, namespace); - try (ResultSet resultSet = client.singleUse().executeQuery( - Statement.of(topicSQL))) { + try (ResultSet resultSet = client.singleUse().executeQuery(Statement.of(topicSQL))) { while (resultSet.next()) { String topicId = resultSet.getString(TOPIC_ID_FIELD); @@ -262,11 +264,11 @@ public RollbackDetail publish(StoreRequest request) batchCopy.add(mutation); } - if (batch.isEmpty() && (i < 50 || System.currentTimeMillis() - start < 50)) { + if (batch.isEmpty() && (i < publishBatchSize + || System.currentTimeMillis() - start < publishBatchTimeoutMillis)) { try { - Thread.sleep(5); + Thread.sleep(publishDelayMillis); } catch (InterruptedException e) { - LOG.error("error during sleep", e); throw new IOException(e); } } @@ -275,7 +277,6 @@ public RollbackDetail publish(StoreRequest request) try { client.write(batchCopy); } catch (SpannerException e) { - LOG.error("failed to publish message ", e); throw new IOException(e); } } diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java index c9e6810ea6a..bd9bc72e851 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java @@ -38,6 +38,9 @@ class SpannerUtil { static final String INSTANCE = "instance"; static final String DATABASE = "database"; static final String CREDENTIALS_PATH = "credentials.path"; + static final String PUBLISH_BATCH_SIZE = "publish.batch.size"; + static final String PUBLISH_BATCH_TIMEOUT_MILLIS = "publish.batch.timeout.millis"; + static final String PUBLISH_DELAY_MILLIS = "publish.delay.millis"; static DatabaseClient getSpannerDbClient(String projectID, String instanceID, String databaseID, Spanner spanner) { diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java index f0f51b52a06..050f417ba4b 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java @@ -38,10 +38,12 @@ public class DefaultMessagingServiceContext implements MessagingServiceContext { @Override public Map getProperties() { // TODO: cdap-tms module refactoring will remove this dependency on spanner. - String spannerPropertiesPrefix = + String spannerStoragePropertiesPrefix = Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + "."; - Map propertiesMap = new HashMap<>( - cConf.getPropsWithPrefix(spannerPropertiesPrefix)); + String spannerMessagingPropertiesPrefix = Constants.MessagingSystem.SPANNER_EXTENSION_PROPERTY_PREFIX; + Map propertiesMap = new HashMap<>(); + propertiesMap.putAll(cConf.getPropsWithPrefix(spannerStoragePropertiesPrefix)); + propertiesMap.putAll(cConf.getPropsWithPrefix(spannerMessagingPropertiesPrefix)); return Collections.unmodifiableMap(propertiesMap); } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java index db018a14532..e050b81e817 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java @@ -707,4 +707,4 @@ public void close() { closeQuietly(messageTable); } } -} \ No newline at end of file +} From c32cd53448795408ffb7da174e037ca9d400a3bd Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Tue, 3 Dec 2024 14:52:06 +0530 Subject: [PATCH 10/18] Revert unnecessary changes in Core Messaging Service. --- .../cdap/messaging/service/CoreMessagingService.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java index e050b81e817..16fa1735ed8 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java @@ -249,8 +249,8 @@ protected void startUp() throws Exception { } /** - * Creates the given topic if it is not yet created. Adds a topic to the - * {@code creationFailureTopics} if creation fails. + * Creates the given topic if it is not yet created. Adds a topic to the {@code + * creationFailureTopics} if creation fails. */ private void createSystemTopic(TopicId topicId, Queue creationFailureTopics) { try { @@ -321,9 +321,7 @@ public void run() { unit); } - /** - * Creates the given topic if it is not yet created. - */ + /** Creates the given topic if it is not yet created. */ private void createTopicIfNotExists(TopicId topicId) throws IOException { try { createTopic(new DefaultTopicMetadata(topicId)); @@ -390,7 +388,7 @@ public ConcurrentMessageWriter load(TopicId topicId) throws Exception { StoreRequestWriter messagesWriter = messageTable ? new MessageTableStoreRequestWriter( - createMessageTable(metadata), timeProvider) + createMessageTable(metadata), timeProvider) : new PayloadTableStoreRequestWriter( createPayloadTable(metadata), timeProvider); From f87bf2d746a71ccc8210ae4cd613c76aa929eb63 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Wed, 4 Dec 2024 20:25:25 +0530 Subject: [PATCH 11/18] Use "_" as delimiter in topic name instead of "-" --- .../spanner/SpannerMessagingService.java | 83 ++++++++++++++++++- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 14e2b5c5eac..3f07bd5ac3e 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -33,6 +33,8 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.dataset.lib.AbstractCloseableIterator; import io.cdap.cdap.api.dataset.lib.CloseableIterator; import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.messaging.TopicNotFoundException; @@ -155,7 +157,7 @@ private static String getCreateTopicDDLStatement(TopicId topicId) { } public static String getTableName(TopicId topicId) { - return String.join("-", TOPIC_TABLE_PREFIX, topicId.getNamespace(), topicId.getTopic()); + return String.join("_", TOPIC_TABLE_PREFIX, topicId.getNamespace(), topicId.getTopic()); } @Override @@ -299,6 +301,83 @@ public void rollback(TopicId topicId, RollbackDetail rollbackDetail) @Override public CloseableIterator fetch(MessageFetchRequest messageFetchRequest) throws TopicNotFoundException, IOException { - throw new IOException("NOT IMPLEMENTED"); + LOG.info("Message Fetch Request {} : {}", messageFetchRequest.getTopicId().getTopic(), messageFetchRequest.getStartOffset()); + Long startTime = 0L; + if (messageFetchRequest.getStartTime() != null) { + startTime = messageFetchRequest.getStartTime(); + } + short sequenceId = -1; + byte[] id = messageFetchRequest.getStartOffset(); + if (id != null) { + int offset = 0; + startTime = Bytes.toLong(id, offset); + offset += Bytes.SIZEOF_LONG; + sequenceId = Bytes.toShort(id, offset); + LOG.info("start time : {} sequenceId : {}", startTime, sequenceId); + } + + String sqlStatement = String.format( + "SELECT %s, %s, UNIX_MICROS(%s), %s FROM %s where (payload_sequence_id>-1" + + " and publish_ts > TIMESTAMP_MICROS(%s)) or" + + " (payload_sequence_id>-1 and publish_ts = TIMESTAMP_MICROS(%s) and sequence_id > %s) order by" + + " publish_ts,sequence_id LIMIT %s", SpannerMessagingService.SEQUENCE_ID_FIELD, + SpannerMessagingService.PAYLOAD_SEQUENCE_ID, SpannerMessagingService.PUBLISH_TS_FIELD, + SpannerMessagingService.PAYLOAD_FIELD, + SpannerMessagingService.getTableName(messageFetchRequest.getTopicId()), startTime, + startTime, sequenceId, + messageFetchRequest.getLimit()); + + LOG.info("Fetch sql {}", sqlStatement); + try { + ResultSet resultSet = client.singleUse().executeQuery(Statement.of(sqlStatement)); + LOG.info("executeQuery called"); + return new SpannerResultSetClosableIterator<>(resultSet); + } catch (Exception ex) { + LOG.error("Error when fetching {}", sqlStatement, ex); + throw new IOException(ex); + } + } + + public static class SpannerResultSetClosableIterator extends + AbstractCloseableIterator { + + private final ResultSet resultSet; + + public SpannerResultSetClosableIterator(ResultSet resultSet) { + this.resultSet = resultSet; + } + + @Override + protected io.cdap.cdap.messaging.spi.RawMessage computeNext() { + if (!resultSet.next()) { + return endOfData(); + } + + byte[] id = getMessageId(resultSet.getLong(0), resultSet.getLong(1), resultSet.getLong(2)); + byte[] payload = resultSet.getBytes(3).toByteArray(); + + LOG.info("computeNext called"); + return new io.cdap.cdap.messaging.spi.RawMessage.Builder().setId(id).setPayload(payload) + .build(); + } + + @Override + public void close() { + resultSet.close(); + } + } + + public static byte[] getMessageId(long sequenceId, long messageSequenceId, long timestamp) { + LOG.info("sequenceId {} messageSequenceId {} timestamp {}", sequenceId, messageSequenceId, + timestamp); + byte[] result = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_SHORT + Bytes.SIZEOF_LONG + + Bytes.SIZEOF_SHORT]; + int offset = 0; + // Implementation copied from MessageId + offset = Bytes.putLong(result, offset, timestamp); + offset = Bytes.putShort(result, offset, (short) sequenceId); + offset = Bytes.putLong(result, offset, 0); + Bytes.putShort(result, offset, (short) messageSequenceId); + return result; } } From 7a05fbf45786785c76c71c78705fa34a0c97169b Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Wed, 4 Dec 2024 22:07:14 +0530 Subject: [PATCH 12/18] Reformat code. --- .../cdap/cdap/messaging/spanner/SpannerMessagingService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 3f07bd5ac3e..66613080d1a 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -301,7 +301,8 @@ public void rollback(TopicId topicId, RollbackDetail rollbackDetail) @Override public CloseableIterator fetch(MessageFetchRequest messageFetchRequest) throws TopicNotFoundException, IOException { - LOG.info("Message Fetch Request {} : {}", messageFetchRequest.getTopicId().getTopic(), messageFetchRequest.getStartOffset()); + LOG.info("Message Fetch Request {} : {}", messageFetchRequest.getTopicId().getTopic(), + messageFetchRequest.getStartOffset()); Long startTime = 0L; if (messageFetchRequest.getStartTime() != null) { startTime = messageFetchRequest.getStartTime(); From 11a11e3f8dc035d07c518652163d3b42ae3469b0 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Fri, 6 Dec 2024 17:55:37 +0530 Subject: [PATCH 13/18] Move createTopics() in Spanner specific implementation. --- .../spanner/SpannerMessagingService.java | 74 ++++++++++++------- .../spanner/SpannerMessagingServiceTest.java | 2 +- .../cdap/messaging/spi/MessagingService.java | 5 +- .../client/ClientMessagingService.java | 2 +- .../client/DelegatingMessagingService.java | 24 +++--- .../LeaderElectionMessagingService.java | 2 +- .../service/CoreMessagingService.java | 2 +- 7 files changed, 68 insertions(+), 43 deletions(-) diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 66613080d1a..45f9a812038 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -89,7 +89,8 @@ public class SpannerMessagingService implements MessagingService { private final ConcurrentLinkedQueue batch = new ConcurrentLinkedQueue<>(); @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context, List systemTopics) + throws IOException { Map cConf = context.getProperties(); this.databaseId = SpannerUtil.getDatabaseID(cConf); this.instanceId = SpannerUtil.getInstanceID(cConf); @@ -105,6 +106,8 @@ public void initialize(MessagingServiceContext context) throws IOException { this.client = SpannerUtil.getSpannerDbClient(projectID, instanceId, databaseId, spanner); this.adminClient = SpannerUtil.getSpannerDbAdminClient(spanner); LOG.info("Spanner messaging service started."); + + createTopics(systemTopics); } @Override @@ -118,27 +121,21 @@ public void createTopic(TopicMetadata topicMetadata) List ddlStatements = new ArrayList<>(); ddlStatements.add(getCreateTopicMetadataDDLStatement()); ddlStatements.add(getCreateTopicDDLStatement(topicMetadata.getTopicId())); + executeCreateDDLStatements(ddlStatements); + updateTopicMetadataTable(Collections.singletonList(topicMetadata)); + LOG.info("Created topic : {}", topicMetadata.getTopicId().getTopic()); + } - OperationFuture future = adminClient.updateDatabaseDdl( - this.instanceId, this.databaseId, ddlStatements, null); - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } - - Gson gson = new Gson(); - String jsonString = gson.toJson(topicMetadata.getProperties()); - Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE).set(TOPIC_ID_FIELD) - .to(getTableName(topicMetadata.getTopicId())).set(PROPERTIES_FIELD) - .to(Value.json(jsonString)).set(NAMESPACE_FIELD) - .to(topicMetadata.getTopicId().getNamespace()).build(); - try { - client.write(Collections.singleton(mutation)); - } catch (SpannerException e) { - throw new IOException(e); + private void createTopics(List topics) throws IOException { + List ddlStatements = new ArrayList<>(); + ddlStatements.add(getCreateTopicMetadataDDLStatement()); + for (TopicMetadata topic : topics) { + LOG.info("Creating topic : {}", topic.getTopicId().getTopic()); + ddlStatements.add(getCreateTopicDDLStatement(topic.getTopicId())); } - LOG.info("Created topic : {}", topicMetadata.getTopicId().getTopic()); + executeCreateDDLStatements(ddlStatements); + updateTopicMetadataTable(topics); + LOG.info("Created all system topics."); } private static String getCreateTopicMetadataDDLStatement() { @@ -150,10 +147,36 @@ private static String getCreateTopicMetadataDDLStatement() { private static String getCreateTopicDDLStatement(TopicId topicId) { return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s" + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )" - + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" - + " (OLDER_THAN(%s, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD, - PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PAYLOAD_FIELD, SEQUENCE_ID_FIELD, - PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PUBLISH_TS_FIELD); + + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + " (OLDER_THAN(%s, INTERVAL 7 DAY))", + getTableName(topicId), SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, + PAYLOAD_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PUBLISH_TS_FIELD); + } + + private void updateTopicMetadataTable(List topics) throws IOException { + for (TopicMetadata topic : topics) { + Gson gson = new Gson(); + String jsonString = gson.toJson(topic.getProperties()); + Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE) + .set(TOPIC_ID_FIELD) + .to(getTableName(topic.getTopicId())).set(PROPERTIES_FIELD) + .to(Value.json(jsonString)).set(NAMESPACE_FIELD) + .to(topic.getTopicId().getNamespace()).build(); + try { + client.write(Collections.singleton(mutation)); + } catch (SpannerException e) { + throw new IOException(e); + } + } + } + + private void executeCreateDDLStatements(List ddlStatements) throws IOException { + OperationFuture future = adminClient.updateDatabaseDdl( + this.instanceId, this.databaseId, ddlStatements, null); + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } } public static String getTableName(TopicId topicId) { @@ -325,8 +348,7 @@ public CloseableIterator fetch(MessageFetchRequest messageFetchReque SpannerMessagingService.PAYLOAD_SEQUENCE_ID, SpannerMessagingService.PUBLISH_TS_FIELD, SpannerMessagingService.PAYLOAD_FIELD, SpannerMessagingService.getTableName(messageFetchRequest.getTopicId()), startTime, - startTime, sequenceId, - messageFetchRequest.getLimit()); + startTime, sequenceId, messageFetchRequest.getLimit()); LOG.info("Fetch sql {}", sqlStatement); try { diff --git a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java index 94999a9d5d3..8905d0a0a8d 100644 --- a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java +++ b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java @@ -82,7 +82,7 @@ public static void createSpannerMessagingService() throws Exception { MessagingServiceContext context = new MockMessagingServiceContext(configs); service = new SpannerMessagingService(); - service.initialize(context); + service.initialize(context, ); } @After diff --git a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java index f72866ab3a2..d32d4a6e65b 100644 --- a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java +++ b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java @@ -39,9 +39,10 @@ public interface MessagingService { * Initialize the messaging service. This method is guaranteed to be called before any other * method is called. It will only be called once for the lifetime of the messaging service. * - * @param context the context that can be used to initialize the messaging service. + * @param context the context that can be used to initialize the messaging service. + * @param systemTopics topics to be created initially. */ - void initialize(MessagingServiceContext context) throws IOException; + void initialize(MessagingServiceContext context, List systemTopics) throws IOException; /** * Returns the name of this MessagingService. The name needs to match with the configuration diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java index 270baa84ce1..d51d45afd09 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java @@ -119,7 +119,7 @@ public ClientMessagingService(RemoteClientFactory remoteClientFactory, boolean c } @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context, List systemTopics) throws IOException { } @Override diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java index feefd6a7d73..66f7aec4171 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java @@ -35,6 +35,7 @@ import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.security.spi.authorization.UnauthorizedException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,8 +54,8 @@ public class DelegatingMessagingService implements MessagingService { private final MessagingServiceExtensionLoader extensionLoader; @Inject - public DelegatingMessagingService( - CConfiguration cConf, MessagingServiceExtensionLoader extensionLoader) { + public DelegatingMessagingService(CConfiguration cConf, + MessagingServiceExtensionLoader extensionLoader) { this.cConf = cConf; this.extensionLoader = extensionLoader; } @@ -78,7 +79,8 @@ public void deleteTopic(TopicId topicId) } @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context, List systemTopics) + throws IOException { } @Override @@ -141,26 +143,26 @@ private MessagingService getDelegate() { } LOG.info("Messaging service {} is loaded", messagingService.getName()); try { - messagingService.initialize(new DefaultMessagingServiceContext(this.cConf)); - createTopics(cConf, messagingService); - } catch (IOException | TopicAlreadyExistsException e) { + messagingService.initialize(new DefaultMessagingServiceContext(this.cConf), + topicsToCreate(cConf)); + } catch (IOException e) { throw new RuntimeException(e); } - LOG.info("Messaging service {} is initialized and system topics are created.", - messagingService.getName()); + LOG.info("Messaging service {} is initialized.", messagingService.getName()); this.delegate = messagingService; return messagingService; } } - private void createTopics(CConfiguration cConf, MessagingService messagingService) - throws IOException, TopicAlreadyExistsException { + private List topicsToCreate(CConfiguration cConf) { + List topics = new ArrayList<>(); // If we implement this at some other place, // we will need to introduce cdap-tms dependency in that package, which is not recommended. Set systemTopics = MessagingServiceUtils.getSystemTopics(cConf, true); for (TopicId topic : systemTopics) { - messagingService.createTopic(new DefaultTopicMetadata(topic)); + topics.add(new DefaultTopicMetadata(topic)); } + return topics; } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java index 6e1a8b8f84e..a3285a6a853 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java @@ -83,7 +83,7 @@ public class LeaderElectionMessagingService extends AbstractIdleService implemen } @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context, List systemTopics) throws IOException { } @Override diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java index 16fa1735ed8..dc44479463f 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java @@ -132,7 +132,7 @@ protected CoreMessagingService( } @Override - public void initialize(MessagingServiceContext context) throws IOException { + public void initialize(MessagingServiceContext context, List systemTopics) throws IOException { } @Override From 215d6a0c30a917dd6f3fe4f81becddbcc0a0c725 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Mon, 9 Dec 2024 14:07:45 +0530 Subject: [PATCH 14/18] Address review comments + add unit test. --- .../src/main/resources/cdap-default.xml | 9 ++--- .../spanner/SpannerMessagingService.java | 28 +++++++++------- .../spanner/SpannerMessagingServiceTest.java | 33 ++++++++++++++++--- cdap-messaging-spi/pom.xml | 5 --- .../cdap/messaging/spi/MessagingService.java | 5 ++- .../spi/MessagingServiceContext.java | 9 +++++ .../client/ClientMessagingService.java | 2 +- .../DefaultMessagingServiceContext.java | 17 ++++++++++ .../client/DelegatingMessagingService.java | 20 ++--------- .../LeaderElectionMessagingService.java | 2 +- .../service/CoreMessagingService.java | 2 +- 11 files changed, 81 insertions(+), 51 deletions(-) diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index c4242064269..9c2d00927d8 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -247,10 +247,7 @@ twill.jvm.gc.opts - -XX:+UseG1GC -verbose:gc -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails - -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 - -XX:GCLogFileSize=1M - + -XX:+UseG1GC -verbose:gc -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M (,9) @@ -2684,8 +2681,8 @@ messaging.spanner.properties.publish.delay.millis 5 - The default waiting time for receiving new publish calls that ensures efficient batching and - timely publishing of messages. + The default waiting time for a publishing batch to accumulate, to ensure efficient batching of + messages in spanner messaging service. diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 45f9a812038..8e9d71f577e 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -89,8 +89,7 @@ public class SpannerMessagingService implements MessagingService { private final ConcurrentLinkedQueue batch = new ConcurrentLinkedQueue<>(); @Override - public void initialize(MessagingServiceContext context, List systemTopics) - throws IOException { + public void initialize(MessagingServiceContext context) throws IOException { Map cConf = context.getProperties(); this.databaseId = SpannerUtil.getDatabaseID(cConf); this.instanceId = SpannerUtil.getInstanceID(cConf); @@ -107,7 +106,7 @@ public void initialize(MessagingServiceContext context, List syst this.adminClient = SpannerUtil.getSpannerDbAdminClient(spanner); LOG.info("Spanner messaging service started."); - createTopics(systemTopics); + createTopics(context.getSystemTopics()); } @Override @@ -127,6 +126,10 @@ public void createTopic(TopicMetadata topicMetadata) } private void createTopics(List topics) throws IOException { + if (topics.isEmpty()) { + LOG.info("No topics were created initially."); + return; + } List ddlStatements = new ArrayList<>(); ddlStatements.add(getCreateTopicMetadataDDLStatement()); for (TopicMetadata topic : topics) { @@ -153,19 +156,20 @@ private static String getCreateTopicDDLStatement(TopicId topicId) { } private void updateTopicMetadataTable(List topics) throws IOException { + List mutations = new ArrayList<>(); for (TopicMetadata topic : topics) { Gson gson = new Gson(); String jsonString = gson.toJson(topic.getProperties()); Mutation mutation = Mutation.newInsertOrUpdateBuilder(TOPIC_METADATA_TABLE) - .set(TOPIC_ID_FIELD) - .to(getTableName(topic.getTopicId())).set(PROPERTIES_FIELD) - .to(Value.json(jsonString)).set(NAMESPACE_FIELD) - .to(topic.getTopicId().getNamespace()).build(); - try { - client.write(Collections.singleton(mutation)); - } catch (SpannerException e) { - throw new IOException(e); - } + .set(TOPIC_ID_FIELD).to(getTableName(topic.getTopicId())).set(PROPERTIES_FIELD) + .to(Value.json(jsonString)).set(NAMESPACE_FIELD).to(topic.getTopicId().getNamespace()) + .build(); + mutations.add(mutation); + } + try { + client.write(mutations); + } catch (SpannerException e) { + throw new IOException(e); } } diff --git a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java index 8905d0a0a8d..0ab2bc01879 100644 --- a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java +++ b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java @@ -49,10 +49,15 @@ public class SpannerMessagingServiceTest { private static SpannerMessagingService service; - private static SpannerTopicMetadata SIMPLE_TOPIC = new SpannerTopicMetadata( + private static final List SYSTEM_TOPICS = Arrays.asList(new SpannerTopicMetadata( + new TopicId("system", "t1"), new HashMap<>()), + new SpannerTopicMetadata( + new TopicId("system", "t2"), new HashMap<>())); + + private static final SpannerTopicMetadata SIMPLE_TOPIC = new SpannerTopicMetadata( new TopicId("system", "topic1"), new HashMap<>()); - private static SpannerTopicMetadata TOPIC_WITH_PROPERTIES = new SpannerTopicMetadata( + private static final SpannerTopicMetadata TOPIC_WITH_PROPERTIES = new SpannerTopicMetadata( new TopicId("system", "topic2"), new HashMap() { { put("key1", "value1"); @@ -79,10 +84,13 @@ public static void createSpannerMessagingService() throws Exception { configs.put(SpannerUtil.CREDENTIALS_PATH, credentialsPath); } + configs.put(SpannerUtil.PUBLISH_DELAY_MILLIS, "5"); + configs.put(SpannerUtil.PUBLISH_BATCH_SIZE, "10"); + configs.put(SpannerUtil.PUBLISH_BATCH_TIMEOUT_MILLIS, "10"); MessagingServiceContext context = new MockMessagingServiceContext(configs); service = new SpannerMessagingService(); - service.initialize(context, ); + service.initialize(context); } @After @@ -90,6 +98,8 @@ public void cleanUp() throws Exception { try { service.deleteTopic(SIMPLE_TOPIC.getTopicId()); service.deleteTopic(TOPIC_WITH_PROPERTIES.getTopicId()); + service.deleteTopic(SYSTEM_TOPICS.get(0).getTopicId()); + service.deleteTopic(SYSTEM_TOPICS.get(1).getTopicId()); } catch (TopicNotFoundException e) { // no-op } @@ -102,6 +112,14 @@ public void testCreateTopic() throws Exception { service.getTopicMetadataProperties(SIMPLE_TOPIC.getTopicId())); } + @Test + public void testInitialiseSuccessful() throws Exception { + for (TopicMetadata topic : SYSTEM_TOPICS) { + Assert.assertEquals(topic.getProperties(), + service.getTopicMetadataProperties(topic.getTopicId())); + } + } + @Test public void testCreateTopicWithProperties() throws Exception { service.createTopic(TOPIC_WITH_PROPERTIES); @@ -110,7 +128,7 @@ public void testCreateTopicWithProperties() throws Exception { } @Test(expected = TopicNotFoundException.class) - public void testGetMetadataProperties() throws Exception { + public void testGetMetadataPropertiesInvalidTopic() throws Exception { TopicId topicId = new TopicId("system", "invalid"); service.getTopicMetadataProperties(topicId); } @@ -121,6 +139,8 @@ public void testListTopics() throws Exception { service.createTopic(TOPIC_WITH_PROPERTIES); List topics = service.listTopics(new NamespaceId("system")); Assert.assertEquals(new ArrayList<>(Arrays.asList( + new TopicId("system", SpannerMessagingService.getTableName(SYSTEM_TOPICS.get(0).getTopicId())), + new TopicId("system", SpannerMessagingService.getTableName(SYSTEM_TOPICS.get(1).getTopicId())), new TopicId("system", SpannerMessagingService.getTableName(SIMPLE_TOPIC.getTopicId())), new TopicId("system", SpannerMessagingService.getTableName(TOPIC_WITH_PROPERTIES.getTopicId())))), topics); @@ -144,6 +164,11 @@ private static final class MockMessagingServiceContext implements MessagingServi public Map getProperties() { return config; } + + @Override + public List getSystemTopics() { + return SYSTEM_TOPICS; + } } private static final class SpannerTopicMetadata implements TopicMetadata { diff --git a/cdap-messaging-spi/pom.xml b/cdap-messaging-spi/pom.xml index 78f91957ae1..ad0b8ff1abb 100644 --- a/cdap-messaging-spi/pom.xml +++ b/cdap-messaging-spi/pom.xml @@ -42,11 +42,6 @@ cdap-api ${project.version} - - io.cdap.cdap - cdap-security-spi - ${project.version} - \ No newline at end of file diff --git a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java index d32d4a6e65b..f72866ab3a2 100644 --- a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java +++ b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingService.java @@ -39,10 +39,9 @@ public interface MessagingService { * Initialize the messaging service. This method is guaranteed to be called before any other * method is called. It will only be called once for the lifetime of the messaging service. * - * @param context the context that can be used to initialize the messaging service. - * @param systemTopics topics to be created initially. + * @param context the context that can be used to initialize the messaging service. */ - void initialize(MessagingServiceContext context, List systemTopics) throws IOException; + void initialize(MessagingServiceContext context) throws IOException; /** * Returns the name of this MessagingService. The name needs to match with the configuration diff --git a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingServiceContext.java b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingServiceContext.java index cca776b31e1..444790eb93b 100644 --- a/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingServiceContext.java +++ b/cdap-messaging-spi/src/main/java/io/cdap/cdap/messaging/spi/MessagingServiceContext.java @@ -16,6 +16,7 @@ package io.cdap.cdap.messaging.spi; +import java.util.List; import java.util.Map; /** @@ -31,4 +32,12 @@ public interface MessagingServiceContext { */ Map getProperties(); + /** + * Returns a list of system topics {@link TopicMetadata} as configured by the + * {@link Constants.MessagingSystem#SYSTEM_TOPICS} property. + * + * @return list of system topics {@link TopicMetadata} + */ + List getSystemTopics(); + } \ No newline at end of file diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java index d51d45afd09..270baa84ce1 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/ClientMessagingService.java @@ -119,7 +119,7 @@ public ClientMessagingService(RemoteClientFactory remoteClientFactory, boolean c } @Override - public void initialize(MessagingServiceContext context, List systemTopics) throws IOException { + public void initialize(MessagingServiceContext context) throws IOException { } @Override diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java index 050f417ba4b..b4a931e1a3b 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java @@ -17,10 +17,17 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.messaging.DefaultTopicMetadata; +import io.cdap.cdap.messaging.MessagingServiceUtils; import io.cdap.cdap.messaging.spi.MessagingServiceContext; +import io.cdap.cdap.messaging.spi.TopicMetadata; +import io.cdap.cdap.proto.id.TopicId; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; /** * Default implementation for {@link MessagingServiceContext}. @@ -46,5 +53,15 @@ public Map getProperties() { propertiesMap.putAll(cConf.getPropsWithPrefix(spannerMessagingPropertiesPrefix)); return Collections.unmodifiableMap(propertiesMap); } + + @Override + public List getSystemTopics() { + List topics = new ArrayList<>(); + Set systemTopics = MessagingServiceUtils.getSystemTopics(cConf, true); + for (TopicId topic : systemTopics) { + topics.add(new DefaultTopicMetadata(topic)); + } + return topics; + } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java index 66f7aec4171..a9fb4aadbf3 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DelegatingMessagingService.java @@ -22,8 +22,6 @@ import io.cdap.cdap.api.messaging.TopicNotFoundException; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants.MessagingSystem; -import io.cdap.cdap.messaging.DefaultTopicMetadata; -import io.cdap.cdap.messaging.MessagingServiceUtils; import io.cdap.cdap.messaging.spi.MessageFetchRequest; import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.messaging.spi.MessagingServiceContext; @@ -35,10 +33,8 @@ import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.security.spi.authorization.UnauthorizedException; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +75,7 @@ public void deleteTopic(TopicId topicId) } @Override - public void initialize(MessagingServiceContext context, List systemTopics) + public void initialize(MessagingServiceContext context) throws IOException { } @@ -143,8 +139,7 @@ private MessagingService getDelegate() { } LOG.info("Messaging service {} is loaded", messagingService.getName()); try { - messagingService.initialize(new DefaultMessagingServiceContext(this.cConf), - topicsToCreate(cConf)); + messagingService.initialize(new DefaultMessagingServiceContext(this.cConf)); } catch (IOException e) { throw new RuntimeException(e); } @@ -154,15 +149,4 @@ private MessagingService getDelegate() { return messagingService; } } - - private List topicsToCreate(CConfiguration cConf) { - List topics = new ArrayList<>(); - // If we implement this at some other place, - // we will need to introduce cdap-tms dependency in that package, which is not recommended. - Set systemTopics = MessagingServiceUtils.getSystemTopics(cConf, true); - for (TopicId topic : systemTopics) { - topics.add(new DefaultTopicMetadata(topic)); - } - return topics; - } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java index a3285a6a853..6e1a8b8f84e 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/distributed/LeaderElectionMessagingService.java @@ -83,7 +83,7 @@ public class LeaderElectionMessagingService extends AbstractIdleService implemen } @Override - public void initialize(MessagingServiceContext context, List systemTopics) throws IOException { + public void initialize(MessagingServiceContext context) throws IOException { } @Override diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java index dc44479463f..16fa1735ed8 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/service/CoreMessagingService.java @@ -132,7 +132,7 @@ protected CoreMessagingService( } @Override - public void initialize(MessagingServiceContext context, List systemTopics) throws IOException { + public void initialize(MessagingServiceContext context) throws IOException { } @Override From 42eaa073d2b9fc61b1377184e70856d3698962fa Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Mon, 9 Dec 2024 14:18:55 +0530 Subject: [PATCH 15/18] Fix module dependencies --- cdap-messaging-spi/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cdap-messaging-spi/pom.xml b/cdap-messaging-spi/pom.xml index ad0b8ff1abb..78f91957ae1 100644 --- a/cdap-messaging-spi/pom.xml +++ b/cdap-messaging-spi/pom.xml @@ -42,6 +42,11 @@ cdap-api ${project.version} + + io.cdap.cdap + cdap-security-spi + ${project.version} + \ No newline at end of file From f7f1cb010b7cc27d948c7745e7beb5dcacef8197 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Tue, 10 Dec 2024 15:22:48 +0530 Subject: [PATCH 16/18] Revert back unnecessary fetch changes + add javadocs for publish() and createTopic() methods. --- .../src/main/resources/cdap-default.xml | 5 +- .../spanner/SpannerMessagingService.java | 122 ++++++------------ .../DefaultMessagingServiceContext.java | 2 +- 3 files changed, 42 insertions(+), 87 deletions(-) diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 9c2d00927d8..0c40530d7d7 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -2678,11 +2678,10 @@ - messaging.spanner.properties.publish.delay.millis + messaging.spanner.properties.publish.poll.millis 5 - The default waiting time for a publishing batch to accumulate, to ensure efficient batching of - messages in spanner messaging service. + The default polling frequency while either max batch size or batch timeout criterion is met. diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 8e9d71f577e..33240336d2b 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -33,8 +33,6 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; -import io.cdap.cdap.api.common.Bytes; -import io.cdap.cdap.api.dataset.lib.AbstractCloseableIterator; import io.cdap.cdap.api.dataset.lib.CloseableIterator; import io.cdap.cdap.api.messaging.TopicAlreadyExistsException; import io.cdap.cdap.api.messaging.TopicNotFoundException; @@ -141,13 +139,35 @@ private void createTopics(List topics) throws IOException { LOG.info("Created all system topics."); } - private static String getCreateTopicMetadataDDLStatement() { + private String getCreateTopicMetadataDDLStatement() { return String.format( "CREATE TABLE IF NOT EXISTS %s ( %s STRING(MAX) NOT NULL, %s STRING(MAX), %s JSON ) PRIMARY KEY(%s)", TOPIC_METADATA_TABLE, TOPIC_ID_FIELD, NAMESPACE_FIELD, PROPERTIES_FIELD, TOPIC_ID_FIELD); } - private static String getCreateTopicDDLStatement(TopicId topicId) { + /** + *

Key features of the schema: + *

    + *
  • **`sequence_id`:** An arbitrary unique number (0-99) assigned by the publisher. + * This helps to distribute writes across the Spanner cluster (avoiding hotspots) + * and allows for efficient batching of up to 100 messages per transaction. + * The optimal range for `sequence_id` may need to be adjusted based on performance + * testing. Smaller ranges generally lead to faster reads but may increase the risk of hotspots. + *
  • + *
  • **`payload_sequence_id`:** Used to identify message chunks when a message is + * larger than 10MB and needs to be split across multiple rows. + *
  • + *
  • **`publish_ts`:** The commit timestamp obtained from the TrueTime API. + * Guaranteed to be monotonically increasing and unique across transactions + * that modify the same fields. + *
  • + *
  • **`payload`:** The message body.
  • + *
  • Message durability is currently set to 7 days. This means that Messaging service + * allows consumers to fetch messages as old as 7 days.
  • + *
+ *

+ */ + private String getCreateTopicDDLStatement(TopicId topicId) { return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s" + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )" + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + " (OLDER_THAN(%s, INTERVAL 7 DAY))", @@ -269,7 +289,19 @@ public List listTopics(NamespaceId namespaceId) return ImmutableList.copyOf(topics); } - + /** + * Please refer {@link #getCreateTopicDDLStatement(TopicId)} for schema details. Following table + * shows how messages would be persisted in the topic tables. + * + *
+   * TXN     sequence_id  payload_sequence_id  publish_ts  payload
+   * TXN1    0            0                    ts1          msg1
+   * TXN1    1            0                    ts1          msg2_p0
+   * TXN1    1            1                    ts1          msg2_p1
+   * TXN2    0            0                    ts2          msg3
+   * TXN3    0            0                    ts3          msg4
+   * 
+ */ @Nullable @Override public RollbackDetail publish(StoreRequest request) @@ -310,6 +342,7 @@ public RollbackDetail publish(StoreRequest request) } } } + //TODO: Add a RollbackDetail implementation that throws exceptions if any of the methods is called. return null; } @@ -328,83 +361,6 @@ public void rollback(TopicId topicId, RollbackDetail rollbackDetail) @Override public CloseableIterator fetch(MessageFetchRequest messageFetchRequest) throws TopicNotFoundException, IOException { - LOG.info("Message Fetch Request {} : {}", messageFetchRequest.getTopicId().getTopic(), - messageFetchRequest.getStartOffset()); - Long startTime = 0L; - if (messageFetchRequest.getStartTime() != null) { - startTime = messageFetchRequest.getStartTime(); - } - short sequenceId = -1; - byte[] id = messageFetchRequest.getStartOffset(); - if (id != null) { - int offset = 0; - startTime = Bytes.toLong(id, offset); - offset += Bytes.SIZEOF_LONG; - sequenceId = Bytes.toShort(id, offset); - LOG.info("start time : {} sequenceId : {}", startTime, sequenceId); - } - - String sqlStatement = String.format( - "SELECT %s, %s, UNIX_MICROS(%s), %s FROM %s where (payload_sequence_id>-1" - + " and publish_ts > TIMESTAMP_MICROS(%s)) or" - + " (payload_sequence_id>-1 and publish_ts = TIMESTAMP_MICROS(%s) and sequence_id > %s) order by" - + " publish_ts,sequence_id LIMIT %s", SpannerMessagingService.SEQUENCE_ID_FIELD, - SpannerMessagingService.PAYLOAD_SEQUENCE_ID, SpannerMessagingService.PUBLISH_TS_FIELD, - SpannerMessagingService.PAYLOAD_FIELD, - SpannerMessagingService.getTableName(messageFetchRequest.getTopicId()), startTime, - startTime, sequenceId, messageFetchRequest.getLimit()); - - LOG.info("Fetch sql {}", sqlStatement); - try { - ResultSet resultSet = client.singleUse().executeQuery(Statement.of(sqlStatement)); - LOG.info("executeQuery called"); - return new SpannerResultSetClosableIterator<>(resultSet); - } catch (Exception ex) { - LOG.error("Error when fetching {}", sqlStatement, ex); - throw new IOException(ex); - } - } - - public static class SpannerResultSetClosableIterator extends - AbstractCloseableIterator { - - private final ResultSet resultSet; - - public SpannerResultSetClosableIterator(ResultSet resultSet) { - this.resultSet = resultSet; - } - - @Override - protected io.cdap.cdap.messaging.spi.RawMessage computeNext() { - if (!resultSet.next()) { - return endOfData(); - } - - byte[] id = getMessageId(resultSet.getLong(0), resultSet.getLong(1), resultSet.getLong(2)); - byte[] payload = resultSet.getBytes(3).toByteArray(); - - LOG.info("computeNext called"); - return new io.cdap.cdap.messaging.spi.RawMessage.Builder().setId(id).setPayload(payload) - .build(); - } - - @Override - public void close() { - resultSet.close(); - } - } - - public static byte[] getMessageId(long sequenceId, long messageSequenceId, long timestamp) { - LOG.info("sequenceId {} messageSequenceId {} timestamp {}", sequenceId, messageSequenceId, - timestamp); - byte[] result = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_SHORT + Bytes.SIZEOF_LONG - + Bytes.SIZEOF_SHORT]; - int offset = 0; - // Implementation copied from MessageId - offset = Bytes.putLong(result, offset, timestamp); - offset = Bytes.putShort(result, offset, (short) sequenceId); - offset = Bytes.putLong(result, offset, 0); - Bytes.putShort(result, offset, (short) messageSequenceId); - return result; + throw new IOException("NOT IMPLEMENTED"); } } diff --git a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java index b4a931e1a3b..307645fbc4c 100644 --- a/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java +++ b/cdap-tms/src/main/java/io/cdap/cdap/messaging/client/DefaultMessagingServiceContext.java @@ -44,7 +44,7 @@ public class DefaultMessagingServiceContext implements MessagingServiceContext { @Override public Map getProperties() { - // TODO: cdap-tms module refactoring will remove this dependency on spanner. + // TODO: [CDAP-21090] cdap-tms module refactoring will remove this dependency on spanner. String spannerStoragePropertiesPrefix = Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + "."; String spannerMessagingPropertiesPrefix = Constants.MessagingSystem.SPANNER_EXTENSION_PROPERTY_PREFIX; From 86685fbc49e15757e421a9f16348b6d0e7c74eb5 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Tue, 10 Dec 2024 15:29:58 +0530 Subject: [PATCH 17/18] Update property names --- cdap-common/src/main/resources/cdap-default.xml | 2 +- .../cdap/cdap/messaging/spanner/SpannerMessagingService.java | 2 +- .../main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java | 2 +- .../cdap/messaging/spanner/SpannerMessagingServiceTest.java | 2 +- pom.xml | 4 +--- 5 files changed, 5 insertions(+), 7 deletions(-) diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 0c40530d7d7..19d514e6cb2 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -2678,7 +2678,7 @@ - messaging.spanner.properties.publish.poll.millis + messaging.spanner.properties.publish.batch.poll.millis 5 The default polling frequency while either max batch size or batch timeout criterion is met. diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 33240336d2b..344190f016b 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -97,7 +97,7 @@ public void initialize(MessagingServiceContext context) throws IOException { this.publishBatchSize = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_BATCH_SIZE)); this.publishBatchTimeoutMillis = Integer.parseInt( cConf.get(SpannerUtil.PUBLISH_BATCH_TIMEOUT_MILLIS)); - this.publishDelayMillis = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_DELAY_MILLIS)); + this.publishDelayMillis = Integer.parseInt(cConf.get(SpannerUtil.PUBLISH_BATCH_POLL_MILLIS)); Spanner spanner = SpannerUtil.getSpannerService(projectID, credentials); this.client = SpannerUtil.getSpannerDbClient(projectID, instanceId, databaseId, spanner); diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java index bd9bc72e851..669d2e00c7f 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerUtil.java @@ -40,7 +40,7 @@ class SpannerUtil { static final String CREDENTIALS_PATH = "credentials.path"; static final String PUBLISH_BATCH_SIZE = "publish.batch.size"; static final String PUBLISH_BATCH_TIMEOUT_MILLIS = "publish.batch.timeout.millis"; - static final String PUBLISH_DELAY_MILLIS = "publish.delay.millis"; + static final String PUBLISH_BATCH_POLL_MILLIS = "publish.batch.poll.millis"; static DatabaseClient getSpannerDbClient(String projectID, String instanceID, String databaseID, Spanner spanner) { diff --git a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java index 0ab2bc01879..947da79d345 100644 --- a/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java +++ b/cdap-messaging-ext-spanner/src/test/java/io/cdap/cdap/messaging/spanner/SpannerMessagingServiceTest.java @@ -84,7 +84,7 @@ public static void createSpannerMessagingService() throws Exception { configs.put(SpannerUtil.CREDENTIALS_PATH, credentialsPath); } - configs.put(SpannerUtil.PUBLISH_DELAY_MILLIS, "5"); + configs.put(SpannerUtil.PUBLISH_BATCH_POLL_MILLIS, "5"); configs.put(SpannerUtil.PUBLISH_BATCH_SIZE, "10"); configs.put(SpannerUtil.PUBLISH_BATCH_TIMEOUT_MILLIS, "10"); MessagingServiceContext context = new MockMessagingServiceContext(configs); diff --git a/pom.xml b/pom.xml index 2b119aca828..6b027880504 100644 --- a/pom.xml +++ b/pom.xml @@ -48,9 +48,6 @@ http://www.cdap.io - - cdap-messaging-ext-spanner - scm:git:https://github.com/cdapio/cdap.git @@ -2607,6 +2604,7 @@ cdap-system-app-unit-test cdap-tms cdap-messaging-spi + cdap-messaging-ext-spanner cdap-gateway cdap-unit-test cdap-unit-test-spark3_2.12 From fbbbb9fa38f18bc15724cca90ded28a1e864c025 Mon Sep 17 00:00:00 2001 From: sidhdirenge Date: Thu, 12 Dec 2024 14:33:11 +0530 Subject: [PATCH 18/18] Address review comments --- .../cdap/messaging/spanner/SpannerMessagingService.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 344190f016b..34b038caf5c 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -120,7 +120,7 @@ public void createTopic(TopicMetadata topicMetadata) ddlStatements.add(getCreateTopicDDLStatement(topicMetadata.getTopicId())); executeCreateDDLStatements(ddlStatements); updateTopicMetadataTable(Collections.singletonList(topicMetadata)); - LOG.info("Created topic : {}", topicMetadata.getTopicId().getTopic()); + LOG.trace("Created topic : {}", topicMetadata.getTopicId().getTopic()); } private void createTopics(List topics) throws IOException { @@ -310,7 +310,7 @@ public RollbackDetail publish(StoreRequest request) batch.add(request); if (!batch.isEmpty()) { - int i = 0; + int sequenceId = 0; List batchCopy = new ArrayList<>(batch.size()); // We need to batch less than fetch limit since we read for publish_ts >= last_message.publish_ts // see fetch for explanation of why we read for publish_ts >= last_message.publish_ts and @@ -318,14 +318,15 @@ public RollbackDetail publish(StoreRequest request) while (!batch.isEmpty()) { StoreRequest headRequest = batch.poll(); for (byte[] payload : headRequest) { + //TODO: [CDAP-21094] Breakdown messages with larger payload into parts. Mutation mutation = Mutation.newInsertBuilder(getTableName(headRequest.getTopicId())) - .set(SEQUENCE_ID_FIELD).to(i++).set(PAYLOAD_SEQUENCE_ID).to(0).set(PUBLISH_TS_FIELD) + .set(SEQUENCE_ID_FIELD).to(sequenceId++).set(PAYLOAD_SEQUENCE_ID).to(0).set(PUBLISH_TS_FIELD) .to("spanner.commit_timestamp()").set(PAYLOAD_FIELD).to(ByteArray.copyFrom(payload)) .build(); batchCopy.add(mutation); } - if (batch.isEmpty() && (i < publishBatchSize + if (batch.isEmpty() && (sequenceId < publishBatchSize || System.currentTimeMillis() - start < publishBatchTimeoutMillis)) { try { Thread.sleep(publishDelayMillis);