#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
:copyright:
The PyGLImER development team (makus@gfz-potsdam.de).
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
:author:
Peter Makus (makus@gfz-potsdam.de)
Created: Sunday, 20th October 2019 10:31:03 am
Last Modified: Wednesday, 13th April 2022 02:55:06 pm
'''
import warnings
import numpy as np
from obspy import Stream, Trace
[docs]def resample_or_decimate(
data: Stream, sampling_rate_new: int, filter=True) -> Stream:
"""
Decimates the data if the desired new sampling rate allows to do so.
Else the signal will be interpolated (a lot slower).
:note: The stream has to be filtered a priori to avoid aliasing.
:param data: Stream to be resampled.
:type data: Stream
:param sampling_rate_new: The desired new sampling rate
:type sampling_rate_new: int
:return: The resampled stream
:rtype: Stream
"""
if isinstance(data, Stream):
sr = data[0].stats.sampling_rate
srl = [tr.stats.sampling_rate for tr in data]
if len(set(srl)) != 1:
# differing sampling rates in stream
for tr in data:
try:
tr = resample_or_decimate(tr, sampling_rate_new, filter)
except ValueError:
warnings.warn(
f'Trace {tr} not downsampled. Sampling rate is lower'
+ ' than requested sampling rate.')
return data
elif isinstance(data, Trace):
sr = data.stats.sampling_rate
else:
raise TypeError('Data has to be an obspy Stream or Trace.')
srn = sampling_rate_new
if srn > sr:
raise ValueError('New sampling rate greater than old. This function \
is only intended for downsampling.')
elif srn == sr:
return data
# Chosen this filter design as it's exactly the same as
# obspy.Stream.decimate uses
# Just for RFs to avoid instabilities
if filter and srn == 10:
data.filter('lowpass_cheby_2', freq=4, maxorder=12)
elif filter:
freq = sr * 0.5 / float(sr/srn)
data.filter('lowpass_cheby_2', freq=freq, maxorder=12)
if sr/srn == sr//srn:
return data.decimate(int(sr//srn), no_filter=True)
else:
return data.resample(srn, window='hann')
[docs]def convf(u: np.ndarray, v: np.ndarray, nf: int, dt: float) -> np.ndarray:
"""
Convolution conducted in the frequency domain.
Parameters
----------
u : np.array
Array 1.
v : np.array
Array 2.
nf : INTEGER
Array length in frequency domain (use next power of 2).
dt : FLOAT
Sampling Interval [s].
Returns
-------
c : np.array
Convolution of u with v.
"""
if not len(u) or not len(v):
raise ValueError('The input arrays have to have a length!')
U = np.fft.fft(u, n=nf)
V = np.fft.fft(v, n=nf)
C = U*V*dt
c = np.real(np.fft.ifft(C, n=nf))
return c
[docs]def corrf(u: np.ndarray, v: np.ndarray, nf: int) -> np.ndarray:
"""
Cross-correlation in frequency domain,
calculates the x=crosscorr(u,v). Hence, v is the flipped vector.
Parameters
----------
u : np.array
flipped vector.
v : np.array
Vector that the correlation is measured to.
nf : INTEGER
Array length in frequency domain (use next power of 2).
Returns
-------
x : np.array
Correlation vector.
"""
if not len(u) or not len(v):
raise ValueError('The input arrays have to have a length!')
V = np.conj(np.fft.fft(v, n=nf))
U = np.fft.fft(u, n=nf)
X = U*V
x = np.real(np.fft.ifft(X, n=nf))
return x
[docs]def gaussian(N: int, dt: float, width: float) -> np.ndarray:
"""
Create a zero-phase Gaussian function (i.e., a low-pass filter). In
particular meant to be convolved with the impulse
response that is the output of an iterative deconvolution
Parameters
----------
N : INTEGER
Length of the desired array.
dt : FLOAT
Sampling interval [s].
width : FLOAT
Width parameter of the Gaussian function.
Returns
-------
G : np.array
Gaussian window.
"""
if width <= 0:
raise ValueError('Gaussian width parameter has to be greater than 0.')
df = 1/(N*dt) # frequency step
f = np.arange(0, round(0.5*N), 1, dtype=float)*df # frequency array
w = 2*np.pi*f # angular frequency
G = np.array([0]*N, dtype=float)
G[0:round(N/2)] = np.exp(-w**2/(4*width**2))/dt
G_lr = np.flip(G)
G[round(N/2)+1:] = G_lr[-len(G[round(N/2)+1:]):]
return G
[docs]def filter(s: np.ndarray, F: np.ndarray, dt: float) -> np.ndarray:
"""
Convolves a filter with a signal (given in time domain).
Parameters
----------
s : np.array
Signal given in time domain.
F : np.array
Filter's amplitude response (i.e., frequency domain).
dt : FLOAT
Sampling interval [s].
Returns
-------
s_f : np.array
Filtered signal.
"""
nf = len(F)
S = np.fft.fft(s, n=nf)
S_f = np.multiply(S, F)*dt
s_f = np.real(np.fft.ifft(S_f, n=nf))
return s_f
[docs]def ricker(sigma: float, N2: int, dt: float) -> tuple:
""" create a zero-phase Ricker / Mexican hat wavelet
Parameters
----------
sigma : float
standard deviation
dt : float
sampling Interval
N2 : int
number of samples/2 (halflength). Full length = 2*N2+1
"""
rick_tt = np.arange(-N2, N2+1, 1, dtype=float)*dt # time vector
rick = 2/(np.sqrt(3*sigma)*np.pi**(1/4))*(1-np.power(rick_tt, 2)/sigma**2)\
* np.exp(-rick_tt**2/(2*sigma**2))
return rick_tt, rick
[docs]def noise(N: int, A: float) -> np.ndarray:
""" create random noise
Parameters
----------
N: int
signal length
A: float
maximal amplitude
"""
noise = np.random.rand(N)*A
return noise
[docs]def sshift(s: np.ndarray, N2: int, dt: float, shift: float) -> np.ndarray:
""" shift a signal by a given time-shift in the frequency domain
Parameters
----------
s : Arraylike
signal
N2 : int
length of the signal in f-domain (usually equals the next pof 2)
dt : float
sampling interval
shift : float
the time shift in seconds
Returns
-------
Timeshifted signal"""
S = np.fft.fft(s, n=N2)
# Omega
phshift = np.exp(-1j*shift*np.fft.fftfreq(N2, dt)*2*np.pi)
s_out = np.real(np.fft.ifft(phshift*S))
return s_out[:len(s)]