From 883bdcf12643fe3fa53880b2c587afe9b2db4e8b Mon Sep 17 00:00:00 2001 From: xumi1993 Date: Tue, 13 Aug 2024 11:34:33 +0800 Subject: [PATCH] parallel downloading waveforms --- seispy/para.py | 3 ++ seispy/rf.py | 84 ++++++++++++++++++++++++++++++-------------------- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/seispy/para.py b/seispy/para.py index c1c6994c..48786235 100644 --- a/seispy/para.py +++ b/seispy/para.py @@ -134,6 +134,7 @@ def __init__(self): self.use_remote_data=False self.data_server_user = None self.data_server_password = None + self.n_proc = 1 self.stainfo = StaInfo() def get_para(self): @@ -276,6 +277,8 @@ def read_para(cls, cfg_file): pa.data_server_password = value elif key == 'data_server': pa.data_server = value + elif key == 'n_proc': + pa.n_proc = cf.getint('fetch', 'n_proc') else: exec('pa.stainfo.{} = value'.format(key)) sections.remove('fetch') diff --git a/seispy/rf.py b/seispy/rf.py index a8ff9a9e..34045333 100644 --- a/seispy/rf.py +++ b/seispy/rf.py @@ -21,6 +21,7 @@ import argparse import sys import pickle +import concurrent.futures def pickphase(eqs, para, logger): @@ -109,6 +110,44 @@ def read_catalog(logpath:str, b_time, e_time, stla:float, stlo:float, return eq_lst +def process_row(i, size, row, para, model, query, tb, te, logger): + new_col = ['dis', 'bazi', 'data', 'datestr'] + datestr = row['date'].strftime('%Y.%j.%H.%M.%S') + daz = distaz(para.stainfo.stla, para.stainfo.stlo, row['evla'], row['evlo']) + arrivals = model.get_travel_times(row['evdp'], daz.delta, phase_list=[para.phase]) + + if not arrivals: + logger.RFlog.error('The phase of {} with source depth {} and distance {} is not exists'.format( + para.phase, row['evdp'], daz.delta)) + return None + + if len(arrivals) > 1: + logger.RFlog.error('More than one phase were calculated with source depth of {} and distance of {}'.format( + row['evdp'], daz.delta)) + return None + + arr_time = arrivals[0].time + t1 = row['date'] + arr_time - tb + t2 = row['date'] + arr_time + te + try: + logger.RFlog.info('Fetch waveforms of event {} ({}/{}) from {}'.format(datestr, i, size, para.data_server)) + st = query.client.get_waveforms(para.stainfo.network, para.stainfo.station, + para.stainfo.location, para.stainfo.channel, t1, t2) + _add_header(st, row['date'], para.stainfo) + except Exception as e: + logger.RFlog.error('Error in fetching waveforms of event {}: {}'.format(datestr, str(e).strip())) + return None + + try: + this_eq = EQ.from_stream(st) + except Exception as e: + logger.RFlog.error('{}'.format(e)) + return None + + this_eq.get_time_offset(row['date']) + this_df = pd.DataFrame([[daz.delta, daz.baz, this_eq, datestr]], columns=new_col, index=[i]) + return this_df + def fetch_waveform(eq_lst, para, model, logger): """Fetch waveforms from remote data server @@ -129,43 +168,22 @@ def fetch_waveform(eq_lst, para, model, logger): query = para.stainfo.query except: logger.RFlog.error('Please load station information and search earthquake before fetch waveform') - new_col = ['dis', 'bazi', 'data', 'datestr'] eqall = [] - for i, row in eq_lst.iterrows(): - datestr = row['date'].strftime('%Y.%j.%H.%M.%S') - daz = distaz(para.stainfo.stla, para.stainfo.stlo, row['evla'], row['evlo']) - arrivals = model.get_travel_times(row['evdp'], daz.delta, phase_list=[para.phase]) - if not arrivals: - logger.RFlog.error('The phase of {} with source depth {} and distance {} is not exists'.format( - para.phase, row['evdp'], daz.delta)) - continue - if len(arrivals) > 1: - logger.RFlog.error('More than one phase were calculated with source depth of {} and distance of {}'.format( - row['evdp'], daz.delta)) - else: - arr_time = arrivals[0].time - t1 = row['date']+arr_time-tb - t2 = row['date']+arr_time+te - try: - logger.RFlog.info('Fetch waveforms of ({}/{}) event {} from {}'.format( - i+1, eq_lst.shape[0], datestr, para.data_server)) - st = query.client.get_waveforms(para.stainfo.network, para.stainfo.station, - para.stainfo.location, para.stainfo.channel, t1, t2) - _add_header(st, row['date'], para.stainfo) - except Exception as e: - logger.RFlog.error('Error in fetching waveforms of event {}: {}'.format(datestr, str(e).strip())) - continue - try: - this_eq = EQ.from_stream(st) - except Exception as e: - logger.RFlog.error('{}'.format(e)) - continue - this_eq.get_time_offset(row['date']) - this_df = pd.DataFrame([[daz.delta, daz.baz, this_eq, datestr]], columns=new_col, index=[i]) - eqall.append(this_df) + + # parallel downloading waveforms + with concurrent.futures.ProcessPoolExecutor(max_workers=para.n_proc) as executor: + futures = {executor.submit(process_row, i, eq_lst.shape[0], row, para, model, query, tb, te, logger): i for i, row in eq_lst.iterrows()} + + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result is not None: + eqall.append(result) + if not eqall: logger.RFlog.error('No waveforms fetched') sys.exit(1) + + # list to DataFrame eq_match = pd.concat(eqall) ind = eq_match.index.drop_duplicates(keep=False) eq_match = eq_match.loc[ind]