-
Notifications
You must be signed in to change notification settings - Fork 0
/
rtc_otf.py
493 lines (437 loc) · 22.5 KB
/
rtc_otf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
import yaml
import argparse
import os
import asf_search as asf
from eof.download import download_eofs
import logging
import zipfile
from shapely.geometry import Polygon, box
import rasterio
from dem_stitcher import stitch_dem
import docker
from utils import *
from etad import *
import time
import shutil
import json
import geopandas as gpd
logging.basicConfig(
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
fh = logging.FileHandler('run.log')
logger = logging.getLogger()
logger.addHandler(fh)
def update_timing_file(key, time, path, replace=False):
"""Update the timing json at specified path. Creates if doesn't exists
Args:
key (str): key for the timing dict
time (floar): time in seconds
path (str): path to the timing file
replace (bool, optional): Replace a value in the file if it already exists. Defaults to False.
"""
if os.path.exists(path):
with open(path, 'r') as fp:
timing = json.load(fp)
else:
timing = {}
if ((key not in timing) or replace):
timing[key] = time
# update total
total_t = sum([timing[k] for k in timing.keys() if k != 'Total'])
timing['Total'] = total_t
with open(path, 'w') as fp:
json.dump(timing, fp)
def run_process(args):
t_start = time.time()
# define success tracker
success = {'opera-rtc': []}
failed = {'opera-rtc': []}
# read in the config for on the fly (otf) processing
with open(args.config, 'r', encoding='utf8') as fin:
otf_cfg = yaml.safe_load(fin.read())
# read in aws credentials and set as environ vars
logging.info(f'setting aws credentials from : {otf_cfg["aws_credentials"]}')
with open(otf_cfg['aws_credentials'], "r", encoding='utf8') as f:
aws_cfg = yaml.safe_load(f.read())
# set all keys as environment variables
for k in aws_cfg.keys():
logging.info(f'setting {k}')
os.environ[k] = aws_cfg[k]
# loop through the list of scenes
# download data -> produce backscatter -> save
for i, scene in enumerate(otf_cfg['scenes']):
timing = {}
t0 = time.time()
# add the scene name to the out folder
OUT_FOLDER = otf_cfg['OPERA_output_folder']
SCENE_OUT_FOLDER = os.path.join(OUT_FOLDER,scene)
os.makedirs(SCENE_OUT_FOLDER, exist_ok=True)
# make the timing file
TIMING_FILE = scene + '_timing.json'
TIMING_FILE_PATH = os.path.join(otf_cfg['OPERA_output_folder'],TIMING_FILE)
logging.info(f'processing scene {i+1} of {len(otf_cfg["scenes"])} : {scene}')
logging.info(f'PROCESS 1: Downloads')
# search for the scene in asf
logging.info(f'searching asf for scene...')
asf.constants.CMR_TIMEOUT = 45
logging.debug(f'CMR will timeout in {asf.constants.CMR_TIMEOUT}s')
asf_results = asf.granule_search([scene], asf.ASFSearchOptions(processingLevel='SLC'))
if len(asf_results) > 0:
logging.info(f'scene found')
asf_result = asf_results[0]
else:
logging.error(f'scene not found : {scene}')
run_success = False
failed['opera-rtc'].append(scene)
continue
# read in credentials to download from ASF
logging.info(f'setting earthdata credentials from: {otf_cfg["earthdata_credentials"]}')
with open(otf_cfg['earthdata_credentials'], "r", encoding='utf8') as f:
earthdata_cfg = yaml.safe_load(f.read())
earthdata_uid = earthdata_cfg['login']
earthdata_pswd = earthdata_cfg['password']
# download scene
logging.info(f'downloading scene')
session = asf.ASFSession()
session.auth_with_creds(earthdata_uid,earthdata_pswd)
SCENE_NAME = asf_result.__dict__['umm']['GranuleUR'].split('-')[0]
POLARIZATION = asf_result.properties['polarization']
POLARIZATION_TYPE = 'dual-pol' if len(POLARIZATION) > 2 else 'co-pol' # string for template value
scene_zip = os.path.join(otf_cfg['scene_folder'], SCENE_NAME + '.zip')
asf_result.download(path=otf_cfg['scene_folder'], session=session)
# unzip scene
ORIGINAL_SAFE_PATH = scene_zip.replace(".zip",".SAFE")
if (otf_cfg['unzip_scene'] or otf_cfg['apply_ETAD']) and not os.path.exists(ORIGINAL_SAFE_PATH):
logging.info(f'unzipping scene to {ORIGINAL_SAFE_PATH}')
with zipfile.ZipFile(scene_zip, 'r') as zip_ref:
zip_ref.extractall(otf_cfg['scene_folder'])
# apply the ETAD corrections to the SLC
if otf_cfg['apply_ETAD']:
logging.info('Applying ETAD corrections')
logging.info(f'loading copernicus credentials from: {otf_cfg["copernicus_credentials"]}')
with open(otf_cfg['copernicus_credentials'], "r", encoding='utf8') as f:
copernicus_cfg = yaml.safe_load(f.read())
copernicus_uid = copernicus_cfg['login']
copernicus_pswd = copernicus_cfg['password']
etad_path = download_scene_etad(
SCENE_NAME,
copernicus_uid,
copernicus_pswd, etad_dir=otf_cfg['ETAD_folder'])
ETAD_SCENE_FOLDER = f'{otf_cfg["scene_folder"]}_ETAD'
logging.info(f'making new directory for etad corrected slc : {ETAD_SCENE_FOLDER}')
ETAD_SAFE_PATH = apply_etad_correction(
ORIGINAL_SAFE_PATH,
etad_path,
out_dir=ETAD_SCENE_FOLDER,
nthreads=otf_cfg['gdal_threads'])
# set as the safe file for processing
SAFE_PATH = ORIGINAL_SAFE_PATH if not otf_cfg['apply_ETAD'] else ETAD_SAFE_PATH
t1 = time.time()
update_timing_file('Download Scene', t1 - t0, TIMING_FILE_PATH)
# download orbits
logging.info(f'downloading orbit files for scene')
prec_orb_files = download_eofs(sentinel_file=scene_zip,
save_dir=otf_cfg['precise_orbit_folder'],
orbit_type='precise')
if len(prec_orb_files) > 0:
ORBIT_PATH = str(prec_orb_files[0])
logging.info(f'using precise orbits: {ORBIT_PATH}')
else:
#download restituted orbits
res_orb_files = download_eofs(sentinel_file=scene_zip,
save_dir=otf_cfg['restituted_orbit_folder'],
orbit_type='restituted',
asf_user=earthdata_uid,
asf_password=earthdata_pswd,
)
ORBIT_PATH = str(res_orb_files[0])
logging.info(f'using restituted orbits: {ORBIT_PATH}')
t2 = time.time()
update_timing_file('Download Orbits', t2 - t1, TIMING_FILE_PATH)
# download a DEM covering the region of interest
# first get the coordinates from the asf search result
points = (asf_result.__dict__['umm']['SpatialExtent']['HorizontalSpatialDomain']
['Geometry']['GPolygons'][0]['Boundary']['Points'])
points = [(p['Longitude'],p['Latitude']) for p in points]
scene_poly = Polygon(points)
scene_bounds = scene_poly.bounds
logging.info(f'Scene bounds : {scene_bounds}')
if otf_cfg['dem_path'] is not None:
# set the dem to be the one specified if supplied
logging.info(f'using DEM path specified : {otf_cfg["dem_path"]}')
if not os.path.exists(otf_cfg['dem_path']):
raise FileExistsError(f'{otf_cfg["dem_path"]} c')
else:
DEM_PATH = otf_cfg['dem_path']
dem_filename = os.path.basename(DEM_PATH)
otf_cfg['dem_folder'] = os.path.dirname(DEM_PATH) # set the dem folder
otf_cfg['overwrite_dem'] = False # do not overwrite dem
else:
# make folders and set filenames
dem_dl_folder = os.path.join(otf_cfg['dem_folder'],otf_cfg['dem_type'])
os.makedirs(dem_dl_folder, exist_ok=True)
dem_filename = SCENE_NAME + '_dem.tif'
DEM_PATH = os.path.join(dem_dl_folder,dem_filename)
# check if scene crosses the AM
antimeridian_crossing = check_s1_bounds_cross_antimeridian(scene_bounds, max_scene_width=8)
if antimeridian_crossing:
if any([otf_cfg['overwrite_dem'],not os.path.exists(DEM_PATH)]):
trg_crs = 3031 # NOTE Assuming in Antarctic region only
logging.warning('DEM crosses the antimeridian, splitting left right side and merging')
# split into bounds either side of the AM
bounds_left, bounds_right = split_am_crossing(scene_poly, lat_buff=0.2)
AM_DEMS = []
# get left dem
for side, split_bounds in [('left',bounds_left), ('right',bounds_right)]:
logging.info(f'Getting DEM for {side} side of AM')
split_bounds = adjust_scene_poly_at_extreme_lat(split_bounds, 4326, 3031).bounds
logging.info(f'DEM bounds: {split_bounds}')
DEM_PATH_SPLIT = DEM_PATH.replace('.tif',f'_{side}.tif')
DEM_PATH_SPLIT_REPROJ = DEM_PATH.replace('.tif',f'_{side}_{trg_crs}.tif')
dem_data, dem_meta = stitch_dem(split_bounds,
dem_name='glo_30',
dst_ellipsoidal_height=True,
dst_area_or_point='Point',
merge_nodata_value=0,
fill_to_bounds=True,
)
with rasterio.open(DEM_PATH_SPLIT, 'w', **dem_meta) as ds:
ds.write(dem_data,1)
ds.update_tags(AREA_OR_POINT='Point')
logging.info(f'Reprojecting DEM for {side} side of AM to {trg_crs}')
reproject_raster(DEM_PATH_SPLIT , DEM_PATH_SPLIT_REPROJ,trg_crs)
os.remove(DEM_PATH_SPLIT)
AM_DEMS.append(DEM_PATH_SPLIT_REPROJ)
# merge the dems
rasterio.merge.merge(AM_DEMS,dst_path=DEM_PATH)
for f in AM_DEMS:
os.remove(f)
else:
logging.info(f'Using existing DEM : {DEM_PATH}')
else:
# if we are at high latitudes we need to correct the bounds due to the skewed box shape
if (scene_bounds[1] < -50) or (scene_bounds[3] < -50):
# Southern Hemisphere
logging.info(f'Adjusting scene bounds due to warping at high latitude')
scene_poly = adjust_scene_poly_at_extreme_lat(scene_bounds, 4326, 3031)
scene_bounds = scene_poly.bounds
logging.info(f'Adjusted scene bounds : {scene_bounds}')
if (scene_bounds[1] > 50) or (scene_bounds[3] > 50):
# Northern Hemisphere
logging.info(f'Adjusting scene bounds due to warping at high latitude')
scene_poly = adjust_scene_poly_at_extreme_lat(scene_bounds, 4326, 3995)
scene_bounds = scene_poly.bounds
logging.info(f'Adjusted scene bounds : {scene_bounds}')
buffer = 0.1
scene_bounds_buf = scene_poly.buffer(buffer).bounds #buffered
if any([otf_cfg['overwrite_dem'],not os.path.exists(DEM_PATH)]):
logging.info(f'Downloding DEM for bounds : {scene_bounds_buf}')
logging.info(f'type of DEM being downloaded : {otf_cfg["dem_type"]}')
# get the DEM and geometry information
dem_data, dem_meta = stitch_dem(scene_bounds_buf,
dem_name=otf_cfg['dem_type'],
dst_ellipsoidal_height=True,
dst_area_or_point='Point',
merge_nodata_value=0,
fill_to_bounds=True,
)
# save with rasterio
logging.info(f'saving dem to {DEM_PATH}')
with rasterio.open(DEM_PATH, 'w', **dem_meta) as ds:
ds.write(dem_data, 1)
ds.update_tags(AREA_OR_POINT='Point')
del dem_data
else:
logging.info(f'Using existing DEM : {DEM_PATH}')
t3 = time.time()
update_timing_file('Download DEM', t3 - t2, TIMING_FILE_PATH)
# now we have downloaded all the necessary data, we can create a
# config for the scene we want to process
with open(otf_cfg['OPERA_rtc_remplate'], 'r') as f:
template_text = f.read()
# search for the strings we want to replaces
template_text = template_text.replace('SAFE_PATH',SAFE_PATH)
template_text = template_text.replace('ORBIT_PATH',ORBIT_PATH)
template_text = template_text.replace('DEM_PATH',DEM_PATH)
template_text = template_text.replace('SCENE_NAME',SCENE_NAME)
template_text = template_text.replace('OPERA_SCRATCH_FOLDER',
otf_cfg['OPERA_scratch_folder'])
template_text = template_text.replace('OPERA_OUTPUT_FOLDER',
SCENE_OUT_FOLDER)
template_text = template_text.replace('POLARIZATION_TYPE',
POLARIZATION_TYPE)
template_text = template_text.replace('X_RESOLUTION',
str(otf_cfg['OPERA_x_resolution']))
template_text = template_text.replace('Y_RESOLUTION',
str(otf_cfg['OPERA_y_resolution']))
TARGET_CRS = otf_cfg['OPERA_crs'] if otf_cfg['OPERA_crs'] is not None else ''
template_text = template_text.replace('TARGET_CRS',
str(TARGET_CRS))
opera_config_name = SCENE_NAME + '.yaml'
opera_config_path = os.path.join(otf_cfg['OPERA_config_folder'], opera_config_name)
with open(opera_config_path, 'w') as f:
f.write(template_text)
# read in the config template for the RTC runs
with open(opera_config_path, 'r', encoding='utf8') as fin:
opera_rtc_cfg = yaml.safe_load(fin.read())
logging.info(f'PROCESS 2: Produce Backscatter')
# run the docker container from the command line\
client = docker.from_env()
# the command we run in the container.
docker_command = f'rtc_s1.py {opera_config_path}'
# We mount the data folder in the container in the same location.
# This is so the files can be accessed by the program at the paths specified
volumes = [
f'{otf_cfg["OPERA_config_folder"]}:{otf_cfg["OPERA_config_folder"]}',
f'{otf_cfg["OPERA_scratch_folder"]}:{otf_cfg["OPERA_scratch_folder"]}',
f'{otf_cfg["OPERA_output_folder"]}:{otf_cfg["OPERA_output_folder"]}',
f'{otf_cfg["scene_folder"]}:{otf_cfg["scene_folder"]}',
f'{otf_cfg["precise_orbit_folder"]}:{otf_cfg["precise_orbit_folder"]}',
f'{otf_cfg["restituted_orbit_folder"]}:{otf_cfg["restituted_orbit_folder"]}',
f'{otf_cfg["dem_folder"]}:{otf_cfg["dem_folder"]}',
]
if otf_cfg['apply_ETAD']:
volumes.append(f'{otf_cfg["scene_folder"]}_ETAD:{otf_cfg["scene_folder"]}_ETAD')
# setup file for logs
logging.info(f'Running the container, this may take some time...')
prod_id = (opera_rtc_cfg['runconfig']
['groups']['product_group']['product_id'])
log_path = os.path.join(SCENE_OUT_FOLDER, prod_id +'.logs')
logging.info(f'logs will be saved to {log_path}')
if not otf_cfg["skip_rtc"]:
container = client.containers.run(f'opera/rtc:final_1.0.1',
docker_command,
volumes=volumes,
user='rtc_user',
detach=True,
stdout=True,
stderr=True,
stream=True)
l, t = 0, 0
# show the logs while the container is running
while container.status in ['created', 'running']:
# refresh every 5 seconds
if ((int(time.time())%5 == 0) and (int(time.time()!=t))):
logs = container.logs()
if len(logs) != l:
# new logs added in container, show these here
new_logs = logs[l:].decode("utf-8")
logging.info(new_logs)
l = len(logs)
container.reload()
t = int(time.time())
# write the logs from the container
# TODO write to file in above, no need to keep
# the full logs in memory
with open(log_path, 'w') as f:
f.write(logs.decode("utf-8"))
# kill the container once processing is done
try:
container.kill()
logging.info('killing container')
except:
logging.info('container already killed')
# check if the final products exist, indicating success
h5_path = os.path.join(SCENE_OUT_FOLDER,
prod_id + '.h5')
# keep track of success
if os.path.exists(h5_path):
clear_logs=True
run_success = True
success['opera-rtc'].append(h5_path)
logging.info(f'RTC Backscatter successfully made')
else:
clear_logs = False
run_success = False
failed['opera-rtc'].append(h5_path)
logging.info(f'RTC Backscatter failed')
# get the crs of the final scene
tifs = [x for x in os.listdir(SCENE_OUT_FOLDER) if '.tif' in x]
tif = os.path.join(SCENE_OUT_FOLDER,tifs[0])
with rasterio.open(tif) as src:
trg_crs = str(src.meta['crs'])
logging.info(f'CRS of mosaic: {trg_crs}')
t4 = time.time()
update_timing_file('RTC Processing', t4 - t3, TIMING_FILE_PATH)
if otf_cfg['push_to_s3'] and run_success:
logging.info(f'PROCESS 3: Push results to S3 bucket')
bucket = otf_cfg['s3_bucket']
outputs = [x for x in os.listdir(SCENE_OUT_FOLDER) if SCENE_NAME in x]
# set the path in the bucket
SCENE_PREFIX = '' if otf_cfg["scene_prefix"] == None else otf_cfg["scene_prefix"]
S3_BUCKET_FOLDER = '' if otf_cfg["s3_bucket_folder"] == None else otf_cfg["s3_bucket_folder"]
bucket_folder = os.path.join(S3_BUCKET_FOLDER,
otf_cfg["software"],
otf_cfg['dem_type'],
f'{trg_crs.split(":")[-1]}',
f'{SCENE_PREFIX}{SCENE_NAME}')
for file_ in outputs:
file_path = os.path.join(SCENE_OUT_FOLDER,file_)
bucket_path = os.path.join(bucket_folder,file_)
logging.info(f'Uploading file: {file_path}')
logging.info(f'Destination: {bucket_path}')
upload_file(file_name=file_path,
bucket=bucket,
object_name=bucket_path)
# push the config
logging.info(f'Uploading file: {opera_config_path}')
bucket_path = os.path.join(bucket_folder,opera_config_name)
logging.info(f'Destination: {bucket_path}')
upload_file(file_name=opera_config_path,
bucket=bucket,
object_name=bucket_path)
if otf_cfg['upload_dem']:
bucket_path = os.path.join(bucket_folder,dem_filename)
logging.info(f'Uploading file: {DEM_PATH}')
upload_file(DEM_PATH,
bucket=bucket,
object_name=bucket_path)
t5 = time.time()
update_timing_file('S3 Upload', t5 - t4, TIMING_FILE_PATH)
if otf_cfg['delete_local_files']:
logging.info(f'PROCESS 4: Clear files locally')
#clear downloads
for file_ in [scene_zip,
DEM_PATH,
ORBIT_PATH,
opera_config_path,
]:
logging.info(f'Deleteing {file_}')
os.remove(file_)
logging.info(f'Clearing SAFE directory: {ORIGINAL_SAFE_PATH}')
shutil.rmtree(ORIGINAL_SAFE_PATH)
if otf_cfg['apply_ETAD']:
logging.info(f'Clearing ETAD corrected SAFE directory: {ETAD_SAFE_PATH}')
shutil.rmtree(ETAD_SAFE_PATH)
logging.info(f'Clearing directory: {ETAD_SAFE_PATH}')
shutil.rmtree(SCENE_OUT_FOLDER)
shutil.rmtree(otf_cfg['OPERA_scratch_folder'])
# remake the scratch folder
os.makedirs(otf_cfg['OPERA_scratch_folder'])
t6 = time.time()
update_timing_file('Delete Files', t6 - t5, TIMING_FILE_PATH)
logging.info(f'Scene finished: {SCENE_NAME}')
logging.info(f'Elapsed time: {((t6 - t0)/60)} minutes')
# push timings + logs to s3
if otf_cfg['push_to_s3'] and run_success:
bucket_path = os.path.join(bucket_folder, TIMING_FILE)
logging.info(f'Uploading file: {TIMING_FILE_PATH}')
logging.info(f'Destination: {bucket_path}')
upload_file(file_name=TIMING_FILE_PATH,
bucket=bucket,
object_name=bucket_path)
os.remove(TIMING_FILE_PATH)
logging.info(f'Run complete, {len(otf_cfg["scenes"])} scenes processed')
logging.info(f'{len(success["opera-rtc"])} scenes successfully processed: ')
for s in success['opera-rtc']:
logging.info(f'{s}')
logging.info(f'{len(failed["opera-rtc"])} scenes FAILED: ')
for s in failed['opera-rtc']:
logging.info(f'{s}')
logging.info(f'Elapsed time: {((time.time() - t_start)/60)} minutes')
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", help="path to config.yml", required=True, type=str)
args = parser.parse_args()
run_process(args)