import numpy as np
import copy
import time
from scipy.sparse import csr_matrix
import matplotlib.pyplot as plt
from ..basefunctions.sepym import func_sepym as sepym
from ..basefunctions.calcindex import func_calcindex
from ..basefunctions.newrec import func_newrec
from ..basefunctions.cliprec import func_cliprec
from .jacobian import func_jacobian as jacobian
from .pmodaic import func_pmodaic as pmodaic
from .pmodbic import func_pmodbic as pmodbic
from .pmodmse import func_pmodmse as pmodmse
from .pmodbic import func_pmodbic as pmodbic
# Global variable to track the figure for plotting
_plotindex_fig = None
_plotindex_line = None
[docs]
def plotindex(trec, goal, name, epoch):
"""Update the live training performance plot.
Parameters
----------
trec : dict
Training record (``trec['index']`` holds the per-epoch MSE values).
goal : float
Target performance value. A horizontal dashed line is drawn if
finite and positive.
name : str
Algorithm name shown in the figure title.
epoch : int
Current epoch index (0-based); only data up to this epoch is plotted.
"""
global _plotindex_fig, _plotindex_line
# Get indices up to current epoch
ind = slice(0, epoch + 1)
epochs_arr = np.arange(epoch + 1)
index_arr = trec['index'][ind]
# Check if goal should be plotted
print_goal = np.isfinite(goal)
plot_goal = np.isfinite(goal) and (goal > 0)
# Create or update figure
if _plotindex_fig is None or not plt.fignum_exists(_plotindex_fig.number):
plt.ion() # Enable interactive mode
_plotindex_fig, ax = plt.subplots()
_plotindex_fig.canvas.manager.set_window_title(f'Training with {name}')
_plotindex_line, = ax.semilogy(epochs_arr, index_arr, linewidth=2)
if plot_goal:
ax.axhline(y=goal, color='r', linestyle='--', linewidth=1, label='Goal')
ax.set_ylabel('Performance Index')
ax.set_xlabel('0 Epochs')
else:
ax = _plotindex_fig.axes[0]
_plotindex_line.set_xdata(epochs_arr)
_plotindex_line.set_ydata(index_arr)
ax.relim()
ax.autoscale_view()
# Update title
title_str = f'Performance is {trec["index"][epoch]:.6g}'
if print_goal:
title_str += f', Goal is {goal:.6g}'
ax = _plotindex_fig.axes[0]
ax.set_title(title_str)
# Update x-label
if epoch == 0:
ax.set_xlabel('Zero Epochs')
elif epoch == 1:
ax.set_xlabel('One Epoch')
else:
ax.set_xlabel(f'{epoch} Epochs')
# Set x-axis limits
if epoch > 0:
ax.set_xlim([0, epoch])
# Update display
_plotindex_fig.canvas.draw()
_plotindex_fig.canvas.flush_events()
plt.pause(0.01)
[docs]
def reset_plotindex():
"""Reset the plot figure for a new training session."""
global _plotindex_fig, _plotindex_line
_plotindex_fig = None
_plotindex_line = None
[docs]
def func_estimlm (pmod,y=[],u=[],show_plot=True,show_output=True) :
"""Fit a prediction model with the Levenberg-Marquardt algorithm.
Minimises the sum of squared one-step-ahead prediction errors using an
iterative Levenberg-Marquardt update:
.. math::
\\Delta X = -(J^T J + \\mu I)^{-1} J^T E
where :math:`J` is the numerical Jacobian of the prediction errors with
respect to the model parameters :math:`X`, :math:`E` is the error vector,
and :math:`\\mu` is the adaptive damping factor.
Parameters
----------
pmod : pmodel
Prediction model with initial parameter values and estimation
hyper-parameters in ``pmod.estimParams``.
y : array-like or dict
Desired output sequence. If a dict, must have keys ``'y'``
(output array) and ``'m'`` (sample-weight vector).
u : array-like, optional
Input sequence. Default ``[]`` (univariate models).
show_plot : bool, optional
Display the live performance-index plot during training.
Default ``True``.
show_output : bool, optional
Print epoch-by-epoch progress to stdout. Default ``True``.
Returns
-------
pmod : pmodel
Fitted model with updated parameter values.
trec : dict
Training record with keys:
- ``'index'`` — performance index at each epoch.
- ``'mu'`` — damping factor :math:`\\mu` at each epoch.
stat : dict
Final statistics:
- ``'sigma'`` — residual variance :math:`\\hat{\\sigma}^2`.
- ``'stdx'`` — standard deviations of parameter estimates.
Notes
-----
Default estimation hyper-parameters (set on ``pmod.estimParams``):
============ ======= ================================================
Attribute Default Meaning
============ ======= ================================================
epochs 10 Maximum iterations
goal 0 Stop when performance ≤ goal
mu 0.001 Initial damping factor
mu_dec 0.1 Multiply mu by this on a successful step
mu_inc 10 Multiply mu by this on a failed step
mu_max 1e10 Stop if mu exceeds this value
min_grad 1e-4 Stop if gradient norm falls below this
max_time inf Wall-clock time limit (seconds)
delta 1e-7 Finite-difference step for Jacobian
============ ======= ================================================
See Also
--------
estimate : Public interface — calls this function internally.
func_jacobian : Numerical Jacobian calculation.
"""
ystru, y, m = sepym(y)
# CALCULATION
# == == == == == =
# Constants
uflag = len(u)>0
this = 'ESTIMLM'
epochs = pmod.estimParams.epochs
goal = pmod.estimParams.goal
min_grad = pmod.estimParams.min_grad
mu = pmod.estimParams.mu
mu_inc = pmod.estimParams.mu_inc
mu_dec = pmod.estimParams.mu_dec
mu_max = pmod.estimParams.mu_max
show = pmod.estimParams.show
max_time = pmod.estimParams.max_time
delta = pmod.estimParams.delta
stop = ''
startTime = time.time()
# Reset plot for new training session
if show_plot:
reset_plotindex()
X = pmod.getmX()
numParameters = len(X)
ii = np.identity(numParameters)
if uflag:
index,e = func_calcindex(pmod, ystru, u)
else:
index,e = func_calcindex(pmod, ystru)
trec = func_newrec(epochs,'index', 'mu' )
repeat = 0
jj = []
# Train
for epoch in range(epochs):
#for epoch in [0,1]:
# Jacobian
if uflag:
je, jj, normgX = jacobian(pmod,delta, ystru, u)
else:
je, jj, normgX = jacobian(pmod,delta, ystru)
# Training Record
trec['index'][epoch] = index
trec['mu'][epoch] = mu
# Stopping Criteria
currentTime = time.time() - startTime
if (index <= goal):
stop = 'Performance goal met.'
elif (epoch == (epochs-1)):
stop = 'Maximum epoch reached, Performance goal was not met.'
elif (currentTime > max_time):
stop = 'Maximum time elapsed, performance goal was not met.'
elif (normgX < min_grad):
if show_output:
print(normgX , min_grad )
stop = 'Minimum gradient reached, performance goal was not met.'
elif (mu > mu_max):
stop = 'Maximum MU reached, performance goal was not met.'
# Progress
if (show > 0 and (epoch % show) == 0) | (len(stop) > 0):
if epochs>0:
message ='Epoch {}/{}'.format(epoch, epochs)
message = message + ' ' + 'Time {}'.format(currentTime)
if goal>-1:
message = message + ' {} {}/{}'.format(pmod.indexFcn.upper(), index, goal)
if min_grad>0:
message = message + ' Gradient {}/{}'.format(normgX, min_grad)
if mu_max>0:
message = message + ' mu {}/{}'.format(mu, mu_max)
if show_output:
print(message)
if show_plot:
plotindex(trec, goal, this, epoch)
if show_output and len(stop)>0:
print('{}, {}\n\n'.format( this, stop))
# Stop when criteria indicate its time
if len(stop) > 0:
break
# Levenberg Marquardt
repeat = 0
while (mu <= mu_max):
repeat = repeat + 1
A = jj + ii * mu
#dX = np.linalg.solve(A, je)
dX = np.linalg.lstsq((jj + ii * mu), je,rcond=None)[0]
dX = dX.T[0] *-1
X2 = X + dX
pmod2 = copy.deepcopy(pmod)
pmod2.setmX(X2)
if uflag:
index2, e1 = func_calcindex(pmod2, ystru, u)
else:
index2, e1 = func_calcindex(pmod2, ystru)
if (index2 < index):
X = X2
pmod = copy.deepcopy(pmod2);
index = index2;
mu = mu * mu_dec;
break
mu = mu * mu_inc;
# End of Loop Mu
if (mu < 1e-15):
mu = 1e-15
if uflag:
e = y - pmod.predict(y, u)
else:
e = y - pmod.predict(y)
x = e.shape
prod = 1
for i in range(len(x)):
prod = prod * x[i]
sigma = sum(sum(e ** 2)) / prod
if np.linalg.det(jj) !=0:
stdx = np.sqrt(sigma * np.diag(np.linalg.inv(jj)))
else:
print('Error no Inv Matrix')
stdx = [0,0]
stat = {
'sigma': sigma,
'stdx': stdx
}
# Finish
trec = func_cliprec( trec, epoch)
return pmod, trec, stat
##----------------