Source code for pyglimer.waveform.download

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
:copyright:
   The PyGLImER development team (makus@gfz-potsdam.de).
:license:
   GNU Lesser General Public License, Version 3
   (https://www.gnu.org/copyleft/lesser.html)
:author:
    Peter Makus (makus@gfz-potsdam.de)

Created: Tue May 26 2019 13:31:30
Last Modified: Tuesday, 25th October 2022 12:29:11 pm
'''

from obspy.core.event.event import Event
import typing as tp
from http.client import IncompleteRead
import logging
import os
from functools import partial
from itertools import compress
from tqdm import tqdm

from joblib import Parallel, delayed
import psutil
from obspy import read
from obspy import UTCDateTime
from obspy.clients.fdsn.mass_downloader import CircularDomain, \
    Restrictions, MassDownloader
from obspy.core.event.catalog import Catalog
from obspy.core.inventory.inventory import Inventory
from obspy.taup import TauPyModel

from pyglimer.database.raw import RawDatabase, mseed_to_hdf5
from pyglimer import tmp
from pyglimer.utils.roundhalf import roundhalf
from pyglimer.utils import utils as pu
from pyglimer.utils.utils import download_full_inventory
from pyglimer.waveform.preprocessh5 import compute_toa


def ____check_times_small_db_event(
        rawloc: str, tz: float, ta: float, phase: str, model: TauPyModel,
        logger: logging.Logger, saveh5: bool, mintime: float, maxtime: float,
        inv: Inventory, net: str, stat: str, channels: tp.List[str],
        evt: Event, av_data_manual: tp.Union[dict, None] = None) \
        -> tp.Union[tp.Tuple[UTCDateTime, tp.List[str]], bool]:
    """Check whether the event is already in the database and, if not,
    compute the time of arrival (TOA) and return the missing channels.

    Parameters
    ----------
    rawloc : str
        path to waveform folder in the database
    tz : float
        padding before the arrival (seconds)
    ta : float
        padding after the arrival (seconds)
    phase : str
        phase to check the arrival for
    model : TauPyModel
        model, from obspy
    logger : logging.Logger
        logger for log messages
    saveh5 : bool
        whether to check in the hdf5 database or the miniseed database
    mintime : float
        minimum theoretical arrival time after origin time (seconds)
    maxtime : float
        maximum theoretical arrival time after origin time (seconds)
    inv : Inventory
        station inventory
    net : str
        network name
    stat : str
        station name
    channels : tp.List[str]
        channels
    evt : Event
        event to check arrivals from
    av_data_manual : dict | None, optional
        table of already available data per channel; if not None, the events
        are checked in parallel, by default None

    Returns
    -------
    tp.Tuple[UTCDateTime, tp.List[str]] | bool
        time of arrival and list of channels to request, or ``False`` if the
        event is already in the database or no valid arrival exists

    """

    # Get origin
    o = (evt.preferred_origin() or evt.origins[0])

    # Make the evt id a global variable for the sub functions if the stations
    # are parallelized
    if not av_data_manual:
        global evt_id

    # Get evt_id
    evt_id = pu.utc_save_str(o.time)
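    # evt_id serves as the per-event key when looking up already stored data
    # in the raw-database table of contents (cf. wav_in_hdf5 and
    # av_data_manual below).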

    # Channel fix: not all channels are equally present in a given time
    # range
    channels_fix = []
    for _cha in channels:
        try:
            STA = inv.select(
                network=net, station=stat, channel=_cha,
                starttime=o.time+mintime-tz, endtime=o.time+maxtime+ta)[0][0]
            channels_fix.append(_cha)
        except Exception:
            logger.debug(f'Channel {net}.{stat}..{_cha} not available for '
                         f'event {str(evt.resource_id).split("=")[-1]}')

    # Already in DB?
    if saveh5:
        # Check if all expected channels are in saved already
        checklist = []
        for _channel in channels_fix:

            # For event parallelism, check the pre-fetched table of contents
            # instead of the global database cache.
            if av_data_manual is None:
                indb = wav_in_hdf5(rawloc, net, stat, '*', _channel)
            else:
                indb = evt_id in av_data_manual[net][stat][_channel]

            logger.debug(
                f'||----> {net}.{stat}..{_channel} - {evt_id} in file: {indb}')

            # Add to checklist whether a channel is in the database
            checklist.append(indb)

        if all(checklist):
            logger.info(f'Already in database: {net}.{stat} '
                        f'event={str(evt.resource_id).split("=")[-1]}')
            return False

    # If dataformat is mseed
    else:
        ot_loc = UTCDateTime(
            o.time, precision=-1).format_fissures()[:-6]
        evtlat_loc = str(roundhalf(o.latitude))
        evtlon_loc = str(roundhalf(o.longitude))
        tmp.folder = os.path.join(
            rawloc, '%s_%s_%s' % (ot_loc, evtlat_loc, evtlon_loc))

        # create folder for each event
        os.makedirs(tmp.folder, exist_ok=True)

        # Check if all expected channels are in saved already
        checklist = []

        for _channel in channels_fix:
            # Here we don't need a separate function because each event just
            # checks the existence of a file, which requires
            # no parallel access.
            checklist.append(wav_in_db(net, stat, '*', _channel))

        if all(checklist):
            logger.info(
                f'Already in database: {net}.{stat} '
                f'event={str(evt.resource_id).split("=")[-1]}')
            return False

    # Get required channels
    # Add a list of channels
    addchannels = []
    for _channel, _check in zip(channels_fix, checklist):
        if not _check:
            addchannels.append(_channel)
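    # e.g. if 'BHZ' is already stored but 'BHN' and 'BHE' are missing, only
    # the two missing channels remain in addchannels (channel names are
    # purely illustrative).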

    # If the file is not in the database compute the TOA
    try:

        logger.debug(f"Computing TOA for {evt.resource_id} and {net}.{stat}")

        # Get origin dependent station
        STA = inv.select(
            network=net, station=stat,
            starttime=o.time+mintime-tz, endtime=o.time+maxtime+ta)[0][0]

        # Compute the Time of arrival
        toa, _, _, _, _ = compute_toa(
            evt, STA.latitude, STA.longitude, phase, model)

    except (IndexError, ValueError):

        # occurs when there is no arrival of the phase at stat
        logger.debug(
            'No valid arrival found for station %s, ' % stat
            + 'event %s, and phase %s' % (evt.resource_id, phase))
        return False

    # Finally return the time of arrival and the channels to be downloaded
    return toa, addchannels


def __check_times_small_db_sub(
        event_cat: Catalog,
        rawloc: str, tz: float, ta: float, phase: str, model: TauPyModel,
        logger: logging.Logger, saveh5: bool, mintime: float, maxtime: float,
        inv: Inventory, net: str, stat: str, channels: tp.List[str],
        parallel: bool = False) -> dict:
    """loop over events

    Parameters
    ----------
    event_cat : Catalog
        catalog of events
    rawloc : str
        path to raw waveform folder in database
    tz : float
        padding before
    ta : float
        padding after
    phase : str
        phase to be requested
    model : TauPyModel
        obspy 1d traveltime model
    logger : logging.Logger
        logger
    saveh5 : bool
        whether to check the hdf5 database (otherwise the miniseed database)
    mintime : float
        minimum time after an event
    maxtime : float
        maximum time after an event
    inv : Inventory
        station inventory
    net : str
        network string
    stat : str
        station string
    channels : tp.List[str]
        list of channels for net.sta
    parallel : bool, optional
        if true parallelize event loop, if false iterate over events,
        by default False

    Returns
    -------
    dict
        dictionary with keys 'event', 'startt', 'endt', 'net', 'stat' and
        'chan' describing the time windows and channels to be requested
    """

    d = {
        'event': [],
        'startt': [], 'endt': [],
        'net': [], 'stat': [], 'chan': []}

    # Get origin times
    # otimes = [_o.time _o in [evt.preferred_origin() or evt.origins[0]
    # for evt in event_cat.events]]

    print(f'--> Enter TOA event loop for station {net}.{stat}')

    # This parallelizes the event loop, which is only used if we are
    # downloading data for a single station
    if parallel:
        # Get HDF5 content

        # H5 file location
        h5_file = os.path.join(rawloc, '%s.%s.h5' % (
            net, stat))

        # Check available data from this station
        av_data_manual = {}
        av_data_manual.setdefault(net, {})
        av_data_manual[net].setdefault(stat, {})

        if not os.path.isfile(h5_file):
            logging.debug(f'{h5_file} not found')
            for _cha in channels:
                av_data_manual[net][stat][_cha] = list()
        else:
            # The file exists, so we will have to open it and get
            # the dictionary
            with RawDatabase(h5_file) as rdb:
                av_data_manual[net][stat] = rdb._get_table_of_contents()

                # Safety net for when the channel list is empty for some
                # reason.
                if not av_data_manual[net][stat]:
                    for _cha in channels:
                        av_data_manual[net][stat][_cha] = list()

        # Get number of cores available
        NCPU = psutil.cpu_count(logical=False) - 2
        NCPU = 1 if NCPU < 1 else NCPU
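        # Two physical cores are held back (presumably for the main process
        # and I/O); machines with fewer cores fall back to a single worker.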

        # Create partial function
        dsub = partial(
            ____check_times_small_db_event,
            rawloc, tz, ta, phase, model, logger, saveh5, mintime, maxtime,
            inv, net, stat, channels)

        # Run parallel event loop
        out = Parallel(n_jobs=NCPU, backend='multiprocessing')(
            delayed(dsub)(_evt, av_data_manual)
            for _evt in event_cat)

        for evt, _out in zip(event_cat, out):

            if not _out:
                continue
            else:
                toa, addchannels = _out

            # It's new data, so add to request!
            d['event'].append(evt)
            d['startt'].append(toa-tz)
            d['endt'].append(toa+ta)
            d['net'].append(net)
            d['stat'].append(stat)
            d['chan'].append(addchannels)

    # Here the event loop is serial, because the stations are parallelized
    else:

        # Information on available data for hdf5
        global av_data
        av_data = {}

        # Loop over events serially (because the station loop is
        # parallelized)
        for evt in event_cat:

            out = ____check_times_small_db_event(
                rawloc, tz, ta, phase, model, logger, saveh5, mintime, maxtime,
                inv, net, stat, channels, evt)

            if not out:
                continue
            else:
                toa, addchannels = out

            # It's new data, so add to request!
            d['event'].append(evt)
            d['startt'].append(toa-tz)
            d['endt'].append(toa+ta)
            d['net'].append(net)
            d['stat'].append(stat)
            d['chan'].append(addchannels)

    def printd(d: dict, logger):
        keys = list(d.keys())
        lists = (v for v in d.values())

        net_idx = keys.index('net')
        sta_idx = keys.index('stat')
        cha_idx = keys.index('chan')
        st_idx = keys.index('startt')
        et_idx = keys.index('endt')

        logger.debug("====================================================")
        for _d in zip(*lists):
            logger.debug(f"||  {_d[net_idx]}.{_d[sta_idx]}..{_d[cha_idx]}: "
                         f"{_d[st_idx]} -- {_d[et_idx]}")
        logger.debug("====================================================")

    # Print TOAs before the overlap check
    logger.debug(f'Final list of found arrival times for station {net}.{stat}')
    if logger.level < 20:
        printd(d, logger)

    # Filtering overlaps
    dd = filter_overlapping_times(d)

    # Print TOAs after checking overlaps
    logger.debug('After overlap')
    if logger.level < 20:
        printd(dd, logger)

    # Log the number of windows removed due to overlap
    logger.info(f'Found {len(d["net"])-len(dd["net"])} overlaps '
                f'at {net}.{stat}')

    return dd


def filter_overlapping_times(d):
    """Filters the found dictionary for overlapping windows. Dictionary looks
    as follows:
    ``d = {'event': [], 'startt': [], 'endt': [], 'net': [], 'stat': []}``.
    """
    # Check overlapping windows and set checkoverlap to False if there is
    # overlap
    checkoverlap = pu.check_UTC_overlap(d['startt'], d['endt'])

    # Create filtered dictionary
    dfilt = dict()
    dfilt['event'] = list(compress(d['event'], checkoverlap))
    dfilt['net'] = list(compress(d['net'], checkoverlap))
    dfilt['stat'] = list(compress(d['stat'], checkoverlap))
    dfilt['chan'] = list(compress(d['chan'], checkoverlap))
    dfilt['startt'] = list(compress(d['startt'], checkoverlap))
    dfilt['endt'] = list(compress(d['endt'], checkoverlap))

    return dfilt

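# Illustrative use of ``filter_overlapping_times`` (values are hypothetical;
# it assumes ``pu.check_UTC_overlap`` flags overlapping windows with False,
# as described in the comment inside the function):
#
#     d = {'event': [ev1, ev2], 'net': ['IU', 'IU'], 'stat': ['HRV', 'HRV'],
#          'chan': [['BHZ'], ['BHZ']],
#          'startt': [UTCDateTime(0), UTCDateTime(50)],
#          'endt': [UTCDateTime(100), UTCDateTime(150)]}
#     dfilt = filter_overlapping_times(d)  # overlapping windows are dropped
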
def inv2uniqlists(inv: Inventory):
    """Creates a list of unique station and channel lists from an
    inventory."""
    # logging
    logger = logging.getLogger('pyglimer.request')

    # Create a dictionary of unique networks/stations
    dinv = dict()
    for net in inv:
        # If not in dictionary, add network to dictionary
        if net.code not in dinv:
            dinv[net.code] = dict()

        # If not in dictionary, add station to dictionary
        for stat in net:
            if stat.code not in dinv[net.code]:
                dinv[net.code][stat.code] = set()

            # Add the channel codes of the station to the set
            for _channel in stat:
                dinv[net.code][stat.code].add(_channel.code)

    # Create lists for parallel computation
    networks, stations, channels, subinvs = [], [], [], []
    netsta_str = []

    for _net, _stations in dinv.items():
        for _sta, _channels in _stations.items():

            # Get actual network and station from inventory
            subinv = inv.select(network=_net, station=_sta)

            # Add entries to the computation lists
            networks.append(_net)
            stations.append(_sta)
            netsta_str.append(f'{_net}.{_sta}')
            channels.append(list(_channels))

            # The inventory now only includes the relevant station entries.
            # It is important to KEEP ALL STATION ENTRIES because the
            # location can change over the time the inventory was online.
            # We divide into subinvs so that parallel I/O is faster.
            subinvs.append(subinv)

            # Print debugging message
            logger.debug(f" --- {_net}.{_sta}")
            for _cha in _channels:
                logger.debug(f"     |---- {_cha}")

    return networks, stations, channels, subinvs, netsta_str

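# The five returned lists are index-aligned, e.g. (hypothetical values):
# networks = ['IU'], stations = ['HRV'], channels = [['BHZ', 'BHN', 'BHE']],
# subinvs = [<Inventory restricted to IU.HRV>], netsta_str = ['IU.HRV'].
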
def create_bulk_list(netsta_d: tp.List[dict]):
    """Takes in a list of dictionaries to create a bulk request list of
    dictionaries that contain station info and bulk request strings."""
    # Fix the lists to check to add one request per channel
    bulk_list = []
    fullbulk = []

    # Loop over station dictionaries
    for _d in netsta_d:

        # Create new set of lists
        nets, stats, chans, st, et, evts = [], [], [], [], [], []

        # Loop over stations
        for _net, _sta, _channels, _st, _et, _evt in zip(
                _d['net'], _d['stat'], _d['chan'], _d['startt'], _d['endt'],
                _d['event']):

            # Loop over channels
            for _cha in _channels:
                nets.append(_net)
                stats.append(_sta)
                chans.append(_cha)
                st.append(_st)
                et.append(_et)
                evts.append(_evt)

        # Create new dictionary with channel-wise bulk requests
        bulk_dict = dict()
        bulk_dict['net'] = nets
        bulk_dict['stat'] = stats
        bulk_dict['chan'] = chans
        bulk_dict['startt'] = st
        bulk_dict['endt'] = et
        bulk_dict['event'] = evts

        # One list per net.sta.cha start/endtime
        mini_bulk_list = []
        for _nets, _stats, _chans, _st, _et \
                in zip(nets, stats, chans, st, et):

            # Get list of bulk strings associated with net.sta.... combo
            subbulk = pu.create_bulk_str(
                _nets, _stats, '*', _chans, _st, _et)

            # Add list to list of bulk requests associated with the
            # corresponding net.sta... combo
            mini_bulk_list.append(subbulk)

            # Get full number of bulk requests by extending a list of all
            # requests.
            fullbulk.extend(subbulk)

        # Add list to bulk
        bulk_dict['bulk'] = mini_bulk_list

        # Append the new dictionary to the bulk_list
        bulk_list.append(bulk_dict)

    # The full number of bulk requests
    Nbulk = len(fullbulk)

    return bulk_list, Nbulk

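# Each entry of the returned ``bulk_list`` corresponds to one net.sta
# combination; its lists are channel-wise and index-aligned, and
# ``bulk_dict['bulk']`` holds, per channel window, the request strings
# produced by ``pu.create_bulk_str``. ``Nbulk`` is the total number of
# individual requests.
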
def download_small_db(
        phase: str, min_epid: float, max_epid: float, model: TauPyModel,
        event_cat: Catalog, tz: float, ta: float, statloc: str,
        rawloc: str, clients: list, network: tp.Union[str, tp.List[str]],
        station: tp.Union[str, tp.List[str]], channel: str, saveh5: bool):
    """
    see corresponding method :meth:`~pyglimer.waveform.request.Request.\
download_waveforms_small_db`
    """
    # Sort event catalog by origin time
    idx_list = sorted(
        range(len(event_cat)),
        key=lambda k: (
            event_cat[k].preferred_origin()
            or event_cat[k].origins[0]).time.format_fissures())

    # Remake event catalog into sorted event catalog
    event_cat = Catalog([event_cat[idx] for idx in idx_list])

    # Calculate the min and max theoretical arrival time after event time
    # according to minimum and maximum epicentral distance
    mintime = model.get_travel_times(
        source_depth_in_km=500, distance_in_degree=min_epid,
        phase_list=[phase])[0].time - tz

    maxtime = model.get_travel_times(
        source_depth_in_km=0.001, distance_in_degree=max_epid,
        phase_list=[phase])[0].time + ta

    # logging
    logger = logging.getLogger('pyglimer.request')

    # If station and network are None
    station = station or '*'
    network = network or '*'

    # First we download the stations to subsequently compute the times of
    # theoretical arrival
    clients = pu.get_multiple_fdsn_clients(clients)

    logger.info('Requesting data from the following FDSN servers:\n %s' % str(
        clients))

    # Find earliest start and end times
    otimes = [evt.preferred_origin().time for evt in event_cat.events]
    bulk_stat = pu.create_bulk_str(
        network, station, '*', channel, min(otimes), max(otimes))

    logger.info('Bulk_stat parameter created.')
    logger.debug('Bulk stat parameters: %s' % str(bulk_stat))
    logger.info('Initialising station response download.')

    # Create station output folder
    os.makedirs(statloc, exist_ok=True)

    # Get number of cores available
    NCPU = psutil.cpu_count(logical=False) - 2
    NCPU = 1 if NCPU < 1 else NCPU

    # Run single-threaded station loop
    if len(clients) == 1:
        logger.debug('Running single thread station loop')
        inv = pu.__client__loop__(clients[0], statloc, bulk_stat)

    # or run parallel station loop.
    else:
        logger.debug('Running parallel station loop')
        out = Parallel(n_jobs=NCPU, prefer='multiprocessing')(
            delayed(pu.__client__loop__)(client, statloc, bulk_stat)
            for client in clients)
        inv = pu.join_inv([inv for inv in out])

    # Create lists of unique networks, stations, channels, etc.
    networks, stations, channels, subinvs, netsta_str = inv2uniqlists(inv)

    # Multiple stations?
    MULT = True if len(set(netsta_str)) > 1 else False

    # Get bulk string for a single station
    if not MULT:
        logger.info(
            'Computing TOA and checking available data single threaded')

        # Empty network dict
        d = dict()
        d['net'], d['stat'], d['chan'], d['startt'], d['endt'] \
            = [], [], [], [], []

        # Since we only have a single station we can loop over the events
        d = __check_times_small_db_sub(
            event_cat, rawloc, tz, ta, phase, model, logger, saveh5,
            mintime, maxtime, subinvs[0], networks[0], stations[0],
            channels[0], parallel=True)

        # Since there's only a single station, there's also only a single
        # dictionary, but we make it into a list to conform with the rest
        # of the code
        netsta_d = [d, ]

    # Get bulk strings for one station at a time in parallel
    else:
        logger.info('Computing TOA and checking available data in parallel')

        # Create partial function with the input that is the same for all
        # function calls
        dsub = partial(
            __check_times_small_db_sub, event_cat, rawloc, tz, ta, phase,
            model, logger, saveh5, mintime, maxtime)

        # Run partial function in parallel, resulting in one
        # dictionary per net/sta combo
        netsta_d = Parallel(n_jobs=NCPU, backend='multiprocessing')(
            delayed(dsub)(_subinv, _net, _sta, _channels)
            for _subinv, _net, _sta, _channels
            in zip(subinvs, networks, stations, channels))

    # Transform the list of network.station dictionaries containing lists of
    # channels to a list of network.station dictionaries that lists channels
    # individually and contains the bulk request parameters.
    bulk_list, Nbulk = create_bulk_list(netsta_d)

    # End here if there aren't any bulk requests.
    if Nbulk == 0:
        logger.info('No new data found.')
        return

    # Else log the number of total requests to be made
    else:
        logger.info(f'A total of {Nbulk} requests will be made.')

    # Log the final bulk strings
    if logger.level < 20:
        logger.debug('The request string looks like this:')
        for _d in bulk_list:
            for _bulk in _d['bulk']:
                for _bw in _bulk:
                    logger.debug(f"{_bw}")

    # This almost certainly needs to be split up, so we don't overload the
    # RAM with the downloaded mseeds
    logger.info('Initialising waveform download.')

    # Create waveform directories
    os.makedirs(rawloc, exist_ok=True)

    # If the number of clients is 1, parallelize either stations, or events
    # if we download a single station
    if len(clients) == 1:

        # Single station, parallelize events -> kwarg parallel=True
        if not MULT:
            pu.__client__loop_wav__(
                clients[0], rawloc, bulk_list[0], saveh5, subinvs[0],
                network=networks[0], station=stations[0], parallel=True)

        # Multiple stations, parallelize stations directly
        else:
            # Download multiple stations in parallel
            Parallel(n_jobs=NCPU, backend='multiprocessing')(
                delayed(pu.__client__loop_wav__)(
                    clients[0], rawloc, _bulk_dict, saveh5, _subinv,
                    network=_net, station=_sta)
                for _subinv, _net, _sta, _bulk_dict
                in zip(subinvs, networks, stations, bulk_list))

    # If there are multiple clients, parallelize the clients
    else:

        # Length of request
        N = len(bulk_list)

        # Number of digits
        Nd = len(str(N))

        # Download station chunks
        for _i, (_subinv, _net, _sta, _bulk_dict) in enumerate(
                zip(subinvs, networks, stations, bulk_list)):

            # Provide status
            logger.info(f"Downloading ... {_i:{Nd}d}/{N:d}")

            # Download
            Parallel(n_jobs=NCPU, backend='multiprocessing')(
                delayed(pu.__client__loop_wav__)(
                    client, rawloc, _bulk_dict, saveh5, _subinv,
                    network=_net, station=_sta)
                for client in clients)

            logger.info(f"Downloaded {_i:{Nd}d}/{N:d}")

def downloadwav(
        phase: str, min_epid: float, max_epid: float, model: TauPyModel,
        event_cat: Catalog, tz: float, ta: float, statloc: str,
        rawloc: str, clients: list, evtfile: str, network: str = None,
        station: str = None, saveasdf: bool = False,
        log_fh: logging.FileHandler = None, loglvl: int = logging.WARNING,
        verbose: bool = False, fast_redownload: bool = False):
    """
    Downloads the waveforms for all events in the catalogue for a circular
    domain around the epicentre with defined epicentral distances from the
    clients defined in ``clients``. Station xmls for the corresponding
    stations are downloaded as well.

    Parameters
    ----------
    phase : string
        Arrival phase to be used. P, S, SKS, or ScS.
    min_epid : float
        Minimal epicentral distance to be downloaded.
    max_epid : float
        Maximal epicentral distance to be downloaded.
    model : obspy.taup.TauPyModel
        1D velocity model to calculate arrival.
    event_cat : Obspy event catalog
        Catalog containing all events for which waveforms should be
        downloaded.
    tz : int
        time window before first arrival to download (seconds)
    ta : int
        time window after first arrival to download (seconds)
    statloc : string
        Directory containing the station xmls.
    rawloc : string
        Directory containing the raw seismograms.
    clients : list
        List of FDSN servers. See obspy.Client documentation for acronyms.
    evtfile : string
        Path to the event catalogue file. If ``fast_redownload`` is set, the
        not yet processed part of the catalogue is written back to this file.
    network : string or list, optional
        Network restrictions. Only download from these networks, wildcards
        allowed. The default is None.
    station : string or list, optional
        Only allowed if network != None. Station restrictions. Only download
        from these stations, wildcards are allowed. The default is None.
    saveasdf : bool, optional
        Save the dataset as Adaptable Seismic Data Format (asdf;
        recommended). Else, one will be left with .mseeds.
    log_fh : logging.FileHandler, optional
        file handler to be used for the massdownloader logger.
    loglvl : int, optional
        Use this logging level.
    verbose : bool, optional
        Set True when experiencing issues with the download. Output of obspy
        MassDownloader will be logged in download.log.
    fast_redownload : bool, optional
        If True, the remaining event catalogue is periodically rewritten to
        ``evtfile`` so that an interrupted download can be resumed without
        re-requesting already processed events.

    Returns
    -------
    None

    """
    # needed to check whether data is already in the asdf
    global asdfsave
    asdfsave = saveasdf

    global av_data
    # Dictionary holding available data - needed in download functions
    # Keys are {network}{station}
    av_data = {}

    # Calculate the min and max theoretical arrival time after event time
    # according to minimum and maximum epicentral distance
    min_time = model.get_travel_times(source_depth_in_km=500,
                                      distance_in_degree=min_epid,
                                      phase_list=[phase])[0].time

    max_time = model.get_travel_times(source_depth_in_km=0.001,
                                      distance_in_degree=max_epid,
                                      phase_list=[phase])[0].time

    mdl = MassDownloader(providers=clients)

    ###########
    # logging for the download
    fdsn_mass_logger = logging.getLogger("obspy.clients.fdsn.mass_downloader")
    fdsn_mass_logger.setLevel(loglvl)

    # Create handler to the log
    if log_fh is None:
        fh = logging.FileHandler(os.path.join('logs', 'download.log'))
        fh.setLevel(logging.INFO)
        fh.setLevel(loglvl)
        # Create Formatter
        fmt = logging.Formatter(
            fmt='%(asctime)s - %(levelname)s - %(message)s')
        fh.setFormatter(fmt)
    else:
        fh = log_fh

    fdsn_mass_logger.addHandler(fh)

    ####
    # counter for how many events have been written (only for h5)
    jj = 0

    # Loop over each event
    global event
    global evt_id
    for ii, event in enumerate(tqdm(event_cat)):
        # fetch event-data
        origin_time = (event.preferred_origin() or event.origins[0]).time
        ot_fiss = UTCDateTime(origin_time).format_fissures()
        fdsn_mass_logger.info('Downloading event: '+ot_fiss)
        evtlat = event.origins[0].latitude
        evtlon = event.origins[0].longitude
        evt_id = pu.utc_save_str(origin_time)

        # Download location
        tmp.folder = os.path.join(rawloc, f'{evt_id}')

        # create folder for each event
        os.makedirs(tmp.folder, exist_ok=True)

        # Circular domain around the epicenter. This module also offers
        # rectangular and global domains. More complex domains can be
        # defined by inheriting from the Domain class.
        domain = CircularDomain(latitude=evtlat, longitude=evtlon,
                                minradius=min_epid, maxradius=max_epid)

        restrictions = Restrictions(
            # Get data from sufficient time before earliest arrival
            # and after the latest arrival
            # Note: All the traces will still have the same length
            starttime=origin_time + min_time - tz,
            endtime=origin_time + max_time + ta,
            network=network, station=station,
            # You might not want to deal with gaps in the data.
            # If this setting is True, any trace with a gap/overlap will be
            # discarded. This will delete streams with several traces!
            reject_channels_with_gaps=False,
            # And you might only want waveforms that have data for at least
            # 95% of the requested time span. Any trace that is shorter than
            # 95% of the desired total duration will be discarded.
            minimum_length=0.95,  # For 1.00 it will always delete the waveform
            # No two stations should be closer than 100 m to each other. This
            # is useful to, for example, filter out stations that are part of
            # different networks but at the same physical station. Setting
            # this option to zero or None will disable that filtering.
            # Guard against the same station having different names.
            minimum_interstation_distance_in_m=100.0,
            # Only HH or BH channels. If a station has BH channels, those
            # will be downloaded, otherwise the HH. Nothing will be
            # downloaded if it has neither.
            channel_priorities=["BH[ZNE12]", "HH[ZNE12]"],
            # Location codes are arbitrary and there is no rule as to which
            # location is best. Same logic as for the previous setting.
            # location_priorities=["", "00", "10"],
            sanitize=False
            # discards all mseeds for which no station information is
            # available. Set to False because otherwise it will redownload
            # over and over and slow down the script.
        )

        # The data will be downloaded to the ``./waveforms/`` and
        # ``./stations/`` folders with automatically chosen file names.
        incomplete = True
        while incomplete:
            try:
                mdl.download(
                    domain, restrictions,
                    mseed_storage=get_mseed_storage,
                    stationxml_storage=statloc,
                    threads_per_client=3, download_chunk_size_in_mb=50)
                incomplete = False
            except IncompleteRead:
                continue  # Just retry for poor connection
            except Exception:
                incomplete = False
                continue

        # 2021.02.15 Here, we write everything to hdf5. This should not be
        # done every single event, but perhaps every 100th event or so.
        if saveasdf and ii-jj > 99:
            fdsn_mass_logger.info('Rewriting mseed to hdf5.....')
            mseed_to_hdf5(rawloc, False)
            jj = ii
            fdsn_mass_logger.info('...Done')
        if fast_redownload:
            event_cat[ii:].write(evtfile, format="QUAKEML")

    # There are always issues with the MassDownloader inventory,
    # so let's fix it here
    fdsn_mass_logger.info('Downloading all response files.....')
    download_full_inventory(statloc, clients)

    if saveasdf:
        fdsn_mass_logger.info('Rewriting mseed and xmls to hdf5.....')
        mseed_to_hdf5(rawloc, save_statxml=True, statloc=statloc)
        fdsn_mass_logger.info('...Done')
    tmp.folder = "finished"  # removes the restriction for preprocess.py

def get_mseed_storage(
        network: str, station: str, location: str, channel: str,
        starttime: UTCDateTime, endtime: UTCDateTime) -> tp.Union[str, bool]:
    """Decides where to store the mseed files and checks if the files are
    already downloaded."""
    # Returning True means that neither the data nor the StationXML file
    # will be downloaded.
    if asdfsave:
        if wav_in_hdf5(
                os.path.join(tmp.folder, os.pardir), network, station,
                location, channel):
            return True
    else:
        if wav_in_db(network, station, location, channel):
            return True

    # If a string is returned the file will be saved in that location.
    return os.path.join(tmp.folder, "%s.%s.mseed" % (network, station))

def get_stationxml_storage(network: str, station: str, statloc: str):
    filename = os.path.join(statloc, "%s.%s.xml" % (network, station))
    return {
        "available_channels": [],
        "missing_channels": '*',
        "filename": filename}

def wav_in_db(
        network: str, station: str, location: str, channel: str) -> bool:
    """Checks if waveform is already downloaded."""
    path = os.path.join(tmp.folder, "%s.%s.mseed" % (network, station))

    if os.path.isfile(path):
        st = read(path)
        # '.' + location +
    else:
        return False
    if len(st) == 3:
        return True  # All three channels are downloaded
    # In case channels are missing
    elif len(st) == 2:
        if st[0].stats.channel == channel or st[1].stats.channel == channel:
            return True
    elif st[0].stats.channel == channel:
        return True
    else:
        return False

def wav_in_hdf5(
        rawloc: str, network: str, station: str, location: str,
        channel: str) -> bool:
    """Is the waveform already in the Raw hdf5 database?"""
    # H5 file location
    h5_file = os.path.join(rawloc, '%s.%s.h5' % (
        network, station))

    # First check dictionary
    try:
        if evt_id in av_data[network][station][channel]:
            return True
        else:
            return False
    except KeyError:
        pass

    # Check whether there is data from this station at all
    av_data.setdefault(network, {})
    av_data[network].setdefault(station, {})

    if not os.path.isfile(h5_file):
        logging.debug(f'{h5_file} not found')
        av_data[network][station][channel] = []
        return False

    # The file exists, so we will have to open it and get the dictionary
    with RawDatabase(h5_file) as rdb:
        av_data[network][station] = rdb._get_table_of_contents()

        # Safety net for when the channel list is empty for some reason.
        if not av_data[network][station]:
            av_data[network][station][channel] = []

    # execute this again to check
    return wav_in_hdf5(
        rawloc, network, station, location, channel)

def wav_in_hdf5_no_global(
        av_data: dict, network: str, station: str, channel: str,
        evt_id) -> bool:
    """Is the waveform already in the Raw hdf5 database?"""
    if channel not in av_data[network][station]:
        return False
    if evt_id in av_data[network][station][channel]:
        return True
    else:
        return False