From d3e2ecf3be366f7b1315a234bd50edfb3ce2ff40 Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Wed, 13 Nov 2024 17:11:31 +0000 Subject: [PATCH] Add dataset import differ utility --- run_tests.sh | 2 +- tools/differ/README.md | 29 ++++ tools/differ/__init__.py | 0 tools/differ/differ.py | 251 +++++++++++++++++++++++++++++++++ tools/differ/differ_test.py | 54 +++++++ tools/differ/helper.py | 88 ++++++++++++ tools/differ/test/current.mcf | 35 +++++ tools/differ/test/previous.mcf | 62 ++++++++ tools/differ/test/result1.csv | 5 + tools/differ/test/result2.csv | 5 + 10 files changed, 530 insertions(+), 1 deletion(-) create mode 100644 tools/differ/README.md create mode 100644 tools/differ/__init__.py create mode 100644 tools/differ/differ.py create mode 100644 tools/differ/differ_test.py create mode 100644 tools/differ/helper.py create mode 100644 tools/differ/test/current.mcf create mode 100644 tools/differ/test/previous.mcf create mode 100644 tools/differ/test/result1.csv create mode 100644 tools/differ/test/result2.csv diff --git a/run_tests.sh b/run_tests.sh index 8a527a545d..0ddb8d035e 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -17,7 +17,7 @@ set -e # Array of top-level folders with Python code. -PYTHON_FOLDERS="util/ scripts/ import-automation/executor" +PYTHON_FOLDERS="util/ scripts/ import-automation/executor tools/" # Flag used signal if Python requirements have already been installed. PYTHON_REQUIREMENTS_INSTALLED=false diff --git a/tools/differ/README.md b/tools/differ/README.md new file mode 100644 index 0000000000..16d9fc8cc4 --- /dev/null +++ b/tools/differ/README.md @@ -0,0 +1,29 @@ +# Dataset Differ + +This utility generates a diff (point and series analysis) of two versions of the same dataset for import analysis. + +**Usage** +``` +python differ.py --current_data= --previous_data= +``` + +Parameter description +current_data: Path to the current MCF data (single mcf file or folder/* supported). +previous_data: Path to the previous MCF data (single mcf file or folder/* supported). +output_location: Path to the output data folder. +groupby_columns: Columns to group data for diff analysis in the order var,place,time etc. Default value: “variableMeasured,observationAbout,observationDate” +value_columns: Columns with statvar value (unit etc.) for diff analysis. Default value: "value,unit" + +Summary output generated is of the form below showing counts of differences for each variable. + +variableMeasured added deleted modified same total +0 dcid:var1 1 0 0 0 1 +1 dcid:var2 0 2 1 1 4 +2 dcid:var3 0 0 1 0 1 +3 dcid:var4 0 2 0 0 2 + +Detailed diff output is written to files for further analysis. +- point-analysis-summary.csv: diff summry for point analysis +- point-analysis-results.csv: detailed results for point analysis +- series-analysis-summary.csv: diff summry for series analysis +- series-analysis-results.csv: detailed results for series analysis \ No newline at end of file diff --git a/tools/differ/__init__.py b/tools/differ/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/differ/differ.py b/tools/differ/differ.py new file mode 100644 index 0000000000..882cf1719b --- /dev/null +++ b/tools/differ/differ.py @@ -0,0 +1,251 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" Utility to generate a dataset diff for import analysis.""" + +import os +import pandas as pd +import random + +from absl import app +from absl import flags +from absl import logging + +import helper + +FLAGS = flags.FLAGS +flags.DEFINE_string( + 'current_data', '', 'Path to the current MCF data \ + (single mcf file or folder/* supported).') +flags.DEFINE_string( + 'previous_data', '', 'Path to the previous MCF data \ + (single mcf file or folder/* supported).') +flags.DEFINE_string('output_location', 'results', \ + 'Path to the output data folder.') + +flags.DEFINE_string( + 'groupby_columns', 'variableMeasured,observationAbout,observationDate', + 'Columns to group data for diff analysis in the order (var,place,time etc.).' +) +flags.DEFINE_string( + 'value_columns', 'value,unit', + 'Columns with statvar value (unit etc.) for diff analysis.') + +SAMPLE_COUNT = 3 + + +class DatasetDiffer: + """ + Utility to generate a diff (point and series analysis) + of two versions of the same dataset for import analysis. + + Usage: + $ python differ.py --current_data= --previous_data= + + Summary output generated is of the form below showing + counts of differences for each variable. + + variableMeasured added deleted modified same total + 0 dcid:var1 1 0 0 0 1 + 1 dcid:var2 0 2 1 1 4 + 2 dcid:var3 0 0 1 0 1 + 3 dcid:var4 0 2 0 0 2 + + Detailed diff output is written to files for further analysis. + - point-analysis-summary.csv: diff summry for point analysis + - point-analysis-results.csv: detailed results for point analysis + - series-analysis-summary.csv: diff summry for series analysis + - series-analysis-results.csv: detailed results for series analysis + + """ + + def __init__(self, groupby_columns, value_columns): + self.groupby_columns = groupby_columns.split(',') + self.value_columns = value_columns.split(',') + self.variable_column = self.groupby_columns[0] + self.place_column = self.groupby_columns[1] + self.time_column = self.groupby_columns[2] + self.diff_column = '_diff_result' + + def _cleanup_data(self, df: pd.DataFrame): + for column in ['added', 'deleted', 'modified', 'same']: + df[column] = df[column] if column in df.columns else 0 + df[column] = df[column].fillna(0).astype(int) + + def _get_samples(self, row): + years = sorted(row[self.time_column]) + if len(years) > SAMPLE_COUNT: + return years[0] + random.sample(years[1:-1], + SAMPLE_COUNT - 2) + years[-1] + else: + return years + + # Processes two dataset files to identify changes. + def process_data(self, previous_df: pd.DataFrame, + current_df: pd.DataFrame) -> pd.DataFrame: + """ + Process previous and current datasets to generate + the intermediate data for point and series analysis. + Args: + current_df: dataframe with current (new) data + previous_df: dataframe with previous (old) data + Returns: + intermediate merged data for analysis + """ + cur_df_columns = current_df.columns.values.tolist() + self.groupby_columns = [ + i for i in self.groupby_columns if i in cur_df_columns + ] + self.value_columns = [ + i for i in self.value_columns if i in cur_df_columns + ] + df1 = previous_df.loc[:, self.groupby_columns + self.value_columns] + df2 = current_df.loc[:, self.groupby_columns + self.value_columns] + df1['_value_combined'] = df1[self.value_columns]\ + .apply(lambda row: '_'.join(row.values.astype(str)), axis=1) + df2['_value_combined'] = df2[self.value_columns]\ + .apply(lambda row: '_'.join(row.values.astype(str)), axis=1) + df1.drop(columns=self.value_columns, inplace=True) + df2.drop(columns=self.value_columns, inplace=True) + # Perform outer join operation to identify differences. + result = pd.merge(df1, + df2, + on=self.groupby_columns, + how='outer', + indicator=self.diff_column) + result[self.diff_column] = result.apply( + lambda row: 'added' if row[self.diff_column] == 'right_only' \ + else 'deleted' if row[self.diff_column] == 'left_only' \ + else 'modified' if row['_value_combined_x'] != row['_value_combined_y'] \ + else 'same', axis=1) + return result + + def point_analysis(self, + in_data: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame): + """ + Performs point diff analysis to identify data point changes. + Args: + in_data: intermediate data generated by processing previous/current data + Returns: + summary and results from the analysis + """ + column_list = [ + self.variable_column, self.place_column, self.time_column, + self.diff_column + ] + result = in_data.loc[:, column_list] + result = result.groupby( + [self.variable_column, self.diff_column], + observed=True, + as_index=False)[[self.place_column, + self.time_column]].agg(lambda x: x.tolist()) + result['size'] = result.apply(lambda row: len(row[self.place_column]), + axis=1) + result[self.place_column] = result.apply(lambda row: random.sample( + row[self.place_column], + min(SAMPLE_COUNT, len(row[self.place_column]))), + axis=1) + result[self.time_column] = result.apply(self._get_samples, axis=1) + summary = result.pivot( + index=self.variable_column, columns=self.diff_column, values='size')\ + .reset_index().rename_axis(None, axis=1) + self._cleanup_data(summary) + summary['total'] = summary.apply(lambda row: row['added'] + row[ + 'deleted'] + row['modified'] + row['same'], + axis=1) + return summary, result + + def series_analysis(self, + in_data: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame): + """ + Performs series diff analysis to identify time series changes. + Args: + in_data: intermediate data generated by processing previous/current data + Returns: + summary and results from the analysis + """ + column_list = [ + self.variable_column, self.place_column, self.diff_column + ] + result = in_data.loc[:, column_list] + result = result.groupby(column_list, as_index=False).size() + result = result.pivot( + index=[self.variable_column, self.place_column], columns=self.diff_column, values='size')\ + .reset_index().rename_axis(None, axis=1) + self._cleanup_data(result) + result[self.diff_column] = result.apply(lambda row: 'added' if row['added'] > 0 \ + and row['deleted'] + row['modified'] + row['same'] == 0 \ + else 'deleted' if row['deleted'] > 0 and row['added'] + row['modified'] + row['same'] == 0 \ + else 'modified' if row['deleted'] > 0 or row['added'] > 0 or row['modified'] > 0 \ + else 'same', axis=1) + result = result[column_list] + result = result.groupby( + [self.variable_column, self.diff_column], + observed=True, + as_index=False)[self.place_column].agg(lambda x: x.tolist()) + result['size'] = result.apply(lambda row: len(row[self.place_column]), + axis=1) + result[self.place_column] = result.apply(lambda row: random.sample( + row[self.place_column], + min(SAMPLE_COUNT, len(row[self.place_column]))), + axis=1) + summary = result.pivot( + index=self.variable_column, columns=self.diff_column, values='size')\ + .reset_index().rename_axis(None, axis=1) + self._cleanup_data(summary) + summary['total'] = summary.apply(lambda row: row['added'] + row[ + 'deleted'] + row['modified'] + row['same'], + axis=1) + return summary, result + + +def main(_): + '''Runs the differ.''' + differ = DatasetDiffer(FLAGS.groupby_columns, FLAGS.value_columns) + + if not os.path.exists(FLAGS.output_location): + os.makedirs(FLAGS.output_location) + logging.info('Loading data...') + current_df = helper.load_data(FLAGS.current_data, FLAGS.output_location) + previous_df = helper.load_data(FLAGS.previous_data, FLAGS.output_location) + + logging.info('Processing data...') + in_data = differ.process_data(previous_df, current_df) + + logging.info('Point analysis:') + summary, result = differ.point_analysis(in_data) + result.sort_values(by=[differ.diff_column, differ.variable_column], + inplace=True) + print(summary.head(10)) + print(result.head(10)) + helper.write_data(summary, FLAGS.output_location, + 'point-analysis-summary.csv') + helper.write_data(result, FLAGS.output_location, + 'point-analysis-results.csv') + + logging.info('Series analysis:') + summary, result = differ.series_analysis(in_data) + result.sort_values(by=[differ.diff_column, differ.variable_column], + inplace=True) + print(summary.head(10)) + print(result.head(10)) + helper.write_data(summary, FLAGS.output_location, + 'series-analysis-summary.csv') + helper.write_data(result, FLAGS.output_location, + 'series-analysis-results.csv') + + logging.info('Differ output written to folder: %s', FLAGS.output_location) + + +if __name__ == '__main__': + app.run(main) diff --git a/tools/differ/differ_test.py b/tools/differ/differ_test.py new file mode 100644 index 0000000000..905f7b36d6 --- /dev/null +++ b/tools/differ/differ_test.py @@ -0,0 +1,54 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pandas as pd +import unittest + +from pandas.testing import assert_frame_equal +from differ import DatasetDiffer + +import helper + +module_dir = os.path.dirname(__file__) + + +class TestDiffer(unittest.TestCase): + ''' + Test Class to compare expected output in test/ directory to the + output generated by DatasetDiffer class + ''' + + def test_diff_analysis(self): + groupby_columns = 'variableMeasured,observationAbout,observationDate' + value_columns = 'value' + + differ = DatasetDiffer(groupby_columns, value_columns) + current = helper.load_mcf_file( + os.path.join(module_dir, 'test', 'current.mcf')) + previous = helper.load_mcf_file( + os.path.join(module_dir, 'test', 'previous.mcf')) + + in_data = differ.process_data(previous, current) + summary, result = differ.point_analysis(in_data) + result = pd.read_csv(os.path.join(module_dir, 'test', 'result1.csv')) + assert_frame_equal(summary, result) + + summary, result = differ.series_analysis(in_data) + result = pd.read_csv(os.path.join(module_dir, 'test', 'result2.csv')) + assert_frame_equal(summary, result) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/differ/helper.py b/tools/differ/helper.py new file mode 100644 index 0000000000..82e03d5ffa --- /dev/null +++ b/tools/differ/helper.py @@ -0,0 +1,88 @@ +import glob +import os +import pandas as pd +import re + +from google.cloud.storage import Client + + +def load_mcf_file(file: str) -> pd.DataFrame: + """ Reads an MCF text file and returns it as a dataframe.""" + mcf_file = open(file, 'r', encoding='utf-8') + mcf_contents = mcf_file.read() + mcf_file.close() + # nodes separated by a blank line + mcf_nodes_text = mcf_contents.split('\n\n') + # lines seprated as property: constraint + mcf_line = re.compile(r'^(\w+): (.*)$') + mcf_nodes = [] + for node in mcf_nodes_text: + current_mcf_node = {} + for line in node.split('\n'): + parsed_line = mcf_line.match(line) + if parsed_line is not None: + current_mcf_node[parsed_line.group(1)] = parsed_line.group(2) + if current_mcf_node and current_mcf_node[ + 'typeOf'] == 'dcid:StatVarObservation': + mcf_nodes.append(current_mcf_node) + df = pd.DataFrame(mcf_nodes) + return df + + +def load_mcf_files(path: str) -> pd.DataFrame: + """ Loads all sharded mcf files in the given directory and + returns a single combined dataframe.""" + df = pd.DataFrame() + filenames = glob.glob(path + '.mcf') + for filename in filenames: + df2 = load_mcf_file(filename) + # Merge data frames, expects same headers + df = pd.concat([df, df2]) + return df + + +def write_data(df: pd.DataFrame, path: str, file: str): + """ Write a dataframe to a CSV file with the given path.""" + out_file = open(os.path.join(path, file), mode='w', encoding='utf-8') + df.to_csv(out_file, index=False, mode='w') + out_file.close() + + +def load_data(path: str, tmp_dir: str) -> pd.DataFrame: + """ Loads data from the given path and returns as a dataframe. + Args: + path: local or gcs path (single file or folder/* format) + Returns: + dataframe with the input data + """ + if path.startswith('gs://'): + path = get_gcs_data(path, tmp_dir) + + if path.endswith('*'): + return load_mcf_files(path) + else: + return load_mcf_file(path) + + +def get_gcs_data(uri: str, tmp_dir: str) -> str: + """ Downloads files form GCS and copies them to local. + Args: + uri: single file path or folder/* format + Returns: + path to the output file/folder + """ + + client = Client() + bucket = client.get_bucket(uri.split('/')[2]) + if uri.endswith('*'): + blobs = client.list_blobs(bucket) + for blob in blobs: + path = os.path.join(os.getcwd(), tmp_dir, blob.name) + blob.download_to_filename(path) + return os.path.join(os.getcwd(), tmp_dir, '*') + else: + file_name = uri.split('/')[3] + blob = bucket.get_blob(file_name) + path = os.path.join(os.getcwd(), tmp_dir, file_name) + blob.download_to_filename(path) + return path diff --git a/tools/differ/test/current.mcf b/tools/differ/test/current.mcf new file mode 100644 index 0000000000..2e994a7a45 --- /dev/null +++ b/tools/differ/test/current.mcf @@ -0,0 +1,35 @@ +Node: cpcb_air_quality/E17/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Max_Concentration_AirPollutant_Ozone +value: 53.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/bhdp3vy7dee0d" + +Node: cpcb_air_quality/E18/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Mean_Concentration_AirPollutant_Ozone +value: 28.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/8e11gqvkt183b" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 42.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-25T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 40.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" diff --git a/tools/differ/test/previous.mcf b/tools/differ/test/previous.mcf new file mode 100644 index 0000000000..ce9fcb31d1 --- /dev/null +++ b/tools/differ/test/previous.mcf @@ -0,0 +1,62 @@ +Node: cpcb_air_quality/E18/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Mean_Concentration_AirPollutant_Ozone +value: 29.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/8e11gqvkt183b" + +Node: cpcb_air_quality/E16/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Min_Concentration_AirPollutant_Ozone +value: 18.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/z8j7g5sw11klh" + +Node: cpcb_air_quality/E16/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Min_Concentration_AirPollutant_Ozone +value: 18.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/z8j7g5sw11klh" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 41.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-25T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___APPCB +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 40.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-24T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 41.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" + +Node: cpcb_air_quality/E15/944d9e6d-ec38-4e61-175a-9bbabfd35f97 +observationDate: "2024-09-25T12:00:00" +unit: dcid:MicrogramsPerCubicMeter +observationAbout: dcid:cpcpAq/Secretariat_Amaravati___IMD +variableMeasured: dcid:Mean_Concentration_AirPollutant_CO +value: 40.0 +typeOf: dcid:StatVarObservation +dcid: "dc/o/h1sjhdxycwwmc" diff --git a/tools/differ/test/result1.csv b/tools/differ/test/result1.csv new file mode 100644 index 0000000000..4d344b5639 --- /dev/null +++ b/tools/differ/test/result1.csv @@ -0,0 +1,5 @@ +variableMeasured,added,deleted,modified,same,total +dcid:Max_Concentration_AirPollutant_Ozone,1,0,0,0,1 +dcid:Mean_Concentration_AirPollutant_CO,0,2,1,1,4 +dcid:Mean_Concentration_AirPollutant_Ozone,0,0,1,0,1 +dcid:Min_Concentration_AirPollutant_Ozone,0,2,0,0,2 diff --git a/tools/differ/test/result2.csv b/tools/differ/test/result2.csv new file mode 100644 index 0000000000..4f3b954643 --- /dev/null +++ b/tools/differ/test/result2.csv @@ -0,0 +1,5 @@ +variableMeasured,added,deleted,modified,same,total +dcid:Max_Concentration_AirPollutant_Ozone,1,0,0,0,1 +dcid:Mean_Concentration_AirPollutant_CO,0,1,1,0,2 +dcid:Mean_Concentration_AirPollutant_Ozone,0,0,1,0,1 +dcid:Min_Concentration_AirPollutant_Ozone,0,2,0,0,2