Source code for pyrolite.util.spatial

"""
Baisc spatial utility functions.
"""
import itertools

import numpy as np

try:
    from psutil import virtual_memory  # memory check
except ImportError:
    virtual_memory = None

from .log import Handle

logger = Handle(__name__)


def _get_sqare_grid_segment_indicies(size, segments):
    """
    Get the indexes for segment boundaries for iterating over a grid within an array.

    Parameters
    ----------
    size : :class:`int`
        Shape of the square array.
    segments : :class:`int`
        Number of segments for the grid.

    Returns
    --------
    :class:`numpy.ndarray`
    """
    seg_size = size // segments
    segx = [(seg_size * ix, seg_size * (ix + 1)) for ix in range(segments)]
    segx[-1] = (seg_size * (segments - 1), size - 1)
    return [[*a, *b] for a, b in itertools.product(segx, segx)]


def _spherical_law_cosinse_GC_distance(φ1, φ2, λ1, λ2):
    """
    Spherical law of cosines calculation of distance between two points. Suffers from
    rounding errors for closer points.

    Parameters
    ----------
    φ1, φ2, λ1, λ2
        Numpy array wih latitudes and longitudes [x1, x2, y1, y2]
    """

    Δλ = np.abs(λ1 - λ2)
    # Δφ = np.abs(φ1 - φ2)
    return np.arccos(np.sin(φ1) * np.sin(φ2) + np.cos(φ1) * np.cos(φ2) * np.cos(Δλ))


def _vicenty_GC_distance(φ1, φ2, λ1, λ2):
    """
    Vicenty formula for an ellipsoid with equal major and minor axes.

    Vincenty T (1975) Direct and Inverse Solutions of Geodesics on the Ellipsoid with
    Application of Nested Equations. Survey Review 23:88–93.
    doi: 10.1179/SRE.1975.23.176.88

    Parameters
    ----------
    φ1, φ2 : :class:`numpy.ndarray`
        Numpy arrays wih latitudes.
    λ1, λ2 : :class:`numpy.ndarray`
        Numpy arrays wih longitude.
    """
    Δλ = np.abs(λ1 - λ2)
    # Δφ = np.abs(φ1 - φ2)

    _S = np.sqrt(
        (np.cos(φ2) * np.sin(Δλ)) ** 2
        + (np.cos(φ1) * np.sin(φ2) - np.sin(φ1) * np.cos(φ2) * np.cos(Δλ)) ** 2
    )
    _C = np.sin(φ1) * np.sin(φ2) + np.cos(φ1) * np.cos(φ2) * np.cos(Δλ)
    return np.abs(np.arctan2(_S, _C))


def _haversine_GC_distance(φ1, φ2, λ1, λ2):
    """
    Haversine formula for great circle distance. Suffers from rounding errors for
    antipodal points.

    Parameters
    ----------
    φ1, φ2 : :class:`numpy.ndarray`
        Numpy arrays wih latitudes.
    λ1, λ2 : :class:`numpy.ndarray`
        Numpy arrays wih longitude.

    """
    Δλ = np.abs(λ1 - λ2)
    Δφ = np.abs(φ1 - φ2)
    return 2 * np.arcsin(
        np.sqrt(np.sin(Δφ / 2) ** 2 + np.cos(φ1) * np.cos(φ2) * np.sin(Δλ / 2) ** 2)
    )


def _segmented_spatial_distance_matrix(
    φ1, φ2, λ1, λ2, metric, dtype="float32", segs=10
):
    size = np.max([a.shape[0] for a in [φ1, φ2, λ1, λ2]])
    angle = np.zeros((size, size), dtype=dtype)  # full matrix
    for ix_s, ix_e, iy_s, iy_e in _get_sqare_grid_segment_indicies(size, segs):
        angle[ix_s:ix_e, iy_s:iy_e] = metric(
            φ1[ix_s:ix_e][:, np.newaxis],
            φ2[iy_s:iy_e][np.newaxis, :],
            λ1[ix_s:ix_e][:, np.newaxis],
            λ2[iy_s:iy_e][np.newaxis, :],
        )
    return angle


[docs]def great_circle_distance( a, b=None, absolute=False, degrees=True, r=6371.0088, method=None, dtype="float32", max_memory_fraction=0.25, ): """ Calculate the great circle distance between two lat, long points. Parameters ---------- a, b : :class:`float` | :class:`numpy.ndarray` Lat-Long points or arrays to calculate distance between. If only one array is specified, a full distance matrix (i.e. calculate a point-to-point distance for every combination of points) will be returned. absolute : :class:`bool`, :code:`False` Whether to return estimates of on-sphere distances [True], or simply return the central angle between the points. degrees : :class:`bool`, :code:`True` Whether lat-long coordinates are in degrees [True] or radians [False]. r : :class:`float` Earth radii for estimating absolute distances. method : :class:`str`, :code:`{'vicenty', 'cosines', 'haversine'}` Which method to use for great circle distance calculation. Defaults to the Vicenty formula. dtype : :class:`numpy.dtype` Data type for distance arrays, to constrain memory management. max_memory_fraction : :class:`float` Constraint to switch to calculating mean distances where :code:`matrix=True` and the distance matrix requires greater than a specified fraction of total avaialbe physical memory. """ a = np.atleast_2d(np.array(a).astype(dtype)) matrix = False if b is not None: b = np.atleast_2d(np.array(b).astype(dtype)) else: matrix = True b = a.copy() # check the sizes of a and b - they should be the same if degrees: # convert from degrees if needed a, b = np.deg2rad(a), np.deg2rad(b) φ1, φ2 = a[:, 0], b[:, 0] # latitudes λ1, λ2 = a[:, 1], b[:, 1] # longitudes if method is None: f = _vicenty_GC_distance else: if method.lower().startswith("cos"): f = _spherical_law_cosinse_GC_distance elif method.lower().startswith("hav"): f = _haversine_GC_distance else: # Default to most precise f = _vicenty_GC_distance if matrix: # if matrix mode we need to turn these 1d arrays into 2d # but, with large arrays it'll spit out a memory error # so instead we can try to build it numerically size = np.max([a.shape[0] for a in [φ1, φ2, λ1, λ2]]) estimated_matrix_size = np.array([[1.0]], dtype=dtype).nbytes * size**2 logger.debug( "Attempting to build {}x{} array of size {:.2f} Gb.".format( size, size, estimated_matrix_size / 1024**3 ) ) infeasible = estimated_matrix_size > (virtual_memory().total * max_memory_fraction) if virtual_memory is not None else False if infeasible: logger.warn( "Angle array for segmented distance matrix larger than maximum memory " "fraction, computing mean global distances instead." ) angle = np.zeros((size, 1)) # compute sum-distances for each lat-long pair for ix, (_φ1, _λ1) in enumerate(np.vstack([φ1, λ1])): angle[ix, 0] = f(_φ1, φ2, _λ1, λ2) else: try: angle = np.atleast_1d( f(φ1[:, None], φ2[None, :], λ1[:, None], λ2[None, :]) ) except (MemoryError, ValueError): logger.warn( "Cannot directly compute distance matrix, attempting segmented distance" " matrix instead." ) # could set segs such that there is a maximum amount of memory per seg angle = _segmented_spatial_distance_matrix(φ1, φ2, λ1, λ2, f) else: angle = np.atleast_1d(f(φ1, φ2, λ1, λ2)) if ( np.isnan(angle).any() and f != _vicenty_GC_distance ): # fallback for cos failure @ 0. fltr = np.isnan(angle) angle[fltr] = _vicenty_GC_distance(φ1[fltr], φ2[fltr], λ1[fltr], λ2[fltr]) if absolute: return np.rad2deg(angle) * r else: return np.rad2deg(angle)
[docs]def piecewise(segment_ranges: list, segments=2, output_fmt=np.float64): """ Generator to provide values of quantizable paramaters which define a grid, here used to split up queries from databases to reduce load. Parameters ---------- segment_ranges : :class:`list` List of segment ranges to create a grid from. segments : :class:`int` Number of segments. output_fmt Function to call on the output. """ outf = np.vectorize(output_fmt) if isinstance(segments, int): segments = list(np.ones(len(segment_ranges), dtype=int) * segments) else: pass seg_width = [ (x2 - x1) / segments[ix] # can have negative steps for ix, (x1, x2) in enumerate(segment_ranges) ] separators = [ np.linspace(x1, x2, segments[ix] + 1)[:-1] for ix, (x1, x2) in enumerate(segment_ranges) ] pieces = list(itertools.product(*separators)) for piece in pieces: piece = np.array(piece) out = np.vstack((piece, piece + np.array(seg_width))) yield outf(out)
[docs]def spatiotemporal_split( segments=4, nan_lims=[np.nan, np.nan], # usebounds=False, # order=['minx', 'miny', 'maxx', 'maxy'], **kwargs ): """ Creates spatiotemporal grid using piecewise function and arbitrary ranges for individial kw-parameters (e.g. age=(0., 450.)), and sequentially returns individial grid cell attributes. Parameters ---------- segments : :class:`int` Number of segments. nan_lims : :class:`list` | :class:`tuple` Specificaiton of NaN indexes for missing boundaries. Yields ------- :class:`dict` Iteration through parameter sets for each cell of the grid. """ part = 0 for item in piecewise(kwargs.values(), segments=segments): x1s, x2s = item part += 1 params = {} for vix, var in enumerate(kwargs.keys()): vx1, vx2 = x1s[vix], x2s[vix] params[var] = (vx1, vx2) items = dict( south=params.get("lat", nan_lims)[0], north=params.get("lat", nan_lims)[1], west=params.get("long", nan_lims)[0], east=params.get("long", nan_lims)[1], ) if "age" in params: items.update( dict( minage=params.get("age", nan_lims)[0], maxage=params.get("age", nan_lims)[1], ) ) items = {k: v for (k, v) in items.items() if not np.isnan(v)} # if usebounds: # bounds = NSEW_2_bounds(items, order=order) # yield bounds # else: yield items
[docs]def NSEW_2_bounds(cardinal, order=["minx", "miny", "maxx", "maxy"]): """ Translates cardinal points to xy points in the form of bounds. Useful for converting to the format required for WFS from REST style queries. Parameters ---------- cardinal : :class:`dict` Cardinally-indexed point bounds. order : :class:`list` List indicating order of returned x-y bound coordinates. Returns ------- :class:`list` x-y indexed extent values in the specified order. """ tnsltr = { xy: c for xy, c in zip( ["minx", "miny", "maxx", "maxy"], ["west", "south", "east", "north"] ) } bnds = [cardinal.get(tnsltr[o]) for o in order] return bnds
[docs]def levenshtein_distance(seq_one, seq_two): """ Compute the Levenshtein Distance between two sequences with comparable items. Adapted from Wiki pseudocode. Parameters ---------- seq_one, seq_two : :class:`str` | :class:`list` Sequences to compare. Returns -------- :class:`int` """ m, n = len(seq_one), len(seq_two) D = np.zeros((m + 1, n + 1), dtype=int) for i in range(m + 1): D[i, 0] = i for j in range(n + 1): D[0, j] = j for j in np.arange(1, n + 1): # n along columns for i in np.arange(1, m + 1): # m along rows if seq_one[i - 1] == seq_two[j - 1]: substitutionCost = 0 else: substitutionCost = 1 D[i, j] = min( D[i - 1, j] + 1, D[i, j - 1] + 1, D[i - 1, j - 1] + substitutionCost ) return D[-1, -1]