Commit e8a902ac authored by Nicolas Barthes

add lwplsr translated from Julia to Python!

Need to add it as a new
parent 4831e785
import numpy as np
import numpy.typing as npt
from weighted_ikpls import PLS
def mad(X, zmed, axis=1, keepdims=True):
    """
    Compute the median absolute deviation (MAD) row-wise in X.
    Input:
    X = n x d matrix. n=rows and d=features
    zmed = row-wise median of X
    Output:
    mad = n x 1 matrix. One MAD per row, scaled by 1.4826 so that the MAD is a
          consistent estimator of the standard deviation under normality.
    """
    mad = 1.4826 * np.median(np.abs(X - zmed), axis=axis, keepdims=keepdims)
    return mad
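# Illustrative check (note added for clarity, not part of the original source):
# for a row containing one outlier, the MAD stays small because the median
# absorbs the outlier.
# >>> X = np.array([[1., 2., 3., 4., 100.]])
# >>> zmed = np.median(X, axis=1, keepdims=True)  # 3.0
# >>> mad(X, zmed)                                # 1.4826 * median(|x - 3|) = 1.4826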
def dist_to_weights(neighbors, h, criw=4):
    """
    Convert distances to weights using a decreasing exponential function:
    exp(-dist / (h * MAD(dist))).
    Input:
    neighbors = n x d matrix. n=samples and d=distances to the k nearest neighbors
    h = positive scalar defining the shape of the weight function
    criw = positive scalar defining the cutoff beyond which distances are
           treated as outliers and given a near-zero weight
    Output:
    weights = n x d matrix, normalized so the largest weight in each row is 1
    """
    zmed = np.median(neighbors, axis=1).reshape(-1, 1)
    zmad = mad(neighbors, zmed)
    cutoff = zmed + criw * zmad
    weights = np.where(neighbors < cutoff, np.exp(-neighbors / (zmad * h)), 1e-6)
    return weights / np.max(weights, axis=1).reshape(-1, 1)
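# Worked example (note added for clarity, not part of the original source),
# with h=2 and criw=4: for a row of distances [1, 2, 3, 10], zmed = 2.5 and
# zmad = 1.4826, so the cutoff is ~8.43 and the distance 10 is flagged as an
# outlier. The normalized weights come out to approximately
# [1.0, 0.714, 0.509, 1.4e-6]: closer neighbors dominate and the outlier is
# effectively discarded.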
def isposdef(X) -> bool:
    """
    Check whether a matrix is positive definite.
    Input:
    X = d x d symmetric matrix
    Output:
    Bool True or False
    """
    # The Cholesky factorization exists if and only if X is positive definite,
    # so a failed factorization is used as the test.
    try:
        np.linalg.cholesky(X)
        return True
    except np.linalg.LinAlgError:
        return False
def mahala(X):
    """
    Prepare the Uinv matrix for the Mahalanobis distance used in knn.
    Input:
    X = n x d matrix. n=rows and d=features
    Output:
    Uinv = d x d matrix
    """
    S = np.cov(X, rowvar=False, bias=True)
    if X.shape[1] == 1:
        # With a single feature, np.cov returns a scalar: invert it directly.
        Uinv = np.atleast_2d(1.0 / np.sqrt(S))
    else:
        if not isposdef(S):
            # Fall back to scaling each feature by the inverse of its variance
            # when the covariance matrix is not positive definite.
            Uinv = np.diag(1 / np.diagonal(S))
        else:
            # S = U.T @ U with U upper triangular; Uinv = inv(U).
            Uinv = np.linalg.inv(np.linalg.cholesky(np.conj(S.T)).T)
    return Uinv
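# Why this works (note added for clarity): if S = U.T @ U is the Cholesky
# factorization of the covariance matrix, then for any pair of rows x and y
#   (x - y) @ inv(S) @ (x - y).T = ||x @ inv(U) - y @ inv(U)||^2,
# so transforming the data with Uinv = inv(U) turns Mahalanobis distances into
# plain Euclidean distances, which a standard KDTree can handle.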
def knn(x: npt.ArrayLike, X: npt.ArrayLike, metric='euc', k_neighbors: int = 500, sklearn=True):
    """
    Find the k nearest neighbors of x in X.
    Input:
    x = m x d matrix. m=rows and d=features (same number of features as X)
    X = n x d matrix. n=rows and d=features
    metric = distance between samples - 'euc' for Euclidean (default) or 'mah' for Mahalanobis
    k_neighbors = number of nearest neighbors to be found
    sklearn = Boolean - if True, the sklearn KDTree is used; if False, the SciPy KDTree is used
    Output:
    dists = m x k matrix of distances between each row of x and its k nearest neighbors in X
    indices = m x k matrix of the indices in X of those neighbors
    """
    N = X.shape[0]
    if sklearn:
        # patch_sklearn must run before the sklearn import for the Intel
        # extension to take effect.
        from sklearnex import patch_sklearn
        patch_sklearn()
        from sklearn.neighbors import KDTree
        params = dict(leaf_size=200, metric='euclidean')
    else:
        from scipy.spatial import KDTree
        params = dict(leafsize=200)
    if x.ndim == 1:
        x = x.reshape(1, -1)
    if k_neighbors > N:
        k_neighbors = N
    if metric == 'mah':
        # Whiten both sets with Uinv so Euclidean distance in the transformed
        # space equals Mahalanobis distance in the original space.
        Uinv = mahala(X)
        x, X = x @ Uinv, X @ Uinv
    tree = KDTree(X, **params)
    dists, indices = tree.query(x, k=k_neighbors)
    return dists, indices
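# Minimal usage sketch (note added for clarity; names are illustrative):
# >>> rng = np.random.default_rng(0)
# >>> X = rng.normal(size=(1000, 10))
# >>> x = rng.normal(size=(5, 10))
# >>> dists, idx = knn(x, X, metric='euc', k_neighbors=20, sklearn=False)
# >>> dists.shape, idx.shape
# ((5, 20), (5, 20))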
def lwpls(Xtrain, Xtest, ytrain, globalplsVL=20, metric='euc', h=2, k=200, localplsVL=15, center=True, scale=False, sklearn=True):
    """
    Compute a locally weighted PLS regression: for each row of Xtest to predict,
    the k nearest neighbors in the global PLS score space are used to weight a
    local PLS regression.
    Input:
    Xtrain = n x d matrix. n=rows and d=features
    ytrain = n x e matrix. n=rows and e=features
    Xtest = m x d matrix. m=rows and d=features
    globalplsVL = number of components for the global PLS
    metric = distance between samples - 'euc' for Euclidean (default) or 'mah' for Mahalanobis
    h = scalar defining the shape of the dist_to_weights function. The lower h is, the sharper the function.
    k = number of nearest neighbors used by the knn function
    localplsVL = number of components for the local PLS
    center, scale = Booleans to center or center and scale the data before PLS.fit
    sklearn = Boolean to use either the sklearn or the SciPy KDTree in the knn function
    Output:
    ytest = m x e matrix. m=rows and e=predicted features
    """
    # Global PLS: project training and test sets into a common score space.
    global_pls = PLS(algorithm=1, center_Y=center, center_X=center, scale_X=scale, scale_Y=scale)
    global_pls.fit(Xtrain, ytrain, globalplsVL)
    Xtest_pls = global_pls.transf(Xtest)
    # Neighbors and weights are computed in the global score space.
    knn_distances, knn_indexes = knn(x=Xtest_pls, X=global_pls.T, metric=metric, k_neighbors=k, sklearn=sklearn)
    knn_weights = dist_to_weights(knn_distances, h=h)
    ytest = np.zeros((len(Xtest), 1), float)
    local_pls = PLS(algorithm=1, center_Y=center, center_X=center, scale_X=scale, scale_Y=scale)
    # One weighted local PLS model per test sample.
    for i in range(len(ytest)):
        local_pls.fit(Xtrain[knn_indexes[i]], ytrain[knn_indexes[i]], localplsVL, weights=knn_weights[i])
        ytest[i] = local_pls.predict(Xtest[i], n_components=localplsVL)
    return ytest
# ypred = lwpls(Xtrain, Xtest, ytrain, globalplsVL=15, metric='mah', h = 1, k = 200, localplsVL = 10, center=True, scale=False)
# optimize globalplsVL (5-25), metric (euc, mah), h (1, 3), k (50, 500), localplsVL (5, 25)
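# Hedged end-to-end sketch (note added for clarity, not part of the original
# source): a synthetic check that the pipeline runs. It assumes the PLS class
# below is importable as weighted_ikpls (per the import at the top of this
# file) and uses the SciPy KDTree to avoid the sklearnex dependency.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    Xtrain = rng.normal(size=(300, 50))
    coefs = rng.normal(size=(50, 1))
    ytrain = Xtrain @ coefs + 0.1 * rng.normal(size=(300, 1))
    Xtest = rng.normal(size=(10, 50))
    ypred = lwpls(Xtrain, Xtest, ytrain, globalplsVL=10, metric='euc',
                  h=2, k=100, localplsVL=5, sklearn=False)
    print(ypred.shape)  # expected: (10, 1)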
"""
Contains the PLS Class which implements partial least-squares regression using Improved
Kernel PLS by Dayal and MacGregor:
https://doi.org/10.1002/(SICI)1099-128X(199701)11:1%3C73::AID-CEM435%3E3.0.CO;2-%23
The PLS class subclasses scikit-learn's BaseEstimator to ensure compatibility with e.g.
scikit-learn's cross_validate. It is written using NumPy.
Author: Ole-Christian Galbo Engstrøm
E-mail: ole.e@di.ku.dk
Modifications by Nicolas Barthes to add weighted functionality - based on the plskern Jchemo code from Matthieu Lesnoff
"""
import warnings
from typing import Union
import numpy as np
import numpy.linalg as la
import numpy.typing as npt
from sklearn.base import BaseEstimator
class PLS(BaseEstimator):
"""
Implements partial least-squares regression using Improved Kernel PLS by Dayal and
MacGregor:
https://doi.org/10.1002/(SICI)1099-128X(199701)11:1%3C73::AID-CEM435%3E3.0.CO;2-%23
Parameters
----------
algorithm : int, default=1
Whether to use Improved Kernel PLS Algorithm #1 or #2.
center_X : bool, default=True
Whether to center `X` before fitting by subtracting its row of
column-wise means from each row.
center_Y : bool, default=True
Whether to center `Y` before fitting by subtracting its row of
column-wise means from each row.
scale_X : bool, default=True
Whether to scale `X` before fitting by dividing each row with the row of `X`'s
column-wise standard deviations. Bessel's correction for the unbiased estimate
of the sample standard deviation is used.
scale_Y : bool, default=True
Whether to scale `Y` before fitting by dividing each row with the row of `X`'s
column-wise standard deviations. Bessel's correction for the unbiased estimate
of the sample standard deviation is used.
copy : bool, default=True
Whether to copy `X` and `Y` in fit before potentially applying centering and
scaling. If True, then the data is copied before fitting. If False, and `dtype`
matches the type of `X` and `Y`, then centering and scaling is done inplace,
modifying both arrays.
dtype : numpy.float, default=numpy.float64
The float datatype to use in computation of the PLS algorithm. Using a lower
precision than float64 will yield significantly worse results when using an
increasing number of components due to propagation of numerical errors.
Raises
------
ValueError
If `algorithm` is not 1 or 2.
Notes
-----
Any centering and scaling is undone before returning predictions to ensure that
predictions are on the original scale. If both centering and scaling are True, then
the data is first centered and then scaled.
"""
    def __init__(
        self,
        algorithm: int = 1,
        center_X: bool = True,
        center_Y: bool = True,
        scale_X: bool = False,
        scale_Y: bool = False,
        copy: bool = True,
        dtype: np.floating = np.float64,
    ) -> None:
        self.algorithm = algorithm
        self.center_X = center_X
        self.center_Y = center_Y
        self.scale_X = scale_X
        self.scale_Y = scale_Y
        self.copy = copy
        self.dtype = dtype
        self.eps = np.finfo(dtype).eps
        self.name = f"Improved Kernel PLS Algorithm #{algorithm}"
        if self.algorithm not in [1, 2]:
            raise ValueError(
                f"Invalid algorithm: {self.algorithm}. Algorithm must be 1 or 2."
            )
        self.A = None
        self.N = None
        self.K = None
        self.M = None
        self.B = None
        self.W = None
        self.P = None
        self.Q = None
        self.R = None
        self.T = None
        self.X_mean = None
        self.Y_mean = None
        self.X_std = None
        self.Y_std = None
    def _weight_warning(self, i: int) -> None:
        """
        Warns the user that the weight is close to zero.
        Parameters
        ----------
        i : int
            Number of components.
        Returns
        -------
        None.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("always", UserWarning)
            warnings.warn(
                f"Weight is close to zero. Results with A = {i} component(s) or higher"
                " may be unstable."
            )
    def _weighted_avg_and_std(self, values, weights):
        """
        Return the weighted average and standard deviation.
        The weights are in effect first normalized so that they
        sum to 1 (and so they must not all be 0).
        values, weights -- NumPy ndarrays; weights has one entry per row of values.
        """
        average = np.average(values, weights=weights, axis=0)
        # Fast and numerically precise:
        variance = np.average((values - average) ** 2, weights=weights, axis=0)
        return average, np.sqrt(variance)
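    # Illustrative check (note added for clarity): with values [0, 10] and
    # weights [3, 1], the weighted mean is (3*0 + 1*10)/4 = 2.5 and the weighted
    # variance is (3*(0-2.5)^2 + 1*(10-2.5)^2)/4 = (18.75 + 56.25)/4 = 18.75,
    # giving a weighted standard deviation of sqrt(18.75) ~= 4.33.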
    def fit(self, X: npt.ArrayLike, Y: npt.ArrayLike, A: int, weights: npt.ArrayLike = None) -> None:
        """
        Fits the weighted Improved Kernel PLS algorithm (#1 or #2, per the
        constructor) on `X` and `Y` using `A` components.
        Parameters
        ----------
        X : Array of shape (N, K)
            Predictor variables.
        Y : Array of shape (N, M) or (N,)
            Response variables.
        A : int
            Number of components in the PLS model.
        weights : Array of shape (N,) or None
            Sample weights for the rows of `X` and `Y`. Use None (uniform
            weights) for a global PLS.
        Attributes
        ----------
        B : Array of shape (A, K, M)
            PLS regression coefficients tensor.
        W : Array of shape (K, A)
            PLS weights matrix for X.
        P : Array of shape (K, A)
            PLS loadings matrix for X.
        Q : Array of shape (M, A)
            PLS loadings matrix for Y.
        R : Array of shape (K, A)
            PLS weights matrix to compute scores T directly from the original X.
        T : Array of shape (N, A)
            PLS scores matrix of X. Only assigned for Improved Kernel PLS Algorithm #1.
        X_mean : Array of shape (K,)
            Weighted column-wise mean of X.
        Y_mean : Array of shape (M,)
            Weighted column-wise mean of Y.
        X_std : Array of shape (K,)
            Weighted column-wise standard deviation of X (all ones if scaling is
            not performed).
        Y_std : Array of shape (M,)
            Weighted column-wise standard deviation of Y (all ones if scaling is
            not performed).
        Returns
        -------
        None.
        Warns
        -----
        UserWarning.
            If at any point during iteration over the number of components `A`, the
            residual goes below machine precision for np.float64.
        """
        X = np.asarray(X, dtype=self.dtype)
        Y = np.asarray(Y, dtype=self.dtype)
        # If no weights are provided, use uniform weights; in both cases,
        # normalize the weights so that they sum to 1.
        if weights is None:
            weights = np.ones(X.shape[0], dtype=X.dtype)
        weights = np.asarray(weights, dtype=X.dtype)
        weights = weights / np.sum(weights)
        sqrt_weights = np.sqrt(weights)
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)
        if (self.center_X or self.scale_X) and self.copy:
            X = X.copy()
        if (self.center_Y or self.scale_Y) and self.copy:
            Y = Y.copy()
        self.X_mean, xstd = self._weighted_avg_and_std(values=X, weights=weights)
        self.Y_mean, ystd = self._weighted_avg_and_std(values=Y, weights=weights)
        self.X_std = np.ones(X.shape[1], dtype=X.dtype)
        self.Y_std = np.ones(Y.shape[1], dtype=Y.dtype)
        # If scale = True, center then scale.
        if self.scale_X and self.scale_Y:
            self.X_std = xstd
            X = (X - self.X_mean) / self.X_std
            self.Y_std = ystd
            Y = (Y - self.Y_mean) / self.Y_std
        # Otherwise only center, unless centering is explicitly set to False.
        elif self.center_X and self.center_Y:
            X -= self.X_mean
            Y -= self.Y_mean
        # Weight the X and Y matrices before computing the PLS: multiplying each
        # row by sqrt(w) is equivalent to using the weighted cross-products,
        # since X.T @ diag(w) @ Y == (sqrt(w)*X).T @ (sqrt(w)*Y).
        X *= sqrt_weights.reshape(-1, 1)
        Y *= sqrt_weights.reshape(-1, 1)
        N, K = X.shape
        M = Y.shape[1]
        A = min(A, K)
        self.B = np.zeros(shape=(A, K, M), dtype=self.dtype)
        W = np.zeros(shape=(A, K), dtype=self.dtype)
        P = np.zeros(shape=(A, K), dtype=self.dtype)
        Q = np.zeros(shape=(A, M), dtype=self.dtype)
        R = np.zeros(shape=(A, K), dtype=self.dtype)
        self.W = W.T
        self.P = P.T
        self.Q = Q.T
        self.R = R.T
        if self.algorithm == 1:
            T = np.zeros(shape=(A, N), dtype=self.dtype)
            self.T = T.T
        self.A = A
        self.N = N
        self.K = K
        self.M = M
        # Step 1
        XTY = X.T @ Y
        # Used for algorithm #2
        if self.algorithm == 2:
            XTX = X.T @ X
        for i in range(A):
            # Step 2
            if M == 1:
                norm = la.norm(XTY)
                if np.isclose(norm, 0, atol=self.eps, rtol=0):
                    self._weight_warning(i)
                    break
                w = XTY / norm
            else:
                if M < K:
                    XTYTXTY = XTY.T @ XTY
                    eig_vals, eig_vecs = la.eigh(XTYTXTY)
                    q = eig_vecs[:, -1:]
                    w = XTY @ q
                    norm = la.norm(w)
                    if np.isclose(norm, 0, atol=self.eps, rtol=0):
                        self._weight_warning(i)
                        break
                    w = w / norm
                else:
                    XTYYTX = XTY @ XTY.T
                    eig_vals, eig_vecs = la.eigh(XTYYTX)
                    norm = eig_vals[-1]
                    if np.isclose(norm, 0, atol=self.eps, rtol=0):
                        self._weight_warning(i)
                        break
                    w = eig_vecs[:, -1:]
            W[i] = w.squeeze()
            # Step 3
            r = np.copy(w)
            for j in range(i):
                r = r - P[j].reshape(-1, 1).T @ w * R[j].reshape(-1, 1)
            R[i] = r.squeeze()
            # Step 4
            if self.algorithm == 1:
                t = X @ r
                T[i] = t.squeeze()
                tTt = t.T @ t
                p = (t.T @ X).T / tTt
            elif self.algorithm == 2:
                rXTX = r.T @ XTX
                tTt = rXTX @ r
                p = rXTX.T / tTt
            q = (r.T @ XTY).T / tTt
            P[i] = p.squeeze()
            Q[i] = q.squeeze()
            # Step 5
            XTY = XTY - (p @ q.T) * tTt
            # Compute regression coefficients
            self.B[i] = self.B[i - 1] + r @ q.T
        if self.algorithm == 1:
            # Undo the sqrt(w) row weighting so the stored scores T are on the
            # scale of the (centered/scaled) original X. T only exists for
            # Algorithm #1.
            T *= 1 / sqrt_weights
    def transf(self, Xtest, n_components=None):
        """
        Project new data into the PLS score space with n_components <= A.
        Parameters
        ----------
        Xtest : Array of shape (N, K)
            New samples whose scores are to be computed.
        n_components : int or None, optional, default=None.
            Number of components in the PLS model. If None, all fitted
            components are used.
        Returns
        -------
        Xtest scores : Array of shape (N, n_components)
        """
        if n_components is None:
            n_components = self.A
        else:
            n_components = min(n_components, self.A)
        Xtest = Xtest.copy()
        # If scale = False, X_std is all ones, so this only centers the data.
        Xtest = (Xtest - self.X_mean) / self.X_std
        Xtest_coordinates_pls = Xtest @ self.R[:, :n_components]
        return Xtest_coordinates_pls.squeeze()
    def _coef(self, n_components):
        """
        Compute the regression coefficients and intercept used in predict.
        Parameters
        ----------
        n_components : int
            Number of components in the PLS model used to predict.
        Returns
        -------
        B : Array of shape (K, M)
            Regression coefficients on the original (uncentered, unscaled) data.
        intercept : Array of shape (M,)
        """
        beta = self.Q[:, :n_components].T
        # Undo the X scaling on the input side and the Y scaling on the output
        # side so that predictions are on the original scale.
        B = np.diag(1 / self.X_std) @ self.R[:, :n_components] @ beta * self.Y_std
        intercept = self.Y_mean.T - self.X_mean.T @ B
        return B, intercept
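    # Note added for clarity: with centered/scaled training data, the fitted
    # model is (Y - Y_mean)/Y_std ~= (X - X_mean)/X_std @ R @ Q.T. Solving for Y
    # gives Y ~= intercept + X @ B with
    #   B = diag(1/X_std) @ R @ Q.T * Y_std
    #   intercept = Y_mean - X_mean @ B,
    # which is exactly what _coef returns.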
    def predict(
        self, X: npt.ArrayLike, n_components: Union[None, int] = None
    ) -> npt.NDArray[np.floating]:
        """
        Predicts with the fitted PLS model on `X` using `n_components`
        components. If `n_components` is None, all fitted components are used.
        Parameters
        ----------
        X : Array of shape (N, K)
            Predictor variables.
        n_components : int or None, optional, default=None.
            Number of components in the PLS model. If None, all fitted
            components are used.
        Returns
        -------
        Y_pred : Array of shape (N, M)
            Predictions computed with `n_components` components.
        """
        X = np.asarray(X, dtype=self.dtype)
        if n_components is None:
            n_components = self.A
        else:
            n_components = min(n_components, self.A)
        zB, zint = self._coef(n_components=n_components)
        Y_pred = zint + (X @ zB)
        return Y_pred
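# Hedged usage sketch (note added for clarity, not part of the original
# source): fit the weighted PLS on synthetic data and predict; the names and
# data are illustrative only.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X = rng.normal(size=(100, 20))
    y = X @ rng.normal(size=(20, 1)) + 0.05 * rng.normal(size=(100, 1))
    w = np.ones(100)
    w[:50] = 10.0  # emphasize the first 50 samples
    pls = PLS(algorithm=1, center_X=True, center_Y=True)
    pls.fit(X, y, A=5, weights=w)
    print(pls.predict(X[:3], n_components=5))  # shape (3, 1)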