From ad454e1b1b044c1cf97eb77368aab1c2c1e5e603 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 30 Aug 2024 16:51:44 +0800 Subject: [PATCH 01/13] Add spark changes to use hbase api --- .../scala/dev/caraml/spark/BasePipeline.scala | 4 ++ .../dev/caraml/spark/BatchPipeline.scala | 8 ++++ .../scala/dev/caraml/spark/IngestionJob.scala | 3 ++ .../dev/caraml/spark/IngestionJobConfig.scala | 1 + .../bigtable/BigTableSinkRelation.scala | 8 +++- .../spark/stores/bigtable/DefaultSource.scala | 47 +++++++++++++------ .../stores/bigtable/HbaseSinkRelation.scala | 17 +++++++ 7 files changed, 72 insertions(+), 16 deletions(-) create mode 100644 caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala index cb854aa..9fd6322 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala @@ -33,6 +33,10 @@ object BasePipeline { conf .set("spark.bigtable.projectId", projectId) .set("spark.bigtable.instanceId", instanceId) + case HBaseConfig(zookeeperQuorum, zookeeperPort) => + conf + .set("spark.hbase.zookeeper.quorum", zookeeperQuorum) + .set("spark.hbase.zookeeper.port", zookeeperPort.toString) } jobConfig.metrics match { diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala index 4ce5d55..d2757f7 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala @@ -66,11 +66,19 @@ object BatchPipeline extends BasePipeline { .map(metrics.incrementRead) .filter(rowValidator.allChecks) + val onlineStore = config.store match { + case _: RedisConfig => "redis" + case _: BigTableConfig => "bigtable" + case _: HBaseConfig => "hbase" + } + validRows.write .format(config.store match { case _: RedisConfig => "dev.caraml.spark.stores.redis" case _: BigTableConfig => "dev.caraml.spark.stores.bigtable" + case _: HBaseConfig => "dev.caraml.spark.stores.bigtable" }) + .option("online_store", onlineStore) .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) .option("namespace", featureTable.name) .option("project_name", featureTable.project) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJob.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJob.scala index 69196c9..20939b2 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJob.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJob.scala @@ -87,6 +87,9 @@ object IngestionJob { opt[String](name = "bigtable") .action((x, c) => c.copy(store = parseJSON(x).camelizeKeys.extract[BigTableConfig])) + opt[String](name = "hbase") + .action((x, c) => c.copy(store = parseJSON(x).extract[HBaseConfig])) + opt[String](name = "statsd") .action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig]))) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala index a13524d..c69c64a 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala @@ -27,6 +27,7 @@ case class 
RedisWriteProperties( ratePerSecondLimit: Int = 50000 ) case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig +case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig sealed trait MetricConfig diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 8cf36b8..a60668b 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -4,7 +4,7 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration import dev.caraml.spark.serialization.Serializer import dev.caraml.spark.utils.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.{Admin, Connection, Put} import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.hadoop.mapred.JobConf @@ -30,8 +30,12 @@ class BigTableSinkRelation( override def schema: StructType = ??? + def getConnection(hadoopConfig: Configuration): Connection = { + BigtableConfiguration.connect(hadoopConfig) + } + def createTable(): Unit = { - val btConn = BigtableConfiguration.connect(hadoopConfig) + val btConn = getConnection(hadoopConfig) try { val admin = btConn.getAdmin diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala index 3c31c89..ebaf678 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala @@ -23,27 +23,43 @@ class DefaultSource extends CreatableRelationProvider { parameters: Map[String, String], data: DataFrame ): BaseRelation = { - val bigtableConf = BigtableConfiguration.configure( - sqlContext.getConf(PROJECT_KEY), - sqlContext.getConf(INSTANCE_KEY) - ) - - if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) { - bigtableConf.set( - BIGTABLE_EMULATOR_HOST_KEY, - sqlContext.getConf("spark.bigtable.emulatorHost") + val onlineStore = parameters.getOrElse("onlineStore", "bigtable") + var rel: BigTableSinkRelation = null + if (onlineStore == "bigtable") { + val bigtableConf = BigtableConfiguration.configure( + sqlContext.getConf(PROJECT_KEY), + sqlContext.getConf(INSTANCE_KEY) ) - } - configureBigTableClient(bigtableConf, sqlContext) + if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) { + bigtableConf.set( + BIGTABLE_EMULATOR_HOST_KEY, + sqlContext.getConf("spark.bigtable.emulatorHost") + ) + } + + configureBigTableClient(bigtableConf, sqlContext) - val rel = - new BigTableSinkRelation( + rel = + new BigTableSinkRelation( + sqlContext, + new AvroSerializer, + SparkBigtableConfig.parse(parameters), + bigtableConf + ) + } else if (onlineStore == "hbase"){ + val hbaseConf = new Configuration() + hbaseConf.set("hbase.zookeeper.quorum", sqlContext.getConf(ZOOKEEPER_QUOROM_KEY)) + hbaseConf.set("hbase.zookeeper.property.clientPort", sqlContext.getConf(ZOOKEEPER_PORT_KEY)) + rel = new HbaseSinkRelation( sqlContext, new AvroSerializer, SparkBigtableConfig.parse(parameters), - bigtableConf + hbaseConf ) + } else { + throw new 
UnsupportedOperationException(s"Unsupported online store: $onlineStore") + } rel.createTable() rel.saveWriteSchema(data) rel.insert(data, overwrite = false) @@ -79,4 +95,7 @@ object DefaultSource { private val THROTTLING_THRESHOLD_MILLIS_KEY = "spark.bigtable.throttlingThresholdMs" private val MAX_ROW_COUNT_KEY = "spark.bigtable.maxRowCount" private val MAX_INFLIGHT_KEY = "spark.bigtable.maxInflightRpcs" + + private val ZOOKEEPER_QUOROM_KEY = "spark.hbase.zookeeper.quorum" + private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port" } diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala new file mode 100644 index 0000000..f7596ea --- /dev/null +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala @@ -0,0 +1,17 @@ +package dev.caraml.spark.stores.bigtable + +import dev.caraml.spark.serialization.Serializer +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory} +import org.apache.spark.sql.SQLContext + +class HbaseSinkRelation( + sqlContext: SQLContext, + serializer: Serializer, + config: SparkBigtableConfig, + hadoopConfig: Configuration + ) extends BigTableSinkRelation(sqlContext, serializer, config, hadoopConfig) { + override def getConnection(hadoopConfig: Configuration): Connection = { + ConnectionFactory.createConnection(hadoopConfig) + } +} \ No newline at end of file From 2528ffe9e19be569793a8011861a3a17eeac7750 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Mon, 2 Sep 2024 15:06:05 +0800 Subject: [PATCH 02/13] Fix connection issues --- .../caraml/spark/stores/bigtable/BigTableSinkRelation.scala | 2 +- .../scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index a60668b..62d7ab2 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -119,7 +119,7 @@ class BigTableSinkRelation( val qualifier = "avro".getBytes put.addColumn(metadataColumnFamily.getBytes, qualifier, schema.asInstanceOf[String].getBytes) - val btConn = BigtableConfiguration.connect(hadoopConfig) + val btConn = getConnection(hadoopConfig) try { val table = btConn.getTable(TableName.valueOf(tableName)) table.checkAndPut( diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala index ebaf678..2dea536 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala @@ -23,8 +23,9 @@ class DefaultSource extends CreatableRelationProvider { parameters: Map[String, String], data: DataFrame ): BaseRelation = { - val onlineStore = parameters.getOrElse("onlineStore", "bigtable") + val onlineStore = parameters.getOrElse("online_store", "bigtable") var rel: BigTableSinkRelation = null + println(s"onlineStore: $onlineStore") if (onlineStore == "bigtable") { val bigtableConf = BigtableConfiguration.configure( 
sqlContext.getConf(PROJECT_KEY), From 773eac10a9670766c6d66716b663f83f313db6f7 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Mon, 2 Sep 2024 15:51:49 +0800 Subject: [PATCH 03/13] Fix linting --- .../dev/caraml/spark/BatchPipeline.scala | 6 +++--- .../dev/caraml/spark/IngestionJobConfig.scala | 2 +- .../spark/stores/bigtable/DefaultSource.scala | 19 +++++++++---------- .../stores/bigtable/HbaseSinkRelation.scala | 12 ++++++------ 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala index d2757f7..733d7d2 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala @@ -67,16 +67,16 @@ object BatchPipeline extends BasePipeline { .filter(rowValidator.allChecks) val onlineStore = config.store match { - case _: RedisConfig => "redis" + case _: RedisConfig => "redis" case _: BigTableConfig => "bigtable" - case _: HBaseConfig => "hbase" + case _: HBaseConfig => "hbase" } validRows.write .format(config.store match { case _: RedisConfig => "dev.caraml.spark.stores.redis" case _: BigTableConfig => "dev.caraml.spark.stores.bigtable" - case _: HBaseConfig => "dev.caraml.spark.stores.bigtable" + case _: HBaseConfig => "dev.caraml.spark.stores.bigtable" }) .option("online_store", onlineStore) .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala index c69c64a..cae6053 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala @@ -26,7 +26,7 @@ case class RedisWriteProperties( enableRateLimit: Boolean = false, ratePerSecondLimit: Int = 50000 ) -case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig +case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig sealed trait MetricConfig diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala index 2dea536..5838aab 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala @@ -23,7 +23,7 @@ class DefaultSource extends CreatableRelationProvider { parameters: Map[String, String], data: DataFrame ): BaseRelation = { - val onlineStore = parameters.getOrElse("online_store", "bigtable") + val onlineStore = parameters.getOrElse("online_store", "bigtable") var rel: BigTableSinkRelation = null println(s"onlineStore: $onlineStore") if (onlineStore == "bigtable") { @@ -41,14 +41,13 @@ class DefaultSource extends CreatableRelationProvider { configureBigTableClient(bigtableConf, sqlContext) - rel = - new BigTableSinkRelation( - sqlContext, - new AvroSerializer, - SparkBigtableConfig.parse(parameters), - bigtableConf - ) - } else if (onlineStore == "hbase"){ + rel = new BigTableSinkRelation( + sqlContext, + new AvroSerializer, + SparkBigtableConfig.parse(parameters), + bigtableConf + ) + } else if (onlineStore == "hbase") { 
val hbaseConf = new Configuration() hbaseConf.set("hbase.zookeeper.quorum", sqlContext.getConf(ZOOKEEPER_QUOROM_KEY)) hbaseConf.set("hbase.zookeeper.property.clientPort", sqlContext.getConf(ZOOKEEPER_PORT_KEY)) @@ -98,5 +97,5 @@ object DefaultSource { private val MAX_INFLIGHT_KEY = "spark.bigtable.maxInflightRpcs" private val ZOOKEEPER_QUOROM_KEY = "spark.hbase.zookeeper.quorum" - private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port" + private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port" } diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala index f7596ea..6ebf53b 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala @@ -6,12 +6,12 @@ import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory} import org.apache.spark.sql.SQLContext class HbaseSinkRelation( - sqlContext: SQLContext, - serializer: Serializer, - config: SparkBigtableConfig, - hadoopConfig: Configuration - ) extends BigTableSinkRelation(sqlContext, serializer, config, hadoopConfig) { + sqlContext: SQLContext, + serializer: Serializer, + config: SparkBigtableConfig, + hadoopConfig: Configuration +) extends BigTableSinkRelation(sqlContext, serializer, config, hadoopConfig) { override def getConnection(hadoopConfig: Configuration): Connection = { ConnectionFactory.createConnection(hadoopConfig) } -} \ No newline at end of file +} From fd3030ab2a88a9f83d1fdfc5bdb8e1a62574705e Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 5 Sep 2024 10:58:53 +0800 Subject: [PATCH 04/13] Add configuration for hbase api --- .../store/bigtable/HBaseStoreConfig.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java new file mode 100644 index 0000000..83630f1 --- /dev/null +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java @@ -0,0 +1,40 @@ +package dev.caraml.serving.store.bigtable; + +import dev.caraml.serving.store.OnlineRetriever; +import lombok.Getter; +import lombok.Setter; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.io.IOException; + +@Configuration +@ConfigurationProperties(prefix = "caraml.store.hbase") +@ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase") +@Getter +@Setter +public class HBaseStoreConfig { + private String zookeeperQuorum; + private String zookeeperClientPort; + + @Bean + public OnlineRetriever getRetriever() { + org.apache.hadoop.conf.Configuration conf; + conf = HBaseConfiguration.create(); + conf.set("hbase.zookeeper.quorum", zookeeperQuorum); + conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort); + 
Connection connection; + try{ + connection = ConnectionFactory.createConnection(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new HBaseOnlineRetriever(connection); + } +} From cdfad3dca834fca499ac1f42d85a6a1de5e7fc31 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 5 Sep 2024 15:29:45 +0800 Subject: [PATCH 05/13] Set platform to linux/amd64 --- caraml-store-spark/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caraml-store-spark/docker/Dockerfile b/caraml-store-spark/docker/Dockerfile index 710d9b8..bb2c48b 100644 --- a/caraml-store-spark/docker/Dockerfile +++ b/caraml-store-spark/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM apache/spark-py:v3.1.3 +FROM --platform=linux/amd64 apache/spark-py:v3.1.3 ARG GCS_CONNECTOR_VERSION=2.2.5 ARG BQ_CONNECTOR_VERSION=0.27.1 From 73dfd8ede974a3ce3f2bbef3af5cdc26b48c8b13 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 5 Sep 2024 15:30:30 +0800 Subject: [PATCH 06/13] Update application.yaml to include hbase --- .../src/main/resources/application.yaml | 74 ++++++++++--------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/caraml-store-serving/src/main/resources/application.yaml b/caraml-store-serving/src/main/resources/application.yaml index b963028..969391f 100644 --- a/caraml-store-serving/src/main/resources/application.yaml +++ b/caraml-store-serving/src/main/resources/application.yaml @@ -33,41 +33,41 @@ caraml: maxExpectedCount: 150 store: - # Active store. Possible values: [redisCluster, redis, bigtable] + # Active store. Possible values: [hbase, redis, bigtable] active: redis - - redis: - host: localhost - port: 6379 - password: "" - ssl: false - - redisCluster: - # Connection string specifies the host:port of Redis instances in the redis cluster. - connectionString: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005" - # Password authentication. Empty string if password is not set. - password: "" - readFrom: MASTER - # Redis operation timeout in ISO-8601 format - timeout: PT0.5S -# # Uncomment to customize netty behaviour -# tcp: -# # Epoll Channel Option: TCP_KEEPIDLE -# keepIdle: 15 -# # Epoll Channel Option: TCP_KEEPINTVL -# keepInterval: 5 -# # Epoll Channel Option: TCP_KEEPCNT -# keepConnection: 3 -# # Epoll Channel Option: TCP_USER_TIMEOUT -# userConnection: 60000 -# # Uncomment to customize redis cluster topology refresh config -# topologyRefresh: -# # enable adaptive topology refresh from all triggers : MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNKNOWN_NODE (since 5.1), and UNCOVERED_SLOT (since 5.2) (see also reconnect attempts for the reconnect trigger) -# enableAllAdaptiveTriggerRefresh: true -# # enable periodic refresh -# enablePeriodicRefresh: false -# # topology refresh period in seconds -# refreshPeriodSecond: 30 + # + # redis: + # host: localhost + # port: 6379 + # password: "" + # ssl: false + # + # redisCluster: + # # Connection string specifies the host:port of Redis instances in the redis cluster. + # connectionString: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005" + # # Password authentication. Empty string if password is not set. 
+ # password: "" + # readFrom: MASTER + # # Redis operation timeout in ISO-8601 format + # timeout: PT0.5S + # # Uncomment to customize netty behaviour + # tcp: + # # Epoll Channel Option: TCP_KEEPIDLE + # keepIdle: 15 + # # Epoll Channel Option: TCP_KEEPINTVL + # keepInterval: 5 + # # Epoll Channel Option: TCP_KEEPCNT + # keepConnection: 3 + # # Epoll Channel Option: TCP_USER_TIMEOUT + # userConnection: 60000 + # # Uncomment to customize redis cluster topology refresh config + # topologyRefresh: + # # enable adaptive topology refresh from all triggers : MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNKNOWN_NODE (since 5.1), and UNCOVERED_SLOT (since 5.2) (see also reconnect attempts for the reconnect trigger) + # enableAllAdaptiveTriggerRefresh: true + # # enable periodic refresh + # enablePeriodicRefresh: false + # # topology refresh period in seconds + # refreshPeriodSecond: 30 bigtable: projectId: gcp-project-name @@ -78,6 +78,10 @@ caraml: timeoutMs: 0 isUsingHBaseSDK: true + hbase: + zookeeperQuorum: 127.0.0.1 + zookeeperClientPort: 2181 + grpc: server: port: 6566 @@ -96,4 +100,4 @@ spring: logging: level: - root: "info" \ No newline at end of file + root: "info" From acd54c2d48b8bbb71a998160698f611e616fe2e6 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 5 Sep 2024 16:59:49 +0800 Subject: [PATCH 07/13] Refator BigTableSinkRelation to use updated classes Use of deprecated classes result in ImmutableHTableDescriptor returned which throws java.lang.UnsupportedOperationException: HTableDescriptor is read-only error --- .../bigtable/BigTableSinkRelation.scala | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 62d7ab2..687037b 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -4,7 +4,7 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration import dev.caraml.spark.serialization.Serializer import dev.caraml.spark.utils.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Admin, Connection, Put} +import org.apache.hadoop.hbase.client.{Admin, ColumnFamilyDescriptorBuilder, Connection, Put, TableDescriptorBuilder} import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.hadoop.mapred.JobConf @@ -40,36 +40,50 @@ class BigTableSinkRelation( val admin = btConn.getAdmin val table = if (!admin.isTableAvailable(TableName.valueOf(tableName))) { - val t = new HTableDescriptor(TableName.valueOf(tableName)) - val metadataCF = new HColumnDescriptor(metadataColumnFamily) - t.addFamily(metadataCF) - t + val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) + val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily) + tableBuilder.setColumnFamily(cf) + val table = tableBuilder.build() + table +// val t = new HTableDescriptor(TableName.valueOf(tableName)) +// val metadataCF = new HColumnDescriptor(metadataColumnFamily) +// t.addFamily(metadataCF) +// t } else { - admin.getTableDescriptor(TableName.valueOf(tableName)) +// val t = admin.getTableDescriptor(TableName.valueOf(tableName)) + val t = 
btConn.getTable(TableName.valueOf(tableName)) + t.getDescriptor() } - val featuresCF = new HColumnDescriptor(config.namespace) - if (config.maxAge > 0) { - featuresCF.setTimeToLive(config.maxAge.toInt) +// val featuresCF = new HColumnDescriptor(config.namespace) +// if (config.maxAge > 0) { +// featuresCF.setTimeToLive(config.maxAge.toInt) +// } +// featuresCF.setMaxVersions(1) + val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes) + if (config.maxAge > 0){ + featuresCFBuilder.setTimeToLive(config.maxAge.toInt) } + featuresCFBuilder.setMaxVersions(1) + val featuresCF = featuresCFBuilder.build() - featuresCF.setMaxVersions(1) - + println("config.namespaces: ", config.namespace) + val tdb = TableDescriptorBuilder.newBuilder(table) if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { - table.addFamily(featuresCF) - +// table.addFamily(featuresCF) + tdb.setColumnFamily(featuresCF) if (!admin.isTableAvailable(table.getTableName)) { - admin.createTable(table) + admin.createTable(tdb.build()) } else { - admin.modifyTable(table) + admin.modifyTable(tdb.build()) } } else if ( config.maxAge > 0 && table .getColumnFamily(config.namespace.getBytes) .getTimeToLive != featuresCF.getTimeToLive ) { - table.modifyFamily(featuresCF) - admin.modifyTable(table) + tdb.modifyColumnFamily(featuresCF) + admin.modifyTable(tdb.build()) } } finally { btConn.close() From f685315f4fb3ea287f49a89760dc4156e0667387 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 6 Sep 2024 22:43:19 +0800 Subject: [PATCH 08/13] Fix issue due to difference in bigtable and hbase response * Use offset and length to get rowCell values because hbase server returns slightly different response structure than bigtable * This is also applied when looking up the avro schema --- .../store/bigtable/HBaseOnlineRetriever.java | 26 ++++++++++++++++--- .../store/bigtable/HBaseSchemaRegistry.java | 10 ++++++- .../bigtable/BigTableSinkRelation.scala | 5 ++-- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a52db32..a9ff0a8 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -6,6 +6,7 @@ import dev.caraml.serving.store.Feature; import dev.caraml.store.protobuf.serving.ServingServiceProto; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; import org.apache.avro.AvroRuntimeException; @@ -57,13 +58,31 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; + }) +// .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - String family = Bytes.toString(rowCell.getFamilyArray()); - ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// String family = Bytes.toString(rowCell.getFamilyArray()); +// System.out.println("rowCell: " + 
rowCell.toString()); +// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// System.out.println("value: " + value); + ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -118,6 +137,7 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); + return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 7802f19..f9ed029 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -5,6 +5,7 @@ import com.google.common.cache.LoadingCache; import com.google.protobuf.ByteString; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -94,7 +95,14 @@ private GenericDatumReader loadReader(SchemaReference reference) Result result = table.get(query); Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); - Schema schema = new Schema.Parser().parse(Bytes.toString(last.getValueArray())); + if (last == null) { + throw new RuntimeException("Schema not found"); + } + ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); + Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 687037b..d9a01f7 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -72,10 +72,11 @@ class BigTableSinkRelation( if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { // table.addFamily(featuresCF) tdb.setColumnFamily(featuresCF) + val t = tdb.build() if (!admin.isTableAvailable(table.getTableName)) { - admin.createTable(tdb.build()) + admin.createTable(t) } else { - admin.modifyTable(tdb.build()) + admin.modifyTable(t) } } else if ( config.maxAge > 0 && table From 0054a146ac894226955f8ba198fb23b9af27ce78 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 6 Sep 2024 22:49:23 +0800 Subject: [PATCH 09/13] Fix linting --- .../store/bigtable/HBaseOnlineRetriever.java | 48 ++++++++++--------- .../store/bigtable/HBaseSchemaRegistry.java | 12 ++--- .../store/bigtable/HBaseStoreConfig.java | 35 +++++++------- 
.../bigtable/BigTableSinkRelation.scala | 12 +++-- 4 files changed, 58 insertions(+), 49 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a9ff0a8..847c42d 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -17,7 +17,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; public class HBaseOnlineRetriever implements SSTableOnlineRetriever { private final Connection client; @@ -58,31 +57,37 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> { - List rowCells = row.getColumnCells(cf.getBytes(), null); - System.out.println("Column Family: " + cf); - System.out.println("Row Cells: " + rowCells); - return rowCells; + .map( + cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; }) -// .map(cf -> row.getColumnCells(cf.getBytes(), null)) + // .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell -// String family = Bytes.toString(rowCell.getFamilyArray()); -// System.out.println("rowCell: " + rowCell.toString()); -// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); -// System.out.println("value: " + value); - ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) - .position(rowCell.getValueOffset()) - .limit(rowCell.getValueOffset() + rowCell.getValueLength()) - .slice(); - ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) - .position(rowCell.getFamilyOffset()) - .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) - .slice(); - String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); - ByteString value = ByteString.copyFrom(valueBuffer); + // String family = + // Bytes.toString(rowCell.getFamilyArray()); + // System.out.println("rowCell: " + + // rowCell.toString()); + // ByteString value = + // ByteString.copyFrom(rowCell.getValueArray()); + // System.out.println("value: " + value); + ByteBuffer valueBuffer = + ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = + ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -137,7 +142,6 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); - return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index f9ed029..3af2511 100644 --- 
a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -16,7 +16,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; public class HBaseSchemaRegistry { private final Connection hbaseClient; @@ -96,12 +95,13 @@ private GenericDatumReader loadReader(SchemaReference reference) Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); if (last == null) { - throw new RuntimeException("Schema not found"); + throw new RuntimeException("Schema not found"); } - ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) - .position(last.getValueOffset()) - .limit(last.getValueOffset() + last.getValueLength()) - .slice(); + ByteBuffer schemaBuffer = + ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java index 83630f1..d36203c 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java @@ -1,6 +1,7 @@ package dev.caraml.serving.store.bigtable; import dev.caraml.serving.store.OnlineRetriever; +import java.io.IOException; import lombok.Getter; import lombok.Setter; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -11,30 +12,28 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.io.IOException; - @Configuration @ConfigurationProperties(prefix = "caraml.store.hbase") @ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase") @Getter @Setter public class HBaseStoreConfig { - private String zookeeperQuorum; - private String zookeeperClientPort; + private String zookeeperQuorum; + private String zookeeperClientPort; - @Bean - public OnlineRetriever getRetriever() { - org.apache.hadoop.conf.Configuration conf; - conf = HBaseConfiguration.create(); - conf.set("hbase.zookeeper.quorum", zookeeperQuorum); - conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort); - Connection connection; - try{ - connection = ConnectionFactory.createConnection(conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - - return new HBaseOnlineRetriever(connection); + @Bean + public OnlineRetriever getRetriever() { + org.apache.hadoop.conf.Configuration conf; + conf = HBaseConfiguration.create(); + conf.set("hbase.zookeeper.quorum", zookeeperQuorum); + conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort); + Connection connection; + try { + connection = ConnectionFactory.createConnection(conf); + } catch (IOException e) { + throw new RuntimeException(e); } + + return new HBaseOnlineRetriever(connection); + } } diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala 
b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index d9a01f7..90d1a67 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -4,7 +4,13 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration import dev.caraml.spark.serialization.Serializer import dev.caraml.spark.utils.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Admin, ColumnFamilyDescriptorBuilder, Connection, Put, TableDescriptorBuilder} +import org.apache.hadoop.hbase.client.{ + Admin, + ColumnFamilyDescriptorBuilder, + Connection, + Put, + TableDescriptorBuilder +} import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.hadoop.mapred.JobConf @@ -41,7 +47,7 @@ class BigTableSinkRelation( val table = if (!admin.isTableAvailable(TableName.valueOf(tableName))) { val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) - val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily) + val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily) tableBuilder.setColumnFamily(cf) val table = tableBuilder.build() table @@ -61,7 +67,7 @@ class BigTableSinkRelation( // } // featuresCF.setMaxVersions(1) val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes) - if (config.maxAge > 0){ + if (config.maxAge > 0) { featuresCFBuilder.setTimeToLive(config.maxAge.toInt) } featuresCFBuilder.setMaxVersions(1) From 38f7bc736f8a034e7502f0f374651ef75b240bc0 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Tue, 17 Sep 2024 15:51:09 +0800 Subject: [PATCH 10/13] Clean up comments --- .../store/bigtable/HBaseOnlineRetriever.java | 16 +--------------- .../stores/bigtable/BigTableSinkRelation.scala | 14 +------------- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index 847c42d..145a901 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -57,25 +57,11 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map( - cf -> { - List rowCells = row.getColumnCells(cf.getBytes(), null); - System.out.println("Column Family: " + cf); - System.out.println("Row Cells: " + rowCells); - return rowCells; - }) - // .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - // String family = - // Bytes.toString(rowCell.getFamilyArray()); - // System.out.println("rowCell: " + - // rowCell.toString()); - // ByteString value = - // ByteString.copyFrom(rowCell.getValueArray()); - // System.out.println("value: " + value); ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) .position(rowCell.getValueOffset()) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala 
b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 90d1a67..36be800 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -51,21 +51,10 @@ class BigTableSinkRelation( tableBuilder.setColumnFamily(cf) val table = tableBuilder.build() table -// val t = new HTableDescriptor(TableName.valueOf(tableName)) -// val metadataCF = new HColumnDescriptor(metadataColumnFamily) -// t.addFamily(metadataCF) -// t } else { -// val t = admin.getTableDescriptor(TableName.valueOf(tableName)) val t = btConn.getTable(TableName.valueOf(tableName)) t.getDescriptor() } - -// val featuresCF = new HColumnDescriptor(config.namespace) -// if (config.maxAge > 0) { -// featuresCF.setTimeToLive(config.maxAge.toInt) -// } -// featuresCF.setMaxVersions(1) val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes) if (config.maxAge > 0) { featuresCFBuilder.setTimeToLive(config.maxAge.toInt) @@ -73,10 +62,9 @@ class BigTableSinkRelation( featuresCFBuilder.setMaxVersions(1) val featuresCF = featuresCFBuilder.build() - println("config.namespaces: ", config.namespace) + // TODO: Set compression type for column family val tdb = TableDescriptorBuilder.newBuilder(table) if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { -// table.addFamily(featuresCF) tdb.setColumnFamily(featuresCF) val t = tdb.build() if (!admin.isTableAvailable(table.getTableName)) { From 691c8d13af3cab9bcb9baba20816b9dc8e1bc842 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Tue, 17 Sep 2024 15:54:47 +0800 Subject: [PATCH 11/13] Fix application yaml --- caraml-store-serving/src/main/resources/application.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caraml-store-serving/src/main/resources/application.yaml b/caraml-store-serving/src/main/resources/application.yaml index 969391f..e15d3a4 100644 --- a/caraml-store-serving/src/main/resources/application.yaml +++ b/caraml-store-serving/src/main/resources/application.yaml @@ -33,7 +33,7 @@ caraml: maxExpectedCount: 150 store: - # Active store. Possible values: [hbase, redis, bigtable] + # Active store. 
Possible values: [redisCluster, redis, bigtable, hbase] active: redis # # redis: From a0e44a38b5f32a0087d504476d21f27fd64b03f1 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Tue, 17 Sep 2024 17:09:45 +0800 Subject: [PATCH 12/13] Add option for hbase for stream ingestion jobs --- .../main/scala/dev/caraml/spark/StreamingPipeline.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala index 1620705..1fef66d 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala @@ -77,6 +77,12 @@ object StreamingPipeline extends BasePipeline with Serializable { case _ => Array() } + val onlineStore = config.store match { + case _: RedisConfig => "redis" + case _: BigTableConfig => "bigtable" + case _: HBaseConfig => "hbase" + } + val parsed = input .withColumn("features", featureStruct) .select(metadata :+ col("features.*"): _*) @@ -100,6 +106,7 @@ object StreamingPipeline extends BasePipeline with Serializable { val metadataColName: Array[String] = metadata.map(_.toString) + rowsAfterValidation .map(metrics.incrementRead) .filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks) @@ -108,7 +115,9 @@ object StreamingPipeline extends BasePipeline with Serializable { .format(config.store match { case _: RedisConfig => "dev.caraml.spark.stores.redis" case _: BigTableConfig => "dev.caraml.spark.stores.bigtable" + case _: HBaseConfig => "dev.caraml.spark.stores.bigtable" }) + .option("online_store", onlineStore) .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) .option("namespace", featureTable.name) .option("project_name", featureTable.project) From fa28a59f3e732fbab6246c56d1ede461d57a4d7d Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Wed, 18 Sep 2024 11:10:37 +0800 Subject: [PATCH 13/13] Fix linting --- .../src/main/scala/dev/caraml/spark/StreamingPipeline.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala index 1fef66d..fedae24 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala @@ -106,7 +106,6 @@ object StreamingPipeline extends BasePipeline with Serializable { val metadataColName: Array[String] = metadata.map(_.toString) - rowsAfterValidation .map(metrics.incrementRead) .filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks)
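
Taken together, the series wires HBase in at three layers. The ingestion job gains an --hbase option whose JSON payload is extracted into HBaseConfig (unlike --bigtable it is parsed without camelizeKeys, so the keys are expected in camelCase, e.g. {"zookeeperQuorum": "...", "zookeeperPort": 2181}); BasePipeline copies those settings into the Spark conf as spark.hbase.zookeeper.quorum and spark.hbase.zookeeper.port; and the batch/stream pipelines keep the dev.caraml.spark.stores.bigtable sink format but pass online_store=hbase, which DefaultSource uses to build an HbaseSinkRelation (ConnectionFactory.createConnection) instead of a BigTableSinkRelation (BigtableConfiguration.connect). On the serving side, caraml.store.active: hbase activates HBaseStoreConfig, which opens a connection from caraml.store.hbase.zookeeperQuorum / zookeeperClientPort and serves reads through HBaseOnlineRetriever. The snippet below is a minimal, self-contained sketch of the Spark-side routing only: the case classes, config keys, option name and sink format are taken from the patches above, while the object name, the standalone main and the ZooKeeper endpoints are illustrative assumptions (the Redis variant of StoreConfig is omitted for brevity).

// Sketch of the store routing introduced in this series (Spark side only).
sealed trait StoreConfig
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig

object HBaseRoutingSketch {

  // Both stores reuse the bigtable sink package; the "online_store" option is
  // what DefaultSource inspects to choose BigTableSinkRelation vs HbaseSinkRelation.
  def sinkOptions(store: StoreConfig): Map[String, String] = store match {
    case _: BigTableConfig =>
      Map("format" -> "dev.caraml.spark.stores.bigtable", "online_store" -> "bigtable")
    case _: HBaseConfig =>
      Map("format" -> "dev.caraml.spark.stores.bigtable", "online_store" -> "hbase")
  }

  // BasePipeline: store settings travel through the Spark conf so that
  // DefaultSource can rebuild a Hadoop Configuration when the relation is created.
  def sparkConf(store: StoreConfig): Map[String, String] = store match {
    case HBaseConfig(quorum, port) =>
      Map(
        "spark.hbase.zookeeper.quorum" -> quorum,
        "spark.hbase.zookeeper.port"   -> port.toString
      )
    case BigTableConfig(projectId, instanceId) =>
      Map(
        "spark.bigtable.projectId"  -> projectId,
        "spark.bigtable.instanceId" -> instanceId
      )
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical ZooKeeper endpoints; substitute the real quorum and port.
    val store = HBaseConfig("zookeeper-0.internal,zookeeper-1.internal", 2181)
    println(sinkOptions(store))
    println(sparkConf(store))
  }
}

Keeping HBase inside the existing bigtable sink package means the Put construction, Avro serialization and schema registration paths are shared; only the Connection factory differs, which is why HbaseSinkRelation overrides nothing but getConnection.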