Skip to content

Commit

Permalink
bring dev in
Browse files Browse the repository at this point in the history
  • Loading branch information
kasunamare committed Nov 14, 2023
2 parents 63d7545 + 7b8d5b1 commit 8b147ed
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
48 changes: 40 additions & 8 deletions src/triage/component/architect/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,18 @@ def build_matrix(
matrix_metadata["label_timespan"],
)

output = self.stitch_csvs(feature_queries, label_query, matrix_store, matrix_uuid)
logger.debug(f"matrix stitched, pandas DF returned")
output, labels = self.stitch_csvs(feature_queries, label_query, matrix_store, matrix_uuid)
logger.info(f"matrix stitched, pandas DF returned")
matrix_store.metadata = matrix_metadata
labels = output.pop(matrix_store.label_column_name)
#labels = output.pop(matrix_store.label_column_name)
matrix_store.matrix_label_tuple = output, labels
logging.debug(f'About to save the matrix {matrix_uuid} in {matrix_store.matrix_base_store.path}')
matrix_store.save()
#matrix_store.save()
logger.info(f"Saving matrix metadata (yaml) for matrix {matrix_uuid}")
matrix_store.save_matrix_metadata()

# If completely archived, save its information to matrices table
# At this point, existence of matrix already tested, so no need to delete from db
logging.info(f"Getting all matrix metadata for matrix {matrix_uuid}")
if matrix_type == "train":
lookback = matrix_metadata["max_training_history"]
else:
Expand All @@ -352,6 +354,7 @@ def build_matrix(
matrix_metadata=matrix_metadata,
built_by_experiment=self.experiment_hash
)
logger.info(f"About to save all metrix metadata on DB for matrix {matrix_uuid}")
# before saving the matrix metadata we need to cast datetimes to str
matrix_metadata = change_datetimes_on_metadata(matrix_metadata)
session = self.sessionmaker()
Expand Down Expand Up @@ -555,23 +558,52 @@ def stitch_csvs(self, features_queries, label_query, matrix_store, matrix_uuid):
df_pl = df_pl.with_columns(pl.col("entity_id").cast(pl.Int32, strict=False))
end = time.time()
logger.debug(f"time casting entity_id and as_of_date of matrix with uuid {matrix_uuid} (sec): {(end-start)/60}")

logger.debug(f"getting labels pandas series from polars data frame")
# getting label series
labels_pl = df_pl.select(df_pl.columns[-1])
# convert into pandas series
labels_df = labels_pl.to_pandas()
labels_series = labels_df.squeeze()

# remove labels from features and return as df
logger.debug(f"removing labels from main polars df")
df_pl_aux = df_pl.drop(df_pl.columns[-1])

# converting from polars to pandas
logger.debug(f"about to convert polars df into pandas df")
start = time.time()
df = df_pl.to_pandas()
df = df_pl_aux.to_pandas()
end = time.time()
logger.debug(f"Time converting from polars to pandas (sec): {(end-start)/60}")
df.set_index(["entity_id", "as_of_date"], inplace=True)
logger.debug(f"df data types: {df.dtypes}")
logger.spam(f"Pandas DF memory usage: {df.memory_usage(deep=True).sum()/1000000} MB")

logger.debug(f"Generating gzip from full matrix csv")
self.generate_gzip(path_, matrix_uuid)

logger.debug(f"removing csvs files for matrix {matrix_uuid}")
# adding _sorted and _fixed files to list of files to rm
rm_filenames = generate_list_of_files_to_remove(filenames, matrix_uuid)
self.remove_unnecessary_files(rm_filenames, path_, matrix_uuid)

#return downcast_matrix(df)
return df
return df, labels_series


def generate_gzip(self, path, matrix_uuid):
    """
    Compress the matrix csv file with the external ``gzip`` command.

    The csv is looked up at ``<path>/<matrix_uuid>.csv``; it is the file with
    all the features (doesn't include the label). ``gzip -k`` keeps the
    source csv in place next to the compressed output.

    Args:
        path (string): directory that contains the matrix csv file
        matrix_uuid (string): matrix identifier, used as the csv file name
            (without the ``.csv`` extension)

    Raises:
        subprocess.CalledProcessError: if gzip exits with a non-zero status
    """
    csv_path = f"{path}/{matrix_uuid}.csv"
    # Pass an argument list (no shell): spaces or shell metacharacters in the
    # path can neither break the command nor be interpreted by a shell.
    cmd = ["gzip", "-k", csv_path]
    cmd_line = " ".join(cmd)
    logger.debug(f"Generating gzip of full matrix on cmd line with command: {cmd_line}")
    # check=True: a failed compression must not be reported as a success below.
    subprocess.run(cmd, check=True)
    logger.debug(f"Full matrix {matrix_uuid} compressed and saved!")


def remove_unnecessary_files(self, filenames, path_, matrix_uuid):
"""
Expand Down
4 changes: 4 additions & 0 deletions src/triage/component/catwalk/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,10 @@ def _load_as_df(self):
with self.matrix_base_store.open("rb") as fd:
return pd.read_csv(fd, compression="gzip", parse_dates=["as_of_date"])

def save_matrix_metadata(self):
    """Write only the matrix metadata, as UTF-8 encoded YAML, to the metadata store."""
    metadata_store = self.metadata_base_store
    with metadata_store.open("wb") as metadata_file:
        yaml.dump(self.metadata, metadata_file, encoding="utf-8")

def save(self):
logging.debug('About to compress')
self.matrix_base_store.write(gzip.compress(self.full_matrix_for_saving.to_csv(None).encode("utf-8")))
Expand Down

0 comments on commit 8b147ed

Please sign in to comment.