upd: gcs intake (#28)
Co-authored-by: anna-grim <[email protected]>
anna-grim authored Jan 13, 2024
1 parent 1832f4a commit 5af46ec
Showing 4 changed files with 72 additions and 85 deletions.
4 changes: 1 addition & 3 deletions src/deep_neurographs/densegraph.py
@@ -9,13 +9,11 @@
"""

import os

import networkx as nx
from more_itertools import zip_broadcast
from scipy.spatial import KDTree

from deep_neurographs import swc_utils, utils
from deep_neurographs import swc_utils
from deep_neurographs.geometry_utils import dist as get_dist


97 changes: 22 additions & 75 deletions src/deep_neurographs/intake.py
@@ -11,19 +11,16 @@
import os
from concurrent.futures import (
ProcessPoolExecutor,
ThreadPoolExecutor,
as_completed,
)
from io import BytesIO
from time import time
from zipfile import ZipFile

from google.cloud import storage

from deep_neurographs import graph_utils as gutils
from deep_neurographs import swc_utils, utils
from deep_neurographs import utils
from deep_neurographs.neurograph import NeuroGraph
from deep_neurographs.swc_utils import parse_gcs_zip, process_local_paths
from deep_neurographs.swc_utils import process_local_paths, process_gsc_zip

N_PROPOSALS_PER_LEAF = 3
OPTIMIZE_PROPOSALS = False
@@ -143,14 +140,22 @@ def build_neurograph_from_gcs_zips(
Neurograph generated from zips of swc files stored in a GCS bucket.
"""
# Process swc files
t0 = time()
swc_dicts = download_gcs_zips(bucket_name, cloud_path, min_size)
print(f"download_gcs_zips(): {time() - t0} seconds")

# Build neurograph
t0 = time()
neurograph = build_neurograph(
swc_dicts,
img_path=img_path,
prune=prune,
prune_depth=prune_depth,
smooth=smooth,
)
print(f"build_neurograph(): {time() - t0} seconds")

if search_radius > 0:
neurograph.generate_proposals(
search_radius, n_proposals_per_leaf=n_proposals_per_leaf
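
For orientation, here is a minimal usage sketch of this entry point (an editor's illustration, not part of this commit); the bucket name and prefix are hypothetical, and only parameters that appear in this diff are passed:

from deep_neurographs.intake import build_neurograph_from_gcs_zips

neurograph = build_neurograph_from_gcs_zips(
    "example-bucket",       # hypothetical GCS bucket holding zips of swc files
    "reconstructions/",     # hypothetical prefix searched for *.zip blobs
    min_size=30,            # swc files at or below this size are skipped
    search_radius=10.0,     # > 0 also triggers proposal generation
)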
@@ -180,71 +185,34 @@ def download_gcs_zips(bucket_name, cloud_path, min_size):
# Initializations
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
zip_paths = list_gcs_filenames(bucket, cloud_path, ".zip")
zip_paths = utils.list_gcs_filenames(bucket, cloud_path, ".zip")
chunk_size = int(len(zip_paths) * 0.1)
print(f"# zip files: {len(zip_paths)} \n\n")
print(f"# zip files: {len(zip_paths)} \n")

# Parse
cnt = 1
t0 = time()
t1 = time()
swc_dicts = []
print(f"-- Starting Multithread Reads with chunk_size={chunk_size} -- \n")
swc_dicts = dict()
for i, path in enumerate(zip_paths):
swc_dict_i = download_zip(bucket, path, min_size=min_size)
swc_dicts.extend(swc_dict_i)
swc_dicts.update(process_gsc_zip(bucket, path, min_size=min_size))
if len(swc_dicts) > 10000:
break
if i > cnt * chunk_size:
report_runtimes(len(zip_paths), i, chunk_size, t0, t1)
t1 = time()
cnt += 1
break
t, unit = utils.time_writer(time() - t0)
print("# connected components:", len(swc_dicts))
print(f"Download Runtime: {round(t, 4)} {unit}")
return swc_dicts
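
Since download_gcs_zips now returns a dict keyed by swc id rather than a flat list, consuming the result looks roughly like this (a sketch; only the "id" key is guaranteed by this diff):

swc_dicts = download_gcs_zips("example-bucket", "reconstructions/", 30)
for swc_id, swc_dict in swc_dicts.items():
    print(swc_id, "->", len(swc_dict["id"]), "nodes")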


def download_zip(bucket, zip_path, min_size=0):
zip_blob = bucket.blob(zip_path)
zip_content = zip_blob.download_as_bytes()
with ZipFile(BytesIO(zip_content)) as zip_file:
with ThreadPoolExecutor() as executor:
results = [
executor.submit(parse_gcs_zip, zip_file, path, min_size)
for path in list_files_in_gcs_zip(zip_content)
]
swc_dicts = [result.result() for result in as_completed(results)]
return swc_dicts


def count_files_in_zips(bucket, zip_paths):
file_cnt = 0
for zip_path in zip_paths:
zip_blob = bucket.blob(zip_path)
zip_content = zip_blob.download_as_bytes()
file_paths = list_files_in_gcs_zip(zip_content)
file_cnt += len(file_paths)
zip_content = bucket.blob(zip_path).download_as_bytes()
file_cnt += len(utils.list_files_in_gcs_zip(zip_content))
return file_cnt


def list_files_in_gcs_zip(zip_content):
"""
Lists all files in a zip file stored in a GCS bucket.
"""
with ZipFile(BytesIO(zip_content), "r") as zip_file:
return zip_file.namelist()


def list_gcs_filenames(bucket, cloud_path, extension):
"""
Lists all files in a GCS bucket with the given extension.
"""
blobs = bucket.list_blobs(prefix=cloud_path)
return [blob.name for blob in blobs if extension in blob.name]


# -- Build neurograph ---
def build_neurograph(
swc_dicts,
@@ -256,9 +224,8 @@ def build_neurograph(
smooth=SMOOTH,
):
# Extract irreducibles
t0 = time()
irreducibles = dict()
"""
-- asynchronous --
with ProcessPoolExecutor() as executor:
# Assign Processes
processes = [None] * len(swc_dicts)
@@ -274,15 +241,7 @@
for process in as_completed(processes):
process_id, result = process.result()
irreducibles[process_id] = result
"""
for key in swc_dicts.keys():
_, irreducibles[key] = gutils.get_irreducibles(
swc_dicts[key],
swc_id=key,
prune=prune,
depth=prune_depth,
smooth=smooth
)
print(f" --> get_irreducibles(): {time() - t0} seconds")

# Build neurograph
t0 = time()
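
The process-pool block above submits one job per swc id and collects results with as_completed. A self-contained sketch of that pattern (the worker below is a stand-in for gutils.get_irreducibles, not the real implementation):

from concurrent.futures import ProcessPoolExecutor, as_completed

def worker(key, swc_dict):
    # Stand-in worker; returns (key, result) so the parent can match
    # results to inputs even though they complete out of order.
    return key, {"n_nodes": len(swc_dict["id"])}

def extract_all(swc_dicts):
    irreducibles = dict()
    with ProcessPoolExecutor() as executor:
        processes = [
            executor.submit(worker, key, swc_dicts[key])
            for key in swc_dicts.keys()
        ]
        for process in as_completed(processes):
            key, result = process.result()
            irreducibles[key] = result
    return irreducibles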
@@ -301,17 +260,6 @@ def get_paths(swc_dir):
return paths


def get_start_ids(swc_dicts):
node_ids = []
cnt = 0
for swc_dict in swc_dicts:
graph = swc_utils.to_graph(swc_dict)
leafs, junctions = gutils.get_irreducibles(graph)
node_ids.append(cnt)
cnt += len(leafs) + len(junctions)
return node_ids


def report_runtimes(
n_files, n_files_completed, chunk_size, start, start_chunk
):
@@ -320,11 +268,10 @@
n_files_remaining = n_files - n_files_completed
rate = chunk_runtime / chunk_size
eta = (runtime + n_files_remaining * rate) / 60
files_processed = f"{n_files_completed - chunk_size}-{n_files_completed}"
zip_processed = f"{n_files_completed - chunk_size}-{n_files_completed}"
print(f"Completed: {round(100 * n_files_completed / n_files, 2)}%")
print(
f"Runtime for Zips {files_processed}: {round(chunk_runtime, 4)} seconds"
f"Runtime for Zips {zip_processed}: {round(chunk_runtime, 4)} seconds"
)
print(f"Zip Processing Rate: {rate} seconds")
print(f"Approximate Total Runtime: {round(eta, 4)} minutes")
print("")
35 changes: 28 additions & 7 deletions src/deep_neurographs/swc_utils.py
@@ -9,8 +9,11 @@
"""

from io import BytesIO
import networkx as nx
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from zipfile import ZipFile

from deep_neurographs import geometry_utils
from deep_neurographs import graph_utils as gutils
@@ -21,21 +24,39 @@
def process_local_paths(paths, min_size, bbox=None):
swc_dicts = dict()
for path in paths:
swc_dict = parse_local_swc(path, bbox=bbox, min_size=min_size)
if len(swc_dict):
swc_id = utils.get_swc_id(path)
swc_id, swc_dict = parse_local_swc(path, bbox=bbox, min_size=min_size)
if len(swc_dict["id"]) > min_size:
swc_dicts[swc_id] = swc_dict
return swc_dicts
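
A usage sketch for the local intake path (file names are hypothetical):

from deep_neurographs.swc_utils import process_local_paths

paths = ["/data/swcs/neuron_001.swc", "/data/swcs/neuron_002.swc"]
swc_dicts = process_local_paths(paths, 30)   # dict keyed by swc id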


def process_gsc_zip(bucket, zip_path, min_size=0):
swc_dicts = dict()
zip_blob = bucket.blob(zip_path)
zip_content = zip_blob.download_as_bytes()
with ZipFile(BytesIO(zip_content)) as zip_file:
with ThreadPoolExecutor() as executor:
threads = [
executor.submit(parse_gcs_zip, zip_file, path, min_size)
for path in utils.list_files_in_gcs_zip(zip_content)
]
for thread in as_completed(threads):
swc_id, result = thread.result()
if len(result["id"]) > min_size:
swc_dicts[swc_id] = result
return swc_dicts
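
Calling the relocated zip reader directly might look like this (bucket and zip path are hypothetical):

from google.cloud import storage
from deep_neurographs.swc_utils import process_gsc_zip

bucket = storage.Client().bucket("example-bucket")
swc_dicts = process_gsc_zip(bucket, "reconstructions/block_000.zip", min_size=30)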


def parse_local_swc(path, bbox=None, min_size=0):
contents = read_from_local(path)
return parse(contents, bbox=bbox) if len(contents) > min_size else []
swc_dict = parse(contents) if len(contents) > min_size else {"id": [-1]}
return utils.get_swc_id(path), swc_dict


def parse_gcs_zip(zip_file, path, min_size=0):
swc_contents = read_from_gcs_zip(zip_file, path)
return parse(swc_contents) if len(swc_contents) > min_size else []
contents = read_from_gcs_zip(zip_file, path)
swc_dict = parse(contents) if len(contents) > min_size else {"id": [-1]}
return utils.get_swc_id(path), swc_dict
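
Both parsers now return a (swc_id, swc_dict) pair and use {"id": [-1]} as a sentinel for undersized or empty files, so a caller filters with a single length check, e.g.:

# Mirrors the filtering in process_gsc_zip above; zip_file and path as in that loop.
swc_id, swc_dict = parse_gcs_zip(zip_file, path, min_size=30)
if len(swc_dict["id"]) > 30:    # the sentinel has length 1, so it is dropped
    swc_dicts[swc_id] = swc_dict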


def parse(swc_contents, bbox=None):
@@ -81,7 +102,7 @@ def parse(swc_contents, bbox=None):
swc_dict["id"][i] -= min_id
swc_dict["pid"][i] -= min_id

return swc_dict if len(swc_dict["id"]) > 1 else []
return swc_dict if len(swc_dict["id"]) > 1 else {"id": [-1]}


def read_from_local(path):
21 changes: 21 additions & 0 deletions src/deep_neurographs/utils.py
@@ -14,6 +14,8 @@
import os
import shutil
from copy import deepcopy
from io import BytesIO
from zipfile import ZipFile

import numpy as np
import tensorstore as ts
@@ -192,6 +194,25 @@ def list_subdirs(path, keyword=None):
return subdirs


# -- gcs utils --
def list_files_in_gcs_zip(zip_content):
"""
Lists all files in a zip file stored in a GCS bucket.
"""
with ZipFile(BytesIO(zip_content), "r") as zip_file:
return zip_file.namelist()


def list_gcs_filenames(bucket, cloud_path, extension):
"""
Lists all files in a GCS bucket with the given extension.
"""
blobs = bucket.list_blobs(prefix=cloud_path)
return [blob.name for blob in blobs if extension in blob.name]
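
A sketch combining the two helpers (bucket name and prefix are hypothetical):

from google.cloud import storage

bucket = storage.Client().bucket("example-bucket")
for zip_path in list_gcs_filenames(bucket, "reconstructions/", ".zip"):
    zip_content = bucket.blob(zip_path).download_as_bytes()
    print(zip_path, len(list_files_in_gcs_zip(zip_content)), "files")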


# --- io utils ---
def open_zarr(path):
"""
