A very specific but time-saving library for analyzing Hive tables using PySpark.
The library is tested on PySpark 3.1 and 3.2.
- Clone the repository
- Create an environment and install the package:
conda create -n my_env python=3.10
conda activate my_env
pip install -r requirements.txt
pip install -e .
- If the code arrived as .txt files (see the note below), unzip the archive and rename the files back to .py:
unzip some_zip.zip
find ./ -depth -name "*.txt" -exec sh -c 'mv "$1" "${1%.txt}.py"' _ {} \;
credits to
- Getting info about a table based on its Primary Key (PK) and counting NULL values in columns
- Comparing a table with a reference table by PK
- Getting info about tables in a schema. Dropping empty tables.
- Validating and getting stats on a table.
- Simplifying operations like reading and writing Hive tables.
- Working with SCD2 tables (creating, merging, joining)
Note: the code is shipped as .txt files to bypass email firewalls; the unzip and rename commands above restore the .py extensions.
Email for any suggestions - [email protected]
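The examples below assume a running SparkSession with Hive support and imports along these lines (the module name 'hhop' is a placeholder; use the actual import path from this repository):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
# 'hhop' is a placeholder module name; check the repository for the real import path
from hhop import (
    DFExtender, TablePartitionDescriber, SCD2Helper, SchemaManager,
    union_all, read_table, write_table, write_read_table, deduplicate_df,
)
spark = SparkSession.builder.enableHiveSupport().getOrCreate()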
This class helps with:
- Getting info about the Primary Key of a single DF
- Getting info about NULL values in the columns of a single DF
- Comparing two tables based on Primary Key
# 1
# checks for PK and non-PK columns
df_check = DFExtender(df, pk=['pk1', 'pk2'], verbose=True)
df_check.get_info()
# You can access DF with PK duplicates in an attribute `.df_duplicates_pk`
# Count all: 9
# Unique PK count: 8
# PK with duplicates: 1
# PK column 'pk1' contains empty values, be careful!
# PK column 'pk2' contains empty values, be careful!
# Null values in columns - {'column': [count NULL, share NULL]}:
# {'pk1': [2, 0.2222], 'pk2': [2, 0.2222], 'var1': [1, 0.1111], 'var2': [1, 0.1111]}
# Use method `.get_df_with_null(List[str])` to get a df with specified NULL columns
# get the results from attributes, e.g.:
df_check.pk_stats
# PK_stats(cnt_rows=9, unique_pk_cnt=8, pk_with_duplicates_pk=1)
# 2
# find rows with duplicates by PK, sorted by the number of duplicates in descending order
df_check_pk = df_check.df_duplicates_pk.cache()
df_check_pk.show()
# +-----+----+---+------+--------+----------+----------+------+
# |index| pk1|pk2| var1| var2| dt_part|group_part|cnt_pk|
# +-----+----+---+------+--------+----------+----------+------+
# | 1|key1| 1|value1|value2_1|2022-12-15| group1| 2|
# | 2|key1| 1|value1|value2_1|2022-12-16| group2| 2|
# +-----+----+---+------+--------+----------+----------+------+
# 3
# this method returns a DF sorted by the count of NULLs in the selected columns, in descending order
df_check_null = df_check.get_df_with_null(['var1', 'var2'])
df_check_null.show()
# +-----+----+---+------+--------+----------+----------+---------+
# |index| pk1|pk2| var1| var2| dt_part|group_part|cnt_nulls|
# +-----+----+---+------+--------+----------+----------+---------+
# | 4|key2| 2| null|value2_1|2022-12-17| group1| 1|
# | 6|key2| 4|value1| null|2022-12-19| group3| 1|
# +-----+----+---+------+--------+----------+----------+---------+
# 4
# Comparing 2 DataFrames by PK
df_main = DFExtender(df, pk=['pk1', 'pk2'], verbose=True)
df_main.compare_tables(df_ref)
# Main DF
# Count all: 6
# Unique PK count: 6
# PK with duplicates: 0
# Reference DF
# Count all: 6
# Unique PK count: 6
# PK with duplicates: 0
# Errors in columns - {'column': [count is_error, share is_error]}
# {'var1': [2, 0.4], 'group_part': [2, 0.4], 'var2': [1, 0.2]}
# Count stats of matching main and reference tables:
# not in main table: 1
# not in reference table: 1
# correct matching: 5
# Use DF in attribute `.df_with_errors` for further analysis
# you can get the results from attributes, e.g.:
df_main.dict_cols_with_errors
# {'group_part': [2, 0.4], 'var1': [2, 0.4], 'var2': [1, 0.2]}
df_main.matching_results
# Compare_tables_pk_stats(not_in_main_table_cnt=1, not_in_ref_table=1, correct_matching_cnt=5)
# 5
# analyzing the results of the table comparison
# filter to find the exact difference in a column
df_matching_errors = df_main.df_with_errors.cache()
df_matching_errors.filter(col('var1_is_diff') == 1)\
.select('var1_is_diff', 'var1_main', 'var1_ref').show()
# +------------+---------+--------------+
# |var1_is_diff|var1_main| var1_ref|
# +------------+---------+--------------+
# | 1| value1| value19|
# | 1| null|value_not_null|
# +------------+---------+--------------+
# rows present only in the Reference DF
(
df_matching_errors
.filter(col('is_joined_main').isNull())
.select('pk1', 'pk2', 'is_joined_main', 'is_joined_ref', 'var1_main', 'var1_ref')
).show()
# +----+---+--------------+-------------+---------+--------+
# | pk1|pk2|is_joined_main|is_joined_ref|var1_main|var1_ref|
# +----+---+--------------+-------------+---------+--------+
# |key2| 5| null| 1| null| value1|
# +----+---+--------------+-------------+---------+--------+
# 6
# There is an alternative column order for the `.df_with_errors` attribute
# Use it this way; it can be easier to read
alt_order_cols = df_main.columns_diff_reordered_all
(
df_matching_errors
.select(*alt_order_cols)
.filter(col('var1_is_diff') == 1)
).show()
# +----+---+--------------+-------------+------------+-----------+---------------+----------+---------+-------------+---------+--------+------------+---------------+--------------+------------------+---------+--------------+------------+
# | pk1|pk2|is_joined_main|is_joined_ref|dt_part_main|dt_part_ref|dt_part_is_diff|index_main|index_ref|index_is_diff|var2_main|var2_ref|var2_is_diff|group_part_main|group_part_ref|group_part_is_diff|var1_main| var1_ref|var1_is_diff|
# +----+---+--------------+-------------+------------+-----------+---------------+----------+---------+-------------+---------+--------+------------+---------------+--------------+------------------+---------+--------------+------------+
# |key1| 1| 1| 1| 2022-12-15| 2022-12-15| 0| 1| 1| 0| value2_1|value2_1| 0| group2| group7| 1| value1| value19| 1|
# |key2| 1| 1| 1| 2022-12-17| 2022-12-17| 0| 4| 4| 0| value2_1|value2_1| 0| group1| group10| 1| null|value_not_null| 1|
# +----+---+--------------+-------------+------------+-----------+---------------+----------+---------+-------------+---------+--------+------------+---------------+--------------+------------------+---------+--------------+------------+
This class helps to get the partitions of a partitioned Hive table in a readable and ready-to-use format.
# 1
table_partitions = TablePartitionDescriber('default.part_table_test1')
table_partitions_got = table_partitions.get_partitions_parsed()
table_partitions_got.show(100, False)
# +----------+----------+
# |dt_part |group_part|
# +----------+----------+
# |2022-12-15|group1 |
# |2022-12-16|group2 |
# |2022-12-16|group3 |
# |2022-12-17|group1 |
# |2022-12-18|group2 |
# |2022-12-19|group3 |
# |2022-12-19|group4 |
# |2022-12-20|group3 |
# |2022-12-20|group7 |
# +----------+----------+
# 2
max_dt = table_partitions.get_max_value_from_partitions('dt_part')
print(max_dt)
# '2022-12-20'
# 3
prefilter = col('group_part') == 'group1'
max_dt_group = table_partitions.get_max_value_from_partitions('dt_part', prefilter=prefilter)
print(max_dt_group)
# '2022-12-17'
# extra
# You may want to change the column types of 'table_partitions_got' from the example above
# while still using the method .get_max_value_from_partitions()
# For this, use the in-place method .cast_col_types(), e.g.:
table_partitions_got.cast_col_types({'dt_part': 'date'})
SCD2Helper helps to create, validate, update and join SCD2 tables.
# 1. Create SCD2 table
df1_transactions_s = SCD2Helper(
df1_transactions,
pk=['pk1', 'pk2'],
non_pk=['nonpk1', 'nonpk2', 'nonpk3'],
time_col='ts',
)
df1_scd2 = df1_transactions_s.df_to_scd2()
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
# |pk1|pk2 |nonpk1|nonpk2|nonpk3|nonpk_extra|ts |row_hash |row_actual_from|row_actual_to|
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
# |v1 |null|null |null |c2 |r3 |2023-05-07 15:00:00|56e6807f4b745e20dffeb1b731e5a6d4|2023-05-07 |2023-05-09 |
# |v1 |null|null |null |null |null |2023-05-10 15:00:00|6654c734ccab8f440ff0825eb443dc7f|2023-05-10 |2023-05-10 |
# |v1 |null|null |fds |null |null |2023-05-11 15:00:00|2d2722576095dd7996570b307d777539|2023-05-11 |2023-05-11 |
# |v1 |null|null |fds |asdf |null |2023-05-12 15:00:00|b08363345cd7c1cb14e6f4747ce1563d|2023-05-12 |9999-12-31 |
# |v1 |c1 |a1 |b1 |c1 |r1 |2023-05-01 10:00:00|93e6cc4b8b0445cf261e9417106ae6f0|2023-05-01 |2023-05-02 |
# |v1 |c1 |a1 |b2 |c2 |null |2023-05-03 15:00:00|a6244d3c7c2aed33c4d9525fbef29c1d|2023-05-03 |2023-05-04 |
# |v1 |c1 |null |b2 |c2 |r3 |2023-05-05 15:00:00|17f599be9e07976c2036361c9ad8f633|2023-05-05 |2023-05-06 |
# |v1 |c1 |null |null |c2 |r3 |2023-05-07 15:00:00|a363a9dd6d5b30865ab5813581941516|2023-05-07 |2023-05-09 |
# |v1 |c1 |null |null |null |null |2023-05-10 15:00:00|da58ea33b20d82042d9969c46c16c3b8|2023-05-10 |2023-05-12 |
# |v1 |c1 |null |null |c2 |r3 |2023-05-13 15:00:00|a363a9dd6d5b30865ab5813581941516|2023-05-13 |9999-12-31 |
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
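# The 32-character row_hash looks like an MD5 of the tracked non-PK columns.
# A common way to compute such a hash (an illustration, not necessarily the library's exact formula):
non_pk_cols = ['nonpk1', 'nonpk2', 'nonpk3']
df1_with_hash = df1_transactions.withColumn(
    'row_hash_example',
    # replace NULLs with an empty-string sentinel, since concat_ws would otherwise skip them
    F.md5(F.concat_ws('||', *[F.coalesce(col(c).cast('string'), F.lit('')) for c in non_pk_cols])),
)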
# 2. Validate SCD2 table
df1_scd2_wrong_copy = SCD2Helper(
df1_scd2.withColumn('row_actual_to', F.when(col('row_actual_to') == '9999-12-31', F.lit('1000-01-01'))),
pk=['pk1', 'pk2'],
non_pk=['nonpk1', 'nonpk2', 'nonpk3'],
time_col='ts',
)
res = df1_scd2_wrong_copy.validate_scd2()
# There are 2 PK duplicates by ['pk1', 'pk2', 'row_actual_to'] Look at `.basic_pk_check.df_duplicates_pk`
# 10 rows with invalid dates, look at `.df_invalid_dates`
# Number of records: 10
# Errors_In_SCD2_table(duplicates_by_pk=2, invalid_dates=10, broken_history=0, duplicates_by_version=0)
# 3. Fill holes in history with versions that have null values
df1_scd2_add_more_holes = SCD2Helper(
df1_holes_in_history,
pk=['pk1', 'pk2'],
non_pk=['nonpk1', 'nonpk2', 'nonpk3'],
)
df1_filled_history = df1_scd2_add_more_holes.fill_scd2_history()
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
# |pk1|pk2 |nonpk1|nonpk2|nonpk3|nonpk_extra|ts |row_hash |row_actual_from|row_actual_to|
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
# |v1 |null|null |null |null |null |null |56e6807f4b745e20dffeb1b731e5a6d4|1000-01-01 |2023-05-06 |
# |v1 |null|null |null |c2 |r3 |2023-05-07 15:00:00|56e6807f4b745e20dffeb1b731e5a6d4|2023-05-07 |2023-05-09 |
# |v1 |null|null |null |null |null |2023-05-10 15:00:00|6654c734ccab8f440ff0825eb443dc7f|2023-05-10 |2023-05-10 |
# |v1 |null|null |fds |null |null |2023-05-11 15:00:00|2d2722576095dd7996570b307d777539|2023-05-11 |2023-05-11 |
# |v1 |null|null |fds |asdf |null |2023-05-12 15:00:00|b08363345cd7c1cb14e6f4747ce1563d|2023-05-12 |9999-12-31 |
# |v1 |c1 |null |null |null |null |null |93e6cc4b8b0445cf261e9417106ae6f0|1000-01-01 |2023-04-30 |
# |v1 |c1 |a1 |b1 |c1 |r1 |2023-05-01 10:00:00|93e6cc4b8b0445cf261e9417106ae6f0|2023-05-01 |2023-05-02 |
# |v1 |c1 |null |null |null |null |null |17f599be9e07976c2036361c9ad8f633|2023-05-03 |2023-05-04 |
# |v1 |c1 |null |b2 |c2 |r3 |2023-05-05 15:00:00|17f599be9e07976c2036361c9ad8f633|2023-05-05 |2023-05-06 |
# |v1 |c1 |null |null |null |null |null |da58ea33b20d82042d9969c46c16c3b8|2023-05-07 |2023-05-09 |
# |v1 |c1 |null |null |null |null |2023-05-10 15:00:00|da58ea33b20d82042d9969c46c16c3b8|2023-05-10 |2023-05-12 |
# |v1 |c1 |null |null |null |null |null |da58ea33b20d82042d9969c46c16c3b8|2023-05-13 |9999-12-31 |
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
# 4. Merge versions with the same hash so that only true versions remain
df1_scd2_fewer_non_pk = SCD2Helper(
df1_scd2,
pk=['pk1', 'pk2'],
non_pk=['nonpk2'],
)
df1_merged_history = df1_scd2_fewer_non_pk.merge_scd2_history().cache()
# source SCD2 history before merging versions:
df1_scd2.orderBy('pk1', 'pk2', 'row_actual_from').show(10, False)
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
# |pk1|pk2 |nonpk1|nonpk2|nonpk3|nonpk_extra|ts |row_hash |row_actual_from|row_actual_to|
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
# |v1 |null|null |null |c2 |r3 |2023-05-07 15:00:00|56e6807f4b745e20dffeb1b731e5a6d4|2023-05-07 |2023-05-09 |
# |v1 |null|null |null |null |null |2023-05-10 15:00:00|6654c734ccab8f440ff0825eb443dc7f|2023-05-10 |2023-05-10 |
# |v1 |null|null |fds |null |null |2023-05-11 15:00:00|2d2722576095dd7996570b307d777539|2023-05-11 |2023-05-11 |
# |v1 |null|null |fds |asdf |null |2023-05-12 15:00:00|b08363345cd7c1cb14e6f4747ce1563d|2023-05-12 |9999-12-31 |
# |v1 |c1 |a1 |b1 |c1 |r1 |2023-05-01 10:00:00|93e6cc4b8b0445cf261e9417106ae6f0|2023-05-01 |2023-05-02 |
# |v1 |c1 |a1 |b2 |c2 |null |2023-05-03 15:00:00|a6244d3c7c2aed33c4d9525fbef29c1d|2023-05-03 |2023-05-04 |
# |v1 |c1 |null |b2 |c2 |r3 |2023-05-05 15:00:00|17f599be9e07976c2036361c9ad8f633|2023-05-05 |2023-05-06 |
# |v1 |c1 |null |null |c2 |r3 |2023-05-07 15:00:00|a363a9dd6d5b30865ab5813581941516|2023-05-07 |2023-05-09 |
# |v1 |c1 |null |null |null |null |2023-05-10 15:00:00|da58ea33b20d82042d9969c46c16c3b8|2023-05-10 |2023-05-12 |
# |v1 |c1 |null |null |c2 |r3 |2023-05-13 15:00:00|a363a9dd6d5b30865ab5813581941516|2023-05-13 |9999-12-31 |
# +---+----+------+------+------+-----------+-------------------+--------------------------------+---------------+-------------+
# 5. Join SCD2 tables
df1_scd2_j, df2_scd2_j = [
    SCD2Helper(df, ['pk1', 'pk2'], [non_pk_col], 'ts').df_to_scd2().cache()
    for df, non_pk_col in zip((df1, df2), ('email_id', 'phone_id'))
]
df1_scd2_j, df2_scd2_j = [
    SCD2Helper(df.drop('ts'), ['pk1', 'pk2'], [non_pk_col], 'ts')
    for df, non_pk_col in zip((df1_scd2_j, df2_scd2_j), ('email_id', 'phone_id'))
]
df1_scd2_j.join_scd2(df2_scd2_j).orderBy('pk1', 'pk2', 'row_actual_from').show(100, False)
# +---+---+--------+--------+---------------+-------------+
# |pk1|pk2|email_id|phone_id|row_actual_from|row_actual_to|
# +---+---+--------+--------+---------------+-------------+
# |v1 |c1 |e1 |e1 |2023-05-01 |2023-05-03 |
# |v1 |c1 |e2 |e1 |2023-05-04 |2023-05-05 |
# |v1 |c1 |e2 |e2 |2023-05-06 |2023-05-09 |
# |v1 |c1 |e3 |e2 |2023-05-10 |2023-05-11 |
# |v1 |c1 |e1 |e3 |2023-05-12 |2023-05-12 |
# |v1 |c1 |e1 |e1 |2023-05-13 |9999-12-31 |
# |v1 |c2 |e1 |e1 |2023-05-01 |2023-05-05 |
# |v1 |c2 |e1 |e2 |2023-05-06 |2023-05-11 |
# |v1 |c2 |e1 |e3 |2023-05-12 |2023-05-12 |
# |v1 |c2 |e1 |e1 |2023-05-13 |9999-12-31 |
# |v1 |c3 |e2 |null |2023-05-04 |2023-05-09 |
# |v1 |c3 |e3 |null |2023-05-10 |2023-05-11 |
# |v1 |c3 |e1 |null |2023-05-12 |9999-12-31 |
# +---+---+--------+--------+---------------+-------------+
# 6. Merge SCD2 update
df_merged_upd = (
    SCD2Helper(dfv1, pk=['pk1', 'pk2'], non_pk=['nonpk1', 'nonpk2', 'nonpk3'])
    .merge_scd2_update(dfv2_ded)
    .cache()
)
df_merged_upd.show(10, False)
# +---+---+------+------+------+-----------+---------------+-------------+--------------------------------+
# |pk1|pk2|nonpk1|nonpk2|nonpk3|nonpk_extra|row_actual_from|row_actual_to|row_hash |
# +---+---+------+------+------+-----------+---------------+-------------+--------------------------------+
# |v1 |c0 |a1 |b1 |c1 |r1 |2023-01-01 |2023-05-01 |c3ded78319e4a4af660f608d7ce273f7|
# |v1 |c1 |a1 |b1 |c1 |r1 |2023-01-01 |2023-05-01 |93e6cc4b8b0445cf261e9417106ae6f0|
# |v1 |c1 |a1 |b1 |c1 |r2 |2023-05-02 |9999-12-31 |93e6cc4b8b0445cf261e9417106ae6f0|
# |v1 |c3 |a1 |b1 |c1 |r2 |2023-05-01 |9999-12-31 |86c2c40a3d76eb1ea4de2d093525e14b|
# |v1 |c2 |a1 |b1 |c1 |r2 |2023-05-01 |2023-07-23 |afc91c3bf13930551a1658788e1b5ba5|
# |v1 |c2 |a3 |b1 |c1 |r2 |2023-07-24 |9999-12-31 |185d9be103f655c0e42a7ed94e81c2e1|
# |v1 |c0 |a2 |b1 |c1 |r2 |2023-07-24 |9999-12-31 |ebd84206aad35e3e55fb273eaa6eb288|
# |v1 |c4 |a2 |b1 |c1 |r2 |2023-07-24 |9999-12-31 |f987461f2b6fefed0b4a0b6f740f4a50|
# +---+---+------+------+------+-----------+---------------+-------------+--------------------------------+
This class helps clean up a database that has a lot of empty tables. Empty tables might appear, for example, after bulk-dropping huge files in HDFS.
# 1
popular_schema = SchemaManager('popular_schema')
# 3 tables in popular_schema
# run find_empty_tables() on instance to find empty tables in popular_schema
# 2
popular_schema.find_empty_tables()
# 2 tables going to be dropped out of 3 (66.67%)
# Data about tables is stored in an attribute '.dict_of_tables':
# 1 - has data, 0 - doesn't and going to be deleted
# run drop_empty_tables() on instance to drop empty tables in popular_schema
# 3
# dict of tables
popular_schema.dict_of_tables
# {'table2': 0, 'table1': 1, 'table3': 0}
# 4
popular_schema.drop_empty_tables()
# After dropping tables there are 1 tables in popular_schema
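For reference, a minimal sketch of what finding empty tables means conceptually (an illustration only, not the class's actual implementation):
schema_name = 'popular_schema'
# list the tables in the schema and keep the ones that have no rows
tables = [t.name for t in spark.catalog.listTables(schema_name)]
empty_tables = [t for t in tables if spark.table(f'{schema_name}.{t}').limit(1).count() == 0]
print(empty_tables)  # e.g. ['table2', 'table3'] in the example above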
Makes a union of more than two tables, whereas the native Spark methods .unionByName() and .union() allow unioning only two tables at a time.
df_unioned = union_all(df1, df2, df3, df4)
# or
list_df = [df1, df2, df3, df4]
df_unioned = union_all(*list_df)
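If you need the same behavior without the library, a minimal sketch of such a helper built on top of .unionByName() (not necessarily how union_all is implemented):
from functools import reduce
def union_all_sketch(*dfs):
    # union any number of DataFrames that share the same column names
    return reduce(lambda left, right: left.unionByName(right), dfs)
df_unioned = union_all_sketch(df1, df2, df3, df4)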
This function helps read a Hive table and prints additional information about it.
df = read_table('default.part_table_test1', verbose=True, cnt_files=True)
# root
# |-- index: string (nullable = true)
# |-- pk1: string (nullable = true)
# |-- pk2: string (nullable = true)
# |-- var1: string (nullable = true)
# |-- var2: string (nullable = true)
# |-- dt_part: string (nullable = true)
# |-- group_part: string (nullable = true)
# partition columns: ['dt_part', 'group_part']
# Running command: hdfs dfs -ls -R file:/Users/pyro/github/HiveHelper_on_PySpark/spark-warehouse/part_table_test1 | grep '.parquet' | wc -l
# 9 parquet files in the specified above location
Writes a DataFrame to Hive. This function wraps the PySpark .write method with common defaults.
# Mandatory parameters are the DF and the table name; the others are optional
write_table(
    df.repartition(1),
    'test_writing_2',
    schema='default',
    partition_cols=['index', 'var1'],
    mode='overwrite',
    format_files='parquet',
)
# DF saved as default.test_writing_2
# this is equivalent to:
df.repartition(1).write.format('parquet').partitionBy(['index', 'var1']).mode('overwrite').saveAsTable('default.test_writing_2')
Makes a checkpoint of a DF by writing it to HDFS and reading the table back. It saves the extra line of doing it in two actions: write_table() and read_table().
# Mandatory parameters are DF and table_name
df_dedup_cp = write_read_table(df_dedup, 'table_name12', schema='test_checkpoint', verbose=1)
# DF saved as test_checkpoint.table_name12
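This is roughly the same as doing it in two steps with the functions described above:
write_table(df_dedup, 'table_name12', schema='test_checkpoint')
df_dedup_cp = read_table('test_checkpoint.table_name12')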
This function makes a new, deduplicated DF based on the PK and an ordering of non-PK columns, using row_number().
df.show()
# +-----+----+---+------+--------+----------+----------+
# |index| pk1|pk2| var1| var2| dt_part|group_part|
# +-----+----+---+------+--------+----------+----------+
# | 1|key1| 1|value1|value2_1|2022-12-15| group2|
# | 2|key1| 2|value1|value2_1|2022-12-16| group2|
# | 3|key1| 3|value1|value2_1|2022-12-16| group3|
# | 5|key2| 2|value1|value2_1|2022-12-18| group2|
# | 4|key2| 1| null|value2_1|2022-12-17| group1|
# | 6|key2| 3|value1| null|2022-12-20| group3|
# +-----+----+---+------+--------+----------+----------+
df_dedup = deduplicate_df(df, pk=['pk1'], order_by_cols=[col('dt_part').desc(), col('group_part')])
df_dedup.show()
# +-----+----+---+------+--------+----------+----------+
# |index| pk1|pk2| var1| var2| dt_part|group_part|
# +-----+----+---+------+--------+----------+----------+
# | 2|key1| 2|value1|value2_1|2022-12-16| group2|
# | 6|key2| 3|value1| null|2022-12-20| group3|
# +-----+----+---+------+--------+----------+----------+
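Under the hood this is the standard row_number() pattern; a sketch of the equivalent plain PySpark (an illustration, not necessarily the library's exact code):
from pyspark.sql import Window
# keep one row per PK, preferring the latest dt_part and the first group_part
w = Window.partitionBy('pk1').orderBy(col('dt_part').desc(), col('group_part'))
df_dedup_manual = (
    df.withColumn('rn', F.row_number().over(w))
    .filter(col('rn') == 1)
    .drop('rn')
)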
- Set up local PySpark with Hive (although it runs fine without Hive)
- Synth table with partitions
- Basic testing script (testing inside .ipynb)
- How to extend PySpark
- Finish DFExtender
- Finish SchemaManager
- Finish comparing tables
- Finish Extra: reading from Hive, writing to Hive
- Validate on a Production cluster
- Breaking down big methods and reducing duplication
- Add rounding float numbers in comparing tables
- Good documentation
- Add tests at the bottom of demo.ipynb
- Include custom values in NULLs check
- Add function write_read_table to make checkpoints
- Clean up attributes and methods in DFExtender and refactor something
- Create a new class SCD2Helper to create, validate, update and join SCD2 tables
- Add automatic tests outside of .ipynb (coverage is very low, as is the available time)
- Enable linter and reformat code even more
- Create a class to help compare the PKs of different tables and simplify routine operations inside functions
- Make another code refactoring in the future