Source code for pyrolite.comp.impute

import numpy as np
import scipy.stats as stats

from pyrolite.comp.codata import ALR, close, inverse_ALR
from pyrolite.util.math import augmented_covariance_matrix, nancov
from pyrolite.util.missing import md_pattern

from ..util.log import Handle

logger = Handle(__name__)


def _little_sweep(G, k: int = 0, verify=False):
    """
    Parameters
    ---------------
    G : :class:`numpy.ndarray`
        Input array to sweep.
    k : :class:`int`
        Index to sweep on.
    verify : :class:`bool`
        Whether to verify valid matrix input.

    Returns
    --------
    H : :class:`numpy.ndarray`
        Swept array.

    References
    ----------
        Little R. J. A. and Rubin D. B. (2014).
        Statistical analysis with missing data. 2nd ed., Wiley, Hoboken, N.J.

    Notes
    ------
        The sweep operator is defined for symmetric matrices as follows. A p x p
        symmetric matrix G is said to be swept on row and column k if it is replaced by
        another symmetric p x p matrix H with elements defined as follows:

        h_kk = -1 / g_kk
        h_jk = h_kj = g_jk / g_kk ; j != k
        h_jl = g_jl - g_jk * g_kl / g_kk ; j != k; l != k

    """
    H = np.asarray(G).copy()
    n = H.shape[0]
    if verify:
        if H.shape != (n, n):
            raise ValueError("Not a square array")
        if not np.isfinite(H).all():
            raise ValueError("Not all finite.")
        if not np.allclose(H - H.T, 0):
            raise ValueError("Not a symmetrical array")
    inds = [i for i in np.linspace(0, n - 1, n).astype(int) if not np.isclose(i, k)]
    assert np.isfinite(G[k, k]) & (G[k, k] != 0.0)
    H[k, k] = -1 / G[k, k]
    H[k, inds] = -G[k, inds] * H[k, k]  # divide row k by D
    H[inds, k] = -G[inds, k] * H[k, k]  # divide column k by D
    for j in inds:
        for l in inds:
            H[j, l] = G[j, l] - H[j, k] * G[k, l]

    return H


def _multisweep(G, ks):
    """
    Sweep G along all indexes ks.

    Parameters
    -----------
    G : :class:`numpy.ndarray`
        Augmented covariance matrix to sweep.
    ks : :class:`numpy.ndarray`
        Indicies to sweep.

    Returns
    --------
    :class:`numpy.ndarray`
    """
    H = G.copy()
    for k in ks:
        H = _little_sweep(H, k=k)
    return H


def _reg_sweep(M: np.ndarray, C: np.ndarray, varobs: np.ndarray, error_threshold=None):
    r"""
    Performs multiple sweeps of the augmented covariance matrix and extracts the
    regression coefficients :math:`\beta_{0} \cdots \beta_(d)` and residial covariance
    for the regression of missing variables against observed variables for a given
    missing data pattern. Translated from matlab to python from Palarea-Albaladejo
    and Martín-Fernández (2008) [#ref_1]_. Note that this algorithm requires at least
    two columns free of missing values.

    Parameters
    -----------
    M : :class:`numpy.ndarray`
        Array of means of shape :code:`(D, )`.
    C : :class:`numpy.ndarray`
        Covariance of shape :code:`(D, D)`.
    varobs : :class:`numpy.ndarray`
        Boolean array indicating which variables are included in the regression model,
        of shape :code:`(D, )`
    error_threshold : :class:`float`
        Low-pass threshold at which an error will result, of shape :code:`(D, )`.
        Effectively limiting mean values to :math:`e^{threshold}`.

    Returns
    --------
    β : :class:`numpy.ndarray`
        Array of estimated regression coefficients.
    σ2_res : :class:`numpy.ndarray`
        Residuals.

    References
    ----------
    .. [#ref_1] Palarea-Albaladejo J. and Martín-Fernández J. A. (2008)
            A modified EM ALR-algorithm for replacing rounded zeros in compositional data sets.
            Computers & Geosciences 34, 902–917.
            doi: `10.1016/j.cageo.2007.09.015 <https://dx.doi.org/10.1016/j.cageo.2007.09.015>`__

    """
    assert np.isfinite(M).all()
    assert np.isfinite(C).all()
    if error_threshold is not None:
        assert (np.abs(M) < error_threshold).all()  # avoid runaway expansion
    dimension = M.size  # p > 0
    nvarobs = varobs.size  # q > 0 # number of observed variables
    dep = np.array([i for i in np.arange(dimension) if not i in varobs])
    # Shift the non-zero element to the end for pivoting
    reor = np.concatenate(([0], varobs + 1, dep + 1), axis=0)  #
    A = augmented_covariance_matrix(M, C)
    A = A[reor, :][:, reor]
    # Astart = A.copy(deep=True)
    assert (np.diag(A) != 0).all()  # Not introducing extra zeroes
    A = _multisweep(A, range(nvarobs + 1))
    """
    A is of form:
    -D  | E
    E.T | F
    """
    # if not np.isfinite(A).all():  # Typically caused by infs
    #    A[~np.isfinite(A)] = 0
    assert np.isfinite(A).all()
    β = A[0 : nvarobs + 1, nvarobs + 1 : dimension + 1]
    σ2_res = A[nvarobs + 1 :, nvarobs + 1 :]
    return β, σ2_res


[docs]def EMCOMP( X, threshold=None, tol=0.0001, convergence_metric=lambda A, B, t: np.linalg.norm(np.abs(A - B)) < t, max_iter=30, ): r""" EMCOMP replaces rounded zeros in a compositional data set based on a set of thresholds. After Palarea-Albaladejo and Martín-Fernández (2008) [#ref_1]_. Parameters ---------- X : :class:`numpy.ndarray` Dataset with rounded zeros threshold : :class:`numpy.ndarray` Array of threshold values for each component as a proprotion. tol : :class:`float` Tolerance to check for convergence. convergence_metric : :class:`callable` Callable function to check for convergence. Here we use a compositional distance rather than a maximum absolute difference, with very similar performance. Function needs to accept two :class:`numpy.ndarray` arguments and third tolerance argument. max_iter : :class:`int` Maximum number of iterations before an error is thrown. Returns -------- X_est : :class:`numpy.ndarray` Dataset with rounded zeros replaced. prop_zeros : :class:`float` Proportion of zeros in the original data set. n_iters : :class:`int` Number of iterations needed for convergence. Notes ----- * At least one component without missing values is needed for the divisor. Rounded zeros/missing values are replaced by values below their respective detection limits. * This routine is not completely numerically stable as written. Todo ------- * Implement methods to deal with variable decection limits (i.e thresholds are array shape :code:`(N, D)`) * Conisder non-normal models for data distributions. * Improve numerical stability to reduce the chance of :code:`np.inf` appearing. References ---------- .. [#ref_1] Palarea-Albaladejo J. and Martín-Fernández J. A. (2008) A modified EM ALR-algorithm for replacing rounded zeros in compositional data sets. Computers & Geosciences 34, 902–917. doi: `10.1016/j.cageo.2007.09.015 <https://dx.doi.org/10.1016/j.cageo.2007.09.015>`__ """ X = X.copy() n_obs, D = X.shape X = close(X, sumf=np.nansum) # --------------------------------- # Convert zeros into missing values # --------------------------------- X = np.where(np.isclose(X, 0.0), np.nan, X) # Use a divisor free of missing values assert np.isfinite(X).all(axis=0).any() pos = np.argmax(np.isfinite(X).all(axis=0)) Yden = X[:, pos] # -------------------------------------- # Compute the matrix of censure points Ψ # -------------------------------------- # need an equivalent concept for ilr cpoints = ( np.ones((n_obs, 1)) @ np.log(threshold[np.newaxis, :]) - np.log(Yden[:, np.newaxis]) @ np.ones((1, D)) - np.spacing(1.0) # Machine epsilon ) assert np.isfinite(cpoints).all() cpoints = cpoints[:, [i for i in range(D) if not i == pos]] # censure points prop_zeroes = np.count_nonzero(~np.isfinite(X)) / (n_obs * D) Y = ALR(X, pos) # ---------------Log Space-------------------------------- LD = Y.shape[1] M = np.nanmean(Y, axis=0) # μ0 C = nancov(Y) # Σ0 assert np.isfinite(M).all() and np.isfinite(C).all() # -------------------------------------------------- # Stage 2: Find and enumerate missing data patterns # -------------------------------------------------- pID, pD = md_pattern(Y) # ------------------------------------------- # Stage 3: Regression against other variables # ------------------------------------------- logger.debug( "Starting Iterative Regression for Matrix : ({}, {})".format(n_obs, LD) ) another_iter = True niters = 0 while another_iter: niters += 1 Mnew, Cnew = M.copy(), C.copy() Ystar = Y.copy() V = np.zeros((LD, LD)) for p_no in np.unique(pID): logger.debug("Pattern ID: {}, {}".format(p_no, pD[p_no]["pattern"])) rows = np.arange(pID.size)[pID == p_no] # rows with this pattern varobs, varmiss = ( np.arange(D - 1)[~pD[p_no]["pattern"]], np.arange(D - 1)[pD[p_no]["pattern"]], ) sigmas = np.zeros((LD)) assert np.isfinite(Y[np.ix_(rows, varobs)]).all() assert (~np.isfinite(Y[np.ix_(rows, varmiss)])).all() if varobs.size and varmiss.size: # Non-completely missing, but missing some logger.debug( "Regressing {} rows for pattern {} | {}.".format( rows.size, "".join(varobs.astype(str)), "".join(varmiss.astype(str)), ) ) B, σ2_res = _reg_sweep(Mnew, Cnew, varobs) assert B.shape == (varobs.size + 1, varmiss.size) assert σ2_res.shape == (varmiss.size, varmiss.size) assert np.isfinite(B).all() logger.debug( "Current Estimator (1, {})".format( ", ".join([{}".format(i) for i in range(B.shape[0] - 1)]) ) ) Ystar[np.ix_(rows, varmiss)] = np.ones((rows.size, 1)) * B[0, :] + ( (Y[np.ix_(rows, varobs)] @ B[1 : (varobs.size + 1), :]) ) sigmas[varmiss] = np.sqrt(np.diag(σ2_res)) assert np.isfinite(sigmas[varmiss]).all() x = ( # position of threshold values relative to estimated means cpoints[np.ix_(rows, varmiss)] - Ystar[np.ix_(rows, varmiss)] ) x /= sigmas[varmiss][np.newaxis, :] # as standard deviations assert np.isfinite(x).all() # ---------------------------------------------------- # Calculate inverse Mills Ratio for Heckman correction # ---------------------------------------------------- ϕ = stats.norm.pdf(x, loc=0, scale=1) # pdf Φ = stats.norm.cdf(x, loc=0, scale=1) # cdf Φ[np.isclose(Φ, 0)] = np.finfo(np.float64).eps * 2 assert (Φ > 0).all() # if its not, infinity will be introduced inversemills = ϕ / Φ Ystar[np.ix_(rows, varmiss)] = ( Ystar[np.ix_(rows, varmiss)] - sigmas[varmiss] * inversemills ) V[np.ix_(varmiss, varmiss)] += σ2_res * rows.size assert np.isfinite(V).all() # ----------------------------------------------- # Update and store parameter vector (μ(t), Σ(t)). # ----------------------------------------------- logger.debug("Regression finished.") M = np.nanmean(Ystar, axis=0) Ydevs = Ystar - np.ones((n_obs, 1)) * M Ydevs[~np.isfinite(Ydevs)] = 0.0 # remove nonfinite components PC = np.dot(Ydevs.T, Ydevs) logger.debug("Correlation:\n{}".format(PC / (n_obs - 1))) C = (PC + V) / (n_obs - 1) logger.debug("Average diff: {}".format(np.mean(Ydevs, axis=0))) assert np.isfinite(C).all() # -------------------- # Convergence checking # -------------------- if convergence_metric(M, Mnew, tol) & convergence_metric(C, Cnew, tol): another_iter = False logger.debug("Convergence achieved.") another_iter = another_iter & (niters < max_iter) logger.debug("Iterations Continuing: {}".format(another_iter)) # ---------------------------- # Back to compositional space # --------------------------- logger.debug("Finished. Inverting to compositional space.") Xstar = inverse_ALR(Ystar, pos) return Xstar, prop_zeroes, niters