Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/FW-393: ingest cosmos status #9

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies = [
"oracledb",
"pandas",
"platformdirs",
"setuptools",
]
name = "iot-swarm"
dynamic = ["version"]
Expand Down
126 changes: 126 additions & 0 deletions src/iotswarm/processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import sqlite3
from concurrent.futures import ProcessPoolExecutor
from contextlib import closing
from glob import glob
from pathlib import Path
from typing import Callable, List, Optional

import pandas as pd
from tqdm import tqdm

def build_database_from_csv(
csv_file: str | Path,
database: str | Path,
table_name: str,
timestamp_header: str,
sort_by: str | None = None,
date_time_format: str = r"%d-%b-%y %H.%M.%S",
) -> None:
"""Adds a database table using a csv file with headers.

Args:
csv_file: A path to the csv.
database: Output destination of the database. File is created if not
existing.
table_name: Name of the table to add into database.
timestamp_header: Name of the column with a timestamp
sort_by: Column to sort by
date_time_format: Format of datetime column
"""

if not isinstance(csv_file, Path):
csv_file = Path(csv_file)

if not isinstance(database, Path):
database = Path(database)

if not csv_file.exists():
raise FileNotFoundError(f'csv_file does not exist: "{csv_file}"')

if not database.parent.exists():
raise NotADirectoryError(f'Database directory not found: "{database.parent}"')

with sqlite3.connect(database) as conn:
print(
f'Writing table: "{table_name}" from csv_file: "{csv_file}" to db: "{database}"'
)
print("Loading csv")
df = pd.read_csv(csv_file)
print("Done")
print("Formatting dates")
# print(df.loc[782794])
df[timestamp_header] = pd.to_datetime(df[timestamp_header], format=date_time_format)
print("Done")
if sort_by is not None:
print("Sorting.")
df = df.sort_values(by=sort_by)
print("Done")

print("Writing to db.")
df.to_sql(table_name, conn, if_exists="replace", index=False)
print("Writing complete.")


def _read_cosmos_status_file(file_path):
    """Load a single COSMOS status file into a DataFrame.

    The raw files carry a 4-line preamble in which only the second line
    (index 1) holds the column names, so lines 0, 2 and 3 are skipped.
    """
    preamble_rows = [0, 2, 3]
    return pd.read_csv(file_path, delimiter=",", skiprows=preamble_rows)

def _write_batch_to_csv(batch_df, dst, mode='a', header=False):
    """Persist a batch DataFrame to *dst* as csv, without the index column.

    Args:
        batch_df: The DataFrame batch to write.
        dst: Target csv path or buffer.
        mode: File mode forwarded to ``DataFrame.to_csv`` (append by default).
        header: Whether to emit the column-header row.
    """
    csv_kwargs = {"mode": mode, "index": False, "header": header}
    batch_df.to_csv(dst, **csv_kwargs)

def process_csv_files_parallel(
    src,
    dst,
    batch_size=1000,
    sort_columns: Optional[List[str]|str]=None,
    extension:str=".dat",
    read_method:Callable=_read_cosmos_status_file,
    write_method:Callable=_write_batch_to_csv):
    """Converts a directory of .dat files into a combined .csv file.

    Files are read in parallel (worker processes) in batches of
    ``batch_size``; each batch is concatenated and appended to ``dst``,
    and the combined file is optionally sorted at the end.

    Args:
        src: The source directory, searched recursively.
        dst: The output file. NOTE(review): opened in append mode, so any
            pre-existing content of ``dst`` is kept — remove the file first
            for a clean rebuild.
        batch_size: Number of files read and written per batch.
        sort_columns: Column(s) to sort the combined output by.
        extension: The file extension to match.
        read_method: The method used to read one file into a DataFrame.
        write_method: The method used to write a batch DataFrame to ``dst``.
    """

    if not isinstance(src, Path):
        src = Path(src)
    if not isinstance(dst, Path):
        dst = Path(dst)
    if not isinstance(sort_columns, list) and sort_columns is not None:
        sort_columns = [sort_columns]

    # Get the list of all matching files below src
    files = [Path(x) for x in glob(f"{src}/**/*{extension}", recursive=True)]
    header_written = False
    total_files = len(files)

    # Parallelize only the reading; writing stays sequential so the output
    # keeps the batch order
    with ProcessPoolExecutor() as executor, tqdm(total=total_files, desc="Processing files") as progress_bar:
        # Process in batches
        for i in range(0, total_files, batch_size):
            # Select a batch of files
            batch_files = files[i:i + batch_size]

            # BUG FIX: the files must be loaded with read_method — the
            # original passed write_method, which expects a DataFrame and
            # would fail on a file path.
            batch_dfs = list(executor.map(read_method, batch_files))

            # Concatenate the batch into one DataFrame
            combined_batch_df = pd.concat(batch_dfs, ignore_index=True)

            # Write the batch to the output file (only write header once)
            write_method(combined_batch_df, dst, mode='a', header=not header_written)
            header_written = True  # Header written after the first batch

            # Update the progress bar
            progress_bar.update(len(batch_files))

            # Release the batch before loading the next one
            del combined_batch_df, batch_dfs

    if sort_columns is not None:
        print(f"Sorting by {sort_columns}")
        df = pd.read_csv(dst)
        df = df.sort_values(by=sort_columns)
        df.to_csv(dst, index=False, header=True, mode="w")
1 change: 1 addition & 0 deletions src/iotswarm/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class CosmosTable(StrEnum):
LEVEL_1_NMDB_1HOUR = "LEVEL1_NMDB_1HOUR"
LEVEL_1_PRECIP_1MIN = "LEVEL1_PRECIP_1MIN"
LEVEL_1_PRECIP_RAINE_1MIN = "LEVEL1_PRECIP_RAINE_1MIN"
COSMOS_STATUS_1HOUR = "COSMOS_STATUS_1HOUR"


@enum.unique
Expand Down
56 changes: 1 addition & 55 deletions src/iotswarm/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
"""Module for handling commonly reused utility functions."""

from datetime import date, datetime
from pathlib import Path
import pandas
import sqlite3


def json_serial(obj: object):
"""Serializes an unknown object into a json format."""
Expand All @@ -15,54 +11,4 @@ def json_serial(obj: object):
if obj.__class__.__module__ != "builtins":
return obj.__json__()

raise TypeError(f"Type {type(obj)} is not serializable.")


def build_database_from_csv(
csv_file: str | Path,
database: str | Path,
table_name: str,
sort_by: str | None = None,
date_time_format: str = r"%d-%b-%y %H.%M.%S",
) -> None:
"""Adds a database table using a csv file with headers.

Args:
csv_file: A path to the csv.
database: Output destination of the database. File is created if not
existing.
table_name: Name of the table to add into database.
sort_by: Column to sort by
date_time_format: Format of datetime column
"""

if not isinstance(csv_file, Path):
csv_file = Path(csv_file)

if not isinstance(database, Path):
database = Path(database)

if not csv_file.exists():
raise FileNotFoundError(f'csv_file does not exist: "{csv_file}"')

if not database.parent.exists():
raise NotADirectoryError(f'Database directory not found: "{database.parent}"')

with sqlite3.connect(database) as conn:
print(
f'Writing table: "{table_name}" from csv_file: "{csv_file}" to db: "{database}"'
)
print("Loading csv")
df = pandas.read_csv(csv_file)
print("Done")
print("Formatting dates")
df["DATE_TIME"] = pandas.to_datetime(df["DATE_TIME"], format=date_time_format)
print("Done")
if sort_by is not None:
print("Sorting.")
df = df.sort_values(by=sort_by)
print("Done")

print("Writing to db.")
df.to_sql(table_name, conn, if_exists="replace", index=False)
print("Writing complete.")
raise TypeError(f"Type {type(obj)} is not serializable.")
Loading