#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Contains the request class that is used to initialise the FDSN request for
the waveforms, the preprocessing of the waveforms, and the creation of
time domain receiver functions.
:copyright:
The PyGLImER development team (makus@gfz-potsdam.de).
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
:author:
Peter Makus (makus@gfz-potsdam.de)
Created: Monday, 27th April 2020 10:55:03 pm
Last Modified: Tuesday, 25th October 2022 12:34:06 pm
'''
import os
from http.client import IncompleteRead
from datetime import datetime
import logging
import typing as tp
from typing import Iterable, Tuple, List
import warnings
import time
from obspy import Stream, read_events, Catalog, Inventory
from obspy.clients.fdsn import Client as Webclient
from obspy.clients.fdsn.header import FDSNException
from obspy.core.utcdatetime import UTCDateTime
from obspy.taup import TauPyModel
from tqdm import tqdm
from pyglimer import constants
from pyglimer.waveform.download import download_small_db, downloadwav
from pyglimer.waveform.preprocess import preprocess
from pyglimer.waveform.filesystem import import_database
from pyglimer.utils import utils as pu
class Request(object):
    """Initialises the FDSN request for
    the waveforms, the preprocessing of the waveforms, and the creation of
    time domain receiver functions."""
    def __init__(
            self, proj_dir: str, raw_subdir: str, rf_subdir: str,
            statloc_subdir: str, evt_subdir: str, log_subdir: str, phase: str,
            rot: str, deconmeth: str, starttime: tp.Union[UTCDateTime, str],
            endtime: tp.Union[UTCDateTime, str], prepro_subdir: str = None,
            pol: str = 'v', minmag: tp.Union[float, int] = 5.5,
            event_coords: Tuple[float, float, float, float] = None,
            network: tp.Union[str, List[str], None] = None,
            station: tp.Union[str, List[str], None] = None,
            waveform_client: list = None, evtcat: str = None,
            continue_download: bool = False,
            loglvl: tp.Union[int, str] = logging.WARNING,
            format: str = 'hdf5', remove_response: bool = True, **kwargs):
"""
Create object that is used to start the receiver function
workflow.
        :param proj_dir: parent directory that all project files
            will be saved in (as subfolders).
:type proj_dir: str
:param raw_subdir: Directory, in which to store the raw waveform data.
:type raw_subdir: str
:param prepro_subdir: Directory, in which to store the preprocessed
waveform data (mseed). **Irrelevant if format is hdf5**.
:type prepro_subdir: str or None
:param rf_subdir: Directory, in which to store the receiver functions
in time domain (sac).
:type rf_subdir: str
:param statloc_subdir: Directory, in which to store the station
inventories (xml).
        :type statloc_subdir: str
:param evt_subdir: Directory, in which to store the event catalogue.
:type evt_subdir: str
:param log_subdir: Directory that logs are stored in.
        :type log_subdir: str
        :param phase: Arrival phase that is to be used as source phase.
            "S" to create S-Sp receiver functions and "P" for P-Ps receiver
            functions; "SKS" and "ScS" are allowed as well.
:type phase: str
        :param rot: The coordinate system that the seismogram should be
            rotated to prior to deconvolution. Options are "RTZ" for radial,
            transverse, vertical; "LQT" for an orthogonal coordinate system
            computed by minimising primary energy on the converted component;
            or "PSS" for a rotation along the polarisation directions using
            the Litho1.0 surface wave tomography model.
        :type rot: str
        :param deconmeth: The deconvolution method to use for the RF creation.
            Possible options are:

            * 'it': iterative time domain deconvolution (Ligorria & Ammon, 1999)
            * 'dampedf': damped frequency deconvolution
            * 'fqd': frequency dependent damping - not a good choice for SRF
            * 'waterlevel': Langston (1977)
            * 'multit': for multitaper (Helffrich, 2006)
            * False/None: don't create RFs
        :type deconmeth: str
:param starttime: Earliest event date to be considered.
:type starttime: :class:`obspy.UTCDateTime` or str
:param endtime: Latest event date to be considered.
:type endtime: :class:`obspy.UTCDateTime` or str
:param pol: Polarisation to use as source wavelet. Either "v" for
vertically polarised or 'h' for horizontally polarised S-waves.
Will be ignored if phase='S', by default 'v'.
:type pol: str, optional
:param minmag: Minimum magnitude, by default 5.5
:type minmag: float, optional
        :param event_coords: In case you wish to constrain events to certain
            origins. Given in the form (minlat, maxlat, minlon, maxlon),
            by default None.
        :type event_coords: Tuple, optional
        :param network: Limit the download and preprocessing to a certain
            network or networks. Wildcards are allowed, by default None.
        :type network: str or list, optional
        :param station: Limit the download and preprocessing to a certain
            station or several stations. Use only if ``network`` is not None.
            Wildcards are allowed, by default None.
        :type station: str or list, optional
:param waveform_client: List of FDSN compatible servers to download
waveforms from.
See obspy documentation for obspy.Client for allowed acronyms.
A list of servers by region can be found at
`<https://www.fdsn.org/webservices/datacenters/>`_. None means
that all known servers are requested, defaults to None.
:type waveform_client: list, optional
        :param evtcat: In case you want to use an already existing event
            catalogue in evtloc. If None, a new catalogue will be downloaded
            (with the parameters defined before), by default None.
        :type evtcat: str, optional
        :param continue_download: Will delete already used events from the
            event catalogue, so that the download will continue at the same
            place after being interrupted. This makes the continuation
            faster, but the old database will not be updated. Only makes
            sense if you define an old catalogue. Defaults to False.
        :type continue_download: bool, optional
        :param loglvl: Level for the loggers. One of the following:
            CRITICAL, ERROR, WARNING, INFO, or DEBUG.
            If the level is DEBUG, joblib will fall back to using only a
            few cores and downloads will be retried, by default WARNING.
        :type loglvl: int or str, optional
:param remove_response: Correct the traces for station response.
You might want to set this to False if you imported data and don't
have access to response information. Defaults to True.
:type remove_response: bool, optional
:raises NameError: For invalid phases.
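
        A minimal usage sketch (all directory names, dates, and the
        network/station codes below are placeholders, not defaults):

        >>> from pyglimer.waveform.request import Request
        >>> r = Request(  # doctest: +SKIP
        ...     proj_dir='myproject', raw_subdir='waveforms/raw',
        ...     prepro_subdir='waveforms/preprocessed', rf_subdir='rf',
        ...     statloc_subdir='stations', evt_subdir='events',
        ...     log_subdir='logs', phase='P', rot='RTZ', deconmeth='it',
        ...     starttime='2018-01-01', endtime='2019-01-01',
        ...     network='IU', station='HRV')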
"""
        # Allocate variables in self
        # Accept both a string (e.g., 'WARNING') and a logging constant
        if isinstance(loglvl, str):
            loglvl = pu.log_lvl[loglvl.upper()]
        self.loglvl = loglvl
        if format.lower() in ('hdf5', 'h5'):
            self.h5 = True
        elif format.lower() in ('sac', 'mseed'):
            self.h5 = False
        else:
            raise NotImplementedError(
                'Output format %s is unknown.' % format)
self.remove_response = remove_response
# Set velocity model
self.model = TauPyModel('iasp91')
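        # Normalise the phase name so that its last letter is upper case
        # (e.g., 'p' -> 'P'); later lookups use phase.upper() regardless.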
self.phase = phase[:-1] + phase[-1].upper()
self.pol = pol.lower()
self.rot = rot.upper()
self.deconmeth = deconmeth
# Directories
self.logdir = os.path.join(proj_dir, log_subdir)
os.makedirs(self.logdir, exist_ok=True)
self.evtloc = os.path.join(proj_dir, evt_subdir)
self.statloc = os.path.join(proj_dir, statloc_subdir)
self.rawloc = os.path.join(proj_dir, raw_subdir, self.phase)
if prepro_subdir is None and not self.h5:
raise ValueError(
'Prepro_subdir has to be defined if the output format is '
f'{format}.')
if prepro_subdir is None:
self.preproloc = None
else:
self.preproloc = os.path.join(proj_dir, prepro_subdir, self.phase)
self.rfloc = os.path.join(proj_dir, rf_subdir, self.phase)
# logger for the download steps
self.logger = logging.getLogger('pyglimer.request')
self.logger.setLevel(loglvl)
self.fh = logging.FileHandler(
os.path.join(self.logdir, 'request.log'))
self.fh.setLevel(loglvl)
self.logger.addHandler(self.fh)
fmt = logging.Formatter(
fmt='%(asctime)s - %(levelname)s - %(message)s')
self.fh.setFormatter(fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setLevel(loglvl)
consoleHandler.setFormatter(fmt)
self.logger.addHandler(consoleHandler)
# We don't really want to see all the warnings.
if loglvl > logging.WARNING:
warnings.filterwarnings("ignore")
else:
logging.captureWarnings(True)
warnings_logger = logging.getLogger("py.warnings")
warnings_logger.addHandler(self.fh)
warnings_logger.setLevel(loglvl)
# minimum magnitude
self.minmag = minmag
# Request time window
self.starttime = UTCDateTime(starttime)
self.endtime = UTCDateTime(endtime)
# geographical constraints
if event_coords:
(self.eMINLAT, self.eMAXLAT,
self.eMINLON, self.eMAXLON) = event_coords
else:
(self.eMINLAT, self.eMAXLAT,
self.eMINLON, self.eMAXLON) = None, None, None, None
# Set event depth and min/max epicentral distances
        # according to phase (see Wilson et al., 2006)
# and time window before (tz) and after (ta) first arrival
self.ta = 120
try:
self.maxdepth = constants.maxdepth[phase.upper()]
self.min_epid = constants.min_epid[phase.upper()]
self.max_epid = constants.max_epid[phase.upper()]
self.tz = constants.onset[phase.upper()]
except KeyError:
raise NotImplementedError(
f'The phase {self.phase} is not valid or not implemented yet.')
# network and station filters
self.network = network
self.station = station
self.waveform_client = waveform_client
# Download or process available data?
if evtcat == 'None':
# yaml files don't accept None
evtcat = None
if evtcat:
self.evtfile = os.path.join(self.evtloc, evtcat)
self.evtcat = read_events(self.evtfile)
else:
self.download_eventcat()
self.continue_download = continue_download
    def download_eventcat(self):
"""
Download the event catalogue from IRIS DMC.
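
        Usually called automatically from the constructor when no existing
        ``evtcat`` is passed; a minimal sketch of calling it by hand on an
        existing :class:`Request` object ``r``:

        >>> r.download_eventcat()  # doctest: +SKIP
        >>> print(r.evtcat.count())  # number of events retrieved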
"""
# Server settings
        # 2021/02/16 Events only from IRIS as the USGS webservice tends to be
        # unstable and mixing different services will lead to a messed-up db
self.webclient = Webclient('IRIS')
event_cat_done = False
        # 05.07.2021: Shorten the length of a, which is a lot more robust
        # :NOTE: perhaps it would be smart to save each year as a file?
        # But then again, they have different requirements...
        # Check length of request and split if longer than two years.
        a = 2*365.25*24*3600  # two years in seconds
if self.endtime-self.starttime > a:
            # Request is too big, break it down into several requests
starttimes = [self.starttime, self.starttime+a]
while self.endtime-starttimes[-1] > a:
starttimes.append(starttimes[-1]+a)
endtimes = []
endtimes.extend(starttimes[1:])
endtimes.append(self.endtime)
msg = 'Long request: Breaking it down to %s sub-requests.'\
% str(len(endtimes))
self.logger.info(msg)
# Query
self.evtcat = Catalog()
            # We convert the iterator to a list, so tqdm works properly
for st, et in tqdm(list(zip(starttimes, endtimes))):
event_cat_done = False
while not event_cat_done:
                    # IRIS has a restriction of ten connections per second
time.sleep(0.2)
try:
self.evtcat.extend(
self.webclient.get_events(
starttime=st, endtime=et,
minlatitude=self.eMINLAT,
maxlatitude=self.eMAXLAT,
minlongitude=self.eMINLON,
maxlongitude=self.eMAXLON,
minmagnitude=self.minmag,
maxmagnitude=10, maxdepth=self.maxdepth))
event_cat_done = True
except (IncompleteRead, FDSNException) as e:
                        # Server interrupted connection, just try again.
                        # This usually happens with enormous requests; we
                        # should reduce a.
                        msg = "Server interrupted connection, retrying..."
self.logger.warning(msg)
self.logger.warning(e)
continue
else:
while not event_cat_done:
                # IRIS has a restriction of ten connections per second
time.sleep(0.12)
try:
self.evtcat = self.webclient.get_events(
starttime=self.starttime, endtime=self.endtime,
minlatitude=self.eMINLAT, maxlatitude=self.eMAXLAT,
minlongitude=self.eMINLON, maxlongitude=self.eMAXLON,
minmagnitude=self.minmag, maxmagnitude=10,
maxdepth=self.maxdepth)
event_cat_done = True
                except (IncompleteRead, FDSNException):
                    # Server interrupted connection, just try again.
                    # This usually happens with enormous requests; we
                    # should reduce a.
                    msg = ("Server interrupted connection, "
                           "restarting download...")
                    self.logger.warning(msg)
                    continue
os.makedirs(self.evtloc, exist_ok=True)
self.evtfile = os.path.join(
self.evtloc, datetime.now().strftime("%Y%m%dT%H%M%S"))
self.evtcat.write(self.evtfile, format="QUAKEML")
msg = 'Successfully obtained %s events' % str(self.evtcat.count())
self.logger.info(msg)
    def import_database(
self, yield_st: Iterable[Stream],
yield_inv: Iterable[Inventory]):
"""
        Import a local database to PyGLImER. This can, for example, be
        continuous raw waveform data that you collected in your own
        experiment. Data are fed in as generator functions that yield obspy
        Inventory and obspy Stream objects.

        .. note::
            If you don't have response information for your stations, make
            sure to set ``Request.remove_response`` to ``False``.
.. note::
            Follow the obspy documentation to create StationXMLs; see this
            `Tutorial <https://docs.obspy.org/tutorial/code_snippets/\
stationxml_file_from_scratch.html>`_.
The StationXML needs to contain the following information:
1. Network and Station Code
2. Latitude, Longitude, and Elevation
3. Azimuth of Channel/Location to do the Rotation.
4. (Optional) Station response information.
.. note::
Make sure that the Traces in the Stream contain the following
information in their header:
1. sampling_rate
2. start_time, end_time
3. Network, Station, and Channel Code (Location code arbitrary)
        :param yield_st: Generator that yields the raw waveform data as
            obspy Stream objects.
        :type yield_st: Generator[Stream]
        :param yield_inv: Generator that yields the corresponding station
            metadata as obspy Inventory objects.
        :type yield_inv: Generator[Inventory]
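
        A minimal sketch of suitable generator functions (the folder layout
        and file names are hypothetical):

        >>> import glob
        >>> from obspy import read, read_inventory
        >>> def yield_st():
        ...     for f in sorted(glob.glob('mydata/waveforms/*.mseed')):
        ...         yield read(f)
        >>> def yield_inv():
        ...     for f in sorted(glob.glob('mydata/stations/*.xml')):
        ...         yield read_inventory(f)
        >>> r.import_database(yield_st(), yield_inv())  # doctest: +SKIP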
"""
import_database(
self.phase, self.model, self.evtcat, self.tz, self.ta,
self.statloc, self.rawloc, self.h5, yield_inv, yield_st)
    def preprocess(
            self, client: str = 'joblib',
            hc_filt: tp.Union[float, int, None] = None):
"""
        Preprocess an existing database with the parameters defined in self.

        :param client: Client to use for parallel processing,
            by default 'joblib'.
        :type client: str, optional
        :param hc_filt: Highcut frequency to filter with right before
            deconvolution. Recommended if time domain deconvolution is used.
            For spectral division, filtering can still be done after
            deconvolution (i.e., set in
            :func:`~pyglimer.ccp.ccp.CCPStack.compute_stack()`).
            Value for PRFs should usually be lower than 2 Hz and for SRFs
            lower than .4 Hz, by default None.
        :type hc_filt: float or int or None, optional
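
        A minimal usage sketch (the highcut of 1.5 Hz is an assumed value
        for a P receiver function workflow, not a default):

        >>> r.preprocess(hc_filt=1.5)  # doctest: +SKIP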
"""
preprocess(
self.phase, self.rot, self.pol, 0.05, self.evtcat, self.model,
'hann', self.tz, self.ta, self.statloc, self.rawloc,
self.preproloc, self.rfloc, self.deconmeth, hc_filt,
netrestr=self.network, statrestr=self.station, client=client,
saveasdf=self.h5, remove_response=self.remove_response)