from __future__ import print_function
from __future__ import division
import os
import sys
import numpy as np
from scipy import optimize
from numpy.linalg import solve, LinAlgError
from numpy.linalg import cholesky as chol
from six import with_metaclass
from abc import ABCMeta, abstractmethod
try:  # Run as a package if installed
    from pcntoolkit.util.utils import squared_dist
except ImportError:
    # fall back to a standalone import: put the parent directory on the path
    path = os.path.abspath(os.path.dirname(__file__))
    path = os.path.dirname(path)  # parent directory
    if path not in sys.path:
        sys.path.append(path)
    del path

    from util.utils import squared_dist
# --------------------
# Covariance functions
# --------------------
class CovBase(with_metaclass(ABCMeta)):
    """ Base class for covariance functions.

        All covariance functions must define the following methods::

            CovFunction.get_n_params()
            CovFunction.cov()
            CovFunction.dcov()

        where cov() returns the full covariance matrix (or the cross-covariance
        if a second set of inputs is given) and dcov() returns its derivative
        with respect to a given hyperparameter.
    """

    def __init__(self, x=None):
        self.n_params = np.nan

    def get_n_params(self):
        """ Report the number of parameters required """

        assert not np.isnan(self.n_params), \
            "Covariance function not initialised"

        return self.n_params

    @abstractmethod
    def cov(self, theta, x, z=None):
        """ Return the full covariance (or cross-covariance if z is given) """

    @abstractmethod
    def dcov(self, theta, x, i):
        """ Return the derivative of the covariance function with respect to
            the i-th hyperparameter """
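
# A minimal sketch (not part of the original toolkit) of a covariance function
# implementing the CovBase interface. The class name CovConst and its single
# hyperparameter log(sc) are illustrative assumptions only.
class CovConst(CovBase):
    """ Constant covariance function (illustrative example).
        The hyperparameters are::

            theta = ( log(sc) )

        where sc2 = sc**2 is a constant covariance shared by every pair of inputs
    """

    def __init__(self, x=None):
        self.n_params = 1

    def cov(self, theta, x, z=None):
        sc2 = np.exp(2*theta[0])
        if z is None:
            z = x
        # every pair of inputs shares the same covariance sc2
        return sc2 * np.ones((x.shape[0], z.shape[0]))

    def dcov(self, theta, x, i):
        if i == 0:  # derivative with respect to log(sc)
            return 2 * self.cov(theta, x)
        else:
            raise ValueError("Invalid covariance function parameter")
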
class CovLin(CovBase):
    """ Linear covariance function (no hyperparameters)
    """

    def __init__(self, x=None):
        self.n_params = 0
        self.first_call = False

    def cov(self, theta, x, z=None):
        # warn once if hyperparameters are passed in unnecessarily
        if not self.first_call and theta is not None and len(theta) > 0:
            self.first_call = True
            print("CovLin: ignoring unnecessary hyperparameter ...")

        if z is None:
            z = x

        K = x.dot(z.T)
        return K

    def dcov(self, theta, x, i):
        raise ValueError("Invalid covariance function parameter")
class CovSqExp(CovBase):
    """ Ordinary squared exponential covariance function.
        The hyperparameters are::

            theta = ( log(ell), log(sf) )

        where ell is a lengthscale parameter and sf2 = sf**2 is the signal
        variance
    """

    def __init__(self, x=None):
        self.n_params = 2

    def cov(self, theta, x, z=None):
        self.ell = np.exp(theta[0])
        self.sf2 = np.exp(2*theta[1])

        if z is None:
            z = x

        R = squared_dist(x/self.ell, z/self.ell)
        K = self.sf2 * np.exp(-R/2)
        return K

    def dcov(self, theta, x, i):
        self.ell = np.exp(theta[0])
        self.sf2 = np.exp(2*theta[1])

        R = squared_dist(x/self.ell, x/self.ell)

        if i == 0:    # return derivative of lengthscale parameter
            dK = self.sf2 * np.exp(-R/2) * R
            return dK
        elif i == 1:  # return derivative of signal variance parameter
            dK = 2*self.sf2 * np.exp(-R/2)
            return dK
        else:
            raise ValueError("Invalid covariance function parameter")
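
# Illustrative usage sketch with assumed random data (not part of the original
# module): evaluating a squared exponential kernel with theta = (log(ell), log(sf)).
def _demo_cov_sqexp():
    x = np.random.randn(5, 2)            # 5 points in 2 dimensions
    theta = np.log([1.0, 0.5])           # ell = 1.0, sf = 0.5
    covfunc = CovSqExp(x)
    K = covfunc.cov(theta, x)            # 5 x 5 covariance matrix
    dK = covfunc.dcov(theta, x, i=0)     # derivative w.r.t. log(ell)
    return K, dK
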
class CovSqExpARD(CovBase):
    """ Squared exponential covariance function with ARD
        The hyperparameters are::

            theta = ( log(ell_1), ..., log(ell_D), log(sf) )

        where ell_i are lengthscale parameters and sf2 = sf**2 is the signal
        variance
    """

    def __init__(self, x=None):
        if x is None:
            raise ValueError("N x D data matrix must be supplied as input")
        if len(x.shape) == 1:
            self.D = 1
        else:
            self.D = x.shape[1]
        self.n_params = self.D + 1

    def cov(self, theta, x, z=None):
        self.ell = np.exp(theta[0:self.D])
        self.sf2 = np.exp(2*theta[self.D])

        if z is None:
            z = x

        R = squared_dist(x.dot(np.diag(1./self.ell)),
                         z.dot(np.diag(1./self.ell)))
        K = self.sf2*np.exp(-R/2)
        return K

    def dcov(self, theta, x, i):
        K = self.cov(theta, x)
        if i < self.D:      # return derivative of lengthscale parameter
            dK = K * squared_dist(x[:, i]/self.ell[i], x[:, i]/self.ell[i])
            return dK
        elif i == self.D:   # return derivative of signal variance parameter
            dK = 2*K
            return dK
        else:
            raise ValueError("Invalid covariance function parameter")
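
# Illustrative usage sketch with assumed random data (not part of the original
# module): with ARD there is one lengthscale per input dimension, so for D = 3
# columns theta has length D + 1 = 4.
def _demo_cov_sqexp_ard():
    x = np.random.randn(10, 3)
    theta = np.log([1.0, 2.0, 0.5, 1.0])   # (log ell_1, log ell_2, log ell_3, log sf)
    covfunc = CovSqExpARD(x)                # D is inferred from x
    K = covfunc.cov(theta, x)               # 10 x 10 covariance matrix
    dK = covfunc.dcov(theta, x, i=1)        # derivative w.r.t. log(ell_2)
    return K, dK
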
class CovSum(CovBase):
    """ Sum of covariance functions. The component covariance functions are
        passed in as a list of class names and initialised automatically.
        For example::

            C = CovSum(x, ('CovLin', 'CovSqExpARD'))
            K = C.cov(theta, x)

        The hyperparameter vector theta is the concatenation of the
        hyperparameters of each component covariance function, in the order
        given. For the example above this is::

            theta = ( log(ell_1), ..., log(ell_D), log(sf) )

        where ell_i are lengthscale parameters and sf2 = sf**2 is the signal
        variance
    """

    def __init__(self, x=None, covfuncnames=None):
        if x is None:
            raise ValueError("N x D data matrix must be supplied as input")
        if covfuncnames is None:
            raise ValueError("A list of covariance functions is required")
        self.covfuncs = []
        self.n_params = 0
        for cname in covfuncnames:
            covfunc = eval(cname + '(x)')
            self.n_params += covfunc.get_n_params()
            self.covfuncs.append(covfunc)

        if len(x.shape) == 1:
            self.N = len(x)
            self.D = 1
        else:
            self.N, self.D = x.shape

    def cov(self, theta, x, z=None):
        theta_offset = 0
        for ci, covfunc in enumerate(self.covfuncs):
            try:
                n_params_c = covfunc.get_n_params()
                theta_c = [theta[c] for c in
                           range(theta_offset, theta_offset + n_params_c)]
                theta_offset += n_params_c
            except Exception as e:
                print(e)

            if ci == 0:
                K = covfunc.cov(theta_c, x, z)
            else:
                K += covfunc.cov(theta_c, x, z)
        return K

    def dcov(self, theta, x, i):
        theta_offset = 0
        for covfunc in self.covfuncs:
            n_params_c = covfunc.get_n_params()
            theta_c = [theta[c] for c in
                       range(theta_offset, theta_offset + n_params_c)]
            theta_offset += n_params_c

            if theta_c:  # does this component have any hyperparameters?
                if 'dK' not in locals():
                    dK = covfunc.dcov(theta_c, x, i)
                else:
                    dK += covfunc.dcov(theta_c, x, i)
        return dK
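
# Illustrative usage sketch with assumed random data (not part of the original
# module): summing a linear kernel (no hyperparameters) and an ARD kernel, so
# theta simply concatenates the parameters of each component (here D + 1 values).
def _demo_cov_sum():
    x = np.random.randn(8, 2)
    covfunc = CovSum(x, ('CovLin', 'CovSqExpARD'))
    theta = np.zeros(covfunc.get_n_params())   # log(ell_1), log(ell_2), log(sf)
    K = covfunc.cov(theta, x)                  # 8 x 8 covariance matrix
    return K
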
# -----------------------
# Gaussian process models
# -----------------------
class GPR:
    """Gaussian process regression

    Estimation and prediction of Gaussian process regression models

    Basic usage::

        G = GPR()
        hyp = G.estimate(hyp0, cov, X, y)
        ys, ys2 = G.predict(hyp, X, y, Xs)

    where the variables are

    :param hyp: vector of hyperparameters
    :param cov: covariance function
    :param X: N x D data array
    :param y: 1D Array of targets (length N)
    :param Xs: Nte x D array of test cases
    :param hyp0: starting estimates for hyperparameter optimisation

    :returns: * ys - predictive mean
              * ys2 - predictive variance

    The hyperparameters are::

        hyp = ( log(sn), (cov function params) )  # hyp is a list or array

    The implementation and notation follows Rasmussen and Williams (2006).
    As in the gpml toolbox, these parameters are estimated using conjugate
    gradient optimisation of the marginal likelihood. Note that there is no
    explicit mean function, thus the gpr routines are limited to modelling
    zero-mean processes. A worked example is sketched at the end of this
    module.

    Reference:
    C. Rasmussen and C. Williams (2006) Gaussian Processes for Machine Learning

    Written by A. Marquand
    """

    def __init__(self, hyp=None, covfunc=None, X=None, y=None, n_iter=100,
                 tol=1e-3, verbose=False, warp=None):

        self.hyp = np.nan
        self.nlZ = np.nan
        self.tol = tol          # not used at present
        self.n_iter = n_iter
        self.verbose = verbose

        # set up warped likelihood
        if warp is None:
            self.warp = None
            self.n_warp_param = 0
        else:
            self.warp = warp
            self.n_warp_param = warp.get_n_params()

        self.gamma = None

    def _updatepost(self, hyp, covfunc):

        hypeq = np.asarray(hyp == self.hyp)
        if hypeq.all() and hasattr(self, 'alpha') and \
           (hasattr(self, 'covfunc') and covfunc == self.covfunc):
            return False
        else:
            return True
    def post(self, hyp, covfunc, X, y):
        """ Generic function to compute posterior distribution.
        """

        if len(hyp.shape) > 1:  # force 1d hyperparameter array
            hyp = hyp.flatten()

        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        self.N, self.D = X.shape

        # hyperparameters
        sn2 = np.exp(2*hyp[0])       # noise variance
        if self.warp is not None:    # parameters for warping the likelihood
            n_lik_param = self.n_warp_param + 1
        else:
            n_lik_param = 1
        theta = hyp[n_lik_param:]    # (generic) covariance hyperparameters

        if self.verbose:
            print("estimating posterior ... | hyp=", hyp)

        # posterior: L = chol(K + sn2*I) and alpha = (K + sn2*I) \ y
        self.K = covfunc.cov(theta, X)
        self.L = chol(self.K + sn2*np.eye(self.N))
        self.alpha = solve(self.L.T, solve(self.L, y))
        self.hyp = hyp
        self.covfunc = covfunc
    def loglik(self, hyp, covfunc, X, y):
        """ Function to compute the log (marginal) likelihood
        """

        # load or recompute posterior
        if self.verbose:
            print("computing likelihood ... | hyp=", hyp)

        # parameters for warping the likelihood function
        if self.warp is not None:
            gamma = hyp[1:(self.n_warp_param+1)]
            y_unwarped = y
            y = self.warp.f(y, gamma)

        if len(hyp.shape) > 1:  # force 1d hyperparameter array
            hyp = hyp.flatten()

        if self._updatepost(hyp, covfunc):
            try:
                self.post(hyp, covfunc, X, y)
            except (ValueError, LinAlgError):
                print("Warning: Estimation of posterior distribution failed")
                self.nlZ = 1/np.finfo(float).eps
                return self.nlZ

        # negative log marginal likelihood:
        # 0.5*y'*alpha + sum(log(diag(L))) + 0.5*N*log(2*pi)
        self.nlZ = 0.5*y.T.dot(self.alpha) + sum(np.log(np.diag(self.L))) + \
            0.5*self.N*np.log(2*np.pi)

        if self.warp is not None:
            # add in the Jacobian of the warping function
            self.nlZ = self.nlZ - sum(np.log(self.warp.df(y_unwarped, gamma)))

        # make sure the output is finite to stop the minimizer getting upset
        if not np.isfinite(self.nlZ):
            self.nlZ = 1/np.finfo(float).eps

        if self.verbose:
            print("nlZ= ", self.nlZ, " | hyp=", hyp)

        return self.nlZ
    def dloglik(self, hyp, covfunc, X, y):
        """ Function to compute derivatives
        """

        if len(hyp.shape) > 1:  # force 1d hyperparameter array
            hyp = hyp.flatten()

        if self.warp is not None:
            raise ValueError('optimization with derivatives is not yet ' +
                             'supported for warped likelihood')

        # hyperparameters
        sn2 = np.exp(2*hyp[0])  # noise variance
        theta = hyp[1:]         # (generic) covariance hyperparameters

        # load posterior and prior covariance
        if self._updatepost(hyp, covfunc):
            try:
                self.post(hyp, covfunc, X, y)
            except (ValueError, LinAlgError):
                print("Warning: Estimation of posterior distribution failed")
                dnlZ = np.sign(self.dnlZ) / np.finfo(float).eps
                return dnlZ

        # compute Q = alpha*alpha' - inv(K)
        Q = np.outer(self.alpha, self.alpha) - \
            solve(self.L.T, solve(self.L, np.eye(self.N)))

        # initialise derivatives
        self.dnlZ = np.zeros(len(hyp))

        # noise variance
        self.dnlZ[0] = -sn2*np.trace(Q)

        # covariance parameter(s)
        for par in range(0, len(theta)):
            # compute -0.5*trace(Q.dot(dK/d[theta_i])) efficiently
            dK = covfunc.dcov(theta, X, i=par)
            self.dnlZ[par+1] = -0.5*np.sum(np.sum(Q*dK.T))

        # make sure the gradient is finite to stop the minimizer getting upset
        if not all(np.isfinite(self.dnlZ)):
            bad = np.where(np.logical_not(np.isfinite(self.dnlZ)))
            for b in bad:
                self.dnlZ[b] = np.sign(self.dnlZ[b]) / np.finfo(float).eps

        if self.verbose:
            print("dnlZ= ", self.dnlZ, " | hyp=", hyp)

        return self.dnlZ
    # model estimation (optimization)
    def estimate(self, hyp0, covfunc, X, y, optimizer='cg'):
        """ Function to estimate the model
        """
        if len(X.shape) == 1:
            X = X[:, np.newaxis]

        self.hyp0 = hyp0

        if optimizer.lower() == 'cg':  # conjugate gradients
            out = optimize.fmin_cg(self.loglik, hyp0, self.dloglik,
                                   (covfunc, X, y), disp=True, gtol=self.tol,
                                   maxiter=self.n_iter, full_output=1)
        elif optimizer.lower() == 'powell':  # Powell's method
            out = optimize.fmin_powell(self.loglik, hyp0, (covfunc, X, y),
                                       full_output=1)
        else:
            raise ValueError("unknown optimizer")

        # Always return a 1d array. The optimizer sometimes changes dimensions
        if len(out[0].shape) > 1:
            self.hyp = out[0].flatten()
        else:
            self.hyp = out[0]
        self.nlZ = out[1]
        self.optimizer = optimizer

        return self.hyp
    def predict(self, hyp, X, y, Xs):
        """ Function to make predictions from the model
        """
        if len(hyp.shape) > 1:  # force 1d hyperparameter array
            hyp = hyp.flatten()

        # ensure X and Xs are multi-dimensional arrays
        if len(Xs.shape) == 1:
            Xs = Xs[:, np.newaxis]
        if len(X.shape) == 1:
            X = X[:, np.newaxis]

        # parameters for warping the likelihood function
        if self.warp is not None:
            gamma = hyp[1:(self.n_warp_param+1)]
            y = self.warp.f(y, gamma)

        # reestimate posterior (avoids numerical problems with optimizer)
        self.post(hyp, self.covfunc, X, y)

        # hyperparameters
        sn2 = np.exp(2*hyp[0])  # noise variance

        # (generic) covariance hyperparameters
        theta = hyp[(self.n_warp_param + 1):]

        Ks = self.covfunc.cov(theta, Xs, X)
        kss = self.covfunc.cov(theta, Xs)

        # predictive mean
        ymu = Ks.dot(self.alpha)

        # predictive variance (for a noisy test input)
        v = solve(self.L, Ks.T)
        ys2 = kss - v.T.dot(v) + sn2

        return ymu, ys2
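
# Illustrative end-to-end sketch with assumed toy data (not part of the
# original module): fit a zero-mean GP with a squared exponential kernel and
# predict at new inputs, as per the "Basic usage" section of the GPR docstring.
# The starting values hyp0 = (log(sn), log(ell), log(sf)) are rough guesses
# chosen purely for this example.
if __name__ == "__main__":
    np.random.seed(0)
    X = np.linspace(-3, 3, 50)[:, np.newaxis]      # N x D training inputs
    y = np.sin(X).ravel() + 0.1*np.random.randn(50)
    Xs = np.linspace(-3, 3, 101)[:, np.newaxis]    # Nte x D test inputs

    covfunc = CovSqExp(X)
    hyp0 = np.zeros(1 + covfunc.get_n_params())    # (log(sn), log(ell), log(sf))

    G = GPR()
    hyp = G.estimate(hyp0, covfunc, X, y)          # conjugate gradient optimisation
    ymu, ys2 = G.predict(hyp, X, y, Xs)            # predictive mean and variance
    print("estimated hyperparameters:", hyp)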