Source code for pyglimer.rf.deconvolve

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''

Various Deconvolution approaches used for the RF technique.

:copyright:
   The PyGLImER development team (makus@gfz-potsdam.de).
:license:
   GNU Lesser General Public License, Version 3
   (https://www.gnu.org/copyleft/lesser.html)
:author:
    Peter Makus (makus@gfz-potsdam.de)

Created: Wednesday, 16th October 2019 02:24:30 pm
Last Modified: Wednesday, 20th July 2022 02:55:20 pm

'''

import numpy as np
from scipy.signal.windows import dpss
from scipy.fftpack import next_fast_len

import pyglimer.utils.signalproc as sptb


[docs]def it(P, H, dt, shift=0, width=2.5, omega_min=0.5, it_max=200): """ Iterative deconvolution after Ligorria & Ammon (1999) Essentially, P will be deconvolved from H. :param P: The source wavelet estimation / denominator. :type P: 1D np.ndarray :param H: The enumerator (impulse response * source wavelet estimation). :type H: 1D np.ndarray :param dt: Sampling interval [s]. :type dt: float :param shift: Time shift of main arrival [s]. The default is 0. :type shift: float, optional :param width: Gaussian width parameter. The default is 2.5. :type width: float, optional :param omega_min: Convergence control parameter (percentage improvement per iteration). The default is 0.5. :type omega_min: float, optional :param it_max: Maximal number of iterations before the algorithm interrupts, defaults to 400 :type it_max: int, optional :return: rf : The receiver function. it : Number of iterations until algorithm converged. IR : Estimation of the medium's impulse response. :rtype: rf : np.ndarray it : int IR : np.ndarray """ omega_min = omega_min/100 # change from per cent to decimal # create proper numpy arrays (allow functions on arrays as in Matlab) P = np.array(P, dtype=float) H = np.array(H, dtype=float) N = len(H) N2 = next_fast_len(N) # Pad input with zeros to the next power of 2 P = np.append(P, np.zeros(N2-len(P))) H = np.append(H, np.zeros(N2-len(H))) # PREPARING PARAMETERS FOR LOOP # it = 0 # number of iteration r = H # the first residual value # Begin iterative process: IR = np.zeros(N2) omega = 1 # start value # WHILE LOOP FOR ITERATIVE DECON ##### while omega > omega_min and it < it_max: it = it+1 # find cross-correlation a = sptb.corrf(r, P, N2) # Find position of maximum correlation, with k*dt k = np.argmax(abs(a[0:N2])) # Find amplitude of point with highest correlation # That dt is here because I don't have it in the corrf function ak = a[k]/(np.sum(np.power(P, 2))*dt) IR[k] = IR[k] + ak # construct estimated impulse response # convolve impulse response and source wavelet: est = sptb.convf(IR, P, N2, dt) # compute residual for ongoing iteration it r_new = H - est omega = abs(( np.linalg.norm(r)**2 - np.linalg.norm(r_new)**2) / np.linalg.norm(r)**2) r = r_new # Create receiver function Gauss = sptb.gaussian(N2, dt, width) rf = sptb.filter(IR, Gauss, dt) # shift and truncate RF if shift: # only if shift !=0 # rf = sptb.sshift(rf, N2, dt, shift) rf = np.roll(rf, round(shift/dt)) rf = rf[0:N] return rf, it, IR
[docs]def spectraldivision(v, u, ndt, tshift, regul, phase, test=False): """ Function spectraldivision(v,u,ndt,tshift,regul) is a standard spectral division frequency domain deconvolution. :param v: Source wavelet estimation (denominator). :type v: 1D np.ndarray :param u: Impulse response convolved by v (enumerator). :type u: 1D np.ndarray :param ndt: Sampling interval [s]. :type ndt: float :param tshift: Time shift of primary arrival [s]. :type tshift: float :param regul: Regularization, can be chosen by the variable "regul", this can be 'con', 'wat', or 'fqd' for constant damping factor, waterlevel, or frequency-dependent damping, respectively :type regul: str :param phase: Phase either "P" for Ps or "S" for Sp. :type phase: str :raises Exception: for uknown regulisations :return: qrf : Receiver function. lrf : Output of deoncolution of source wavelet estimation from longitudinal component. :rtype: 1D np.ndarray """ phase = phase[-1].upper() N = len(v) # pre-event noise needed for regularisation parameter vn = np.zeros(N) if phase == "P": vn[:round((tshift-2)/ndt)] = v[:round((tshift-2)/ndt)] elif phase == "S": vn[:round((tshift/2)/ndt)] = v[:round((tshift/2)/ndt)] else: raise ValueError('Unknown teleseismic phase') # number of points in fft nf = next_fast_len(N) # fourier transform uf = np.fft.fft(u, n=nf) vf = np.fft.fft(v, n=nf) vnf = np.fft.fft(vn, n=nf) # denominator and regularisation den = np.multiply(vf, np.conj(vf)) noise = np.multiply(vnf, np.conj(vnf)) # which regularization do you want? freqdep = regul == 'fqd' const = regul == 'con' water = regul == 'wat' if not freqdep and not const and not water: raise ValueError( "Regularization not defined (your input: regul=%s. " % regul + 'Use either "fqd" for frequency-dependent or "con" for constant ' + "value regularization or 'wat' for water-level.") # constant damping factor regularization if test: eps = max(den.real)*0.25 # den = den + eps den[den.real < eps] = eps elif const: eps = max(noise.real) den = den + eps # frequency-dependent damping elif freqdep: den = den+noise # waterlevel regularization elif water: eps = max(noise.real) den[den.real < eps] = eps # numerator num = np.multiply(uf, np.conj(vf)) numl = np.multiply(vf, np.conj(vf)) # deconvolution, with scaling by 1/ndt to bring back # rfl peak close to unity, for any value of ndt rfq = num/den*(1/ndt) rfl = numl/den*(1/ndt) # SR: this is the cos^2 filter discussed in Park and Levin, 2000, # on page 1509 (top of right column), with fc set to 1 Hz - probably # comes from Helffrich's code. if freqdep: N2 = np.floor(np.size(rfq)/2) for i in range(int(N2)): fac = np.cos(np.pi/2*(i-1)/N2)**2 rfq[i] = rfq[i]*fac # back transformation v = np.fft.fftfreq(nf, ndt)*2*np.pi # Re-Introduce Time shift dt = np.exp(-1j*v*tshift) # /ndt rfq = np.multiply(rfq, dt) rfl = np.multiply(rfl, dt) qrft = np.fft.ifft(rfq, n=nf) qrf = qrft.real qrf = qrf[:N] lrft = np.fft.ifft(rfl, n=nf) lrf = lrft[:N].real lrf = lrf[:N] return qrf, lrf
[docs]def multitaper( P: np.ndarray, D: np.ndarray, dt: float, tshift: int or float, regul: str): """ Output has to be filtered (Noise appears in high-frequency)! multitaper: takes Ved-Kathrins code and changes inputs to make it run like Helffrich algorithm, minus normalization and with questionable calculation of pre-event noise (have to double check this and compare it to the regular FFT based estimate without any tapers). Findings: when P arrival is not in the center of the window, the amplitudes are not unity at the beginning and decreasing from there on. Instead they peak at the time shift which corresponds to the middle index in the P time window. TB = time bandwidth product (usually between 2 and 4) NT = number of tapers to use, has to be <= 2*TB-1 TB = 4; NT = 7; #choice of TB = 4, NT = 3 is supposed to be optimal t0 = -5; t1 = max(time); # This defines the beginning and end of the lag times for the receiver function changed by KS June 2016, added coherence and variance estimate; output RF in time domain (rf), RF in freq. domain (tmpp), and variance of RF (var_rf); can be used as input for multitaper_weighting.m the input "regul" defines which type of regularization is used, regul='fqd' defines frequency dependent regularisation from pre-event noise, regul='con' defines adding a constant value (here maximum of pre-event noise) as regularisation Parameters ---------- P : np.ndarray Source time function estimation. D : np.ndarray Component containing the converted wave. dt : float Sampling interval [s]. tshift : float Time shift of primary arrival [s]. regul : str Regularization, either 'fqd' for frequency dependentor 'con' for constant. Raises ------ Exception For unknown input. Returns ------- rf : np.ndarray The receiver function. lrf : np.ndarray The source-time wavelet convolved by itself. var_rf : np.ndarray The receiver function's variance. tmpp : np.ndarray Receiver function without variance weighting. """ # wavelet always in the center of the window # Original from Ved's # win_len = tshift*2; # Modification to get P&L # win_len = length(P)*dt; # Modificaiton to get Helffrich win_len = 50 Nwin = round(win_len/dt) # Fraction of overlap overlap between moving time windows. As your TB # increases, the frequency smearing gets worse, which means that the RFs # degrate at shorter and shorter lag times. Therefore, as you increase TB, # you should also increase Poverlap. Poverlap = 0.75 # Length of waveforms; nh = len(P) # tapernumber, bandwith # is in general put to NT=3, and bandwidth to 2.5 # TB=2.5; # #NT=2*TB-1; #4 tapers # NT=3; TB = 4 # NT=2*TB-1; #4 tapers NT = 3 # Create moving time windowed slepians starts = np.arange(0, nh-Nwin+1, round((1-Poverlap)*Nwin)) # Construct Slepians Etmp, lambdas = dpss(Nwin, TB, Kmax=NT, return_ratios=True) Etmp = Etmp.T # 05.08.2020 tranpose slepian windows E = np.zeros((len(starts)*NT, nh)) # Start Index n = 0 NUM = np.zeros((NT, len(P))) DEN = np.zeros((NT, len(D))) DUM = np.zeros((NT, len(D))) ESTP = np.zeros((NT, len(P))) ESTD = np.zeros((NT, len(D))) # finding frequency dependent regularisation parameter DEN_noise # added: KS 26.06.2016 Pn = np.zeros(np.shape(P)) # Pn(3/dt:(tshift-5)/dt)=P(3/dt:(tshift-5)/dt) Pn[:round((tshift-2)/dt)] = P[:round((tshift-2)/dt)] # pre-event noise: starting 3s after trace start # stop 10s before theoretical start of P # wave to aviod including it DEN_noise = np.zeros((NT, len(P))) # Multitaper # SR: problem here is how the loop is done ... there's only a peak # at the pulse because the two windows of the num and den are moving # together. One should first compute the entire estimate of the wavelet # and the data for each valie of k, and then do the sum of products for # each k!!! for k in range(NT): for j in starts: E[n, j:j+Nwin] = Etmp[:, k].transpose() tmp1 = np.fft.fft(np.multiply(E[n, :], P)) tmp2 = np.fft.fft(np.multiply(E[n, :], D)) NUM[k, :] = NUM[k, :] + np.multiply(lambdas[k]*tmp1.conj(), tmp2) DEN[k, :] = DEN[k, :] + np.multiply(lambdas[k]*tmp1.conj(), tmp1) ESTP[k, :] = ESTP[k, :] + lambdas[k]*tmp1 ESTD[k, :] = ESTD[k, :] + lambdas[k]*tmp2 # DUM only from D trace (converted wave component) used in # coherence estimate DUM[k, :] = DUM[k, :] + np.multiply(lambdas[k]*tmp2.conj(), tmp2) # pre-event noise # always stick to first time window tmp1n = np.fft.fft(np.multiply(E[n, :], Pn)) DEN_noise[k, :] = DEN_noise[k, :] \ + np.multiply(lambdas[k]*tmp1n.conj(), tmp1n) n = n + 1 # max_imag_ep = max(abs(np.imag(np.fft.ifft(sum(ESTP))))) # max_imag_ed = max(abs(np.imag(np.fft.ifft(sum(ESTD))))) # ep = np.real(np.fft.ifft(sum(ESTP))); # ed = np.real(np.fft.ifft(sum(ESTD))); # Calculate optimal RF with frequency-dependend regularisation freqdep = regul == 'fqd' const = regul == 'con' if freqdep: tmpp = np.divide(np.sum(np.multiply(ESTP.conj(), ESTD), axis=0), np.sum(np.multiply(ESTP.conj(), ESTP), axis=0) + sum(DEN_noise))*1/dt tmpp_l = np.divide(np.sum(np.multiply(ESTP.conj(), ESTP), axis=0), np.sum(np.multiply(ESTP.conj(), ESTP), axis=0) + sum(DEN_noise))*1/dt N2 = np.floor(nh/2) + 1 for i in range(int(N2)): fac = np.cos(np.pi/2*i/N2)**2 tmpp[i] = tmpp[i]*fac elif const: # ordinary regularisation with adding only a constant value eps = DEN_noise.real.max() + 1 tmpp = np.divide(np.sum(np.multiply(ESTP.conj(), ESTD), axis=0), np.sum(np.multiply(ESTP.conj(), ESTP)+eps, axis=0))*1/dt tmpp_l = np.divide(np.sum(np.multiply(ESTP.conj(), ESTD), axis=0), np.sum(np.multiply(ESTP.conj(), ESTP)+eps, axis=0))*1/dt else: raise Exception('Regularization not defined (your input: regul=', regul, """). Use either "fqd" for frequency-dependent or "con" for constant value regularization.""") # RF without variance weighting tmp1 = np.fft.ifft(tmpp).real tmp1_l = np.fft.ifft(tmpp_l).real # Interpolate to desired N = len(P) rf = tmp1[N-round(tshift/dt):N] rf = np.append(rf, tmp1[:N-round(tshift/dt)]) # Python has to append, whereas in Matlab this here worked: # rf[round(tshift/dt):] = tmp1[:N-round(tshift/dt)] lrf = tmp1_l[round(N-tshift/dt):N] lrf = np.append(lrf, tmp1_l[:round(N-tshift/dt)]) # lrf[tshift/dt:N] = tmp1_l[:N-tshift/dt] #### # Coherence and Variance of RF # added: KS 26.06.2016 C_rf = np.divide(NUM, np.sqrt(np.multiply(DUM, DEN))) var_rf = np.zeros(len(C_rf)) # for ii in range(len(C_rf)): # var_rf[ii] = ( # (1-abs(C_rf[ii])**2)/((NT-1)*abs(C_rf[ii])**2))*(abs(tmpp[ii])**2) var_rf = None return rf, lrf, var_rf, tmpp