From 544b9e92ca38b6872a65523e86a92db4ec42d942 Mon Sep 17 00:00:00 2001 From: Dion Song Date: Tue, 10 Apr 2018 22:33:52 -0500 Subject: [PATCH 1/6] add hive merge pipeline --- integration-tests/run-tests.sh | 7 +- .../sqoop-parquet-hdfs-hive-merge/env.yml | 7 + .../generate-scripts | 2 + .../sqoop-parquet-hdfs-hive-merge/readme.md | 1 + .../run-in-container.sh | 21 +++ .../sqoop-parquet-hdfs-hive-merge/run.sh | 6 + .../sqoop-parquet-hdfs-hive-merge/tables.yml | 178 ++++++++++++++++++ .../sqoop-parquet-hdfs-hive-merge/Makefile | 87 +++++++++ .../Makefile.meta | 42 +++++ .../create-base-table.sql | 33 ++++ .../create-incr-table.sql | 33 ++++ .../create-merge-view.sql | 30 +++ .../create-report-table.sql | 48 +++++ .../drop-raw-tables.sql | 19 ++ .../drop-report-table.sql | 17 ++ .../hdfs-delete.sh | 18 ++ .../hdfs-incr-clear.sh | 18 ++ .../hdfs-report-delete.sh | 18 ++ .../sqoop-parquet-hdfs-hive-merge/imports | 5 + .../move-1st-sqoop.sh | 19 ++ .../overwrite-base-table.sql | 22 +++ .../sqoop-create.sh | 41 ++++ .../type-mapping.yml | 100 ++++++++++ 23 files changed, 769 insertions(+), 3 deletions(-) create mode 100644 integration-tests/sqoop-parquet-hdfs-hive-merge/env.yml create mode 100755 integration-tests/sqoop-parquet-hdfs-hive-merge/generate-scripts create mode 100644 integration-tests/sqoop-parquet-hdfs-hive-merge/readme.md create mode 100755 integration-tests/sqoop-parquet-hdfs-hive-merge/run-in-container.sh create mode 100755 integration-tests/sqoop-parquet-hdfs-hive-merge/run.sh create mode 100644 integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/Makefile create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/Makefile.meta create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/create-base-table.sql create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/create-incr-table.sql create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/create-report-table.sql create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/drop-report-table.sql create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/hdfs-report-delete.sh create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/imports create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/overwrite-base-table.sql create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/sqoop-create.sh create mode 100644 templates/sqoop-parquet-hdfs-hive-merge/type-mapping.yml diff --git a/integration-tests/run-tests.sh b/integration-tests/run-tests.sh index 27970a9..39891cd 100755 --- a/integration-tests/run-tests.sh +++ b/integration-tests/run-tests.sh @@ -38,6 +38,7 @@ set -e docker-compose exec mysql /data/load-data.sh docker-compose exec kimpala /run-all-services.sh -$SCRIPT_DIR/sqoop-parquet-hdfs-impala/run.sh -$SCRIPT_DIR/sqoop-parquet-full-load/run.sh -$SCRIPT_DIR/sqoop-parquet-hdfs-kudu-impala/run.sh +#$SCRIPT_DIR/sqoop-parquet-hdfs-impala/run.sh +#$SCRIPT_DIR/sqoop-parquet-full-load/run.sh +#$SCRIPT_DIR/sqoop-parquet-hdfs-kudu-impala/run.sh +$SCRIPT_DIR/sqoop-parquet-hdfs-hive-merge/run.sh diff --git a/integration-tests/sqoop-parquet-hdfs-hive-merge/env.yml 
b/integration-tests/sqoop-parquet-hdfs-hive-merge/env.yml new file mode 100644 index 0000000..194871b --- /dev/null +++ b/integration-tests/sqoop-parquet-hdfs-hive-merge/env.yml @@ -0,0 +1,7 @@ +--- +connection_string: "jdbc:mysql://mysql:3306/integration_test" +hdfs_basedir: "hdfs://0.0.0.0:8020/user/hive/warehouse" +source_db_user_name: "pipewrench" +password_file: "file:////mount/password.file" +destination_database: "default" +impala_cmd: "impala-shell -i localhost -f " diff --git a/integration-tests/sqoop-parquet-hdfs-hive-merge/generate-scripts b/integration-tests/sqoop-parquet-hdfs-hive-merge/generate-scripts new file mode 100755 index 0000000..1f90f05 --- /dev/null +++ b/integration-tests/sqoop-parquet-hdfs-hive-merge/generate-scripts @@ -0,0 +1,2 @@ +#!/bin/bash -e +pipewrench-merge --conf=tables.yml --debug_level ERROR --env=env.yml --pipeline-templates=../../templates/sqoop-parquet-hdfs-hive-merge diff --git a/integration-tests/sqoop-parquet-hdfs-hive-merge/readme.md b/integration-tests/sqoop-parquet-hdfs-hive-merge/readme.md new file mode 100644 index 0000000..cd57d66 --- /dev/null +++ b/integration-tests/sqoop-parquet-hdfs-hive-merge/readme.md @@ -0,0 +1 @@ +This pipeline uses hive to merge the incremental data as explained in https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/ diff --git a/integration-tests/sqoop-parquet-hdfs-hive-merge/run-in-container.sh b/integration-tests/sqoop-parquet-hdfs-hive-merge/run-in-container.sh new file mode 100755 index 0000000..b95995e --- /dev/null +++ b/integration-tests/sqoop-parquet-hdfs-hive-merge/run-in-container.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Copyright 2017 Cargill Incorporated +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+set -e +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd $SCRIPT_DIR +# verify we can generate scripts without error +sudo -u hdfs hdfs dfs -rm -r /user/hive/warehouse/* || true +make -j1 integration-test-all -C output/sqoop-parquet-hdfs-hive-merge + diff --git a/integration-tests/sqoop-parquet-hdfs-hive-merge/run.sh b/integration-tests/sqoop-parquet-hdfs-hive-merge/run.sh new file mode 100755 index 0000000..c05b758 --- /dev/null +++ b/integration-tests/sqoop-parquet-hdfs-hive-merge/run.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -eu +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd $SCRIPT_DIR +./generate-scripts +docker-compose exec kimpala /mount/sqoop-parquet-hdfs-hive-merge/run-in-container.sh diff --git a/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml b/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml new file mode 100644 index 0000000..28d8bd8 --- /dev/null +++ b/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml @@ -0,0 +1,178 @@ +--- +name: "sqoop_parquet_hdfs_hive_merge" # The name of this configuration +user_name: {{ source_db_user_name }} # Source database user name +type_mapping: type-mapping.yml # Type mapping used for database type conversion +sqoop_export_dir: {{ hdfs_basedir }}/export # Sqoop export data HDFS path +sqoop_password_file: {{ password_file}} # Password file for sqoop. Must reside in HDFS +sqoop_password: {{ password }} # Password for sqoop. Must reside in HDFS +connection_manager: "org.apache.sqoop.manager.MySQLManager" # Connection manager fully qualified class +sqoop_job_name_suffix: test # Suffix added to sqoop jobs. Can be used to differentiate environments +impala_cmd: "{{ impala_cmd }}" +source_database: + name: "integration_test" # Source database name + connection_string: {{ connection_string }} # Source database connection string. Should be kept in 'env.yml' + cmd: "mysql -P 3306 -uroot -ppipewrench -h mysql <" +staging_database: + name: "{{ destination_database }}" # Staging database name. 
+ path: "{{ hdfs_basedir }}" # Staging database HDFS path +result_database: + name: "{{ destination_database }}" # Result database + path: "{{ hdfs_basedir }}" # Result database HDFS path +tables: + - id: "titanic" # Uniquely identifies this table + META_CONTACT_INFO: "team@company.com" # Contact info will be loaded into tblproperties + META_LOAD_FREQUENCY: "STREAMING" # Load frequency will be loaded into tblproperties + META_SECURITY_CLASSIFICATION: "OPEN" # Security classification will be loaded into tblproperties + META_SOURCE: "upstream.source.location" # Source will be loaded into tblproperties + source: + name: "titanic" # Source table name + file: ../../../../../data/Titanic.csv + destination: + name: "titanic" # Destination (Impala) table name + split_by_column: "Id" # Sqoop split by column (--split-by) + kudu: + hash_by: # List of columns to hash by + - Id + num_partitions: 2 # Number of Kudu partitions to create + check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) + primary_keys: # List of primary keys + - Id + columns: + - name: "Id" # Column name in source table + datatype: "int" # Column datatype in source table + comment: "comment" # Column comment + - name: "LastName" + datatype: "text" + comment: "comment" + - name: "FirstName" + datatype: "text" + comment: "comment" + - name: "PClass" + datatype: "text" + comment: "comment" + - name: "Age" + datatype: "int" + comment: "comment" + - name: "Sex" + datatype: "text" + comment: "comment" + - name: "Survived" + datatype: "int" + comment: "comment" + - name: "SexCode" + datatype: "int" + comment: "comment" + - id: "vocab" # Uniquely identifies this table + source: + name: "vocab" # Source table name + file: ../../../../../data/Vocab.csv + destination: + name: "vocab" # Destination (Impala) table name + split_by_column: "Id" # Sqoop split by column (--split-by) + kudu: + hash_by: # List of columns to hash by + - Id + num_partitions: 2 # Number of Kudu partitions to create + check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) + primary_keys: # List of primary keys + - Id + columns: + - name: "Id" # Column name in source table + datatype: "int" # Column datatype in source table + comment: "comment" # Column comment + - name: "Year" + datatype: "int" + comment: "comment" + - name: "Sex" + datatype: "text" + comment: "comment" + - name: "Education" + datatype: "int" + comment: "comment" + - name: "Vocabulary" + datatype: "int" + comment: "comment" + - id: "baseball" # Uniquely identifies this table + source: + name: "baseball" # Source table name + file: ../../../../../data/Baseball.csv + destination: + name: "baseball" # Destination (Impala) table name + split_by_column: "Id" # Sqoop split by column (--split-by) + kudu: + hash_by: # List of columns to hash by + - Id + num_partitions: 2 # Number of Kudu partitions to create + check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) + primary_keys: # List of primary keys + - Id + columns: + - name: "Id" # Column name in source table + datatype: "int" # Column datatype in source table + comment: "comment" # Column comment + - name: "PlayerId" + datatype: "text" + comment: "comment" + - name: "Year" + datatype: "int" + comment: "comment" + - name: "Stint" + datatype: "int" + comment: "comment" + - name: "Team" + datatype: "text" + comment: "comment" + - name: "LG" + datatype: "int" + 
comment: "comment" + - name: "G" + datatype: "int" + comment: "comment" + - name: "AB" + datatype: "int" + comment: "comment" + - name: "R" + datatype: "int" + comment: "comment" + - name: "H" + datatype: "int" + comment: "comment" + - name: "X2b" + datatype: "int" + comment: "comment" + - name: "X3b" + datatype: "int" + comment: "comment" + - name: "HR" + datatype: "int" + comment: "comment" + - name: "RBI" + datatype: "int" + comment: "comment" + - name: "SB" + datatype: "int" + comment: "comment" + - name: "CS" + datatype: "int" + comment: "comment" + - name: "BB" + datatype: "int" + comment: "comment" + - name: "SO" + datatype: "int" + comment: "comment" + - name: "IBB" + datatype: "int" + comment: "comment" + - name: "HBP" + datatype: "int" + comment: "comment" + - name: "SH" + datatype: "int" + comment: "comment" + - name: "SF" + datatype: "int" + comment: "comment" + - name: "GIDP" + datatype: "int" + comment: "comment" diff --git a/templates/sqoop-parquet-hdfs-hive-merge/Makefile b/templates/sqoop-parquet-hdfs-hive-merge/Makefile new file mode 100644 index 0000000..703b678 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/Makefile @@ -0,0 +1,87 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +impala-cmd={{ conf.impala_cmd }} + +integration-test: + $(MAKE) clean + $(MAKE) first-run + $(MAKE) update + +sqoop-create: sqoop-create.sh #### Create Sqoop job + ./run-with-logging.sh ./sqoop-create.sh $@ + +sqoop-delete: ## Delete Sqoop job + ./run-with-logging.sh ./sqoop-delete.sh $@ + +sqoop-exec: ## Execute sqoop job + ./run-with-logging.sh ./sqoop-exec.sh $@ + +create-base-table: create-base-table.sql + $(impala-cmd) create-base-table.sql + +create-incr-table: create-incr-table.sql + $(impala-cmd) create-incr-table.sql + +create-merge-view: create-merge-view.sql + $(impala-cmd) create-merge-view.sql + +create-report-table: create-report-table.sql + $(impala-cmd) create-report-table.sql + +overwrite-base-table: overwrite-base-table.sql + $(impala-cmd) overwrite-base-table.sql + +hdfs-clean: hdfs-delete.sh ## Delete parquet files from HDFS + ./run-with-logging.sh ./hdfs-delete.sh $@ + +move-1st-sqoop: move-1st-sqoop.sh + ./run-with-logging.sh ./move-1st-sqoop.sh $@ + +drop-raw-tables: drop-raw-tables.sql + $(impala-cmd) drop-raw-tables.sql + ./run-with-logging.sh ./hdfs-delete.sh $@ + +hdfs-incr-clear: hdfs-incr-clear.sh + ./run-with-logging.sh ./hdfs-incr-clear.sh $@ + +drop-report-table: drop-report-table.sql + $(impala-cmd) drop-report-table.sql + ./run-with-logging.sh ./hdfs-report-delete.sh $@ + +first-run: + $(MAKE) create-base-table #keep this on top so that the hdfs directories are created by impala the lower lever user. Otherwise there might be access issue later. 
+ $(MAKE) sqoop-create + $(MAKE) sqoop-exec + $(MAKE) move-1st-sqoop + $(MAKE) create-incr-table + +update: + $(MAKE) sqoop-exec + $(MAKE) create-merge-view + $(MAKE) drop-report-table + $(MAKE) create-report-table + $(MAKE) overwrite-base-table + $(MAKE) hdfs-incr-clear + +clean: + $(MAKE) drop-raw-tables + $(MAKE) drop-report-table + $(MAKE) sqoop-delete + +targets: ## Print out a list of available targets + @fgrep -h ": " $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/:.*//' + +help: + @fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//' diff --git a/templates/sqoop-parquet-hdfs-hive-merge/Makefile.meta b/templates/sqoop-parquet-hdfs-hive-merge/Makefile.meta new file mode 100644 index 0000000..dcb9195 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/Makefile.meta @@ -0,0 +1,42 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +first-run-all: {%- for table in tables %} first-run-{{ table.id }} {%- endfor %} + +{%- for table in tables %} +first-run-{{ table.id }}: + $(MAKE) first-run -C {{ table.id }} +{%- endfor %} + +update-all: {%- for table in tables %} update-{{ table.id }} {%- endfor %} + +{%- for table in tables %} +update-{{ table.id }}: + $(MAKE) update -C {{ table.id }} +{%- endfor %} + +clean-all: +{%- for table in tables %} + $(MAKE) clean -C {{ table.id }} +{%- endfor %} + +{%- for table in tables %} +integration-test-{{ table.id }}: + $(MAKE) integration-test -C {{ table.id }} +{%- endfor %} + +integration-test-all: +{%- for table in tables %} + $(MAKE) integration-test -C {{ table.id }} +{%- endfor %} diff --git a/templates/sqoop-parquet-hdfs-hive-merge/create-base-table.sql b/templates/sqoop-parquet-hdfs-hive-merge/create-base-table.sql new file mode 100644 index 0000000..a62cde0 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/create-base-table.sql @@ -0,0 +1,33 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
#} + +set sync_ddl=1; +USE {{ conf.staging_database.name }}; + +CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_base ( + {% for column in table.columns %} + {{ column.name }} {{ map_datatypes(column).parquet }} COMMENT '{{ column.comment }}' + {%- if not loop.last -%}, {% endif %} + {%- endfor %}) +STORED AS Parquet +LOCATION '{{ conf.staging_database.path }}/{{ table.destination.name }}/base' +TBLPROPERTIES( + 'parquet.compression'='SNAPPY', + 'SOURCE' = '{{ table.META_SOURCE }}', + 'SECURITY_CLASSIFICATION' = '{{ table.META_SECURITY_CLASSIFICATION }}', + 'LOAD_FREQUENCY' = '{{ table.META_LOAD_FREQUENCY }}', + 'CONTACT_INFO' = '{{ table.META_CONTACT_INFO }}' +); + +COMPUTE STATS {{ table.destination.name }}_base diff --git a/templates/sqoop-parquet-hdfs-hive-merge/create-incr-table.sql b/templates/sqoop-parquet-hdfs-hive-merge/create-incr-table.sql new file mode 100644 index 0000000..fa1b509 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/create-incr-table.sql @@ -0,0 +1,33 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +set sync_ddl=1; +USE {{ conf.staging_database.name }}; + +--Create incr table-- +CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_incr ( + {% for column in table.columns %} + {{ column.name }} {{ map_datatypes(column).parquet }} COMMENT '{{ column.comment }}' + {%- if not loop.last -%}, {% endif %} + {%- endfor %}) +STORED AS Parquet +LOCATION '{{ conf.staging_database.path }}/{{ table.destination.name }}/incr' +TBLPROPERTIES( + 'SOURCE' = '{{ table.META_SOURCE }}', + 'SECURITY_CLASSIFICATION' = '{{ table.META_SECURITY_CLASSIFICATION }}', + 'LOAD_FREQUENCY' = '{{ table.META_LOAD_FREQUENCY }}', + 'CONTACT_INFO' = '{{ table.META_CONTACT_INFO }}' +); + +COMPUTE STATS {{ table.destination.name }}_incr diff --git a/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql b/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql new file mode 100644 index 0000000..eb4ff6f --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql @@ -0,0 +1,30 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
#} + +set sync_ddl=1; +USE {{ conf.staging_database.name }}; + +--Create merged view -- +DROP VIEW IF EXISTS {{ table.destination.name }}_view; +CREATE VIEW {{ table.destination.name }}_view AS + SELECT t1.* + FROM (SELECT * FROM {{ table.destination.name }}_base + UNION ALL + SELECT * FROM {{ table.destination.name }}_incr) t1 + JOIN (SELECT {{ table.split_by_column }}, max({{ table.check_column }}) max_modified + FROM (SELECT * FROM {{ table.destination.name }}_base + UNION ALL + SELECT * FROM {{ table.destination.name }}_incr) t2 + GROUP BY {{ table.split_by_column }}) s + ON t1.{{ table.split_by_column }} = s.{{ table.split_by_column }} AND t1.{{ table.check_column }} = s.max_modified; diff --git a/templates/sqoop-parquet-hdfs-hive-merge/create-report-table.sql b/templates/sqoop-parquet-hdfs-hive-merge/create-report-table.sql new file mode 100644 index 0000000..397eacb --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/create-report-table.sql @@ -0,0 +1,48 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +set sync_ddl=1; +USE {{ conf.staging_database.name }}; + +-- cannot get create table as select to work. -- +--ERROR: AnalysisException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient-- +/* CREATE EXTERNAL TABLE {{ table.destination.name }}_report +STORED AS PARQUET +TBLPROPERTIES ('parquet.compression'='SNAPPY', + 'SOURCE' = '{{ table.META_SOURCE }}', + 'SECURITY_CLASSIFICATION' = '{{ table.META_SECURITY_CLASSIFICATION }}', + 'LOAD_FREQUENCY' = '{{ table.META_LOAD_FREQUENCY }}', + 'CONTACT_INFO' = '{{ table.META_CONTACT_INFO }}') +AS SELECT * FROM {{ conf.staging_database.name}}.{{ table.destination.name }}_view; +*/ + +-- create table, then insert overwrite -- +CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_report ( + {% for column in table.columns %} + {{ column.name }} {{ map_datatypes(column).parquet }} COMMENT '{{ column.comment }}' + {%- if not loop.last -%}, {% endif %} + {%- endfor %}) +STORED AS Parquet +LOCATION '{{ conf.staging_database.path }}/{{ table.destination.name }}/base' +TBLPROPERTIES( + 'parquet.compression'='SNAPPY', + 'SOURCE' = '{{ table.META_SOURCE }}', + 'SECURITY_CLASSIFICATION' = '{{ table.META_SECURITY_CLASSIFICATION }}', + 'LOAD_FREQUENCY' = '{{ table.META_LOAD_FREQUENCY }}', + 'CONTACT_INFO' = '{{ table.META_CONTACT_INFO }}' +); + +insert overwrite {{ table.destination.name }}_report select * from {{ table.destination.name }}_view; + +COMPUTE STATS {{ table.destination.name }}_report diff --git a/templates/sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql b/templates/sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql new file mode 100644 index 0000000..56c1125 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql @@ -0,0 +1,19 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +-- Drop the tables and the view -- +USE {{ conf.staging_database.name }}; +DROP VIEW IF EXISTS {{ table.destination.name }}_view; +DROP TABLE IF EXISTS {{ table.destination.name }}_base; +DROP TABLE IF EXISTS {{ table.destination.name }}_incr; diff --git a/templates/sqoop-parquet-hdfs-hive-merge/drop-report-table.sql b/templates/sqoop-parquet-hdfs-hive-merge/drop-report-table.sql new file mode 100644 index 0000000..21781aa --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/drop-report-table.sql @@ -0,0 +1,17 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +-- Drop the tables and the view -- +USE {{ conf.staging_database.name }}; +DROP TABLE IF EXISTS {{ table.destination.name }}_report; diff --git a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh new file mode 100644 index 0000000..871d39b --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh @@ -0,0 +1,18 @@ +#!/bin/bash +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +# Remove parquet data from hdfs +set -eu +sudo -u hdfs hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}/ diff --git a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh new file mode 100644 index 0000000..3fe9e33 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh @@ -0,0 +1,18 @@ +#!/bin/bash +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
#} + +# Remove parquet data from incr +set -eu +sudo -u hdfs hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}/incr/* diff --git a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-report-delete.sh b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-report-delete.sh new file mode 100644 index 0000000..e7b62f9 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-report-delete.sh @@ -0,0 +1,18 @@ +#!/bin/bash +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +# Remove parquet data from hdfs +set -eu +sudo -u hdfs hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}_report/ diff --git a/templates/sqoop-parquet-hdfs-hive-merge/imports b/templates/sqoop-parquet-hdfs-hive-merge/imports new file mode 100644 index 0000000..93549ad --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/imports @@ -0,0 +1,5 @@ +../shared/.gitignore +../shared/type-mapping.yml +../shared/sqoop-exec.sh +../shared/sqoop-delete.sh +../shared/run-with-logging.sh diff --git a/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh b/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh new file mode 100644 index 0000000..cd3281c --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh @@ -0,0 +1,19 @@ +#!/bin/bash +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +# Move parquet data from /incr to /base +set -eu +#hdfs dfs -mkdir {{ conf.staging_database.path }}/{{ table.destination.name }}/base || true +sudo -u hdfs hdfs dfs -mv {{ conf.staging_database.path }}/{{ table.destination.name }}/incr/* {{ conf.staging_database.path }}/{{ table.destination.name }}/base diff --git a/templates/sqoop-parquet-hdfs-hive-merge/overwrite-base-table.sql b/templates/sqoop-parquet-hdfs-hive-merge/overwrite-base-table.sql new file mode 100644 index 0000000..f147d03 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/overwrite-base-table.sql @@ -0,0 +1,22 @@ +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. #} + +set sync_ddl=1; +USE {{ conf.staging_database.name }}; + +-- Overwrite base table with report table -- +INSERT OVERWRITE TABLE {{ table.destination.name }}_base + SELECT * FROM {{ conf.staging_database.name }}.{{ table.destination.name }}_report; + + diff --git a/templates/sqoop-parquet-hdfs-hive-merge/sqoop-create.sh b/templates/sqoop-parquet-hdfs-hive-merge/sqoop-create.sh new file mode 100644 index 0000000..19ed62a --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/sqoop-create.sh @@ -0,0 +1,41 @@ +#!/bin/bash +{# Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} +{# This function will put the --map-column-java col=String parameter for any clob data types.#} + {% macro map_clobs_macro(columns) -%} + {{ map_clobs(columns) }} + {%- endmacro -%} +# Create a Sqoop job +set -eu +sqoop job -D 'sqoop.metastore.client.record.password=true' \ + --create {{ conf.source_database.name }}.{{ table.source.name }}.{{ conf.sqoop_job_name_suffix }} \ + -- import \ + --connect {{ conf.source_database.connection_string }} \ + --username {{ conf.user_name }} \ + --password-file {{ conf.sqoop_password_file }} \ + --target-dir {{ conf.staging_database.path }}/{{ table.destination.name }}/incr \ + --incremental append \ + --temporary-rootdir {{ conf.staging_database.path }}/{{ table.destination.name }} \ + --append \ + {{ map_clobs_macro(table.columns) }} \ + --split-by {{ table.split_by_column }} \ + --check-column {{ table.check_column }} \ + --as-parquetfile \ + --fetch-size 1000 \ + --compress \ + --compression-codec snappy \ + -m 1 \ + --query 'SELECT {{ table.columns|map(attribute='name')|join(',\n\t')}} + FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS' + diff --git a/templates/sqoop-parquet-hdfs-hive-merge/type-mapping.yml b/templates/sqoop-parquet-hdfs-hive-merge/type-mapping.yml new file mode 100644 index 0000000..84c9057 --- /dev/null +++ b/templates/sqoop-parquet-hdfs-hive-merge/type-mapping.yml @@ -0,0 +1,100 @@ +date: + kudu: bigint + impala: bigint + parquet: bigint + avro: long +timestamp: + kudu: bigint + impala: timestamp + parquet: bigint + avro: long +datetime: + kudu: bigint + impala: timestamp + parquet: bigint + avro: long +bigint: + kudu: bigint + impala: bigint + parquet: bigint + avro: long +decimal: + kudu: string + impala: decimal + parquet: double + avro: decimal +number: + kudu: string + impala: decimal + parquet: double + avro: decimal +string: + kudu: string + impala: string + parquet: string + avro: string +int: + kudu: int + impala: int + parquet: int + avro: int +bit: + kudu: boolean + impala: boolean + parquet: boolean + avro: boolean +boolean: + kudu: boolean + impala: boolean + parquet: boolean + avro: boolean +numeric: + kudu: string + impala: string + parquet: string + avro: string +double: + kudu: double + impala: double + parquet: double + avro: string +real: + kudu: string + impala: decimal + 
parquet: string + avro: string +float: + kudu: float + impala: float + parquet: float + avro: float +blob: + kudu: string + impala: string + parquet: string + avro: string +clob: + kudu: string + impala: string + parquet: string + avro: string +nclob: + kudu: string + impala: string + parquet: string + avro: string +varchar: + kudu: string + impala: string + parquet: string + avro: string +long: + kudu: bigint + parquet: bigint + impala: bigint + avro: long +text: + kudu: string + impala: string + parquet: string + avro: string From c1b02b46b61bbc0cc649adb258b17b6cb3dcad07 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 23 Apr 2018 15:33:19 -0500 Subject: [PATCH 2/6] Used PK instead of split-by in the merge script. Renamed _view to _merge_view. Uncommented other examples in run_tests.sh --- integration-tests/run-tests.sh | 6 +++--- .../create-merge-view.sql | 10 +++++----- .../create-report-table.sql | 4 ++-- .../sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql | 2 +- .../sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh | 1 - 5 files changed, 11 insertions(+), 12 deletions(-) diff --git a/integration-tests/run-tests.sh b/integration-tests/run-tests.sh index 39891cd..b92f89d 100755 --- a/integration-tests/run-tests.sh +++ b/integration-tests/run-tests.sh @@ -38,7 +38,7 @@ set -e docker-compose exec mysql /data/load-data.sh docker-compose exec kimpala /run-all-services.sh -#$SCRIPT_DIR/sqoop-parquet-hdfs-impala/run.sh -#$SCRIPT_DIR/sqoop-parquet-full-load/run.sh -#$SCRIPT_DIR/sqoop-parquet-hdfs-kudu-impala/run.sh +$SCRIPT_DIR/sqoop-parquet-hdfs-impala/run.sh +$SCRIPT_DIR/sqoop-parquet-full-load/run.sh +$SCRIPT_DIR/sqoop-parquet-hdfs-kudu-impala/run.sh $SCRIPT_DIR/sqoop-parquet-hdfs-hive-merge/run.sh diff --git a/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql b/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql index eb4ff6f..da3269a 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql +++ b/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql @@ -16,15 +16,15 @@ set sync_ddl=1; USE {{ conf.staging_database.name }}; --Create merged view -- -DROP VIEW IF EXISTS {{ table.destination.name }}_view; -CREATE VIEW {{ table.destination.name }}_view AS +DROP VIEW IF EXISTS {{ table.destination.name }}_merge_view; +CREATE VIEW {{ table.destination.name }}_merge_view AS SELECT t1.* FROM (SELECT * FROM {{ table.destination.name }}_base UNION ALL SELECT * FROM {{ table.destination.name }}_incr) t1 - JOIN (SELECT {{ table.split_by_column }}, max({{ table.check_column }}) max_modified + JOIN (SELECT {{ table.primary_keys }}, max({{ table.check_column }}) max_modified FROM (SELECT * FROM {{ table.destination.name }}_base UNION ALL SELECT * FROM {{ table.destination.name }}_incr) t2 - GROUP BY {{ table.split_by_column }}) s - ON t1.{{ table.split_by_column }} = s.{{ table.split_by_column }} AND t1.{{ table.check_column }} = s.max_modified; + GROUP BY {{ table.primary_keys }}) s + ON t1.{{ table.primary_keys }} = s.{{ table.primary_keys }} AND t1.{{ table.check_column }} = s.max_modified; diff --git a/templates/sqoop-parquet-hdfs-hive-merge/create-report-table.sql b/templates/sqoop-parquet-hdfs-hive-merge/create-report-table.sql index 397eacb..c83eb14 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/create-report-table.sql +++ b/templates/sqoop-parquet-hdfs-hive-merge/create-report-table.sql @@ -24,7 +24,7 @@ TBLPROPERTIES ('parquet.compression'='SNAPPY', 'SECURITY_CLASSIFICATION' = '{{ table.META_SECURITY_CLASSIFICATION }}', 'LOAD_FREQUENCY' = '{{ 
table.META_LOAD_FREQUENCY }}', 'CONTACT_INFO' = '{{ table.META_CONTACT_INFO }}') -AS SELECT * FROM {{ conf.staging_database.name}}.{{ table.destination.name }}_view; +AS SELECT * FROM {{ conf.staging_database.name}}.{{ table.destination.name }}_merge_view; */ -- create table, then insert overwrite -- @@ -43,6 +43,6 @@ TBLPROPERTIES( 'CONTACT_INFO' = '{{ table.META_CONTACT_INFO }}' ); -insert overwrite {{ table.destination.name }}_report select * from {{ table.destination.name }}_view; +insert overwrite {{ table.destination.name }}_report select * from {{ table.destination.name }}_merge_view; COMPUTE STATS {{ table.destination.name }}_report diff --git a/templates/sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql b/templates/sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql index 56c1125..6d70dff 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql +++ b/templates/sqoop-parquet-hdfs-hive-merge/drop-raw-tables.sql @@ -14,6 +14,6 @@ -- Drop the tables and the view -- USE {{ conf.staging_database.name }}; -DROP VIEW IF EXISTS {{ table.destination.name }}_view; +DROP VIEW IF EXISTS {{ table.destination.name }}_merge_view; DROP TABLE IF EXISTS {{ table.destination.name }}_base; DROP TABLE IF EXISTS {{ table.destination.name }}_incr; diff --git a/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh b/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh index cd3281c..b604e39 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh +++ b/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh @@ -15,5 +15,4 @@ # Move parquet data from /incr to /base set -eu -#hdfs dfs -mkdir {{ conf.staging_database.path }}/{{ table.destination.name }}/base || true sudo -u hdfs hdfs dfs -mv {{ conf.staging_database.path }}/{{ table.destination.name }}/incr/* {{ conf.staging_database.path }}/{{ table.destination.name }}/base From dfdbe1cdb2b83db19df80b0e23f28bed0e365bf1 Mon Sep 17 00:00:00 2001 From: Nicholaev Date: Tue, 22 May 2018 15:11:27 -0500 Subject: [PATCH 3/6] move kudu pipeline testing to last relating to issue #36 --- integration-tests/run-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/run-tests.sh b/integration-tests/run-tests.sh index 53c1c77..1504419 100755 --- a/integration-tests/run-tests.sh +++ b/integration-tests/run-tests.sh @@ -40,6 +40,6 @@ docker-compose exec kimpala /run-all-services.sh $SCRIPT_DIR/sqoop-parquet-hdfs-impala/run.sh $SCRIPT_DIR/sqoop-parquet-full-load/run.sh +$SCRIPT_DIR/sqoop-parquet-hdfs-hive-merge/run.sh $SCRIPT_DIR/kudu-table-ddl/run.sh $SCRIPT_DIR/sqoop-parquet-hdfs-kudu-impala/run.sh -$SCRIPT_DIR/sqoop-parquet-hdfs-hive-merge/run.sh From 0a1fb465bd527c53be093a7f350edb8b87b81731 Mon Sep 17 00:00:00 2001 From: Nicholaev Date: Fri, 25 May 2018 15:47:47 -0500 Subject: [PATCH 4/6] changed pk format in tables.yml to avoid SQL syntax error, the creation of [Id] --- .../sqoop-parquet-hdfs-hive-merge/tables.yml | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml b/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml index 28d8bd8..7df8a86 100644 --- a/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml +++ b/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml @@ -35,8 +35,7 @@ tables: - Id num_partitions: 2 # Number of Kudu partitions to create check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) - primary_keys: # List of 
primary keys - - Id + primary_keys: Id # List of primary keys columns: - name: "Id" # Column name in source table datatype: "int" # Column datatype in source table @@ -74,8 +73,7 @@ tables: - Id num_partitions: 2 # Number of Kudu partitions to create check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) - primary_keys: # List of primary keys - - Id + primary_keys: Id # List of primary keys columns: - name: "Id" # Column name in source table datatype: "int" # Column datatype in source table @@ -104,8 +102,7 @@ tables: - Id num_partitions: 2 # Number of Kudu partitions to create check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) - primary_keys: # List of primary keys - - Id + primary_keys: Id # List of primary keys columns: - name: "Id" # Column name in source table datatype: "int" # Column datatype in source table From 24c40ef22cc67942136d7a364d8f1d2446aecbfe Mon Sep 17 00:00:00 2001 From: Nicholaev Date: Mon, 4 Jun 2018 14:52:55 -0500 Subject: [PATCH 5/6] remove sudo, restore pk as a list --- .../sqoop-parquet-hdfs-hive-merge/tables.yml | 9 ++++++--- .../create-merge-view.sql | 10 +++++++--- templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh | 2 +- .../sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh | 2 +- .../hdfs-report-delete.sh | 2 +- .../sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh | 2 +- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml b/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml index 7df8a86..06f993e 100644 --- a/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml +++ b/integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml @@ -35,7 +35,8 @@ tables: - Id num_partitions: 2 # Number of Kudu partitions to create check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) - primary_keys: Id # List of primary keys + primary_keys: # List of primary keys + - Id columns: - name: "Id" # Column name in source table datatype: "int" # Column datatype in source table @@ -73,7 +74,8 @@ tables: - Id num_partitions: 2 # Number of Kudu partitions to create check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) - primary_keys: Id # List of primary keys + primary_keys: # List of primary keys + - Id columns: - name: "Id" # Column name in source table datatype: "int" # Column datatype in source table @@ -102,7 +104,8 @@ tables: - Id num_partitions: 2 # Number of Kudu partitions to create check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) - primary_keys: Id # List of primary keys + primary_keys: # List of primary keys + - Id columns: - name: "Id" # Column name in source table datatype: "int" # Column datatype in source table diff --git a/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql b/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql index da3269a..f21f829 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql +++ b/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql @@ -22,9 +22,13 @@ CREATE VIEW {{ table.destination.name }}_merge_view AS FROM (SELECT * FROM {{ table.destination.name }}_base UNION ALL SELECT * FROM {{ table.destination.name }}_incr) t1 - JOIN (SELECT {{ table.primary_keys }}, max({{ 
table.check_column }}) max_modified + JOIN (SELECT {{ table.primary_keys|join(', ') }}, max({{ table.check_column }}) max_modified FROM (SELECT * FROM {{ table.destination.name }}_base UNION ALL SELECT * FROM {{ table.destination.name }}_incr) t2 - GROUP BY {{ table.primary_keys }}) s - ON t1.{{ table.primary_keys }} = s.{{ table.primary_keys }} AND t1.{{ table.check_column }} = s.max_modified; + GROUP BY {{ table.primary_keys|join(', ') }}) s + ON + {% for pk in table.primary_keys %} + t1.pk = s.pk AND + {% endfor %} + t1.{{ table.check_column }} = s.max_modified; diff --git a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh index 871d39b..60e23f1 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh +++ b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-delete.sh @@ -15,4 +15,4 @@ # Remove parquet data from hdfs set -eu -sudo -u hdfs hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}/ +hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}/ diff --git a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh index 3fe9e33..e6757ee 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh +++ b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-incr-clear.sh @@ -15,4 +15,4 @@ # Remove parquet data from incr set -eu -sudo -u hdfs hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}/incr/* +hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}/incr/* diff --git a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-report-delete.sh b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-report-delete.sh index e7b62f9..242e625 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/hdfs-report-delete.sh +++ b/templates/sqoop-parquet-hdfs-hive-merge/hdfs-report-delete.sh @@ -15,4 +15,4 @@ # Remove parquet data from hdfs set -eu -sudo -u hdfs hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}_report/ +hdfs dfs -rm -r -f {{ conf.staging_database.path }}/{{ table.destination.name }}_report/ diff --git a/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh b/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh index b604e39..6838203 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh +++ b/templates/sqoop-parquet-hdfs-hive-merge/move-1st-sqoop.sh @@ -15,4 +15,4 @@ # Move parquet data from /incr to /base set -eu -sudo -u hdfs hdfs dfs -mv {{ conf.staging_database.path }}/{{ table.destination.name }}/incr/* {{ conf.staging_database.path }}/{{ table.destination.name }}/base +hdfs dfs -mv {{ conf.staging_database.path }}/{{ table.destination.name }}/incr/* {{ conf.staging_database.path }}/{{ table.destination.name }}/base From 44ec9342363dd54594979f06f4e16985c8c1a119 Mon Sep 17 00:00:00 2001 From: Nicholaev Date: Mon, 4 Jun 2018 15:18:01 -0500 Subject: [PATCH 6/6] added brackets to variable names in Jinja loop --- templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql b/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql index f21f829..8091ad0 100644 --- a/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql +++ b/templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql @@ -29,6 +29,6 @@ CREATE VIEW {{ table.destination.name }}_merge_view AS GROUP BY {{ 
table.primary_keys|join(', ') }}) s ON {% for pk in table.primary_keys %} - t1.pk = s.pk AND + t1.{{ pk }}= s.{{ pk }} AND {% endfor %} t1.{{ table.check_column }} = s.max_modified;
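Note on the final merge view: after the later patches, templates/sqoop-parquet-hdfs-hive-merge/create-merge-view.sql joins the unioned base and incr tables back against the max value of check_column per primary key, which is the core of the four-step incremental-update strategy referenced in the readme. As a rough, unverified sketch of what that template should render to for the titanic table defined in tables.yml (assuming destination_database is "default", primary_keys is the single column Id, and check_column is Id; exact whitespace and formatting will differ):

    set sync_ddl=1;
    USE default;

    -- Merged view: latest version of each row from base + incr --
    DROP VIEW IF EXISTS titanic_merge_view;
    CREATE VIEW titanic_merge_view AS
        SELECT t1.*
        FROM (SELECT * FROM titanic_base
              UNION ALL
              SELECT * FROM titanic_incr) t1
        JOIN (SELECT Id, max(Id) max_modified
              FROM (SELECT * FROM titanic_base
                    UNION ALL
                    SELECT * FROM titanic_incr) t2
              GROUP BY Id) s
        ON
            t1.Id = s.Id AND
            t1.Id = s.max_modified;

In this integration test the check column is the same Id used as the primary key, so the max() filter is trivially satisfied; in a real pipeline check_column would normally be an incrementing modification timestamp (as the tables.yml comments describe), so the view keeps only the most recent copy of each key before overwrite-base-table.sql writes the report table back over the base table.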