'''
:copyright:
The PyGLImER development team (makus@gfz-potsdam.de).
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
:author:
Peter Makus (makus@gfz-potsdam.de)
Created: Tuesday, 19th May 2019 8:59:40 pm
Last Modified: Friday, 21st October 2022 03:31:31 pm
'''
import fnmatch
import logging
import os
import shelve
import time
import itertools
from typing import Tuple
import numpy as np
from joblib import Parallel, delayed, cpu_count
import obspy
from obspy import read, read_inventory, Stream, UTCDateTime
from obspy.geodetics import gps2dist_azimuth, kilometer2degrees
from tqdm.std import tqdm
from pyglimer.utils.log import create_mpi_logger
from pyglimer.waveform.preprocessh5 import preprocessh5
from pyglimer.utils.signalproc import resample_or_decimate
from .errorhandler import redownload, redownload_statxml, \
NoMatchingResponseHandler
from .qc import qcp, qcs
from .rotate import rotate_LQT_min, rotate_PSV
from ..rf.create import createRF
from ..utils.roundhalf import roundhalf
from ..utils.utils import dt_string, chunks
[docs]def preprocess(
phase: str, rot: str, pol: str, taper_perc: float,
event_cat: obspy.Catalog, model: obspy.taup.TauPyModel,
taper_type: str, tz: int, ta: int, statloc: str, rawloc: str,
preproloc: str, rfloc: str, deconmeth: str, hc_filt: float or None,
saveasdf: bool = False, netrestr=None, statrestr=None,
client: str = 'joblib', remove_response: bool = True):
"""
Preprocesses waveforms to create receiver functions
1. Clips waveform to the right length
(tz before and ta after theorethical arrival.)
2. Demean & Detrend
3. Tapering
4. Remove Instrument response, convert to velocity &
simulate havard station.
5. Rotation to NEZ and, subsequently, to RTZ.
6. Compute SNR for highpass filtered waveforms
(highpass f defined in qc.lowco).
If SNR lower than in qc.SNR_criteria for all filters,
rejects waveform.
7. Write finished and filtered waveforms to folder
specified in qc.outputloc.
8. Write info file with shelf containing station,
event and waveform information.
Only starts after all waveforms of the event have been
downloaded by download.py.
(checked over the dynamic variables prepro_folder and tmp.folder)
Saves preprocessed waveform files.
Creates info file to save parameters.
Parameters
----------
phase : string
"P" or "S"
rot : string
Coordinate system to cast seismogram in before deconvolution.
Options are "RTZ", "LQT", or "PSS".
pol : string
"h" for Sh or "v" for Sv, only for PRFs.
taper_perc : FLOAT
Percentage to be tapered in beginning and at the end of waveforms.
taper_type : STRING
Taper type (see obspy documentation stream.taper).
event_cat : event catalogue
catalogue containing all events of waveforms.
model : obspy.taup.TauPyModel
1D velocity model to calculate arrival.
tz : int
time window before first arrival in seconds
ta : int
time window after first arrival in seconds
logdir : string, optional
Set the directory to where the download log is saved
loglvl : int, optional
Set logger to this level.
Returns
-------
None.
"""
###########
# logging
logger = logging.getLogger("pyglimer.request.preprocess")
logger.info('\n\n\n...Preprocessing initialiased...\n')
# Logging for RF creation
rflogger = logging.getLogger("pyglimer.request.preprocess.RF")
#########
if saveasdf:
preprocessh5(
phase, rot, pol, taper_perc, model, taper_type, tz, ta,
rawloc, rfloc, deconmeth, hc_filt, netrestr,
statrestr, logger, rflogger, client, event_cat, remove_response)
return
# else:
# Here, we work with all available cores to speed things up
# Split up event catalogue to mitigate the danger of data loss
# Now the infodicts will be written in an even way
# i.e. every 100 events
# Number of cores is usally a power of 2 (therefore 128)
n_split = int(np.ceil(event_cat.count()/128))
# All error handlers rely on download via IRIS webservice.
# However, there is a maximum number for connections (3).
# So I don't really want to flood everything with exceptions.
if cpu_count() > 12:
eh = False
else:
eh = True
# Returns generator object with evtcats with each 100 events
evtcats = chunks(event_cat, n_split)
for evtcat in evtcats:
if client.lower() == 'joblib':
logger.debug('USING JOBLIB AS BACKEND')
out = Parallel(n_jobs=-1, backend='multiprocessing')(
delayed(__event_loop)(
phase, rot, pol, event, taper_perc,
taper_type, model, logger, rflogger, eh, tz,
ta, statloc, rawloc, preproloc, rfloc, deconmeth,
hc_filt, netrestr, statrestr, remove_response,
single_core=False)
for event in tqdm(evtcat))
elif client.lower() == 'mpi':
logger.debug('USING MPI AS BACKEND')
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
psize = comm.Get_size()
pmap = (np.arange(len(evtcat))*psize)/len(evtcat)
pmap = pmap.astype(np.int32)
ind = pmap == rank
ind = np.arange(len(evtcat))[ind]
# get new MPI compatible loggers
logger = create_mpi_logger(logger, rank)
rflogger = logging.getLogger("%s.RF" % logger.name)
# rflogger.debug(f'Rank/Size: {rank}/{psize}')
for ii in tqdm(ind):
__event_loop(
phase, rot, pol, evtcat[ii], taper_perc, taper_type,
model, logger, rflogger, eh, tz, ta, statloc, rawloc,
preproloc, rfloc, deconmeth, hc_filt, netrestr, statrestr,
remove_response)
# Use single core only
elif client.lower() == 'single':
logger.debug('USING SINGLE CORE BACKEND')
out = []
for event in tqdm(evtcat):
out.append(
__event_loop(
phase, rot, pol, event, taper_perc,
taper_type, model, logger, rflogger, eh, tz,
ta, statloc, rawloc, preproloc, rfloc, deconmeth,
hc_filt, netrestr, statrestr, remove_response,
single_core=True))
else:
raise NotImplementedError('Unknown client %s' % client)
# For simultaneous download, dicts are written
# while the processing is happening
# (Not robust for multicore)
# The multicore process returns a list of lists of dictionaries
dictlist = list(itertools.chain.from_iterable(out))
# 1. Write all dictionaries in a "masterdictionary", where the keys
# are Network and station code. Then, extent these subdictionaries
# with the new information from new dictionaries for the same
# station.
masterdict = {}
for d in dictlist:
if not d: # Some of the dictionaries might be empty
continue
try:
net = d['network']
stat = d['station']
key = net + '.' + stat
except (ValueError, KeyError) as e:
logger.exception([e, d])
continue
if key in masterdict:
for k in d:
if type(d[k]) == list:
masterdict[key].setdefault(k, []).extend(d[k])
else:
masterdict[key] = d
# Write dictionaries in the folder
for d in masterdict:
try:
net, stat = d.split('.')
write_info(net, stat, masterdict[d], preproloc)
except (ValueError, KeyError) as e:
logger.exception([e, d])
continue
logger.info("Preprocessing finished.")
def __event_loop(
phase: str, rot: str, pol: str, event: obspy.core.event.Event,
taper_perc: float, taper_type: str, model: obspy.taup.TauPyModel,
logger: logging.Logger, rflogger: logging.Logger, eh: bool, tz: float,
ta: float, statloc: str, rawloc: str, preproloc: str, rfloc: str,
deconmeth: str, hc_filt: float, netrestr: str, statrestr: str,
remove_response: bool, single_core: bool):
"""
Loops over each event in the event catalogue
"""
logger.info('Processing waveforms for event %s.' % event.resource_id)
# create list for what will later be the info files
infolist = []
# fetch event-data
origin = (event.preferred_origin() or event.origins[0])
origin_time = origin.time
ot_fiss = UTCDateTime(origin_time).format_fissures()
evtlat = origin.latitude
evtlon = origin.longitude
depth = origin.depth or 25000 # For very old events this info is empty
# Rounded for filenames
ot_loc = UTCDateTime(origin_time, precision=-1).format_fissures()[:-6]
evtlat_loc = str(roundhalf(evtlat))
evtlon_loc = str(roundhalf(evtlon))
by_event = os.path.join(
preproloc, 'by_event', '%s_%s_%s' % (ot_loc, evtlat_loc, evtlon_loc))
# make folder that will contain softlinks
os.makedirs(by_event, exist_ok=True)
# Folder, in which the preprocessing is actually happening
prepro_folder = os.path.join(
rawloc, '%s_%s_%s' % (ot_loc, evtlat_loc, evtlon_loc))
try: # If one event has no folder it interrupts else
# Remove empty folders in the raw directory
if not os.listdir(prepro_folder):
os.rmdir(prepro_folder)
return infolist # It's important to return empty lists!
except FileNotFoundError:
# If we are not downloading that's entirely normal as
# an earlier iteration just deletes empty directories
pass
return infolist
# Preprocessing just for some stations?
# Then skip files that should not be preprocessed
if netrestr:
pattern = '%s.%s*' % (netrestr, statrestr or '')
files = fnmatch.filter(os.listdir(prepro_folder), pattern)
else:
files = os.listdir(prepro_folder)
for filestr in files:
try:
info = __waveform_loop(
phase, rot, pol, filestr, taper_perc, taper_type,
model, origin_time, ot_fiss, evtlat, evtlon, depth,
prepro_folder, event, logger, rflogger, by_event, eh, tz, ta,
statloc, preproloc, rfloc, deconmeth, hc_filt, remove_response,
single_core)
infolist.append(info)
except Exception as e:
# Unhandled exceptions should not cause the loop to quit
# processing one event
logger.exception([filestr, e])
continue
return infolist
def __waveform_loop(
phase: str, rot: str, pol: str, filestr: str, taper_perc: float,
taper_type: str, model: obspy.taup.TauPyModel, origin_time: UTCDateTime,
ot_fiss: str, evtlat: float, evtlon: float, depth: float,
prepro_folder: str, event: obspy.core.event.event.Event,
logger: logging.Logger, rflogger: logging.Logger, by_event, eh: bool,
tz: float, ta: float, statloc: str, preproloc: str, rfloc: str,
deconmeth: str, hc_filt: float, remove_response: bool,
single_core=bool):
"""
Loops over each waveform for a specific event and a specific station
"""
infodict = {} # empty dictionary that will be dumped in a shelve file
# at the end of the program
start = time.time()
# Open files that should be processed
try:
st = read(os.path.join(prepro_folder, filestr))
except FileNotFoundError: # file has not been downloaded yet
return # I will still want to have the RFs
except Exception as e: # Unknown erros
logger.exception([prepro_folder, filestr, e])
return
station = st[0].stats.station
network = st[0].stats.network
# Location definitions
outdir = os.path.join(preproloc, 'by_station', network, station)
# Info file
infof = os.path.join(outdir, 'info')
ot_loc = UTCDateTime(origin_time, precision=-1).format_fissures()[:-6]
outf = os.path.join(outdir, '%s.%s.%s.mseed' % (network, station, ot_loc))
statfile = os.path.join(statloc, '%s.%s.xml' % (network, station))
# Create directory for preprocessed file
os.makedirs(outdir, exist_ok=True)
# If the file hasn't been downloaded and preprocessed
# in an earlier iteration of the program
if not __file_in_db(outdir, 'info.dat') or ot_fiss not in \
shelve.open(infof, flag='r')['ot_all']:
crit = False # criterion to retain
try: # From here on, all exceptions are logged
try:
station_inv = read_inventory(statfile, format="STATIONXML")
except FileNotFoundError:
if eh:
station_inv = redownload_statxml(
st, network, station, statfile)
else:
raise FileNotFoundError(
"Station XML not available for station %s.%s" % (
network, station))
# compute theoretical arrival
distance, baz, _ = gps2dist_azimuth(
station_inv[0][0].latitude, station_inv[0][0].longitude,
evtlat, evtlon)
distance = kilometer2degrees(distance/1000)
# compute time of first arrival & ray parameter
arrival = model.get_travel_times(
source_depth_in_km=depth/1000, distance_in_degree=distance,
phase_list=[phase])[0]
rayp_s_deg = arrival.ray_param_sec_degree
rayp = rayp_s_deg / 111319.9 # apparent slowness
first_arrival = origin_time + arrival.time
end = time.time()
logger.debug("Before cut and resample")
logger.debug(dt_string(end-start))
# Check if step is already done
if st[0].stats.sampling_rate != 10:
st = __cut_resample(
st, logger, first_arrival, network, station, prepro_folder,
filestr, taper_perc, taper_type, eh, tz, ta)
# Finalise preprocessing
st, crit, infodict = __rotate_qc(
phase, st, station_inv, network, station, baz,
distance, outf, ot_fiss, event, evtlat, evtlon, depth,
rayp_s_deg, first_arrival, infof, logger, infodict, by_event,
eh, tz, statloc, remove_response)
# Exceptions & logging
except SNRError as e: # QR rejections
logger.debug(
[filestr, "QC was not met, SNR ratios are", e])
infodict['dt'] = st[0].stats.delta
infodict['sampling_rate'] = st[0].stats.sampling_rate
infodict['network'] = network
infodict['station'] = station
infodict['statlat'] = station_inv[0][0][0].latitude
infodict['statlon'] = station_inv[0][0][0].longitude
infodict['statel'] = station_inv[0][0][0].elevation
if __file_in_db(outdir, 'info.dat'):
with shelve.open(infof, flag='r') as info:
if 'ot_all' not in info or ot_fiss not in info['ot_all']:
# Don't count rejected events twice
infodict.setdefault('ot_all', []).append(ot_fiss)
else:
infodict.setdefault('ot_all', []).append(ot_fiss)
if single_core:
write_info(network, station, infodict)
return
return infodict
except StreamLengthError as e:
logger.debug([filestr, e])
# Everything else that might have gone wrong
except Exception as e:
logger.exception([prepro_folder, filestr, e])
finally:
end = time.time()
logger.info(
"File preprocessed.\n"
+ "Station %s.%s\n" % (network, station)
+ "Event %s" % event.resource_id)
logger.debug(dt_string(end-start))
else: # The file was already processed
# Did it pass QC?
with shelve.open(infof, flag='r') as info:
if "ot_ret" not in info or ot_fiss not in info["ot_ret"]:
return
else:
crit = True
st = read(outf)
station_inv = read_inventory(statfile)
j = info["ot_ret"].index(ot_fiss)
rayp = info["rayp_s_deg"][j] / 111319.9
distance = info["rdelta"][j]
# CREATE RF + ROTATION TO PSS / LQT #
# Check if RF was already computed and if it should be
# computed at all, and if the waveform was retained (SNR)
if deconmeth and not __file_in_db(
os.path.join(rfloc, network, station), '%s.%s.%s.sac' % (
network, station, ot_loc)) and crit:
# 21.04.2020 Second highcut filter
if hc_filt:
st.filter('lowpass', freq=hc_filt, zerophase=True, corners=2)
start = time.time()
####
try:
# Rotate to LQT or PSS
if rot == "LQT":
st, ia = rotate_LQT_min(st, phase)
# additional QC
if ia < 5 or ia > 75:
crit = False
raise SNRError(
"The estimated incidence angle is "
+ "unrealistic with %s degree." % str(ia))
elif rot == "PSS":
_, _, st = rotate_PSV(
station_inv[0][0][0].latitude,
station_inv[0][0][0].longitude,
rayp, st, phase)
# Create RF object
if phase[-1] == "S":
trim = [40, 0]
if distance >= 70:
trim[1] = ta - (-2*distance + 180)
else:
trim[1] = ta - 40
elif phase[-1] == "P":
trim = False
if not infodict:
info = shelve.open(infof, flag='r')
RF = createRF(
st, phase, pol=pol, info=info, trim=trim, method=deconmeth)
else:
RF = createRF(
st, phase, pol=pol, info=infodict, trim=trim,
method=deconmeth)
# Write RF
rfdir = os.path.join(rfloc, network, station)
if pol == 'h' and phase == 'P':
rfdir = rfdir + pol
os.makedirs(rfdir, exist_ok=True)
RF.write(os.path.join(
rfdir, '%s.%s.%s.sac' % (network, station, ot_loc)),
format='SAC')
end = time.time()
rflogger.info(
"RF created.\n"
+ 'Station %s.%s\n' % (network, station)
+ 'Event: %s' % event.resource_id)
rflogger.debug(dt_string(end-start))
# Exception that occured in the RF creation
# Usually don't happen
except SNRError as e:
rflogger.info(e)
except Exception as e:
print("RF creation failed")
rflogger.exception([network, station, ot_loc, e])
finally:
if single_core:
# Single-core case
write_info(network, station, infodict, preproloc)
# just to save RAM - not needed for single-core
infodict = None
else: # The multicore case
return infodict
def __cut_resample(
st: obspy.Stream, logger: logging.Logger, first_arrival: UTCDateTime,
network: str, station: str, prepro_folder: str, filestr: str,
taper_perc: float, taper_type: str, eh: bool, tz: float,
ta: float) -> obspy.Stream:
"""Cut and resample raw file. Will overwrite original raw"""
start = time.time()
# Trim TO RIGHT LENGTH BEFORE AND AFTER FIRST ARRIVAL #
# start and endtime of stream
starttime = first_arrival - tz
endtime = first_arrival + ta
if st.count() < 3:
if not eh:
raise StreamLengthError(
["The stream contains less than three traces.", filestr])
st = redownload(network, station, starttime, endtime, st)
# Check one last time. If stream to short raise Exception
if st.count() < 3:
raise StreamLengthError(
["The stream contains less than 3 traces", filestr])
# Change dtype
for tr in st:
np.require(tr.data, dtype=np.float64)
tr.stats.mseed.encoding = 'FLOAT64'
# trim to according length
# Anti-Alias filtering is now done within the function below
st = resample_or_decimate(st, 10)
st.trim(starttime=starttime, endtime=endtime)
# After trimming length has to be checked again (recording may
# be empty now)
if st.count() < 3:
raise Exception("The stream contains less than 3 traces")
# DEMEAN AND DETREND #
st.detrend(type='demean')
# TAPER #
st.taper(max_percentage=taper_perc, type=taper_type,
max_length=None, side='both')
# Write trimmed and resampled files into raw-file folder
# to save space
try:
st.write(os.path.join(prepro_folder, filestr), format="MSEED")
except ValueError:
# Occurs for dtype=int32
for tr in st:
del tr.stats.mseed
st.write(
os.path.join(prepro_folder, filestr), format="MSEED")
end = time.time()
logger.debug("Unprocessed file rewritten")
logger.debug(dt_string(end - start))
return st
def __rotate_qc(
phase: str, st: obspy.Stream, station_inv: obspy.Inventory, network: str,
station: str, baz: float, distance: float, outf: str, ot_fiss: str,
event: obspy.core.event.event.Event, evtlat: float, evtlon: float,
depth: float, rayp_s_deg: float, first_arrival: UTCDateTime, infof: str,
logger: logging.Logger, infodict: dict, by_event, eh: bool, tz: float,
statloc: str, remove_response: bool) -> Tuple[Stream, bool, dict]:
"""REMOVE INSTRUMENT RESPONSE + convert to vel + SIMULATE
Bugs occur here due to station inventories without response information
Looks like the bulk downloader sometimes donwnloads
station inventories without response files. I could fix that here by
redownloading the response file (alike to the 3 traces problem)"""
if remove_response:
try:
st.attach_response(station_inv)
st.remove_response()
except ValueError:
# Occurs for "No matching response file found"
if eh:
station_inv, st = NoMatchingResponseHandler(
st, network, station, statloc)
if not eh or not station_inv:
raise ValueError(
["No matching response file found for", network, station])
st.rotate(method='->ZNE', inventory=station_inv)
st.rotate(method='NE->RT', inventory=station_inv,
back_azimuth=baz)
st.normalize()
# Sometimes streams contain more than 3 traces:
if st.count() > 3:
stream = {}
for tr in st:
stream[tr.stats.component] = tr
if "Z" in stream:
st = Stream([stream["Z"], stream["R"], stream["T"]])
elif "3" in stream:
st = Stream([stream["3"], stream["R"], stream["T"]])
del stream
# SNR CRITERIA
dt = st[0].stats.delta # sampling interval
sampling_f = st[0].stats.sampling_rate
if phase[-1] == "P":
st, crit, f, noisemat = qcp(st, dt, sampling_f, tz)
if not crit:
infodict['dt'] = dt
infodict['sampling_rate'] = sampling_f
infodict['network'] = network
infodict['station'] = station
infodict['statlat'] = station_inv[0][0][0].latitude
infodict['statlon'] = station_inv[0][0][0].longitude
infodict['statel'] = station_inv[0][0][0].elevation
raise SNRError('QC rejected %s' % np.array2string(noisemat))
elif phase[-1] == "S":
st, crit, f, noisemat = qcs(st, dt, sampling_f, tz)
if not crit:
infodict['dt'] = dt
infodict['sampling_rate'] = sampling_f
infodict['network'] = network
infodict['station'] = station
infodict['statlat'] = station_inv[0][0][0].latitude
infodict['statlon'] = station_inv[0][0][0].longitude
infodict['statel'] = station_inv[0][0][0].elevation
raise SNRError('QC rejected %s' % np.array2string(noisemat))
# WRITE FILES #
try:
st.write(outf, format="MSEED")
except ValueError:
# Occurs for weird mseed encodings
for tr in st:
del tr.stats.mseed
st.write(outf, format="MSEED", encoding="ASCII")
# create softlink
try:
os.symlink(outf, by_event)
except FileExistsError:
pass
# WRITE AN INFO FILE
# append_info: [key,value]
append_inf = [
['magnitude', (
event.preferred_magnitude() or event.magnitudes[0])['mag']],
['magnitude_type', (
event.preferred_magnitude() or event.magnitudes[0])[
'magnitude_type']],
['evtlat', evtlat], ['evtlon', evtlon],
['ot_ret', ot_fiss], ['ot_all', ot_fiss],
['evt_depth', depth],
['evt_id', event.get('resource_id')],
['noisemat', noisemat],
['co_f', f], ['npts', st[1].stats.npts],
['rbaz', baz],
['rdelta', distance],
['rayp_s_deg', rayp_s_deg],
['onset', first_arrival],
['starttime', st[0].stats.starttime]]
# Check if values are already in dict
for key, value in append_inf:
infodict.setdefault(key, []).append(value)
infodict['dt'] = dt
infodict['sampling_rate'] = sampling_f
infodict['network'] = network
infodict['station'] = station
infodict['statlat'] = station_inv[0][0][0].latitude
infodict['statlon'] = station_inv[0][0][0].longitude
infodict['statel'] = station_inv[0][0][0].elevation
logger.info("Stream accepted. Preprocessing successful")
return st, crit, infodict
def __file_in_db(loc: str, filename: str) -> bool:
"""Checks if file "filename" is already in location "loc"."""
if os.path.isfile(os.path.join(loc, filename)):
return True
else:
return False
[docs]def write_info(network: str, station: str, dictionary: dict, preproloc: str):
"""
Writes information dictionary in shelve format in each of the station
folders.
Parameters
----------
network : str
Network Code.
station : str
Station code.
dictionary : dict
Dictionary containing the information.
Returns
-------
None.
"""
loc = os.path.join(preproloc, 'by_station', network, station)
infof = os.path.join(loc, 'info')
if not __file_in_db(loc, 'info.dat'):
with shelve.open(os.path.join(loc, 'info'), writeback=True) as info:
info.update(dictionary)
info['num'] = len(info['ot_all'])
if 'ot_ret' in info:
info['numret'] = len(info['ot_ret'])
info.sync()
else:
if 'ot_ret' in dictionary:
append_inf = ['magnitude', 'magnitude_type', 'evtlat',
'evtlon', 'ot_ret', 'ot_all', 'evt_depth',
'evt_id', 'noisemat', 'co_f', 'npts', 'rbaz',
'rdelta', 'rayp_s_deg', 'onset', 'starttime']
with shelve.open(infof, writeback=True) as info:
for key in append_inf:
info.setdefault(key, []).extend(dictionary[key])
info['num'] = len(info['ot_all'])
info['numret'] = len(info['ot_ret'])
info.sync()
else:
with shelve.open(infof, writeback=True) as info:
info.setdefault('ot_all', []).extend(dictionary['ot_all'])
info['num'] = len(info['ot_all'])
info.sync()
# program-specific Exceptions
[docs]class SNRError(Exception):
"""raised when the SNR is too high"""
# Constructor method
def __init__(self, value):
self.value = value
# __str__ display function
def __str__(self):
return repr(self.value)
[docs]class StreamLengthError(Exception):
"""raised when stream has fewer than 3 components"""
# Constructor method
def __init__(self, value):
self.value = value
# __str__ display function
def __str__(self):
return repr(self.value)