feat!: Use spark bigtable connector instead of hbase connector (#1004)
* Change the Hbase connector to Bigtable connector

* Update the argument name

* feat!: Use spark bigtable connector instead of hbase connector

* fix: elasticsearch to bigtable test

* fix: Provide elasticsearch to gcs jenkins test

* fix: shell format

* fix: shell format

* fix: shell script

* fix: pass spark properties directly

* fix: typo

* fix: added elasticsearch to gcs jenkins test

---------

Co-authored-by: Raj Patel <[email protected]>
rohilla-anuj and rajc242 authored Oct 24, 2024
1 parent 0d617bd commit 3f13e3c
Showing 8 changed files with 132 additions and 168 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -24,4 +24,6 @@ dist
pipeline.json
.ipynb_checkpoints/
.venv/
metastore_db/
metastore_db/
python/elastic.jks
python/tls.cert
39 changes: 39 additions & 0 deletions python/.ci/Jenkinsfile
@@ -725,6 +725,45 @@ pipeline {
}
}
}
stage('Parallel Execution 8'){
parallel{
stage('Elasticsearch to GCS'){
steps{
retry(count: stageRetryCount) {
sh '''
python3.8 -m pip install --user virtualenv
python3.8 -m venv env
source env/bin/activate
gsutil cp gs://dataproc-templates/conf/elastic.jks /tmp/
export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
export SKIP_BUILD=true
export JARS="gs://dataproc-templates/jars/elasticsearch-spark-20_2.12-8.15.3.jar"
export FILES="/tmp/elastic.jks"
cd python
./bin/start.sh \
--properties=spark.driver.extraJavaOptions='-Djavax.net.ssl.trustStore=./elastic.jks -Djavax.net.ssl.trustStorePassword=changeit',spark.executor.extraJavaOptions='-Djavax.net.ssl.trustStore=./elastic.jks -Djavax.net.ssl.trustStorePassword=changeit' \
-- --template=ELASTICSEARCHTOGCS \
--es.gcs.input.node=${ELASTIC_INPUT_NODE} \
--es.gcs.input.index="books" \
--es.gcs.input.user=${ELASTIC_USER} \
--es.gcs.input.password=${ELASTIC_PASSWORD} \
--es.gcs.output.format="parquet" \
--es.gcs.output.location="gs://dataproc-templates/integration-testing/output/ELASTICSEARCHTOGCS" \
--es.gcs.output.mode="overwrite" \
--es.gcs.input.es.net.ssl="true" \
--es.gcs.input.es.net.ssl.cert.allow.self.signed="true"
'''
}
}
}
}
}
}
post {
always{
60 changes: 0 additions & 60 deletions python/dataproc_templates/elasticsearch/Dockerfile

This file was deleted.

96 changes: 36 additions & 60 deletions python/dataproc_templates/elasticsearch/README.md
@@ -595,7 +595,7 @@ options:

```
export GCP_PROJECT=my-project
export JARS="gs://spark-lib/elasticsearch/elasticsearch-spark-30_2.12-8.11.4.jar,gs://spark-lib/bigquery/spark-3.3-bigquery-0.39.0.jar"
export JARS="gs://spark-lib/elasticsearch/elasticsearch-spark-30_2.12-8.11.4.jar"
export REGION=us-central1
export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet
@@ -614,43 +614,22 @@ export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet

Template for exporting an Elasticsearch Index to a BigTable table.

It uses the Apache HBase Spark Connector to write to Bigtable.
It uses the Spark BigTable Connector to write to Bigtable.

This [tutorial](https://cloud.google.com/dataproc/docs/tutorials/spark-hbase#dataproc_hbase_tutorial_view_code-python) shows how to run a Spark/PySpark job connecting to Bigtable.
However, it focuses on running the job on a Dataproc cluster rather than on Dataproc Serverless.
Here in this template, you will notice that there are different configuration steps for the PySpark job to successfully run using Dataproc Serverless, connecting to Bigtable using the HBase interface.
Here in this template, you will notice that there are different configuration steps for the PySpark job to run successfully on Dataproc Serverless, connecting to Bigtable through the Bigtable connector.

You can also check out the [differences between HBase and Cloud Bigtable](https://cloud.google.com/bigtable/docs/hbase-differences).
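For orientation, here is a minimal, hypothetical sketch of what the template does internally: read an index through the Elasticsearch Spark connector and write it to Bigtable through the Spark Bigtable connector. The `bigtable` format name, the `spark.bigtable.*` option keys, and the catalog layout mirror the examples later in this README; the node, credentials, index, and catalog values are placeholders, and the connector JARs listed under Requirements must be on the classpath.

```
from pyspark.sql import SparkSession

# Placeholder values -- replace with your own Elasticsearch and Bigtable settings.
ES_NODE = "xxxxxxxxxxxx.us-central1.gcp.cloud.es.io:9243"
ES_INDEX = "demo"
CATALOG = '''{
  "table": {"name": "my_table"},
  "rowkey": "key",
  "columns": {
    "key": {"cf": "rowkey", "col": "key", "type": "string"},
    "name": {"cf": "cf", "col": "name", "type": "string"}
  }
}'''

spark = SparkSession.builder.appName("es-to-bigtable-sketch").getOrCreate()

# Read the Elasticsearch index with the Elasticsearch Spark connector.
input_df = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", ES_NODE)
    .option("es.net.http.auth.user", "demo")
    .option("es.net.http.auth.pass", "demo")
    .load(ES_INDEX)
)

# Write the DataFrame to Bigtable with the Spark Bigtable connector.
(
    input_df.write.format("bigtable")
    .options(catalog=CATALOG)
    .option("spark.bigtable.project.id", "my-project")
    .option("spark.bigtable.instance.id", "bt-instance-id")
    .save()
)
```

The template wraps this flow with argument parsing and the additional Elasticsearch options documented below.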

## Requirements

1) Configure the [hbase-site.xml](./hbase-site.xml) ([reference](https://cloud.google.com/bigtable/docs/hbase-connecting#creating_the_hbase-sitexml_file)) with your Bigtable instance reference
- The hbase-site.xml needs to be available in some path of the container image used by Dataproc Serverless.
- For that, you need to build and host a [customer container image](https://cloud.google.com/dataproc-serverless/docs/guides/custom-containers#submit_a_spark_batch_workload_using_a_custom_container_image) in GCP Container Registry.
- Add the following layer to the [Dockerfile](./Dockerfile), for it to copy your local hbase-site.xml to the container image (already done):
```
COPY hbase-site.xml /etc/hbase/conf/
```
- Build the [Dockerfile](./Dockerfile), building and pushing it to GCP Container Registry with:
```
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
IMAGE=us-central1-docker.pkg.dev/<your_project>/<repository>/<your_custom_image>:<your_version>
docker build --platform linux/amd64 -t "${IMAGE}" .
docker push "${IMAGE}"
```
- An SPARK_EXTRA_CLASSPATH environment variable should also be set to the same path when submitting the job.
```
(./bin/start.sh ...)
--container-image="us-central1-docker.pkg.dev/<your_project>/<repository>/<your_custom_image>:<your_version>" # image with hbase-site.xml in /etc/hbase/conf/
--properties='spark.dataproc.driverEnv.SPARK_EXTRA_CLASSPATH=/etc/hbase/conf/'
```
2) Configure the desired HBase catalog json to passed as an argument (table reference and schema)
- The hbase-catalog.json should be passed using the --gcs.bigtable.hbase.catalog.json
1) `export JARS="gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar"`; the Spark property `spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36` is also required (see the `SPARK_PROPERTIES` export in the example submission below). Please refer to the example in the official [documentation](https://github.com/GoogleCloudDataproc/spark-bigtable-connector/tree/main/examples/python).

2) Configure the desired Bigtable catalog JSON to be passed as an argument (table reference and schema)
- The catalog JSON should be passed using the `--es.bt.catalog.json` argument
```
(./bin/start.sh ...)
-- --gcs.bigtable.hbase.catalog.json='''{
"table":{"namespace":"default","name":"<table_id>"},
-- --es.bt.catalog.json='''{
"table":{"name":"<table_id>"},
"rowkey":"key",
"columns":{
"key":{"cf":"rowkey", "col":"key", "type":"string"},
@@ -659,7 +638,7 @@ You can also check out the [differences between HBase and Cloud Bigtable](https:
}'''
```
3) [Create and manage](https://cloud.google.com/bigtable/docs/managing-tables) your Bigtable table schema, column families, etc, to match the provided HBase catalog.
3) [Create and manage](https://cloud.google.com/bigtable/docs/managing-tables) your Bigtable table schema, column families, etc., to match the provided Bigtable catalog (a sketch using the Bigtable client library is shown below).
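If you create the table yourself (rather than setting `spark.bigtable.create.new.table=true`), the following sketch, which is not part of the template and assumes the `google-cloud-bigtable` client library, creates a table with a hypothetical `cf` column family; align the table name and family names with whatever your catalog declares. The catalog's `rowkey` entry maps to the Bigtable row key, so it does not need a column family of its own.

```
from google.cloud import bigtable
from google.cloud.bigtable import column_family

# Placeholder project, instance and table ids -- align them with your catalog.
client = bigtable.Client(project="my-project", admin=True)
instance = client.instance("bt-instance-id")
table = instance.table("my_table")

if not table.exists():
    # Create the table with one column family ("cf"), keeping a single cell version.
    table.create(column_families={"cf": column_family.MaxVersionsGCRule(1)})
```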
## Required JAR files
@@ -669,32 +648,24 @@ Depending upon the versions of Elasticsearch, PySpark and Scala in the environme
The template supports Elasticsearch versions >= 7.12.0, using the appropriate Elasticsearch Spark Connector.
Some HBase and Bigtable dependencies are required to be passed when submitting the job.
Spark Bigtable connector dependencies are required to be passed when submitting the job.
These dependencies need to be passed by using the --jars flag, or, in the case of Dataproc Templates, using the JARS environment variable.
Some dependencies (jars) must be downloaded from [MVN Repository](https://mvnrepository.com/) and stored in your Cloud Storage bucket (create one to store the dependencies).
- **[Apache HBase Spark Connector](https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark) dependencies (already mounted in Dataproc Serverless, so you refer to them using file://):**
- file:///usr/lib/spark/external/hbase-spark-protocol-shaded.jar
- file:///usr/lib/spark/external/hbase-spark.jar
- **Bigtable dependency:**
- gs://<your_bucket_to_store_dependencies>/bigtable-hbase-2.x-shaded-2.3.0.jar
- Download it using ``` wget https://repo1.maven.org/maven2/com/google/cloud/bigtable/bigtable-hbase-2.x-shaded/2.3.0/bigtable-hbase-2.x-shaded-2.3.0.jar```
- **HBase dependencies:**
- gs://<your_bucket_to_store_dependencies>/hbase-client-2.4.12.jar
- Download it using ``` wget https://repo1.maven.org/maven2/org/apache/hbase/hbase-client/2.4.12/hbase-client-2.4.12.jar```
- gs://<your_bucket_to_store_dependencies>/hbase-shaded-mapreduce-2.4.12.jar
- Download it using ``` wget https://repo1.maven.org/maven2/org/apache/hbase/hbase-shaded-mapreduce/2.4.12/hbase-shaded-mapreduce-2.4.12.jar```
- **[Spark BigTable Connector](https://cloud.google.com/bigtable/docs/use-bigtable-spark-connector)**
- gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar
It also requires [DeltaIO dependencies](https://docs.delta.io/latest/releases.html) to be available in the Dataproc cluster if using delta format.
## Arguments
- `es.bt.input.node`: Elasticsearch Node Uri (format: mynode:9600)
- `es.bt.input.index`: Elasticsearch Input Index Name (format: <index>/<type>)
- `es.bt.input.user`: Elasticsearch Username
- `es.bt.input.password`: Elasticsearch Password
- `es.bt.hbase.catalog.json`: HBase catalog inline json
- `spark.bigtable.project.id`: GCP project where BigTable instance is running
- `spark.bigtable.instance.id`: BigTable instance id
- `es.bt.catalog.json`: BigTable catalog inline json
#### Optional Arguments
- `es.bt.input.es.nodes.path.prefix`: Prefix to add to all requests made to Elasticsearch
- `es.bt.input.es.query`: Holds the query used for reading data from the specified Index
@@ -743,6 +714,8 @@ Some dependencies (jars) must be downloaded from [MVN Repository](https://mvnrep
- `es.bt.input.es.net.proxy.socks.use.system.props`: Whether use the system Socks proxy properties (namely socksProxyHost and socksProxyHost) or not (default yes)
- `es.bt.flatten.struct.fields`: Flatten the struct fields
- `es.bt.flatten.array.fields`: Flatten the n-D array fields to 1-D array fields, it needs es.bt.flatten.struct.fields option to be passed
- `spark.bigtable.create.new.table`: Set to true to create the Bigtable table from the catalog. The default is false, which means the table must already exist.
- `spark.bigtable.batch.mutate.size`: Bigtable batch mutation size. The maximum allowed value is `100000`; the default is `100`. Reference: [documentation](https://github.com/GoogleCloudDataproc/spark-bigtable-connector/blob/main/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableSparkConf.scala#L86). Both properties are illustrated in the sketch after this list.
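As a hedged illustration (mirroring the write path shown in the template code further down this page), the two optional properties above translate into connector write options. The project, instance, catalog, and data values here are placeholders, and the connector JAR must be on the classpath.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bigtable-write-options-sketch").getOrCreate()
df = spark.createDataFrame([("row-1", "Alice")], ["key", "name"])

catalog_json = '''{
  "table": {"name": "my_table"},
  "rowkey": "key",
  "columns": {
    "key": {"cf": "rowkey", "col": "key", "type": "string"},
    "name": {"cf": "cf", "col": "name", "type": "string"}
  }
}'''

(
    df.write.format("bigtable")
    .options(catalog=catalog_json)
    .option("spark.bigtable.project.id", "my-project")
    .option("spark.bigtable.instance.id", "bt-instance-id")
    .option("spark.bigtable.create.new.table", "true")    # create the table from the catalog
    .option("spark.bigtable.batch.mutate.size", "1000")   # default 100, maximum 100000
    .save()
)
```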
## Usage
@@ -754,7 +727,9 @@ usage: main.py [-h]
--es.bt.input.index ES.BT.INPUT.INDEX
--es.bt.input.user ES.BT.INPUT.USER
--es.bt.input.password ES.BT.INPUT.PASSWORD
--es.bt.hbase.catalog.json ES.BT.HBASE.CATALOG.JSON
--spark.bigtable.project.id ES.BT.PROJECT.ID
--spark.bigtable.instance.id ES.BT.INSTANCE.ID
--es.bt.catalog.json ES.BT.CATALOG.JSON
[--es.bt.input.es.nodes.path.prefix ES.BT.INPUT.ES.NODES.PATH.PREFIX]
[--es.bt.input.es.query ES.BT.INPUT.ES.QUERY]
[--es.bt.input.es.mapping.date.rich ES.BT.INPUT.ES.MAPPING.DATE.RICH]
@@ -802,6 +777,8 @@ usage: main.py [-h]
[--es.bt.input.es.net.proxy.socks.use.system.props ES.BT.INPUT.ES.NET.PROXY.SOCKS.USE.SYSTEM.PROPS]
[--es.bt.flatten.struct.fields]
[--es.bt.flatten.array.fields]
[--spark.bigtable.create.new.table ES.BT.CREATE.NEW.TABLE]
[--spark.bigtable.batch.mutate.size ES.BT.BATCH.MUTATE.SIZE]


options:
@@ -908,8 +885,12 @@ options:
Flatten the struct fields
--es.bt.flatten.array.fields
Flatten the n-D array fields to 1-D array fields, it needs es.bt.flatten.struct.fields option to be passed
--es.bt.hbase.catalog.json ES.BT.HBASE.CATALOG.JSON
HBase catalog inline json
--spark.bigtable.project.id ES.BT.PROJECT.ID
GCP project id where BigTable instance is running
--spark.bigtable.instance.id ES.BT.INSTANCE.ID
BigTable instance id
--es.bt.catalog.json ES.BT.CATALOG.JSON
BigTable catalog inline json
```
## Example submission
@@ -918,24 +899,19 @@
export GCP_PROJECT=<project_id>
export REGION=<region>
export GCS_STAGING_LOCATION=<gcs-staging-bucket-folder>
export JARS="gs://<your_bucket_to_store_dependencies>/elasticsearch-spark-30_2.12-8.11.4.jar,\
gs://<your_bucket_to_store_dependencies>/bigtable-hbase-2.x-hadoop-2.3.0.jar,\
gs://<your_bucket_to_store_dependencies>/hbase-client-2.4.12.jar,\
gs://<your_bucket_to_store_dependencies>/hbase-shaded-mapreduce-2.4.12.jar,\
file:///usr/lib/spark/external/hbase-spark-protocol-shaded.jar,\
file:///usr/lib/spark/external/hbase-spark.jar"
export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet
export JARS="gs://<your_bucket_to_store_dependencies>/elasticsearch-spark-30_2.12-8.11.4.jar,gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar"
export SPARK_PROPERTIES="spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36"

./bin/start.sh \
--container-image="gcr.io/<your_project>/<your_custom_image>:<your_version>" \
--properties='spark.dataproc.driverEnv.SPARK_EXTRA_CLASSPATH=/etc/hbase/conf/,spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ # image with hbase-site.xml in /etc/hbase/conf/
-- --template=ELASTICSEARCHTOBIGTABLE \
--es.bt.input.node="xxxxxxxxxxxx.us-central1.gcp.cloud.es.io:9243" \
--es.bt.input.index="demo" \
--es.bt.input.user="demo" \
--es.bt.input.password="demo" \
--es.bt.hbase.catalog.json='''{
"table":{"namespace":"default","name":"my_table"},
--spark.bigtable.project.id="demo-project" \
--spark.bigtable.instance.id="bt-instance-id" \
--es.bt.catalog.json='''{
"table":{"name":"my_table"},
"rowkey":"key",
"columns":{
"key":{"cf":"rowkey", "col":"key", "type":"string"},
@@ -82,10 +82,36 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
)
)
parser.add_argument(
f'--{constants.ES_BT_HBASE_CATALOG_JSON}',
dest=constants.ES_BT_HBASE_CATALOG_JSON,
f'--{constants.ES_BT_PROJECT_ID}',
dest=constants.ES_BT_PROJECT_ID,
required=True,
help='HBase catalog inline json'
help='BigTable project ID'
)
parser.add_argument(
f'--{constants.ES_BT_INSTANCE_ID}',
dest=constants.ES_BT_INSTANCE_ID,
required=True,
help='BigTable instance ID'
)
parser.add_argument(
f'--{constants.ES_BT_CREATE_NEW_TABLE}',
dest=constants.ES_BT_CREATE_NEW_TABLE,
required=False,
help='BigTable create new table flag. Default is false.',
default=False
)
parser.add_argument(
f'--{constants.ES_BT_BATCH_MUTATE_SIZE}',
dest=constants.ES_BT_BATCH_MUTATE_SIZE,
required=False,
help='BigTable batch mutate size. Maximum allowed size is 100000. Default is 100.',
default=100
)
parser.add_argument(
f'--{constants.ES_BT_CATALOG_JSON}',
dest=constants.ES_BT_CATALOG_JSON,
required=True,
help='BigTable catalog inline json'
)

known_args: argparse.Namespace
@@ -104,7 +130,11 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
es_password: str = args[constants.ES_BT_NODE_PASSWORD]
flatten_struct = args[constants.ES_BT_FLATTEN_STRUCT]
flatten_array = args[constants.ES_BT_FLATTEN_ARRAY]
catalog: str = ''.join(args[constants.ES_BT_HBASE_CATALOG_JSON].split())
catalog: str = ''.join(args[constants.ES_BT_CATALOG_JSON].split())
project_id: str = args[constants.ES_BT_PROJECT_ID]
instance_id: str = args[constants.ES_BT_INSTANCE_ID]
create_new_table: bool = args[constants.ES_BT_CREATE_NEW_TABLE]
batch_mutate_size: int = args[constants.ES_BT_BATCH_MUTATE_SIZE]

ignore_keys = {constants.ES_BT_NODE_PASSWORD}
filtered_args = {key:val for key,val in args.items() if key not in ignore_keys}
@@ -128,7 +158,10 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:

# Write
input_data.write \
.format(constants.FORMAT_HBASE) \
.format(constants.FORMAT_BIGTABLE) \
.options(catalog=catalog) \
.option('hbase.spark.use.hbasecontext', "false") \
.option(constants.ES_BT_PROJECT_ID, project_id) \
.option(constants.ES_BT_INSTANCE_ID, instance_id) \
.option(constants.ES_BT_CREATE_NEW_TABLE, create_new_table) \
.option(constants.ES_BT_BATCH_MUTATE_SIZE, batch_mutate_size) \
.save()
