Source code for pyglimer.waveform.preprocessh5

'''
This is a newer version of preprocess.py meant to be used with pyasdf.
Now, we will have to work in a very different manner than for .mseed files
and process files station wise rather than event wise.

:copyright:
   The PyGLImER development team (makus@gfz-potsdam.de).
:license:
   GNU Lesser General Public License, Version 3
   (https://www.gnu.org/copyleft/lesser.html)
:author:
    Peter Makus (makus@gfz-potsdam.de)

Created: Thursday, 18th February 2021 02:26:03 pm
Last Modified: Tuesday, 25th October 2022 12:26:09 pm
'''

from glob import glob
import logging
import os
from typing import List, Tuple

import numpy as np
from joblib import Parallel, delayed
import obspy
from obspy import Stream, UTCDateTime
from obspy.geodetics import gps2dist_azimuth, kilometer2degrees
from tqdm.std import tqdm

from pyglimer import constants
from pyglimer.database.rfh5 import RFDataBase
from pyglimer.database.raw import RawDatabase
from pyglimer.utils.log import create_mpi_logger
from .qc import qcp, qcs
from .rotate import rotate_LQT_min, rotate_PSV
from ..rf.create import RFStream, createRF


# program-specific Exceptions
class SNRError(Exception):
    """Raised when the waveform does not pass the SNR quality control."""

    # Constructor method
    def __init__(self, value):
        self.value = value

    # __str__ display function
    def __str__(self):
        return repr(self.value)

class StreamLengthError(Exception):
    """Raised when a stream has fewer than 3 components."""

    # Constructor method
    def __init__(self, value):
        self.value = value

    # __str__ display function
    def __str__(self):
        return repr(self.value)

def preprocessh5(
        phase: str, rot: str, pol: str, taper_perc: float,
        model: obspy.taup.TauPyModel, taper_type: str, tz: int, ta: int,
        rawloc: str, rfloc: str, deconmeth: str, hc_filt: float,
        netrestr: str, statrestr: str, logger: logging.Logger,
        rflogger: logging.Logger, client: str, evtcat: obspy.Catalog,
        remove_response: bool):
    """
    Preprocess files saved in hdf5 (pyasdf) format. Will save the computed
    receiver functions in hdf5 format as well. Processing is done via a
    multiprocessing backend (mpi, joblib, or a single core).

    :param phase: The teleseismic phase to consider.
    :type phase: str
    :param rot: The coordinate system that the seismogram should be
        rotated to.
    :type rot: str
    :param pol: Polarisation for PRFs. Can be either 'v' or 'h'
        (vertical or horizontal).
    :type pol: str
    :param taper_perc: Percentage for the pre-deconvolution taper.
    :type taper_perc: float
    :param model: TauPyModel to be used for travel time computations.
    :type model: obspy.taup.TauPyModel
    :param taper_type: Type of taper (see obspy).
    :type taper_type: str
    :param tz: Length of the time window before the theoretical arrival
        (seconds).
    :type tz: int
    :param ta: Length of the time window after the theoretical arrival
        (seconds).
    :type ta: int
    :param rawloc: Directory in which the raw data is saved.
    :type rawloc: str
    :param rfloc: Directory to save the receiver functions in.
    :type rfloc: str
    :param deconmeth: Deconvolution method to use.
    :type deconmeth: str
    :param hc_filt: Second high-cut filter (optional, can be None or False).
    :type hc_filt: float
    :param netrestr: Network restrictions.
    :type netrestr: str
    :param statrestr: Station restrictions.
    :type statrestr: str
    :param logger: Logger to use.
    :type logger: logging.Logger
    :param rflogger: Logger for the receiver function computation.
    :type rflogger: logging.Logger
    :param client: Multiprocessing backend to use
        ('mpi', 'joblib', or 'single').
    :type client: str
    :param evtcat: Event catalogue.
    :type evtcat: obspy.Catalog
    :param remove_response: Remove the instrument response from the raw data?
    :type remove_response: bool
    :raises NotImplementedError: For unknown multiprocessing backends.
    """
    os.makedirs(rfloc, exist_ok=True)

    # Open ds
    fpattern = '%s.%s.h5' % (netrestr or '*', statrestr or '*')
    flist = list(glob(os.path.join(rawloc, fpattern)))

    # Perform processing depending on client
    if client.lower() == 'mpi':
        from mpi4py import MPI
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        psize = comm.Get_size()
        pmap = (np.arange(len(flist))*psize)/len(flist)
        pmap = pmap.astype(np.int32)
        ind = pmap == rank
        ind = np.arange(len(flist))[ind]

        # get new MPI compatible loggers
        logger = create_mpi_logger(logger, rank)
        rflogger = logging.getLogger("%s.RF" % logger.name)
        for ii in ind:
            _preprocessh5_single(
                phase, rot, pol, taper_perc, model, taper_type, tz, ta,
                rfloc, deconmeth, hc_filt, logger, rflogger, flist[ii],
                evtcat, remove_response)
    elif client.lower() == 'joblib':
        Parallel(n_jobs=-1)(delayed(_preprocessh5_single)(
            phase, rot, pol, taper_perc, model, taper_type, tz, ta,
            rfloc, deconmeth, hc_filt, logger, rflogger, f, evtcat,
            remove_response) for f in tqdm(flist))
    elif client.lower() == 'single':
        for f in tqdm(flist):
            _preprocessh5_single(
                phase, rot, pol, taper_perc, model, taper_type, tz, ta,
                rfloc, deconmeth, hc_filt, logger, rflogger, f, evtcat,
                remove_response)
    else:
        raise NotImplementedError(
            'Unknown multiprocessing backend %s.' % client)

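# A minimal usage sketch (illustrative only, not part of the original
# module): how `preprocessh5` might be called from a driver script. All
# concrete values below (directories, catalogue path, phase, taper settings,
# time windows, deconvolution method, and the 'joblib' backend) are
# assumptions chosen for demonstration, not PyGLImER defaults.
#
#   import logging
#   from obspy import read_events
#   from obspy.taup import TauPyModel
#
#   logger = logging.getLogger('pyglimer.request')
#   rflogger = logging.getLogger('pyglimer.request.RF')
#   evtcat = read_events('database/evtcat.xml')  # hypothetical path
#   preprocessh5(
#       phase='P', rot='RTZ', pol='v', taper_perc=0.05,
#       model=TauPyModel('iasp91'), taper_type='hann', tz=30, ta=120,
#       rawloc='database/raw', rfloc='database/rf', deconmeth='it',
#       hc_filt=1.5, netrestr=None, statrestr=None, logger=logger,
#       rflogger=rflogger, client='joblib', evtcat=evtcat,
#       remove_response=True)
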
def _preprocessh5_single(
        phase: str, rot: str, pol: str, taper_perc: float,
        model: obspy.taup.TauPyModel, taper_type: str, tz: int, ta: int,
        rfloc: str, deconmeth: str, hc_filt: float, logger: logging.Logger,
        rflogger: logging.Logger, hdf5_file: str, evtcat: obspy.Catalog,
        remove_response: bool):
    """
    Single core processing of one single hdf5 file.

    .. warning::

        Should not be called directly. Use
        :func:`~pyglimer.waveform.preprocessh5.preprocessh5` instead!
    """
    f = hdf5_file
    net, stat, _ = os.path.basename(f).split('.')
    code = '%s.%s' % (net, stat)
    outf = os.path.join(rfloc, code)

    # Find out which files have already been processed:
    if os.path.isfile(outf + '.h5'):
        with RFDataBase(outf) as rfdb:
            ret, rej = rfdb._get_known_waveforms()
        rflogger.debug('Already processed waveforms: %s' % str(ret))
        rflogger.debug('\nAlready rejected waveforms: %s' % str(rej))
    else:
        ret = []
        rej = []

    rflogger.info(f'Processing Station {code}')

    with RawDatabase(f, mode='r') as rdb:
        # get station inventory
        try:
            inv = rdb.get_response(net, stat)
        except KeyError:
            logger.exception(
                f'Could not find station inventory for Station {net}.{stat}')

        rf = RFStream()

        # There has to be a smarter way to do this. Only some events
        # have a corresponding waveform.
        # At least only compute the theoretical arrival if the distance is
        # within thresholds.

        # Which times are available as raw data?
        t_raw = list(rdb._get_table_of_contents().values())[0]
        t_raw = [UTCDateTime(t) for t in t_raw]
        t_raw_min = min(t_raw) - 600
        t_raw_max = max(t_raw) + 600

        # c_date = inv[0][0].creation_date
        # t_date = inv[0][0].termination_date

        for evt in tqdm(evtcat):
            # Already processed?
            ot = (evt.preferred_origin() or evt.origins[0]).time
            ot_fiss = UTCDateTime(ot).format_fissures()
            if ot_fiss in rej or ot_fiss in ret:
                rflogger.debug('RF with ot %s already processed.' % ot_fiss)
                continue

            # Skip events with no data.
            if ot < t_raw_min or t_raw_max < ot:
                rflogger.debug(f'No raw data for event {ot_fiss}.')
                continue

            try:
                toa, rayp, rayp_s_deg, baz, distance = compute_toa(
                    evt, inv[0][0].latitude, inv[0][0].longitude, phase,
                    model)
            except IndexError:
                rflogger.debug('Phase not viable for epicentral distance')
                continue
            except ValueError as e:
                rflogger.debug(e)
                continue

            st = rdb.get_data(net, stat, ot)
            st = st.slice(starttime=toa-tz, endtime=toa+ta)

            if st.count() < 3:
                logger.info(
                    f'Only {st.count()} traces found for Station {net}.{stat}'
                    + f' and arrival time {toa}.')
                continue
            if st[0].stats.endtime - st[0].stats.starttime < tz+ta-20:
                logger.warning(
                    'Stream shorter than requested time window, skip.')
                continue

            try:
                rf_temp = __station_process__(
                    st, inv, evt, phase, rot, pol, taper_perc, taper_type,
                    tz, ta, deconmeth, hc_filt, logger, rflogger, net, stat,
                    baz, distance, rayp, rayp_s_deg, toa, rej, ret,
                    remove_response)
            except Exception as e:
                rflogger.exception(
                    'RF Creation failed. Waveform Data:\n'
                    + f'{net}.{stat}.{ot_fiss}\noriginal error:\n'
                    + f'{e}')
                continue

            if rf_temp is not None:
                rf.append(rf_temp)

            # Write regularly so as not to clutter the RAM too much
            if rf.count() >= 20:
                rflogger.info('Writing to file %s....' % outf)
                with RFDataBase(outf) as rfdb:
                    rfdb.add_rf(rf)
                    rfdb._add_known_waveform_data(ret, rej)
                rflogger.info('..written.')
                rf.clear()

    if rf.count():
        rflogger.info('Writing to file %s....' % outf)
        with RFDataBase(outf) as rfdb:
            rfdb.add_rf(rf)
            rfdb._add_known_waveform_data(ret, rej)
        rflogger.info('..written.')
        rf.clear()

def compute_toa(
        evt: obspy.core.event.Event, slat: float, slon: float, phase: str,
        model: obspy.taup.TauPyModel) -> Tuple[
            UTCDateTime, float, float, float, float]:
    """
    Compute the time of the theoretical arrival for a teleseismic event and
    a given teleseismic phase at the provided station.

    :param evt: Event to compute the arrival for.
    :type evt: obspy.core.event.Event
    :param slat: station latitude
    :type slat: float
    :param slon: station longitude
    :type slon: float
    :param phase: The teleseismic phase to consider.
    :type phase: str
    :param model: TauPyModel to use.
    :type model: obspy.taup.TauPyModel
    :return: A tuple holding: [the time of theoretical arrival (UTC),
        the apparent slowness in s/m, the ray parameter in s/deg,
        the back azimuth, the distance between station and event in deg]
    :rtype: Tuple[UTCDateTime, float, float, float, float]
    """
    origin = evt.preferred_origin() or evt.origins[0]
    distance, baz, _ = gps2dist_azimuth(
        slat, slon, origin.latitude, origin.longitude)
    distance = kilometer2degrees(distance/1000)

    # compute time of first arrival & ray parameter
    odepth = origin.depth or 10000  # Some events have no depth information

    # Throw out events that should not be used for RFs
    if (constants.maxdepth[phase]
        and constants.maxdepth[phase] < odepth/1000) \
            or not (constants.min_epid[phase] <= distance
                    <= constants.max_epid[phase]):
        raise ValueError(
            f'Distance {distance} deg or origin depth {odepth} m should not '
            + 'be used for RFs')

    arrival = model.get_travel_times(
        source_depth_in_km=odepth / 1000, distance_in_degree=distance,
        phase_list=[phase])[0]

    rayp_s_deg = arrival.ray_param_sec_degree
    rayp = rayp_s_deg / 111319.9  # apparent slowness
    toa = origin.time + arrival.time
    return toa, rayp, rayp_s_deg, baz, distance

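# Hedged example (an illustration, not taken from the original source):
# computing the theoretical P onset for the first event of a catalogue at a
# station located at 52.5 N, 13.4 E. The catalogue path and the choice of
# the iasp91 model are assumptions.
#
#   from obspy import read_events
#   from obspy.taup import TauPyModel
#
#   evt = read_events('database/evtcat.xml')[0]  # hypothetical path
#   toa, rayp, rayp_s_deg, baz, delta = compute_toa(
#       evt, slat=52.5, slon=13.4, phase='P', model=TauPyModel('iasp91'))
#   # toa: UTCDateTime of the theoretical onset, delta: epicentral distance
#   # in degrees, baz: back azimuth in degrees.
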
def __station_process__(
        st, inv, evt, phase, rot, pol, taper_perc, taper_type, tz, ta,
        deconmeth, hc_filt, logger, rflogger, net, stat, baz, distance,
        rayp, rayp_s_deg, toa, rej: List[str], ret: List[str],
        remove_response: bool):
    """
    Processing that is equal for each waveform recorded on one station.
    """
    # Is the data already processed?
    origin = (evt.preferred_origin() or evt.origins[0])
    ot_fiss = UTCDateTime(origin.time).format_fissures()

    # Remove response
    if remove_response:
        st.attach_response(inv)
        st.remove_response()

    # DEMEAN AND DETREND #
    st.detrend(type='demean')

    # TAPER #
    st.taper(
        max_percentage=taper_perc, type=taper_type, max_length=None,
        side='both')

    infodict = {}

    # create RF
    try:
        st, _, infodict = __rotate_qc(
            phase, st, inv, net, stat, baz, distance, ot_fiss, evt,
            origin.latitude, origin.longitude, origin.depth, rayp_s_deg,
            toa, logger, infodict, tz, pol)

        if hc_filt:
            st.filter('lowpass', freq=hc_filt, zerophase=True, corners=2)

        # Rotate to LQT or PSS
        if rot == "LQT":
            st, ia = rotate_LQT_min(st, phase)
            # additional QC
            if ia < 5 or ia > 75:
                raise SNRError(
                    'The estimated incidence angle is unrealistic with '
                    + '%s degree.' % str(ia))
        elif rot == "PSS":
            _, _, st = rotate_PSV(
                inv[0][0][0].latitude, inv[0][0][0].longitude, rayp, st,
                phase)

        # Create RF object
        if phase[-1] == "S":
            trim = [40, 0]
            if distance >= 70:
                trim[1] = ta - (-2*distance + 180)
            else:
                trim[1] = ta - 40
        elif phase[-1] == "P":
            trim = False

        RF = createRF(
            st, phase, pol=pol, info=infodict, trim=trim, method=deconmeth)
        ret.append(ot_fiss)

    except SNRError as e:
        rflogger.info(f'{e} {ot_fiss}')
        rej.append(ot_fiss)
        return None
    except Exception as e:
        rflogger.exception(
            'RF Creation failed. Waveform Data:\n'
            + f'{net}.{stat}.{ot_fiss}\noriginal error:\n'
            + f'{e}')
        return None
    return RF


def __rotate_qc(
        phase, st, station_inv, network, station, baz, distance, ot_fiss,
        event, evtlat, evtlon, depth, rayp_s_deg, first_arrival, logger,
        infodict, tz, pol):
    """
    Rotate the stream to ZNE and RT, apply the SNR quality control, and
    collect the event and station metadata for the info dictionary.
    """
    st.rotate(method='->ZNE', inventory=station_inv)
    st.rotate(method='NE->RT', inventory=station_inv, back_azimuth=baz)
    st.normalize()

    # Sometimes streams contain more than 3 traces:
    if st.count() > 3:
        stream = {}
        for tr in st:
            stream[tr.stats.component] = tr
        if "Z" in stream:
            st = Stream([stream["Z"], stream["R"], stream["T"]])
        elif "3" in stream:
            st = Stream([stream["3"], stream["R"], stream["T"]])
        del stream

    # SNR CRITERIA
    dt = st[0].stats.delta  # sampling interval
    sampling_f = st[0].stats.sampling_rate

    if phase[-1] == "P":
        st, crit, f, noisemat = qcp(st, dt, sampling_f, tz)
        if not crit:
            infodict['dt'] = dt
            infodict['sampling_rate'] = sampling_f
            infodict['network'] = network
            infodict['station'] = station
            infodict['statlat'] = station_inv[0][0][0].latitude
            infodict['statlon'] = station_inv[0][0][0].longitude
            infodict['statel'] = station_inv[0][0][0].elevation
            raise SNRError('QC rejected %s' % np.array2string(noisemat))

    elif phase[-1] == "S":
        st, crit, f, noisemat = qcs(st, dt, sampling_f, tz)
        if not crit:
            infodict['dt'] = dt
            infodict['sampling_rate'] = sampling_f
            infodict['network'] = network
            infodict['station'] = station
            infodict['statlat'] = station_inv[0][0][0].latitude
            infodict['statlon'] = station_inv[0][0][0].longitude
            infodict['statel'] = station_inv[0][0][0].elevation
            raise SNRError('QC rejected %s' % np.array2string(noisemat))

    # WRITE AN INFO FILE
    append_inf = [
        ['magnitude', (
            event.preferred_magnitude() or event.magnitudes[0])['mag']],
        ['magnitude_type', (
            event.preferred_magnitude() or event.magnitudes[0])[
            'magnitude_type']],
        ['evtlat', evtlat], ['evtlon', evtlon],
        ['ot_ret', ot_fiss], ['ot_all', ot_fiss],
        ['evt_depth', depth],
        ['evt_id', event.get('resource_id')],
        ['noisemat', noisemat],
        ['co_f', f], ['npts', st[1].stats.npts],
        ['rbaz', baz],
        ['rdelta', distance],
        ['rayp_s_deg', rayp_s_deg],
        ['onset', first_arrival],
        ['starttime', st[0].stats.starttime], ['pol', pol]]

    # Check if values are already in dict
    for key, value in append_inf:
        infodict.setdefault(key, []).append(value)

    infodict['dt'] = dt
    infodict['sampling_rate'] = sampling_f
    infodict['network'] = network
    infodict['station'] = station
    infodict['statlat'] = station_inv[0][0][0].latitude
    infodict['statlon'] = station_inv[0][0][0].longitude
    infodict['statel'] = station_inv[0][0][0].elevation

    logger.info(f"Stream accepted {ot_fiss}. Preprocessing successful")

    return st, crit, infodict