diff --git a/.gitignore b/.gitignore
index 45e5170..816314c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -129,6 +129,7 @@ benchmarks/maqp_scripts/rsync_dm.sh
 # profiling
 profiling_results
 profiling.py
+bar.pdf
 *.lprof
 optimized_inference.cpp
diff --git a/README.md b/README.md
index eb3913f..eee4945 100755
--- a/README.md
+++ b/README.md
@@ -9,6 +9,7 @@ Benjamin Hilprecht, Andreas Schmidt, Moritz Kulessa, Alejandro Molina, Kristian
 ![DeepDB Overview](baselines/plots/overview.png "DeepDB Overview")
 # Setup
+Tested with Python 3.7 and Python 3.8.
 ```
 git clone https://github.com/DataManagementLab/deepdb-public.git
 cd deepdb-public
@@ -18,21 +19,12 @@ source venv/bin/activate
 pip3 install -r requirements.txt
 ```
-# How to experiment with DeepDB on a new Dataset
-- Specify a new schema in the schemas folder
-- Due to the current implementation, make sure to declare
-  - the primary key,
-  - the filename of the csv sample file,
-  - the correct table size and sample rate,
-  - the relationships among tables if you do not just run queries over a single table,
-  - any non-key functional dependencies (this is rather an implementation detail),
-  - and include all columns in the no-compression list by default (as done for the IMDB benchmark),
-- To further reduce the training time, you can exclude columns you do not need in your experiments (also done in the IMDB benchmark)
-- Generate the HDF/sampled HDF files and learn the RSPN ensemble
-- Use the RSPN ensemble to answer queries
-- For reference, please check the commands to reproduce the results of the paper
+For Python 3.8: the spflow installation sometimes fails. In that case, remove spflow from requirements.txt, install the remaining requirements, and then run
+```
+pip3 install spflow --no-deps
+```
-# How to Reproduce Experiments in the Paper
+# Reproduce Experiments
 ## Cardinality Estimation
 Download the [Job dataset](http://homepages.cwi.nl/~boncz/job/imdb.tgz).
@@ -288,3 +280,61 @@ python3 maqp.py --evaluate_confidence_intervals
     --confidence_upsampling_factor 100
     --confidence_sample_size 10000000
 ```
+
+### TPC-DS (Single Table) pipeline
+As an additional example of how to work with DeepDB, we provide a pipeline for a single table of the TPC-DS schema, covering the queries in `./benchmarks/tpc_ds_single_table/sql/aqp_queries.sql`. As a prerequisite, you need a 10 million tuple sample of the store_sales table at `../mqp-data/tpc-ds-benchmark/store_sales_sampled.csv`. To compute the ground truth, you additionally need a Postgres instance with a 1T TPC-DS dataset. Afterwards, you can run the following commands.
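+For instance, one way to create this sample is to export a random subset of store_sales from the TPC-DS Postgres instance, pipe-separated to match the `--csv_seperator |` flag used below (illustrative only; adapt the database name, sampling method, and paths to your setup; on a full 1T instance, `TABLESAMPLE` is considerably faster than `ORDER BY random()`):
+```
+psql -d tcpds -c "\copy (SELECT * FROM store_sales ORDER BY random() LIMIT 10000000) TO '../mqp-data/tpc-ds-benchmark/store_sales_sampled.csv' WITH (FORMAT csv, DELIMITER '|')"
+```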
+ +Generate hdf files from csvs +``` +python3 maqp.py --generate_hdf + --dataset tpc-ds-1t + --csv_seperator | + --csv_path ../mqp-data/tpc-ds-benchmark + --hdf_path ../mqp-data/tpc-ds-benchmark/gen_hdf +``` + +Learn the ensemble +``` +python3 maqp.py --generate_ensemble + --dataset tpc-ds-1t + --samples_per_spn 10000000 + --ensemble_strategy single + --hdf_path ../mqp-data/tpc-ds-benchmark/gen_hdf + --ensemble_path ../mqp-data/tpc-ds-benchmark/spn_ensembles + --rdc_threshold 0.3 + --post_sampling_factor 10 +``` + +Compute ground truth +``` +python3 maqp.py --aqp_ground_truth + --dataset tpc-ds-1t + --query_file_location ./benchmarks/tpc_ds_single_table/sql/aqp_queries.sql + --target_path ./benchmarks/tpc_ds_single_table/ground_truth_1t.pkl + --database_name tcpds +``` + +Evaluate the AQP queries +``` +python3 maqp.py --evaluate_aqp_queries + --dataset tpc-ds-1t + --target_path ./baselines/aqp/results/deepDB/tpcds1t_model_based.csv + --ensemble_location ../mqp-data/tpc-ds-benchmark/spn_ensembles/ensemble_single_tpc-ds-1t_10000000.pkl + --query_file_location ./benchmarks/tpc_ds_single_table/sql/aqp_queries.sql + --ground_truth_file_location ./benchmarks/tpc_ds_single_table/ground_truth_1t.pkl +``` + +# How to experiment with DeepDB on a new Dataset +- Specify a new schema in the schemas folder +- Due to the current implementation, make sure to declare + - the primary key, + - the filename of the csv sample file, + - the correct table size and sample rate, + - the relationships among tables if you do not just run queries over a single table, + - any non-key functional dependencies (this is rather an implementation detail), + - and include all columns in the no-compression list by default (as done for the IMDB benchmark), +- To further reduce the training time, you can exclude columns you do not need in your experiments (also done in the IMDB benchmark) +- Generate the HDF/sampled HDF files and learn the RSPN ensemble +- Use the RSPN ensemble to answer queries +- For reference, please check the commands to reproduce the results of the paper diff --git a/aqp_spn/aqp_spn.py b/aqp_spn/aqp_spn.py index 8068a47..9301663 100755 --- a/aqp_spn/aqp_spn.py +++ b/aqp_spn/aqp_spn.py @@ -1,195 +1,38 @@ import copy import logging -import time -from functools import partial from time import perf_counter import numpy as np -from spn.structure.Base import Context, Product from spn.structure.StatisticalTypes import MetaType -from aqp_spn.aqp_leaves import Categorical -from aqp_spn.aqp_leaves import IdentityNumericLeaf, identity_likelihood_range, identity_expectation, \ - categorical_likelihood_range, identity_distinct_ranges, categorical_distinct_ranges -from aqp_spn.aqp_leaves import Sum -from aqp_spn.custom_spflow.custom_learning import learn_mspn -from aqp_spn.expectations import expectation from aqp_spn.group_by_combination import group_by_combinations -from aqp_spn.ranges import NominalRange, NumericRange from ensemble_compilation.spn_ensemble import CombineSPN +from rspn.algorithms.ranges import NominalRange, NumericRange +from rspn.rspn import RSPN +from rspn.structure.leaves import IdentityNumericLeaf, identity_distinct_ranges, categorical_distinct_ranges, \ + Categorical, identity_likelihood_range, categorical_likelihood_range +from rspn.updates.top_down_updates import cluster_center_update_dataset logger = logging.getLogger(__name__) -def build_ds_context(column_names, meta_types, null_values, table_meta_data, train_data, group_by_threshold=1200): - """ - Builds context according to training data. 
- :param column_names: - :param meta_types: - :param null_values: - :param table_meta_data: - :param train_data: - :return: - """ - ds_context = Context(meta_types=meta_types) - ds_context.null_values = null_values - assert ds_context.null_values is not None, "Null-Values have to be specified" - domain = [] - no_unique_values = [] - # If metadata is given use this to build domains for categorical values - unified_column_dictionary = None - if table_meta_data is not None: - unified_column_dictionary = {k: v for table, table_md in table_meta_data.items() if - table != 'inverted_columns_dict' and table != 'inverted_fd_dict' - for k, v in table_md['categorical_columns_dict'].items()} - - # domain values - group_by_attributes = [] - for col in range(train_data.shape[1]): - - feature_meta_type = meta_types[col] - min_val = np.nanmin(train_data[:, col]) - max_val = np.nanmax(train_data[:, col]) - - unique_vals = len(np.unique(train_data[:, col])) - no_unique_values.append(unique_vals) - if column_names is not None: - if unique_vals <= group_by_threshold and 'mul_' not in column_names[col] and '_nn' not in column_names[col]: - group_by_attributes.append(col) - - domain_values = [min_val, max_val] - - if feature_meta_type == MetaType.REAL: - min_val = np.nanmin(train_data[:, col]) - max_val = np.nanmax(train_data[:, col]) - domain.append([min_val, max_val]) - elif feature_meta_type == MetaType.DISCRETE: - # if no metadata is given, infer domains from data - if column_names is not None \ - and unified_column_dictionary.get(column_names[col]) is not None: - no_diff_values = len(unified_column_dictionary[column_names[col]].keys()) - domain.append(np.arange(0, no_diff_values + 1, 1)) - else: - domain.append(np.arange(domain_values[0], domain_values[1] + 1, 1)) - else: - raise Exception("Unkown MetaType " + str(meta_types[col])) - - ds_context.domains = np.asanyarray(domain) - ds_context.no_unique_values = np.asanyarray(no_unique_values) - ds_context.group_by_attributes = group_by_attributes - - return ds_context - - -def insert_dataset(spn, dataset, metadata): - """ - Updates the SPN when a new dataset arrives. The function recursively traverses the - tree and inserts the different values of a dataset at the according places. - - At every sum node, the child node is selected, based on the minimal euclidian distance to the - cluster_center of on of the child-nodes. - :param spn: - :param dataset: - :param metadata: root of aqp_spn containing meta-information (ensemble-object) - :return: - """ - - if isinstance(spn, Categorical): - - spn.updateStatistics(dataset, metadata) - - return spn - elif isinstance(spn, IdentityNumericLeaf): - - spn.updateStatistics(dataset, metadata) - - return spn - elif isinstance(spn, Sum): - cc = spn.cluster_centers - - node_idx = 0 - - from scipy.spatial import distance - min_dist = np.inf - min_idx = -1 - for n in spn.children: - # distance calculation between the dataset and the different clusters - # (there exist a much faster version on scipy) - # this? 
https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise_distances.html - # - proj = projection(dataset, n.scope) - dist = distance.euclidean(cc[node_idx], proj) - if dist < min_dist: - min_dist = dist - min_idx = node_idx - - node_idx += 1 - assert min_idx > -1 - assert min_idx < len(spn.children) - adapt_weights(spn, min_idx) - insert_dataset(spn.children[min_idx], dataset, metadata) - elif isinstance(spn, Product): - - for n in spn.children: - proj = projection(dataset, n.scope) - insert_dataset(n, dataset, metadata) - else: - raise Exception("Invalid node type " + str(type(spn))) - spn.cardinality += 1 - - -def adapt_weights(sum_node, selected_child_idx): - assert isinstance(sum_node, Sum), "adapt_weights called on non Sum-node" - - card = sum_node.cardinality - from math import isclose - cardinalities = np.array(sum_node.weights) * card - - cardinalities[selected_child_idx] += 1 - - sum_node.weights = cardinalities / (card + 1) - sum_node.weights = sum_node.weights.tolist() - - weight_sum = np.sum(sum_node.weights) - - assert isclose(weight_sum, 1, abs_tol=0.05) - - -def projection(dataset, scope): - projection = [] - for idx in scope: - assert len(dataset) > idx, "wrong scope " + str(scope) + " for dataset" + str(dataset) - projection.append(dataset[idx]) - return projection - - -class AQPSPN(CombineSPN): +class AQPSPN(CombineSPN, RSPN): def __init__(self, meta_types, null_values, full_join_size, schema_graph, relationship_list, full_sample_size=None, table_set=None, column_names=None, table_meta_data=None): - CombineSPN.__init__(self, full_join_size, schema_graph, relationship_list, table_set=table_set) - - self.meta_types = meta_types - self.null_values = null_values - self.full_sample_size = full_sample_size + full_sample_size = full_sample_size if full_sample_size is None: - self.full_sample_size = full_join_size - self.column_names = column_names - self.table_meta_data = table_meta_data - self.mspn = None - self.ds_context = None - - self.use_generated_code = False + full_sample_size = full_join_size - # training stats - self.learn_time = None - self.rdc_threshold = None - self.min_instances_slice = None + CombineSPN.__init__(self, full_join_size, schema_graph, relationship_list, table_set=table_set) + RSPN.__init__(self, meta_types, null_values, full_sample_size, + column_names=column_names, + table_meta_data=table_meta_data) def add_dataset(self, dataset): """ - modifies the SPN, based on new dataset. + modifies the RSPN based on new dataset. What has to be done? - Traverse the tree to find the leave nodes where the values have to be added. - Depending on the leaf-nodes, the data in the nodes (mean, unique_vale, p, ...) have to be changed @@ -215,49 +58,28 @@ def add_dataset(self, dataset): dataset), "dataset has a differnt number of columns as spn. 
spn expects " + str( str(len(self.mspn.scope))) + " columns, but dataset has " + str(len(dataset)) + " columns" - # from var_dump import var_dump - # print(var_dump(self.mspn)) - - insert_dataset(self.mspn, dataset, self) + cluster_center_update_dataset(self.mspn, dataset) self.full_sample_size += 1 self.full_join_size += 1 def learn(self, train_data, rdc_threshold=0.3, min_instances_slice=1, max_sampling_threshold_cols=10000, max_sampling_threshold_rows=100000, bloom_filters=False): - # build domains (including the dependence analysis) - domain_start_t = time.perf_counter() - ds_context = build_ds_context(self.column_names, self.meta_types, self.null_values, self.table_meta_data, - train_data) - self.ds_context = ds_context - domain_end_t = time.perf_counter() - logging.debug(f"Built domains in {domain_end_t - domain_start_t} sec") - - # learn mspn - learn_start_t = time.perf_counter() - # find scopes for variables which indicate not null column + no_compression_scopes = None if self.column_names is not None: - ds_context.no_compression_scopes = [] + no_compression_scopes = [] for table in self.table_set: table_obj = self.schema_graph.table_dictionary[table] for attribute in table_obj.no_compression: column_name = table + '.' + attribute if column_name in self.column_names: - ds_context.no_compression_scopes.append(self.column_names.index(column_name)) + no_compression_scopes.append(self.column_names.index(column_name)) - self.mspn = learn_mspn(train_data, ds_context, - min_instances_slice=min_instances_slice, threshold=rdc_threshold, - max_sampling_threshold_cols=max_sampling_threshold_cols, - max_sampling_threshold_rows=max_sampling_threshold_rows, - bloom_filters=bloom_filters) - learn_end_t = time.perf_counter() - self.learn_time = learn_end_t - learn_start_t - logging.debug(f"Built SPN in {learn_end_t - learn_start_t} sec") - - # statistics - self.rdc_threshold = rdc_threshold - self.min_instances_slice = min_instances_slice + RSPN.learn(self, train_data, rdc_threshold=rdc_threshold, min_instances_slice=min_instances_slice, + max_sampling_threshold_cols=max_sampling_threshold_cols, + max_sampling_threshold_rows=max_sampling_threshold_rows, + no_compression_scopes=no_compression_scopes) def learn_incremental(self, data): """ @@ -281,13 +103,6 @@ def learn_incremental(self, data): return self - # def predict(self, ranges, feature): - # assert feature in self.column_names, "Feature not in column names" - # regressor_idx = self.column_names.index(feature) - # - # prediction = predict(self.mspn, ranges, regressor_idx) - # return prediction - def evaluate_expectation(self, expectation, standard_deviations=False, gen_code_stats=None): """ Evaluates expectation. Does transformations to map to SPN query compilation. @@ -550,236 +365,6 @@ def _group_by_combinations(self, feature_scope, range_conditions=None): return group_by_combinations(self.mspn, self.ds_context, feature_scope, range_conditions, node_distinct_vals=_node_distinct_values, node_likelihoods=_node_likelihoods_range) - def _add_null_values_to_ranges(self, range_conditions): - for col in range(range_conditions.shape[1]): - if range_conditions[0][col] is None: - continue - for idx in range(range_conditions.shape[0]): - range_conditions[idx, col].null_value = self.null_values[col] - - return range_conditions - - def _probability(self, range_conditions): - """ - Compute probability of range conditions. - - e.g. 
np.array([NominalRange([0]), NumericRange([[0,0.3]]), None]) - """ - - return self._indicator_expectation([], range_conditions=range_conditions) - - def _indicator_expectation(self, feature_scope, identity_leaf_expectation=None, inverted_features=None, - range_conditions=None, force_no_generated=False, gen_code_stats=None): - """ - Compute E[1_{conditions} * X_feature_scope]. Can also compute products (specify multiple feature scopes). - For inverted features 1/val is used. - - Is basis for both unnormalized and normalized expectation computation. - - Uses safe evaluation for products, i.e. compute extra expectation for every multiplier. If results deviate too - largely (>max_deviation), replace by mean. We also experimented with splitting the multipliers into 10 random - groups. However, this works equally well and is faster. - """ - - if inverted_features is None: - inverted_features = [False] * len(feature_scope) - - if range_conditions is None: - range_conditions = np.array([None] * len(self.mspn.scope)).reshape(1, len(self.mspn.scope)) - else: - range_conditions = self._add_null_values_to_ranges(range_conditions) - - if identity_leaf_expectation is None: - _node_expectation = {IdentityNumericLeaf: identity_expectation} - else: - _node_expectation = {IdentityNumericLeaf: identity_leaf_expectation} - - _node_likelihoods_range = {IdentityNumericLeaf: identity_likelihood_range, - Categorical: categorical_likelihood_range} - - if hasattr(self, 'use_generated_code') and self.use_generated_code and not force_no_generated: - full_result = expectation(self.mspn, feature_scope, inverted_features, range_conditions, - node_expectation=_node_expectation, node_likelihoods=_node_likelihoods_range, - use_generated_code=True, spn_id=self.id, meta_types=self.meta_types, - gen_code_stats=gen_code_stats) - else: - full_result = expectation(self.mspn, feature_scope, inverted_features, range_conditions, - node_expectation=_node_expectation, node_likelihoods=_node_likelihoods_range) - - return full_result - - def _augment_not_null_conditions(self, feature_scope, range_conditions): - if range_conditions is None: - range_conditions = np.array([None] * len(self.mspn.scope)).reshape(1, len(self.mspn.scope)) - - # for second computation make sure that features that are not normalized are not NULL - for not_null_scope in feature_scope: - if self.null_values[not_null_scope] is None: - continue - - if range_conditions[0, not_null_scope] is None: - if self.meta_types[not_null_scope] == MetaType.REAL: - range_conditions[:, not_null_scope] = NumericRange([[-np.inf, np.inf]], is_not_null_condition=True) - elif self.meta_types[not_null_scope] == MetaType.DISCRETE: - NumericRange([[-np.inf, np.inf]], is_not_null_condition=True) - categorical_feature_name = self.column_names[not_null_scope] - - for table in self.table_meta_data.keys(): - categorical_values = self.table_meta_data[table]['categorical_columns_dict'] \ - .get(categorical_feature_name) - if categorical_values is not None: - possible_values = list(categorical_values.values()) - possible_values.remove(self.null_values[not_null_scope]) - range_conditions[:, not_null_scope] = NominalRange(possible_values, - is_not_null_condition=True) - break - - return range_conditions - - def _indicator_expectation_with_std(self, feature_scope, inverted_features=None, - range_conditions=None): - """ - Computes standard deviation of the estimator for 1_{conditions}*X_feature_scope. Uses the identity - V(X)=E(X^2)-E(X)^2. 
- :return: - """ - e_x = self._indicator_expectation(feature_scope, identity_leaf_expectation=identity_expectation, - inverted_features=inverted_features, - range_conditions=range_conditions) - - not_null_conditions = self._augment_not_null_conditions(feature_scope, None) - n = self._probability(not_null_conditions) * self.full_sample_size - - # shortcut: use binomial std if it is just a probability - if len(feature_scope) == 0: - std = np.sqrt(e_x * (1 - e_x) * 1 / n) - return std, e_x - - e_x_sq = self._indicator_expectation(feature_scope, - identity_leaf_expectation=partial(identity_expectation, power=2), - inverted_features=inverted_features, - range_conditions=range_conditions, - force_no_generated=True) - - v_x = e_x_sq - e_x * e_x - - # Indeed divide by sample size of SPN not only qualifying tuples. Because this is not a conditional expectation. - std = np.sqrt(v_x / n) - - return std, e_x - - def _unnormalized_conditional_expectation(self, feature_scope, inverted_features=None, range_conditions=None, - impute_p=False, gen_code_stats=None): - """ - Compute conditional expectation. Can also compute products (specify multiple feature scopes). - For inverted features 1/val is used. Normalization is not possible here. - """ - - range_conditions = self._augment_not_null_conditions(feature_scope, range_conditions) - unnormalized_exp = self._indicator_expectation(feature_scope, inverted_features=inverted_features, - range_conditions=range_conditions, gen_code_stats=gen_code_stats) - - p = self._probability(range_conditions) - if any(p == 0): - if impute_p: - impute_val = np.mean( - unnormalized_exp[np.where(p != 0)[0]] / p[np.where(p != 0)[0]]) - result = unnormalized_exp / p - result[np.where(p == 0)[0]] = impute_val - return result - - return self._indicator_expectation(feature_scope, inverted_features=inverted_features, - gen_code_stats=gen_code_stats) - - return unnormalized_exp / p - - def _unnormalized_conditional_expectation_with_std(self, feature_scope, inverted_features=None, - range_conditions=None, gen_code_stats=None): - """ - Compute conditional expectation. Can also compute products (specify multiple feature scopes). - For inverted features 1/val is used. Normalization is not possible here. - """ - range_conditions = self._augment_not_null_conditions(feature_scope, range_conditions) - p = self._probability(range_conditions) - - e_x_sq = self._indicator_expectation(feature_scope, - identity_leaf_expectation=partial(identity_expectation, power=2), - inverted_features=inverted_features, - range_conditions=range_conditions, - force_no_generated=True, - gen_code_stats=gen_code_stats) / p - - e_x = self._indicator_expectation(feature_scope, inverted_features=inverted_features, - range_conditions=range_conditions, - gen_code_stats=gen_code_stats) / p - - v_x = e_x_sq - e_x * e_x - - n = p * self.full_sample_size - std = np.sqrt(v_x / n) - - return std, e_x - - def _normalized_conditional_expectation(self, feature_scope, inverted_features=None, normalizing_scope=None, - range_conditions=None, standard_deviations=False, impute_p=False, - gen_code_stats=None): - """ - Computes unbiased estimate for conditional expectation E(feature_scope| range_conditions). - To this end, normalization might be required (will always be certain multipliers.) 
- E[1_{conditions} * X_feature_scope] / E[1_{conditions} * X_normalizing_scope] - - :param feature_scope: - :param inverted_features: - :param normalizing_scope: - :param range_conditions: - :return: - """ - if range_conditions is None: - range_conditions = np.array([None] * len(self.mspn.scope)).reshape(1, len(self.mspn.scope)) - - # If normalization is not required, simply return unnormalized conditional expectation - if normalizing_scope is None or len(normalizing_scope) == 0: - if standard_deviations: - return self._unnormalized_conditional_expectation_with_std(feature_scope, - inverted_features=inverted_features, - range_conditions=range_conditions, - gen_code_stats=gen_code_stats) - else: - return None, self._unnormalized_conditional_expectation(feature_scope, - inverted_features=inverted_features, - range_conditions=range_conditions, - impute_p=impute_p, - gen_code_stats=gen_code_stats) - - assert set(normalizing_scope).issubset(feature_scope), "Normalizing scope must be subset of feature scope" - - # for computation make sure that features that are not normalized are not NULL - range_conditions = self._augment_not_null_conditions(set(feature_scope).difference(normalizing_scope), - range_conditions) - - # E[1_{conditions} * X_feature_scope] - std = None - if standard_deviations: - std, _ = self._unnormalized_conditional_expectation_with_std(feature_scope, - inverted_features=inverted_features, - range_conditions=range_conditions, - gen_code_stats=gen_code_stats) - - nominator = self._indicator_expectation(feature_scope, - inverted_features=inverted_features, - range_conditions=range_conditions, - gen_code_stats=gen_code_stats) - - # E[1_{conditions} * X_normalizing_scope] - inverted_features_of_norm = \ - [inverted_features[feature_scope.index(variable_scope)] for variable_scope in normalizing_scope] - assert all(inverted_features_of_norm), "Normalizing factors should be inverted" - - denominator = self._indicator_expectation(normalizing_scope, - inverted_features=inverted_features_of_norm, - range_conditions=range_conditions) - return std, nominator / denominator - def _parse_conditions(self, conditions, group_by_columns=None, group_by_tuples=None): """ Translates string conditions to NumericRange and NominalRanges the SPN understands. 
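For illustration, the range conditions consumed by the probability and expectation helpers above are arrays with one `NominalRange`/`NumericRange`/`None` entry per SPN column (cf. the `_probability` docstring). A minimal sketch, assuming a hypothetical three-column SPN:
```python
import numpy as np

from rspn.algorithms.ranges import NominalRange, NumericRange

# One row of conditions, one entry per column of the (hypothetical) SPN scope:
# column 0: categorical value encoded as 5,
# column 1: numeric column >= 2452632,
# column 2: unconstrained.
range_conditions = np.array(
    [NominalRange([5]), NumericRange([[2452632, np.inf]]), None],
    dtype=object,
).reshape(1, 3)
```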
diff --git a/aqp_spn/generate_code.py b/aqp_spn/generate_code.py new file mode 100755 index 0000000..a1de628 --- /dev/null +++ b/aqp_spn/generate_code.py @@ -0,0 +1,34 @@ +import logging +from time import perf_counter + +from rspn.code_generation.generate_code import generate_code, replace_template, TemplatePath + +logger = logging.getLogger(__name__) + + +def generate_ensemble_code(spn_ensemble, floating_data_type='float', ensemble_path=None): + registrations = [] + methods = [] + logger.debug(f"Starting code generation") + for i, spn in enumerate(spn_ensemble.spns): + spn.id = i + gen_start = perf_counter() + generated_method, registrate_method = generate_code(i, spn.mspn, spn.meta_types, floating_data_type) + registrations.append(registrate_method) + methods.append(generated_method) + gen_end = perf_counter() + logger.debug(f"Generated code for SPN {i + 1}/{len(spn_ensemble.spns)} in {gen_end - gen_start:.2f}s.") + + value_dictionary = { + 'methods': '\n\n'.join(methods), + 'registration': '\n\t'.join(registrations) + } + generated_code = replace_template(TemplatePath.MASTER, value_dictionary, 0) + + if ensemble_path is not None: + spn_ensemble.save(ensemble_path) + + with open('optimized_inference.cpp', 'w') as f: + f.write(generated_code) + + logger.debug(f"Finished code generation.") diff --git a/aqp_spn/group_by_combination.py b/aqp_spn/group_by_combination.py index a4e7ec4..455fc1c 100644 --- a/aqp_spn/group_by_combination.py +++ b/aqp_spn/group_by_combination.py @@ -5,9 +5,9 @@ from spn.algorithms.Inference import likelihood from spn.structure.Base import get_nodes_by_type, Leaf, Product, eval_spn_bottom_up, assign_ids -from aqp_spn.aqp_leaves import Sum -from aqp_spn.custom_spflow.custom_transform_structure import Prune -from aqp_spn.custom_spflow.custom_validity import is_valid +from rspn.algorithms.transform_structure import Prune +from rspn.algorithms.validity.validity import is_valid +from rspn.structure.base import Sum logger = logging.getLogger(__name__) @@ -58,17 +58,6 @@ def prod_group_by(node, children, data=None, dtype=np.float64): result_values = [result_value + (matching_value[matching_idx],) for result_value in result_values for matching_value in matching_values] # assert len(result_values) <= len(group_by_scopes) - old_len = len(result_values) - if hasattr(node, 'binary_bloom_filters'): # , "For grouping product nodes must have bloom filters." 
- for scope, bloom_filter in node.binary_bloom_filters.items(): - if scope[0] in group_by_scopes and scope[1] in group_by_scopes: - idx_left = group_by_scopes.index(scope[0]) - idx_right = group_by_scopes.index(scope[1]) - result_values = [result_value for result_value in result_values if - (result_value[idx_left], result_value[idx_right],) in bloom_filter] - if old_len > len(result_values): - logger.debug( - f"\t\tDue to bloom filters results were reduced by {(1 - len(result_values) / old_len) * 100}%") return group_by_scopes, set(result_values) # Only probabilities, normal inference elif contains_probs: @@ -191,9 +180,6 @@ def marg_recursive(node): newNode.weights.extend(node.weights) if not light: newNode.cluster_centers.extend(node.cluster_centers) - if isinstance(node, Product): - if hasattr(node, 'binary_bloom_filters'): - newNode.binary_bloom_filters = node.binary_bloom_filters for c in node.children: new_c = marg_recursive(c) @@ -210,9 +196,9 @@ def marg_recursive(node): if not light: assign_ids(newNode) - newNode = Prune(newNode, light=light) + newNode = Prune(newNode, check_cluster_centers=light) - valid, err = is_valid(newNode, light=light) + valid, err = is_valid(newNode, check_cluster_centers=light) assert valid, err # Loc.leave() return newNode diff --git a/benchmarks/tpc_ds_single_table/ground_truth_10g.pkl b/benchmarks/tpc_ds_single_table/ground_truth_10g.pkl new file mode 100644 index 0000000..8709bd3 Binary files /dev/null and b/benchmarks/tpc_ds_single_table/ground_truth_10g.pkl differ diff --git a/benchmarks/tpc_ds_single_table/ground_truth_10g.pkl_times.pkl b/benchmarks/tpc_ds_single_table/ground_truth_10g.pkl_times.pkl new file mode 100644 index 0000000..f17a42f Binary files /dev/null and b/benchmarks/tpc_ds_single_table/ground_truth_10g.pkl_times.pkl differ diff --git a/benchmarks/tpc_ds_single_table/setup-sql/schema.sql b/benchmarks/tpc_ds_single_table/setup-sql/schema.sql new file mode 100644 index 0000000..aa3b938 --- /dev/null +++ b/benchmarks/tpc_ds_single_table/setup-sql/schema.sql @@ -0,0 +1,27 @@ +create unlogged table store_sales +( + ss_sold_date_sk integer , + ss_sold_time_sk integer , + ss_item_sk integer not null, + ss_customer_sk integer , + ss_cdemo_sk integer , + ss_hdemo_sk integer , + ss_addr_sk integer , + ss_store_sk integer , + ss_promo_sk integer , + ss_ticket_number integer not null, + ss_quantity integer , + ss_wholesale_cost decimal(7,2) , + ss_list_price decimal(7,2) , + ss_sales_price decimal(7,2) , + ss_ext_discount_amt decimal(7,2) , + ss_ext_sales_price decimal(7,2) , + ss_ext_wholesale_cost decimal(7,2) , + ss_ext_list_price decimal(7,2) , + ss_ext_tax decimal(7,2) , + ss_coupon_amt decimal(7,2) , + ss_net_paid decimal(7,2) , + ss_net_paid_inc_tax decimal(7,2) , + ss_net_profit decimal(7,2) , + primary key (ss_item_sk, ss_ticket_number) +); diff --git a/benchmarks/tpc_ds_single_table/sql/aqp_queries.sql b/benchmarks/tpc_ds_single_table/sql/aqp_queries.sql new file mode 100644 index 0000000..923273f --- /dev/null +++ b/benchmarks/tpc_ds_single_table/sql/aqp_queries.sql @@ -0,0 +1,5 @@ +SELECT SUM(ss_sales_price) FROM store_sales; +SELECT ss_store_sk, SUM(ss_sales_price) FROM store_sales GROUP BY ss_store_sk; +SELECT ss_store_sk, SUM(ss_sales_price) FROM store_sales WHERE ss_sold_date_sk>=2452632 GROUP BY ss_store_sk; +SELECT ss_store_sk, SUM(ss_sales_price) FROM store_sales WHERE ss_sold_date_sk>=2451642 GROUP BY ss_store_sk; +SELECT ss_store_sk, SUM(ss_sales_price) FROM store_sales WHERE ss_sold_date_sk>=2450632 GROUP BY ss_store_sk; \ 
No newline at end of file diff --git a/data_preparation/join_data_preparation.py b/data_preparation/join_data_preparation.py index 7ac888b..74fda83 100644 --- a/data_preparation/join_data_preparation.py +++ b/data_preparation/join_data_preparation.py @@ -1,11 +1,9 @@ import copy import logging +import math import pickle import random -import dask.dataframe as dd -import gc -import math import pandas as pd from spn.structure.StatisticalTypes import MetaType @@ -446,6 +444,8 @@ def generate_join_sample(self, single_table=None, relationship_list=None, min_st # df_samples = df_samples.join(next_table_data, how='left') #20s # df_samples = df_samples.merge(next_table_data, how='left', right_on=left_attribute, # left_index=True) #34s + # fix pandas error + df_samples.index.name = None df_samples = df_samples.merge(next_table_data, how='left', right_index=True, left_on=right_attribute) @@ -475,6 +475,9 @@ def generate_join_sample(self, single_table=None, relationship_list=None, min_st df_samples = df_samples.set_index(left_attribute, drop=False) next_table_data = next_table_data.set_index(right_attribute, drop=False) + # fix pandas error + df_samples.index.name = None + # next_table_data.index.name = None df_samples = df_samples.merge(next_table_data, how='left', right_index=True, left_on=left_attribute) # 10s, 15s # df_samples.merge(next_table_data, how='left', right_on=right_attribute, diff --git a/data_preparation/prepare_single_tables.py b/data_preparation/prepare_single_tables.py index f6e81f7..6225c4e 100644 --- a/data_preparation/prepare_single_tables.py +++ b/data_preparation/prepare_single_tables.py @@ -19,7 +19,8 @@ def read_table_csv(table_obj, csv_seperator=','): for attribute in table_obj.irrelevant_attributes: df_rows = df_rows.drop(table_obj.table_name + '.' + attribute, axis=1) - return df_rows.convert_objects() + return df_rows.apply(pd.to_numeric, errors="ignore") + # return df_rows.convert_objects() def find_relationships(schema_graph, table, incoming=True): @@ -101,6 +102,9 @@ def prepare_single_table(schema_graph, table, path, max_distinct_vals=10000, csv table_primary_key = table + '.' + table_obj.primary_key[0] assert table_primary_key == left_attribute, "Currently, only references to primary key are supported" + # fix for new pandas version + table_data.index.name = None + neighbor_table_data.index.name = None muls = table_data.join(neighbor_table_data, how='left')[[table_primary_key, right_attribute]] \ .groupby([table_primary_key]).count() diff --git a/ensemble_compilation/spn_ensemble.py b/ensemble_compilation/spn_ensemble.py index df7374b..308d113 100755 --- a/ensemble_compilation/spn_ensemble.py +++ b/ensemble_compilation/spn_ensemble.py @@ -221,7 +221,7 @@ def read_ensemble(ensemble_locations, build_reverse_dict=False): for spn in current_ensemble.spns: logging.debug(f"Including SPN with table_set {spn.table_set} with sampling ratio" f"({spn.full_sample_size} / {spn.full_join_size})") - logging.debug(f"Stats: ({get_structure_stats(spn.mspn)})") + # logging.debug(f"Stats: ({get_structure_stats(spn.mspn)})") # build reverse dict. 
if build_reverse_dict: _build_reverse_spn_dict(spn) diff --git a/maqp.py b/maqp.py index e133130..7fecc19 100755 --- a/maqp.py +++ b/maqp.py @@ -6,7 +6,7 @@ import numpy as np -from aqp_spn.code_generation.generate_code import generate_ensemble_code +from rspn.code_generation.generate_code import generate_ensemble_code from data_preparation.join_data_preparation import prepare_sample_hdf from data_preparation.prepare_single_tables import prepare_all_tables from ensemble_compilation.spn_ensemble import read_ensemble @@ -16,6 +16,7 @@ from schemas.flights.schema import gen_flights_1B_schema from schemas.imdb.schema import gen_job_light_imdb_schema from schemas.ssb.schema import gen_500gb_ssb_schema +from schemas.tpc_ds.schema import gen_1t_tpc_ds_schema np.random.seed(1) @@ -118,6 +119,8 @@ schema = gen_500gb_ssb_schema(table_csv_path) elif args.dataset == 'flights1B': schema = gen_flights_1B_schema(table_csv_path) + elif args.dataset == 'tpc-ds-1t': + schema = gen_1t_tpc_ds_schema(table_csv_path) else: raise ValueError('Dataset unknown') diff --git a/requirements_python3.7.txt b/requirements_python3.7.txt new file mode 100644 index 0000000..811f091 --- /dev/null +++ b/requirements_python3.7.txt @@ -0,0 +1,66 @@ +arff==0.9 +asn1crypto==0.24.0 +atomicwrites==1.3.0 +attrs==19.1.0 +bloom-filter==1.3 +certifi==2019.3.9 +cffi==1.12.3 +chardet==3.0.4 +cloudpickle==1.2.1 +colorama==0.4.1 +cryptography==2.1.4 +cycler==0.10.0 +dask==1.2.0 +decorator==4.4.0 +ete3==3.1.1 +idna==2.6 +joblib==0.13.2 +keyring==10.6.0 +keyrings.alt==3.0 +kiwisolver==1.1.0 +lark-parser==0.7.1 +locket==0.2.0 +matplotlib==3.0.3 +mock==3.0.4 +more-itertools==7.0.0 +mpmath==1.1.0 +networkx==2.3 +numexpr==2.6.9 +numpy==1.16.3 +pandas==0.23.4 +partd==1.0.0 +patsy==0.5.1 +pluggy==0.9.0 +psycopg2==2.8.3 +py==1.8.0 +py4j==0.10.8.1 +pybind11==2.3.0 +pycparser==2.19 +pycrypto==2.6.1 +pydot==1.4.1 +pyparsing==2.4.0 +PyQt5==5.12.1 +PyQt5-sip==4.19.15 +pytest==4.4.1 +python-dateutil==2.7.5 +pytz==2018.7 +pyverdict==0.1.3.2 +pyxdg==0.25 +PyYAML==3.12 +requests==2.18.4 +scikit-learn==0.20.3 +scipy==1.2.1 +SecretStorage==2.3.1 +six==1.12.0 +sklearn==0.0 +spflow==0.0.34 +SQLAlchemy==1.3.11 +sqlparse==0.3.0 +statsmodels==0.9.0 +sympy==1.4 +tables==3.5.1 +toolz==0.9.0 +tqdm==4.31.1 +urllib3==1.22 +var-dump==1.2 +wincertstore==0.2 diff --git a/requirements_python3.8.txt b/requirements_python3.8.txt new file mode 100644 index 0000000..4bfe691 --- /dev/null +++ b/requirements_python3.8.txt @@ -0,0 +1,57 @@ +arff==0.9 +asn1crypto==0.24.0 +atomicwrites==1.3.0 +attrs==19.1.0 +bloom-filter==1.3 +certifi==2019.3.9 +cffi==1.14.0 +chardet==3.0.4 +cloudpickle==1.2.1 +colorama==0.4.1 +cryptography==2.9.2 +cycler==0.10.0 +dataclasses==0.6 +decorator==4.4.0 +ete3==3.1.1 +idna==2.6 +jeepney==0.4.3 +joblib==0.14.1 +keyring==10.6.0 +keyrings.alt==3.0 +kiwisolver==1.1.0 +lark-parser==0.7.1 +locket==0.2.0 +matplotlib==3.2.1 +mock==3.0.4 +more-itertools==7.0.0 +mpmath==1.1.0 +networkx==2.3 +numexpr==2.7.1 +numpy==1.18.3 +packaging==20.3 +pandas==1.0.3 +patsy==0.5.1 +pkg-resources==0.0.0 +pluggy==0.13.1 +psycopg2==2.8.5 +py==1.8.1 +pycparser==2.20 +pydot==1.4.1 +pyparsing==2.4.7 +PyQt5==5.9.2 +PyQt5-sip==12.7.2 +pytest==5.4.1 +python-dateutil==2.8.1 +pytz==2019.3 +scikit-learn==0.22.2.post1 +scipy==1.4.1 +SecretStorage==3.1.2 +six==1.14.0 +sklearn==0.0 +spflow==0.0.40 +sqlparse==0.3.1 +statsmodels==0.11.1 +sympy==1.5.1 +tables==3.6.1 +tqdm==4.45.0 +wcwidth==0.1.9 diff --git a/rspn/algorithms/expectations.py b/rspn/algorithms/expectations.py new file mode 100755 
index 0000000..4e5696d --- /dev/null +++ b/rspn/algorithms/expectations.py @@ -0,0 +1,172 @@ +import logging +from time import perf_counter + +import numpy as np +from spn.algorithms.Inference import likelihood +from spn.structure.Base import Product + +from rspn.code_generation.convert_conditions import convert_range +from rspn.structure.base import Sum + +logger = logging.getLogger(__name__) + + +def expectation(spn, feature_scope, inverted_features, ranges, node_expectation=None, node_likelihoods=None, + use_generated_code=False, spn_id=None, meta_types=None, gen_code_stats=None): + """Compute the Expectation: + E[1_{conditions} * X_feature_scope] + First factor is one if condition is fulfilled. For the second factor the variables in feature scope are + multiplied. If inverted_features[i] is True, variable is taken to denominator. + The conditional expectation would be E[1_{conditions} * X_feature_scope]/P(conditions) + """ + + # evidence_scope = set([i for i, r in enumerate(ranges) if not np.isnan(r)]) + evidence_scope = set([i for i, r in enumerate(ranges[0]) if r is not None]) + evidence = ranges + + assert not (len(evidence_scope) > 0 and evidence is None) + + relevant_scope = set() + relevant_scope.update(evidence_scope) + relevant_scope.update(feature_scope) + if len(relevant_scope) == 0: + return np.ones((ranges.shape[0], 1)) + + if ranges.shape[0] == 1: + + applicable = True + if use_generated_code: + boolean_relevant_scope = [i in relevant_scope for i in range(len(meta_types))] + boolean_feature_scope = [i in feature_scope for i in range(len(meta_types))] + applicable, parameters = convert_range(boolean_relevant_scope, boolean_feature_scope, meta_types, ranges[0], + inverted_features) + + # generated C++ code + if use_generated_code and applicable: + time_start = perf_counter() + import optimized_inference + + spn_func = getattr(optimized_inference, f'spn{spn_id}') + result = np.array([[spn_func(*parameters)]]) + + time_end = perf_counter() + + if gen_code_stats is not None: + gen_code_stats.calls += 1 + gen_code_stats.total_time += (time_end - time_start) + + # logger.debug(f"\t\tGenerated Code Latency: {(time_end - time_start) * 1000:.3f}ms") + return result + + # lightweight non-batch version + else: + return np.array( + [[expectation_recursive(spn, feature_scope, inverted_features, relevant_scope, evidence, + node_expectation, node_likelihoods)]]) + # full batch version + return expectation_recursive_batch(spn, feature_scope, inverted_features, relevant_scope, evidence, + node_expectation, node_likelihoods) + + +def expectation_recursive_batch(node, feature_scope, inverted_features, relevant_scope, evidence, node_expectation, + node_likelihoods): + if isinstance(node, Product): + + llchildren = np.concatenate( + [expectation_recursive_batch(child, feature_scope, inverted_features, relevant_scope, evidence, + node_expectation, node_likelihoods) + for child in node.children if + len(relevant_scope.intersection(child.scope)) > 0], axis=1) + return np.nanprod(llchildren, axis=1).reshape(-1, 1) + + elif isinstance(node, Sum): + if len(relevant_scope.intersection(node.scope)) == 0: + return np.full((evidence.shape[0], 1), np.nan) + + llchildren = np.concatenate( + [expectation_recursive_batch(child, feature_scope, inverted_features, relevant_scope, evidence, + node_expectation, node_likelihoods) + for child in node.children], axis=1) + + relevant_children_idx = np.where(np.isnan(llchildren[0]) == False)[0] + if len(relevant_children_idx) == 0: + return np.array([np.nan]) + + 
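+        # children whose subtree has no overlap with the relevant scope returned NaN above and are dropped here;
+        # renormalize the weights of the remaining children so they sum to one again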
weights_normalizer = sum(node.weights[j] for j in relevant_children_idx) + b = np.array(node.weights)[relevant_children_idx] / weights_normalizer + + return np.dot(llchildren[:, relevant_children_idx], b).reshape(-1, 1) + + else: + if node.scope[0] in feature_scope: + t_node = type(node) + if t_node in node_expectation: + exps = np.zeros((evidence.shape[0], 1)) + + feature_idx = feature_scope.index(node.scope[0]) + inverted = inverted_features[feature_idx] + + exps[:] = node_expectation[t_node](node, evidence, inverted=inverted) + return exps + else: + raise Exception('Node type unknown: ' + str(t_node)) + + return likelihood(node, evidence, node_likelihood=node_likelihoods) + + +def nanproduct(product, factor): + if np.isnan(product): + if not np.isnan(factor): + return factor + else: + return np.nan + else: + if np.isnan(factor): + return product + else: + return product * factor + + +def expectation_recursive(node, feature_scope, inverted_features, relevant_scope, evidence, node_expectation, + node_likelihoods): + if isinstance(node, Product): + + product = np.nan + for child in node.children: + if len(relevant_scope.intersection(child.scope)) > 0: + factor = expectation_recursive(child, feature_scope, inverted_features, relevant_scope, evidence, + node_expectation, node_likelihoods) + product = nanproduct(product, factor) + return product + + elif isinstance(node, Sum): + if len(relevant_scope.intersection(node.scope)) == 0: + return np.nan + + llchildren = [expectation_recursive(child, feature_scope, inverted_features, relevant_scope, evidence, + node_expectation, node_likelihoods) + for child in node.children] + + relevant_children_idx = np.where(np.isnan(llchildren) == False)[0] + + if len(relevant_children_idx) == 0: + return np.nan + + weights_normalizer = sum(node.weights[j] for j in relevant_children_idx) + weighted_sum = sum(node.weights[j] * llchildren[j] for j in relevant_children_idx) + + return weighted_sum / weights_normalizer + + else: + if node.scope[0] in feature_scope: + t_node = type(node) + if t_node in node_expectation: + + feature_idx = feature_scope.index(node.scope[0]) + inverted = inverted_features[feature_idx] + + return node_expectation[t_node](node, evidence, inverted=inverted).item() + else: + raise Exception('Node type unknown: ' + str(t_node)) + + return node_likelihoods[type(node)](node, evidence).item() diff --git a/rspn/algorithms/ranges.py b/rspn/algorithms/ranges.py new file mode 100644 index 0000000..c2ee2f5 --- /dev/null +++ b/rspn/algorithms/ranges.py @@ -0,0 +1,46 @@ +import numpy as np + + +class NominalRange: + """ + This class specifies the range for a nominal attribute. It contains a list of integers which + represent the values which are in the range. + + e.g. possible_values = [5,2] + """ + + def __init__(self, possible_values, null_value=None, is_not_null_condition=False): + self.is_not_null_condition = is_not_null_condition + self.possible_values = np.array(possible_values, dtype=np.int64) + self.null_value = null_value + + def is_impossible(self): + return len(self.possible_values) == 0 + + def get_ranges(self): + return self.possible_values + + +class NumericRange: + """ + This class specifies the range for a numeric attribute. It contains a list of intervals which + represents the values which are valid. Inclusive Intervals specifies whether upper and lower bound are included. + + e.g. 
ranges = [[10,15],[22,23]] if valid values are between 10 and 15 plus 22 and 23 (bounds inclusive) + """ + + def __init__(self, ranges, inclusive_intervals=None, null_value=None, is_not_null_condition=False): + self.is_not_null_condition = is_not_null_condition + self.ranges = ranges + self.null_value = null_value + self.inclusive_intervals = inclusive_intervals + if self.inclusive_intervals is None: + self.inclusive_intervals = [] + for interval in self.ranges: + self.inclusive_intervals.append([True, True]) + + def is_impossible(self): + return len(self.ranges) == 0 + + def get_ranges(self): + return self.ranges diff --git a/rspn/algorithms/transform_structure.py b/rspn/algorithms/transform_structure.py new file mode 100644 index 0000000..746d2fb --- /dev/null +++ b/rspn/algorithms/transform_structure.py @@ -0,0 +1,72 @@ +from spn.structure.Base import get_nodes_by_type, Product, Leaf, assign_ids + +from rspn.algorithms.validity.validity import is_valid +from rspn.structure.base import Sum + + +def Prune(node, check_cluster_centers=False): + """ + Prunes spn. Ensures that nodes have at least one child and that types of node and children differ. + Adapts weigths and optionally bloom filters accordingly. + :param node: + :return: + """ + + # v, err = is_valid(node) + # assert v, err + nodes = get_nodes_by_type(node, (Product, Sum)) + + while len(nodes) > 0: + n = nodes.pop() + + n_type = type(n) + is_sum = n_type == Sum + + i = 0 + while i < len(n.children): + c = n.children[i] + + # if my child has only one node, we can get rid of it and link directly to that grandchildren + # in this case, no bloom filters can be lost since we do not split + if not isinstance(c, Leaf) and len(c.children) == 1: + n.children[i] = c.children[0] + continue + + # if the type is similar to the type of the child + if n_type == type(c): + + if is_sum: + # cluster centers learned? 
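+                    # if so, drop the i-th cluster center and append the child's centers,
+                    # mirroring how the children and weights lists are merged below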
+ if len(n.cluster_centers) > 0: + old_len = len(n.cluster_centers) + len_child_cluster = len(c.cluster_centers) + del n.cluster_centers[i] + n.cluster_centers.extend(c.cluster_centers) + + if check_cluster_centers: + assert old_len - 1 + len_child_cluster == len( + n.cluster_centers), "cluster_center length mismatch, node " + n + c + + del n.children[i] + n.children.extend(c.children) + + if is_sum: + w = n.weights[i] + del n.weights[i] + + n.weights.extend([cw * w for cw in c.weights]) + + continue + + i += 1 + if is_sum and i > 0: + n.weights[0] = 1.0 - sum(n.weights[1:]) + + if isinstance(node, (Product, Sum)) and len(node.children) == 1: + node = node.children[0] + + assign_ids(node) + v, err = is_valid(node, check_cluster_centers=check_cluster_centers) + assert v, err + + return node diff --git a/rspn/algorithms/validity/validity.py b/rspn/algorithms/validity/validity.py new file mode 100644 index 0000000..5a2625d --- /dev/null +++ b/rspn/algorithms/validity/validity.py @@ -0,0 +1,154 @@ +import logging + +import numpy as np +from math import isclose +from spn.structure.Base import get_nodes_by_type, Product, assign_ids, Leaf, eval_spn_bottom_up + +from rspn.structure.base import Sum +from rspn.structure.leaves import IdentityNumericLeaf, Categorical + +logger = logging.getLogger(__name__) + + +def is_consistent(node): + """ + all children of a product node have different scope + """ + + assert node is not None + + allchildscope = set() + for prod_node in reversed(get_nodes_by_type(node, Product)): + nscope = set(prod_node.scope) + + if len(prod_node.children) == 0: + return False, "Product node %s has no children" % prod_node.id + + allchildscope.clear() + sum_features = 0 + for child in prod_node.children: + sum_features += len(child.scope) + allchildscope.update(child.scope) + + if allchildscope != nscope or sum_features != len(allchildscope): + return False, "children of (prod) node %s do not have exclusive scope" % prod_node.id + + return True, None + + +def is_complete(node): + """ + all children of a sum node have same scope as the parent + """ + + assert node is not None + + for sum_node in reversed(get_nodes_by_type(node, Sum)): + nscope = set(sum_node.scope) + + if len(sum_node.children) == 0: + return False, "Sum node %s has no children" % sum_node.id + + for child in sum_node.children: + if nscope != set(child.scope): + return False, "children of (sum) node %s do not have the same scope as parent" % sum_node.id + + return True, None + + +def is_valid_prob_sum(prob_sum, unique_vals, card): + # return True, Null + length = len(prob_sum) - 1 + + if len(prob_sum) != len(unique_vals) + 1: + return False, "len(prob_sum)!= len(unique_vals)+1" + last_prob_sum = 0 + cards = [] + + sum_card = 0 + for i in range(0, len(prob_sum)): + if prob_sum[i] > 1.0001: + return False, "prob_sum[" + str(i) + "] must be =< 1.000, actual value at position " + str(i) + ":" + str( + prob_sum[i]) + ", len:" + str(len(prob_sum)) + if last_prob_sum - 0.0000001 > prob_sum[i]: + return False, "prob_sum value must be increase (last_prob_sum:" + str(last_prob_sum) + ", prob_sum[" + str( + i) + "]:" + str(prob_sum[i]) + num = (prob_sum[i] - last_prob_sum) * card + if False and not isclose(num, round(num), abs_tol=0.05): + err_msg = "wrong probability value at idx " + str(i) + " (" + str( + num) + ")- does not fit to an integer cardinality value for value " + str(unique_vals[i]) + + return False, err_msg + last_prob_sum = prob_sum[i] + sum_card += round(num) + cards.append(round(num)) + + if not 
isclose(prob_sum[length], 1, abs_tol=0.05): + return False, "Last value of prob_sum must be 1.0" + + return True, None + + +def is_valid(node, check_ids=True, check_cluster_centers=False): + if check_ids: + val, err = has_valid_ids(node) + if not val: + return val, err + + for n in get_nodes_by_type(node): + if len(n.scope) == 0: + return False, "node %s has no scope" % n.id + is_sum = isinstance(n, Sum) + is_prod = isinstance(n, Product) + is_float = isinstance(n, IdentityNumericLeaf) + + if check_cluster_centers and is_sum: + if len(n.children) != len(n.cluster_centers): + return False, "node %s has different children/cluster_centers (#cluster_centers: %d, #childs: %d)" % ( + n.id, len(n.cluster_centers), len(n.children)) + + if is_sum: + if len(n.children) != len(n.weights): + return False, "node %s has different children/weights" % n.id + + weight_sum = np.sum(n.weights) + + if not isclose(weight_sum, 1, abs_tol=0.05): + return False, "Sum of weights is not equal 1.0 (instead:" + weight_sum + ")" + + if is_sum or is_prod: + if len(n.children) == 0: + return False, "node %s has no children" % n.id + + if is_float: + ok, err = is_valid_prob_sum(n.prob_sum, n.unique_vals, n.cardinality) + if not ok: + return False, err + + a, err = is_consistent(node) + if not a: + return a, err + + b, err = is_complete(node) + if not b: + return b, err + + return True, None + + +def has_valid_ids(node): + ids = set() + all_nodes = get_nodes_by_type(node) + for n in all_nodes: + ids.add(n.id) + + if len(ids) != len(all_nodes): + return False, "Nodes are missing ids or there are repeated ids" + + if min(ids) != 0: + return False, "Node ids not starting at 0" + + if max(ids) != len(ids) - 1: + return False, "Node ids not consecutive" + + return True, None diff --git a/rspn/code_generation/convert_conditions.py b/rspn/code_generation/convert_conditions.py new file mode 100755 index 0000000..a4d40fd --- /dev/null +++ b/rspn/code_generation/convert_conditions.py @@ -0,0 +1,67 @@ +import numpy as np +from spn.structure.StatisticalTypes import MetaType + + +def _convert_range(range, pos): + if range[pos] == -np.inf or range[pos] == np.inf: + minusInf = True + condition = 0 + else: + minusInf = False + condition = range[pos] + return minusInf, condition + + +def _convert_real(idx, condition, inverted_features): + # method_params += [f'bool inverse{i}', f'bool leftMinusInf{i}', f'float leftCondition{i}', + # f'bool rightMinusInf{i}', f'float rightCondition{i}', f'bool leftIncluded{i}', + # f'bool rightIncluded{i}', f'float nullValue{i}'] + + inverse = idx in inverted_features + if condition is not None: + leftMinusInf, leftCondition = _convert_range(condition.ranges[0], 0) + rightMinusInf, rightCondition = _convert_range(condition.ranges[0], 1) + return inverse, leftMinusInf, leftCondition, rightMinusInf, rightCondition, condition.inclusive_intervals[0][0], \ + condition.inclusive_intervals[0][1], condition.null_value + + return inverse, False, 0, False, 0, False, False, 0 + + +def _convert_categorical(condition): + # method_params += [f'vector <int> possibleValues{i}', f'int nullValueIdx{i}'] + + if condition is not None: + if condition.is_not_null_condition: + return condition.possible_values, condition.null_value + else: + return condition.possible_values, -1 + + # leaves will anyway not be evaluated + return [0], 0 + + +def convert_range(relevant_scope, featureScope, meta_types, conditions, inverted_features): + """ + Translates conditions for an expectation method call into parameters that can be passed to 
generated SPN code. + :param relevant_scope: relevant_scope from expectation method + :param featureScope: feature_scope from expectation method + :param meta_types: types of the columns of the SPN + :param conditions: conditions to be translated + :param inverted_features: list indicating which indexes are inverted features (1/x) + :return: Boolean indicating whether inference is supported by generated SPN. Parameters that have to be passed. + """ + parameters = (relevant_scope, featureScope) + + for idx, condition in enumerate(conditions): + if meta_types[idx] == MetaType.DISCRETE: + parameters += _convert_categorical(condition) + elif meta_types[idx] == MetaType.REAL: + # several conditions currently not supported + if condition is not None and len(condition.ranges) > 1: + return False, None + # conditions on feature column currently not supported in C++ + if featureScope[idx] is None: + return False, None + parameters += _convert_real(idx, condition, inverted_features) + + return True, parameters \ No newline at end of file diff --git a/rspn/code_generation/generate_code.py b/rspn/code_generation/generate_code.py new file mode 100755 index 0000000..3848457 --- /dev/null +++ b/rspn/code_generation/generate_code.py @@ -0,0 +1,205 @@ +import logging +from enum import Enum +from time import perf_counter + +import numpy as np +from spn.structure.Base import assign_ids, Product, get_number_of_nodes +from spn.structure.StatisticalTypes import MetaType + +import os + +from rspn.structure.base import Sum +from rspn.structure.leaves import Categorical, IdentityNumericLeaf + +logger = logging.getLogger(__name__) + + +class TemplatePath(Enum): + current_file_path = __file__ + current_file_dir = os.path.dirname(__file__) + MASTER = os.path.join(current_file_dir, 'templates/master.cpp') + CATEGORICAL = os.path.join(current_file_dir, 'templates/categorical_leave.cpp') + IDENTITY = os.path.join(current_file_dir, 'templates/identity_leave.cpp') + PRODUCT = os.path.join(current_file_dir, 'templates/product_node.cpp') + SUM = os.path.join(current_file_dir, 'templates/sum_node.cpp') + METHOD_MASTER = os.path.join(current_file_dir, 'templates/method_master.cpp') + REGISTRATION_MASTER = os.path.join(current_file_dir, 'templates/registration_master.cpp') + + +def replace_template(template_path, value_dictionary, depth): + with open(template_path.value, 'r') as ftemp: + templateString = ftemp.read() + + code_string = templateString.format(**value_dictionary) + padding = ''.join([' '] * depth) + return ''.join([padding + line for line in code_string.splitlines(True)]) + + +def comma_seperated_list(value_list): + return ', '.join([str(v) for v in value_list]) + + +def generate_scope_check(scope): + return ' || '.join([f'relevantScope[{node_scope}]' for node_scope in scope]) + + +def generate_categorical_node(node, root_node, floating_data_type, depth): + value_dictionary = { + 'node_id': node.id, + 'node_scope': node.scope[0], + 'node_p': comma_seperated_list(node.p), + 'final_assert': f'resultValue = nodeIntermediateResult[{node.id}];' if root_node == node else '', + 'floating_data_type': floating_data_type + } + return replace_template(TemplatePath.CATEGORICAL, value_dictionary, depth) + + +def nan_replacement(value): + if np.isnan(value): + return 0 + else: + return value + + +def generate_identity_node(node, root_node, floating_data_type, depth): + value_dictionary = { + 'node_id': node.id, + 'node_scope': node.scope[0], + 'null_value_prob': node.null_value_prob, + 'unique_values': 
comma_seperated_list(node.unique_vals), + 'prob_sum': comma_seperated_list(node.prob_sum), + 'mean': nan_replacement(node.mean * (1 - node.null_value_prob)), + 'inverted_mean': nan_replacement(node.inverted_mean * (1 - node.null_value_prob)), + 'floating_data_type': floating_data_type, + 'final_assert': f'resultValue = nodeIntermediateResult[{node.id}];' if root_node == node else '' + } + return replace_template(TemplatePath.IDENTITY, value_dictionary, depth) + + +def generate_product_node(node, root_node, floating_data_type, depth): + # if ({scope_check}) {{ + # {subtree_code} + # nodeIntermediateResult[{node_id}] = 1.0 + # {result_calculation} + # }} + + result_calculation_lines = [] + for child in node.children: + result_calculation_lines += [f'if ({generate_scope_check(child.scope)}) ' + f'{{nodeIntermediateResult[{node.id}] *= nodeIntermediateResult[{child.id}];}}'] + + value_dictionary = { + 'node_id': node.id, + 'scope_check': generate_scope_check(node.scope), + 'subtree_code': '\n'.join( + [generate_method_body(child, root_node, floating_data_type, depth) for child in node.children]), + 'result_calculation': '\n '.join(result_calculation_lines), + 'final_assert': f'resultValue = nodeIntermediateResult[{node.id}];' if root_node == node else '' + } + return replace_template(TemplatePath.PRODUCT, value_dictionary, depth) + + +def generate_sum_node(node, root_node, floating_data_type, depth): + # if ({scope_check}) {{ + # {subtree_code} + # {result_calculation} + # {final_assert} + # }} + + result_calculation_lines = [] + for i, child in enumerate(node.children): + result_calculation_lines += [f'nodeIntermediateResult[{child.id}] * {node.weights[i]}'] + + value_dictionary = { + 'scope_check': generate_scope_check(node.scope), + 'subtree_code': '\n'.join( + [generate_method_body(child, root_node, floating_data_type, depth) for child in node.children]), + 'result_calculation': f'nodeIntermediateResult[{node.id}]=' + ' + '.join(result_calculation_lines) + ';', + 'final_assert': f'resultValue = nodeIntermediateResult[{node.id}];' if root_node == node else '' + } + return replace_template(TemplatePath.SUM, value_dictionary, depth) + + +def generate_method_body(node, root_node, floating_data_type, depth): + if isinstance(node, Categorical): + return generate_categorical_node(node, root_node, floating_data_type, depth + 1) + elif isinstance(node, IdentityNumericLeaf): + return generate_identity_node(node, root_node, floating_data_type, depth + 1) + elif isinstance(node, Product): + return generate_product_node(node, root_node, floating_data_type, depth + 1) + elif isinstance(node, Sum): + return generate_sum_node(node, root_node, floating_data_type, depth + 1) + else: + raise NotImplementedError + + +def generate_code(spn_id, spn, meta_types, floating_data_type): + """ + Generates inference code for an SPN + :param target_path: the path the generated C++ code is written to + :param floating_data_type: data type floating numbers are represented in generated C++ code + :param spn: root node of an SPN + :return: code string + """ + + # make sure we have ids + assign_ids(spn) + + # fill method body according to SPN structure + method_body = generate_method_body(spn, spn, floating_data_type, 0) + + # build parameters used in generated c++ function + method_params = [] + passed_params = [] + for i, type in enumerate(meta_types): + if type == MetaType.DISCRETE: + method_params += [f'vector <int> possibleValues{i}', f'int nullValueIdx{i}'] + passed_params += [f'py::arg("possibleValues{i}")', 
f'py::arg("nullValueIdx{i}")'] + elif type == MetaType.REAL: + method_params += [f'bool inverse{i}', f'bool leftMinusInf{i}', f'float leftCondition{i}', + f'bool rightMinusInf{i}', f'float rightCondition{i}', f'bool leftIncluded{i}', + f'bool rightIncluded{i}', f'float nullValue{i}'] + passed_params += [f'py::arg("inverse{i}")', f'py::arg("leftMinusInf{i}")', f'py::arg("leftCondition{i}")', + f'py::arg("rightMinusInf{i}")', f'py::arg("rightCondition{i}")', + f'py::arg("leftIncluded{i}")', f'py::arg("rightIncluded{i}")', f'py::arg("nullValue{i}")'] + + value_dictionary = { + 'spn_id': spn_id, + 'method_body': method_body, + 'method_params': ', '.join(method_params), + 'node_count': get_number_of_nodes(spn), + 'passed_params': ', '.join(passed_params), + 'floating_data_type': floating_data_type + } + generated_method = replace_template(TemplatePath.METHOD_MASTER, value_dictionary, 0) + registrate_method = replace_template(TemplatePath.REGISTRATION_MASTER, value_dictionary, 0) + + return generated_method, registrate_method + + +def generate_ensemble_code(spn_ensemble, floating_data_type='float', ensemble_path=None): + registrations = [] + methods = [] + logger.debug(f"Starting code generation") + for i, spn in enumerate(spn_ensemble.spns): + spn.id = i + gen_start = perf_counter() + generated_method, registrate_method = generate_code(i, spn.mspn, spn.meta_types, floating_data_type) + registrations.append(registrate_method) + methods.append(generated_method) + gen_end = perf_counter() + logger.debug(f"Generated code for SPN {i + 1}/{len(spn_ensemble.spns)} in {gen_end - gen_start:.2f}s.") + + value_dictionary = { + 'methods': '\n\n'.join(methods), + 'registration': '\n\t'.join(registrations) + } + generated_code = replace_template(TemplatePath.MASTER, value_dictionary, 0) + + if ensemble_path is not None: + spn_ensemble.save(ensemble_path) + + with open('optimized_inference.cpp', 'w') as f: + f.write(generated_code) + + logger.debug(f"Finished code generation.") diff --git a/rspn/code_generation/templates/categorical_leave.cpp b/rspn/code_generation/templates/categorical_leave.cpp new file mode 100755 index 0000000..06e74c9 --- /dev/null +++ b/rspn/code_generation/templates/categorical_leave.cpp @@ -0,0 +1,14 @@ +if (relevantScope[{node_scope}]) {{ + // notNanPerNode[{node_id}] = true; + {floating_data_type} probsNode{node_id}[] = {{ {node_p} }}; + + //not null condition + if (nullValueIdx{node_scope} != -1) {{ + nodeIntermediateResult[{node_id}] = 1 - probsNode{node_id}[nullValueIdx{node_scope}]; + }} else {{ + for (int &idx: possibleValues{node_scope}) {{ + nodeIntermediateResult[{node_id}] += probsNode{node_id}[idx]; + }} + }} + {final_assert} +}} \ No newline at end of file diff --git a/rspn/code_generation/templates/identity_leave.cpp b/rspn/code_generation/templates/identity_leave.cpp new file mode 100755 index 0000000..0dfac68 --- /dev/null +++ b/rspn/code_generation/templates/identity_leave.cpp @@ -0,0 +1,48 @@ +if (relevantScope[{node_scope}]) {{ + if (featureScope[{node_scope}]) {{ + if (inverse{node_scope}) {{ + nodeIntermediateResult[{node_id}] = {inverted_mean}; + }} else {{ + nodeIntermediateResult[{node_id}] = {mean}; + }} + }} else {{ + + vector<{floating_data_type}> uniqueVals{node_id}{{ {unique_values} }}; + vector<{floating_data_type}> probSum{node_id}{{ {prob_sum} }}; + + // search right and left bounds via binary search + int leftIdx{node_id} = 0; + if (!leftMinusInf{node_scope}) {{ + vector<{floating_data_type}>::iterator leftBoundIdx{node_id}; + leftBoundIdx{node_id} = 
std::lower_bound(uniqueVals{node_id}.begin(), uniqueVals{node_id}.end(), leftCondition{node_scope}); + leftIdx{node_id} = leftBoundIdx{node_id} - uniqueVals{node_id}.begin(); + }} + + int rightIdx{node_id} = uniqueVals{node_id}.size(); + if (!rightMinusInf{node_scope}) {{ + vector<{floating_data_type}>::iterator rightBoundIdx{node_id}; + rightBoundIdx{node_id} = std::upper_bound(uniqueVals{node_id}.begin(), uniqueVals{node_id}.end(), rightCondition{node_scope}); + rightIdx{node_id} = rightBoundIdx{node_id} - uniqueVals{node_id}.begin(); + }} + + nodeIntermediateResult[{node_id}] = probSum{node_id}[rightIdx{node_id}] - probSum{node_id}[leftIdx{node_id}]; + + // exclude null value if it was included before + if (((leftMinusInf{node_scope} || leftCondition{node_scope} < nullValue{node_scope}) && (rightMinusInf{node_scope} || rightCondition{node_scope} > nullValue{node_scope})) || + (!leftMinusInf{node_scope} && (nullValue{node_scope} == leftCondition{node_scope}) && leftIncluded{node_scope}) || + (!rightMinusInf{node_scope} && (nullValue{node_scope} == rightCondition{node_scope}) && rightIncluded{node_scope})) {{ + nodeIntermediateResult[{node_id}] -= {null_value_prob}; // null value prob + }} + + // left value should not be included in interval + if (!leftIncluded{node_scope} && !leftMinusInf{node_scope} && leftCondition{node_scope} == uniqueVals{node_id}[leftIdx{node_id}]) {{ + nodeIntermediateResult[{node_id}] -= probSum{node_id}[leftIdx{node_id} + 1] - probSum{node_id}[leftIdx{node_id}]; + }} + + //same for right + if (!rightIncluded{node_scope} && !rightMinusInf{node_scope} && rightCondition{node_scope} == uniqueVals{node_id}[rightIdx{node_id}-{node_id}] && leftCondition{node_scope} != rightCondition{node_scope}) {{ + nodeIntermediateResult[{node_id}] -= probSum{node_id}[rightIdx{node_id}] - probSum{node_id}[rightIdx{node_id} - 1]; + }} + }} + {final_assert} +}} \ No newline at end of file diff --git a/rspn/code_generation/templates/master.cpp b/rspn/code_generation/templates/master.cpp new file mode 100755 index 0000000..3b306c7 --- /dev/null +++ b/rspn/code_generation/templates/master.cpp @@ -0,0 +1,12 @@ +#include <pybind11/pybind11.h> +#include <pybind11/stl.h> +#include <vector> +using namespace std; +namespace py = pybind11; + +{methods} + +PYBIND11_MODULE(optimized_inference, m){{ + m.doc() = "Generated RSPN ensemble code"; + {registration} +}} \ No newline at end of file diff --git a/rspn/code_generation/templates/method_master.cpp b/rspn/code_generation/templates/method_master.cpp new file mode 100755 index 0000000..2eb187a --- /dev/null +++ b/rspn/code_generation/templates/method_master.cpp @@ -0,0 +1,9 @@ +{floating_data_type} spn{spn_id}(vector<bool> relevantScope, vector<bool> featureScope, {method_params}){{ + {floating_data_type} resultValue = 0.0; + // bool notNanPerNode[{node_count}] = {{ false }}; + {floating_data_type} nodeIntermediateResult[{node_count}] = {{ 0 }}; + +{method_body} + + return resultValue; +}} \ No newline at end of file diff --git a/rspn/code_generation/templates/product_node.cpp b/rspn/code_generation/templates/product_node.cpp new file mode 100755 index 0000000..22218cd --- /dev/null +++ b/rspn/code_generation/templates/product_node.cpp @@ -0,0 +1,6 @@ +if ({scope_check}) {{ +{subtree_code} + nodeIntermediateResult[{node_id}] = 1.0; + {result_calculation} + {final_assert} +}} \ No newline at end of file diff --git a/rspn/code_generation/templates/registration_master.cpp b/rspn/code_generation/templates/registration_master.cpp new file mode 100755 
index 0000000..a3baea9 --- /dev/null +++ b/rspn/code_generation/templates/registration_master.cpp @@ -0,0 +1 @@ + m.def("spn{spn_id}", &spn{spn_id}, "Generate expectation on SPN", py::arg("relevantScope"), py::arg("featureScope"), {passed_params}); \ No newline at end of file diff --git a/rspn/code_generation/templates/sum_node.cpp b/rspn/code_generation/templates/sum_node.cpp new file mode 100755 index 0000000..0c7aeff --- /dev/null +++ b/rspn/code_generation/templates/sum_node.cpp @@ -0,0 +1,5 @@ +if ({scope_check}) {{ +{subtree_code} + {result_calculation} + {final_assert} +}} \ No newline at end of file diff --git a/rspn/learning/rspn_learning.py b/rspn/learning/rspn_learning.py new file mode 100644 index 0000000..0ac7de7 --- /dev/null +++ b/rspn/learning/rspn_learning.py @@ -0,0 +1,218 @@ +import logging + +import numpy as np +from sklearn.cluster import KMeans +from spn.algorithms.splitting.Base import preproc, split_data_by_clusters +from spn.algorithms.splitting.RDC import getIndependentRDCGroups_py +from spn.structure.StatisticalTypes import MetaType + +from rspn.structure.leaves import IdentityNumericLeaf, Categorical + +logger = logging.getLogger(__name__) +MAX_UNIQUE_LEAF_VALUES = 10000 + + +def learn_mspn( + data, + ds_context, + cols="rdc", + rows="kmeans", + min_instances_slice=200, + threshold=0.3, + max_sampling_threshold_cols=10000, + max_sampling_threshold_rows=100000, + ohe=False, + leaves=None, + memory=None, + rand_gen=None, + cpus=-1 +): + """ + Adapts normal learn_mspn to use custom identity leafs and use sampling for structure learning. + :param max_sampling_threshold_rows: + :param max_sampling_threshold_cols: + :param data: + :param ds_context: + :param cols: + :param rows: + :param min_instances_slice: + :param threshold: + :param ohe: + :param leaves: + :param memory: + :param rand_gen: + :param cpus: + :return: + """ + if leaves is None: + leaves = create_custom_leaf + + if rand_gen is None: + rand_gen = np.random.RandomState(17) + + from rspn.learning.structure_learning import get_next_operation, learn_structure + + def l_mspn(data, ds_context, cols, rows, min_instances_slice, threshold, ohe): + split_cols, split_rows = get_splitting_functions(max_sampling_threshold_rows, max_sampling_threshold_cols, cols, + rows, ohe, threshold, rand_gen, cpus) + + nextop = get_next_operation(min_instances_slice) + + node = learn_structure(data, ds_context, split_rows, split_cols, leaves, next_operation=nextop) + return node + + if memory: + l_mspn = memory.cache(l_mspn) + + spn = l_mspn(data, ds_context, cols, rows, min_instances_slice, threshold, ohe) + return spn + + +def create_custom_leaf(data, ds_context, scope): + """ + Adapted leafs for cardinality SPN. Either categorical or identityNumeric leafs. + """ + + idx = scope[0] + meta_type = ds_context.meta_types[idx] + + if meta_type == MetaType.REAL: + assert len(scope) == 1, "scope for more than one variable?" 
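        # A minimal sketch of what the next lines compute, assuming a toy column
        # data[:, 0] = [1., 1., 2., 5.] with no NULL values:
        #   np.unique(..., return_counts=True) -> (array([1., 2., 5.]), array([2, 1, 1]))
        # so probs becomes [0.5, 0.25, 0.25]. If the column has more than
        # MAX_UNIQUE_LEAF_VALUES distinct values (and its scope is not listed in
        # no_compression_scopes), the values are instead binned and the left bin
        # edges serve as representative unique_vals of the IdentityNumericLeaf.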
+ + unique_vals, counts = np.unique(data[:, 0], return_counts=True) + + if hasattr(ds_context, 'no_compression_scopes') and idx not in ds_context.no_compression_scopes and \ + len(unique_vals) > MAX_UNIQUE_LEAF_VALUES: + # if there are too many unique values build identity leaf with histogram representatives + hist, bin_edges = np.histogram(data[:, 0], bins=MAX_UNIQUE_LEAF_VALUES, density=False) + logger.debug(f"\t\tDue to histograms leaf size was reduced " + f"by {(1 - float(MAX_UNIQUE_LEAF_VALUES) / len(unique_vals)) * 100:.2f}%") + unique_vals = bin_edges[:-1] + probs = hist / data.shape[0] + lidx = len(probs) - 1 + + assert len(probs) == len(unique_vals) + + else: + probs = np.array(counts, np.float64) / len(data[:, 0]) + lidx = len(probs) - 1 + + null_value = ds_context.null_values[idx] + leaf = IdentityNumericLeaf(unique_vals, probs, null_value, scope, cardinality=data.shape[0]) + + return leaf + + elif meta_type == MetaType.DISCRETE: + + unique, counts = np.unique(data[:, 0], return_counts=True) + # +1 because of potential 0 value that might not occur + sorted_counts = np.zeros(len(ds_context.domains[idx]) + 1, dtype=np.float64) + for i, x in enumerate(unique): + sorted_counts[int(x)] = counts[i] + p = sorted_counts / data.shape[0] + null_value = ds_context.null_values[idx] + node = Categorical(p, null_value, scope, cardinality=data.shape[0]) + + return node + + +def get_splitting_functions(max_sampling_threshold_rows, max_sampling_threshold_cols, cols, rows, ohe, threshold, + rand_gen, n_jobs): + from spn.algorithms.splitting.Clustering import get_split_rows_TSNE, get_split_rows_GMM + from spn.algorithms.splitting.PoissonStabilityTest import get_split_cols_poisson_py + from spn.algorithms.splitting.RDC import get_split_rows_RDC_py + + if isinstance(cols, str): + + if cols == "rdc": + split_cols = get_split_cols_RDC_py(max_sampling_threshold_cols=max_sampling_threshold_cols, + threshold=threshold, + rand_gen=rand_gen, ohe=ohe, n_jobs=n_jobs) + elif cols == "poisson": + split_cols = get_split_cols_poisson_py(threshold, n_jobs=n_jobs) + else: + raise AssertionError("unknown columns splitting strategy type %s" % str(cols)) + else: + split_cols = cols + + if isinstance(rows, str): + + if rows == "rdc": + split_rows = get_split_rows_RDC_py(rand_gen=rand_gen, ohe=ohe, n_jobs=n_jobs) + elif rows == "kmeans": + split_rows = get_split_rows_KMeans(max_sampling_threshold_rows=max_sampling_threshold_rows) + elif rows == "tsne": + split_rows = get_split_rows_TSNE() + elif rows == "gmm": + split_rows = get_split_rows_GMM() + else: + raise AssertionError("unknown rows splitting strategy type %s" % str(rows)) + else: + split_rows = rows + return split_cols, split_rows + + +# noinspection PyPep8Naming +def get_split_rows_KMeans(max_sampling_threshold_rows, n_clusters=2, pre_proc=None, ohe=False, seed=17): + # noinspection PyPep8Naming + def split_rows_KMeans(local_data, ds_context, scope): + data = preproc(local_data, ds_context, pre_proc, ohe) + + if data.shape[0] > max_sampling_threshold_rows: + data_sample = data[np.random.randint(data.shape[0], size=max_sampling_threshold_rows), :] + + kmeans = KMeans(n_clusters=n_clusters, random_state=seed) + clusters = kmeans.fit(data_sample).predict(data) + else: + kmeans = KMeans(n_clusters=n_clusters, random_state=seed) + clusters = kmeans.fit_predict(data) + + cluster_centers = kmeans.cluster_centers_ + result = split_data_by_clusters(local_data, clusters, scope, rows=True) + + return result, cluster_centers.tolist() + + return split_rows_KMeans + + +# 
noinspection PyPep8Naming +def get_split_cols_RDC_py(max_sampling_threshold_cols=10000, threshold=0.3, ohe=True, k=10, s=1 / 6, + non_linearity=np.sin, + n_jobs=-2, rand_gen=None): + from spn.algorithms.splitting.RDC import split_data_by_clusters + + def split_cols_RDC_py(local_data, ds_context, scope): + meta_types = ds_context.get_meta_types_by_scope(scope) + domains = ds_context.get_domains_by_scope(scope) + + if local_data.shape[0] > max_sampling_threshold_cols: + local_data_sample = local_data[np.random.randint(local_data.shape[0], size=max_sampling_threshold_cols), :] + clusters = getIndependentRDCGroups_py( + local_data_sample, + threshold, + meta_types, + domains, + k=k, + s=s, + # ohe=True, + non_linearity=non_linearity, + n_jobs=n_jobs, + rand_gen=rand_gen, + ) + return split_data_by_clusters(local_data, clusters, scope, rows=False) + else: + clusters = getIndependentRDCGroups_py( + local_data, + threshold, + meta_types, + domains, + k=k, + s=s, + # ohe=True, + non_linearity=non_linearity, + n_jobs=n_jobs, + rand_gen=rand_gen, + ) + return split_data_by_clusters(local_data, clusters, scope, rows=False) + + return split_cols_RDC_py diff --git a/rspn/learning/structure_learning.py b/rspn/learning/structure_learning.py new file mode 100644 index 0000000..1d35d03 --- /dev/null +++ b/rspn/learning/structure_learning.py @@ -0,0 +1,319 @@ +import logging +import multiprocessing +import os +from collections import deque +from enum import Enum + +import numpy as np +from spn.algorithms.StructureLearning import default_slicer +from spn.structure.Base import assign_ids, Product + +from rspn.algorithms.transform_structure import Prune +from rspn.algorithms.validity.validity import is_valid +from rspn.structure.base import Sum + +logger = logging.getLogger(__name__) + +try: + from time import perf_counter +except: + from time import time + + perf_counter = time + +parallel = True + +if parallel: + cpus = max(1, os.cpu_count() - 2) # - int(os.getloadavg()[2]) +else: + cpus = 1 +pool = multiprocessing.Pool(processes=cpus) + + +class Operation(Enum): + CREATE_LEAF = 1 + SPLIT_COLUMNS = 2 + SPLIT_ROWS = 3 + NAIVE_FACTORIZATION = 4 + REMOVE_UNINFORMATIVE_FEATURES = 5 + CONDITIONING = 6 + + +def get_next_operation(min_instances_slice=100, min_features_slice=1, multivariate_leaf=False): + def next_operation( + data, + scope, + create_leaf, + no_clusters=False, + no_independencies=False, + is_first=False, + cluster_first=True, + cluster_univariate=False, + ): + + minimalFeatures = len(scope) == min_features_slice + minimalInstances = data.shape[0] <= min_instances_slice + + if minimalFeatures: + if minimalInstances or no_clusters: + return Operation.CREATE_LEAF, None + else: + if cluster_univariate: + return Operation.SPLIT_ROWS, None + else: + return Operation.CREATE_LEAF, None + + uninformative_features_idx = np.var(data[:, 0: len(scope)], 0) == 0 + ncols_zero_variance = np.sum(uninformative_features_idx) + if ncols_zero_variance > 0: + if ncols_zero_variance == data.shape[1]: + if multivariate_leaf: + return Operation.CREATE_LEAF, None + else: + return Operation.NAIVE_FACTORIZATION, None + else: + return ( + Operation.REMOVE_UNINFORMATIVE_FEATURES, + np.arange(len(scope))[uninformative_features_idx].tolist(), + ) + + if minimalInstances or (no_clusters and no_independencies): + if multivariate_leaf: + return Operation.CREATE_LEAF, None + else: + return Operation.NAIVE_FACTORIZATION, None + + if no_independencies: + return Operation.SPLIT_ROWS, None + + if no_clusters: + return 
Operation.SPLIT_COLUMNS, None + + if is_first: + if cluster_first: + return Operation.SPLIT_ROWS, None + else: + return Operation.SPLIT_COLUMNS, None + + return Operation.SPLIT_COLUMNS, None + + return next_operation + + +def learn_structure( + dataset, + ds_context, + split_rows, + split_cols, + create_leaf, + next_operation=get_next_operation(), + initial_scope=None, + data_slicer=default_slicer, +): + assert dataset is not None + assert ds_context is not None + assert split_rows is not None + assert split_cols is not None + assert create_leaf is not None + assert next_operation is not None + + root = Product() + root.children.append(None) + root.cardinality = dataset.shape[0] + + if initial_scope is None: + initial_scope = list(range(dataset.shape[1])) + num_conditional_cols = None + elif len(initial_scope) < dataset.shape[1]: + num_conditional_cols = dataset.shape[1] - len(initial_scope) + else: + num_conditional_cols = None + assert len(initial_scope) > dataset.shape[1], "check initial scope: %s" % initial_scope + + tasks = deque() + tasks.append((dataset, root, 0, initial_scope, False, False)) + + while tasks: + + local_data, parent, children_pos, scope, no_clusters, no_independencies = tasks.popleft() + + operation, op_params = next_operation( + local_data, + scope, + create_leaf, + no_clusters=no_clusters, + no_independencies=no_independencies, + is_first=(parent is root), + ) + + logging.debug("OP: {} on slice {} (remaining tasks {})".format(operation, local_data.shape, len(tasks))) + + if operation == Operation.REMOVE_UNINFORMATIVE_FEATURES: + + node = Product() + node.cardinality = local_data.shape[0] + + # In this case, no bloom filters are required since the variables which are split are constant. As a + # consequence, no illegal group by combinations can occur. 
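            # In other words: every zero-variance column is split off as its own
            # single-column child (which will end up as a leaf), while the
            # remaining columns are forwarded together as one further child task.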
+ node.scope.extend(scope) + parent.children[children_pos] = node + + rest_scope = set(range(len(scope))) + scope_slices = [] + for col in op_params: + rest_scope.remove(col) + scope_slices.append([scope[col]]) + node.children.append(None) + tasks.append( + ( + data_slicer(local_data, [col], num_conditional_cols), + node, + len(node.children) - 1, + [scope[col]], + True, + True, + ) + ) + if len(rest_scope) > 0: + scope_slices.append([scope[col] for col in rest_scope]) + + next_final = False + + if len(rest_scope) == 0: + continue + elif len(rest_scope) == 1: + next_final = True + + node.children.append(None) + c_pos = len(node.children) - 1 + + rest_cols = list(rest_scope) + rest_scope = [scope[col] for col in rest_scope] + tasks.append( + ( + data_slicer(local_data, rest_cols, num_conditional_cols), + node, + c_pos, + rest_scope, + next_final, + next_final, + ) + ) + + continue + + elif operation == Operation.SPLIT_ROWS: + + split_start_t = perf_counter() + data_slices, cluster_centers = split_rows(local_data, ds_context, scope) + split_end_t = perf_counter() + logging.debug( + "\t\tfound {} row clusters (in {:.5f} secs)".format(len(data_slices), split_end_t - split_start_t) + ) + + if len(data_slices) == 1: + tasks.append((local_data, parent, children_pos, scope, True, False)) + continue + + create_sum_node(children_pos, data_slices, parent, scope, tasks, cluster_centers) + + continue + + elif operation == Operation.SPLIT_COLUMNS: + split_start_t = perf_counter() + data_slices = split_cols(local_data, ds_context, scope) + split_end_t = perf_counter() + logging.debug( + "\t\tfound {} col clusters (in {:.5f} secs)".format(len(data_slices), split_end_t - split_start_t) + ) + + if len(data_slices) == 1: + tasks.append((local_data, parent, children_pos, scope, False, True)) + assert np.shape(data_slices[0][0]) == np.shape(local_data) + assert data_slices[0][1] == scope + continue + + node = Product() + node.cardinality = local_data.shape[0] + + node.scope.extend(scope) + parent.children[children_pos] = node + + for data_slice, scope_slice, _ in data_slices: + assert isinstance(scope_slice, list), "slice must be a list" + + node.children.append(None) + tasks.append((data_slice, node, len(node.children) - 1, scope_slice, False, False)) + + continue + + elif operation == Operation.NAIVE_FACTORIZATION: + + node = Product() + node.cardinality = local_data.shape[0] + + node.scope.extend(scope) + parent.children[children_pos] = node + + local_tasks = [] + local_children_params = [] + split_start_t = perf_counter() + for col in range(len(scope)): + node.children.append(None) + local_tasks.append(len(node.children) - 1) + child_data_slice = data_slicer(local_data, [col], num_conditional_cols) + local_children_params.append((child_data_slice, ds_context, [scope[col]])) + + result_nodes = pool.starmap(create_leaf, local_children_params) + + for child_pos, child in zip(local_tasks, result_nodes): + node.children[child_pos] = child + + split_end_t = perf_counter() + + logging.debug( + "\t\tnaive factorization {} columns (in {:.5f} secs)".format(len(scope), split_end_t - split_start_t) + ) + + continue + + elif operation == Operation.CREATE_LEAF: + leaf_start_t = perf_counter() + node = create_leaf(local_data, ds_context, scope) + parent.children[children_pos] = node + leaf_end_t = perf_counter() + + logging.debug( + "\t\t created leaf {} for scope={} (in {:.5f} secs)".format( + node.__class__.__name__, scope, leaf_end_t - leaf_start_t + ) + ) + + else: + raise Exception("Invalid operation: " + operation) 
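    # Wrap-up once the task queue is drained: the temporary root Product only
    # anchors the first task, so its single child becomes the returned SPN after
    # id assignment, a validity check, pruning and a second validity check.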
+ + node = root.children[0] + assign_ids(node) + valid, err = is_valid(node) + assert valid, "invalid spn: " + err + node = Prune(node) + valid, err = is_valid(node) + assert valid, "invalid spn: " + err + + return node + + +def create_sum_node(children_pos, data_slices, parent, scope, tasks, cluster_centers=None): + node = Sum(cluster_centers=cluster_centers) + node.scope.extend(scope) + + parent.children[children_pos] = node + # assert parent.scope == node.scope + cardinality = 0 + for data_slice, scope_slice, proportion in data_slices: + assert isinstance(scope_slice, list), "slice must be a list" + cardinality += len(data_slice) + node.children.append(None) + node.weights.append(proportion) + tasks.append((data_slice, node, len(node.children) - 1, scope, False, False)) + node.cardinality = cardinality diff --git a/rspn/rspn.py b/rspn/rspn.py new file mode 100644 index 0000000..5f47ba2 --- /dev/null +++ b/rspn/rspn.py @@ -0,0 +1,358 @@ +import logging +import time +from functools import partial + +import numpy as np +from spn.structure.Base import Context +from spn.structure.StatisticalTypes import MetaType + +from rspn.algorithms.expectations import expectation +from rspn.algorithms.ranges import NominalRange, NumericRange +from rspn.algorithms.validity.validity import is_valid +from rspn.learning.rspn_learning import learn_mspn +from rspn.structure.leaves import IdentityNumericLeaf, identity_expectation, Categorical, categorical_likelihood_range, \ + identity_likelihood_range + +logger = logging.getLogger(__name__) + + +def build_ds_context(column_names, meta_types, null_values, table_meta_data, no_compression_scopes, train_data, + group_by_threshold=1200): + """ + Builds context according to training data. + :param column_names: + :param meta_types: + :param null_values: + :param table_meta_data: + :param train_data: + :return: + """ + ds_context = Context(meta_types=meta_types) + ds_context.null_values = null_values + assert ds_context.null_values is not None, "Null-Values have to be specified" + domain = [] + no_unique_values = [] + # If metadata is given use this to build domains for categorical values + unified_column_dictionary = None + if table_meta_data is not None: + unified_column_dictionary = {k: v for table, table_md in table_meta_data.items() if + table != 'inverted_columns_dict' and table != 'inverted_fd_dict' + for k, v in table_md['categorical_columns_dict'].items()} + + # domain values + group_by_attributes = [] + for col in range(train_data.shape[1]): + + feature_meta_type = meta_types[col] + min_val = np.nanmin(train_data[:, col]) + max_val = np.nanmax(train_data[:, col]) + + unique_vals = len(np.unique(train_data[:, col])) + no_unique_values.append(unique_vals) + if column_names is not None: + if unique_vals <= group_by_threshold and 'mul_' not in column_names[col] and '_nn' not in column_names[col]: + group_by_attributes.append(col) + + domain_values = [min_val, max_val] + + if feature_meta_type == MetaType.REAL: + min_val = np.nanmin(train_data[:, col]) + max_val = np.nanmax(train_data[:, col]) + domain.append([min_val, max_val]) + elif feature_meta_type == MetaType.DISCRETE: + # if no metadata is given, infer domains from data + if column_names is not None \ + and unified_column_dictionary.get(column_names[col]) is not None: + no_diff_values = len(unified_column_dictionary[column_names[col]].keys()) + domain.append(np.arange(0, no_diff_values + 1, 1)) + else: + domain.append(np.arange(domain_values[0], domain_values[1] + 1, 1)) + else: + raise Exception("Unkown 
MetaType " + str(meta_types[col])) + + ds_context.domains = np.asanyarray(domain) + ds_context.no_unique_values = np.asanyarray(no_unique_values) + ds_context.group_by_attributes = group_by_attributes + + if no_compression_scopes is None: + no_compression_scopes = [] + ds_context.no_compression_scopes = no_compression_scopes + + return ds_context + + +class RSPN: + + def __init__(self, meta_types, null_values, full_sample_size, column_names=None, table_meta_data=None): + + self.meta_types = meta_types + self.null_values = null_values + self.full_sample_size = full_sample_size + self.column_names = column_names + self.table_meta_data = table_meta_data + self.mspn = None + self.ds_context = None + + self.use_generated_code = False + + # training stats + self.learn_time = None + self.rdc_threshold = None + self.min_instances_slice = None + + def learn(self, train_data, rdc_threshold=0.3, min_instances_slice=1, max_sampling_threshold_cols=10000, + max_sampling_threshold_rows=100000, no_compression_scopes=None): + + # build domains (including the dependence analysis) + domain_start_t = time.perf_counter() + ds_context = build_ds_context(self.column_names, self.meta_types, self.null_values, self.table_meta_data, + no_compression_scopes, train_data) + self.ds_context = ds_context + domain_end_t = time.perf_counter() + logging.debug(f"Built domains in {domain_end_t - domain_start_t} sec") + + # learn mspn + learn_start_t = time.perf_counter() + + self.mspn = learn_mspn(train_data, ds_context, + min_instances_slice=min_instances_slice, threshold=rdc_threshold, + max_sampling_threshold_cols=max_sampling_threshold_cols, + max_sampling_threshold_rows=max_sampling_threshold_rows) + assert is_valid(self.mspn, check_ids=True) + learn_end_t = time.perf_counter() + self.learn_time = learn_end_t - learn_start_t + logging.debug(f"Built SPN in {learn_end_t - learn_start_t} sec") + + # statistics + self.rdc_threshold = rdc_threshold + self.min_instances_slice = min_instances_slice + + def _add_null_values_to_ranges(self, range_conditions): + for col in range(range_conditions.shape[1]): + if range_conditions[0][col] is None: + continue + for idx in range(range_conditions.shape[0]): + range_conditions[idx, col].null_value = self.null_values[col] + + return range_conditions + + def _probability(self, range_conditions): + """ + Compute probability of range conditions. + + e.g. np.array([NominalRange([0]), NumericRange([[0,0.3]]), None]) + """ + + return self._indicator_expectation([], range_conditions=range_conditions) + + def _indicator_expectation(self, feature_scope, identity_leaf_expectation=None, inverted_features=None, + range_conditions=None, force_no_generated=False, gen_code_stats=None): + """ + Compute E[1_{conditions} * X_feature_scope]. Can also compute products (specify multiple feature scopes). + For inverted features 1/val is used. + + Is basis for both unnormalized and normalized expectation computation. + + Uses safe evaluation for products, i.e. compute extra expectation for every multiplier. If results deviate too + largely (>max_deviation), replace by mean. We also experimented with splitting the multipliers into 10 random + groups. However, this works equally well and is faster. 
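
        A minimal worked example, assuming a single numeric leaf over the values
        {1: 0.5, 2: 0.3, 4: 0.2}, no NULL values and no conditions: the result is
        0.5*1 + 0.3*2 + 0.2*4 = 1.9; with the feature marked as inverted it is
        0.5/1 + 0.3/2 + 0.2/4 = 0.7.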
+ """ + + if inverted_features is None: + inverted_features = [False] * len(feature_scope) + + if range_conditions is None: + range_conditions = np.array([None] * len(self.mspn.scope)).reshape(1, len(self.mspn.scope)) + else: + range_conditions = self._add_null_values_to_ranges(range_conditions) + + if identity_leaf_expectation is None: + _node_expectation = {IdentityNumericLeaf: identity_expectation} + else: + _node_expectation = {IdentityNumericLeaf: identity_leaf_expectation} + + _node_likelihoods_range = {IdentityNumericLeaf: identity_likelihood_range, + Categorical: categorical_likelihood_range} + + if hasattr(self, 'use_generated_code') and self.use_generated_code and not force_no_generated: + full_result = expectation(self.mspn, feature_scope, inverted_features, range_conditions, + node_expectation=_node_expectation, node_likelihoods=_node_likelihoods_range, + use_generated_code=True, spn_id=self.id, meta_types=self.meta_types, + gen_code_stats=gen_code_stats) + else: + full_result = expectation(self.mspn, feature_scope, inverted_features, range_conditions, + node_expectation=_node_expectation, node_likelihoods=_node_likelihoods_range) + + return full_result + + def _augment_not_null_conditions(self, feature_scope, range_conditions): + if range_conditions is None: + range_conditions = np.array([None] * len(self.mspn.scope)).reshape(1, len(self.mspn.scope)) + + # for second computation make sure that features that are not normalized are not NULL + for not_null_scope in feature_scope: + if self.null_values[not_null_scope] is None: + continue + + if range_conditions[0, not_null_scope] is None: + if self.meta_types[not_null_scope] == MetaType.REAL: + range_conditions[:, not_null_scope] = NumericRange([[-np.inf, np.inf]], is_not_null_condition=True) + elif self.meta_types[not_null_scope] == MetaType.DISCRETE: + NumericRange([[-np.inf, np.inf]], is_not_null_condition=True) + categorical_feature_name = self.column_names[not_null_scope] + + for table in self.table_meta_data.keys(): + categorical_values = self.table_meta_data[table]['categorical_columns_dict'] \ + .get(categorical_feature_name) + if categorical_values is not None: + possible_values = list(categorical_values.values()) + possible_values.remove(self.null_values[not_null_scope]) + range_conditions[:, not_null_scope] = NominalRange(possible_values, + is_not_null_condition=True) + break + + return range_conditions + + def _indicator_expectation_with_std(self, feature_scope, inverted_features=None, + range_conditions=None): + """ + Computes standard deviation of the estimator for 1_{conditions}*X_feature_scope. Uses the identity + V(X)=E(X^2)-E(X)^2. + :return: + """ + e_x = self._indicator_expectation(feature_scope, identity_leaf_expectation=identity_expectation, + inverted_features=inverted_features, + range_conditions=range_conditions) + + not_null_conditions = self._augment_not_null_conditions(feature_scope, None) + n = self._probability(not_null_conditions) * self.full_sample_size + + # shortcut: use binomial std if it is just a probability + if len(feature_scope) == 0: + std = np.sqrt(e_x * (1 - e_x) * 1 / n) + return std, e_x + + e_x_sq = self._indicator_expectation(feature_scope, + identity_leaf_expectation=partial(identity_expectation, power=2), + inverted_features=inverted_features, + range_conditions=range_conditions, + force_no_generated=True) + + v_x = e_x_sq - e_x * e_x + + # Indeed divide by sample size of SPN not only qualifying tuples. Because this is not a conditional expectation. 
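        # i.e. the standard error of the estimator: Var(X) = E(X^2) - E(X)^2 and
        # std = sqrt(Var(X) / n), where n is estimated as
        # P(features not NULL) * full_sample_size.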
+ std = np.sqrt(v_x / n) + + return std, e_x + + def _unnormalized_conditional_expectation(self, feature_scope, inverted_features=None, range_conditions=None, + impute_p=False, gen_code_stats=None): + """ + Compute conditional expectation. Can also compute products (specify multiple feature scopes). + For inverted features 1/val is used. Normalization is not possible here. + """ + + range_conditions = self._augment_not_null_conditions(feature_scope, range_conditions) + unnormalized_exp = self._indicator_expectation(feature_scope, inverted_features=inverted_features, + range_conditions=range_conditions, gen_code_stats=gen_code_stats) + + p = self._probability(range_conditions) + if any(p == 0): + if impute_p: + impute_val = np.mean( + unnormalized_exp[np.where(p != 0)[0]] / p[np.where(p != 0)[0]]) + result = unnormalized_exp / p + result[np.where(p == 0)[0]] = impute_val + return result + + return self._indicator_expectation(feature_scope, inverted_features=inverted_features, + gen_code_stats=gen_code_stats) + + return unnormalized_exp / p + + def _unnormalized_conditional_expectation_with_std(self, feature_scope, inverted_features=None, + range_conditions=None, gen_code_stats=None): + """ + Compute conditional expectation. Can also compute products (specify multiple feature scopes). + For inverted features 1/val is used. Normalization is not possible here. + """ + range_conditions = self._augment_not_null_conditions(feature_scope, range_conditions) + p = self._probability(range_conditions) + + e_x_sq = self._indicator_expectation(feature_scope, + identity_leaf_expectation=partial(identity_expectation, power=2), + inverted_features=inverted_features, + range_conditions=range_conditions, + force_no_generated=True, + gen_code_stats=gen_code_stats) / p + + e_x = self._indicator_expectation(feature_scope, inverted_features=inverted_features, + range_conditions=range_conditions, + gen_code_stats=gen_code_stats) / p + + v_x = e_x_sq - e_x * e_x + + n = p * self.full_sample_size + std = np.sqrt(v_x / n) + + return std, e_x + + def _normalized_conditional_expectation(self, feature_scope, inverted_features=None, normalizing_scope=None, + range_conditions=None, standard_deviations=False, impute_p=False, + gen_code_stats=None): + """ + Computes unbiased estimate for conditional expectation E(feature_scope| range_conditions). + To this end, normalization might be required (will always be certain multipliers.) 
+ E[1_{conditions} * X_feature_scope] / E[1_{conditions} * X_normalizing_scope] + + :param feature_scope: + :param inverted_features: + :param normalizing_scope: + :param range_conditions: + :return: + """ + if range_conditions is None: + range_conditions = np.array([None] * len(self.mspn.scope)).reshape(1, len(self.mspn.scope)) + + # If normalization is not required, simply return unnormalized conditional expectation + if normalizing_scope is None or len(normalizing_scope) == 0: + if standard_deviations: + return self._unnormalized_conditional_expectation_with_std(feature_scope, + inverted_features=inverted_features, + range_conditions=range_conditions, + gen_code_stats=gen_code_stats) + else: + return None, self._unnormalized_conditional_expectation(feature_scope, + inverted_features=inverted_features, + range_conditions=range_conditions, + impute_p=impute_p, + gen_code_stats=gen_code_stats) + + assert set(normalizing_scope).issubset(feature_scope), "Normalizing scope must be subset of feature scope" + + # for computation make sure that features that are not normalized are not NULL + range_conditions = self._augment_not_null_conditions(set(feature_scope).difference(normalizing_scope), + range_conditions) + + # E[1_{conditions} * X_feature_scope] + std = None + if standard_deviations: + std, _ = self._unnormalized_conditional_expectation_with_std(feature_scope, + inverted_features=inverted_features, + range_conditions=range_conditions, + gen_code_stats=gen_code_stats) + + nominator = self._indicator_expectation(feature_scope, + inverted_features=inverted_features, + range_conditions=range_conditions, + gen_code_stats=gen_code_stats) + + # E[1_{conditions} * X_normalizing_scope] + inverted_features_of_norm = \ + [inverted_features[feature_scope.index(variable_scope)] for variable_scope in normalizing_scope] + assert all(inverted_features_of_norm), "Normalizing factors should be inverted" + + denominator = self._indicator_expectation(normalizing_scope, + inverted_features=inverted_features_of_norm, + range_conditions=range_conditions) + return std, nominator / denominator diff --git a/rspn/structure/base.py b/rspn/structure/base.py new file mode 100644 index 0000000..766d2e7 --- /dev/null +++ b/rspn/structure/base.py @@ -0,0 +1,28 @@ +from spn.structure.Base import Node + + +class Sum(Node): + def __init__(self, weights=None, children=None, cluster_centers=None, cardinality=None): + Node.__init__(self) + + if weights is None: + weights = [] + self.weights = weights + + if children is None: + children = [] + self.children = children + + if cluster_centers is None: + cluster_centers = [] + self.cluster_centers = cluster_centers + + if cardinality is None: + cardinality = 0 + self.cardinality = cardinality + + @property + def parameters(self): + sorted_children = sorted(self.children, key=lambda c: c.id) + params = [(n.id, self.weights[i]) for i, n in enumerate(sorted_children)] + return tuple(params) diff --git a/rspn/structure/leaves.py b/rspn/structure/leaves.py new file mode 100644 index 0000000..7d61f3b --- /dev/null +++ b/rspn/structure/leaves.py @@ -0,0 +1,376 @@ +import numpy as np +from spn.structure.Base import Leaf +from spn.structure.leaves.parametric.Parametric import Parametric + + +class Categorical(Parametric): + """ + Implements a univariate categorical distribution with k parameters + """ + + from spn.structure.StatisticalTypes import Type + from collections import namedtuple + + type = Type.CATEGORICAL + property_type = namedtuple("Categorical", "p") + + def __init__(self, 
p, null_value, scope, cardinality=0): + Parametric.__init__(self, type(self).type, scope=scope) + + # parameters + assert np.isclose(np.sum(p), 1), "Probabilities p shall sum to 1" + if not isinstance(p, np.ndarray): + p = np.array(p) + self.p = p + self.cardinality = cardinality + self.null_value = null_value + + def copy_node(self): + return Categorical(np.copy(self.p), self.null_value, self.scope, cardinality=self.cardinality) + + @property + def parameters(self): + return __class__.property_type(p=self.p) + + @property + def k(self): + return len(self.p) + + +class IdentityNumericLeaf(Leaf): + def __init__(self, unique_vals, probabilities, null_value, scope, cardinality=0): + """ + Instead of histogram remember individual values. + :param unique_vals: all possible values in leaf + :param mean: mean of not null values + :param inverted_mean: inverted mean of not null values + :param square_mean: mean of squared not null values + :param inverted_square_mean: mean of 1/squared not null values + :param prob_sum: cumulative sum of probabilities + :param null_value_prob: proportion of null values in the leaf + :param scope: + """ + Leaf.__init__(self, scope=scope) + if not isinstance(unique_vals, np.ndarray): + unique_vals = np.array(unique_vals) + self.unique_vals = unique_vals + self.cardinality = cardinality + self.null_value = null_value + self.unique_vals_idx = None + self.update_unique_vals_idx() + + # will be updated later + self.prob_sum = None + self.null_value_prob = None + self.mean = None + self.inverted_mean = None + self.square_mean = None + self.inverted_square_mean = None + + if not isinstance(probabilities, np.ndarray): + probabilities = np.array(probabilities) + self.update_from_new_probabilities(probabilities) + + def copy_node(self): + self_copy = IdentityNumericLeaf(np.copy(self.unique_vals), self.return_histogram(copy=True), self.null_value, + self.scope, cardinality=self.cardinality) + assert self_copy.mean == self.mean and self_copy.null_value_prob == self.null_value_prob + assert self_copy.inverted_mean == self.inverted_mean and self_copy.square_mean == self.square_mean + assert self_copy.inverted_square_mean == self.inverted_square_mean + return self_copy + + def update_unique_vals_idx(self): + self.unique_vals_idx = {self.unique_vals[idx]: idx for idx in range(self.unique_vals.shape[0])} + + def return_histogram(self, copy=True): + if copy: + return np.copy(self.prob_sum[1:] - self.prob_sum[:-1]) + else: + return self.prob_sum[1:] - self.prob_sum[:-1] + + def update_from_new_probabilities(self, p): + assert len(p) == len(self.unique_vals) + # convert p back to cumulative prob sum + self.prob_sum = np.concatenate([[0], np.cumsum(p)]) + # update null value prob + not_null_indexes = np.where(self.unique_vals != self.null_value)[0] + if self.null_value in self.unique_vals_idx.keys(): + self.null_value_prob = p[self.unique_vals_idx[self.null_value]] + else: + self.null_value_prob = 0 + # update not_null idxs + zero_in_dataset = 0 in self.unique_vals_idx.keys() + # all values NAN + if len(not_null_indexes) == 0: + self.mean = 0 + self.inverted_mean = np.nan + + # for variance computation + self.square_mean = 0 + self.inverted_square_mean = np.nan + # some values nan + else: + self.mean = np.dot(self.unique_vals[not_null_indexes], p[not_null_indexes]) / (1 - self.null_value_prob) + self.square_mean = np.dot(np.square(self.unique_vals[not_null_indexes]), p[not_null_indexes]) / ( + 1 - self.null_value_prob) + + if zero_in_dataset: + self.inverted_mean = np.nan + 
self.inverted_square_mean = np.nan + else: + self.inverted_mean = np.dot(1 / self.unique_vals[not_null_indexes], p[not_null_indexes]) / ( + 1 - self.null_value_prob) + self.inverted_square_mean = np.dot(1 / np.square(self.unique_vals[not_null_indexes]), + p[not_null_indexes]) / (1 - self.null_value_prob) + + +def _interval_probability(node, left, right, null_value, left_included, right_included): + if left == -np.inf: + lower_idx = 0 + else: + lower_idx = np.searchsorted(node.unique_vals, left, side='left') + if left == right == node.unique_vals[lower_idx - 1]: + return node.prob_sum[lower_idx + 1] - node.prob_sum[lower_idx] + + if right == np.inf: + higher_idx = len(node.unique_vals) + else: + higher_idx = np.searchsorted(node.unique_vals, right, side='right') + + if lower_idx == higher_idx: + return 0 + + p = node.prob_sum[higher_idx] - node.prob_sum[lower_idx] + # null value included in interval + if null_value is not None and \ + (left < null_value < right or + null_value == left and left_included or + null_value == right and right_included): + p -= node.null_value_prob + + # left value should not be included in interval + if not left_included and node.unique_vals[lower_idx] == left: + # equivalent to p -= node.probs[lower_idx] + p -= node.prob_sum[lower_idx + 1] - node.prob_sum[lower_idx] + + # same check for right value + if not right_included and node.unique_vals[higher_idx - 1] == right and left != right: + p -= node.prob_sum[higher_idx] - node.prob_sum[higher_idx - 1] + + return p + + +def _interval_expectation(power, node, left, right, null_value, left_included, right_included, inverted=False): + lower_idx = np.searchsorted(node.unique_vals, left, side='left') + higher_idx = np.searchsorted(node.unique_vals, right, side='right') + exp = 0 + + for j in np.arange(lower_idx, higher_idx): + if node.unique_vals[j] == null_value: + continue + if node.unique_vals[j] == left and not left_included: + continue + if node.unique_vals[j] == right and not right_included: + continue + p_j = node.prob_sum[j + 1] - node.prob_sum[j] + if power == 1: + if not inverted: + exp += p_j * node.unique_vals[j] + else: + exp += p_j * 1 / node.unique_vals[j] + elif power == 2: + if not inverted: + exp += p_j * node.unique_vals[j] * node.unique_vals[j] + else: + exp += p_j * 1 / node.unique_vals[j] * 1 / node.unique_vals[j] + + return exp + + +def identity_expectation(node, data, inverted=False, power=1): + exps = np.zeros((data.shape[0], 1)) + ranges = data[:, node.scope[0]] + + for i, rang in enumerate(ranges): + + if node.null_value_prob > 0: + assert rang is not None, "Ensure that features of expectations are not null." 
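        # Unrestricted ranges fall back to the precomputed moments of the leaf,
        # scaled by (1 - null_value_prob) because NULL values contribute nothing
        # to the expectation; restricted ranges are accumulated interval by
        # interval via _interval_expectation below.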
+ + if rang is None or rang.ranges == [[-np.inf, np.inf]]: + if power == 1: + if not inverted: + exps[i] = node.mean * (1 - node.null_value_prob) + else: + exps[i] = node.inverted_mean * (1 - node.null_value_prob) + elif power == 2: + if not inverted: + exps[i] = node.square_mean * (1 - node.null_value_prob) + else: + exps[i] = node.inverted_square_mean * (1 - node.null_value_prob) + else: + raise NotImplementedError + continue + + for k, interval in enumerate(rang.get_ranges()): + inclusive = rang.inclusive_intervals[k] + + exps[i] += _interval_expectation(power, node, interval[0], interval[1], rang.null_value, inclusive[0], + inclusive[1], inverted=inverted) + + return exps + + +def _convert_to_single_tuple_set(scope, values): + if values is None: + return [scope], None + + return [scope], set(map(lambda x: (x,), values)) + + +def identity_distinct_ranges(node, data, dtype=np.float64, **kwargs): + """ + Returns distinct values. + """ + ranges = data[:, node.scope[0]] + + assert len(ranges) == 1, "Only single range is supported" + if ranges[0] is None: + return _convert_to_single_tuple_set(node.scope[0], node.unique_vals) + + assert len(ranges[0].ranges) == 1, "Only single interval is supported" + + interval = ranges[0].ranges[0] + inclusive = ranges[0].inclusive_intervals[0] + + lower_idx = np.searchsorted(node.unique_vals, interval[0], side='left') + higher_idx = np.searchsorted(node.unique_vals, interval[1], side='right') + + if lower_idx == higher_idx: + return _convert_to_single_tuple_set(node.scope[0], None) + + if node.unique_vals[lower_idx] == interval[0] and not inclusive[0]: + lower_idx += 1 + + if node.unique_vals[higher_idx - 1] == interval[1] and not inclusive[1]: + higher_idx -= 1 + + if lower_idx == higher_idx: + return _convert_to_single_tuple_set(node.scope[0], None) + + vals = set(node.unique_vals[lower_idx:higher_idx]) + if ranges[0].null_value in vals: + vals.remove(ranges[0].null_value) + + return _convert_to_single_tuple_set(node.scope[0], vals) + + +def identity_likelihood_wo_null(node, data, dtype=np.float64, **kwargs): + assert len(node.scope) == 1, node.scope + + probs = np.empty((data.shape[0], 1), dtype=dtype) + probs[:] = np.nan + nd = data[:, node.scope[0]] + + for i, val in enumerate(nd): + if not np.isnan(val): + probs[i] = _interval_probability(node, val, val, None, True, True) + + return probs + + +def identity_likelihood_range(node, data, dtype=np.float64, overwrite_ranges=None, **kwargs): + assert len(node.scope) == 1, node.scope + + probs = np.zeros((data.shape[0], 1), dtype=dtype) + ranges = overwrite_ranges + if overwrite_ranges is None: + ranges = data[:, node.scope[0]] + + for i, rang in enumerate(ranges): + + # Skip if no range is specified aka use a log-probability of 0 for that instance + if rang is None: + probs[i] = 1 + continue + + if rang.is_not_null_condition: + probs[i] = 1 - node.null_value_prob + continue + + # Skip if no values for the range are provided + if rang.is_impossible(): + continue + + for k, interval in enumerate(rang.get_ranges()): + inclusive = rang.inclusive_intervals[k] + + probs[i] += _interval_probability(node, interval[0], interval[1], rang.null_value, inclusive[0], + inclusive[1]) + + return probs + + +def categorical_likelihood_wo_null(node, data, dtype=np.float64, **kwargs): + """ + Returns the likelihood for the given values ignoring NULL values + """ + probs = np.empty((data.shape[0], 1)) + probs[:] = np.nan + for i in range(data.shape[0]): + value = data[i, node.scope[0]] + if not np.isnan(value): + probs[i] = 
node.p[int(value)] + # probs = np.reshape([node.p[val] for val in data[:, node.scope[0]]], + # (data.shape[0], 1)) + + return probs + + +def categorical_likelihood_range(node, data, dtype=np.float64, **kwargs): + """ + Returns the probability for the given sets. + """ + + # Assert that the given node is only build on one instance + assert len(node.scope) == 1, node.scope + + # Initialize the return variable log_probs with zeros + probs = np.ones((data.shape[0], 1), dtype=dtype) + + # Only select the ranges for the specific feature + ranges = data[:, node.scope[0]] + + # For each instance + for i, rang in enumerate(ranges): + + # Skip if no range is specified aka use a log-probability of 0 for that instance + if rang is None: + continue + + if rang.is_not_null_condition: + probs[i] = 1 - node.p[rang.null_value] + continue + + # Skip if no values for the range are provided + if len(rang.possible_values) == 0: + probs[i] = 0 + + # Compute the sum of the probability of all possible values + probs[i] = sum([node.p[possible_val] for possible_val in rang.possible_values]) + + return probs + + +def categorical_distinct_ranges(node, data, dtype=np.float64, **kwargs): + """ + Returns distinct values. + """ + + ranges = data[:, node.scope[0]] + assert len(ranges) == 1, "Only single range condition is supported" + + if ranges[0] is None: + return _convert_to_single_tuple_set(node.scope[0], np.where(node.p > 0)[0]) + + return _convert_to_single_tuple_set(node.scope[0], + set(np.where(node.p > 0)[0]).intersection(ranges[0].possible_values)) diff --git a/rspn/updates/top_down_updates.py b/rspn/updates/top_down_updates.py new file mode 100644 index 0000000..2b35d81 --- /dev/null +++ b/rspn/updates/top_down_updates.py @@ -0,0 +1,168 @@ +import logging + +import numpy as np +from spn.structure.Base import Product + +from scipy.spatial import distance + +from rspn.structure.base import Sum +from rspn.structure.leaves import Categorical, IdentityNumericLeaf + +logger = logging.getLogger(__name__) + + +def cluster_center_update_dataset(spn, dataset): + """ + Updates the SPN when a new dataset arrives. The function recursively traverses the + tree and inserts the different values of a dataset at the according places. + + At every sum node, the child node is selected, based on the minimal euclidian distance to the + cluster_center of on of the child-nodes. + :param spn: + :param dataset: + :param metadata: root of aqp_spn containing meta-information (ensemble-object) + :return: + """ + + if isinstance(spn, Categorical): + + insert_into_categorical_leaf(spn, np.array([dataset]), np.array([1.0])) + + return spn + elif isinstance(spn, IdentityNumericLeaf): + + insert_into_identity_numeric_leaf(spn, np.array([dataset]), np.array([1.0])) + + return spn + elif isinstance(spn, Sum): + cc = spn.cluster_centers + + node_idx = 0 + + min_dist = np.inf + min_idx = -1 + for n in spn.children: + # distance calculation between the dataset and the different clusters + # (there exist a much faster version on scipy) + # this? 
https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise_distances.html + # + proj = projection(dataset, n.scope) + dist = distance.euclidean(cc[node_idx], proj) + if dist < min_dist: + min_dist = dist + min_idx = node_idx + + node_idx += 1 + assert min_idx > -1 + assert min_idx < len(spn.children) + adapt_weights(spn, min_idx) + cluster_center_update_dataset(spn.children[min_idx], dataset) + elif isinstance(spn, Product): + + for n in spn.children: + cluster_center_update_dataset(n, dataset) + else: + raise Exception("Invalid node type " + str(type(spn))) + spn.cardinality += 1 + + +def projection(dataset, scope): + projection = [] + for idx in scope: + assert len(dataset) > idx, "wrong scope " + str(scope) + " for dataset" + str(dataset) + projection.append(dataset[idx]) + return projection + + +def adapt_weights(sum_node, selected_child_idx): + assert isinstance(sum_node, Sum), "adapt_weights called on non Sum-node" + + card = sum_node.cardinality + from math import isclose + cardinalities = np.array(sum_node.weights) * card + + cardinalities[selected_child_idx] += 1 + + sum_node.weights = cardinalities / (card + 1) + sum_node.weights = sum_node.weights.tolist() + + weight_sum = np.sum(sum_node.weights) + + assert isclose(weight_sum, 1, abs_tol=0.05) + + +def insert_into_categorical_leaf(node, data, insert_weights, debug=False): + """ + Updates categorical leaf according to data with associated insert_weights + """ + + relevant_updates, insert_weights = slice_relevant_updates(node.scope[0], data, insert_weights) + node.cardinality, node.p = insert_into_histogram(node.p, node.cardinality, relevant_updates, insert_weights, + debug=debug) + + +def slice_relevant_updates(idx, data, insert_weights): + relevant_updates = data[:, idx] + relevant_idxs = np.where(insert_weights > 0) + insert_weights = insert_weights[relevant_idxs] + relevant_updates = relevant_updates[relevant_idxs] + + return relevant_updates, insert_weights + + +def insert_into_histogram(p, cardinality, relevant_updates_histogram, insert_weights, debug=False): + p *= cardinality + for i in range(relevant_updates_histogram.shape[0]): + p[int(relevant_updates_histogram[i])] += insert_weights[i] + + new_cardinality = max(0, cardinality + np.sum(insert_weights)) + + p = np.clip(p, 0, np.inf) + + sum_p = np.sum(p) + if sum_p > 0: + p /= sum_p + + if debug: + assert not np.any(np.isnan(p)) + assert np.isclose(np.sum(p), 1) + assert new_cardinality >= 0 + + return new_cardinality, p + + +def insert_into_identity_numeric_leaf(node, data, insert_weights, debug=False): + relevant_updates, insert_weights = slice_relevant_updates(node.scope[0], data, insert_weights) + + p = update_unique_vals(node, relevant_updates) + + # treat this as updating a histogram + # however, we have plain unique values not idxs of histogram + relevant_updates_histogram = np.array( + [node.unique_vals_idx[relevant_updates[i]] for i in range(relevant_updates.shape[0])]) + + node.cardinality, p = insert_into_histogram(p, node.cardinality, relevant_updates_histogram, insert_weights, + debug=debug) + + node.update_from_new_probabilities(p) + + +def update_unique_vals(node, relevant_updates): + # convert cumulative prob sum back to probabilities + p = node.prob_sum[1:] - node.prob_sum[:-1] + # extend the domain if necessary, i.e., all with null value + update_domain = set([update for update in relevant_updates]) + additional_values = update_domain.difference(node.unique_vals_idx.keys()) + if len(additional_values) > 0: + # update unique vals + 
old_unique_vals = node.unique_vals + node.unique_vals = np.array(sorted(update_domain.union((node.unique_vals_idx.keys())))) + node.update_unique_vals_idx() + + # update p's according to new unique_vals + new_p = np.zeros(len(node.unique_vals)) + for i in range(p.shape[0]): + new_idx = node.unique_vals_idx[old_unique_vals[i]] + new_p[new_idx] = p[i] + p = new_p + return p diff --git a/rspn/utils/numpy_utils.py b/rspn/utils/numpy_utils.py new file mode 100644 index 0000000..059f820 --- /dev/null +++ b/rspn/utils/numpy_utils.py @@ -0,0 +1,18 @@ +import numpy as np + + +def sum_by_group(values, groups): + order = np.argsort(groups) + groups = groups[order] + values = values[order] + values.cumsum(out=values, axis=0) + index = np.ones(len(groups), 'bool') + index[:-1] = groups[1:] != groups[:-1] + values = values[index] + groups = groups[index] + values[1:] = values[1:] - values[:-1] + return values, groups + + +def greater_close(a, b): + return np.greater(a, b) | np.isclose(a, b) diff --git a/schemas/tpc_ds/schema.py b/schemas/tpc_ds/schema.py new file mode 100644 index 0000000..e484117 --- /dev/null +++ b/schemas/tpc_ds/schema.py @@ -0,0 +1,79 @@ +from ensemble_compilation.graph_representation import SchemaGraph, Table + + +def gen_10gb_tpc_ds_schema(csv_path): + """ + TPCDS 10g schema + """ + + schema = SchemaGraph() + schema.add_table(Table('store_sales', + attributes=['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk', 'ss_customer_sk', + 'ss_cdemo_sk', 'ss_hdemo_sk', 'ss_addr_sk', 'ss_store_sk', 'ss_promo_sk', + 'ss_ticket_number', 'ss_quantity', 'ss_wholesale_cost', 'ss_list_price', + 'ss_sales_price', 'ss_ext_discount_amt', 'ss_ext_sales_price', + 'ss_ext_wholesale_cost', 'ss_ext_list_price', 'ss_ext_tax', + 'ss_coupon_amt', 'ss_net_paid', 'ss_net_paid_inc_tax', 'ss_net_profit'], + irrelevant_attributes=['ss_sold_time_sk', 'ss_item_sk', 'ss_customer_sk', 'ss_cdemo_sk', + 'ss_hdemo_sk', 'ss_addr_sk', 'ss_promo_sk', 'ss_ticket_number', + 'ss_quantity', 'ss_wholesale_cost', 'ss_list_price', + 'ss_ext_discount_amt', 'ss_ext_sales_price', 'ss_ext_wholesale_cost', + 'ss_ext_list_price', 'ss_ext_tax', + 'ss_coupon_amt', 'ss_net_paid', 'ss_net_paid_inc_tax', + 'ss_net_profit'], + no_compression=['ss_sold_date_sk', 'ss_store_sk', 'ss_sales_price'], + csv_file_location=csv_path.format('store_sales_sampled'), + table_size=28800991, primary_key=['ss_item_sk', 'ss_ticket_number'], + sample_rate=10000000 / 28800991 + )) + + return schema + + +def gen_1t_tpc_ds_schema(csv_path): + """ + TPCDS 1t schema + """ + + schema = SchemaGraph() + schema.add_table(Table('store_sales', + attributes=['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk', 'ss_customer_sk', + 'ss_cdemo_sk', 'ss_hdemo_sk', 'ss_addr_sk', 'ss_store_sk', 'ss_promo_sk', + 'ss_ticket_number', 'ss_quantity', 'ss_wholesale_cost', 'ss_list_price', + 'ss_sales_price', 'ss_ext_discount_amt', 'ss_ext_sales_price', + 'ss_ext_wholesale_cost', 'ss_ext_list_price', 'ss_ext_tax', + 'ss_coupon_amt', 'ss_net_paid', 'ss_net_paid_inc_tax', 'ss_net_profit'], + irrelevant_attributes=['ss_sold_time_sk', 'ss_item_sk', 'ss_customer_sk', 'ss_cdemo_sk', + 'ss_hdemo_sk', 'ss_addr_sk', 'ss_promo_sk', 'ss_ticket_number', + 'ss_quantity', 'ss_wholesale_cost', 'ss_list_price', + 'ss_ext_discount_amt', 'ss_ext_sales_price', 'ss_ext_wholesale_cost', + 'ss_ext_list_price', 'ss_ext_tax', + 'ss_coupon_amt', 'ss_net_paid', 'ss_net_paid_inc_tax', + 'ss_net_profit'], + no_compression=['ss_sold_date_sk', 'ss_store_sk', 'ss_sales_price'], + 
csv_file_location=csv_path.format('store_sales_sampled'), + table_size=2879987999, primary_key=['ss_item_sk', 'ss_ticket_number'], + sample_rate=10000000 / 2879987999 + )) + + return schema + +# suboptimal configuration +# def gen_10gb_tpc_ds_schema(csv_path): +# """ +# TPCDS 10g schema +# """ +# +# schema = SchemaGraph() +# schema.add_table(Table('store_sales', +# attributes=['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk', 'ss_customer_sk', +# 'ss_cdemo_sk', 'ss_hdemo_sk', 'ss_addr_sk', 'ss_store_sk', 'ss_promo_sk', +# 'ss_ticket_number', 'ss_quantity', 'ss_wholesale_cost', 'ss_list_price', +# 'ss_sales_price', 'ss_ext_discount_amt', 'ss_ext_sales_price', +# 'ss_ext_wholesale_cost', 'ss_ext_list_price', 'ss_ext_tax', +# 'ss_coupon_amt', 'ss_net_paid', 'ss_net_paid_inc_tax', 'ss_net_profit'], +# csv_file_location=csv_path.format('store_sales_sampled'), +# table_size=28800991, primary_key=['ss_item_sk', 'ss_ticket_number'], sample_rate=0.33 +# )) +# +# return schema
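

# Minimal usage sketch. The '{}' template below is an assumption inferred from the
# sampled-csv location mentioned in the README (store_sales_sampled.csv under
# ../mqp-data/tpc-ds-benchmark); maqp.py may construct the path differently.
if __name__ == '__main__':
    schema = gen_1t_tpc_ds_schema('../mqp-data/tpc-ds-benchmark/{}.csv')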