Source code for pyglimer.waveform.qc

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Contains quality control for waveforms used for receiver function creation.

:copyright:
   The PyGLImER development team (makus@gfz-potsdam.de).
:license:
   GNU Lesser General Public License, Version 3
   (https://www.gnu.org/copyleft/lesser.html)
:author:
    Peter Makus (makus@gfz-potsdam.de)

Created: Friday, 10th April 2020 11:38:40 am
Last Modified: Wednesday, 7th September 2022 06:08:38 pm
'''

import numpy as np
from obspy.signal import filter
from obspy import Stream

# QC parameters

# filter frequencies for SNR check, all in Hz
lowco = [.03, .1, .5]  # only for PRF

lowcoS = .01
highco = np.linspace(.33, .175, 4)  # For SRFs, 16.06.2020 from .175 to .25 Hz

# SNR criteria for QC
SNR_criteriaP = [7.5, 1, 10]  # [snrr, snrr2/snrr, snrz]

SNR_criteriaS = [7, .7, 1]
# [primary/noise, sidelobe/primary, r/z conversions]


[docs]def qcp(st: Stream, dt: float, sampling_f: float, onset: float) -> tuple: """ Quality control for the downloaded waveforms that are used to create PRFS. Works with various filters and SNR criteria Parameters ---------- st : '~obspy.Stream' Input stream. dt : FLOAT Sampling interval [s]. sampling_f : FLOAT Sampling frequency (Hz). onset : float Onset in seconds after trace's start. Returns ------- st : '~obspy.Stream' Output stream. If stream was accepted, then this will contain the filtered stream, filtered with the broadest accepted filter. crit : BOOL True if stream was accepted, false if it wasn't. f : FLOAT Last used low-cut frequency. If crit=True, this will be the frequency for which the stream was accepted. noisemat : np.array SNR values in form of a matrix. Rows represent the different filters and columns the different criteria. """ # Create stream dict to identify channels stream = {} for tr in st: stream[tr.stats.component] = tr.data ptn1 = round(5/dt) ptn2 = round((onset-5)/dt) nptn = ptn2-ptn1+1 # First part of the signal pts1 = round(onset/dt) pts2 = round((onset+7.5)/dt) npts = pts2-pts1+1 # Second part of the signal ptp1 = round((onset+15)/dt) ptp2 = round((onset+22.5)/dt) nptp = ptp2-ptp1+1 # Then, I'll have to filter in the for-loop # and calculate the SNR # matrix to save SNR noisemat = np.zeros((len(lowco), 3), dtype=float) for ii, f in enumerate(lowco): ftcomp = filter.bandpass(stream["T"], f, 4.99, sampling_f, corners=2, zerophase=True) frcomp = filter.bandpass(stream["R"], f, 4.99, sampling_f, corners=2, zerophase=True) if "Z" in stream: fzcomp = filter.bandpass(stream["Z"], f, 4.99, sampling_f, corners=2, zerophase=True) elif "3" in stream: fzcomp = filter.bandpass(stream["3"], f, 4.99, sampling_f, corners=2, zerophase=True) # Compute the SNR for given frequency bands snrr = (sum(np.square(frcomp[pts1:pts2]))/npts)\ / (sum(np.square(frcomp[ptn1:ptn2]))/nptn) snrr2 = (sum(np.square(frcomp[ptp1:ptp2]))/nptp)\ / (sum(np.square(frcomp[ptn1:ptn2]))/nptn) snrz = (sum(np.square(fzcomp[pts1:pts2]))/npts)\ / (sum(np.square(fzcomp[ptn1:ptn2]))/nptn) # Reject or accept traces depending on their SNR # #1: snr1 > 10 (30-37.5s, near P) # snr2/snr1 < 1 (where snr2 is 45-52.5 s, in the coda of P) # note: - next possibility might be to remove those events that # generate high snr between 200-250 s noisemat[ii, 0] = snrr noisemat[ii, 1] = snrr2/snrr noisemat[ii, 2] = snrz if snrr > SNR_criteriaP[0] and\ snrr2/snrr < SNR_criteriaP[1] and\ snrz > SNR_criteriaP[2]: # accept crit = True # overwrite the old traces with the sucessfully filtered ones for tr in st: if tr.stats.component == "R": tr.data = frcomp elif tr.stats.component in ('Z', '3'): tr.data = fzcomp elif tr.stats.component == "T": tr.data = ftcomp break # waveform is accepted else: crit = False return st, crit, f, noisemat
[docs]def qcs(st: Stream, dt: float, sampling_f: float, onset: float) -> tuple: """ Quality control for waveforms that are used to produce SRF. In contrast to the ones used for PRF this is a very rigid criterion and will reject >95% of the waveforms. Parameters ---------- st : '~obspy.Stream' Input stream. dt : FLOAT Sampling interval [s]. sampling_f : FLOAT Sampling frequency (Hz). onset : float Onset in seconds after trace's start. Returns ------- st : '~obspy.Stream' Output stream. If stream was accepted, then this will contain the filtered stream, filtered with the broadest accepted filter. crit : BOOL True if stream was accepted, false if it wasn't. f : FLOAT Last used high-cut frequency. If crit=True, this will be the frequency for which the stream was accepted. noisemat : np.array SNR values in form of a matrix. Rows represent the different filters and columns the different criteria. """ # Create stream dict to identify channels stream = {} for tr in st: stream[tr.stats.component] = tr.data ptn1 = round(5/dt) # Hopefully relatively silent time ptn2 = round((onset-5)/dt) nptn = ptn2-ptn1 # First part of the signal pts1 = round(onset/dt) # theoretical arrival time pts2 = round((onset+10)/dt) npts = pts2-pts1 # Second part of the signal ptp1 = round((onset+10)/dt) ptp2 = round((onset+35)/dt) nptp = ptp2-ptp1 # part where the Sp converted arrival should be strong ptc1 = round((onset-50)/dt) ptc2 = round((onset-10)/dt) nptc = ptc2-ptc1 # filter # matrix to save SNR noisemat = np.zeros((len(highco), 3), dtype=float) # Filter # At some point, I might also want to consider to change the lowcof for ii, hf in enumerate(highco): ftcomp = filter.bandpass(stream["T"], lowcoS, hf, sampling_f, corners=2, zerophase=True) frcomp = filter.bandpass(stream["R"], lowcoS, hf, sampling_f, corners=2, zerophase=True) if "Z" in stream: fzcomp = filter.bandpass(stream["Z"], lowcoS, hf, sampling_f, corners=2, zerophase=True) elif "3" in stream: fzcomp = filter.bandpass(stream["3"], lowcoS, hf, sampling_f, corners=2, zerophase=True) # Compute the SNR for given frequency bands # strength of primary arrival snrr = (sum(np.square(frcomp[pts1:pts2]))/npts)\ / (sum(np.square(frcomp[ptn1:ptn2]))/nptn) # how spiky is the arrival? snrr2 = (sum(np.square(frcomp[ptp1:ptp2]))/nptp)\ / (sum(np.square(frcomp[ptn1:ptn2]))/nptn) # horizontal vs vertical # snrz = (sum(np.square(frcomp[pts1:pts2]))/npts)\ # / (sum(np.square(fzcomp[pts1:pts2]))/npts) snrc = (sum(np.square(frcomp[ptc1:ptc2]))/nptc)\ / sum(np.square(fzcomp[ptc1:ptc2])/nptc) noisemat[ii, 0] = snrr noisemat[ii, 1] = snrr2/snrr noisemat[ii, 2] = snrc # accept if if snrr > SNR_criteriaS[0] and\ snrr2/snrr < SNR_criteriaS[1] and\ snrc < SNR_criteriaS[2]: # This bit didn't give good results # max(frcomp) == max(frcomp[round((onset-2)/dt): # round((onset+10)/dt)]): crit = True # overwrite the old traces with the sucessfully filtered ones for tr in st: if tr.stats.component == "R": tr.data = frcomp elif tr.stats.component in ("Z", '3'): tr.data = fzcomp elif tr.stats.component == "T": tr.data = ftcomp break # waveform is accepted else: crit = False return st, crit, hf, noisemat