upd: gcs intake #28

Merged
merged 1 commit on Jan 13, 2024
4 changes: 1 addition & 3 deletions src/deep_neurographs/densegraph.py
@@ -9,13 +9,11 @@

"""

import os

import networkx as nx
from more_itertools import zip_broadcast
from scipy.spatial import KDTree

from deep_neurographs import swc_utils, utils
from deep_neurographs import swc_utils
from deep_neurographs.geometry_utils import dist as get_dist


97 changes: 22 additions & 75 deletions src/deep_neurographs/intake.py
@@ -11,19 +11,16 @@
import os
from concurrent.futures import (
ProcessPoolExecutor,
ThreadPoolExecutor,
as_completed,
)
from io import BytesIO
from time import time
from zipfile import ZipFile

from google.cloud import storage

from deep_neurographs import graph_utils as gutils
from deep_neurographs import swc_utils, utils
from deep_neurographs import utils
from deep_neurographs.neurograph import NeuroGraph
from deep_neurographs.swc_utils import parse_gcs_zip, process_local_paths
from deep_neurographs.swc_utils import process_local_paths, process_gsc_zip

N_PROPOSALS_PER_LEAF = 3
OPTIMIZE_PROPOSALS = False
@@ -143,14 +140,22 @@ def build_neurograph_from_gcs_zips(
Neurograph generated from zips of swc files stored in a GCS bucket.

"""
# Process swc files
t0 = time()
swc_dicts = download_gcs_zips(bucket_name, cloud_path, min_size)
print(f"download_gcs_zips(): {time() - t0} seconds")

# Build neurograph
t0 = time()
neurograph = build_neurograph(
swc_dicts,
img_path=img_path,
prune=prune,
prune_depth=prune_depth,
smooth=smooth,
)
print(f"build_neurograph(): {time() - t0} seconds")

if search_radius > 0:
neurograph.generate_proposals(
search_radius, n_proposals_per_leaf=n_proposals_per_leaf
@@ -180,71 +185,34 @@ def download_gcs_zips(bucket_name, cloud_path, min_size):
# Initializations
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
zip_paths = list_gcs_filenames(bucket, cloud_path, ".zip")
zip_paths = utils.list_gcs_filenames(bucket, cloud_path, ".zip")
chunk_size = int(len(zip_paths) * 0.1)
print(f"# zip files: {len(zip_paths)} \n\n")
print(f"# zip files: {len(zip_paths)} \n")

# Parse
cnt = 1
t0 = time()
t1 = time()
swc_dicts = []
print(f"-- Starting Multithread Reads with chunk_size={chunk_size} -- \n")
swc_dicts = dict()
for i, path in enumerate(zip_paths):
swc_dict_i = download_zip(bucket, path, min_size=min_size)
swc_dicts.extend(swc_dict_i)
swc_dicts.update(process_gsc_zip(bucket, path, min_size=min_size))
if len(swc_dicts) > 10000:
break
if i > cnt * chunk_size:
report_runtimes(len(zip_paths), i, chunk_size, t0, t1)
t1 = time()
cnt += 1
break
t, unit = utils.time_writer(time() - t0)
print("# connected components:", len(swc_dicts))
print(f"Download Runtime: {round(t, 4)} {unit}")
return swc_dicts


def download_zip(bucket, zip_path, min_size=0):
zip_blob = bucket.blob(zip_path)
zip_content = zip_blob.download_as_bytes()
with ZipFile(BytesIO(zip_content)) as zip_file:
with ThreadPoolExecutor() as executor:
results = [
executor.submit(parse_gcs_zip, zip_file, path, min_size)
for path in list_files_in_gcs_zip(zip_content)
]
swc_dicts = [result.result() for result in as_completed(results)]
return swc_dicts


def count_files_in_zips(bucket, zip_paths):
file_cnt = 0
for zip_path in zip_paths:
zip_blob = bucket.blob(zip_path)
zip_content = zip_blob.download_as_bytes()
file_paths = list_files_in_gcs_zip(zip_content)
file_cnt += len(file_paths)
zip_content = bucket.blob(zip_path).download_as_bytes()
file_cnt += len(utils.list_files_in_gcs_zip(zip_content))
return file_cnt


def list_files_in_gcs_zip(zip_content):
"""
Lists all files in a zip file stored in a GCS bucket.

"""
with ZipFile(BytesIO(zip_content), "r") as zip_file:
return zip_file.namelist()


def list_gcs_filenames(bucket, cloud_path, extension):
"""
Lists all files in a GCS bucket with the given extension.

"""
blobs = bucket.list_blobs(prefix=cloud_path)
return [blob.name for blob in blobs if extension in blob.name]


# -- Build neurograph ---
def build_neurograph(
swc_dicts,
@@ -256,9 +224,8 @@ def build_neurograph(
smooth=SMOOTH,
):
# Extract irreducibles
t0 = time()
irreducibles = dict()
"""
-- asynchronous --
with ProcessPoolExecutor() as executor:
# Assign Processes
processes = [None] * len(swc_dicts)
@@ -274,15 +241,7 @@
for process in as_completed(processes):
process_id, result = process.result()
irreducibles[process_id] = result
"""
for key in swc_dicts.keys():
_, irreducibles[key] = gutils.get_irreducibles(
swc_dicts[key],
swc_id=key,
prune=prune,
depth=prune_depth,
smooth=smooth
)
print(f" --> get_irreducibles(): {time() - t0} seconds")

# Build neurograph
t0 = time()
@@ -301,17 +260,6 @@ def get_paths(swc_dir):
return paths


def get_start_ids(swc_dicts):
node_ids = []
cnt = 0
for swc_dict in swc_dicts:
graph = swc_utils.to_graph(swc_dict)
leafs, junctions = gutils.get_irreducibles(graph)
node_ids.append(cnt)
cnt += len(leafs) + len(junctions)
return node_ids


def report_runtimes(
n_files, n_files_completed, chunk_size, start, start_chunk
):
@@ -320,11 +268,10 @@
n_files_remaining = n_files - n_files_completed
rate = chunk_runtime / chunk_size
eta = (runtime + n_files_remaining * rate) / 60
files_processed = f"{n_files_completed - chunk_size}-{n_files_completed}"
zip_processed = f"{n_files_completed - chunk_size}-{n_files_completed}"
print(f"Completed: {round(100 * n_files_completed / n_files, 2)}%")
print(
f"Runtime for Zips {files_processed}: {round(chunk_runtime, 4)} seconds"
f"Runtime for Zips {zip_processed}: {round(chunk_runtime, 4)} seconds"
)
print(f"Zip Processing Rate: {rate} seconds")
print(f"Approximate Total Runtime: {round(eta, 4)} minutes")
print("")
35 changes: 28 additions & 7 deletions src/deep_neurographs/swc_utils.py
@@ -9,8 +9,11 @@

"""

from io import BytesIO
import networkx as nx
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from zipfile import ZipFile

from deep_neurographs import geometry_utils
from deep_neurographs import graph_utils as gutils
@@ -21,21 +24,39 @@
def process_local_paths(paths, min_size, bbox=None):
swc_dicts = dict()
for path in paths:
swc_dict = parse_local_swc(path, bbox=bbox, min_size=min_size)
if len(swc_dict):
swc_id = utils.get_swc_id(path)
swc_id, swc_dict = parse_local_swc(path, bbox=bbox, min_size=min_size)
if len(swc_dict["id"]) > min_size:
swc_dicts[swc_id] = swc_dict
return swc_dicts


def process_gsc_zip(bucket, zip_path, min_size=0):
swc_dicts = dict()
zip_blob = bucket.blob(zip_path)
zip_content = zip_blob.download_as_bytes()
with ZipFile(BytesIO(zip_content)) as zip_file:
with ThreadPoolExecutor() as executor:
threads = [
executor.submit(parse_gcs_zip, zip_file, path, min_size)
for path in utils.list_files_in_gcs_zip(zip_content)
]
for thread in as_completed(threads):
swc_id, result = thread.result()
if len(result["id"]) > min_size:
swc_dicts[swc_id] = result
return swc_dicts
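A minimal sketch (not part of this diff) of calling the new helper directly, assuming a google-cloud-storage bucket handle; the bucket and archive names are hypothetical placeholders.

from google.cloud import storage
from deep_neurographs import swc_utils

bucket = storage.Client().bucket("my-bucket")  # hypothetical bucket
swc_dicts = swc_utils.process_gsc_zip(
    bucket, "swc_zips/block_0000.zip", min_size=10
)
print(f"kept {len(swc_dicts)} swc files from the archive")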


def parse_local_swc(path, bbox=None, min_size=0):
contents = read_from_local(path)
return parse(contents, bbox=bbox) if len(contents) > min_size else []
swc_dict = parse(contents) if len(contents) > min_size else {"id": [-1]}
return utils.get_swc_id(path), swc_dict


def parse_gcs_zip(zip_file, path, min_size=0):
swc_contents = read_from_gcs_zip(zip_file, path)
return parse(swc_contents) if len(swc_contents) > min_size else []
contents = read_from_gcs_zip(zip_file, path)
swc_dict = parse(contents) if len(contents) > min_size else {"id": [-1]}
return utils.get_swc_id(path), swc_dict


def parse(swc_contents, bbox=None):
@@ -81,7 +102,7 @@ def parse(swc_contents, bbox=None):
swc_dict["id"][i] -= min_id
swc_dict["pid"][i] -= min_id

return swc_dict if len(swc_dict["id"]) > 1 else []
return swc_dict if len(swc_dict["id"]) > 1 else {"id": [-1]}


def read_from_local(path):
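As shown in the parse helpers above, the return convention is now an (swc_id, swc_dict) pair with {"id": [-1]} as the empty sentinel rather than an empty list. A minimal sketch (not part of this diff) of consuming that convention for a single local file; the path is a hypothetical placeholder.

from deep_neurographs.swc_utils import parse_local_swc

swc_id, swc_dict = parse_local_swc("/data/swcs/cell_001.swc", min_size=10)
if len(swc_dict["id"]) > 10:  # same size check process_local_paths applies
    print(swc_id, "kept with", len(swc_dict["id"]), "nodes")
else:
    print(swc_id, "skipped (empty, unreadable, or below min_size)")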
21 changes: 21 additions & 0 deletions src/deep_neurographs/utils.py
@@ -14,6 +14,8 @@
import os
import shutil
from copy import deepcopy
from io import BytesIO
from zipfile import ZipFile

import numpy as np
import tensorstore as ts
@@ -192,6 +194,25 @@ def list_subdirs(path, keyword=None):
return subdirs


# -- gcs utils --
def list_files_in_gcs_zip(zip_content):
"""
Lists all files in a zip file stored in a GCS bucket.

"""
with ZipFile(BytesIO(zip_content), "r") as zip_file:
return zip_file.namelist()


def list_gcs_filenames(bucket, cloud_path, extension):
"""
Lists all files in a GCS bucket with the given extension.

"""
blobs = bucket.list_blobs(prefix=cloud_path)
return [blob.name for blob in blobs if extension in blob.name]


# --- io utils ---
def open_zarr(path):
"""
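A minimal sketch (not part of this diff) of the two GCS helpers relocated to utils.py, assuming the google-cloud-storage client is configured; the bucket name and prefix are hypothetical placeholders.

from google.cloud import storage
from deep_neurographs import utils

bucket = storage.Client().bucket("my-bucket")  # hypothetical bucket
zip_paths = utils.list_gcs_filenames(bucket, "swc_zips/", ".zip")
print(f"found {len(zip_paths)} zip archives")
for path in zip_paths[:1]:
    zip_content = bucket.blob(path).download_as_bytes()
    n_files = len(utils.list_files_in_gcs_zip(zip_content))
    print(f"{path} contains {n_files} swc files")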