From 5af46ece88256b21d8f9bc5cc961668414b3bfca Mon Sep 17 00:00:00 2001
From: Anna Grim <108307071+anna-grim@users.noreply.github.com>
Date: Fri, 12 Jan 2024 17:35:41 -0800
Subject: [PATCH] upd: gcs intake (#28)

Co-authored-by: anna-grim
---
 src/deep_neurographs/densegraph.py |  4 +-
 src/deep_neurographs/intake.py     | 97 +++++++-----------------
 src/deep_neurographs/swc_utils.py  | 35 ++++++++---
 src/deep_neurographs/utils.py      | 21 +++++++
 4 files changed, 72 insertions(+), 85 deletions(-)

diff --git a/src/deep_neurographs/densegraph.py b/src/deep_neurographs/densegraph.py
index 9839a68..870f606 100644
--- a/src/deep_neurographs/densegraph.py
+++ b/src/deep_neurographs/densegraph.py
@@ -9,13 +9,11 @@
 
 """
 
-import os
-
 import networkx as nx
 from more_itertools import zip_broadcast
 from scipy.spatial import KDTree
 
-from deep_neurographs import swc_utils, utils
+from deep_neurographs import swc_utils
 from deep_neurographs.geometry_utils import dist as get_dist
 
 
diff --git a/src/deep_neurographs/intake.py b/src/deep_neurographs/intake.py
index 2716b92..6ac68f4 100644
--- a/src/deep_neurographs/intake.py
+++ b/src/deep_neurographs/intake.py
@@ -11,19 +11,16 @@
 import os
 from concurrent.futures import (
     ProcessPoolExecutor,
-    ThreadPoolExecutor,
     as_completed,
 )
-from io import BytesIO
 from time import time
-from zipfile import ZipFile
 
 from google.cloud import storage
 
 from deep_neurographs import graph_utils as gutils
-from deep_neurographs import swc_utils, utils
+from deep_neurographs import utils
 from deep_neurographs.neurograph import NeuroGraph
-from deep_neurographs.swc_utils import parse_gcs_zip, process_local_paths
+from deep_neurographs.swc_utils import process_gcs_zip, process_local_paths
 
 N_PROPOSALS_PER_LEAF = 3
 OPTIMIZE_PROPOSALS = False
@@ -143,7 +140,13 @@ def build_neurograph_from_gcs_zips(
         Neurograph generated from zips of swc files stored in a GCS bucket.
""" + # Process swc files + t0 = time() swc_dicts = download_gcs_zips(bucket_name, cloud_path, min_size) + print(f"download_gcs_zips(): {time() - t0} seconds") + + # Build neurograph + t0 = time() neurograph = build_neurograph( swc_dicts, img_path=img_path, @@ -151,6 +154,8 @@ def build_neurograph_from_gcs_zips( prune_depth=prune_depth, smooth=smooth, ) + print(f"build_neurograph(): {time() - t0} seconds") + if search_radius > 0: neurograph.generate_proposals( search_radius, n_proposals_per_leaf=n_proposals_per_leaf @@ -180,71 +185,34 @@ def download_gcs_zips(bucket_name, cloud_path, min_size): # Initializations storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) - zip_paths = list_gcs_filenames(bucket, cloud_path, ".zip") + zip_paths = utils.list_gcs_filenames(bucket, cloud_path, ".zip") chunk_size = int(len(zip_paths) * 0.1) - print(f"# zip files: {len(zip_paths)} \n\n") + print(f"# zip files: {len(zip_paths)} \n") # Parse cnt = 1 t0 = time() t1 = time() - swc_dicts = [] - print(f"-- Starting Multithread Reads with chunk_size={chunk_size} -- \n") + swc_dicts = dict() for i, path in enumerate(zip_paths): - swc_dict_i = download_zip(bucket, path, min_size=min_size) - swc_dicts.extend(swc_dict_i) + swc_dicts.update(process_gsc_zip(bucket, path, min_size=min_size)) + if len(swc_dicts) > 10000: + break if i > cnt * chunk_size: report_runtimes(len(zip_paths), i, chunk_size, t0, t1) t1 = time() cnt += 1 - break - t, unit = utils.time_writer(time() - t0) - print("# connected components:", len(swc_dicts)) - print(f"Download Runtime: {round(t, 4)} {unit}") - return swc_dicts - - -def download_zip(bucket, zip_path, min_size=0): - zip_blob = bucket.blob(zip_path) - zip_content = zip_blob.download_as_bytes() - with ZipFile(BytesIO(zip_content)) as zip_file: - with ThreadPoolExecutor() as executor: - results = [ - executor.submit(parse_gcs_zip, zip_file, path, min_size) - for path in list_files_in_gcs_zip(zip_content) - ] - swc_dicts = [result.result() for result in as_completed(results)] return swc_dicts def count_files_in_zips(bucket, zip_paths): file_cnt = 0 for zip_path in zip_paths: - zip_blob = bucket.blob(zip_path) - zip_content = zip_blob.download_as_bytes() - file_paths = list_files_in_gcs_zip(zip_content) - file_cnt += len(file_paths) + zip_content = bucket.blob(zip_path).download_as_bytes() + file_cnt += len(utils.list_files_in_gcs_zip(zip_content)) return file_cnt -def list_files_in_gcs_zip(zip_content): - """ - Lists all files in a zip file stored in a GCS bucket. - - """ - with ZipFile(BytesIO(zip_content), "r") as zip_file: - return zip_file.namelist() - - -def list_gcs_filenames(bucket, cloud_path, extension): - """ - Lists all files in a GCS bucket with the given extension. 
-
-    """
-    blobs = bucket.list_blobs(prefix=cloud_path)
-    return [blob.name for blob in blobs if extension in blob.name]
-
-
 # -- Build neurograph ---
 def build_neurograph(
     swc_dicts,
@@ -256,9 +224,8 @@ def build_neurograph(
     smooth=SMOOTH,
 ):
     # Extract irreducibles
+    t0 = time()
     irreducibles = dict()
-    """
-    -- asynchronous --
     with ProcessPoolExecutor() as executor:
         # Assign Processes
         processes = [None] * len(swc_dicts)
@@ -274,15 +241,7 @@ def build_neurograph(
         for process in as_completed(processes):
             process_id, result = process.result()
             irreducibles[process_id] = result
-    """
-    for key in swc_dicts.keys():
-        _, irreducibles[key] = gutils.get_irreducibles(
-            swc_dicts[key],
-            swc_id=key,
-            prune=prune,
-            depth=prune_depth,
-            smooth=smooth
-        )
+    print(f" --> get_irreducibles(): {time() - t0} seconds")
 
     # Build neurograph
     t0 = time()
@@ -301,17 +260,6 @@ def get_paths(swc_dir):
     return paths
 
 
-def get_start_ids(swc_dicts):
-    node_ids = []
-    cnt = 0
-    for swc_dict in swc_dicts:
-        graph = swc_utils.to_graph(swc_dict)
-        leafs, junctions = gutils.get_irreducibles(graph)
-        node_ids.append(cnt)
-        cnt += len(leafs) + len(junctions)
-    return node_ids
-
-
 def report_runtimes(
     n_files, n_files_completed, chunk_size, start, start_chunk
 ):
@@ -320,11 +268,10 @@ def report_runtimes(
     n_files_remaining = n_files - n_files_completed
     rate = chunk_runtime / chunk_size
     eta = (runtime + n_files_remaining * rate) / 60
-    files_processed = f"{n_files_completed - chunk_size}-{n_files_completed}"
+    zip_processed = f"{n_files_completed - chunk_size}-{n_files_completed}"
     print(f"Completed: {round(100 * n_files_completed / n_files, 2)}%")
     print(
-        f"Runtime for Zips {files_processed}: {round(chunk_runtime, 4)} seconds"
+        f"Runtime for Zips {zip_processed}: {round(chunk_runtime, 4)} seconds"
     )
-    print(f"Zip Processing Rate: {rate} seconds")
     print(f"Approximate Total Runtime: {round(eta, 4)} minutes")
     print("")
diff --git a/src/deep_neurographs/swc_utils.py b/src/deep_neurographs/swc_utils.py
index 19464a7..5b378e1 100644
--- a/src/deep_neurographs/swc_utils.py
+++ b/src/deep_neurographs/swc_utils.py
@@ -9,8 +9,11 @@
 
 """
 
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from io import BytesIO
+from zipfile import ZipFile
 import networkx as nx
 import numpy as np
 
 from deep_neurographs import geometry_utils
 from deep_neurographs import graph_utils as gutils
@@ -21,21 +24,39 @@
 def process_local_paths(paths, min_size, bbox=None):
     swc_dicts = dict()
     for path in paths:
-        swc_dict = parse_local_swc(path, bbox=bbox, min_size=min_size)
-        if len(swc_dict):
-            swc_id = utils.get_swc_id(path)
+        swc_id, swc_dict = parse_local_swc(path, bbox=bbox, min_size=min_size)
+        if len(swc_dict["id"]) > min_size:
             swc_dicts[swc_id] = swc_dict
     return swc_dicts
 
 
+def process_gcs_zip(bucket, zip_path, min_size=0):
+    swc_dicts = dict()
+    zip_blob = bucket.blob(zip_path)
+    zip_content = zip_blob.download_as_bytes()
+    with ZipFile(BytesIO(zip_content)) as zip_file:
+        with ThreadPoolExecutor() as executor:
+            threads = [
+                executor.submit(parse_gcs_zip, zip_file, path, min_size)
+                for path in utils.list_files_in_gcs_zip(zip_content)
+            ]
+            for thread in as_completed(threads):
+                swc_id, result = thread.result()
+                if len(result["id"]) > min_size:
+                    swc_dicts[swc_id] = result
+    return swc_dicts
+
+
 def parse_local_swc(path, bbox=None, min_size=0):
     contents = read_from_local(path)
-    return parse(contents, bbox=bbox) if len(contents) > min_size else []
+    swc_dict = parse(contents, bbox=bbox) if len(contents) > min_size else {"id": []}
+    return utils.get_swc_id(path), swc_dict
 
 
 def parse_gcs_zip(zip_file, path, min_size=0):
-    swc_contents = read_from_gcs_zip(zip_file, path)
-    return parse(swc_contents) if len(swc_contents) > min_size else []
+    contents = read_from_gcs_zip(zip_file, path)
+    swc_dict = parse(contents) if len(contents) > min_size else {"id": []}
+    return utils.get_swc_id(path), swc_dict
 
 
 def parse(swc_contents, bbox=None):
@@ -81,7 +102,7 @@ def parse(swc_contents, bbox=None):
         swc_dict["id"][i] -= min_id
         swc_dict["pid"][i] -= min_id
 
-    return swc_dict if len(swc_dict["id"]) > 1 else []
+    return swc_dict if len(swc_dict["id"]) > 1 else {"id": []}
 
 
 def read_from_local(path):
diff --git a/src/deep_neurographs/utils.py b/src/deep_neurographs/utils.py
index cf9ac54..431cd0d 100644
--- a/src/deep_neurographs/utils.py
+++ b/src/deep_neurographs/utils.py
@@ -14,6 +14,8 @@
 import os
 import shutil
 from copy import deepcopy
+from io import BytesIO
+from zipfile import ZipFile
 
 import numpy as np
 import tensorstore as ts
@@ -192,6 +194,25 @@ def list_subdirs(path, keyword=None):
     return subdirs
 
 
+# --- gcs utils ---
+def list_files_in_gcs_zip(zip_content):
+    """
+    Lists all files in a zip file stored in a GCS bucket.
+
+    """
+    with ZipFile(BytesIO(zip_content), "r") as zip_file:
+        return zip_file.namelist()
+
+
+def list_gcs_filenames(bucket, cloud_path, extension):
+    """
+    Lists all files in a GCS bucket with the given extension.
+
+    """
+    blobs = bucket.list_blobs(prefix=cloud_path)
+    return [blob.name for blob in blobs if extension in blob.name]
+
+
 # --- io utils ---
 def open_zarr(path):
     """
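---

A minimal usage sketch of the GCS intake helpers added above; the bucket
name, prefix, and min_size value below are hypothetical placeholders, and
the loop mirrors the one in intake.download_gcs_zips():

    from google.cloud import storage

    from deep_neurographs import utils
    from deep_neurographs.swc_utils import process_gcs_zip

    # Hypothetical bucket and prefix; point these at a real dataset.
    bucket = storage.Client().bucket("my-neuron-bucket")
    zip_paths = utils.list_gcs_filenames(bucket, "swc_zips/", ".zip")

    # Each call downloads one zip, parses its swc files on a thread pool,
    # and returns a dict keyed by swc id; entries below the min_size
    # threshold are skipped.
    swc_dicts = dict()
    for path in zip_paths:
        swc_dicts.update(process_gcs_zip(bucket, path, min_size=10))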