-
Notifications
You must be signed in to change notification settings - Fork 0
/
get_rawacfs.py
228 lines (181 loc) · 7.12 KB
/
get_rawacfs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#!/usr/bin/env python3
# coding: utf-8
"""
Downloads and combines rawACF files for the specified date from the specified source.
"""
import sys
import os
import datetime
import socket
import time
import helper
import subprocess
import bz2
import re
from glob import glob
DELAY = 60 # seconds slept between BAS connection attempts (1 minute)
RETRIES = 15 # number of connection attempts; with DELAY = 60 that is one try per minute for 15 minutes
TIMEOUT = 10 # seconds; socket timeout used by the reachability check in isOpen()
# Global date being processed: set by main() from its dateString argument and
# read by the download/combine functions below
date = None
def main(dateString, dataSource=None):
    """
    Entry point: record the requested date and download its rawACF files.

    Stores the parsed date in the module-level ``date`` variable, normalises
    the source name, and hands off to ``download_source_files``. Combining
    the downloaded files is not triggered from here.

    Args:
        dateString (str): The date in 'YYYYMMDD' format.
        dataSource (str, optional): The data source ('bas' or 'globus').
            Case and surrounding whitespace are ignored. Defaults to 'bas'.

    Raises:
        ValueError: If dateString cannot be parsed with the '%Y%m%d' format.
    """
    global date
    date = datetime.datetime.strptime(dateString, '%Y%m%d')

    # Fall back to BAS when no source was given; otherwise normalise the name.
    source = 'bas' if dataSource is None else dataSource.lower().strip()
    download_source_files(source)
def download_source_files(dataSource):
    """
    Downloads rawACF files from the specified data source.

    The local rawACF directory for the global ``date`` is created (if it does
    not already exist) before the source name is checked. An unrecognised
    source only prints an error message; nothing is downloaded.

    Args:
        dataSource (str): The data source ('bas' or 'globus').
    """
    rawDir = date.strftime(helper.RAWACF_DIR_FMT)
    os.makedirs(rawDir, exist_ok=True)

    if dataSource == 'bas':
        download_files_from_bas(rawDir)
    elif dataSource == 'globus':
        download_files_from_globus(rawDir)
    else:
        # The replacement field is kept on one line: a line break inside
        # {...} of a single-quoted f-string is a SyntaxError before Python 3.12.
        print(f"ERROR: Specified rawACF source is invalid {dataSource}. "
              "Valid options are: 'bas' or 'globus'")
def download_files_from_globus(rawDir):
    """
    Downloads rawACF files from Globus.

    Runs three shell commands in sequence: start Globus Connect Personal
    (in the background), run the Globus sync script for the year and month
    of the global ``date``, then stop Globus Connect Personal. The exit
    codes of these commands are not checked.

    Args:
        rawDir (str): The directory to save the downloaded files.
    """
    # Start Globus Connect Personal and establish connection
    # Also allow access to /project/superdarn/data/
    subprocess.call(
        f"{helper.GLOBUS_PATH} -start -restrict-paths 'rw~/,rw/project/superdarn/data' &",
        shell=True)

    # Initiate the transfer from Globus to APL.
    # The command is assembled from single-line pieces: a line break inside
    # an f-string replacement field is a SyntaxError before Python 3.12.
    syncCommand = (
        'nohup /project/superdarn/software/python-3.8.1/bin/python3 '
        '/homes/superdarn/superdarn/globus/sync_radar_data_globus.py '
        f'-y {date.year} -m {date.month} -t raw {rawDir}'
    )
    subprocess.call(syncCommand, shell=True)

    # Stop Globus Connect Personal
    subprocess.call(f'{helper.GLOBUS_PATH} -stop', shell=True)

    # emailSubject = f'"{date.strftime("%Y/%m")} rawACF Data Successfully Downloaded from Globus"'
    # emailBody = f'"{date.strftime("%Y/%m")} rawACF source files have been downloaded. Starting conversion to fitACF and netCDF."'
    # helper.send_email(emailSubject, emailBody)
def download_files_from_bas(rawDir):
    """
    Downloads rawACF files from BAS.

    Checks that the BAS server is reachable, then rsyncs every
    '<YYYYMMDD>*.rawacf.bz2' file for the global ``date`` into rawDir.
    rsync's output (stdout and stderr) is written to
    '<LOG_DIR>/BAS_rsync_logs/<YYYY>/BAS_rsync_<YYYYMMDD>.out'.

    Args:
        rawDir (str): The directory to save the downloaded files.

    Raises:
        SystemExit: If BAS cannot be reached or rsync exits with a non-zero
            code. An email is sent via helper.send_email before exiting.
    """
    basRawDir = date.strftime(helper.BAS_RAWACF_DIR_FMT)

    # Make sure the BAS server is reachable
    if not BASServerConnected():
        # Send email if BAS couldn't be reached
        emailSubject = '"Unable to reach BAS"'
        emailBody = 'Unable to reach BAS after trying for {} minutes.'.format(
            RETRIES * DELAY / 60)
        helper.send_email(emailSubject, emailBody)
        sys.exit(emailBody)

    dateString = date.strftime('%Y%m%d')
    print(f'Downloading {dateString} rawACFs from BAS')

    rsyncLogDir = os.path.join(
        helper.LOG_DIR, 'BAS_rsync_logs', date.strftime('%Y'))
    os.makedirs(rsyncLogDir, exist_ok=True)
    fullLogFilename = os.path.join(rsyncLogDir, f'BAS_rsync_{dateString}.out')

    # The wildcard is left for the remote side to expand. Passing an argument
    # list (no local shell) avoids the csh/bash-only '>&' redirection, which
    # fails when /bin/sh is a strict POSIX shell, and avoids local quoting
    # and globbing surprises.
    remoteFiles = f'apl@{helper.BAS_SERVER}:{basRawDir}/{dateString}*.rawacf.bz2'
    rsyncCommand = ['nohup', 'rsync', '-rv', remoteFiles, rawDir]

    # Send both stdout and stderr to the log file instead of unread pipes.
    with open(fullLogFilename, 'wb') as logFile:
        rsyncExitCode = subprocess.run(
            rsyncCommand, stdout=logFile, stderr=subprocess.STDOUT).returncode

    if rsyncExitCode == 0:
        print(f'Successfully downloaded {dateString} rawACFs from BAS')
        return

    # Send an email and end the script if rsync didn't succeed
    emailSubject = f'"Unsuccessful attempt to copy {dateString} BAS rawACF data"'
    emailBody = (
        f'"Failed to copy {dateString} rawACFs from BAS with exit code {rsyncExitCode}. '
        f'\nSee {fullLogFilename} for more details."'
    )
    helper.send_email(emailSubject, emailBody)
    print(emailBody)
    sys.exit(emailBody)
def combine_source_files():
    """
    Combines the downloaded rawACF files for the global ``date`` into one
    uncompressed file per radar site.

    Every '<YYYYMMDD>.<HHMM>.<SS>.<site>.rawacf.bz2' file in the day's rawACF
    directory is grouped by its site string (e.g. 'inv.a'). Each group is
    decompressed and concatenated, in filename order, into
    '<YYYYMMDD>.<site>.rawacf' in the same directory.

    NOTE: nothing in this file calls this function; main() only downloads.
    """
    dateString = date.strftime('%Y%m%d')
    print(f'Starting to combine {dateString} rawACF files')
    rawDir = date.strftime(helper.RAWACF_DIR_FMT)

    # Use a regular expression to extract the station string
    # E.g. get 'inv.a' from 20230901.2200.03.inv.a.rawacf.bz2
    sitePattern = re.compile(r'\d{8}\.\d{4}\.\d{2}\.(.*?)\.rawacf\.bz2')

    # Group the day's files by site in a single pass. Matching the whole
    # basename keeps directory components and previously combined
    # '<date>.<site>.rawacf' outputs out of the groups.
    filesBySite = {}
    for filename in glob(f"{os.path.join(rawDir, dateString)}.*"):
        match = sitePattern.fullmatch(os.path.basename(filename))
        if match:
            filesBySite.setdefault(match.group(1), []).append(filename)

    for site in sorted(filesBySite):
        fullOutputFilename = os.path.join(rawDir, f"{dateString}.{site}.rawacf")
        # glob() returns files in arbitrary order; the HHMM.SS fields make
        # lexicographic order chronological, so sort before concatenating.
        unzipAndCombine(sorted(filesBySite[site]), fullOutputFilename)
def unzipAndCombine(files, outputFile):
    """
    Unzips and combines the given files into a single output file.

    Each input is decompressed as bz2 and appended to outputFile in the order
    given. outputFile is created, or truncated if it already exists, even
    when files is empty.

    Args:
        files: A list of file paths to the bz2 files to be unzipped and combined.
        outputFile: The path to the output file.

    Raises:
        OSError: If an input cannot be read (including when it is not valid
            bz2 data) or the output cannot be written.
    """
    import shutil  # local import: only needed by this function

    with open(outputFile, "wb") as f_out:
        for path in files:
            with bz2.open(path, "rb") as f_in:
                # Stream in chunks so a large decompressed file is never
                # held in memory all at once.
                shutil.copyfileobj(f_in, f_out)
def BASServerConnected():
    """
    Reports whether the BAS server accepts connections on port 22.

    Makes up to RETRIES attempts, sleeping DELAY seconds after every failed
    attempt (including the last one).

    Returns:
        bool: True as soon as one attempt succeeds, False if all of them fail.
    """
    attemptsLeft = RETRIES
    while attemptsLeft > 0:
        if isOpen(helper.BAS_SERVER, 22):
            return True
        time.sleep(DELAY)
        attemptsLeft -= 1
    return False
def isOpen(server, port):
    """
    Checks if a server is reachable on the specified port.

    Opens an IPv4 TCP connection with a TIMEOUT-second timeout and shuts it
    down again straight away.

    Args:
        server (str): The server hostname or IP address.
        port (int): The port number to check.

    Returns:
        bool: True if the server is reachable on the port, False otherwise
        (connection refused, timeout, name resolution failure, or an
        invalid port value).
    """
    # The context manager closes the socket on every path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(TIMEOUT)
        try:
            s.connect((server, int(port)))
            s.shutdown(socket.SHUT_RDWR)
        except (OSError, ValueError, OverflowError):
            # OSError covers socket errors, timeouts and DNS failures;
            # ValueError/OverflowError cover a malformed or out-of-range port.
            # A bare except would also swallow KeyboardInterrupt/SystemExit,
            # making the caller's retry loop impossible to interrupt.
            return False
        return True
if __name__ == '__main__':
    # Command line: <date> is required, <dataSource> is optional.
    cliArgs = sys.argv[1:]
    if not cliArgs:
        print("Usage: python script.py <date> [<dataSource>]")
        sys.exit(1)

    # First argument: the day in 'YYYYMMDD' format
    dateString = cliArgs[0]
    # Second argument, when present: the data source
    dataSource = cliArgs[1] if len(cliArgs) > 1 else None

    # Reject anything that is not exactly eight digits
    if len(dateString) != 8 or not dateString.isdigit():
        print("Date argument must be in 'YYYYMMDD' format.")
        sys.exit(1)

    main(dateString, dataSource)