Skip to content

Commit

Permalink
check rows on csv files
Browse files Browse the repository at this point in the history
  • Loading branch information
silil committed Sep 29, 2023
1 parent b754ead commit f52e959
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
36 changes: 28 additions & 8 deletions src/triage/component/architect/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
from triage.database_reflection import table_has_data, table_row_count
from triage.tracking import built_matrix, skipped_matrix, errored_matrix
from triage.util.pandas import downcast_matrix
from triage.component.architect.utils import change_datetimes_on_metadata
from triage.component.architect.utils import (
change_datetimes_on_metadata,
check_rows_in_files,
check_entity_ids_in_files
)

class BuilderBase:
def __init__(
Expand Down Expand Up @@ -435,7 +439,8 @@ def feature_load_queries(self, feature_dictionary, entity_date_table_name):
table=entity_date_table_name,
),
right_column_selections=[', "{0}"'.format(fn) for fn in feature_names],
include_index=True if num==0 else False,
include_index=True
#include_index=True if num==0 else False,
))
return queries

Expand Down Expand Up @@ -494,7 +499,21 @@ def stitch_csvs(self, features_queries, label_query, matrix_store, matrix_uuid):

# join all files starting with features and ending with label
files = " ".join(filenames)


# check if the number of rows among all features and label files are the same
try:
assert check_rows_in_files(files)
except AssertionError as e:
logger.exception(
f"Different number of rows among features and label files for matrix uuid {matrix_uuid} ",
)
if self.run_id:
errored_matrix(self.run_id, self.db_engine)
return

# check if the entities_id and knowledge_dates are the same among all the features and label files
check_entity_ids_in_files(files)

# save joined csvs
cmd_line = 'paste ' + files + ' -d "," > ' + path_ + "/" + matrix_uuid + ".csv"
logger.debug(f"paste CSVs columnwise for matrix {matrix_uuid} cmd line: {cmd_line}")
Expand Down Expand Up @@ -551,10 +570,11 @@ def remove_unnecessary_files(self, filenames, path_, matrix_uuid):
matrix_uuid (string): ID of the matrix
"""
# deleting features and label csvs
for filename_ in filenames:
cmd_line = 'rm ' + filename_
subprocess.run(cmd_line, shell=True)
#for filename_ in filenames:
# cmd_line = 'rm ' + filename_
# subprocess.run(cmd_line, shell=True)

# deleting the merged csv
cmd_line = 'rm ' + path_ + "/" + matrix_uuid + '.csv'
subprocess.run(cmd_line, shell=True)
#cmd_line = 'rm ' + path_ + "/" + matrix_uuid + '.csv'
#subprocess.run(cmd_line, shell=True)
pass
38 changes: 38 additions & 0 deletions src/triage/component/architect/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import functools
import operator
import tempfile
import subprocess

import sqlalchemy

Expand Down Expand Up @@ -226,3 +227,40 @@ def create_binary_outcome_events(db_engine, table_name, events_data):

def retry_if_db_error(exception):
return isinstance(exception, sqlalchemy.exc.OperationalError)


def _num_elements(x):
"""Extract the number of rows from the subprocess output"""
return int(str(x.stdout, encoding="utf-8").split(" ")[0])


def check_rows_in_files(filenames):
"""Checks if the number of rows among all the CSV files for features and
and label for a matrix uuid are the same.
Args:
filenames (List): List of CSV files to check the number of rows
path_ (string): Path to get the temporal csv files
"""
outputs = []
for element in filenames:
if element.endswith(".csv"):
cmd_line = "wc -l " + element
outputs.append(subprocess.run(cmd_line, shell=True, capture_output=True))

# get the number of rows from the subprocess
rows = [_num_elements(output) for output in outputs]
rows_set = set(rows)

if len(rows_set) == 1:
return True
else:
return False

def check_entity_ids_in_files(filenames):
# get first 2 columns on each file (entity_id, knowledge_date)
for element in filenames:
cmd_line = f"cut -d ',' -f 1,2 {element}.csv | sort -k 1,2 > {element}_sorted.csv"
subprocess.run(cmd_line, shell=True)


0 comments on commit f52e959

Please sign in to comment.