diff --git a/.gitignore b/.gitignore index 8f13634e8..45956f380 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,6 @@ dist pipeline.json .ipynb_checkpoints/ .venv/ -metastore_db/ \ No newline at end of file +metastore_db/ +python/elastic.jks +python/tls.cert diff --git a/python/.ci/Jenkinsfile b/python/.ci/Jenkinsfile index a9442b6dc..183a50002 100644 --- a/python/.ci/Jenkinsfile +++ b/python/.ci/Jenkinsfile @@ -725,6 +725,45 @@ pipeline { } } } + stage('Parallel Execution 8'){ + parallel{ + stage('Elasticsearch to GCS'){ + steps{ + retry(count: stageRetryCount) { + sh ''' + python3.8 -m pip install --user virtualenv + + python3.8 -m venv env + + source env/bin/activate + + gsutil cp gs://dataproc-templates/conf/elastic.jks /tmp/ + + export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp" + export SKIP_BUILD=true + export JARS="gs://dataproc-templates/jars/elasticsearch-spark-20_2.12-8.15.3.jar" + export FILES="/tmp/elastic.jks" + + cd python + + ./bin/start.sh \ + --properties=spark.driver.extraJavaOptions='-Djavax.net.ssl.trustStore=./elastic.jks -Djavax.net.ssl.trustStorePassword=changeit',spark.executor.extraJavaOptions='-Djavax.net.ssl.trustStore=./elastic.jks -Djavax.net.ssl.trustStorePassword=changeit' \ + -- --template=ELASTICSEARCHTOGCS \ + --es.gcs.input.node=${ELASTIC_INPUT_NODE} \ + --es.gcs.input.index="books" \ + --es.gcs.input.user=${ELASTIC_USER} \ + --es.gcs.input.password=${ELASTIC_PASSWORD} \ + --es.gcs.output.format="parquet" \ + --es.gcs.output.location="gs://dataproc-templates/integration-testing/output/ELASTICSEARCHTOGCS" \ + --es.gcs.output.mode="overwrite" \ + --es.gcs.input.es.net.ssl="true" \ + --es.gcs.input.es.net.ssl.cert.allow.self.signed="true" + ''' + } + } + } + } + } } post { always{ diff --git a/python/dataproc_templates/elasticsearch/Dockerfile b/python/dataproc_templates/elasticsearch/Dockerfile deleted file mode 100644 index 88de2ed08..000000000 --- a/python/dataproc_templates/elasticsearch/Dockerfile +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Reference Dockerfile for custom image required for Hbase to GCS Dataproc template - -# Debian 11 is recommended. -FROM debian:11-slim - -# Suppress interactive prompts -ENV DEBIAN_FRONTEND=noninteractive - -# (Required) Install utilities required by Spark scripts. -RUN apt update && apt install -y procps tini - -# (Optional) Add extra jars. -ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/ -ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*' -RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}" -COPY hbase-site.xml /etc/hbase/conf/ -RUN chmod 777 /etc/hbase/conf/hbase-site.xml - -# (Optional) Install and configure Miniconda3. -ENV CONDA_HOME=/opt/miniconda3 -ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python -ENV PATH=${CONDA_HOME}/bin:${PATH} -COPY Miniconda3-latest-Linux-x86_64.sh . 
-RUN bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/miniconda3 \ - && ${CONDA_HOME}/bin/conda config --system --set always_yes True \ - && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \ - && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \ - && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict - -# (Optional) Install Conda packages. -# -# The following packages are installed in the default image, it is strongly -# recommended to include all of them. -# -# Use mamba to install packages quickly. -RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \ - && ${CONDA_HOME}/bin/mamba install \ - google-cloud-bigquery \ - google-cloud-bigtable \ - google-cloud-secret-manager - -# (Required) Create the 'spark' group/user. -# The GID and UID must be 1099. Home directory is required. -RUN groupadd -g 1099 spark -RUN useradd -u 1099 -g 1099 -d /home/spark -m spark -USER spark diff --git a/python/dataproc_templates/elasticsearch/README.md b/python/dataproc_templates/elasticsearch/README.md index 175efc667..b7d5aa195 100644 --- a/python/dataproc_templates/elasticsearch/README.md +++ b/python/dataproc_templates/elasticsearch/README.md @@ -595,7 +595,7 @@ options: ``` export GCP_PROJECT=my-project -export JARS="gs://spark-lib/elasticsearch/elasticsearch-spark-30_2.12-8.11.4.jar,gs://spark-lib/bigquery/spark-3.3-bigquery-0.39.0.jar" +export JARS="gs://spark-lib/elasticsearch/elasticsearch-spark-30_2.12-8.11.4.jar" export REGION=us-central1 export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet @@ -614,43 +614,22 @@ export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet Template for exporting an Elasticsearch Index to a BigTable table. -It uses the Apache HBase Spark Connector to write to Bigtable. +It uses the Spark BigTable Connector to write to Bigtable. -This [tutorial](https://cloud.google.com/dataproc/docs/tutorials/spark-hbase#dataproc_hbase_tutorial_view_code-python) shows how to run a Spark/PySpark job connecting to Bigtable. -However, it focuses in running the job using a Dataproc cluster, and not Dataproc Serverless. -Here in this template, you will notice that there are different configuration steps for the PySpark job to successfully run using Dataproc Serverless, connecting to Bigtable using the HBase interface. +Here in this template, you will notice that there are different configuration steps for the PySpark job to successfully run using Dataproc Serverless, connecting to Bigtable using the Spark BigTable Connector. You can also check out the [differences between HBase and Cloud Bigtable](https://cloud.google.com/bigtable/docs/hbase-differences). ## Requirements -1) Configure the [hbase-site.xml](./hbase-site.xml) ([reference](https://cloud.google.com/bigtable/docs/hbase-connecting#creating_the_hbase-sitexml_file)) with your Bigtable instance reference - - The hbase-site.xml needs to be available in some path of the container image used by Dataproc Serverless. - - For that, you need to build and host a [customer container image](https://cloud.google.com/dataproc-serverless/docs/guides/custom-containers#submit_a_spark_batch_workload_using_a_custom_container_image) in GCP Container Registry.
- - Add the following layer to the [Dockerfile](./Dockerfile), for it to copy your local hbase-site.xml to the container image (already done): - ``` - COPY hbase-site.xml /etc/hbase/conf/ - ``` - - Build the [Dockerfile](./Dockerfile), building and pushing it to GCP Container Registry with: - ``` - wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh - IMAGE=us-central1-docker.pkg.dev///: - docker build --platform linux/amd64 -t "${IMAGE}" . - docker push "${IMAGE}" - ``` - - An SPARK_EXTRA_CLASSPATH environment variable should also be set to the same path when submitting the job. - ``` - (./bin/start.sh ...) - --container-image="us-central1-docker.pkg.dev///:" # image with hbase-site.xml in /etc/hbase/conf/ - --properties='spark.dataproc.driverEnv.SPARK_EXTRA_CLASSPATH=/etc/hbase/conf/' - ``` - -2) Configure the desired HBase catalog json to passed as an argument (table reference and schema) - - The hbase-catalog.json should be passed using the --gcs.bigtable.hbase.catalog.json +1) `export JARS="gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar"`; the Spark property `spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36` is also required. Please refer to the example in the official [documentation](https://github.com/GoogleCloudDataproc/spark-bigtable-connector/tree/main/examples/python). + +2) Configure the desired BigTable catalog json to be passed as an argument (table reference and schema) + - The catalog.json should be passed using the --es.bt.catalog.json argument ``` (./bin/start.sh ...) - -- --gcs.bigtable.hbase.catalog.json='''{ - "table":{"namespace":"default","name":""}, + -- --es.bt.catalog.json='''{ + "table":{"name":""}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"string"}, @@ -659,7 +638,7 @@ You can also check out the [differences between HBase and Cloud Bigtable](https: }''' ``` -3) [Create and manage](https://cloud.google.com/bigtable/docs/managing-tables) your Bigtable table schema, column families, etc, to match the provided HBase catalog. +3) [Create and manage](https://cloud.google.com/bigtable/docs/managing-tables) your Bigtable table schema, column families, etc, to match the provided Bigtable catalog. ## Required JAR files Depending upon the versions of Elasticsearch, PySpark and Scala in the environment The template can support the Elasticsearch versions >= 7.12.0, using the appropriate Elasticsearch Spark Connector -Some HBase and Bigtable dependencies are required to be passed when submitting the job. +Spark Bigtable connector dependencies are required to be passed when submitting the job. These dependencies need to be passed by using the --jars flag, or, in the case of Dataproc Templates, using the JARS environment variable. Some dependencies (jars) must be downloaded from [MVN Repository](https://mvnrepository.com/) and stored your Cloud Storage bucket (create one to store the dependencies).
-- **[Apache HBase Spark Connector](https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark) dependencies (already mounted in Dataproc Serverless, so you refer to them using file://):** - - file:///usr/lib/spark/external/hbase-spark-protocol-shaded.jar - - file:///usr/lib/spark/external/hbase-spark.jar - -- **Bigtable dependency:** - - gs:///bigtable-hbase-2.x-shaded-2.3.0.jar - - Download it using ``` wget https://repo1.maven.org/maven2/com/google/cloud/bigtable/bigtable-hbase-2.x-shaded/2.3.0/bigtable-hbase-2.x-shaded-2.3.0.jar``` - -- **HBase dependencies:** - - gs:///hbase-client-2.4.12.jar - - Download it using ``` wget https://repo1.maven.org/maven2/org/apache/hbase/hbase-client/2.4.12/hbase-client-2.4.12.jar``` - - gs:///hbase-shaded-mapreduce-2.4.12.jar - - Download it using ``` wget https://repo1.maven.org/maven2/org/apache/hbase/hbase-shaded-mapreduce/2.4.12/hbase-shaded-mapreduce-2.4.12.jar``` +- **[Spark BigTable Connector](https://cloud.google.com/bigtable/docs/use-bigtable-spark-connector)** + - gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar +It also requires [DeltaIO dependencies](https://docs.delta.io/latest/releases.html) to be available in the Dataproc cluster if using the delta format. ## Arguments - `es.bt.input.index`: Elasticsearch Input Index Name (format: /) - `es.bt.input.user`: Elasticsearch Username - `es.bt.input.password`: Elasticsearch Password -- `es.bt.hbase.catalog.json`: HBase catalog inline json +- `spark.bigtable.project.id`: GCP project where the BigTable instance is running +- `spark.bigtable.instance.id`: BigTable instance id +- `es.bt.catalog.json`: BigTable catalog inline json #### Optional Arguments - `es.bt.input.es.nodes.path.prefix`: Prefix to add to all requests made to Elasticsearch - `es.bt.input.es.query`: Holds the query used for reading data from the specified Index @@ -743,6 +714,8 @@ Some dependencies (jars) must be downloaded from [MVN Repository](https://mvnrep - `es.bt.input.es.net.proxy.socks.use.system.props`: Whether use the system Socks proxy properties (namely socksProxyHost and socksProxyHost) or not (default yes) - `es.bt.flatten.struct.fields`: Flatten the struct fields - `es.bt.flatten.array.fields`: Flatten the n-D array fields to 1-D array fields, it needs es.bt.flatten.struct.fields option to be passed +- `spark.bigtable.create.new.table`: Set to True if you want to create the BigTable table from the catalog. The default value is False, which means the table must already exist. +- `spark.bigtable.batch.mutate.size`: BigTable batch mutation size. Maximum allowed value is `100000`. Default is `100`.
Refer to the [documentation](https://github.com/GoogleCloudDataproc/spark-bigtable-connector/blob/main/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableSparkConf.scala#L86) for these properties. ## Usage @@ -754,7 +727,9 @@ usage: main.py [-h] --es.bt.input.index ES.BT.INPUT.INDEX --es.bt.input.user ES.BT.INPUT.USER --es.bt.input.password ES.BT.INPUT.PASSWORD - --es.bt.hbase.catalog.json ES.BT.HBASE.CATALOG.JSON + --spark.bigtable.project.id ES.BT.PROJECT.ID + --spark.bigtable.instance.id ES.BT.INSTANCE.ID + --es.bt.catalog.json ES.BT.CATALOG.JSON [--es.bt.input.es.nodes.path.prefix ES.BT.INPUT.ES.NODES.PATH.PREFIX] [--es.bt.input.es.query ES.BT.INPUT.ES.QUERY] [--es.bt.input.es.mapping.date.rich ES.BT.INPUT.ES.MAPPING.DATE.RICH] @@ -802,6 +777,8 @@ usage: main.py [-h] [--es.bt.input.es.net.proxy.socks.use.system.props ES.BT.INPUT.ES.NET.PROXY.SOCKS.USE.SYSTEM.PROPS] [--es.bt.flatten.struct.fields] [--es.bt.flatten.array.fields] + [--spark.bigtable.create.new.table ES.BT.CREATE.NEW.TABLE] + [--spark.bigtable.batch.mutate.size ES.BT.BATCH.MUTATE.SIZE] options: @@ -908,8 +885,12 @@ options: Flatten the struct fields --es.bt.flatten.array.fields Flatten the n-D array fields to 1-D array fields, it needs es.bt.flatten.struct.fields option to be passed - --es.bt.hbase.catalog.json ES.BT.HBASE.CATALOG.JSON - HBase catalog inline json + --spark.bigtable.project.id ES.BT.PROJECT.ID + GCP project id where the BigTable instance is running + --spark.bigtable.instance.id ES.BT.INSTANCE.ID + BigTable instance id + --es.bt.catalog.json ES.BT.CATALOG.JSON + BigTable catalog inline json ``` ## Example submission @@ -918,24 +899,19 @@ options: export GCP_PROJECT= export REGION= export GCS_STAGING_LOCATION= -export JARS="gs:///elasticsearch-spark-30_2.12-8.11.4.jar,\ - gs:///bigtable-hbase-2.x-hadoop-2.3.0.jar,\ - gs:///hbase-client-2.4.12.jar,\ - gs:///hbase-shaded-mapreduce-2.4.12.jar,\ - file:///usr/lib/spark/external/hbase-spark-protocol-shaded.jar,\ - file:///usr/lib/spark/external/hbase-spark.jar" -export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet +export JARS="gs:///elasticsearch-spark-30_2.12-8.11.4.jar,gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar" +export SPARK_PROPERTIES="spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36" ./bin/start.sh \ ---container-image="gcr.io//:" \ ---properties='spark.dataproc.driverEnv.SPARK_EXTRA_CLASSPATH=/etc/hbase/conf/,spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ # image with hbase-site.xml in /etc/hbase/conf/ -- --template=ELASTICSEARCHTOBIGTABLE \ --es.bt.input.node="xxxxxxxxxxxx.us-central1.gcp.cloud.es.io:9243" \ --es.bt.input.index="demo" \ --es.bt.input.user="demo" \ --es.bt.input.password="demo" \ - --es.bt.hbase.catalog.json='''{ - "table":{"namespace":"default","name":"my_table"}, + --spark.bigtable.project.id="demo-project" \ + --spark.bigtable.instance.id="bt-instance-id" \ + --es.bt.catalog.json='''{ + "table":{"name":"my_table"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"string"}, diff --git a/python/dataproc_templates/elasticsearch/elasticsearch_to_bigtable.py b/python/dataproc_templates/elasticsearch/elasticsearch_to_bigtable.py index a93d86e69..3c756f541 100644 --- a/python/dataproc_templates/elasticsearch/elasticsearch_to_bigtable.py +++ b/python/dataproc_templates/elasticsearch/elasticsearch_to_bigtable.py @@ -82,10 +82,36 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]: ) ) parser.add_argument( - f'--{constants.ES_BT_HBASE_CATALOG_JSON}',
- dest=constants.ES_BT_HBASE_CATALOG_JSON, + f'--{constants.ES_BT_PROJECT_ID}', + dest=constants.ES_BT_PROJECT_ID, required=True, - help='HBase catalog inline json' + help='BigTable project ID' + ) + parser.add_argument( + f'--{constants.ES_BT_INSTANCE_ID}', + dest=constants.ES_BT_INSTANCE_ID, + required=True, + help='BigTable instance ID' + ) + parser.add_argument( + f'--{constants.ES_BT_CREATE_NEW_TABLE}', + dest=constants.ES_BT_CREATE_NEW_TABLE, + required=False, + help='BigTable create new table flag. Default is false.', + default=False + ) + parser.add_argument( + f'--{constants.ES_BT_BATCH_MUTATE_SIZE}', + dest=constants.ES_BT_BATCH_MUTATE_SIZE, + required=False, + help='BigTable batch mutate size. Maximum allowed size is 100000. Default is 100.', + default=100 + ) + parser.add_argument( + f'--{constants.ES_BT_CATALOG_JSON}', + dest=constants.ES_BT_CATALOG_JSON, + required=True, + help='BigTable catalog inline json' ) known_args: argparse.Namespace @@ -104,7 +130,11 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None: es_password: str = args[constants.ES_BT_NODE_PASSWORD] flatten_struct = args[constants.ES_BT_FLATTEN_STRUCT] flatten_array = args[constants.ES_BT_FLATTEN_ARRAY] - catalog: str = ''.join(args[constants.ES_BT_HBASE_CATALOG_JSON].split()) + catalog: str = ''.join(args[constants.ES_BT_CATALOG_JSON].split()) + project_id: str = args[constants.ES_BT_PROJECT_ID] + instance_id: str = args[constants.ES_BT_INSTANCE_ID] + create_new_table: bool = args[constants.ES_BT_CREATE_NEW_TABLE] + batch_mutate_size: int = args[constants.ES_BT_BATCH_MUTATE_SIZE] ignore_keys = {constants.ES_BT_NODE_PASSWORD} filtered_args = {key:val for key,val in args.items() if key not in ignore_keys} @@ -128,7 +158,10 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None: # Write input_data.write \ - .format(constants.FORMAT_HBASE) \ + .format(constants.FORMAT_BIGTABLE) \ .options(catalog=catalog) \ - .option('hbase.spark.use.hbasecontext', "false") \ + .option(constants.ES_BT_PROJECT_ID, project_id) \ + .option(constants.ES_BT_INSTANCE_ID, instance_id) \ + .option(constants.ES_BT_CREATE_NEW_TABLE, create_new_table) \ + .option(constants.ES_BT_BATCH_MUTATE_SIZE, batch_mutate_size) \ .save() diff --git a/python/dataproc_templates/elasticsearch/hbase-site.xml b/python/dataproc_templates/elasticsearch/hbase-site.xml deleted file mode 100644 index fd13fab34..000000000 --- a/python/dataproc_templates/elasticsearch/hbase-site.xml +++ /dev/null @@ -1,34 +0,0 @@ - - - - - hbase.client.connection.impl - com.google.cloud.bigtable.hbase2_x.BigtableConnection - - - google.bigtable.project.id - project_id - - - google.bigtable.instance.id - bigtable_instance_id - - diff --git a/python/dataproc_templates/util/template_constants.py b/python/dataproc_templates/util/template_constants.py index 99b2dd781..c565d08f4 100644 --- a/python/dataproc_templates/util/template_constants.py +++ b/python/dataproc_templates/util/template_constants.py @@ -531,9 +531,13 @@ def get_es_spark_connector_input_options(prefix): ES_BT_INPUT_INDEX = "es.bt.input.index" ES_BT_NODE_USER = "es.bt.input.user" ES_BT_NODE_PASSWORD = "es.bt.input.password" -ES_BT_HBASE_CATALOG_JSON = "es.bt.hbase.catalog.json" ES_BT_FLATTEN_STRUCT = "es.bt.flatten.struct.fields" ES_BT_FLATTEN_ARRAY = "es.bt.flatten.array.fields" +ES_BT_CATALOG_JSON = "es.bt.catalog.json" +ES_BT_PROJECT_ID = "spark.bigtable.project.id" +ES_BT_INSTANCE_ID = "spark.bigtable.instance.id" +ES_BT_CREATE_NEW_TABLE = "spark.bigtable.create.new.table" 
+ES_BT_BATCH_MUTATE_SIZE = "spark.bigtable.batch.mutate.size" # GCS to BigQuery GCS_BQ_INPUT_LOCATION = "gcs.bigquery.input.location" diff --git a/python/test/elasticsearch/test_elasticsearch_to_bigtable.py b/python/test/elasticsearch/test_elasticsearch_to_bigtable.py index 3a4af2c06..82680281a 100644 --- a/python/test/elasticsearch/test_elasticsearch_to_bigtable.py +++ b/python/test/elasticsearch/test_elasticsearch_to_bigtable.py @@ -35,13 +35,17 @@ def test_parse_args(self): "--es.bt.input.index=demo", "--es.bt.input.user=demo", "--es.bt.input.password=demo", - "--es.bt.hbase.catalog.json={key:value}"]) + "--spark.bigtable.project.id=GCP_PROJECT", + "--spark.bigtable.instance.id=BIGTABLE_INSTANCE_ID", + "--es.bt.catalog.json={key:value}"]) assert parsed_args["es.bt.input.node"] == "xxxxxxxxxxxx.us-central1.gcp.cloud.es.io:9243" assert parsed_args["es.bt.input.index"] == "demo" assert parsed_args["es.bt.input.user"] == "demo" assert parsed_args["es.bt.input.password"] == "demo" - assert parsed_args["es.bt.hbase.catalog.json"] == '{key:value}' + assert parsed_args["spark.bigtable.project.id"] == "GCP_PROJECT" + assert parsed_args["spark.bigtable.instance.id"] == "BIGTABLE_INSTANCE_ID" + assert parsed_args["es.bt.catalog.json"] == '{key:value}' @mock.patch.object(pyspark.sql, 'SparkSession') @mock.patch("dataproc_templates.util.dataframe_reader_wrappers.rename_columns") @@ -54,7 +58,9 @@ def test_run(self, mock_rename_columns, mock_spark_session): "--es.bt.input.index=demo", "--es.bt.input.user=demo", "--es.bt.input.password=demo", - "--es.bt.hbase.catalog.json={key:value}"]) + "--spark.bigtable.project.id=GCP_PROJECT", + "--spark.bigtable.instance.id=BIGTABLE_INSTANCE_ID", + "--es.bt.catalog.json={key:value}"]) mock_spark_session.sparkContext.newAPIHadoopRDD.return_value = mock_spark_session.rdd.RDD mock_spark_session.read.json.return_value = mock_spark_session.dataframe.DataFrame @@ -66,8 +72,6 @@ def test_run(self, mock_rename_columns, mock_spark_session): mock_spark_session.read.json.assert_called_once() mock_rename_columns.assert_called_once() mock_spark_session.dataframe.DataFrame.write.format. \ - assert_called_once_with(constants.FORMAT_HBASE) + assert_called_once_with(constants.FORMAT_BIGTABLE) mock_spark_session.dataframe.DataFrame.write.format().options. \ assert_called_with(catalog='{key:value}') - mock_spark_session.dataframe.DataFrame.write.format().options().option. \ - assert_called_once_with('hbase.spark.use.hbasecontext', "false")
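Reviewer note: to sanity-check the new write path outside the template, below is a minimal PySpark sketch of how the Spark BigTable Connector consumes the options introduced in this change. It assumes `constants.FORMAT_BIGTABLE` resolves to the connector's `bigtable` data source name (the value is not shown in this diff), and the project, instance and table names are placeholders; the `spark-bigtable_2.12-0.1.0.jar` plus the `org.slf4j:slf4j-reload4j:1.7.36` package noted in the README must be on the classpath for the write to succeed.

```
# Minimal sketch (not part of this PR): exercise the Spark BigTable Connector
# write path with the options added by this change. All names are placeholders.
import json

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bigtable-write-sketch").getOrCreate()

# The catalog maps DataFrame columns to a Bigtable row key and column family,
# mirroring the --es.bt.catalog.json example in the README.
catalog = json.dumps({
    "table": {"name": "my_table"},
    "rowkey": "key",
    "columns": {
        "key": {"cf": "rowkey", "col": "key", "type": "string"},
        "name": {"cf": "cf", "col": "name", "type": "string"},
    },
})

df = spark.createDataFrame([("row1", "alice"), ("row2", "bob")], ["key", "name"])

(df.write
    .format("bigtable")                                       # assumed value of constants.FORMAT_BIGTABLE
    .options(catalog=catalog)
    .option("spark.bigtable.project.id", "demo-project")      # placeholder project
    .option("spark.bigtable.instance.id", "bt-instance-id")   # placeholder instance
    .option("spark.bigtable.create.new.table", "true")        # create the table from the catalog
    .option("spark.bigtable.batch.mutate.size", "100")        # default batch size
    .save())
```

Submitted through `./bin/start.sh`, the same values travel as the `--spark.bigtable.project.id`, `--spark.bigtable.instance.id`, `--spark.bigtable.create.new.table` and `--spark.bigtable.batch.mutate.size` arguments handled by the updated `parse_args`.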