
"""
Utility functions for creating synthetic (geochemical) data.
"""
import numpy as np
import pandas as pd

from ..comp.codata import ILR, inverse_ILR
from ..geochem.ind import REE, get_ionic_radii
from ..geochem.norm import get_reference_composition
from ..util.lambdas.eval import get_function_components
from .log import Handle
from .meta import get_additional_params

logger = Handle(__name__)


def random_cov_matrix(dim, sigmas=None, validate=False, seed=None):
    """
    Generate a random covariance matrix which is symmetric positive-semidefinite.

    Parameters
    ----------
    dim : :class:`int`
        Dimensionality of the covariance matrix.
    sigmas : :class:`numpy.ndarray`
        Optionally specified sigmas for the variables.
    validate : :class:`bool`
        Whether to validate output.
    seed : :class:`int`, :code:`None`
        Random seed to use, optionally specified.

    Returns
    -------
    :class:`numpy.ndarray`
        Covariance matrix of shape :code:`(dim, dim)`.

    Todo
    ----
    * Implement a characteristic scale for the covariance matrix.
    """
    if seed is not None:
        np.random.seed(seed)
    # create a symmetric matrix of correlation coefficients with unit diagonal
    corr = (np.random.rand(dim, dim) - 0.5) * 2  # values between -1 and 1
    corr[np.tril_indices(dim)] = corr.T[np.tril_indices(dim)]  # lower = upper
    corr[np.arange(dim), np.arange(dim)] = 1.0
    if sigmas is None:
        sigmas = np.ones(dim).reshape(1, dim)
    else:
        sigmas = np.array(sigmas)
        sigmas = sigmas.reshape(1, dim)
    cov = sigmas.T @ sigmas  # multiply by ~ variance
    cov *= corr
    cov = np.sign(cov) * np.sqrt(np.abs(cov) / dim)
    cov = cov @ cov.T  # positive-semidefinite by construction
    if validate:
        try:
            assert (cov == cov.T).all()
            # eig = np.linalg.eigvalsh(cov)
            for i in range(1, dim + 1):  # check all leading principal minors
                assert np.linalg.det(cov[:i, :i]) > 0.0  # Sylvester's criterion
        except AssertionError:  # regenerate if the matrix fails validation
            cov = random_cov_matrix(dim, sigmas=sigmas, validate=validate)
    return cov
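
# Usage sketch for random_cov_matrix; the dimension, sigmas and seed below are
# illustrative assumptions only. Shape and symmetry hold by construction:
#
# >>> cov = random_cov_matrix(3, sigmas=[0.5, 1.0, 2.0], validate=True, seed=42)
# >>> cov.shape
# (3, 3)
# >>> np.allclose(cov, cov.T)
# True
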
def random_composition(
    size=1000,
    D=4,
    mean=None,
    cov=None,
    propnan=0.1,
    missing_columns=None,
    missing=None,
    seed=None,
):
    """
    Generate a simulated random unimodal compositional dataset,
    optionally with missing data.

    Parameters
    ----------
    size : :class:`int`
        Size of the dataset.
    D : :class:`int`
        Dimensionality of the dataset.
    mean : :class:`numpy.ndarray`, :code:`None`
        Optional specification of mean composition.
    cov : :class:`numpy.ndarray`, :code:`None`
        Optional specification of covariance matrix (in log space).
    propnan : :class:`float`, [0, 1)
        Proportion of missing values in the output dataset.
    missing_columns : :class:`int` | :class:`tuple`
        Specification of columns to be missing. If an integer is specified,
        it is interpreted as the number of columns containing missing data
        (at a proportion defined by `propnan`). If a tuple or list, the
        specific columns to contain missing data.
    missing : :class:`str`, :code:`None`
        Missingness pattern.
        If not :code:`None`, one of :code:`"MCAR", "MAR", "MNAR"`.

        * If :code:`missing = "MCAR"`, data will be missing at random.
        * If :code:`missing = "MAR"`, data will be missing with some
          relationship to other parameters.
        * If :code:`missing = "MNAR"`, data will be thresholded at some
          lower bound.
    seed : :class:`int`, :code:`None`
        Random seed to use, optionally specified.

    Returns
    -------
    :class:`numpy.ndarray`
        Simulated dataset with missing values.

    Todo
    ----
    * Add a feature to translate rough covariance in D to log-covariance in D-1.
    * Update the :code:`missing = "MAR"` example to be more realistic/variable.
    """
    data = None
    # dimensions
    if mean is None and cov is None:
        pass
    elif mean is None:
        D = cov.shape[0] + 1
    elif cov is None:
        mean = np.array(mean)
        D = mean.size
    else:  # both defined
        mean, cov = np.array(mean), np.array(cov)
        assert mean.size == cov.shape[0] + 1
        D = mean.size
        mean = mean.reshape(1, -1)

    if seed is not None:
        np.random.seed(seed)

    # mean
    if mean is None:
        if D > 1:
            mean = np.random.randn(D - 1).reshape(1, -1)
        else:  # D == 1, return a 1D series
            data = np.exp(np.random.randn(size).reshape(size, D))
            data /= np.nanmax(data)
            return data
    else:
        # ILR of a (1, D) mean to (1, D-1)
        mean = ILR(mean.reshape(1, D)).reshape(1, -1)

    # covariance
    if cov is None:
        if D != 1:
            cov = random_cov_matrix(
                D - 1, sigmas=np.abs(mean) * 0.1, seed=seed
            )  # 10% sigmas
        else:
            cov = np.array([[1]])

    assert cov.shape in [(D - 1, D - 1), (1, 1)]

    if size == 1:  # single sample
        data = inverse_ILR(mean).reshape(size, D)

    # if the covariance matrix isn't for the logspace data, we'd have to convert it
    if data is None:
        data = inverse_ILR(
            np.random.multivariate_normal(mean.reshape(D - 1), cov, size=size)
        ).reshape(size, D)

    # columns which will contain missing data (the first column is left intact)
    if missing_columns is None:
        nancols = (
            np.random.choice(
                range(data.shape[1] - 1), size=int(data.shape[1] - 1), replace=False
            )
            + 1
        )
    elif isinstance(missing_columns, int):  # number of columns specified
        nancols = (
            np.random.choice(
                range(data.shape[1] - 1), size=missing_columns, replace=False
            )
            + 1
        )
    else:  # tuple, list etc.
        nancols = missing_columns

    if missing is not None:
        if missing == "MCAR":
            for i in nancols:
                data[np.random.randint(size, size=int(propnan * size)), i] = np.nan
        elif missing == "MAR":
            thresholds = np.percentile(data[:, nancols], propnan * 100, axis=0)[
                np.newaxis, :
            ]
            # should update this such that data are proportional to other variables
            # potentially just by rearranging the where statement
            data[:, nancols] = np.where(
                data[:, nancols]
                < np.tile(thresholds, size).reshape(size, len(nancols)),
                np.nan,
                data[:, nancols],
            )
        elif missing == "MNAR":
            # threshold below the propnan-th percentile in each column
            thresholds = np.percentile(data[:, nancols], propnan * 100, axis=0)[
                np.newaxis, :
            ]
            data[:, nancols] = np.where(
                data[:, nancols]
                < np.tile(thresholds, size).reshape(size, len(nancols)),
                np.nan,
                data[:, nancols],
            )
        else:
            msg = "Provide a value for missing in {}".format({"MCAR", "MAR", "MNAR"})
            raise NotImplementedError(msg)

    return data
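
# Usage sketch for random_composition; parameter values here are illustrative
# assumptions. Without missing data, each returned row is a closed composition:
#
# >>> X = random_composition(size=100, D=4, seed=7)
# >>> X.shape
# (100, 4)
# >>> np.allclose(X.sum(axis=1), 1)
# True
# >>> X = random_composition(size=100, D=4, missing="MCAR", propnan=0.05, seed=7)
# >>> bool(np.isnan(X).any())
# True
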
def normal_frame(
    columns=["SiO2", "CaO", "MgO", "FeO", "TiO2"], size=10, mean=None, **kwargs
):
    r"""
    Creates a :class:`pandas.DataFrame` with samples from a single
    multivariate-normally distributed composition.

    Parameters
    ----------
    columns : :class:`list`
        List of columns to use for the dataframe. These won't have any direct
        impact on the data returned, and are only for labelling.
    size : :class:`int`
        Index length for the dataframe.
    mean : :class:`numpy.ndarray`, :code:`None`
        Optional specification of mean composition.
    {otherparams}

    Returns
    -------
    :class:`pandas.DataFrame`
    """
    return pd.DataFrame(
        columns=columns,
        data=random_composition(size=size, D=len(columns), mean=mean, **kwargs),
    )
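
# Usage sketch for normal_frame with the default column set; `size` and `seed`
# (passed through to random_composition) are illustrative assumptions:
#
# >>> df = normal_frame(size=5, seed=0)
# >>> df.shape
# (5, 5)
# >>> list(df.columns)
# ['SiO2', 'CaO', 'MgO', 'FeO', 'TiO2']
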
def normal_series(index=["SiO2", "CaO", "MgO", "FeO", "TiO2"], mean=None, **kwargs):
    """
    Creates a :class:`pandas.Series` with a single sample from a single
    multivariate-normally distributed composition.

    Parameters
    ----------
    index : :class:`list`
        List of indexes for the series. These won't have any direct impact on
        the data returned, and are only for labelling.
    mean : :class:`numpy.ndarray`, :code:`None`
        Optional specification of mean composition.
    {otherparams}

    Returns
    -------
    :class:`pandas.Series`
    """
    return pd.Series(
        random_composition(size=1, D=len(index), mean=mean, **kwargs).flatten(),
        index=index,
    )
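
# Usage sketch for normal_series; a single closed composition is returned
# against the default oxide index (the seed is an illustrative assumption):
#
# >>> s = normal_series(seed=0)
# >>> list(s.index)
# ['SiO2', 'CaO', 'MgO', 'FeO', 'TiO2']
# >>> np.allclose(s.sum(), 1)
# True
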
def example_spider_data(
    start="EMORB_SM89",
    norm_to="PM_PON",
    size=120,
    noise_level=0.5,
    offsets=None,
    units="ppm",
):
    """
    Generate some random data for demonstrating spider plots.

    By default, this generates a composition based around EMORB, normalised
    to Primitive Mantle.

    Parameters
    ----------
    start : :class:`str`
        Composition to start with.
    norm_to : :class:`str`
        Composition to normalise to. Can optionally specify :code:`None`.
    size : :class:`int`
        Number of observations to include (index length).
    noise_level : :class:`float`
        Noise level in log units (1 sigma).
    offsets : :class:`dict`
        Dictionary of element offsets, in log units.
    units : :class:`str`
        Units to use before conversion. Should have no effect other than
        reducing calculation times if `norm_to` is :code:`None`.

    Returns
    -------
    df : :class:`pandas.DataFrame`
        Dataframe of example synthetic data.
    """
    ref = get_reference_composition(start)
    ref.set_units(units)
    df = ref.comp.pyrochem.compositional
    if norm_to is not None:
        df = df.pyrochem.normalize_to(norm_to, units=units)
    start = np.log(df)
    nindex = df.columns.size
    y = np.tile(start.values, size).reshape(size, nindex)
    y += np.random.normal(0, noise_level / 2.0, size=(size, nindex))  # noise
    y += np.random.normal(0, noise_level, size=(1, size)).T  # random pattern offset
    syn_df = pd.DataFrame(y, columns=df.columns)
    if offsets is not None:
        for element, offset in offsets.items():
            syn_df[element] += offset  # significant offset for e.g. Eu anomaly
    syn_df = np.exp(syn_df)
    return syn_df
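
# Usage sketch for example_spider_data with the default EMORB starting
# composition; `size` and `noise_level` here are illustrative assumptions:
#
# >>> df = example_spider_data(size=10, noise_level=0.3)
# >>> df.index.size
# 10
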
def example_patterns_from_parameters(
    fit_parameters,
    radii=None,
    n=100,
    proportional_noise=0.15,
    includes_tetrads=False,
    columns=None,
):
    """
    Generate synthetic REE patterns from a set of lambda (and optionally
    tetrad) fit parameters, with correlated proportional noise added.

    Parameters
    ----------
    fit_parameters : :class:`numpy.ndarray`
        Coefficients for the orthogonal polynomial (and optionally tetrad)
        components, used to construct each pattern.
    radii : :class:`list`, :code:`None`
        Ionic radii at which to evaluate the components; defaults to the
        eightfold-coordinated radii of the trivalent REE.
    n : :class:`int`
        Number of patterns (index length) to generate.
    proportional_noise : :class:`float`
        Scale of the proportional noise added to the patterns.
    includes_tetrads : :class:`bool`
        Whether `fit_parameters` includes tetrad coefficients.
    columns : :class:`list`, :code:`None`
        Column names for the output dataframe.

    Returns
    -------
    :class:`pandas.DataFrame`
        Dataframe of synthetic REE patterns.
    """
    fit_parameters = np.tile(fit_parameters, n).reshape(n, -1)
    if radii is None:
        radii = get_ionic_radii(REE(), coordination=8, charge=3)
    _, _, components = get_function_components(radii, fit_tetrads=includes_tetrads)
    pattern_df = pd.DataFrame(
        np.exp(fit_parameters @ np.array(components)), columns=columns
    )
    # add some random correlated proportional noise
    sz = len(radii)
    cov = np.zeros((sz, sz))
    for offset in np.arange(-sz + 1, sz):
        vals = np.ones(sz - np.abs(offset)) * np.abs(sz - np.abs(offset)) / sz
        cov += np.diag(vals**2, offset)
    noise = 1 + proportional_noise * np.random.multivariate_normal(
        np.zeros(sz), cov, size=pattern_df.shape[0]
    )
    pattern_df *= noise
    return pattern_df
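
# Usage sketch for example_patterns_from_parameters. The coefficient values
# are illustrative, and assume the default four lambda components (no tetrads)
# returned by get_function_components:
#
# >>> lambdas = np.array([2.0, 10.0, -50.0, 300.0])
# >>> patterns = example_patterns_from_parameters(lambdas, n=20)
# >>> patterns.shape[0]
# 20
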
_add_additional_parameters = True

normal_frame.__doc__ = normal_frame.__doc__.format(
    otherparams=[
        "",
        get_additional_params(
            random_composition, header="Other Parameters", indent=8, subsections=True
        ),
    ][_add_additional_parameters]
)

normal_series.__doc__ = normal_series.__doc__.format(
    otherparams=[
        "",
        get_additional_params(
            random_composition, header="Other Parameters", indent=8, subsections=True
        ),
    ][_add_additional_parameters]
)