'''
:copyright:
The PyGLImER development team (makus@gfz-potsdam.de).
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
:author:
Lucas Sawade (lsawade@princeton.edu)
Peter Makus (makus@gfz-potsdam.de)
Created: Tue May 26 2019 13:31:30
Last Modified: Monday, 26th September 2022 10:45:48 am
'''
import logging
import os
from obspy.core.event.event import Event
import typing as tp
from typing import List, Tuple, Optional
from warnings import warn
import psutil
import numpy as np
from joblib import Parallel, delayed
from obspy.clients.fdsn import Client, header
from obspy.clients.fdsn.header import URL_MAPPINGS
from obspy.core.inventory.inventory import Inventory
from obspy.core.stream import Stream, Trace
from obspy.core.utcdatetime import UTCDateTime
from pyglimer.database.raw import save_raw_DB_single_station, write_st
from pyglimer.rf.create import RFStream, RFTrace
# from pyglimer.database.asdf import save_raw_single_station_asdf
from .roundhalf import roundhalf
log_lvl = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'CRITICAL': logging.CRITICAL,
'ERROR': logging.ERROR}
def utc_save_str(utc: UTCDateTime):
return UTCDateTime(round(utc.timestamp)).format_fissures()[:-6]
def dt_string(dt: float) -> str:
"""Returns Time elapsed string depending on how much time has passed.
After a certain amount of seconds it returns minutes, and after a certain
amount of minutes it returns the elapsed time in hours."""
if dt > 500:
dt = dt / 60
if dt > 120:
dt = dt / 60
tstring = " Time elapsed: %3.1f h" % dt
else:
tstring = " Time elapsed: %3.1f min" % dt
else:
tstring = " Time elapsed: %3.1f s" % dt
return tstring
def chunks(lst: list, n: int) -> list:
"""Yield successive n-sized chunks from lst. Useful for multi-threading"""
for i in range(0, len(lst), n):
yield lst[i:i + n]
def download_full_inventory(statloc: str, fdsn_client: list):
"""
    This utility loops through statloc and redownloads the whole response
    (i.e., all channels and all times) for every station, overwriting the
    old StationXML files.
:param statloc: Folder in which the old stationxmls are saved
:type statloc: str
:param fdsn_client: List of FDSN providers that should be queried.
:type fdsn_client: list
"""
bulk = []
for fi in os.listdir(statloc):
f = fi.split('.')
if f[-1].lower() != 'xml':
continue
bulk.append((f[0], f[1], '*', '*', '*', '*'))
fdsn_client = get_multiple_fdsn_clients(fdsn_client)
_ = Parallel(n_jobs=-1, prefer='threads')(
delayed(__client__loop__)(client, statloc, bulk)
for client in fdsn_client)
def join_inv(invlist: List[Inventory]) -> Inventory:
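    """Merge a list of :class:`~obspy.core.inventory.inventory.Inventory`
    objects into a single Inventory by extending the first inventory with
    the networks of all following ones."""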
inv = invlist.pop(0)
for ii in invlist:
for net in ii:
inv.extend([net])
return inv
def check_UTC_overlap(
start: List[UTCDateTime], end: List[UTCDateTime]) -> List[bool]:
"""Checks a list of starttimes and endtimes for overlap
Parameters
----------
start : List[UTCDateTime]
List of UTCDatetime starttimes
end : List[UTCDateTime]
List of UTCDateTime endtimes
Returns
-------
List[bool]
        List of booleans; True where the corresponding window does not
        overlap any other window, False where it does.
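
    Examples
    --------
    A minimal sketch; the first two windows overlap each other, the third
    one overlaps none:

    >>> from obspy import UTCDateTime
    >>> start = [UTCDateTime(0), UTCDateTime(50), UTCDateTime(200)]
    >>> end = [UTCDateTime(100), UTCDateTime(150), UTCDateTime(300)]
    >>> check_UTC_overlap(start, end)
    [False, False, True]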
"""
# Initiate list saying there is no overlap
check = len(start)*[True]
for _i, (_start, _end) in enumerate(zip(start, end)):
# Loop over same array to check whether there is overlap in any of
# the windows.
for _j, (_startc, _endc) in enumerate(zip(start, end)):
# Don't want to compute overlap of the same window
if _j == _i:
continue
# Check whether start or endtime of another window is in the range
if (_start < _startc and _startc < _end) \
or (_start < _endc and _endc < _end):
check[_i] = False
return check
def __client__loop__(
client: str or Client, statloc: str, bulk: list) -> Inventory:
"""
Download station information from specified client and for the
specified bulk list.
:param client: FDSN client to use
:type client: str or obspy.fdsn.clients.Client
:param statloc: Location in which the station xmls are to be saved.
:type statloc: str
    :param bulk: Bulk list describing the queried download.
:type bulk: list
:return: The inventory for all downloaded stations
:rtype: obspy.Inventory
"""
logger = logging.getLogger('pyglimer.request')
try:
if not isinstance(client, Client):
client = Client(client)
stat_inv = client.get_stations_bulk(
bulk, level='response')
except (
header.FDSNNoDataException, header.FDSNException, ValueError,
TypeError) as e:
logger.warning(str(e))
logger.warning(f"--> {bulk}")
return # wrong client
# ValueError is raised for querying a client without station service
for network in stat_inv:
netcode = network.code
for station in network:
statcode = station.code
logger.debug(f"{netcode}.{statcode}")
out = os.path.join(statloc, '%s.%s.xml' % (netcode, statcode))
stat_inv.select(network=netcode, station=statcode).write(
out, format="STATIONXML")
return stat_inv
def chunkdict(d: dict, chunksize: int) -> list:
    """Chunks up a dictionary of lists into a list of dictionaries, where
    each dictionary holds at most ``chunksize`` entries per list.
Parameters
----------
d : dict
dictionary of lists
chunksize : int
number of elements per list in dictionary
Returns
-------
list
        A list of dictionaries, each holding one chunk of every list in the
        original dictionary.
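
    Examples
    --------
    A minimal sketch with two lists of four elements each, split into
    chunks of two:

    >>> d = {'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8]}
    >>> chunkdict(d, 2)
    [{'a': [1, 2], 'b': [5, 6]}, {'a': [3, 4], 'b': [7, 8]}]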
"""
#
tempd = dict()
for k, v in d.items():
tempd[k] = list(chunks(v, chunksize))
key = list(d.keys())[0]
Nchunk = len(tempd[key])
out = []
for i in range(Nchunk):
subd = dict()
for k, v in tempd.items():
subd[k] = v[i]
out.append(subd)
return out
def save_stream(
st: Stream, rawloc: str, saveh5: bool, saved: dict, inv: Inventory,
network: tp.Union[str, None] = None,
station: tp.Union[str, None] = None):
"""Saves a stream to either mseed or HDF5 format depending on flag, and the
event-channel dictionary ``saved`` which with created in a parent function.
Parameters
----------
st : Stream
stream to save
rawloc : str
path to database raw waveforms databases
saveh5 : bool
whether to save hdf5 or mseed
saved : dict
event-channel dictionary that contains the bulk requests made
inv : Inventory
inventory
network : str | None, optional
network string, by default None
station : str | None, optional
station string, by default None
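
    A minimal sketch (``st``, ``saved`` and ``inv`` are assumed to come from
    the surrounding download routines; the database path is hypothetical)::

        save_stream(
            st, rawloc='database/raw/P', saveh5=True, saved=saved,
            inv=inv, network='IU', station='HRV')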
"""
# Save stuff in hdf5 format (custom format that is provided by pyglimer)
if saveh5:
# Make sure HDF5 file is only opened once
if (network is not None) and (station is not None):
save_raw_DB_single_station(
network, station, saved, st, rawloc, inv)
# Opens and closes HDF5 file for all traces belonging to one station.
# It's more versatile, but less efficient
else:
save_raw(saved, st, rawloc, inv, True)
    # Save MiniSEED
else:
save_raw(saved, st, rawloc, inv, False)
def __download_sub__(client: str, saved: dict) -> Stream:
"""Takes in bulk requests, downloads them, and and returns stream of
traces, empty stream or raises an error.
Parameters
----------
client : str
Name of the client that data is requested from
saved : dict
dictionary with event-channel data and bulk requests
Returns
-------
Stream
empty or full
Raises
------
ValueError
if request is faulty or another error was thrown
"""
logger = logging.getLogger('pyglimer.request')
# Get bulk and flatten
bulk = []
# Get full bulk list
for _b in saved['bulk']:
bulk.extend(_b)
    # ## The block below is to debug the actual request made ############
    # It is unnecessary to log this in normal runs; it is kept for future
    # debugging.
# for _net, _sta, _cha, _st, _et, _event, _b in zip(
# saved['net'], saved['stat'], saved['chan'],
# saved['startt'], saved['endt'], saved['event'], saved['bulk']):
# # Get event id
# origin = (_event.preferred_origin() or _event.origins[0])
# e_id = utc_save_str(origin.time)
# # Print relevant debugging information
# print(f"{_net}.{_sta}..{_cha}: {_st} -- {_et} --> {e_id}")
# for __b in _b:
# print(f"^-->{__b}")
# #############################################################
# Sort bulk request.
bulk.sort()
try:
if not isinstance(client, Client):
client = Client(client)
st = client.get_waveforms_bulk(bulk)
logger.info(
f'Downloaded {st.count()} traces from Client {str(client)}.')
return st
except (header.FDSNNoDataException, header.FDSNException, ValueError) as e:
if 'HTTP Status code: 204' in str(e):
logger.debug('--------- NO DATA FOR REQUESTS: ----------------')
for __bulk in bulk:
logger.debug(f"|| {__bulk}")
logger.debug('------------------------------------------------')
return Stream()
elif isinstance(e, ValueError):
print(str(e))
else:
print(str(e))
raise ValueError('See Error above.')
# ValueError is raised for querying a client without
# station service
def __client__loop_wav__(
client: str, rawloc: str, saved: dict, saveh5: bool,
inv: Inventory, network: Optional[str] = None,
station: Optional[str] = None, parallel: bool = False):
""" Loops over event-channel requests and launches parallel or single
threaded downloads depending on the input parameter ``parallel``. The
Single threaded type is used if station downloads are parallelized (a
parent function) and parallel is used when a single station is downloaded
and events can be parallelized.
Parameters
----------
client : str
client to download from, e.g. "IRIS".
rawloc : str
database location for the raw data
saved : dict
dict containing channel-event info and bulk requests
saveh5 : bool
whether to save in HDF5 or not
inv : Inventory
inventory of the stations
network : Optional[str], optional
network string, by default None
station : Optional[str], optional
station string, by default None
parallel : bool, optional
whether to download requests in parallel or not, by default False
"""
logger = logging.getLogger('pyglimer.request')
    # Chunk size for the bulk requests; this effectively equals the number
    # of traces per request, unless multiple locations are available.
chunksize = 5000
# Get one key to get length of lists in dictionaries.
key = list(saved.keys())[0]
N0 = len(saved[key])
# Chunkify data and bulkstrings
if N0 > chunksize:
saved = chunkdict(saved, chunksize)
# Request too small to parallelize
else:
# Create single chunk
saved = [saved, ]
# Set parallel to false because it is unnecessary
parallel = False
# Number of Total chunks to be downloaded
N = len(saved)
logger.debug("Downloading ...")
# Get number of cores available
if parallel:
# Get number of available CPUs
NCPU = psutil.cpu_count(logical=False) - 2
        # Turn off parallelism if the number of available cores is 1 or fewer
if NCPU <= 1:
parallel = False
# Limit CPUs to four so that the FDSN server is not overwhelmed
elif NCPU > 4:
NCPU = 4
        # Split the total number of chunks into groups of NCPU, so that NCPU
        # chunks are downloaded simultaneously
saved = list(chunks(saved, NCPU))
counter = 0
        logger.debug(
            f" Downloading {N} chunks with <= {chunksize} requests each "
            f"in parallel")
with Parallel(n_jobs=NCPU, backend='multiprocessing') as PARALLEL:
# Loop of chunk subsets
for _i, _saved in enumerate(saved):
counter += len(_saved)
logger.debug(f" Downloading {counter}/{N} chunks")
# Download streams in parallel
streams = PARALLEL(
delayed(__download_sub__)(client, __saved)
for __saved in _saved)
# Save streams one by one
for st, __saved in zip(streams, _saved):
# If stream is empty skip
if len(st) == 0:
continue
                    # Save stream to MiniSEED or HDF5
save_stream(
st, rawloc, saveh5, __saved, inv, network, station)
    # Otherwise, download single-threaded
else:
streams = []
for i, _saved in enumerate(saved):
logger.debug(f" --> Bulk chunk {i}/{N}")
st = __download_sub__(client, _saved)
if st is None or len(st) == 0:
continue
            # Save stream to MiniSEED or HDF5
save_stream(st, rawloc, saveh5, _saved, inv, network, station)
def save_raw(
saved: dict, st: Stream, rawloc: str, inv: Inventory, saveh5: bool):
"""
Save the raw waveform data in the desired format.
The point of this function is mainly that the waveforms will be saved
with the correct associations and at the correct locations.
:param saved: Dictionary holding information about the original streams
to identify them afterwards.
:type saved: dict
:param st: obspy stream holding all data (from various stations)
:type st: Stream
:param rawloc: Parental directory (with phase) to save the files in.
:type rawloc: str
:param inv: The inventory holding all the station information
:type inv: Inventory
    :param saveh5: If True, the data will be saved in HDF5 format, else as
        MiniSEED.
    :type saveh5: bool
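
    A minimal sketch (``saved``, ``st`` and ``inv`` are assumed to come from
    the surrounding download routines; the database path is hypothetical)::

        save_raw(saved, st, 'database/raw/P', inv, saveh5=False)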
"""
# Just use the same name
for evt, startt, endt, net, stat in zip(
saved['event'], saved['startt'], saved['endt'], saved['net'],
saved['stat']):
# earlier we downloaded all locations, but we don't really want
# to have several, so let's just keep one
try:
sst = st.select(network=net, station=stat)
# This might actually be empty if so, let's just skip
if sst.count() == 0:
logging.debug(f'No trace of {net}.{stat} in Stream.')
continue
slst = sst.slice(startt, endt)
            # Only write the prevalent location
locs = [tr.stats.location for tr in sst]
filtloc = max(set(locs), key=locs.count)
sslst = slst.select(location=filtloc)
if saveh5:
sinv = inv.select(net, stat, starttime=startt, endtime=endt)
write_st(sslst, evt, rawloc, sinv)
else:
save_raw_mseed(evt, sslst, rawloc, net, stat)
except Exception as e:
logging.error(e)
def save_raw_mseed(evt: Event, slst: Stream, rawloc: str, net: str, stat: str):
"""
    Saves the input stream at the correct location in MiniSEED format.
:param evt: event that is associated to the recording.
:type evt: Event
:param slst: The selected stream. Should contain all components of one
recording of a teleseismic event.
:type slst: Stream
:param rawloc: Location (with phase) to save the miniseed files in (will
also create subfolders).
:type rawloc: str
:param net: Network code
:type net: str
:param stat: Station Code
:type stat: str
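
    A minimal sketch (``evt`` and ``st`` are assumed to be the obspy Event
    and the corresponding Stream; the database path is hypothetical)::

        save_raw_mseed(evt, st, 'database/raw/P', 'IU', 'HRV')

    The file is then written to
    ``<rawloc>/<origintime>_<eventlat>_<eventlon>/IU.HRV.mseed``, with the
    event coordinates rounded to the nearest half degree.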
"""
o = (evt.preferred_origin() or evt.origins[0])
ot_loc = UTCDateTime(o.time, precision=-1).format_fissures()[:-6]
evtlat_loc = str(roundhalf(o.latitude))
evtlon_loc = str(roundhalf(o.longitude))
folder = os.path.join(
rawloc, '%s_%s_%s' % (ot_loc, evtlat_loc, evtlon_loc))
os.makedirs(folder, exist_ok=True)
fn = os.path.join(folder, '%s.%s.mseed' % (net, stat))
    slst.write(fn, format='MSEED')
def get_multiple_fdsn_clients(
clients: List[str] or str or None) -> Tuple[Client]:
"""
Returns a tuple holding all the queried fdsn providers. Also finds
all available fdsn providers if input is None. Will also sort the clients
so the big providers (IRIS and ORFEUS) come last.
Just a modified version of a code in the obspy mass downloader.
:param clients: List of strings, each describing one client.
Put None if you want to use all available.
:type clients: List[str] or str or None
    :return: Tuple or list of the FDSN provider names to be queried.
    :rtype: Tuple[str] or List[str]
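
    Example:

    >>> get_multiple_fdsn_clients('IRIS')
    ['IRIS']
    >>> get_multiple_fdsn_clients(['GFZ', 'IRIS'])
    ['GFZ', 'IRIS']

    With ``None``, all providers known to obspy (except RASPISHAKE) are
    returned, with ORFEUS and IRIS last.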
"""
if isinstance(clients, str):
clients = [clients]
# That bit is stolen from the massdownloader
elif clients is None:
providers = dict(URL_MAPPINGS.items())
_p = []
if "RASPISHAKE" in providers:
# exclude RASPISHAKE by default
del providers["RASPISHAKE"]
if "IRIS" in providers:
has_iris = True
del providers["IRIS"]
else:
has_iris = False
if "ODC" in providers:
providers["ORFEUS"] = providers["ODC"]
del providers["ODC"]
if "ORFEUS" in providers:
has_orfeus = True
del providers["ORFEUS"]
else:
has_orfeus = False
_p = sorted(providers)
if has_orfeus:
_p.append("ORFEUS")
if has_iris:
_p.append("IRIS")
providers = _p
clients = tuple(providers)
return clients
def create_bulk_str(
networks: tp.Union[str, List[str]], stations: tp.Union[str, List[str]],
location: str, channel: tp.Union[str, List[str]],
t0: UTCDateTime or str or List[UTCDateTime],
t1: UTCDateTime or str or List[UTCDateTime]) -> List[tuple]:
"""
Function to generate the input for the obspy functions:
get_stations_bulk() and get_waveforms_bulk().
:param networks: The requested networks, can be str or List.
:type networks: str or List[str]
:param stations: The requested stations, can be str or List.
:type stations: str or List[str]
:param location: Location string
:type location: str
:param channel: Channel string
:type channel: str
:param t0: starttimes
:type t0: UTCDateTime or str or List[UTCDateTime]
:param t1: endtimes
:type t1: UTCDateTime or str or List[UTCDateTime]
:raises ValueError: For invalid input types or inputs
:return: The List to be used as input with the aforementioned functions.
:rtype: List[tuple]
.. note::
All parameters accept wildcards.
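
    Example (one fixed time window for two stations of the same network):

    >>> from obspy import UTCDateTime
    >>> t0, t1 = UTCDateTime(2015, 1, 1), UTCDateTime(2015, 1, 2)
    >>> bulk = create_bulk_str('IU', ['HRV', 'ANMO'], '*', 'BH?', t0, t1)

    This yields one ``(network, station, location, channel, starttime,
    endtime)`` tuple per station.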
"""
# request object
bulk = []
if isinstance(t0, str) and t0 != '*':
t0 = UTCDateTime(t0)
elif isinstance(t0, list):
t0 = [UTCDateTime(t) for t in t0]
if isinstance(t1, str) and t1 != '*':
t1 = UTCDateTime(t1)
elif isinstance(t1, list):
t1 = [UTCDateTime(t) for t in t1]
if isinstance(networks, list) and isinstance(stations, list):
if len(networks) != len(stations):
            raise ValueError(
                'If network and station are provided as lists, they have to '
                'have the same length!')
if (isinstance(t1, list) and isinstance(t0, list)) \
or type(t1) != type(t0):
if len(stations) != len(t1) or len(t1) != len(t0) \
or type(t1) != type(t0):
raise ValueError('Time Lists have to have same length!')
for net, stat, st, et in zip(networks, stations, t0, t1):
bulk.append((net, stat, location, channel, st, et))
return bulk
elif isinstance(t0, (str, UTCDateTime)) and isinstance(
t1, (str, UTCDateTime)):
for net, stat in zip(networks, stations):
bulk.append((net, stat, location, channel, t0, t1))
elif isinstance(networks, list) and stations == '*':
if (isinstance(t1, list) and isinstance(t0, list)) \
or type(t1) != type(t0):
if len(networks) != len(t1) or len(t1) != len(t0) \
or type(t1) != type(t0):
raise ValueError('Time Lists have to have same length!')
for net, st, et in zip(networks, t0, t1):
bulk.append((net, stations, location, channel, st, et))
return bulk
elif isinstance(t0, (str, UTCDateTime)) and isinstance(
t1, (str, UTCDateTime)):
for net in networks:
bulk.append((net, stations, location, channel, t0, t1))
elif isinstance(stations, list) and isinstance(networks, str):
if (isinstance(t1, list) and isinstance(t0, list)) \
or type(t0) != type(t1):
if len(stations) != len(t1) or len(t1) != len(t0) \
or type(t0) != type(t1):
raise ValueError('Time Lists have to have same length!')
for stat, st, et in zip(stations, t0, t1):
bulk.append((networks, stat, location, channel, st, et))
elif isinstance(t0, (str, UTCDateTime)) and isinstance(
t1, (str, UTCDateTime)):
for stat in stations:
bulk.append((networks, stat, location, channel, t0, t1))
elif isinstance(stations, str) and isinstance(networks, str):
if (isinstance(t1, list) and isinstance(t0, list)) \
or type(t1) != type(t0):
if len(t1) != len(t0) or type(t1) != type(t0):
raise ValueError('Time Lists have to have same length!')
for st, et in zip(t0, t1):
bulk.append((networks, stations, location, channel, st, et))
elif isinstance(t0, (str, UTCDateTime)) and isinstance(
t1, (str, UTCDateTime)):
bulk.append((networks, stations, location, channel, t0, t1))
else:
        raise ValueError(
            'Invalid combination of input types or input length.\n'
            'Check the following:\n'
            '\t1. If all inputs are lists, do they have the same length?\n'
            '\t2. If stations is a string and not a wildcard (i.e., *), '
            'networks has to be a string as well.')
return bulk
def cos_taper_st(
st: Stream, taper_len: float, taper_at_masked: bool,
side: str = 'both') -> Stream:
"""
Applies a cosine taper to the input Stream.
    :param st: Input Stream
    :type st: :class:`~obspy.core.stream.Stream`
    :param taper_len: Length of the taper per side in seconds
    :type taper_len: float
    :param taper_at_masked: split each trace at masked (gap) samples, taper
        the segments, and merge them again afterwards
    :type taper_at_masked: bool
    :param side: Side(s) to taper: 'left', 'right' or 'both',
        defaults to 'both'
    :type side: str, optional
:return: Tapered Stream
:rtype: :class:`~obspy.core.stream.Stream`
.. note::
This action is performed in place. If you want to keep the
original data use :func:`~obspy.core.stream.Stream.copy`.
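
    Example (taper 5 s on both ends of every trace in a stream):

    >>> from obspy import read
    >>> st = read()  # obspy example stream
    >>> st = cos_taper_st(st, taper_len=5, taper_at_masked=False)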
"""
if isinstance(st, Trace):
st = Stream([st])
elif isinstance(st, RFTrace):
st = RFStream([st])
for ii, _ in enumerate(st):
try:
st[ii] = cos_taper(st[ii], taper_len, taper_at_masked, side)
except ValueError as e:
warn('%s, corresponding trace not tapered.' % e)
return st
def cos_taper(
tr: Trace, taper_len: float, taper_at_masked: bool,
side: str = 'both') -> Trace:
"""
Applies a cosine taper to the input trace.
:param tr: Input Trace
:type tr: Trace
:param taper_len: Length of the taper per side in seconds
:type taper_len: float
    :param taper_at_masked: split the trace at masked (gap) samples, taper
        the segments, and merge them again afterwards
    :type taper_at_masked: bool
    :param side: Side(s) to taper: 'left', 'right' or 'both',
        defaults to 'both'
    :type side: str, optional
:return: Tapered Trace
:rtype: Trace
.. note::
This action is performed in place. If you want to keep the
original data use :func:`~obspy.core.trace.Trace.copy`.
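
    Example (2 s taper on the left side only):

    >>> from obspy import read
    >>> tr = read()[0]  # obspy example trace
    >>> tr = cos_taper(tr, taper_len=2, taper_at_masked=False, side='left')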
"""
if taper_len <= 0:
raise ValueError('Taper length must be larger than 0 s')
if taper_at_masked:
st = tr.split()
st = cos_taper_st(st, taper_len, False)
st = st.merge()
if st.count():
tr.data = st[0].data
return tr
else:
            raise ValueError(
                'Stream could not be merged back after splitting at masked '
                'values.')
taper = np.ones_like(tr.data)
tl_n = round(taper_len*tr.stats.sampling_rate)
if tl_n * 2 > tr.stats.npts:
raise ValueError(
'Taper Length * 2 has to be smaller or equal to trace\'s length.')
tap = np.sin(np.linspace(0, np.pi, tl_n*2))
if side in ['left', 'both']:
taper[:tl_n] = tap[:tl_n]
if side in ['right', 'both']:
taper[-tl_n:] = tap[-tl_n:]
tr.data = np.multiply(tr.data, taper)
return tr