'''This is a newer version of preprocess.py meant to be used with pyasdf.Now, we will have to work in a very different manner than for .mseed filesand process files station wise rather than event wise.:copyright: The PyGLImER development team (makus@gfz-potsdam.de).:license: GNU Lesser General Public License, Version 3 (https://www.gnu.org/copyleft/lesser.html):author: Peter Makus (makus@gfz-potsdam.de)Created: Thursday, 18th February 2021 02:26:03 pmLast Modified: Tuesday, 25th October 2022 12:26:09 pm'''fromglobimportglobimportloggingimportosfromtypingimportList,TupleimportnumpyasnpfromjoblibimportParallel,delayedimportobspyfromobspyimportStream,UTCDateTimefromobspy.geodeticsimportgps2dist_azimuth,kilometer2degreesfromtqdm.stdimporttqdmfrompyglimerimportconstantsfrompyglimer.database.rfh5importRFDataBasefrompyglimer.database.rawimportRawDatabasefrompyglimer.utils.logimportcreate_mpi_loggerfrom.qcimportqcp,qcsfrom.rotateimportrotate_LQT_min,rotate_PSVfrom..rf.createimportRFStream,createRF# program-specific Exceptions
[docs]classSNRError(Exception):"""raised when the SNR is too high"""# Constructor methoddef__init__(self,value):self.value=value# __str__ display functiondef__str__(self):returnrepr(self.value)
[docs]classStreamLengthError(Exception):"""raised when stream has fewer than 3 components"""# Constructor methoddef__init__(self,value):self.value=value# __str__ display functiondef__str__(self):returnrepr(self.value)
[docs]defpreprocessh5(phase:str,rot:str,pol:str,taper_perc:float,model:obspy.taup.TauPyModel,taper_type:str,tz:int,ta:int,rawloc:str,rfloc:str,deconmeth:str,hc_filt:float,netrestr:str,statrestr:str,logger:logging.Logger,rflogger:logging.Logger,client:str,evtcat:obspy.Catalog,remove_response:bool):""" Preprocess files saved in hdf5 (pyasdf) format. Will save the computed receiver functions in hdf5 format as well. Processing is done via a multiprocessing backend (either joblib or mpi). :param phase: The Teleseismic phase to consider :type phase: str :param rot: The Coordinate system that the seismogram should be rotated to. :type rot: str :param pol: Polarisationfor PRFs. Can be either 'v' or 'h' (vertical or horizontal). :type pol: str :param taper_perc: Percentage for the pre deconvolution taper. :type taper_perc: float :param model: TauPyModel to be used for travel time computations :type model: obspy.taup.TauPyModel :param taper_type: type of taper (see obspy) :type taper_type: str :param tz: Length of time window before theoretical arrival (seconds) :type tz: int :param ta: Length of time window after theoretical arrival (seconds) :type ta: int :param rawloc: Directory, in which the raw data is saved. :type rawloc: str :param rfloc: Directory to save the receiver functions in. :type rfloc: str :param deconmeth: Deconvolution method to use. :type deconmeth: str :param hc_filt: Second High-Cut filter (optional, can be None or False) :type hc_filt: float :param netrestr: Network restrictions :type netrestr: str :param statrestr: Station restrictions :type statrestr: str :param logger: Logger to use :type logger: logging.Logger :param rflogger: [description] :type rflogger: logging.Logger :param client: Multiprocessing Backend to use :type client: str :param evtcat: event Catalogue :type evtcat: obspy.catalog :raises NotImplementedError: For uknowns multiprocessing backends. """os.makedirs(rfloc,exist_ok=True)# Open dsfpattern='%s.%s.h5'%(netrestror'*',statrestror'*')flist=list(glob(os.path.join(rawloc,fpattern)))# Perform processing depending on clientifclient.lower()=='mpi':frommpi4pyimportMPIcomm=MPI.COMM_WORLDrank=comm.Get_rank()psize=comm.Get_size()pmap=(np.arange(len(flist))*psize)/len(flist)pmap=pmap.astype(np.int32)ind=pmap==rankind=np.arange(len(flist))[ind]# get new MPI compatible loggerslogger=create_mpi_logger(logger,rank)rflogger=logging.getLogger("%s.RF"%logger.name)foriiinind:_preprocessh5_single(phase,rot,pol,taper_perc,model,taper_type,tz,ta,rfloc,deconmeth,hc_filt,logger,rflogger,flist[ii],evtcat,remove_response)elifclient.lower()=='joblib':Parallel(n_jobs=-1)(delayed(_preprocessh5_single)(phase,rot,pol,taper_perc,model,taper_type,tz,ta,rfloc,deconmeth,hc_filt,logger,rflogger,f,evtcat,remove_response)forfintqdm(flist))elifclient.lower()=='single':forfintqdm(flist):_preprocessh5_single(phase,rot,pol,taper_perc,model,taper_type,tz,ta,rfloc,deconmeth,hc_filt,logger,rflogger,f,evtcat,remove_response)else:raiseNotImplementedError('Unknown multiprocessing backend %s.'%client)
def_preprocessh5_single(phase:str,rot:str,pol:str,taper_perc:float,model:obspy.taup.TauPyModel,taper_type:str,tz:int,ta:int,rfloc:str,deconmeth:str,hc_filt:float,logger:logging.Logger,rflogger:logging.Logger,hdf5_file:str,evtcat:obspy.Catalog,remove_response:bool):""" Single core processing of one single hdf5 file. .. warning:: Should not be called use :func:`~seismic.waveform.preprocessh5.preprocess_h5`! """f=hdf5_filenet,stat,_=os.path.basename(f).split('.')code='%s.%s'%(net,stat)outf=os.path.join(rfloc,code)# Find out which files have already been processed:ifos.path.isfile(outf+'.h5'):withRFDataBase(outf)asrfdb:ret,rej=rfdb._get_known_waveforms()rflogger.debug('Already processed waveforms: %s'%str(ret))rflogger.debug('\nAlready rejected waveforms: %s'%str(rej))else:ret=[]rej=[]rflogger.info(f'Processing Station {code}')withRawDatabase(f,mode='r')asrdb:# get station inventorytry:inv=rdb.get_response(net,stat)exceptKeyError:logger.exception(f'Could not find station inventory for Station {net}.{stat}')rf=RFStream()# There has to be a smarter way to do this. Only some events# have a corresponding waveform# At least only compute theoretical arrival if the distance is within# thresholds# Which times are available as raw data?t_raw=list(rdb._get_table_of_contents().values())[0]t_raw=[UTCDateTime(t)fortint_raw]t_raw_min=min(t_raw)-600t_raw_max=max(t_raw)+600# c_date = inv[0][0].creation_date# t_date = inv[0][0].termination_dateforevtintqdm(evtcat):# Already processed?ot=(evt.preferred_origin()orevt.origins[0]).timeot_fiss=UTCDateTime(ot).format_fissures()ifot_fissinrejorot_fissinret:rflogger.debug('RF with ot %s already processed.'%ot_fiss)continue# Skip events with no data.ifot<t_raw_minort_raw_max<ot:rflogger.debug(f'No raw data for event {ot_fiss}.')continuetry:toa,rayp,rayp_s_deg,baz,distance=compute_toa(evt,inv[0][0].latitude,inv[0][0].longitude,phase,model)exceptIndexError:rflogger.debug('Phase not viable for epicentral distance')continueexceptValueErrorase:rflogger.debug(e)continuest=rdb.get_data(net,stat,ot)st=st.slice(starttime=toa-tz,endtime=toa+ta)ifst.count()<3:logger.info(f'Only {st.count()} traces found for Station {net}.{stat}'+f'and arrival time {toa}.')continueifst[0].stats.endtime-st[0].stats.starttime<tz+ta-20:logger.warning('Stream shorter than requested time window, skip.')continuetry:rf_temp=__station_process__(st,inv,evt,phase,rot,pol,taper_perc,taper_type,tz,ta,deconmeth,hc_filt,logger,rflogger,net,stat,baz,distance,rayp,rayp_s_deg,toa,rej,ret,remove_response)exceptExceptionase:rflogger.exception('RF Creation failed. Waveform Data:\n'+f'{net}.{stat}.{ot_fiss}\noriginal error:\n'+f'{e}')continueifrf_tempisnotNone:rf.append(rf_temp)# Write regularly to not clutter too much into the RAMifrf.count()>=20:rflogger.info('Writing to file %s....'%outf)withRFDataBase(outf)asrfdb:rfdb.add_rf(rf)rfdb._add_known_waveform_data(ret,rej)rflogger.info('..written.')rf.clear()ifrf.count():rflogger.info('Writing to file %s....'%outf)withRFDataBase(outf)asrfdb:rfdb.add_rf(rf)rfdb._add_known_waveform_data(ret,rej)rflogger.info('..written.')rf.clear()
[docs]defcompute_toa(evt:obspy.core.event.Event,slat:float,slon:float,phase:str,model:obspy.taup.TauPyModel)->Tuple[UTCDateTime,float,float,float]:""" Compute time of theoretical arrival for teleseismic events and a given teleseismic phase at the provided station. :param evt: Event to compute the arrival for. :type evt: obspy.core.event.Event :param slat: station latitude :type slat: float :param slon: station longitude :type slon: float :param phase: The teleseismic phase to consider. :type phase: str :param model: Taupymodel to use :type model: obspy.taup.TauPyModel :return: A Tuple holding: [the time of theoretical arrival (UTC), the apparent slowness in s/km, the ray parameter in s/deg, the back azimuth, the distance between station and event in deg] :rtype: Tuple[UTCDateTime, float, float, float] """origin=evt.preferred_origin()orevt.origins[0]distance,baz,_=gps2dist_azimuth(slat,slon,origin.latitude,origin.longitude)distance=kilometer2degrees(distance/1000)# compute time of first arrival & ray parameterodepth=origin.depthor10000# Some events have no depth information# Throw out events that should not be used for RFsif(constants.maxdepth[phase]andconstants.maxdepth[phase]<odepth/1000) \
ornot(constants.min_epid[phase]<=distance<=constants.max_epid[phase]):raiseValueError(f'Distance {distance} deg or origin depth {odepth}m should not be '+'used for RFs')arrival=model.get_travel_times(source_depth_in_km=odepth/1000,distance_in_degree=distance,phase_list=[phase])[0]rayp_s_deg=arrival.ray_param_sec_degreerayp=rayp_s_deg/111319.9# apparent slownesstoa=origin.time+arrival.timereturntoa,rayp,rayp_s_deg,baz,distance
def__station_process__(st,inv,evt,phase,rot,pol,taper_perc,taper_type,tz,ta,deconmeth,hc_filt,logger,rflogger,net,stat,baz,distance,rayp,rayp_s_deg,toa,rej:List[str],ret:List[str],remove_response:bool):""" Processing that is equal for each waveform recorded on one station """# Is the data already processed?origin=(evt.preferred_origin()orevt.origins[0])ot_fiss=UTCDateTime(origin.time).format_fissures()# Remove repsonseifremove_response:st.attach_response(inv)st.remove_response()# DEMEAN AND DETREND #st.detrend(type='demean')# TAPER #st.taper(max_percentage=taper_perc,type=taper_type,max_length=None,side='both')infodict={}# create RFtry:st,_,infodict=__rotate_qc(phase,st,inv,net,stat,baz,distance,ot_fiss,evt,origin.latitude,origin.longitude,origin.depth,rayp_s_deg,toa,logger,infodict,tz,pol)ifhc_filt:st.filter('lowpass',freq=hc_filt,zerophase=True,corners=2)# Rotate to LQT or PSSifrot=="LQT":st,ia=rotate_LQT_min(st,phase)# additional QCifia<5oria>75:raiseSNRError("The estimated incidence angle is unrealistic with"+'%s degree.'%str(ia))elifrot=="PSS":_,_,st=rotate_PSV(inv[0][0][0].latitude,inv[0][0][0].longitude,rayp,st,phase)# Create RF objectifphase[-1]=="S":trim=[40,0]ifdistance>=70:trim[1]=ta-(-2*distance+180)else:trim[1]=ta-40elifphase[-1]=="P":trim=FalseRF=createRF(st,phase,pol=pol,info=infodict,trim=trim,method=deconmeth)ret.append(ot_fiss)exceptSNRErrorase:rflogger.info(f'{e}{ot_fiss}')rej.append(ot_fiss)returnNoneexceptExceptionase:rflogger.exception('RF Creation failed. Waveform Data:\n'+f'{net}.{stat}.{ot_fiss}\noriginal error:\n'+f'{e}')returnNonereturnRFdef__rotate_qc(phase,st,station_inv,network,station,baz,distance,ot_fiss,event,evtlat,evtlon,depth,rayp_s_deg,first_arrival,logger,infodict,tz,pol):""" REMOVE INSTRUMENT RESPONSE + convert to vel + SIMULATE """st.rotate(method='->ZNE',inventory=station_inv)st.rotate(method='NE->RT',inventory=station_inv,back_azimuth=baz)st.normalize()# Sometimes streams contain more than 3 traces:ifst.count()>3:stream={}fortrinst:stream[tr.stats.component]=trif"Z"instream:st=Stream([stream["Z"],stream["R"],stream["T"]])elif"3"instream:st=Stream([stream["3"],stream["R"],stream["T"]])delstream# SNR CRITERIAdt=st[0].stats.delta# sampling intervalsampling_f=st[0].stats.sampling_rateifphase[-1]=="P":st,crit,f,noisemat=qcp(st,dt,sampling_f,tz)ifnotcrit:infodict['dt']=dtinfodict['sampling_rate']=sampling_finfodict['network']=networkinfodict['station']=stationinfodict['statlat']=station_inv[0][0][0].latitudeinfodict['statlon']=station_inv[0][0][0].longitudeinfodict['statel']=station_inv[0][0][0].elevationraiseSNRError('QC rejected %s'%np.array2string(noisemat))elifphase[-1]=="S":st,crit,f,noisemat=qcs(st,dt,sampling_f,tz)ifnotcrit:infodict['dt']=dtinfodict['sampling_rate']=sampling_finfodict['network']=networkinfodict['station']=stationinfodict['statlat']=station_inv[0][0][0].latitudeinfodict['statlon']=station_inv[0][0][0].longitudeinfodict['statel']=station_inv[0][0][0].elevationraiseSNRError('QC rejected %s'%np.array2string(noisemat))# WRITE AN INFO FILEappend_inf=[['magnitude',(event.preferred_magnitude()orevent.magnitudes[0])['mag']],['magnitude_type',(event.preferred_magnitude()orevent.magnitudes[0])['magnitude_type']],['evtlat',evtlat],['evtlon',evtlon],['ot_ret',ot_fiss],['ot_all',ot_fiss],['evt_depth',depth],['evt_id',event.get('resource_id')],['noisemat',noisemat],['co_f',f],['npts',st[1].stats.npts],['rbaz',baz],['rdelta',distance],['rayp_s_deg',rayp_s_deg],['onset',first_arrival],['starttime',st[0].stats.starttime],['pol',pol]]# Check if values are already in dictforkey,valueinappend_inf:infodict.setdefault(key,[]).append(value)infodict['dt']=dtinfodict['sampling_rate']=sampling_finfodict['network']=networkinfodict['station']=stationinfodict['statlat']=station_inv[0][0][0].latitudeinfodict['statlon']=station_inv[0][0][0].longitudeinfodict['statel']=station_inv[0][0][0].elevationlogger.info(f"Stream accepted {ot_fiss}. Preprocessing successful")returnst,crit,infodict