Merge pull request #126 from caraml-dev/hbase-poc
[feat] Add support for using HBase as an online store
shydefoo authored Sep 20, 2024
2 parents 350783e + fa28a59 commit 7292b48
Showing 10 changed files with 146 additions and 69 deletions.
74 changes: 39 additions & 35 deletions caraml-store-serving/src/main/resources/application.yaml
@@ -33,41 +33,41 @@ caraml:
maxExpectedCount: 150

store:
# Active store. Possible values: [redisCluster, redis, bigtable]
# Active store. Possible values: [redisCluster, redis, bigtable, hbase]
active: redis

redis:
host: localhost
port: 6379
password: ""
ssl: false

redisCluster:
# Connection string specifies the host:port of Redis instances in the redis cluster.
connectionString: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
# Password authentication. Empty string if password is not set.
password: ""
readFrom: MASTER
# Redis operation timeout in ISO-8601 format
timeout: PT0.5S
# # Uncomment to customize netty behaviour
# tcp:
# # Epoll Channel Option: TCP_KEEPIDLE
# keepIdle: 15
# # Epoll Channel Option: TCP_KEEPINTVL
# keepInterval: 5
# # Epoll Channel Option: TCP_KEEPCNT
# keepConnection: 3
# # Epoll Channel Option: TCP_USER_TIMEOUT
# userConnection: 60000
# # Uncomment to customize redis cluster topology refresh config
# topologyRefresh:
# # enable adaptive topology refresh from all triggers : MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNKNOWN_NODE (since 5.1), and UNCOVERED_SLOT (since 5.2) (see also reconnect attempts for the reconnect trigger)
# enableAllAdaptiveTriggerRefresh: true
# # enable periodic refresh
# enablePeriodicRefresh: false
# # topology refresh period in seconds
# refreshPeriodSecond: 30
#
# redis:
# host: localhost
# port: 6379
# password: ""
# ssl: false
#
# redisCluster:
# # Connection string specifies the host:port of Redis instances in the redis cluster.
# connectionString: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
# # Password authentication. Empty string if password is not set.
# password: ""
# readFrom: MASTER
# # Redis operation timeout in ISO-8601 format
# timeout: PT0.5S
# # Uncomment to customize netty behaviour
# tcp:
# # Epoll Channel Option: TCP_KEEPIDLE
# keepIdle: 15
# # Epoll Channel Option: TCP_KEEPINTVL
# keepInterval: 5
# # Epoll Channel Option: TCP_KEEPCNT
# keepConnection: 3
# # Epoll Channel Option: TCP_USER_TIMEOUT
# userConnection: 60000
# # Uncomment to customize redis cluster topology refresh config
# topologyRefresh:
# # enable adaptive topology refresh from all triggers : MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNKNOWN_NODE (since 5.1), and UNCOVERED_SLOT (since 5.2) (see also reconnect attempts for the reconnect trigger)
# enableAllAdaptiveTriggerRefresh: true
# # enable periodic refresh
# enablePeriodicRefresh: false
# # topology refresh period in seconds
# refreshPeriodSecond: 30

bigtable:
projectId: gcp-project-name
@@ -78,6 +78,10 @@ caraml:
timeoutMs: 0
isUsingHBaseSDK: true

hbase:
zookeeperQuorum: 127.0.0.1
zookeeperClientPort: 2181

grpc:
server:
port: 6566
@@ -96,4 +100,4 @@ spring:

logging:
level:
root: "info"
root: "info"
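
The two new hbase settings correspond to the standard HBase client properties hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort. Below is a minimal sketch of how a client could open a connection from them, assuming the stock HBase 2.x client API; the actual wiring inside caraml-store-serving is not part of this diff.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Hypothetical helper; the property names are the standard HBase client keys.
def connect(zookeeperQuorum: String, zookeeperClientPort: Int): Connection = {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", zookeeperQuorum)
  conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort.toString)
  ConnectionFactory.createConnection(conf) // caller is responsible for closing the connection
}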
2 changes: 1 addition & 1 deletion caraml-store-spark/docker/Dockerfile
@@ -1,4 +1,4 @@
FROM apache/spark-py:v3.1.3
FROM --platform=linux/amd64 apache/spark-py:v3.1.3

ARG GCS_CONNECTOR_VERSION=2.2.5
ARG BQ_CONNECTOR_VERSION=0.27.1
@@ -33,6 +33,10 @@ object BasePipeline
conf
.set("spark.bigtable.projectId", projectId)
.set("spark.bigtable.instanceId", instanceId)
case HBaseConfig(zookeeperQuorum, zookeeperPort) =>
conf
.set("spark.hbase.zookeeper.quorum", zookeeperQuorum)
.set("spark.hbase.zookeeper.port", zookeeperPort.toString)
}

jobConfig.metrics match {
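
For illustration (the values are hypothetical), an HBaseConfig passed to BasePipeline ends up as two plain string properties on the SparkConf, which the sink reads back later:

import org.apache.spark.SparkConf

// Equivalent effect of the HBaseConfig branch above for HBaseConfig("zk-0.internal,zk-1.internal", 2181):
val conf = new SparkConf()
  .set("spark.hbase.zookeeper.quorum", "zk-0.internal,zk-1.internal")
  .set("spark.hbase.zookeeper.port", "2181")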
@@ -66,11 +66,19 @@ object BatchPipeline extends BasePipeline
.map(metrics.incrementRead)
.filter(rowValidator.allChecks)

val onlineStore = config.store match {
case _: RedisConfig => "redis"
case _: BigTableConfig => "bigtable"
case _: HBaseConfig => "hbase"
}

validRows.write
.format(config.store match {
case _: RedisConfig => "dev.caraml.spark.stores.redis"
case _: BigTableConfig => "dev.caraml.spark.stores.bigtable"
case _: HBaseConfig => "dev.caraml.spark.stores.bigtable"
})
.option("online_store", onlineStore)
.option("entity_columns", featureTable.entities.map(_.name).mkString(","))
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
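
The net effect of the two match expressions is that an HBase store reuses the existing bigtable sink package and is distinguished only by the new online_store option. A hypothetical helper, not code from this commit, that captures the routing in one place:

// Returns (Spark data source format, online_store option) for a store config.
def sinkFor(store: StoreConfig): (String, String) = store match {
  case _: RedisConfig    => ("dev.caraml.spark.stores.redis", "redis")
  case _: BigTableConfig => ("dev.caraml.spark.stores.bigtable", "bigtable")
  case _: HBaseConfig    => ("dev.caraml.spark.stores.bigtable", "hbase")
}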
@@ -87,6 +87,9 @@ object IngestionJob
opt[String](name = "bigtable")
.action((x, c) => c.copy(store = parseJSON(x).camelizeKeys.extract[BigTableConfig]))

opt[String](name = "hbase")
.action((x, c) => c.copy(store = parseJSON(x).extract[HBaseConfig]))

opt[String](name = "statsd")
.action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig])))

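
Unlike the bigtable option, the hbase value is extracted without camelizeKeys, so the JSON keys must already match the HBaseConfig field names. A minimal json4s sketch of the parsing, assuming the jackson backend and DefaultFormats (the quorum value is a made-up example):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats
val cfg = parse("""{"zookeeperQuorum": "zk-0.internal", "zookeeperPort": 2181}""")
  .extract[HBaseConfig] // HBaseConfig("zk-0.internal", 2181)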
@@ -26,7 +26,8 @@ case class RedisWriteProperties(
enableRateLimit: Boolean = false,
ratePerSecondLimit: Int = 50000
)
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig

sealed trait MetricConfig

@@ -77,6 +77,12 @@ object StreamingPipeline extends BasePipeline with Serializable
case _ => Array()
}

val onlineStore = config.store match {
case _: RedisConfig => "redis"
case _: BigTableConfig => "bigtable"
case _: HBaseConfig => "hbase"
}

val parsed = input
.withColumn("features", featureStruct)
.select(metadata :+ col("features.*"): _*)
@@ -108,7 +114,9 @@
.format(config.store match {
case _: RedisConfig => "dev.caraml.spark.stores.redis"
case _: BigTableConfig => "dev.caraml.spark.stores.bigtable"
case _: HBaseConfig => "dev.caraml.spark.stores.bigtable"
})
.option("online_store", onlineStore)
.option("entity_columns", featureTable.entities.map(_.name).mkString(","))
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
@@ -4,7 +4,13 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration
import dev.caraml.spark.serialization.Serializer
import dev.caraml.spark.utils.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.{
Admin,
ColumnFamilyDescriptorBuilder,
Connection,
Put,
TableDescriptorBuilder
}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.mapred.JobConf
@@ -30,42 +36,49 @@ class BigTableSinkRelation(

override def schema: StructType = ???

def getConnection(hadoopConfig: Configuration): Connection = {
BigtableConfiguration.connect(hadoopConfig)
}

def createTable(): Unit = {
val btConn = BigtableConfiguration.connect(hadoopConfig)
val btConn = getConnection(hadoopConfig)
try {
val admin = btConn.getAdmin

val table = if (!admin.isTableAvailable(TableName.valueOf(tableName))) {
val t = new HTableDescriptor(TableName.valueOf(tableName))
val metadataCF = new HColumnDescriptor(metadataColumnFamily)
t.addFamily(metadataCF)
t
val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily)
tableBuilder.setColumnFamily(cf)
val table = tableBuilder.build()
table
} else {
admin.getTableDescriptor(TableName.valueOf(tableName))
val t = btConn.getTable(TableName.valueOf(tableName))
t.getDescriptor()
}

val featuresCF = new HColumnDescriptor(config.namespace)
val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes)
if (config.maxAge > 0) {
featuresCF.setTimeToLive(config.maxAge.toInt)
featuresCFBuilder.setTimeToLive(config.maxAge.toInt)
}
featuresCFBuilder.setMaxVersions(1)
val featuresCF = featuresCFBuilder.build()

featuresCF.setMaxVersions(1)

// TODO: Set compression type for column family
val tdb = TableDescriptorBuilder.newBuilder(table)
if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) {
table.addFamily(featuresCF)

tdb.setColumnFamily(featuresCF)
val t = tdb.build()
if (!admin.isTableAvailable(table.getTableName)) {
admin.createTable(table)
admin.createTable(t)
} else {
admin.modifyTable(table)
admin.modifyTable(t)
}
} else if (
config.maxAge > 0 && table
.getColumnFamily(config.namespace.getBytes)
.getTimeToLive != featuresCF.getTimeToLive
) {
table.modifyFamily(featuresCF)
admin.modifyTable(table)
tdb.modifyColumnFamily(featuresCF)
admin.modifyTable(tdb.build())
}
} finally {
btConn.close()
@@ -115,7 +128,7 @@ class BigTableSinkRelation(
val qualifier = "avro".getBytes
put.addColumn(metadataColumnFamily.getBytes, qualifier, schema.asInstanceOf[String].getBytes)

val btConn = BigtableConfiguration.connect(hadoopConfig)
val btConn = getConnection(hadoopConfig)
try {
val table = btConn.getTable(TableName.valueOf(tableName))
table.checkAndPut(
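
The createTable changes above replace the deprecated HTableDescriptor and HColumnDescriptor calls with the descriptor builder API, which both the Bigtable HBase shim and a plain HBase client accept, and route every connection through the new overridable getConnection. A condensed sketch of the builder usage, with placeholder table name, column family, and TTL:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}

// Placeholder values for illustration only.
val featuresCF = ColumnFamilyDescriptorBuilder
  .newBuilder("driver_stats".getBytes) // feature table namespace as the column family
  .setTimeToLive(7200)                 // maxAge in seconds
  .setMaxVersions(1)
  .build()

val tableDescriptor = TableDescriptorBuilder
  .newBuilder(TableName.valueOf("example_project__driver_stats"))
  .setColumnFamily(featuresCF)
  .build()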
@@ -23,27 +23,43 @@ class DefaultSource extends CreatableRelationProvider
parameters: Map[String, String],
data: DataFrame
): BaseRelation = {
val bigtableConf = BigtableConfiguration.configure(
sqlContext.getConf(PROJECT_KEY),
sqlContext.getConf(INSTANCE_KEY)
)

if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) {
bigtableConf.set(
BIGTABLE_EMULATOR_HOST_KEY,
sqlContext.getConf("spark.bigtable.emulatorHost")
val onlineStore = parameters.getOrElse("online_store", "bigtable")
var rel: BigTableSinkRelation = null
println(s"onlineStore: $onlineStore")
if (onlineStore == "bigtable") {
val bigtableConf = BigtableConfiguration.configure(
sqlContext.getConf(PROJECT_KEY),
sqlContext.getConf(INSTANCE_KEY)
)
}

configureBigTableClient(bigtableConf, sqlContext)
if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) {
bigtableConf.set(
BIGTABLE_EMULATOR_HOST_KEY,
sqlContext.getConf("spark.bigtable.emulatorHost")
)
}

configureBigTableClient(bigtableConf, sqlContext)

val rel =
new BigTableSinkRelation(
rel = new BigTableSinkRelation(
sqlContext,
new AvroSerializer,
SparkBigtableConfig.parse(parameters),
bigtableConf
)
} else if (onlineStore == "hbase") {
val hbaseConf = new Configuration()
hbaseConf.set("hbase.zookeeper.quorum", sqlContext.getConf(ZOOKEEPER_QUOROM_KEY))
hbaseConf.set("hbase.zookeeper.property.clientPort", sqlContext.getConf(ZOOKEEPER_PORT_KEY))
rel = new HbaseSinkRelation(
sqlContext,
new AvroSerializer,
SparkBigtableConfig.parse(parameters),
hbaseConf
)
} else {
throw new UnsupportedOperationException(s"Unsupported online store: $onlineStore")
}
rel.createTable()
rel.saveWriteSchema(data)
rel.insert(data, overwrite = false)
@@ -79,4 +95,7 @@ object DefaultSource
private val THROTTLING_THRESHOLD_MILLIS_KEY = "spark.bigtable.throttlingThresholdMs"
private val MAX_ROW_COUNT_KEY = "spark.bigtable.maxRowCount"
private val MAX_INFLIGHT_KEY = "spark.bigtable.maxInflightRpcs"

private val ZOOKEEPER_QUOROM_KEY = "spark.hbase.zookeeper.quorum"
private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port"
}
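
Putting the pieces together, a writer selects the HBase path by setting the two spark.hbase.* properties and passing online_store=hbase. A minimal sketch follows; host, entity, table, and project names are placeholders, and in practice the ingestion job supplies these values from its config:

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .appName("hbase-sink-example")
  .config("spark.hbase.zookeeper.quorum", "zk-0.internal")
  .config("spark.hbase.zookeeper.port", "2181")
  .getOrCreate()

def writeToHbase(df: DataFrame): Unit =
  df.write
    .format("dev.caraml.spark.stores.bigtable") // same sink package as Bigtable
    .option("online_store", "hbase")            // routes DefaultSource to HbaseSinkRelation
    .option("entity_columns", "driver_id")      // placeholder entity column
    .option("namespace", "driver_stats")        // placeholder feature table name
    .option("project_name", "example_project")  // placeholder project
    .save()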
@@ -0,0 +1,17 @@
package dev.caraml.spark.stores.bigtable

import dev.caraml.spark.serialization.Serializer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.spark.sql.SQLContext

class HbaseSinkRelation(
sqlContext: SQLContext,
serializer: Serializer,
config: SparkBigtableConfig,
hadoopConfig: Configuration
) extends BigTableSinkRelation(sqlContext, serializer, config, hadoopConfig) {
override def getConnection(hadoopConfig: Configuration): Connection = {
ConnectionFactory.createConnection(hadoopConfig)
}
}
