Source code for pyrolite.comp.aggregate

import warnings

import numpy as np
import pandas as pd

from ..util.log import Handle
from .codata import ALR, inverse_ALR

logger = Handle(__name__)


def get_full_column(X: np.ndarray):
    """
    Returns the index of the first array column which contains only finite
    numbers (i.e. no missing data, nan, inf).

    Parameters
    ----------
    X : :class:`numpy.ndarray`
        Array for which to find the first full column within.

    Returns
    -------
    :class:`int`
        Index of the first full column.
    """
    if len(X.shape) == 1:
        X = X.reshape((1, *X.shape))
    inds = np.arange(X.shape[1])
    wherenonnan = np.isfinite(X).all(axis=0)
    ind = inds[wherenonnan][0]
    return ind

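# Usage sketch (illustrative addition, not part of the original module):
# with a nan in column 0, the first fully-finite column is column 1.
#
#   X = np.array([[np.nan, 1.0, 2.0],
#                 [0.5, 1.0, 3.0]])
#   get_full_column(X)  # -> 1
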
def weights_from_array(X: np.ndarray):
    """
    Returns a set of equal weights with size equal to that of the first
    axis of an array.

    Parameters
    ----------
    X : :class:`numpy.ndarray`
        Array of compositions to produce weights for.

    Returns
    -------
    :class:`numpy.ndarray`
        Array of weights.
    """
    wts = np.ones((X.shape[0]))
    return wts / np.sum(wts)

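# Usage sketch (illustrative addition): four records yield four equal,
# unit-sum weights.
#
#   weights_from_array(np.ones((4, 3)))  # -> array([0.25, 0.25, 0.25, 0.25])
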
def nan_weighted_mean(X: np.ndarray, weights=None):
    """
    Returns a weighted mean of compositions, where weights are renormalised
    to account for missing data.

    Parameters
    ----------
    X : :class:`numpy.ndarray`
        Array of compositions to take a weighted mean of.
    weights : :class:`numpy.ndarray`
        Array of weights.

    Returns
    -------
    :class:`numpy.ndarray`
        Array mean.
    """
    if weights is None:
        weights = weights_from_array(X)
    weights = np.array(weights) / np.nansum(weights)

    mask = (np.isnan(X) + np.isinf(X)) > 0
    if not mask.any():
        return np.average(X, weights=weights, axis=0)
    else:
        return np.ma.average(np.ma.array(X, mask=mask), weights=weights, axis=0)

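# Usage sketch (illustrative addition): the nan in the second column is
# masked, and the weights are renormalised over the valid entries.
#
#   X = np.array([[1.0, 1.0],
#                 [3.0, np.nan]])
#   nan_weighted_mean(X)  # -> [2.0, 1.0]
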
def compositional_mean(df, weights=[], **kwargs):
    """
    Implements an aggregation using a compositional weighted mean.

    Parameters
    ----------
    df : :class:`pandas.DataFrame`
        Dataframe of compositions to aggregate.
    weights : :class:`numpy.ndarray`
        Array of weights.

    Returns
    -------
    :class:`pandas.Series`
        Mean values along index of dataframe.
    """
    non_nan_cols = df.dropna(axis=1, how="all").columns
    assert not df.loc[:, non_nan_cols].isna().values.any()
    mean = df.iloc[0, :].copy()
    if not len(weights):  # len() also accommodates array-valued weights
        weights = np.ones(len(df.index.values))
    weights = np.array(weights) / np.nansum(weights)

    logmean = ALR(df.loc[:, non_nan_cols].values).T @ weights[:, np.newaxis]
    # inverse_ALR renormalises by default
    mean.loc[non_nan_cols] = inverse_ALR(logmean.T.squeeze())
    return mean

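# Usage sketch (illustrative addition): the mean is taken in ALR logratio
# space and mapped back to the simplex.
#
#   df = pd.DataFrame({"SiO2": [0.5, 0.6], "MgO": [0.3, 0.2], "CaO": [0.2, 0.2]})
#   compositional_mean(df)  # -> pd.Series summing to 1.0
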
def nan_weighted_compositional_mean(
    X: np.ndarray, weights=None, ind=None, renorm=True, **kwargs
):
    """
    Implements an aggregation using a weighted mean, but accounts for nans.
    Requires at least one non-nan column for the ALR mean.

    When used for internal standardisation, there should be only a single
    common element - this would be used by default as the divisor here. When
    used for multiple-standardisation, the specified (or first) common
    element will be used.

    Parameters
    ----------
    X : :class:`numpy.ndarray`
        Array of compositions to aggregate.
    weights : :class:`numpy.ndarray`
        Array of weights.
    ind : :class:`int`
        Index of the column to use as the ALR divisor.
    renorm : :class:`bool`, :code:`True`
        Whether to renormalise the output compositional mean to unity.

    Returns
    -------
    :class:`numpy.ndarray`
        An array with the mean composition.
    """
    if X.ndim == 1:  # a single record; nothing to aggregate
        return X
    else:
        if weights is None:
            weights = weights_from_array(X)
        else:
            weights = np.array(weights) / np.sum(weights, axis=-1)

        if ind is None:  # take the first column which has no nans
            ind = get_full_column(X)

        if X.ndim < 3 and X.shape[0] == 1:
            div = X[:, ind].squeeze()  # check this
        else:
            div = X[:, ind].squeeze()[:, np.newaxis]

        logvals = np.log(np.divide(X, div))
        mean = np.nan * np.ones(X.shape[1:])

        ixs = np.arange(logvals.shape[1])
        if X.ndim == 2:
            indexes = ixs
        elif X.ndim == 3:
            iys = np.arange(logvals.shape[2])
            indexes = np.ix_(ixs, iys)  # np.ix_; "np.ixs_" was a typo

        mean[indexes] = nan_weighted_mean(logvals[:, indexes], weights=weights)

        mean = np.exp(mean.squeeze())
        if renorm:
            mean /= np.nansum(mean)
        return mean

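# Usage sketch (illustrative addition): column 0 is complete, so it is
# chosen as the ALR divisor; the nan in the second record is handled by
# the masked logratio mean.
#
#   X = np.array([[0.5, 0.3, 0.2],
#                 [0.6, np.nan, 0.2]])
#   nan_weighted_compositional_mean(X)  # -> length-3 composition, unit sum
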
def cross_ratios(df: pd.DataFrame):
    """
    Takes ratios of values across a dataframe, such that columns are
    denominators and rows index the numerators, to create a square array.
    Returns one array per record.

    Parameters
    ----------
    df : :class:`pandas.DataFrame`
        Dataframe of compositions to create ratios of.

    Returns
    -------
    :class:`numpy.ndarray`
        A 3D array of ratios.
    """
    ratios = np.ones((len(df.index), len(df.columns), len(df.columns)))
    for idx in range(df.index.size):
        row_vals = df.iloc[idx, :].values
        r1 = row_vals.T[:, np.newaxis] @ np.ones_like(row_vals)[np.newaxis, :]
        ratios[idx] = r1 / r1.T
    return ratios

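# Usage sketch (illustrative addition): entry [i, j] of each square array
# is value_i / value_j for that record.
#
#   df = pd.DataFrame([[1.0, 2.0, 4.0]], columns=["A", "B", "C"])
#   cross_ratios(df)[0]  # e.g. [2, 0] == 4.0 (C/A), [0, 2] == 0.25 (A/C)
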
def np_cross_ratios(X: np.ndarray, debug=False):
    """
    Takes ratios of values across an array, such that columns are
    denominators and rows index the numerators, to create a square array.
    Returns one array per record.

    Parameters
    ----------
    X : :class:`numpy.ndarray`
        Array of compositions to create ratios of.
    debug : :class:`bool`
        Whether to check that the diagonals of the output arrays are unity
        (or nan, where values are missing).

    Returns
    -------
    :class:`numpy.ndarray`
        A 3D array of ratios.
    """
    X = X.copy()
    with warnings.catch_warnings():
        # comparisons with nan raise RuntimeWarnings here;
        # consider changing to np.errstate
        warnings.simplefilter("ignore", category=RuntimeWarning)
        X[X <= 0] = np.nan
    if X.ndim == 1:
        index_length = 1
        X = X.reshape((1, *X.shape))
    else:
        index_length = X.shape[0]
    dims = X.shape[-1]
    ratios = np.ones((index_length, dims, dims))
    for idx in range(index_length):
        row_vals = X[idx, :]
        r1 = row_vals.T[:, np.newaxis] @ np.ones_like(row_vals)[np.newaxis, :]
        ratios[idx] = r1.T / r1
    if debug:
        diags = ratios[:, np.arange(dims), np.arange(dims)]
        try:
            # check all diags are 1.
            assert np.allclose(diags, 1.0)
        except AssertionError:  # a bare except here would mask other errors
            # check all diags are 1. or nan
            assert np.allclose(diags[~np.isnan(diags)], 1.0)
    return ratios

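# Usage sketch (illustrative addition): non-positive values are replaced
# with nan before ratios are taken. Note that as written the loop computes
# r1.T / r1, so entry [i, j] here is X[record, j] / X[record, i] - the
# transpose of the orientation cross_ratios produces.
#
#   X = np.array([[1.0, 2.0, 4.0]])
#   np_cross_ratios(X)[0]  # e.g. [0, 2] == 4.0
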
def standardise_aggregate(
    df: pd.DataFrame, int_std=None, fixed_record_idx=0, renorm=True, **kwargs
):
    """
    Performs internal standardisation and aggregates dissimilar geochemical
    records. Note: this changes the closure parameter, and is generally
    intended to integrate major and trace element records.

    Parameters
    ----------
    df : :class:`pandas.DataFrame`
        Dataframe of compositions to aggregate.
    int_std : :class:`str`
        Name of the internal standard column.
    fixed_record_idx : :class:`int`
        Numeric index of the specific record for which to retain the
        internal standard value (e.g. for standardising trace element data).
    renorm : :class:`bool`, :code:`True`
        Whether to renormalise to unity.

    Returns
    -------
    :class:`pandas.Series`
        A series representing the internally standardised record.
    """
    if df.index.size == 1:  # catch single records
        return df
    else:
        if int_std is None:
            # Get the 'internal standard column': that with the most values
            potential_int_stds = df.count()[df.count() == df.count().max()].index.values
            assert len(potential_int_stds) > 0
            # Use the first candidate as the internal standard
            int_std = potential_int_stds[0]
            if len(potential_int_stds) > 1:
                logger.info("Multiple int. stds possible. Using " + str(int_std))

        non_nan_cols = df.dropna(axis=1, how="all").columns
        assert len(non_nan_cols)
        mean = nan_weighted_compositional_mean(
            df.values, ind=df.columns.get_loc(int_std), renorm=False
        )
        ser = pd.Series(mean, index=df.columns)
        multiplier = (
            df.iloc[fixed_record_idx, df.columns.get_loc(int_std)] / ser[int_std]
        )
        ser *= multiplier
        if renorm:
            ser /= np.nansum(ser.values)
        return ser

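# Usage sketch (illustrative addition): SiO2 is present in both records and
# is picked as the internal standard; the aggregated record is scaled to the
# first record's SiO2 value, then renormalised to unit sum.
#
#   df = pd.DataFrame({"SiO2": [50.0, 51.0], "Ni": [np.nan, 120.0]})
#   standardise_aggregate(df)  # -> pd.Series indexed by SiO2, Ni
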