Skip to content

Commit

Permalink
Remove temporary CSV file from processing step
Browse files Browse the repository at this point in the history
See: #52
  • Loading branch information
tillywoodfield committed Aug 7, 2024
1 parent d431d47 commit 5e3492b
Showing 1 changed file with 38 additions and 31 deletions.
69 changes: 38 additions & 31 deletions iatidata/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import base64
import concurrent.futures
import csv
import functools
import gzip
import json
Expand Down Expand Up @@ -490,9 +489,7 @@ def traverse_object(

def create_rows(
id: int, dataset: str, prefix: str, original_object: dict[str, Any], filetype: str
) -> list[list[Any]]:
rows = []

) -> Iterator[dict[str, Any]]:
if original_object is None:
return []

Expand Down Expand Up @@ -534,7 +531,7 @@ def create_rows(
for no_index, full in zip(parent_keys_no_index, parent_keys_list):
object[f"_link_{no_index}"] = f"{id}.{full}"

row = dict(
yield dict(
id=id,
object_key=object_key,
parent_keys=json.dumps(parent_keys),
Expand All @@ -544,10 +541,6 @@ def create_rows(
),
filetype=filetype,
)
rows.append(row)

result = [list(row.values()) for row in rows]
return result


def raw_objects() -> None:
Expand All @@ -573,12 +566,12 @@ def raw_objects() -> None:
)
)

with tempfile.TemporaryDirectory() as tmpdirname:
with engine.begin() as connection:
connection = connection.execution_options(
with engine.begin() as read_connection:
with engine.begin() as write_connection:
read_connection = read_connection.execution_options(
stream_results=True, max_row_buffer=1000
)
results = connection.execute(
results = read_connection.execute(
text(
"""
(SELECT id, dataset, prefix, object, 'activity' AS filetype FROM _raw_activity ORDER BY id)
Expand All @@ -587,25 +580,39 @@ def raw_objects() -> None:
"""
)
)
paths_csv_file = tmpdirname + "/paths.csv"

with gzip.open(paths_csv_file, "wt", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
for num, (id, dataset, prefix, original_object, filetype) in enumerate(
results
):
if num % 10000 == 0:
logger.debug(f"Processed {num} objects so far")
csv_writer.writerows(
create_rows(id, dataset, prefix, original_object, filetype)
)

logger.debug("Loading processed activities from CSV file into database")
with engine.begin() as connection, gzip.open(paths_csv_file, "rt") as f:
dbapi_conn = connection.connection
copy_sql = "COPY _raw_objects FROM STDIN WITH CSV"
cur = dbapi_conn.cursor()
cur.copy_expert(copy_sql, f)
for num, (id, dataset, prefix, original_object, filetype) in enumerate(
results
):
if num % 10000 == 0:
logger.info(f"Processed {num} objects so far")
write_connection.execute(
insert(
table(
"_raw_objects",
column("id"),
column("object_key"),
column("parent_keys"),
column("object_type"),
column("object"),
column("filetype"),
)
).values(
[
{
"id": row["id"],
"object_key": row["object_key"],
"parent_keys": row["parent_keys"],
"object_type": row["object_type"],
"object": row["object"],
"filetype": row["filetype"],
}
for row in create_rows(
id, dataset, prefix, original_object, filetype
)
]
)
)


DATE_RE = r"^(\d{4})-(\d{2})-(\d{2})([T ](\d{2}):(\d{2}):(\d{2}(?:\.\d*)?)((-(\d{2}):(\d{2})|Z)?))?$"
Expand Down

0 comments on commit 5e3492b

Please sign in to comment.