Commit

some queue updates
arjunsridhar12345 committed Mar 26, 2024
1 parent b707c33 commit 36a4873
Showing 3 changed files with 44 additions and 50 deletions.
51 changes: 31 additions & 20 deletions src/npc_lims/jobs/queue.py
@@ -21,7 +21,7 @@
JobID: TypeAlias = str

QUEUE_JSON_DIR = upath.UPath(
"s3://aind-scratch-data/arjun.sridhar/queue"
"./"
) # TODO figure out path
INITIAL_VALUE = "Added to Queue"
INITIAL_INT_VALUE = -1
@@ -30,7 +30,8 @@


def read_json(process_name: str) -> dict[str, npc_lims.CapsuleComputationAPI]:
return json.loads((QUEUE_JSON_DIR / process_name).read_bytes())
with open((QUEUE_JSON_DIR / f'{process_name}.json'), 'r') as f:
return json.load(f)


def is_session_in_queue(
@@ -87,15 +88,19 @@ def get_current_job_status(
job_id = job_or_session_id
else:
job_id = read_json(process_name)[session_id]["id"]
job_status = npc_lims.get_job_status(job_id, check_files=True)

if job_id != INITIAL_VALUE:
job_status = npc_lims.get_job_status(job_id, check_files=True)
else:
job_status = read_json(process_name)[session_id]

return job_status


def sync_json(process_name: str) -> None:
current = read_json(process_name)
for session_id in current:
current[session_id] = get_current_job_status(session_id)
current[session_id] = get_current_job_status(session_id, process_name)
logger.info(f"Updated {session_id} status")

(QUEUE_JSON_DIR / f"{process_name}.json").write_text(json.dumps(current, indent=4))
@@ -106,7 +111,7 @@ def get_data_asset_name(session_id: SessionID, process_name: str) -> str:
created_dt = (
npc_session.DatetimeRecord(
datetime.datetime.fromtimestamp(
get_current_job_status(session_id)["created"]
get_current_job_status(session_id, process_name)["created"]
)
)
.replace(" ", "_")
@@ -116,7 +121,8 @@


def create_data_asset(session_id: SessionID, job_id: str, process_name: str) -> None:
asset = codeocean.create_session_data_asset(session_id, job_id, process_name)
data_asset_name = get_data_asset_name(session_id, process_name)
asset = codeocean.create_session_data_asset(session_id, job_id, data_asset_name)

if asset is None:
logger.info(f"Failed to create data asset for {session_id}")
@@ -140,7 +146,7 @@ def create_all_data_assets(process_name: str, overwrite_existing_assets: bool) -
sync_json(process_name)

for session_id in read_json(process_name):
job_status = get_current_job_status(session_id)
job_status = get_current_job_status(session_id, process_name)
if npc_lims.is_computation_errorred(
job_status
) or not npc_lims.is_computation_finished(job_status):
@@ -159,8 +165,8 @@ def sync_and_get_num_running_jobs(process_name: str) -> int:
)


def is_started(session_id: SessionID, process_name: str) -> bool:
return read_json(process_name)[session_id]["state"] in ("running", "initializing")
def is_started_or_completed(session_id: SessionID, process_name: str) -> bool:
return read_json(process_name)[session_id]["state"] in ("running", "initializing", "completed")


def add_sessions_to_queue(process_name: str) -> None:
@@ -175,7 +181,7 @@ def start(
data_assets = [
ComputationDataAsset(
id=npc_lims.get_session_raw_data_asset(session_id)["id"],
mount="ecephys",
mount=npc_lims.get_session_raw_data_asset(session_id)["name"],
),
]
response = npc_lims.run_capsule_or_pipeline(
@@ -188,27 +194,29 @@
def process_capsule_or_pipeline_queue(
capsule_or_pipeline_id: str,
process_name: str,
create_data_assets_from_results: bool=True,
rerun_all_jobs: bool = False,
is_pipeline: bool = False,
rerun_errorred_jobs: bool = False,
overwrite_existing_assets: bool = False,
) -> None:
# TODO review and simplify this, right now adding all sessions and then processing,
"""
adds jobs to queue for capsule/pipeline, then processes them - run capsule/pipeline and then create data asset
example: process_queue('1f8f159a-7670-47a9-baf1-078905fc9c2e', 'sorted', is_pipeline=True)
example: process_capsule_or_pipeline_queue('1f8f159a-7670-47a9-baf1-078905fc9c2e', 'sorted', is_pipeline=True)
"""
capsule_pipeline_info = codeocean.CapsulePipelineInfo(
capsule_or_pipeline_id, process_name, is_pipeline
)

add_sessions_to_queue(capsule_pipeline_info.process_name)

for session_info in npc_lims.get_session_info(is_ephys=True, is_uploaded=True):
session_id = session_info.id

if is_started(session_id, capsule_pipeline_info.process_name):
for session_id in read_json(process_name):
if not rerun_all_jobs and is_started_or_completed(session_id, capsule_pipeline_info.process_name):
logger.debug(f"Already started: {session_id}")
job_status = get_current_job_status(session_id)
if not rerun_errorred_jobs or not npc_lims.is_computation_errorred(
get_current_job_status(session_id)
job_status
):
continue

@@ -218,13 +226,16 @@ def process_capsule_or_pipeline_queue(
>= MAX_RUNNING_JOBS
):
time.sleep(600)

start(session_id, capsule_pipeline_info)

while sync_and_get_num_running_jobs(capsule_pipeline_info.process_name) > 0:
time.sleep(600)
create_all_data_assets(
capsule_pipeline_info.process_name, overwrite_existing_assets
)

if create_data_assets_from_results:
create_all_data_assets(
capsule_pipeline_info.process_name, overwrite_existing_assets
)


if __name__ == "__main__":
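
For context, here is a minimal sketch of how the reworked queue entry point above might be invoked after this commit. It assumes process_capsule_or_pipeline_queue is re-exported at the package top level (as the updated run_video_processing.py below uses it); the pipeline ID and the "sorted" process name are taken from the example in the function's docstring.

    import npc_lims

    # Pipeline ID and process name taken from the docstring example above.
    SORTING_PIPELINE_ID = "1f8f159a-7670-47a9-baf1-078905fc9c2e"

    # Queues every eligible session, skips sessions that are already running,
    # initializing, or completed (unless rerun_all_jobs=True), throttles to
    # MAX_RUNNING_JOBS concurrent computations, then creates data assets from
    # the finished results because create_data_assets_from_results=True.
    npc_lims.process_capsule_or_pipeline_queue(
        SORTING_PIPELINE_ID,
        "sorted",
        is_pipeline=True,
        rerun_all_jobs=False,
        create_data_assets_from_results=True,
    )
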
18 changes: 9 additions & 9 deletions src/npc_lims/metadata/codeocean.py
@@ -436,15 +436,15 @@ def create_session_data_asset(
source = aind_codeocean_requests.Source(
computation=aind_codeocean_requests.Sources.Computation(id=computation_id)
)
tags = ([str(session.subject), "derived", "ephys", "results"],)
custom_metadata = (
{
"data level": "derived data",
"experiment type": "ecephys",
"modality": "Extracellular electrophysiology",
"subject id": str(session.subject),
},
)
tags = [str(session.subject), "derived", "ephys", "results"]
custom_metadata = \
{
"data level": "derived data",
"experiment type": "ecephys",
"modality": "Extracellular electrophysiology",
"subject id": str(session.subject),
}

create_data_asset_request = aind_codeocean_requests.CreateDataAssetRequest(
name=data_asset_name,
mount=data_asset_name,
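
The codeocean.py hunk above fixes a trailing-comma bug: a parenthesized value followed by a comma is a one-element tuple in Python, so tags and custom_metadata were previously being passed to the data-asset request as tuples rather than a list and a dict. A standalone illustration of the pitfall (toy values, not project code):

    # A parenthesized value followed by a comma builds a 1-tuple wrapping the value.
    tags = (["000000", "derived", "ephys", "results"],)
    print(type(tags))   # <class 'tuple'>

    # Dropping the trailing comma keeps the intended type.
    tags = ["000000", "derived", "ephys", "results"]
    print(type(tags))   # <class 'list'>
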
25 changes: 4 additions & 21 deletions src/npc_lims/scripts/run_video_processing.py
@@ -1,27 +1,10 @@
import npc_lims.status as status


def run_helper(session_info: status.SessionInfo, model_name: str, num_jobs: int) -> int:
if not getattr(session_info, f"is_{model_name}"):
# codeocean.run_capsule_or_pipeline(session_info.id, model_name)
num_jobs += 1

return num_jobs
import npc_lims

DLC_EYE_CAPSULE_ID = "4cf0be83-2245-4bb1-a55c-a78201b14bfe"
DLC_PLOT_CAPSULE_ID = "85097267-9d7b-40a4-81db-75f38a35c67e"

def main() -> None:
num_jobs = 0
for session_info in status.get_session_info():
if not session_info.is_uploaded:
continue

num_jobs = run_helper(session_info, "dlc_eye", num_jobs)
num_jobs = run_helper(session_info, "dlc_side", num_jobs)
num_jobs = run_helper(session_info, "dlc_face", num_jobs)
num_jobs = run_helper(session_info, "facemap", num_jobs)

if num_jobs == 12:
break
npc_lims.process_capsule_or_pipeline_queue(DLC_PLOT_CAPSULE_ID, 'dlc_pupil_validation', rerun_all_jobs=True, create_data_assets_from_results=False)


if __name__ == "__main__":
