Source code for pyglimer.utils.SphericalNN

"""
Spherical nearest neighbour idea taken from ObsPy originally,
but heavily modified to include interpolation using shephard's and
modified shephard's method of inverse distance weighting.

"""
# External
from __future__ import annotations
from copy import deepcopy
from typing import Union, Optional
import numpy as np
from scipy.spatial import cKDTree

# Internal
from ..constants import R_EARTH, KM2DEG
from .geo_utils import geo2cart


[docs]class SphericalNN(object): """Spherical nearest neighbour queries using scipy's fast kd-tree implementation. Attributes ---------- data : numpy.ndarray cartesian point data array [x,y,z] kd_tree : scipy.spatial.cKDTree a KDTree used to query data Methods ------- query(qlat, qlon) Query a set of latitudes and longitudes query_pairs(maximum_distance) Find pairs of points that are within a certain distance of each other interp(data, qlat, qlon) Use the kdtree to interpolate data corresponding to the points of the Kdtree onto a new set of points using nearest neighbor interpolation or weighted nearest neighbor interpolation (default). Notes ----- :Authors: Lucas Sawade (lsawade@princeton.edu) :Last Modified: 2021.04.21 20.00 """ def __init__(self, lat, lon): """Initialize class Parameters ---------- lat : numpy.ndarray latitudes lon : numpy.ndarray longitudes """ cart_data = self.spherical2cartesian(lat, lon) self.data = cart_data self.kd_tree = cKDTree(data=cart_data, leafsize=10)
[docs] def query(self, qlat, qlon): """Query latitude and longitude values from kdtree Parameters ---------- qlat : np.ndarray or float query latitude qlon : np.ndarray or float query longitude Returns ------- tuple (distance, indeces) to closest point in kdtree """ points = self.spherical2cartesian(qlat, qlon) d, i = self.kd_tree.query(points) # Filter NaNs. Happens when not enough points are available. m = np.isfinite(d) return d[m], i[m]
[docs] def query_pairs(self, maximum_distance=180.0): """Query pairs within the kdtree Parameters ---------- maximum_distance : float Maximum query distance in deg Returns ------- set or ndarray Set of pairs (i, j) where i < j """ distkm = np.abs( 2 * np.sin(maximum_distance/2.0 / 180.0*np.pi)) * R_EARTH return self.kd_tree.query_pairs(distkm, output_type='ndarray')
[docs] def sparse_distance_matrix(self, other: Union[SphericalNN, None] = None, maximum_distance=180.0, sparse: bool = False, km: bool = False): """Computes the sparse distance matrix between two kdtree. if no other kdtree is provided, this kdtree is used Parameters ---------- other : SphericalNN other SphericalNN tree maximum_distance : float Maximum query distance in deg sparse: bool Flag to output sparse array for memory. Default False km: bool flag to output distances in kilometers. Default False Returns ------- set or ndarray Set of pairs (i, j) where i < j """ # Get distance distkm = np.abs( 2 * np.sin(maximum_distance/2.0 / 180.0*np.pi)) * R_EARTH # Get tree if isinstance(other, SphericalNN): other = other.kd_tree else: other = deepcopy(self).kd_tree # Compute dense or sparse distance matrix if sparse: output_mat = self.kd_tree.sparse_distance_matrix( other, distkm) else: output_mat = self.kd_tree.sparse_distance_matrix( other, distkm).toarray() # Convert form kilometers to degrees. if km is False: output_mat *= KM2DEG return output_mat
[docs] def interpolator( self, qlat, qlon, maximum_distance=None, no_weighting=False, k: Optional[int] = None, p: float = 2.0): """Spherical interpolation function using the ``SphericalNN`` object. Returns an interpolator that can be used for interpolating the same set of locations based on the KDTree. The only input the interpolator takes are the data corresponding to the points in the KDTree. Parameters ---------- qlat : numpy.ndarray query latitudes qlon : numpy.ndarray query longitude maximum_distance : float, optional max distace for the interpolation in degree angle. Default None. If the mindistance to any points is larger than maximum_distance the interpolated value is set to ``np.nan``. no_weighting : bool, optional Whether or not the function uses a weightied nearest neighbor interpolation k : int, optional Define maximum number of neighbors to be used for the weighted interpolation. Not used if ``no_weighting = True``. Default None p : float, optional Exponent to compute the inverse distance weights. Note that in the limit ``p->inf`` is just a nearest neighbor interpolation. Default is 2 Notes ----- In the future, I may add a variable weighting function for the weighted interpolation. Please refer to https://en.wikipedia.org/wiki/Inverse_distance_weighting for the interpolation weighting. """ # Get query points in cartesian shp = qlat.shape points = self.spherical2cartesian(qlat.flatten(), qlon.flatten()) # Query points if no_weighting: # Get distances and indeces d, inds = self.kd_tree.query(points) def interpolator(data): # Assign the interpolation data. qdata = data[inds] # Filter out distances too far out. if maximum_distance is not None: qdata = np.where( d <= np.abs( 2 * np.sin(maximum_distance/2.0/180.0*np.pi)) * R_EARTH, qdata, np.nan) return data[inds].reshape(shp) else: # Set K to the max number of points if not given if k is None: k = self.kd_tree.n else: if k > self.kd_tree.n: k = self.kd_tree.n # Get multiple distances and indeces d, inds = self.kd_tree.query(points, k=k) if d.shape == (1,): d = d.reshape((1, 1)) inds = inds.reshape((1, 1)) # Filter out distances too far out. # Modified Shepard's method if maximum_distance is not None: # Get cartesian distance cartd = np.abs(2 * np.sin(maximum_distance/2.0/180.0*np.pi) * R_EARTH) # Compute the weights using modified shepard w = (np.fmax(0, cartd - d) / cartd * d) ** p wsum = np.sum(w, axis=1) datarows = (wsum != 0) def interpolator(data): """ Using the weights and indices, we can return an interpolator """ # Empty array qdata = np.empty_like(wsum) qdata[:] = np.nan # interpolation using the weights qdata[datarows] = np.nansum( w[datarows, :] * data[inds[datarows]], axis=1) \ / wsum[datarows] return qdata.reshape(shp) # Shepard's method else: # Compute the weights using modified shepard w = (1 / d) ** p wsum = np.sum(w, axis=1) datarows = (wsum != 0) def interpolator(data): """ Using the weights and indices, we can return an interpolator """ # Empty array qdata = np.empty_like(wsum) qdata[:] = np.nan # interpolation using the weights qdata[datarows] = np.sum( w * data[inds[datarows]], axis=1) / wsum[datarows] return qdata.reshape(shp) return interpolator
[docs] @ staticmethod def spherical2cartesian(lat, lon): """ Converts a list of :class:`~obspy.fdsn.download_status.Station` objects to an array of shape(len(list), 3) containing x/y/z in meters. """ # Create three arrays containing lat/lng/radius. r = np.ones_like(lat) * R_EARTH # Convert data from lat/lng to x/y/z. x, y, z = geo2cart(r, lat, lon) return np.vstack((x, y, z)).T