Source code for pyrolite.util.skl.transform

import re

import numpy as np
import pandas as pd

from ...comp import codata
from ...geochem import ind, parse, transform
from ..lambdas.params import orthogonal_polynomial_constants
from ..log import Handle

logger = Handle(__name__)

try:
    from sklearn.base import BaseEstimator, TransformerMixin
except ImportError:
    msg = "scikit-learn not installed"
    logger.warning(msg)


[docs]class DropBelowZero(BaseEstimator, TransformerMixin): def __init__(self, **kwargs): """Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label = "Feedthrough"
[docs] def transform(self, X, *args, **kwargs): if isinstance(X, pd.DataFrame) or isinstance(X, pd.Series): out = X.where(X > 0, np.nan) else: out = np.where(X > 0, X, np.nan) return out
[docs] def fit(self, X, *args): return self
[docs]class LinearTransform(BaseEstimator, TransformerMixin): def __init__(self, **kwargs): """Linear Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label = "Feedthrough" self.forward = lambda x: x self.inverse = lambda x: x
[docs] def transform(self, X, *args, **kwargs): if isinstance(X, pd.DataFrame): out = X.copy(deep=True) out.loc[:, :] = self.forward(X.values, *args, **kwargs) elif isinstance(X, pd.Series): out = X.copy(deep=True) out.loc[:] = self.forward(X.values, *args, **kwargs) else: out = self.forward(np.array(X), *args, **kwargs) return out
[docs] def inverse_transform(self, Y, *args, **kwargs): if isinstance(Y, pd.DataFrame): out = Y.copy(deep=True) out.loc[:, :] = self.inverse(Y.values, *args, **kwargs) elif isinstance(Y, pd.Series): out = Y.copy(deep=True) out.loc[:] = self.inverse(Y.values, *args, **kwargs) else: out = self.inverse(np.array(Y), *args, **kwargs) return out
[docs] def fit(self, X, *args): return self
[docs]class ExpTransform(BaseEstimator, TransformerMixin): def __init__(self, **kwargs): """Exponential Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label = "Feedthrough" self.forward = np.exp self.inverse = np.log
[docs] def transform(self, X, *args, **kwargs): if isinstance(X, pd.DataFrame): out = self.forward(X) elif isinstance(X, pd.Series): out = X.apply(self.forward) else: out = self.forward(np.array(X), *args, **kwargs) return out
[docs] def inverse_transform(self, Y, *args, **kwargs): if isinstance(Y, pd.DataFrame): out = self.inverse(Y) elif isinstance(Y, pd.Series): out = Y.apply(self.inverse) else: out = self.inverse(np.array(Y), *args, **kwargs) return out
[docs] def fit(self, X, *args): return self
[docs]class LogTransform(BaseEstimator, TransformerMixin): def __init__(self, fmt_string="Ln({})", **kwargs): """Log Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label = "Feedthrough" self.forward = np.log self.inverse = np.exp self.fmt_string = fmt_string self.inverse_regex = "(?P<column>.)".join( [re.escape(s) for s in self.fmt_string.split("{}")] )
[docs] def transform(self, X, *args, **kwargs): if isinstance(X, pd.DataFrame): out = X.copy(deep=True) out.loc[:, :] = self.forward(X.values, *args, **kwargs) if self.fmt_string is not None: out.columns = [self.fmt_string.format(c) for c in X.columns] elif isinstance(X, pd.Series): out = X.copy(deep=True) out.loc[:] = self.forward(X.values, *args, **kwargs) if self.fmt_string is not None: out.name = self.fmt_string.format(X.name) else: out = self.forward(np.array(X), *args, **kwargs) return out
[docs] def inverse_transform(self, Y, *args, **kwargs): if isinstance(Y, pd.DataFrame): out = Y.copy(deep=True) out.loc[:, :] = self.inverse(Y.values, *args, **kwargs) if self.fmt_string is not None: # could store the original columns, but this is generalizable out.columns = [ re.findall(self.inverse_regex, "Ln(A)", re.DOTALL)[0] for c in out.columns ] elif isinstance(Y, pd.Series): out = Y.copy(deep=True) out.loc[:] = self.inverse(Y.values, *args, **kwargs) if self.fmt_string is not None: out.name = re.findall(self.inverse_regex, "Ln(A)", re.DOTALL)[0] else: out = self.inverse(np.array(Y), *args, **kwargs) return out
[docs] def fit(self, X, *args): return self
[docs]class ALRTransform(BaseEstimator, TransformerMixin): def __init__(self, label_mode="numeric", **kwargs): """Additive Log Ratio Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label_mode = label_mode self.label = "ALR" self.forward = codata.ALR self.inverse = codata.inverse_ALR
[docs] def transform(self, X, *args, **kwargs): if isinstance(X, pd.DataFrame): out = X.pyrocomp.ALR( label_mode=self.label_mode, **{**self.kpairs, **kwargs} ) elif isinstance(X, pd.Series): out = X.to_frame().T.pyrocomp.ALR( label_mode=self.label_mode, **{**self.kpairs, **kwargs} ) else: out = self.forward(np.array(X), *args, **kwargs) return out
[docs] def inverse_transform(self, Y, *args, **kwargs): if isinstance(Y, pd.DataFrame): out = Y.pyrocomp.inverse_ALR(**kwargs) elif isinstance(Y, pd.Series): out = Y.to_frame().T.pyrocomp.inverse_ALR(**kwargs) else: out = self.inverse(np.array(Y), *args, **kwargs) return out
[docs] def fit(self, X, *args, **kwargs): return self
[docs]class CLRTransform(BaseEstimator, TransformerMixin): def __init__(self, label_mode="numeric", **kwargs): """Centred Log Ratio Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label_mode = label_mode self.label = "CLR" self.forward = codata.CLR self.inverse = codata.inverse_CLR
[docs] def transform(self, X, *args, **kwargs): if isinstance(X, pd.DataFrame): out = X.pyrocomp.CLR( label_mode=self.label_mode, **{**self.kpairs, **kwargs} ) elif isinstance(X, pd.Series): out = X.to_frame().T.pyrocomp.CLR( label_mode=self.label_mode, **{**self.kpairs, **kwargs} ) else: out = self.forward(np.array(X), *args, **{**self.kpairs, **kwargs}) return out
[docs] def inverse_transform(self, Y, *args, **kwargs): if isinstance(Y, pd.DataFrame): out = Y.pyrocomp.inverse_CLR(**{**self.kpairs, **kwargs}) elif isinstance(Y, pd.Series): out = Y.to_frame().T.pyrocomp.inverse_CLR(**{**self.kpairs, **kwargs}) else: out = self.inverse(np.array(Y), *args, **{**self.kpairs, **kwargs}) return out
[docs] def fit(self, X, *args, **kwargs): return self
[docs]class ILRTransform(BaseEstimator, TransformerMixin): def __init__(self, label_mode="numeric", **kwargs): """Isometric Log Ratio Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label_mode = label_mode self.label = "ILR" self.forward = codata.ILR self.inverse = codata.inverse_ILR self.X = None
[docs] def transform(self, X, *args, **kwargs): self.X = np.array(X) if isinstance(X, pd.DataFrame): out = X.pyrocomp.ILR( label_mode=self.label_mode, **{**self.kpairs, **kwargs} ) elif isinstance(X, pd.Series): out = X.to_frame().T.pyrocomp.ILR( label_mode=self.label_mode, **{**self.kpairs, **kwargs} ) else: out = self.forward(np.array(X), *args, **kwargs) return out
[docs] def inverse_transform(self, Y, *args, **kwargs): if "X" not in kwargs: if self.X is not None: kwargs.update(dict(X=self.X)) if isinstance(Y, pd.DataFrame): out = Y.pyrocomp.inverse_ILR(**kwargs) elif isinstance(Y, pd.Series): out = Y.to_frame().T.pyrocomp.inverse_ILR(**kwargs) else: out = self.inverse(np.array(Y), *args, **kwargs) return out
[docs] def fit(self, X, *args, **kwargs): return self
[docs]class SphericalCoordTransform(BaseEstimator, TransformerMixin): def __init__(self, **kwargs): """Spherical Coordinate Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label = "SphericalCoordTransform" self.forward = codata.sphere self.inverse = codata.inverse_sphere
[docs] def transform(self, X, *args, **kwargs): if isinstance(X, pd.DataFrame): out = X.pyrocomp.sphere(**{**self.kpairs, **kwargs}) elif isinstance(X, pd.Series): out = X.to_frame().T.pyrocomp.sphere(**{**self.kpairs, **kwargs}) else: out = self.forward(np.array(X), *args, **kwargs) return out
[docs] def inverse_transform(self, Y, *args, **kwargs): if isinstance(Y, pd.DataFrame): out = Y.pyrocomp.inverse_sphere(**kwargs) elif isinstance(Y, pd.Series): out = Y.to_frame().T.pyrocomp.inverse_sphere(**kwargs) else: out = self.inverse(np.array(Y), *args, **kwargs) return out
[docs] def fit(self, X, *args, **kwargs): return self
[docs]class BoxCoxTransform(BaseEstimator, TransformerMixin): def __init__(self, **kwargs): """BoxCox Transformer for scikit-learn like use.""" self.kpairs = kwargs self.label = "BoxCox" self.forward = codata.boxcox self.inverse = codata.inverse_boxcox self.lmbda = None
[docs] def transform(self, X, *args, **kwargs): self.X = np.array(X) if "lmbda" not in kwargs: if not (self.lmbda is None): kwargs.update(dict(lmbda=self.lmbda)) data = self.forward(X, *args, **kwargs) else: kwargs.update(dict(return_lmbda=True)) data, lmbda = self.forward(X, *args, **kwargs) self.lmbda = lmbda return data
[docs] def inverse_transform(self, Y, *args, **kwargs): if "lmbda" not in kwargs: kwargs.update(dict(lmbda=self.lmbda)) return self.inverse(Y, *args, **kwargs)
[docs] def fit(self, X, *args, **kwargs): bc_data, lmbda = self.forward(X, *args, **kwargs) self.lmbda = lmbda
[docs]class Devolatilizer(BaseEstimator, TransformerMixin): def __init__( self, exclude=["H2O", "H2O_PLUS", "H2O_MINUS", "CO2", "LOI"], renorm=True ): """Devolatilization transformer for scikit-learn like use.""" self.exclude = [i.upper() for i in exclude] self.renorm = renorm
[docs] def fit(self, X, y=None): return self
[docs] def transform(self, X): assert isinstance(X, pd.DataFrame) exclude = [i for i in X.columns if i.upper() in self.exclude] return transform.devolatilise(X, exclude=exclude, renorm=self.renorm)
[docs]class ElementAggregator(BaseEstimator, TransformerMixin): def __init__(self, renorm=True, form="oxide"): """Element-based aggregation transformer for scikit-learn like use.""" self.renorm = renorm self.form = form
[docs] def fit(self, X, y=None): return self
[docs] def transform(self, X): assert isinstance(X, pd.DataFrame) multiple_entries = parse.check_multiple_cation_inclusion(X) for el in multiple_entries: if self.form == "oxide": out = ind.simple_oxides(el)[0] else: out = el X = transform.aggregate_element(X, to=out) return X
[docs]class LambdaTransformer(BaseEstimator, TransformerMixin): def __init__( self, norm_to="Chondrite_PON", exclude=["Pm", "Eu", "Ce"], params=None, degree=5 ): """Lambda coefficient transformer for scikit-learn like use.""" self.norm_to = norm_to self.ree = [i for i in ind.REE() if i not in exclude] self.radii = np.array(ind.get_ionic_radii(self.ree, charge=3, coordination=8)) self.exclude = exclude if params is None: self.degree = degree self.params = orthogonal_polynomial_constants( self.radii, degree=self.degree ) else: self.params = params self.degree = len(params)
[docs] def fit(self, X, y=None): return self
[docs] def transform(self, X): assert isinstance(X, pd.DataFrame) ree_present = [i in X.columns for i in self.ree] if not all(ree_present): self.ree = [i for i in self.ree if i in X.columns] self.radii = self.radii[ree_present] self.params = orthogonal_polynomial_constants( self.radii, degree=self.degree ) return transform.lambda_lnREE( X, norm_to=self.norm_to, params=self.params, degree=self.degree, exclude=self.exclude, )