Source code for pyrolite.comp.impute

import numpy as np
import scipy.stats as stats

from pyrolite.comp.codata import ALR, close, inverse_ALR
from pyrolite.util.math import augmented_covariance_matrix, nancov
from pyrolite.util.missing import md_pattern

from ..util.log import Handle

logger = Handle(__name__)

def _little_sweep(G, k: int = 0, verify=False):
    G : :class:`numpy.ndarray`
        Input array to sweep.
    k : :class:`int`
        Index to sweep on.
    verify : :class:`bool`
        Whether to verify valid matrix input.

    H : :class:`numpy.ndarray`
        Swept array.

        Little R. J. A. and Rubin D. B. (2014).
        Statistical analysis with missing data. 2nd ed., Wiley, Hoboken, N.J.

        The sweep operator is defined for symmetric matrices as follows. A p x p
        symmetric matrix G is said to be swept on row and column k if it is replaced by
        another symmetric p x p matrix H with elements defined as follows:

        h_kk = -1 / g_kk
        h_jk = h_kj = g_jk / g_kk ; j != k
        h_jl = g_jl - g_jk * g_kl / g_kk ; j != k; l != k

    H = np.asarray(G).copy()
    n = H.shape[0]
    if verify:
        if H.shape != (n, n):
            raise ValueError("Not a square array")
        if not np.isfinite(H).all():
            raise ValueError("Not all finite.")
        if not np.allclose(H - H.T, 0):
            raise ValueError("Not a symmetrical array")
    inds = [i for i in np.linspace(0, n - 1, n).astype(int) if not np.isclose(i, k)]
    assert np.isfinite(G[k, k]) & (G[k, k] != 0.0)
    H[k, k] = -1 / G[k, k]
    H[k, inds] = -G[k, inds] * H[k, k]  # divide row k by D
    H[inds, k] = -G[inds, k] * H[k, k]  # divide column k by D
    for j in inds:
        for l in inds:
            H[j, l] = G[j, l] - H[j, k] * G[k, l]

    return H

def _multisweep(G, ks):
    Sweep G along all indexes ks.

    G : :class:`numpy.ndarray`
        Augmented covariance matrix to sweep.
    ks : :class:`numpy.ndarray`
        Indicies to sweep.

    H = G.copy()
    for k in ks:
        H = _little_sweep(H, k=k)
    return H

def _reg_sweep(M: np.ndarray, C: np.ndarray, varobs: np.ndarray, error_threshold=None):
    Performs multiple sweeps of the augmented covariance matrix and extracts the
    regression coefficients :math:`\beta_{0} \cdots \beta_(d)` and residial covariance
    for the regression of missing variables against observed variables for a given
    missing data pattern. Translated from matlab to python from Palarea-Albaladejo
    and Martín-Fernández (2008) [#ref_1]_. Note that this algorithm requires at least
    two columns free of missing values.

    M : :class:`numpy.ndarray`
        Array of means of shape :code:`(D, )`.
    C : :class:`numpy.ndarray`
        Covariance of shape :code:`(D, D)`.
    varobs : :class:`numpy.ndarray`
        Boolean array indicating which variables are included in the regression model,
        of shape :code:`(D, )`
    error_threshold : :class:`float`
        Low-pass threshold at which an error will result, of shape :code:`(D, )`.
        Effectively limiting mean values to :math:`e^{threshold}`.

    β : :class:`numpy.ndarray`
        Array of estimated regression coefficients.
    σ2_res : :class:`numpy.ndarray`

    .. [#ref_1] Palarea-Albaladejo J. and Martín-Fernández J. A. (2008)
            A modified EM ALR-algorithm for replacing rounded zeros in compositional data sets.
            Computers & Geosciences 34, 902–917.
            doi: `10.1016/j.cageo.2007.09.015 <>`__

    assert np.isfinite(M).all()
    assert np.isfinite(C).all()
    if error_threshold is not None:
        assert (np.abs(M) < error_threshold).all()  # avoid runaway expansion
    dimension = M.size  # p > 0
    nvarobs = varobs.size  # q > 0 # number of observed variables
    dep = np.array([i for i in np.arange(dimension) if not i in varobs])
    # Shift the non-zero element to the end for pivoting
    reor = np.concatenate(([0], varobs + 1, dep + 1), axis=0)  #
    A = augmented_covariance_matrix(M, C)
    A = A[reor, :][:, reor]
    # Astart = A.copy(deep=True)
    assert (np.diag(A) != 0).all()  # Not introducing extra zeroes
    A = _multisweep(A, range(nvarobs + 1))
    A is of form:
    -D  | E
    E.T | F
    # if not np.isfinite(A).all():  # Typically caused by infs
    #    A[~np.isfinite(A)] = 0
    assert np.isfinite(A).all()
    β = A[0 : nvarobs + 1, nvarobs + 1 : dimension + 1]
    σ2_res = A[nvarobs + 1 :, nvarobs + 1 :]
    return β, σ2_res

[docs]def EMCOMP( X, threshold=None, tol=0.0001, convergence_metric=lambda A, B, t: np.linalg.norm(np.abs(A - B)) < t, max_iter=30, ): r""" EMCOMP replaces rounded zeros in a compositional data set based on a set of thresholds. After Palarea-Albaladejo and Martín-Fernández (2008) [#ref_1]_. Parameters ---------- X : :class:`numpy.ndarray` Dataset with rounded zeros threshold : :class:`numpy.ndarray` Array of threshold values for each component as a proprotion. tol : :class:`float` Tolerance to check for convergence. convergence_metric : :class:`callable` Callable function to check for convergence. Here we use a compositional distance rather than a maximum absolute difference, with very similar performance. Function needs to accept two :class:`numpy.ndarray` arguments and third tolerance argument. max_iter : :class:`int` Maximum number of iterations before an error is thrown. Returns -------- X_est : :class:`numpy.ndarray` Dataset with rounded zeros replaced. prop_zeros : :class:`float` Proportion of zeros in the original data set. n_iters : :class:`int` Number of iterations needed for convergence. Notes ----- * At least one component without missing values is needed for the divisor. Rounded zeros/missing values are replaced by values below their respective detection limits. * This routine is not completely numerically stable as written. Todo ------- * Implement methods to deal with variable decection limits (i.e thresholds are array shape :code:`(N, D)`) * Conisder non-normal models for data distributions. * Improve numerical stability to reduce the chance of :code:`np.inf` appearing. References ---------- .. [#ref_1] Palarea-Albaladejo J. and Martín-Fernández J. A. (2008) A modified EM ALR-algorithm for replacing rounded zeros in compositional data sets. Computers & Geosciences 34, 902–917. doi: `10.1016/j.cageo.2007.09.015 <>`__ """ X = X.copy() n_obs, D = X.shape X = close(X, sumf=np.nansum) # --------------------------------- # Convert zeros into missing values # --------------------------------- X = np.where(np.isclose(X, 0.0), np.nan, X) # Use a divisor free of missing values assert np.isfinite(X).all(axis=0).any() pos = np.argmax(np.isfinite(X).all(axis=0)) Yden = X[:, pos] # -------------------------------------- # Compute the matrix of censure points Ψ # -------------------------------------- # need an equivalent concept for ilr cpoints = ( np.ones((n_obs, 1)) @ np.log(threshold[np.newaxis, :]) - np.log(Yden[:, np.newaxis]) @ np.ones((1, D)) - np.spacing(1.0) # Machine epsilon ) assert np.isfinite(cpoints).all() cpoints = cpoints[:, [i for i in range(D) if not i == pos]] # censure points prop_zeroes = np.count_nonzero(~np.isfinite(X)) / (n_obs * D) Y = ALR(X, pos) # ---------------Log Space-------------------------------- LD = Y.shape[1] M = np.nanmean(Y, axis=0) # μ0 C = nancov(Y) # Σ0 assert np.isfinite(M).all() and np.isfinite(C).all() # -------------------------------------------------- # Stage 2: Find and enumerate missing data patterns # -------------------------------------------------- pID, pD = md_pattern(Y) # ------------------------------------------- # Stage 3: Regression against other variables # ------------------------------------------- logger.debug( "Starting Iterative Regression for Matrix : ({}, {})".format(n_obs, LD) ) another_iter = True niters = 0 while another_iter: niters += 1 Mnew, Cnew = M.copy(), C.copy() Ystar = Y.copy() V = np.zeros((LD, LD)) for p_no in np.unique(pID): logger.debug("Pattern ID: {}, {}".format(p_no, pD[p_no]["pattern"])) rows = np.arange(pID.size)[pID == p_no] # rows with this pattern varobs, varmiss = ( np.arange(D - 1)[~pD[p_no]["pattern"]], np.arange(D - 1)[pD[p_no]["pattern"]], ) sigmas = np.zeros((LD)) assert np.isfinite(Y[np.ix_(rows, varobs)]).all() assert (~np.isfinite(Y[np.ix_(rows, varmiss)])).all() if varobs.size and varmiss.size: # Non-completely missing, but missing some logger.debug( "Regressing {} rows for pattern {} | {}.".format( rows.size, "".join(varobs.astype(str)), "".join(varmiss.astype(str)), ) ) B, σ2_res = _reg_sweep(Mnew, Cnew, varobs) assert B.shape == (varobs.size + 1, varmiss.size) assert σ2_res.shape == (varmiss.size, varmiss.size) assert np.isfinite(B).all() logger.debug( "Current Estimator (1, {})".format( ", ".join([{}".format(i) for i in range(B.shape[0] - 1)]) ) ) Ystar[np.ix_(rows, varmiss)] = np.ones((rows.size, 1)) * B[0, :] + ( (Y[np.ix_(rows, varobs)] @ B[1 : (varobs.size + 1), :]) ) sigmas[varmiss] = np.sqrt(np.diag(σ2_res)) assert np.isfinite(sigmas[varmiss]).all() x = ( # position of threshold values relative to estimated means cpoints[np.ix_(rows, varmiss)] - Ystar[np.ix_(rows, varmiss)] ) x /= sigmas[varmiss][np.newaxis, :] # as standard deviations assert np.isfinite(x).all() # ---------------------------------------------------- # Calculate inverse Mills Ratio for Heckman correction # ---------------------------------------------------- ϕ = stats.norm.pdf(x, loc=0, scale=1) # pdf Φ = stats.norm.cdf(x, loc=0, scale=1) # cdf Φ[np.isclose(Φ, 0)] = np.finfo(np.float64).eps * 2 assert (Φ > 0).all() # if its not, infinity will be introduced inversemills = ϕ / Φ Ystar[np.ix_(rows, varmiss)] = ( Ystar[np.ix_(rows, varmiss)] - sigmas[varmiss] * inversemills ) V[np.ix_(varmiss, varmiss)] += σ2_res * rows.size assert np.isfinite(V).all() # ----------------------------------------------- # Update and store parameter vector (μ(t), Σ(t)). # ----------------------------------------------- logger.debug("Regression finished.") M = np.nanmean(Ystar, axis=0) Ydevs = Ystar - np.ones((n_obs, 1)) * M Ydevs[~np.isfinite(Ydevs)] = 0.0 # remove nonfinite components PC =, Ydevs) logger.debug("Correlation:\n{}".format(PC / (n_obs - 1))) C = (PC + V) / (n_obs - 1) logger.debug("Average diff: {}".format(np.mean(Ydevs, axis=0))) assert np.isfinite(C).all() # -------------------- # Convergence checking # -------------------- if convergence_metric(M, Mnew, tol) & convergence_metric(C, Cnew, tol): another_iter = False logger.debug("Convergence achieved.") another_iter = another_iter & (niters < max_iter) logger.debug("Iterations Continuing: {}".format(another_iter)) # ---------------------------- # Back to compositional space # --------------------------- logger.debug("Finished. Inverting to compositional space.") Xstar = inverse_ALR(Ystar, pos) return Xstar, prop_zeroes, niters