'''
:copyright:
The PyGLImER development team (makus@gfz-potsdam.de).
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
:author:
Peter Makus (makus@gfz-potsdam.de)
Created: Wednesday, 11th August 2021 03:20:09 pm
Last Modified: Friday, 16th September 2022 02:47:40 pm
'''
import fnmatch
import os
import re
from typing import Iterable, List, Tuple
import logging
import warnings
import numpy as np
# from numpy.core.fromnumeric import compress
from obspy.core.utcdatetime import UTCDateTime
from obspy.core import Stats
import h5py
from pyglimer.rf.create import RFStream, RFTrace
hierarchy = "/{tag}/{network}/{station}/{phase}/{pol}/{evt_time}"
h5_FMTSTR = os.path.join("{dir}", "{network}.{station}.h5")
[docs]class DBHandler(h5py.File):
"""
The actual file handler of the hdf5 receiver function files.
.. warning::
**Should not be accessed directly. Access
:class:`~pyglimer.database.rfh5.RFDataBase` instead.**
Child object of :class:`h5py.File` and inherets all its attributes and
functions in addition to functions that are particularly useful for
receiver functions.
"""
def __init__(self, path, mode, compression):
super(DBHandler, self).__init__(path, mode=mode)
if isinstance(compression, str):
self.compression = re.findall(r'(\w+?)(\d+)', compression)[0][0]
if self.compression != 'gzip':
raise ValueError(
'Compression of type %s is not supported.'
% self.compression)
self.compression_opts = int(
re.findall(r'(\w+?)(\d+)', compression)[0][1])
if self.compression_opts not in np.arange(1, 10, 1, dtype=int):
ii = np.argmin(abs(
np.arange(1, 10, 1, dtype=int) - self.compression_opts))
self.compression_opts = np.arange(1, 10, 1, dtype=int)[ii]
warnings.warn(
'Chosen compression level is not available for %s. \
%s Has been chosen instead (closest)' % (
self.compression, str(self.compression_opts)))
else:
self.compression = None
self.compression_opts = None
def _close(self):
self.close()
def _add_known_waveform_data(self, ret: List[str], rej: List[str]):
"""
Known waveforms to receive. Is just a dictionary with two keys (each
of which has a list as item with all retained or rejected waveforms).
:param ret: list of retained waveforms
:type ret: List[str]
:param rej: List of rejeccted waveforms
type rej: List[str]
"""
try:
ds = self.create_dataset('known', data=np.empty(1))
except ValueError:
ds = self['known']
# Already existing, just change attributes
ds.attrs['ret'] = str(ret)
ds.attrs['rej'] = str(rej)
[docs] def add_rf(
self, data: RFTrace or RFStream, tag: str = 'rf'):
"""
Add receiver function to the hdf5 file. The data can later be accessed
using the :meth:`~pyglimer.database.rfh5.DBHandler.get_data()` method.
:param data: Data to save. Either a
:class:`~pyglimer.rf.create.RFTrace` object or a
:class:`~pyglimer.rf.create.RFStream` holding one or
several traces.
:type data: RFTrace or RFStream
:param tag: The tag that the data should be saved under. By convention,
receiver functions are saved with the tag `'rf'`.
:raises TypeError: for wrong data type.
"""
if not isinstance(data, RFTrace) and\
not isinstance(data, RFStream):
raise TypeError('Data has to be either a \
:class:`~pyglimer.rf.create.RFTrace` object or a \
:class:`~~pyglimer.rf.create.RFStream` object')
if isinstance(data, RFTrace):
data = [data]
for tr in data:
st = tr.stats
path = hierarchy.format(
tag=tag,
network=st.network, station=st.station, phase=st.phase,
pol=st.pol, evt_time=st.event_time.format_fissures())
try:
ds = self.create_dataset(
path, data=tr.data, compression=self.compression,
compression_opts=self.compression_opts)
convert_header_to_hdf5(ds, st)
except ValueError as e:
print(e)
warnings.warn("The dataset %s is already in file and will be \
omitted." % path, category=UserWarning)
[docs] def get_data(
self, network: str, station: str, phase: str, evt_time: UTCDateTime,
tag: str = 'rf', pol: str = 'v') -> RFStream:
"""
Returns a :class:`~pyglimer.rf.create.RFStream` holding
all the requested data.
.. note::
`Wildcards are allowed for all parameters`.
:param network: network code, e.g., IU
:type network: str
:param station: station code, e.g., HRV
:type station: str
:param phase: Teleseismic phase
:type phase: str
:param evt_time: Origin Time of the Event
:type evt_time: UTCDateTime, optional
:param tag: Data tag (e.g., 'rf'). Defaults to rf.
:type tag: str, optional
:param pol: RF Polarisation. Defaults to v.
:type pol: str, optional
:return: a :class:`~pyglimer.rf.create.RFStream` holding the requested
data.
:rtype: RFStream
"""
try:
evt_time = UTCDateTime(evt_time)
evt_time = evt_time.format_fissures()
except TypeError:
evt_time = '*'
path = hierarchy.format(
tag=tag, network=network, station=station, phase=phase,
pol=pol, evt_time=evt_time)
# Extremely ugly way of changing the path
if '*' not in path:
data = np.array(self[path])
header = read_hdf5_header(self[path])
return RFStream(RFTrace(data, header=header))
# Now, we need to differ between the fnmatch pattern and the actually
# acessed path
pattern = path.replace('/*', '*')
path = path.split('*')[0]
try:
return all_traces_recursive(self[path], RFStream(), pattern)
except KeyError:
warnings.warn(
f'Could not find data from {network}.{station} for phase '
+ f'{phase}, tag {tag}, polarisation {pol}, and time '
+ f'{evt_time}. Returning empty Stream.')
return RFStream()
[docs] def get_coords(
self, network: str, station: str, phase: str = None,
tag: str = 'rf') -> Tuple[float, float, float]:
"""
Return the coordinates of the station.
:param network: Network Code.
:type network: str
:param station: Station Code
:type station: str
:param phase: Teleseismic Phase, defaults to None
:type phase: str, optional
:return: Latitude (dec deg), Longitude (dec deg), Elevation (m)
:rtype: Tuple[ float, float, float]
"""
# This might look confusing but it's actually not looping but just
# choosing the first available file
try:
for ph in self[tag][network][station].keys():
phi = phase or ph
for pol in self[tag][network][station][phi].keys():
for t in self[tag][network][station][phi][pol].keys():
rf = self.get_data(
network, station, phi, t, tag, pol)
if not rf.count():
continue
return (
rf[0].stats.station_latitude,
rf[0].stats.station_longitude,
rf[0].stats.station_elevation)
except KeyError:
# No data?
warnings.warn(
'No Data for station %s.%s and phase %s. Returns None.' % (
network, station, phase
))
return None, None, None
def _get_known_waveforms(self) -> Tuple[List[str], List[str]]:
try:
ds = self['known']
rej = eval(ds.attrs['rej'])
ret = eval(ds.attrs['ret'])
except (KeyError, AttributeError):
ret = []
rej = []
return ret, rej
[docs] def walk(
self, tag: str, network: str, station: str, phase: str,
pol: str = 'v') -> Iterable[RFTrace]:
"""
Iterate over all receiver functions with the given properties.
:param tag: data tag
:type tag: str
:param network: Network code
:type network: str
:param station: Statio ncode
:type station: str
:param phase: Teleseismic phase
:type phase: str
:param pol: RF-Polarisation, defaults to 'v'
:type pol: str, optional
:return: Iterator
:rtype: Iterable[RFTrace]
:yield: one :class:`~pyglimer.rf.create.RFTrace` per receiver function.
:rtype: Iterator[Iterable[RFTrace]]
.. note::
Does not accept wildcards.
"""
for v in self[tag][network][station][phase][pol].values():
yield RFTrace(np.array(v), header=read_hdf5_header(v))
[docs]class RFDataBase(object):
"""
Base class to handle the hdf5 files that contain receiver functions.
"""
def __init__(
self, path: str, mode: str = 'a', compression: str = 'gzip3'):
"""
Access an hdf5 file holding receiver functions. The resulting file can
be accessed using all functionalities of
`h5py <https://www.h5py.org/>`_ (for example as a dict).
:param path: Full path to the file
:type path: str
:param mode: Mode to access the file. Options are: 'a' for all, 'w' for
write, 'r+' for writing in an already existing file, or 'r' for
read-only , defaults to 'a'.
:type mode: str, optional
:param compression: The compression algorithm and compression level
that the arrays should be saved with. 'gzip3' tends to perform
well, else you could choose 'gzipx' where x is a digit between
1 and 9 (i.e., 9 is the highest compression) or None for fastest
perfomance, defaults to 'gzip3'.
:type compression: str, optional
.. warning::
**Access only through a context manager (see below):**
>>> with RFDataBase('myfile.h5') as rfdb:
>>> type(rfdb) # This is a DBHandler
<class 'pyglimer.database.rfh5.DBHandler'>
Example::
>>> with RFDataBase(
'/path/to/db/XN.NEP06.h5') as rfdb:
>>> # find the available tags for existing db
>>> print(list(rfdb.keys()))
['rf', 'rfstack']
>>> # Get Data from all times and tag rf, phase P
>>> st = rfdb.get_data(
>>> 'XN', 'NEP06', 'P', '*', 'rf')
>>> print(st.count())
250
"""
# Create / read file
if not path.split('.')[-1] == 'h5':
path += '.h5'
self.path = path
self.mode = mode
self.compression = compression
def __enter__(self) -> DBHandler:
self.db_handler = DBHandler(
self.path, self.mode, self.compression)
return self.db_handler
def __exit__(self, exc_type, exc_value, tb) -> None or bool:
self.db_handler._close()
if exc_type is not None:
return False
[docs]def all_traces_recursive(
group: h5py._hl.group.Group, stream: RFStream,
pattern: str) -> RFStream:
"""
Recursively, appends all traces in a h5py group to the input stream.
In addition this will check whether the data matches a certain pattern.
:param group: group to search through
:type group: class:`h5py._hl.group.Group`
:param stream: Stream to append the traces to
:type stream: Stream
:param pattern: pattern for the path in the hdf5 file, see fnmatch for
details.
:type pattern: str
:return: Stream with appended traces
:rtype: Stream
"""
for v in group.values():
if isinstance(v, h5py._hl.group.Group):
all_traces_recursive(v, stream, pattern)
elif not fnmatch.fnmatch(v.name, pattern) and v.name not in pattern:
continue
else:
# try:
stream.append(
RFTrace(np.array(v), header=read_hdf5_header(v)))
# except ValueError:
# warnings.warn(
# 'Header could not be converted. Attributes are: %s' % (
# str(v.attrs)))
return stream