forked from globus/globus-flows-trigger-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
trigger_transfer_flow.py
executable file
·104 lines (83 loc) · 3.12 KB
/
trigger_transfer_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python
import argparse
import os
# This could go into a different file and be invoked without the file watcher
from flows_service import create_flows_client
from watch import FileTrigger, translate_local_path_to_globus_path
def run_flow(event_file):
# TODO: Specify the flow to run when triggered
flow_id = "REPLACE_WITH_FLOW_ID"
fc = create_flows_client(flow_id=flow_id)
# TODO: Set a label for the flow run
# Default includes the file name that triggered the run
flow_label = f"Trigger transfer: {os.path.basename(event_file)}"
# TODO: Modify source collection ID
# Source collection must be on the endpoint where this trigger code is running
source_id = "REPLACE_WITH_SOURCE_COLLECTION_ID"
# TODO: Modify destination collection ID
# Default is "Globus Tutorials on ALCF Eagle"
destination_id = "a6f165fa-aee2-4fe5-95f3-97429c28bf82"
# TODO: Modify destination collection path
# Update path to include your user name e.g. /automate-tutorial/dev1/
destination_base_path = "/automation-tutorial/USERNAME/"
# Get the Globus-compatible directory name where the triggering file is stored.
event_folder = os.path.dirname(event_file)
source_path = translate_local_path_to_globus_path(event_folder)
# Get name of monitored folder to use as destination path
# and for setting permissions
event_folder_name = os.path.basename(event_folder)
# Add a trailing '/' to meet Transfer requirements for directory transfer
destination_path = os.path.join(destination_base_path, event_folder_name, "")
# Convert Windows path separators to forward slashes.
destination_path = destination_path.replace("\\", "/")
# Inputs to the flow
flow_input = {
"input": {
"source": {
"id": source_id,
"path": source_path,
},
"destination": {
"id": destination_id,
"path": destination_path,
},
"recursive_tx": True,
}
}
flow_run_request = fc.run_flow(
body=flow_input,
label=flow_label,
tags=["Trigger_Tutorial"],
)
print(f"Transferring: {event_folder_name}")
print(f"https://app.globus.org/runs/{flow_run_request['run_id']}")
# Parse input arguments
def parse_args():
parser = argparse.ArgumentParser(
description="""
Watch a directory and trigger a simple transfer flow."""
)
parser.add_argument(
"--watchdir",
type=str,
default=os.path.abspath("."),
help=f"Directory path to watch. [default: current directory]",
)
parser.add_argument(
"--patterns",
type=str,
default="",
nargs="*",
help='Filename suffix pattern(s) that will trigger the flow. [default: ""]',
)
parser.set_defaults(verbose=True)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
# Creates and starts the watcher
trigger = FileTrigger(
watch_dir=os.path.expanduser(args.watchdir),
patterns=args.patterns,
FlowRunner=run_flow,
)
trigger.run()