Merge pull request #33 from dionsongus/master
add hive merge pipeline
afoerster authored Jun 6, 2018
2 parents f6981de + 44ec934 commit 7c704f0
Showing 23 changed files with 769 additions and 0 deletions.
1 change: 1 addition & 0 deletions integration-tests/run-tests.sh
@@ -40,5 +40,6 @@ docker-compose exec kimpala /run-all-services.sh

$SCRIPT_DIR/sqoop-parquet-hdfs-impala/run.sh
$SCRIPT_DIR/sqoop-parquet-full-load/run.sh
$SCRIPT_DIR/sqoop-parquet-hdfs-hive-merge/run.sh
$SCRIPT_DIR/kudu-table-ddl/run.sh
$SCRIPT_DIR/sqoop-parquet-hdfs-kudu-impala/run.sh
7 changes: 7 additions & 0 deletions integration-tests/sqoop-parquet-hdfs-hive-merge/env.yml
@@ -0,0 +1,7 @@
---
connection_string: "jdbc:mysql://mysql:3306/integration_test"
hdfs_basedir: "hdfs://0.0.0.0:8020/user/hive/warehouse"
source_db_user_name: "pipewrench"
password_file: "file:////mount/password.file"
destination_database: "default"
impala_cmd: "impala-shell -i localhost -f "
@@ -0,0 +1,2 @@
#!/bin/bash -e
pipewrench-merge --conf=tables.yml --debug_level ERROR --env=env.yml --pipeline-templates=../../templates/sqoop-parquet-hdfs-hive-merge
1 change: 1 addition & 0 deletions integration-tests/sqoop-parquet-hdfs-hive-merge/readme.md
@@ -0,0 +1 @@
This pipeline uses Hive to merge incremental data, as explained in https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/.
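The merge follows the four-step strategy from that post: sqoop new rows into an incremental table, reconcile base and incremental data through a view, rebuild a report table from the view, and overwrite the base table with the result (see the Makefile targets added below). As a rough illustration, a reconcile view over the Titanic table from tables.yml could look like the following HiveQL; the table and view names are hypothetical, and this is not the create-merge-view.sql template added by this commit (that template is not shown in the diff).

-- Hypothetical reconcile view: take every incremental row, plus any base row
-- whose key (Id) has not been re-imported. Names are illustrative only.
CREATE VIEW IF NOT EXISTS titanic_merge_view AS
SELECT t.* FROM (
  SELECT incr.* FROM titanic_incr incr
  UNION ALL
  SELECT base.*
  FROM titanic_base base
  LEFT OUTER JOIN titanic_incr incr ON base.Id = incr.Id
  WHERE incr.Id IS NULL
) t;

The Makefile's update cycle then materializes the merged data into a report table and uses it to overwrite the base table, completing the cycle.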
@@ -0,0 +1,21 @@
#!/bin/bash
# Copyright 2017 Cargill Incorporated
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$SCRIPT_DIR"
# clean out the warehouse so the merge pipeline starts from an empty state
sudo -u hdfs hdfs dfs -rm -r /user/hive/warehouse/* || true
make -j1 integration-test-all -C output/sqoop-parquet-hdfs-hive-merge

6 changes: 6 additions & 0 deletions integration-tests/sqoop-parquet-hdfs-hive-merge/run.sh
@@ -0,0 +1,6 @@
#!/bin/bash
set -eu
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$SCRIPT_DIR"
./generate-scripts
docker-compose exec kimpala /mount/sqoop-parquet-hdfs-hive-merge/run-in-container.sh
178 changes: 178 additions & 0 deletions integration-tests/sqoop-parquet-hdfs-hive-merge/tables.yml
@@ -0,0 +1,178 @@
---
name: "sqoop_parquet_hdfs_hive_merge" # The name of this configuration
user_name: {{ source_db_user_name }} # Source database user name
type_mapping: type-mapping.yml # Type mapping used for database type conversion
sqoop_export_dir: {{ hdfs_basedir }}/export # Sqoop export data HDFS path
sqoop_password_file: {{ password_file }} # Password file for sqoop. Must reside in HDFS
sqoop_password: {{ password }} # Password for sqoop
connection_manager: "org.apache.sqoop.manager.MySQLManager" # Connection manager fully qualified class
sqoop_job_name_suffix: test # Suffix added to sqoop jobs. Can be used to differentiate environments
impala_cmd: "{{ impala_cmd }}"
source_database:
name: "integration_test" # Source database name
connection_string: {{ connection_string }} # Source database connection string. Should be kept in 'env.yml'
cmd: "mysql -P 3306 -uroot -ppipewrench -h mysql <"
staging_database:
name: "{{ destination_database }}" # Staging database name.
path: "{{ hdfs_basedir }}" # Staging database HDFS path
result_database:
name: "{{ destination_database }}" # Result database
path: "{{ hdfs_basedir }}" # Result database HDFS path
tables:
- id: "titanic" # Uniquely identifies this table
META_CONTACT_INFO: "[email protected]" # Contact info will be loaded into tblproperties
META_LOAD_FREQUENCY: "STREAMING" # Load frequency will be loaded into tblproperties
META_SECURITY_CLASSIFICATION: "OPEN" # Security classification will be loaded into tblproperties
META_SOURCE: "upstream.source.location" # Source will be loaded into tblproperties
source:
name: "titanic" # Source table name
file: ../../../../../data/Titanic.csv
destination:
name: "titanic" # Destination (Impala) table name
split_by_column: "Id" # Sqoop split by column (--split-by)
kudu:
hash_by: # List of columns to hash by
- Id
num_partitions: 2 # Number of Kudu partitions to create
check_column: Id # Incrementing timestamp or numeric column used when incrementally pulling data (sqoop --check-column)
primary_keys: # List of primary keys
- Id
columns:
- name: "Id" # Column name in source table
datatype: "int" # Column datatype in source table
comment: "comment" # Column comment
- name: "LastName"
datatype: "text"
comment: "comment"
- name: "FirstName"
datatype: "text"
comment: "comment"
- name: "PClass"
datatype: "text"
comment: "comment"
- name: "Age"
datatype: "int"
comment: "comment"
- name: "Sex"
datatype: "text"
comment: "comment"
- name: "Survived"
datatype: "int"
comment: "comment"
- name: "SexCode"
datatype: "int"
comment: "comment"
- id: "vocab" # Uniquely identifies this table
source:
name: "vocab" # Source table name
file: ../../../../../data/Vocab.csv
destination:
name: "vocab" # Destination (Impala) table name
split_by_column: "Id" # Sqoop split by column (--split-by)
kudu:
hash_by: # List of columns to hash by
- Id
num_partitions: 2 # Number of Kudu partitions to create
check_column: Id # Incrementing timestamp or numeric column used when incrementally pulling data (sqoop --check-column)
primary_keys: # List of primary keys
- Id
columns:
- name: "Id" # Column name in source table
datatype: "int" # Column datatype in source table
comment: "comment" # Column comment
- name: "Year"
datatype: "int"
comment: "comment"
- name: "Sex"
datatype: "text"
comment: "comment"
- name: "Education"
datatype: "int"
comment: "comment"
- name: "Vocabulary"
datatype: "int"
comment: "comment"
- id: "baseball" # Uniquely identifies this table
source:
name: "baseball" # Source table name
file: ../../../../../data/Baseball.csv
destination:
name: "baseball" # Destination (Impala) table name
split_by_column: "Id" # Sqoop split by column (--split-by)
kudu:
hash_by: # List of columns to hash by
- Id
num_partitions: 2 # Number of Kudu partitions to create
check_column: Id # Incrementing timestamp or numeric column used when incrementally pulling data (sqoop --check-column)
primary_keys: # List of primary keys
- Id
columns:
- name: "Id" # Column name in source table
datatype: "int" # Column datatype in source table
comment: "comment" # Column comment
- name: "PlayerId"
datatype: "text"
comment: "comment"
- name: "Year"
datatype: "int"
comment: "comment"
- name: "Stint"
datatype: "int"
comment: "comment"
- name: "Team"
datatype: "text"
comment: "comment"
- name: "LG"
datatype: "int"
comment: "comment"
- name: "G"
datatype: "int"
comment: "comment"
- name: "AB"
datatype: "int"
comment: "comment"
- name: "R"
datatype: "int"
comment: "comment"
- name: "H"
datatype: "int"
comment: "comment"
- name: "X2b"
datatype: "int"
comment: "comment"
- name: "X3b"
datatype: "int"
comment: "comment"
- name: "HR"
datatype: "int"
comment: "comment"
- name: "RBI"
datatype: "int"
comment: "comment"
- name: "SB"
datatype: "int"
comment: "comment"
- name: "CS"
datatype: "int"
comment: "comment"
- name: "BB"
datatype: "int"
comment: "comment"
- name: "SO"
datatype: "int"
comment: "comment"
- name: "IBB"
datatype: "int"
comment: "comment"
- name: "HBP"
datatype: "int"
comment: "comment"
- name: "SH"
datatype: "int"
comment: "comment"
- name: "SF"
datatype: "int"
comment: "comment"
- name: "GIDP"
datatype: "int"
comment: "comment"
87 changes: 87 additions & 0 deletions templates/sqoop-parquet-hdfs-hive-merge/Makefile
@@ -0,0 +1,87 @@
{# Copyright 2017 Cargill Incorporated

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. #}

impala-cmd={{ conf.impala_cmd }}

integration-test:
$(MAKE) clean
$(MAKE) first-run
$(MAKE) update

sqoop-create: sqoop-create.sh ## Create Sqoop job
./run-with-logging.sh ./sqoop-create.sh $@

sqoop-delete: ## Delete Sqoop job
./run-with-logging.sh ./sqoop-delete.sh $@

sqoop-exec: ## Execute sqoop job
./run-with-logging.sh ./sqoop-exec.sh $@

create-base-table: create-base-table.sql
$(impala-cmd) create-base-table.sql

create-incr-table: create-incr-table.sql
$(impala-cmd) create-incr-table.sql

create-merge-view: create-merge-view.sql
$(impala-cmd) create-merge-view.sql

create-report-table: create-report-table.sql
$(impala-cmd) create-report-table.sql

overwrite-base-table: overwrite-base-table.sql
$(impala-cmd) overwrite-base-table.sql

hdfs-clean: hdfs-delete.sh ## Delete parquet files from HDFS
./run-with-logging.sh ./hdfs-delete.sh $@

move-1st-sqoop: move-1st-sqoop.sh
./run-with-logging.sh ./move-1st-sqoop.sh $@

drop-raw-tables: drop-raw-tables.sql
$(impala-cmd) drop-raw-tables.sql
./run-with-logging.sh ./hdfs-delete.sh $@

hdfs-incr-clear: hdfs-incr-clear.sh
./run-with-logging.sh ./hdfs-incr-clear.sh $@

drop-report-table: drop-report-table.sql
$(impala-cmd) drop-report-table.sql
./run-with-logging.sh ./hdfs-report-delete.sh $@

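# Initial load: create the base table, create and run the sqoop job,
# move the first import into place, then create the incremental table.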
first-run:
$(MAKE) create-base-table # Keep this first so the HDFS directories are created by Impala, the lower-level user; otherwise there may be access issues later.
$(MAKE) sqoop-create
$(MAKE) sqoop-exec
$(MAKE) move-1st-sqoop
$(MAKE) create-incr-table

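# Update cycle: pull new rows with sqoop, reconcile base and incremental data
# through the merge view, rebuild the report table, overwrite the base table,
# then clear the incremental files.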
update:
$(MAKE) sqoop-exec
$(MAKE) create-merge-view
$(MAKE) drop-report-table
$(MAKE) create-report-table
$(MAKE) overwrite-base-table
$(MAKE) hdfs-incr-clear

clean:
$(MAKE) drop-raw-tables
$(MAKE) drop-report-table
$(MAKE) sqoop-delete

targets: ## Print out a list of available targets
@fgrep -h ": " $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/:.*//'

help:
@fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//'
42 changes: 42 additions & 0 deletions templates/sqoop-parquet-hdfs-hive-merge/Makefile.meta
@@ -0,0 +1,42 @@
{# Copyright 2017 Cargill Incorporated

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. #}

first-run-all: {%- for table in tables %} first-run-{{ table.id }} {%- endfor %}

{%- for table in tables %}
first-run-{{ table.id }}:
$(MAKE) first-run -C {{ table.id }}
{%- endfor %}

update-all: {%- for table in tables %} update-{{ table.id }} {%- endfor %}

{%- for table in tables %}
update-{{ table.id }}:
$(MAKE) update -C {{ table.id }}
{%- endfor %}

clean-all:
{%- for table in tables %}
$(MAKE) clean -C {{ table.id }}
{%- endfor %}

{%- for table in tables %}
integration-test-{{ table.id }}:
$(MAKE) integration-test -C {{ table.id }}
{%- endfor %}

integration-test-all:
{%- for table in tables %}
$(MAKE) integration-test -C {{ table.id }}
{%- endfor %}
33 changes: 33 additions & 0 deletions templates/sqoop-parquet-hdfs-hive-merge/create-base-table.sql
@@ -0,0 +1,33 @@
{# Copyright 2017 Cargill Incorporated

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. #}

set sync_ddl=1;
USE {{ conf.staging_database.name }};

CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_base (
{% for column in table.columns %}
{{ column.name }} {{ map_datatypes(column).parquet }} COMMENT '{{ column.comment }}'
{%- if not loop.last -%}, {% endif %}
{%- endfor %})
STORED AS PARQUET
LOCATION '{{ conf.staging_database.path }}/{{ table.destination.name }}/base'
TBLPROPERTIES(
'parquet.compression'='SNAPPY',
'SOURCE' = '{{ table.META_SOURCE }}',
'SECURITY_CLASSIFICATION' = '{{ table.META_SECURITY_CLASSIFICATION }}',
'LOAD_FREQUENCY' = '{{ table.META_LOAD_FREQUENCY }}',
'CONTACT_INFO' = '{{ table.META_CONTACT_INFO }}'
);

COMPUTE STATS {{ table.destination.name }}_base