# regress.py — hyperopt-tuned regression models (PLSR, interval PLS, LW-PLS)
from .lwplsr_julia_converted import lwpls
import streamlit as st
import numpy as np
from pandas import DataFrame
from utils.eval_metrics import metrics
from scipy.signal import savgol_filter
from sklearn.cross_decomposition import PLSRegression
from hyperopt import fmin, hp, tpe, Trials, space_eval, STATUS_OK, anneal

from utils.data_handling import Snv, No_transformation, KF_CV, sel_ratio


class Regmodel(object):
    """Base class for hyperparameter-optimised regression models.

    On construction, runs a hyperopt TPE search (``n_iter`` evaluations)
    over a space of signal-preprocessing parameters (Savitzky-Golay
    derivative + spectral normalization), optionally extended or reduced
    via ``add_hyperparams`` / ``remove_hyperparams``.  Subclasses implement
    :meth:`objective`, which must return the cross-validation score to
    minimise and, whenever it improves on ``self.SCORE``, record the fitted
    model, predictions and best parameters on ``self`` as a side effect.
    """

    def __init__(self, train, test, n_iter, add_hyperparams=None, remove_hyperparams=None, nfolds=3, **kwargs):
        """
        Parameters
        ----------
        train, test : tuple
            ``(X, y)`` pairs; ``X`` must expose ``.shape``.
        n_iter : int
            Number of hyperopt evaluations (``max_evals``).
        add_hyperparams : dict, optional
            Extra hyperopt space entries merged into the default space.
        remove_hyperparams : iterable, optional
            Keys to drop from the default space.
        nfolds : int
            Number of cross-validation folds used by subclasses.
        """
        self.SCORE = 100000000  # sentinel "worst" score; objective() lowers it
        self._xc, self._xt, self._ytrain, self._ytest = train[0], test[0], train[1], test[1]
        self._nc, self._nt, self._p = train[0].shape[0], test[0].shape[0], train[0].shape[1]
        self._model, self._best = None, None
        self._yc, self._ycv, self._yt = None, None, None
        self._cv_df = DataFrame()
        self._sel_ratio = DataFrame()
        self._nfolds = nfolds
        self._selected_bands = DataFrame(index=['from', 'to'])
        self.important_features = None

        # NOTE(review): `a` is computed but never used — 'window_length'
        # below is fixed to [9]; presumably `a` was meant to supply the
        # window-length choices.  Confirm intent before wiring it in.
        if self._xc.shape[1] > 1000:
            a = [15, 21, 27, 33]
        else:
            a = [5, 7, 9]
        self._hyper_params = {'polyorder': hp.choice('polyorder', [2]),
                              'deriv': hp.choice('deriv', [0, 1]),
                              # [15, 21, 27, 33]
                              'window_length': hp.choice('window_length', [9]),
                              'normalization': hp.choice('normalization', ['No_transformation'])}
        if remove_hyperparams is not None:
            for key in remove_hyperparams:
                self._hyper_params.pop(key, None)

        if add_hyperparams is not None:
            self._hyper_params.update(add_hyperparams)
            self._best = None

        trials = Trials()
        # fmin's return value is intentionally discarded: objective()
        # records the best parameters/model on self as a side effect.
        fmin(fn=self.objective,
             space=self._hyper_params,
             # Tree of Parzen Estimators (TPE), a Bayesian approach
             algo=tpe.suggest,
             max_evals=n_iter,
             trials=trials,
             verbose=1)

    @property
    def train_data_(self):
        """Training ``[X, y]`` pair as passed to the constructor."""
        return [self._xc, self._ytrain]

    @property
    def test_data_(self):
        """Test ``[X, y]`` pair as passed to the constructor."""
        return [self._xt, self._ytest]

    @property
    def pretreated_spectra_(self):
        """Preprocessed training spectra (set by subclasses' objective/fit)."""
        return self.pretreated

    @property
    def get_params_(self):
        """Search space the optimisation algorithm explores."""
        return self._hyper_params

    def objective(self, params):
        """Score one candidate hyperparameter set; implemented by subclasses."""
        pass

    @property
    def best_hyperparams_(self):
        """Best hyperparameter subset found so far."""
        return self._best

    @property
    def best_hyperparams_print(self):
        """Human-readable summary of the selected signal preprocessing."""
        norm_name = self._best['normalization']
        if norm_name == 'Snv':
            a = 'Standard Normal Variate (SNV)'
        elif norm_name == 'No_transformation':
            a = " No transformation was performed"
        else:
            # Fix: `a` was previously unbound (NameError) for any other value.
            a = str(norm_name)

        bb, cc, dd = str(self._best['window_length']), str(
            self._best['polyorder']), str(self._best['deriv'])
        SG = '- Savitzky-Golay derivative parameters:  \n(Window_length:' + \
            bb+';polynomial order:' + cc+'; Derivative order : ' + dd
        Norm = '- Spectral Normalization:  \n'+a
        return SG+"\n"+Norm

    @property
    def model_(self):
        """The fitted model selected by the search."""
        return self._model

    @property
    def pred_data_(self):
        """Predictions on the training and test sets: ``(yc, yt)``."""
        return self._yc, self._yt

    @property
    def cv_data_(self):
        """Cross-validation measured-vs-predicted data."""
        return self._ycv

    @property
    def CV_results_(self):
        """Cross-validation metrics DataFrame."""
        return self._cv_df

    @property
    def important_features_(self):
        """Feature-importance information (subclass-dependent)."""
        return self.important_features

    @property
    def selected_features_(self):
        """Selected wavelength bands (DataFrame indexed by 'from'/'to')."""
        return self._selected_bands

    @property
    def sel_ratio_(self):
        """Selectivity-ratio DataFrame computed from the best model."""
        return self._sel_ratio
########################################### PLSR   #########################################


class Plsr(Regmodel):
    """Partial least squares regression with TPE-optimised preprocessing
    and number of latent components."""

    # 'normalization' hyperparameter name -> preprocessing callable.
    # Replaces the previous eval()-based dispatch (unknown names now raise
    # KeyError instead of NameError).
    _NORMALIZATIONS = {'Snv': Snv, 'No_transformation': No_transformation}

    def __init__(self, train, test, n_iter=10, cv=3):
        # Adds the PLS-specific 'n_components' dimension to the shared
        # preprocessing search space defined in Regmodel.
        super().__init__(train, test, n_iter, nfolds=cv, add_hyperparams={
            'n_components': hp.randint('n_components', 1, 20)})

    def objective(self, params):
        """Cross-validated score for one (preprocessing, n_components) candidate.

        Returns rmse/max(0.01, r2) over the CV folds (lower is better) and,
        on improvement, stores the fitted model and predictions on self.
        """
        params['n_components'] = int(params['n_components'])
        x0 = [self._xc, self._xt]
        norm = self._NORMALIZATIONS[str(params['normalization'])]
        x1 = [norm(x0[0]), norm(x0[1])]

        a, b, c = params['deriv'], params['polyorder'], params['window_length']
        # savgol_filter requires deriv <= polyorder < window_length; repair
        # invalid combinations.  Fix: was `b > c`, which let
        # polyorder == window_length through (scipy rejects it).
        if a > b or b >= c:
            if self._best is not None:
                a, b, c = self._best['deriv'], self._best['polyorder'], self._best['window_length']
            else:
                a, b, c = 0, 0, 1  # window_length=1, polyorder=0: identity filter

        params['deriv'], params['polyorder'], params['window_length'] = a, b, c
        x2 = [savgol_filter(x1[i], polyorder=params['polyorder'], deriv=params['deriv'],
                            window_length=params['window_length']) for i in range(2)]

        model = PLSRegression(scale=False, n_components=params['n_components'])
        folds = KF_CV().CV(x=x2[0], y=np.array(
            self._ytrain), n_folds=self._nfolds)
        yp = KF_CV().cross_val_predictor(
            model=model, folds=folds, x=x2[0], y=np.array(self._ytrain))
        self._cv_df = KF_CV().metrics_cv(
            y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]

        # Composite score; r2 floored at 0.01 to avoid division blow-up.
        score = self._cv_df.loc["mean", 'rmse'] / \
            np.max([0.01, self._cv_df.loc["mean", 'r2']])

        Model = PLSRegression(scale=False, n_components=params['n_components'])
        Model.fit(x2[0], self._ytrain)

        if self.SCORE > score:
            self.SCORE = score
            self._ycv = KF_CV().meas_pred_eq(y=np.array(self._ytrain), ypcv=yp, folds=folds)
            self._yc = Model.predict(x2[0])
            self._yt = Model.predict(x2[1])
            self._model = Model

            # Normalise numpy scalars to plain ints where possible
            # (string-valued params raise and are kept as-is).
            for key, value in params.items():
                try:
                    params[key] = int(value)
                except (TypeError, ValueError):
                    params[key] = value

            self._best = params
            self.pretreated = DataFrame(x2[0])
            self._sel_ratio = sel_ratio(Model, x2[0])
        return score

############################################ iplsr #########################################


class TpeIpls(Regmodel):
    """Interval PLS: TPE search over n_components and wavelength-interval
    boundaries.

    The signal preprocessing (normalization + Savitzky-Golay) is fixed from
    a prior global optimisation (``bestglobalparams``); the search selects
    which ``n_intervall`` wavelength intervals to keep.
    """

    # 'normalization' name -> preprocessing callable (replaces eval() dispatch).
    _NORMALIZATIONS = {'Snv': Snv, 'No_transformation': No_transformation}

    def __init__(self, train, test, n_iter=10, n_intervall=5, cv=3, bestglobalparams=None):
        self.glob = bestglobalparams
        self._best = {}
        # Fix: folds were hard-coded to n_folds=3, ignoring `cv`.
        self.folds = KF_CV().CV(x=np.array(train[0]),
                                y=np.array(train[1]), n_folds=cv)
        norm = self._NORMALIZATIONS[str(self.glob['normalization'])]
        x1 = [norm(train[0]), norm(test[0])]
        self.x2 = [savgol_filter(x1[i], polyorder=self.glob['polyorder'], deriv=self.glob['deriv'],
                                 window_length=self.glob['window_length']) for i in range(2)]
        self.n_intervall = n_intervall
        self.n_arrets = self.n_intervall * 2  # two boundaries per interval

        add = {'n_components': hp.randint('n_components', 1, 20)}
        # One search dimension per interval boundary: v1..v_{2*n_intervall},
        # each an index into the wavelength axis.
        add.update({'v' + str(i): hp.randint('v' + str(i), 0,
                   train[0].shape[1]) for i in range(1, self.n_arrets + 1)})
        super().__init__(train, test, n_iter, nfolds=cv, add_hyperparams=add)

    def objective(self, params):
        """Cross-validated score for one candidate interval selection."""
        # Wavelength indices: sorted boundary pairs -> kept column ids.
        self.idx = [params['v' + str(i)] for i in range(1, self.n_arrets + 1)]
        self.idx.sort()
        arrays = [np.arange(self.idx[2 * i], self.idx[2 * i + 1] + 1)
                  for i in range(self.n_intervall)]
        ids = np.unique(np.concatenate(arrays, axis=0), axis=0)  # was `id` (shadowed builtin)
        prepared_data = [self.x2[i][:, ids] for i in range(2)]

        # Modelling
        folds = KF_CV().CV(x=prepared_data[0], y=np.array(
            self._ytrain), n_folds=self._nfolds)
        try:
            model = PLSRegression(
                scale=False, n_components=params['n_components'])
            yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                             x=prepared_data[0], y=np.array(self._ytrain))
            self._cv_df = KF_CV().metrics_cv(
                y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]
        except Exception:
            # Fallback when the selected bands cannot support the requested
            # rank: retry with a single component.  (Was a bare `except:`.)
            params["n_components"] = 1
            model = PLSRegression(
                scale=False, n_components=params["n_components"])
            yp = KF_CV().cross_val_predictor(model=model, folds=folds,
                                             x=prepared_data[0], y=np.array(self._ytrain))
            self._cv_df = KF_CV().metrics_cv(
                y=np.array(self._ytrain), ypcv=yp, folds=folds)[1]

        # Composite score; r2 floored at 0.01 to avoid division blow-up.
        score = self._cv_df.loc["mean", 'rmse'] / \
            np.max([0.01, self._cv_df.loc["mean", 'r2']])

        if self.SCORE > score:
            self.SCORE = score
            self._best = params
            self.arrays = arrays
            self.prepared_data = prepared_data
            self.model = model
        return score

    def best_fit(self):
        """Refit the best interval selection; store predictions and limits."""
        Model = PLSRegression(
            scale=False, n_components=self.model.n_components)
        Model.fit(self.prepared_data[0], self._ytrain)

        self._yc = Model.predict(self.prepared_data[0])
        yp = KF_CV().cross_val_predictor(model=Model, folds=self.folds,
                                         x=self.prepared_data[0], y=np.array(self._ytrain))
        self._ycv = KF_CV().meas_pred_eq(y=np.array(
            self._ytrain), ypcv=yp, folds=self.folds)
        self._yt = Model.predict(self.prepared_data[1])
        self._model = Model

        # Normalise numpy scalars to plain ints where possible.
        for key, value in self._best.items():
            try:
                self._best[key] = int(value)
            except (TypeError, ValueError):
                self._best[key] = value

        self.pretreated = DataFrame(self.x2[0])
        # Interval limits laid out as [start_0, end_0, start_1, end_1, ...].
        limits = np.ones(len(self.arrays) * 2)
        for i, interval in enumerate(self.arrays):
            limits[2 * i] = interval[0]
            limits[2 * i + 1] = interval[-1]
        self.limits = limits.astype(int)
###########################################  LWPLS  #########################################


class LwplsObject:
    """Lightweight result container for an LW-PLS run restored from JSON.

    When both arguments are given, exposes the same read-only attributes as
    a fitted model (``model_``, ``best_hyperparams_``, ``pred_data_``); with
    the defaults it is an empty shell and sets no attributes at all.
    """

    def __init__(self, Reg_json=None, pred=None):
        """
        Parameters
        ----------
        Reg_json : dict, optional
            Must contain 'model', 'best_lwplsr_params', and one entry per
            key listed in ``pred`` holding prediction records.
        pred : iterable of str, optional
            Keys of ``Reg_json`` to normalise into DataFrames.
        """
        if Reg_json is not None and pred is not None:
            from pandas import json_normalize
            self.model_ = Reg_json['model']
            self.best_hyperparams_ = Reg_json['best_lwplsr_params']
            self.pred_data_ = [json_normalize(Reg_json[i]) for i in pred]
############################################  LW-PLS  #########################################


class LWPLS(Regmodel):
    """Locally weighted PLS with TPE-optimised locality hyperparameters
    (local latent variables, distance metric, bandwidth h).

    Preprocessing and the global number of components come from a prior
    global optimisation (``bestglobalparams``).
    """

    # 'normalization' name -> preprocessing callable (replaces eval() dispatch).
    _NORMALIZATIONS = {'Snv': Snv, 'No_transformation': No_transformation}

    def __init__(self, train, test, n_iter=10, cv=3, bestglobalparams=None):
        # NOTE(review): bestglobalparams is effectively required (keys
        # 'normalization', 'polyorder', 'deriv', 'window_length',
        # 'n_components'); the None default would crash immediately below.
        self.glob = bestglobalparams
        self._best = {}
        add = {
            'localplsVL': hp.randint('localplsVL', 2, bestglobalparams['n_components']),
            'dist': hp.choice('dist', ['euc', 'mah']),
            'h': hp.randint('h', 1, 3)}
        # Fix: folds were hard-coded to n_folds=3, ignoring `cv`.
        self.folds = KF_CV().CV(x=np.array(train[0]),
                                y=np.array(train[1]), n_folds=cv)

        norm = self._NORMALIZATIONS[str(self.glob['normalization'])]
        x1 = [norm(train[0]), norm(test[0])]
        self.x2 = [savgol_filter(x1[i], polyorder=self.glob['polyorder'], deriv=self.glob['deriv'],
                                 window_length=self.glob['window_length']) for i in range(2)]
        super().__init__(train, test, n_iter, nfolds=cv,
                         add_hyperparams=add, remove_hyperparams=None)

    def _cv_predict(self, p):
        """Leave-fold-out LW-PLS predictions for the parameter dict ``p``.

        Shared by objective() and best_fit(), which previously duplicated
        this loop.
        """
        X, y = np.array(self.x2[0]), np.array(self._ytrain)
        yp = {}
        for i in self.folds.keys():
            yp[i] = lwpls(Xtrain=np.delete(X, self.folds[i], axis=0),
                          ytrain=np.delete(y, self.folds[i], axis=0),
                          Xtest=X[self.folds[i]],
                          globalplsVL=self.glob['n_components'], metric=p['dist'], h=p['h'], k=200,
                          localplsVL=p['localplsVL'], center=True, scale=False, sklearn=True).ravel()
        return yp

    def objective(self, params):
        """Cross-validated score for one (localplsVL, dist, h) candidate."""
        yp = self._cv_predict(params)
        self._cv_df = KF_CV().metrics_cv(y=np.array(
            self._ytrain), ypcv=yp, folds=self.folds)[1]
        # Composite score; r2 floored at 0.01 to avoid division blow-up.
        score = self._cv_df.loc["mean", 'rmse'] / \
            np.max([0.01, self._cv_df.loc["mean", 'r2']])

        if self.SCORE > score:
            self.SCORE = score
            self._best = params
        return score

    def best_fit(self):
        """Re-run CV with the best params and predict the test set."""
        yp = self._cv_predict(self._best)
        self._ycv = KF_CV().meas_pred_eq(y=np.array(
            self._ytrain), ypcv=yp, folds=self.folds)
        self._yt = lwpls(Xtrain=np.array(self.x2[0]),
                         ytrain=np.array(self._ytrain),
                         Xtest=np.array(self.x2[1]),
                         globalplsVL=self.glob['n_components'], metric=self._best['dist'], h=self._best['h'], k=200,
                         localplsVL=self._best['localplsVL'], center=True, scale=False, sklearn=True).ravel()
        self.pretreated = DataFrame(self.x2[0])
        # (Removed a dead `self._model = "LW-PLS"` that was immediately
        # overwritten by the dict assignment below.)
        # Convert numpy scalars to plain Python types for JSON-friendliness;
        # np.integer/np.floating also cover 32-bit variants (was int64/float64 only).
        for key, value in self._best.items():
            self._best[key] = int(value) if isinstance(value, np.integer) else float(
                value) if isinstance(value, np.floating) else value

        self._model = {'globalplsVL': self.glob['n_components'],
                       'localplsVL': self._best['localplsVL'],
                       'dist': self._best['dist'],
                       'k': 200,
                       'h': self._best['h']}
        self._best = {'normalization': self.glob['normalization'],
                      'polyorder': self.glob['polyorder'],
                      'window_length': self.glob['window_length'],
                      'deriv': self.glob['deriv'],
                      'globalplsVL': self.glob['n_components']}