Source code for satlas.models.basemodel

"""
Implementation of base class for extension to models describing actual data.

.. moduleauthor:: Wouter Gins <wouter.gins@kuleuven.be>
.. moduleauthor:: Ruben de Groote <ruben.degroote@kuleuven.be>
"""
import copy

import lmfit as lm
from satlas.loglikelihood import create_gaussian_priormap
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from copy import deepcopy

__all__ = ['load_model']

class SATLASParameters(lm.Parameters):
    _prefix = ''

    def __getitem__(self, key):
        try:
            return super(SATLASParameters, self).__getitem__(key)
        except KeyError:
            try:
                return super(SATLASParameters, self).__getitem__(self._prefix + key)
            except KeyError:
                raise

    def __deepcopy__(self, memo):
        """Parameters deepcopy needs to make sure that
        asteval is available and that all individula
        parameter objects are copied"""
        _pars = SATLASParameters(asteval=None)

        # find the symbols that were added by users, not during construction
        sym_unique = self._asteval.user_defined_symbols()
        unique_symbols = {key: deepcopy(self._asteval.symtable[key], memo)
                              for key in sym_unique}
        _pars._asteval.symtable.update(unique_symbols)

        # we're just about to add a lot of Parameter objects to the newly
        parameter_list = []
        for key, par in self.items():
            if isinstance(par, lm.Parameter):
                param = lm.Parameter(name=par.name,
                                  value=par.value,
                                  min=par.min,
                                  max=par.max)
                param.vary = par.vary
                param.stderr = par.stderr
                param.correl = par.correl
                param.init_value = par.init_value
                param.expr = par.expr
                parameter_list.append(param)

        _pars.add_many(*parameter_list)
        _pars._prefix = self._prefix
        return _pars

[docs]class BaseModel(object): """Abstract baseclass for all models. For input, see these classes."""
[docs] def __init__(self): super(BaseModel, self).__init__() self._expr = {} self._vary = {} self._constraints = {} self._params = None self._lnprior_mapping = {} self._chisquare_mapping = {} self._prefix = ''
def _set_prefix(self, value): for p in self._parameters: if len(self._prefix) > 0: if self._parameters[p].expr is not None: self._parameters[p].expr = self._parameters[p].expr.replace(self._prefix, value) else: if self._parameters[p].expr is not None: for P in self._parameters: if P in self._parameters[p].expr: self._parameters[p].expr = self._parameters[p].expr.replace(P, value + P) for p in list(self._parameters.keys()): if len(self._prefix) > 0: self._parameters[p].name = self._parameters[p].name[len(self._prefix):] self._parameters[p].name = value + self._parameters[p].name self._parameters[value + p] = self._parameters.pop(p) self._parameters._prefix = value self._prefix = value def _add_prefix(self, value): for p in self._parameters: if self._parameters[p].expr is not None: if len(self._prefix) > 0: self._parameters[p].expr = self._parameters[p].expr.replace(self._prefix, value + self._prefix) else: for P in self._parameters: if P in self._parameters[p].expr: self._parameters[p].expr = self._parameters[p].expr.replace(P, value + P) for p in list(self._parameters.keys()): self._parameters[p].name = value + self._parameters[p].name self._parameters[value + p] = self._parameters.pop(p) self._parameters._prefix = value + self._parameters._prefix self._prefix = value + self._prefix def set_literature_values(self, literatureDict): """Sets the lnprior and chisquare mapping to handle the given literature values and uncertainties. Parameters ---------- literatureDict: dictionary Dictionary with the parameter names as keys. Each key should correspond to a dictionary containing a 'value' and 'uncertainty' key.""" priorDict = {} chisquareDict = {} for k in literatureDict: v, u = literatureDict[k]['value'], literatureDict[k]['uncertainty'] prior = create_gaussian_priormap(v, u) priorDict[k] = prior chisquare = lambda value: (value - v) / u chisquareDict[k] = chisquare self.set_chisquare_mapping(chisquareDict) self.set_lnprior_mapping(priorDict) def set_lnprior_mapping(self, mappingDict): """Sets the prior mapping for the different parameters. This will affect likelihood fits. Parameters ---------- mappingDict: dictionary Dictionary containing the functions that give the prior for the given parameter value. Use the parameter names as keys.""" for k in mappingDict.keys(): self._lnprior_mapping[k] = copy.deepcopy(mappingDict[k]) def set_chisquare_mapping(self, mappingDict): """Sets the prior mapping for the different parameters. This will affect chisquare fits. Parameters ---------- mappingDict: dictionary Dictionary containing the functions that give the prior for the given parameter value. Use the parameter names as keys.""" for k in mappingDict.keys(): self._chisquare_mapping[k] = copy.deepcopy(mappingDict[k]) def set_value(self, valueDict): """Sets the value of the given parameters to the given values. Parameters ---------- valueDict: dictionary Dictionary containing the values for the parameters with the parameter names as keys.""" for key in valueDict: try: self.params[key].value = valueDict[key] except KeyError: pass def set_expr(self, exprDict): """Sets the expression of the selected parameters to the given expressions. Parameters ---------- exprDict: dictionary Dictionary containing the expressions for the parameters with the parameter names as keys.""" for k in exprDict.keys(): self._expr[k] = copy.deepcopy(exprDict[k]) try: self.params[k].expr = self._expr[k] except KeyError: pass def set_variation(self, varyDict): """Sets the variation of the fitparameters as supplied in the dictionary. Parameters ---------- varyDict: dictionary A dictionary containing 'key: True/False' mappings with the parameter names as keys.""" p = self.params.copy() for k in varyDict.keys(): self._vary[k] = copy.deepcopy(varyDict[k]) try: p[k].vary = self._vary[k] except KeyError: pass self.params = p.copy() def set_boundaries(self, boundaryDict): """Sets the boundaries of the fitparameters as supplied in the dictionary. Parameters ---------- boundaryDict: dictionary A dictionary containing "key: {'min': value, 'max': value}" mappings. A missing 'min' or 'max' key gives no boundary in that direction. The parameter names have to be used as keys.""" p = self.params.copy() for k in boundaryDict.keys(): self._constraints[k] = copy.deepcopy(boundaryDict[k]) for bound in self._constraints[k].keys(): b = self._constraints[k][bound] try: if bound.lower() == 'min': p[k].min = b if b is not None else -np.inf elif bound.lower() == 'max': p[k].max = b if b is not None else np.inf else: pass except KeyError: raise self.params = p.copy() def _check_variation(self, par): # Make sure the variations in the params are set correctly. for key in self._vary.keys(): try: par[key].vary = self._vary[key] except KeyError: pass for key in self._constraints.keys(): for bound in self._constraints[key]: try: if bound.lower() == 'min': par[key].min = self._constraints[key][bound] elif bound.lower() == 'max': par[key].max = self._constraints[key][bound] else: pass except KeyError: pass for key in self._expr.keys(): try: par[key].expr = self._expr[self._prefix + key] except KeyError: pass return par.copy() def get_chisquare_mapping(self): return np.array([self._chisquare_mapping[k](self.params[k].value) for k in self._chisquare_mapping.keys()]) def get_lnprior_mapping(self, params): # Check if the parameter values are within the acceptable range. for key in params.keys(): par = params[key] if par.vary: try: leftbound, rightbound = (par.priormin, par.priormax) except AttributeError: leftbound, rightbound = par.min, par.max leftbound = -np.inf if leftbound is None else leftbound rightbound = np.inf if rightbound is None else rightbound if not leftbound < par.value < rightbound: return -np.inf # If defined, calculate the lnprior for each seperate parameter return_value = 1.0 for key in self._lnprior_mapping.keys(): return_value += self._lnprior_mapping[key](params[key].value) return return_value def display_mle_fit(self, scaled=False, **kwargs): """Give a readable overview of the result of the MLE fitting routine. Warning ------- The uncertainty shown is the largest of the asymmetrical errors! Work is being done to incorporate asymmetrical errors in the report; for now, rely on the correlation plot.""" if hasattr(self, 'fit_mle'): if 'show_correl' not in kwargs: kwargs['show_correl'] = False print('NDoF: {:d}, Chisquare: {:.8G}, Reduced Chisquare: {:.8G}'.format(self.ndof_mle, self.chisqr_mle, self.redchi_mle)) if scaled: print('Errors scaled with reduced chisquare.') par = copy.deepcopy(self.fit_mle) for p in par: if par[p].stderr is not None: par[p].stderr *= (self.redchi_mle**0.5) print(lm.fit_report(par, **kwargs)) else: print('Errors not scaled with reduced chisquare.') print(lm.fit_report(self.fit_mle, **kwargs)) else: print('Model has not yet been fitted with this method!') def display_chisquare_fit(self, scaled=False, **kwargs): """Display all relevent info of the least-squares fitting routine, if this has been performed. Parameters ---------- kwargs: misc Keywords passed on to :func:`fit_report` from the LMFit package.""" if hasattr(self, 'chisq_res_par'): print('NDoF: {:d}, Chisquare: {:.8G}, Reduced Chisquare: {:.8G}'.format(self.ndof_chi, self.chisqr_chi, self.redchi_chi)) print('Akaike Information Criterium: {:.8G}, Bayesian Information Criterium: {:.8G}'.format(self.aic_chi, self.bic_chi)) if scaled: print('Errors scaled with reduced chisquare.') par = copy.deepcopy(self.chisq_res_par) for p in par: if par[p].stderr is not None: par[p].stderr *= (self.redchi_chi**0.5) print(lm.fit_report(par, **kwargs)) else: print('Errors not scaled with reduced chisquare.') print(lm.fit_report(self.chisq_res_par, **kwargs)) else: print('Spectrum has not yet been fitted with this method!') def get_goodness_of_fit(self, selection='chisquare'): if selection.lower() == 'chisquare': return (self.ndof_chi, self.chisqr_chi, self.redchi_chi, self.aic_chi, self.bic_chi) elif selection.lower() == 'mle': return (self.ndof_mle, self.chisqr_mle, self.redchi_mle, self.aic_mle, self.bic_mle)
[docs] def get_result(self, selection='any'): """Return the variable names, values and estimated error bars for the parameters as seperate lists. Parameters ---------- selection: string, optional Selects if the chisquare ('chisquare' or 'any') or MLE values are used. Defaults to 'any'. Returns ------- names, values, uncertainties: tuple of lists Returns a 3-tuple of lists containing the names of the parameters, the values and the estimated uncertainties.""" var, var_names, varerr = [], [], [] if hasattr(self, 'chisq_res_par') and (selection.lower() == 'chisquare' or selection.lower() == 'any'): for key in sorted(self.chisq_res_par.keys()): if self.chisq_res_par[key].vary: var.append(self.chisq_res_par[key].value) var_names.append(self.chisq_res_par[key].name) varerr.append(self.chisq_res_par[key].stderr) elif hasattr(self, 'fit_mle'): for key in sorted(self.fit_mle.params.keys()): if self.fit_mle.params[key].vary: var.append(self.fit_mle.params[key].value) var_names.append(self.fit_mle.params[key].name) varerr.append(self.fit_mle.params[key].stderr) else: params = self.params for key in sorted(params.keys()): if params[key].vary: var.append(params[key].value) var_names.append(params[key].name) varerr.append(None) return var_names, var, varerr
[docs] def get_result_frame(self, method='chisquare', selected=False, bounds=False, vary=False, scaled=False): """Returns the data from the fit in a pandas DataFrame. Parameters ---------- method: str, optional Selects which fitresults have to be loaded. Can be 'chisquare' or 'mle'. Defaults to 'chisquare'. selected: list of strings, optional Selects the parameters that have any string in the list as a substring in their name. Set to *None* to select all parameters. Defaults to *None*. bounds: boolean, optional Selects if the boundary also has to be given. Defaults to *False*. vary: boolean, optional Selects if only the parameters that have been varied have to be supplied. Defaults to *False*. scaled: boolean, optional Sets the uncertainty scaling with the reduced chisquare value. Default to *False*. Returns ------- resultframe: DataFrame Dateframe with MultiIndex, using the variable names as main column names and either two subcolumns for the value and the uncertainty, or four subcolumns for the value, uncertainty and bounds.""" if method.lower() == 'chisquare': if scaled: p = copy.deepcopy(self.chisq_res_par) for par in p: p[par].stderr *= self.redchi_chi**0.5 else: p = self.chisq_res_par elif method.lower() == 'mle': if scaled: p = copy.deepcopy(self.fit_mle) for par in p: p[par].stderr *= self.redchi_mle**0.5 else: p = self.fit_mle values = p.values() if selected: values = [v for n in self.selected for v in values if n in v.name] if vary: values = [v for v in values if v.vary] if bounds: ind = ['Value', 'Uncertainty', 'Upper Bound', 'Lower Bound'] data = np.array([[p.value, p.stderr, p.max, p.min] for p in values]).flatten() columns = [[p.name for pair in zip(values, values) for p in pair], [x for p in values for x in ind]] else: data = np.array([[p.value, p.stderr] for p in values]).flatten() ind = ['Value', 'Uncertainty'] columns = [[p.name for pair in zip(values, values) for p in pair], [x for p in values for x in ind]] columns = pd.MultiIndex.from_tuples(list(zip(*columns))) result = pd.DataFrame(data, index=columns).T if method.lower() == 'chisquare': result.loc[:, 'Chisquare'] = pd.Series(np.array([self.chisqr_chi]), index=result.index) result.loc[:, 'Reduced chisquare'] = pd.Series(np.array([self.redchi_chi]), index=result.index) result.loc[:, 'NDoF'] = pd.Series(np.array([self.ndof_chi]), index=result.index) else: result.loc[:, 'Chisquare'] = pd.Series(np.array([self.chisqr_mle]), index=result.index) result.loc[:, 'Reduced chisquare'] = pd.Series(np.array([self.redchi_mle]), index=result.index) result.loc[:, 'NDoF'] = pd.Series(np.array([self.ndof_mle]), index=result.index) return result
[docs] def get_result_dict(self, method='chisquare', scaled=False): """Returns the fitted parameters in a dictionary of the form {name: [value, uncertainty]}. Parameters ---------- method: {'chisquare', 'mle'} Selects which parameters have to be returned. scaled: boolean Selects if, in case of chisquare parameters, the uncertainty has to be scaled by sqrt(reduced_chisquare). Defaults to *False*. Returns ------- dict Dictionary of the form described above.""" if method.lower() == 'chisquare': if scaled: p = copy.deepcopy(self.chisq_res_par) for par in p: p[par].stderr *= self.redchi**0.5 else: p = self.chisq_res_par else: if scaled: p = copy.deepcopy(self.fit_mle) for par in p: p[par].stderr *= self.redchi_mle**0.5 else: p = self.fit_mle returnDict = {P: [p[P].value, p[P].stderr] for P in p} return returnDict
def save(self, path): """Saves the current spectrum, including the results of the fitting and the parameters, to the specified file. Parameters ---------- path: string Name of the file to be created.""" import pickle with open(path, 'wb') as f: pickle.dump(self, f) ####################################### # METHODS CALLED BY FITTING # ####################################### def _sanitize_input(self, x, y, yerr=None): return x, y, yerr def seperate_response(self, x): """Wraps the output of the :meth:`__call__` in a list, for ease of coding in the fitting routines.""" return [self(x)] def __add__(self, other): """Add two spectra together to get an :class:`.SumModel`. Parameters ---------- other: BaseModel Other spectrum to add. Returns ------- SumModel A SumModel combining both spectra.""" from .summodel import SumModel if isinstance(other, SumModel): l = [self] + other.models else: l = [self, other] return SumModel(l) def __radd__(self, other): if other == 0: return self else: return self.__add__(other) def __call__(self, x): raise NotImplementedError("Method has to be implemented in subclass!")
def load_model(path): """Loads the saved BaseModel and returns the reconstructed object. Parameters ---------- path: string Location of the saved model. Returns ------- model: BaseModel Saved BaseModel/child class instance.""" import pickle with open(path, 'rb') as f: model = pickle.load(f) try: for m in model.models: m._parameters._prefix = m._prefix except: model._parameters._prefix = model._prefix return model