Skip to content

Commit

Permalink
parallel downloading waveforms
Browse files Browse the repository at this point in the history
  • Loading branch information
xumi1993 committed Aug 13, 2024
1 parent 7d77e15 commit 883bdcf
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 33 deletions.
3 changes: 3 additions & 0 deletions seispy/para.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def __init__(self):
self.use_remote_data=False
self.data_server_user = None
self.data_server_password = None
self.n_proc = 1
self.stainfo = StaInfo()

def get_para(self):
Expand Down Expand Up @@ -276,6 +277,8 @@ def read_para(cls, cfg_file):
pa.data_server_password = value
elif key == 'data_server':
pa.data_server = value
elif key == 'n_proc':
pa.n_proc = cf.getint('fetch', 'n_proc')
else:
exec('pa.stainfo.{} = value'.format(key))
sections.remove('fetch')
Expand Down
84 changes: 51 additions & 33 deletions seispy/rf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import argparse
import sys
import pickle
import concurrent.futures


def pickphase(eqs, para, logger):
Expand Down Expand Up @@ -109,6 +110,44 @@ def read_catalog(logpath:str, b_time, e_time, stla:float, stlo:float,
return eq_lst


def process_row(i, size, row, para, model, query, tb, te, logger):
new_col = ['dis', 'bazi', 'data', 'datestr']
datestr = row['date'].strftime('%Y.%j.%H.%M.%S')
daz = distaz(para.stainfo.stla, para.stainfo.stlo, row['evla'], row['evlo'])
arrivals = model.get_travel_times(row['evdp'], daz.delta, phase_list=[para.phase])

if not arrivals:
logger.RFlog.error('The phase of {} with source depth {} and distance {} is not exists'.format(
para.phase, row['evdp'], daz.delta))
return None

if len(arrivals) > 1:
logger.RFlog.error('More than one phase were calculated with source depth of {} and distance of {}'.format(
row['evdp'], daz.delta))
return None

arr_time = arrivals[0].time
t1 = row['date'] + arr_time - tb
t2 = row['date'] + arr_time + te
try:
logger.RFlog.info('Fetch waveforms of event {} ({}/{}) from {}'.format(datestr, i, size, para.data_server))
st = query.client.get_waveforms(para.stainfo.network, para.stainfo.station,
para.stainfo.location, para.stainfo.channel, t1, t2)
_add_header(st, row['date'], para.stainfo)
except Exception as e:
logger.RFlog.error('Error in fetching waveforms of event {}: {}'.format(datestr, str(e).strip()))
return None

try:
this_eq = EQ.from_stream(st)
except Exception as e:
logger.RFlog.error('{}'.format(e))
return None

this_eq.get_time_offset(row['date'])
this_df = pd.DataFrame([[daz.delta, daz.baz, this_eq, datestr]], columns=new_col, index=[i])
return this_df

def fetch_waveform(eq_lst, para, model, logger):
"""Fetch waveforms from remote data server
Expand All @@ -129,43 +168,22 @@ def fetch_waveform(eq_lst, para, model, logger):
query = para.stainfo.query
except:
logger.RFlog.error('Please load station information and search earthquake before fetch waveform')
new_col = ['dis', 'bazi', 'data', 'datestr']
eqall = []
for i, row in eq_lst.iterrows():
datestr = row['date'].strftime('%Y.%j.%H.%M.%S')
daz = distaz(para.stainfo.stla, para.stainfo.stlo, row['evla'], row['evlo'])
arrivals = model.get_travel_times(row['evdp'], daz.delta, phase_list=[para.phase])
if not arrivals:
logger.RFlog.error('The phase of {} with source depth {} and distance {} is not exists'.format(
para.phase, row['evdp'], daz.delta))
continue
if len(arrivals) > 1:
logger.RFlog.error('More than one phase were calculated with source depth of {} and distance of {}'.format(
row['evdp'], daz.delta))
else:
arr_time = arrivals[0].time
t1 = row['date']+arr_time-tb
t2 = row['date']+arr_time+te
try:
logger.RFlog.info('Fetch waveforms of ({}/{}) event {} from {}'.format(
i+1, eq_lst.shape[0], datestr, para.data_server))
st = query.client.get_waveforms(para.stainfo.network, para.stainfo.station,
para.stainfo.location, para.stainfo.channel, t1, t2)
_add_header(st, row['date'], para.stainfo)
except Exception as e:
logger.RFlog.error('Error in fetching waveforms of event {}: {}'.format(datestr, str(e).strip()))
continue
try:
this_eq = EQ.from_stream(st)
except Exception as e:
logger.RFlog.error('{}'.format(e))
continue
this_eq.get_time_offset(row['date'])
this_df = pd.DataFrame([[daz.delta, daz.baz, this_eq, datestr]], columns=new_col, index=[i])
eqall.append(this_df)

# parallel downloading waveforms
with concurrent.futures.ProcessPoolExecutor(max_workers=para.n_proc) as executor:
futures = {executor.submit(process_row, i, eq_lst.shape[0], row, para, model, query, tb, te, logger): i for i, row in eq_lst.iterrows()}

for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is not None:
eqall.append(result)

if not eqall:
logger.RFlog.error('No waveforms fetched')
sys.exit(1)

# list to DataFrame
eq_match = pd.concat(eqall)
ind = eq_match.index.drop_duplicates(keep=False)
eq_match = eq_match.loc[ind]
Expand Down

0 comments on commit 883bdcf

Please sign in to comment.