Source code for RepTate.theories.TheoryShanbhagMaxwellModes

# RepTate: Rheology of Entangled Polymers: Toolkit for the Analysis of Theory and Experiments
# --------------------------------------------------------------------------------------------------------
#
# Authors:
#     Jorge Ramirez, jorge.ramirez@upm.es
#     Victor Boudara, victor.boudara@gmail.com
#
# Useful links:
#     http://blogs.upm.es/compsoftmatter/software/reptate/
#     https://github.com/jorge-ramirez-upm/RepTate
#     http://reptate.readthedocs.io
#
# --------------------------------------------------------------------------------------------------------
#
# Copyright (2017-2023): Jorge Ramirez, Victor Boudara, Universidad Politécnica de Madrid, University of Leeds
#
# This file is part of RepTate.
#
# RepTate is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RepTate is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RepTate.  If not, see <http://www.gnu.org/licenses/>.
#
# --------------------------------------------------------------------------------------------------------
"""Module TheoryShanbhagMaxwellModes

Module that defines theories related to Maxwell modes, in the frequency and time domains 
based on the codes pyRespect-time (10.1002/mats.201900005) and pyRespect-frequency (10.3933/ApplRheol-23-24628)

"""
import sys
import os
import numpy as np
from RepTate.core.DataTable import DataTable
from RepTate.core.Parameter import Parameter, ParameterType, OptType
from RepTate.gui.QTheory import QTheory
from PySide6.QtWidgets import QToolBar, QToolButton, QMenu, QMessageBox, QFileDialog
from PySide6.QtCore import QSize
from PySide6.QtGui import QIcon
from scipy.optimize import nnls, minimize, least_squares
from scipy.interpolate import interp1d
from scipy.integrate import cumtrapz, quad
import enum
import RepTate
import time


[docs] class PredictionMode(enum.Enum): """Define which prediction we want to see Parameters can be: - cont: Prediction from Continuous spectrum - disc: Prediction from Discrete spectrum """ cont = 0 disc = 1
[docs] class TheoryShanbhagMaxwellModesFrequency(QTheory): """Extract continuous and discrete relaxation spectra from complex modulus G*(w) * **Parameters** - plateau : is there a residual plateau in the data (default False). - ns : Number of grid points to represent the continuous spectrum (typical 50-100) - lamC : Specify lambda_C instead of using the one inferred from the L-curve (default 0, use the L-curve). - SmFacLam = Smoothing Factor. - MaxNumModes = Max Number of Modes (default 0, automatically determine the optimal number of modes). - lam_min = lower limit of lambda for lcurve calculation (default 1e-10). - lam_max = higher limit of lambda for lcurve calculation (default 1e3). - lamDensity = lambda density per decade (default 3, use 2 or more). - rho_cutoff = Threshold to avoid picking too small lambda for L-curve without (default 0). - deltaBaseWeightDist = how finely to sample BaseWeightDist (default 0.2). - minTauSpacing = how close do successive modes (tau2/tau1) have to be before we try to mege them (default 1.25). """ thname = "ReSpect" description = "Relaxation spectra from dynamic moduli" citations = ["Takeh, A. and Shanbhag, S., Appl. Rheol. 2013, 23, 24628"] doi = ["http://dx.doi.org/10.3933/ApplRheol-23-24628"] html_help_file = "http://reptate.readthedocs.io/manual/Applications/LVE/Theory/theory.html#shanbhag-maxwell-modes" single_file = True def __init__(self, name="", parent_dataset=None, ax=None): """**Constructor**""" super().__init__(name, parent_dataset, ax) self.function = self.ShanBhagMaxwellModesFrequency self.has_modes = True self.view_modes = True wmin = self.parent_dataset.minpositivecol(0) wmax = self.parent_dataset.maxcol(0) nmodes = int(np.round(np.log10(wmax / wmin))) self.parameters["plateau"] = Parameter( "plateau", False, "is there a residual plateau?", ParameterType.boolean, opt_type=OptType.const, display_flag=False, ) self.parameters["ns"] = Parameter( "ns", 100, "Number of grid points to represent the continuous spectrum (typical 50-100)", ParameterType.integer, opt_type=OptType.const, ) self.parameters["lamC"] = Parameter( "lamC", 0, "Specify lambda_C", ParameterType.real, opt_type=OptType.nopt ) self.parameters["SmFacLam"] = Parameter( name="SmFacLam", value=0, description="Smoothing Factor (between -1 and 1)", type=ParameterType.discrete_integer, opt_type=OptType.const, discrete_values=[-1, 0, 1], ) self.parameters["FreqEnd"] = Parameter( name="FreqEnd", value=1, description="Treatment of Frequency Window ends (1, 2 or 3)", type=ParameterType.discrete_integer, opt_type=OptType.const, discrete_values=[1, 2, 3], ) self.parameters["MaxNumModes"] = Parameter( "MaxNumModes", 0, "Max Number of Modes", ParameterType.integer, opt_type=OptType.const, ) self.parameters["lam_min"] = Parameter( "lam_min", 1e-10, "lower limit of lambda for lcurve calculation", ParameterType.real, opt_type=OptType.nopt, ) self.parameters["lam_max"] = Parameter( "lam_max", 1e3, "higher limit of lambda for lcurve calculation", ParameterType.real, opt_type=OptType.nopt, ) self.parameters["lamDensity"] = Parameter( "lamDensity", 3, "lambda density per decade", ParameterType.integer, opt_type=OptType.nopt, ) self.parameters["rho_cutoff"] = Parameter( "rho_cutoff", 0, "Threshold to avoid picking too small lambda", ParameterType.real, opt_type=OptType.nopt, ) self.parameters["deltaBaseWeightDist"] = Parameter( "deltaBaseWeightDist", 0.2, "how finely to sample BaseWeightDist", ParameterType.real, opt_type=OptType.nopt, ) self.parameters["minTauSpacing"] = Parameter( "minTauSpacing", 1.25, "how close modes (tau2/tau1) for merge", ParameterType.real, opt_type=OptType.nopt, ) self.autocalculate = False # GRAPHIC MODES self.graphicmodes = [] self.spectrum = [] ns = self.parameters["ns"].value self.scont = np.zeros(ns) self.Hcont = np.zeros(ns) self.sdisc = np.zeros(ns) self.Hdisc = np.zeros(ns) self.setup_graphic_modes() self.prediction_mode = PredictionMode.cont self.GstM = None self.K = None self.n = 0 # add widgets specific to the theory tb = QToolBar() tb.setIconSize(QSize(24, 24)) self.tbutpredmode = QToolButton() self.tbutpredmode.setPopupMode(QToolButton.MenuButtonPopup) menu = QMenu(self) self.cont_pred_action = menu.addAction( QIcon(":/Icon8/Images/new_icons/icons8-minimum-value.png"), "Fit from Spectrum", ) self.disc_pred_action = menu.addAction( QIcon(":/Icon8/Images/new_icons/icons8-scatter-plot.png"), "Fit from Discrete", ) if self.prediction_mode == PredictionMode.cont: self.tbutpredmode.setDefaultAction(self.cont_pred_action) else: self.tbutpredmode.setDefaultAction(self.disc_pred_action) self.tbutpredmode.setMenu(menu) tb.addWidget(self.tbutpredmode) self.modesaction = tb.addAction( QIcon(":/Icon8/Images/new_icons/icons8-visible.png"), "View modes/spectrum" ) self.plateauaction = tb.addAction( QIcon(":/Icon8/Images/new_icons/icons8-flat-tire-80.png"), "is there a residual plateau?", ) self.save_modes_action = tb.addAction( QIcon(":/Icon8/Images/new_icons/icons8-save-Maxwell.png"), "Save Modes" ) self.save_spectrum_action = tb.addAction( QIcon(":/Icon8/Images/new_icons/icons8-save_Ht.png"), "Save Spectrum" ) self.modesaction.setCheckable(True) self.modesaction.setChecked(True) self.plateauaction.setCheckable(True) self.plateauaction.setChecked(False) self.thToolsLayout.insertWidget(0, tb) connection_id = self.modesaction.triggered.connect(self.modesaction_change) connection_id = self.plateauaction.triggered.connect(self.plateauaction_change) connection_id = self.save_modes_action.triggered.connect(self.save_modes) connection_id = self.save_spectrum_action.triggered.connect(self.save_spectrum) connection_id = self.cont_pred_action.triggered.connect(self.select_cont_pred) connection_id = self.disc_pred_action.triggered.connect(self.select_disc_pred)
[docs] def select_cont_pred(self): self.prediction_mode = PredictionMode.cont self.tbutpredmode.setDefaultAction(self.cont_pred_action) if self.n > 0: th_files = self.theory_files() for f in self.parent_dataset.files: if f in th_files: tt = self.tables[f.file_name_short] tt.data[:, 1] = self.K[: self.n] tt.data[:, 2] = self.K[self.n :] self.Qprint("<b>Fit from continuous spectrum</b>") self.do_error("") self.do_plot("")
[docs] def select_disc_pred(self): self.prediction_mode = PredictionMode.disc self.tbutpredmode.setDefaultAction(self.disc_pred_action) if self.n > 0: th_files = self.theory_files() for f in self.parent_dataset.files: if f in th_files: tt = self.tables[f.file_name_short] tt.data[:, 1] = self.GstM[: self.n] tt.data[:, 2] = self.GstM[self.n :] self.Qprint("<b>Fit from discrete spectrum</b>") self.do_error("") self.do_plot("")
[docs] def save_spectrum(self): """Save Spectrum to a text file""" fpath, _ = QFileDialog.getSaveFileName( self, "Save spectrum to a text file", os.path.join(RepTate.root_dir, "data"), "Text (*.txt)", ) if fpath == "": return with open(fpath, "w") as f: verdata = RepTate._version.get_versions() version = verdata["version"].split("+")[0] date = verdata["date"].split("T")[0] build = verdata["version"] header = "# Continuous spectrum\n" header += "# Generated with RepTate %s %s (build %s)\n" % ( version, date, build, ) header += "# At %s on %s\n" % ( time.strftime("%X"), time.strftime("%a %b %d, %Y"), ) f.write(header) f.write("\n#%15s\t%15s\n" % ("tau_i", "G_i")) n = len(self.scont) for i in range(n): f.write("%15g\t%15g\n" % (self.scont[i], np.exp(self.Hcont[i]))) f.write("\n#end") QMessageBox.information(self, "Success", 'Wrote spectrum "%s"' % fpath)
[docs] def Qhide_theory_extras(self, state): """Uncheck the modeaction button. Called when curent theory is changed""" self.modesaction.setChecked(state)
[docs] def modesaction_change(self, checked): """Change visibility of modes""" self.graphicmodes_visible(checked)
# self.view_modes = self.modesaction.isChecked() # self.graphicmodes.set_visible(self.view_modes) # self.do_calculate("")
[docs] def plateauaction_change(self, checked): self.set_param_value("plateau", checked)
[docs] def handle_spinboxValueChanged(self, value): """Handle a change of the parameter 'nmodes'""" self.set_param_value("nmodes", value) if self.autocalculate: self.parent_dataset.handle_actionCalculate_Theory() self.update_parameter_table()
[docs] def setup_graphic_modes(self): """Setup graphic helpers""" self.contspectrum = self.ax.plot([], [])[0] self.contspectrum.set_marker("*") self.contspectrum.set_linestyle("--") self.contspectrum.set_visible(self.view_modes) self.contspectrum.set_color("green") self.contspectrum.set_linewidth(5) self.contspectrum.set_label("") # self.contspectrum.set_markerfacecolor('yellow') self.contspectrum.set_markeredgecolor("black") self.contspectrum.set_markeredgewidth(3) self.contspectrum.set_markersize(6) self.contspectrum.set_alpha(0.5) self.discspectrum = self.ax.plot([], [])[0] self.discspectrum.set_marker("D") self.discspectrum.set_visible(self.view_modes) self.discspectrum.set_label("") self.discspectrum.set_markerfacecolor("green") self.discspectrum.set_markeredgecolor("black") self.discspectrum.set_color("green") self.discspectrum.set_markeredgewidth(3) self.discspectrum.set_markersize(8) self.discspectrum.set_alpha(0.5) self.plot_theory_stuff()
[docs] def destructor(self): """Called when the theory tab is closed""" self.graphicmodes_visible(False) # self.ax.lines.remove(self.contspectrum) # self.ax.lines.remove(self.discspectrum) self.contspectrum.remove() self.discspectrum.remove()
[docs] def show_theory_extras(self, show=False): """Called when the active theory is changed""" self.Qhide_theory_extras(show) self.graphicmodes_visible(show)
[docs] def graphicmodes_visible(self, state): """Change visibility of modes""" self.view_modes = state self.contspectrum.set_visible(self.view_modes) self.discspectrum.set_visible(self.view_modes) self.parent_dataset.parent_application.update_plot()
[docs] def get_modes(self): """Get the values of Maxwell Modes from this theory""" nmodes = len(self.sdisc) tau = self.sdisc G = self.Hdisc return tau, G, True
[docs] def kernel_prestore(self, H, kernMat, *argv): """ turbocharging kernel function evaluation by prestoring kernel matrix Date : 8/17/2018 Function: kernel_prestore(input) returns K*h, where h = exp(H) Same as kernel, except prestoring hs, S, and W to improve speed 3x. outputs the 2n*1 dimensional vector K(H)(w) which is comparable to G* = [G'|G"]' 3/11/2019: returning Kh + G0 Input: H = substituted CRS, kernMat = 2n*ns matrix [(ws^2/1+ws^2) | (ws/1+ws)]'*hs """ if len(argv) > 0: n = int(kernMat.shape[0] / 2) G0v = np.zeros(2 * n) G0v[:n] = argv[0] else: G0v = 0.0 return np.dot(kernMat, np.exp(H)) + G0v
[docs] def residualLM(self, H, lam, Gexp, kernMat): """ % % HELPER FUNCTION: Gets Residuals r %""" n = int(kernMat.shape[0] / 2) ns = kernMat.shape[1] nl = ns - 2 r = np.zeros(2 * n + nl) # if plateau then unfurl G0 if len(H) > ns: G0 = H[-1] H = H[:-1] r[0 : 2 * n] = 1.0 - self.kernel_prestore(H, kernMat, G0) / Gexp else: r[0 : 2 * n] = 1.0 - self.kernel_prestore(H, kernMat) / Gexp # the curvature constraint is not affected by G0 r[2 * n : 2 * n + nl] = np.sqrt(lam) * np.diff(H, n=2) # second derivative return r
[docs] def jacobianLM(self, H, lam, Gexp, kernMat): """ HELPER FUNCTION: Gets Jacobian J returns a (n+nl * ns) matrix Jr; (ns + 1) if G0 is also supplied. Jr_(i, j) = dr_i/dH_j It uses kernelD, which approximates dK_i/dH_j, where K is the kernel """ n = int(kernMat.shape[0] / 2) ns = kernMat.shape[1] nl = ns - 2 # L is a nl*ns tridiagonal matrix with 1 -2 and 1 on its diagonal. L = ( np.diag(np.ones(ns - 1), 1) + np.diag(np.ones(ns - 1), -1) + np.diag(-2.0 * np.ones(ns)) ) L = L[1 : nl + 1, :] Jr = np.zeros((2 * n + nl, ns)) # # Furnish the Jacobian Jr - (2n+nl)*ns matrix # Kmatrix is 2*n * ns matrix # Kmatrix = np.dot((1.0 / Gexp).reshape(2 * n, 1), np.ones((1, ns))) if len(H) > ns: G0 = H[-1] H = H[:-1] Jr = np.zeros((2 * n + nl, ns + 1)) Jr[0 : 2 * n, 0:ns] = -self.kernelD(H, kernMat) * Kmatrix Jr[0:n, ns] = -1.0 / Gexp[:n] # nonzero dr_i/dG0 only for G' Jr[2 * n : 2 * n + nl, 0:ns] = np.sqrt(lam) * L Jr[2 * n : 2 * n + nl, ns] = np.zeros(nl) # column for dr_i/dG0 = 0 else: Jr = np.zeros((2 * n + nl, ns)) Jr[0 : 2 * n, 0:ns] = -self.kernelD(H, kernMat) * Kmatrix Jr[2 * n : 2 * n + nl, 0:ns] = np.sqrt(lam) * L return Jr
[docs] def kernelD(self, H, kernMat): """ Function: kernelD(input) outputs the 2n*ns dimensional vector DK(H)(w) It approximates dK_i/dH_j = K * e(H_j): Input: H = substituted CRS, kernMat = matrix for faster kernel evaluation Output: DK = Jacobian of H """ n = int(kernMat.shape[0] / 2) ns = kernMat.shape[1] Hsuper = np.dot(np.ones((2 * n, 1)), np.exp(H).reshape(1, ns)) DK = kernMat * Hsuper return DK
[docs] def getKernMat(self, s, w): """furnish kerMat() which helps faster kernel evaluation, given s, w Generates a 2n*ns matrix [(ws^2/1+ws^2) | (ws/1+ws)]'*hs, which can be multiplied with exp(H) to get predicted G*""" ns = len(s) hsv = np.zeros(ns) hsv[0] = 0.5 * np.log(s[1] / s[0]) hsv[ns - 1] = 0.5 * np.log(s[ns - 1] / s[ns - 2]) hsv[1 : ns - 1] = 0.5 * (np.log(s[2:ns]) - np.log(s[0 : ns - 2])) S, W = np.meshgrid(s, w) ws = S * W ws2 = ws**2 return np.vstack((ws2 / (1 + ws2), ws / (1 + ws2))) * hsv
[docs] def getH(self, lam, Gexp, H, kernMat, G0=0): """ minimize_H V(lambda) := ||Gexp - kernel(H)||^2 + lambda * ||L H||^2 Input : lambda = regularization parameter , Gexp = experimental data, H = guessed H, kernMat = matrix for faster kernel evaluation G0 = optional Output : H_lam, [G0] Default uses Trust-Region Method with Jacobian supplied by jacobianLM """ # send Hplus = [H, G0], on return unpack H and G0 if G0 > 0: Hplus = np.append(H, G0) res_lsq = least_squares( self.residualLM, Hplus, jac=self.jacobianLM, args=(lam, Gexp, kernMat) ) return res_lsq.x[:-1], res_lsq.x[-1] # send normal H, and collect optimized H back else: res_lsq = least_squares( self.residualLM, H, jac=self.jacobianLM, args=(lam, Gexp, kernMat) ) return res_lsq.x
[docs] def InitializeH(self, Gexp, s, kernMat, G0=0): """ Function: InitializeH(input) Input: Gexp = 2n\*1 vector [G';G"], s = relaxation modes, kernMat = matrix for faster kernel evaluation G0 = optional; if plateau is nonzero Output: H = guessed H G0 = optional guess if *argv is nonempty """ # # To guess spectrum, pick a negative Hgs and a large value of lambda to get a # solution that is most determined by the regularization # March 2019; a single guess is good enough now, because going from large lambda to small # lambda in lcurve. H = -5.0 * np.ones(len(s)) + np.sin(np.pi * s) lam = 1e0 if G0 > 0: Hlam, G0 = self.getH(lam, Gexp, H, kernMat, G0) return Hlam, G0 else: Hlam = self.getH(lam, Gexp, H, kernMat) return Hlam
[docs] def getAmatrix(self, ns): """Generate symmetric matrix A = L' * L required for error analysis: helper function for lcurve in error determination""" # L is a ns*ns tridiagonal matrix with 1 -2 and 1 on its diagonal; nl = ns - 2 L = ( np.diag(np.ones(ns - 1), 1) + np.diag(np.ones(ns - 1), -1) + np.diag(-2.0 * np.ones(ns)) ) L = L[1 : nl + 1, :] return np.dot(L.T, L)
[docs] def getBmatrix(self, H, kernMat, Gexp, *argv): """get the Bmatrix required for error analysis; helper for lcurve() not explicitly accounting for G0 in Jr because otherwise I get underflow problems """ n = int(len(Gexp) / 2) ns = len(H) nl = ns - 2 r = np.zeros(n) # vector of size (n); # furnish relevant portion of Jacobian and residual Kmatrix = np.dot((1.0 / Gexp).reshape(2 * n, 1), np.ones((1, ns))) Jr = -self.kernelD(H, kernMat) * Kmatrix # if plateau then unfurl G0 if len(argv) > 0: G0 = argv[0] r = 1.0 - self.kernel_prestore(H, kernMat, G0) / Gexp else: r = 1.0 - self.kernel_prestore(H, kernMat) / Gexp B = np.dot(Jr.T, Jr) + np.diag(np.dot(r.T, Jr)) return B
[docs] def oldLamC(self, lam, rho, eta): # # 8/1/2018: Making newer strategy more accurate and robust: dividing by minimum rho/eta # which is not as sensitive to lam_min, lam_max. This makes lamC robust to range of lam explored # # er = rho/np.amin(rho) + eta/np.amin(eta); er = rho / np.amin(rho) + eta / (np.sqrt(np.amax(eta) * np.amin(eta))) # # Since rho v/s lambda is smooth, we can interpolate the coarse mesh to find minimum # # change 3/20/2019: Scipy 0.17 has a bug with extrapolation: so making lami tad smaller lami = np.logspace(np.log10(min(lam) + 1e-15), np.log10(max(lam) - 1e-15), 1000) erri = np.exp( interp1d( np.log(lam), np.log(er), kind="cubic", bounds_error=False, fill_value=(np.log(er[0]), np.log(er[-1])), )(np.log(lami)) ) ermin = np.amin(erri) eridx = np.argmin(erri) lamC = lami[eridx] # # 2/2: Copying 12/18 edit from pyReSpect-time; # for rough data have cutoff at rho = rho_cutoff? # rhoF = interp1d(lam, rho, bounds_error=False, fill_value=(rho[0], rho[-1])) if rhoF(lamC) <= self.parameters["rho_cutoff"].value: try: eridx = ( np.abs(rhoF(lami) - self.parameters["rho_cutoff"].value) ).argmin() if lami[eridx] > lamC: lamC = lami[eridx] except: pass return lamC
[docs] def lcurve(self, Gexp, Hgs, kernMat, *argv): """ Function: lcurve(input) Input: Gexp = 2n*1 vector [Gt], Hgs = guessed H, kernMat = matrix for faster kernel evaluation G0 = optionally Output: lamC and 3 vectors of size npoints*1 contains a range of lambda, rho and eta. "Elbow" = lamC is estimated using a *NEW* heuristic AND by Hansen method March 2019: starting from large lambda to small cuts calculation time by a lot also gives an error estimate """ plateau = self.parameters["plateau"].value if plateau: G0 = argv[0] lamDensity = self.parameters["lamDensity"].value lam_max = self.parameters["lam_max"].value lam_min = self.parameters["lam_min"].value npoints = int(lamDensity * (np.log10(lam_max) - np.log10(lam_min))) hlam = (lam_max / lam_min) ** (1.0 / (npoints - 1.0)) lam = lam_min * hlam ** np.arange(npoints) eta = np.zeros(npoints) rho = np.zeros(npoints) logP = np.zeros(npoints) H = Hgs.copy() n = len(Gexp) ns = len(H) nl = ns - 2 logPmax = -np.inf # so nothing surprises me! Hlambda = np.zeros((ns, npoints)) # Error Analysis: Furnish A_matrix Amat = self.getAmatrix(len(H)) _, LogDetN = np.linalg.slogdet(Amat) # # This is the costliest step # for i in reversed(range(len(lam))): self.Qprint(".", end="") lamb = lam[i] if plateau: H, G0 = self.getH(lamb, Gexp, H, kernMat, G0) rho[i] = np.linalg.norm( (1.0 - self.kernel_prestore(H, kernMat, G0) / Gexp) ) Bmat = self.getBmatrix(H, kernMat, Gexp, G0) else: H = self.getH(lamb, Gexp, H, kernMat) rho[i] = np.linalg.norm((1.0 - self.kernel_prestore(H, kernMat) / Gexp)) Bmat = self.getBmatrix(H, kernMat, Gexp) eta[i] = np.linalg.norm(np.diff(H, n=2)) Hlambda[:, i] = H _, LogDetC = np.linalg.slogdet(lamb * Amat + Bmat) V = rho[i] ** 2 + lamb * eta[i] ** 2 # this assumes a prior exp(-lam) logP[i] = -V + 0.5 * (LogDetN + ns * np.log(lamb) - LogDetC) - lamb if logP[i] > logPmax: logPmax = logP[i] elif logP[i] < logPmax - 18: break # truncate all to significant lambda lam = lam[i:] logP = logP[i:] eta = eta[i:] rho = rho[i:] logP = logP - max(logP) Hlambda = Hlambda[:, i:] # # currently using both schemes to get optimal lamC # new lamM works better with actual experimental data # lamC = self.oldLamC(lam, rho, eta) plam = np.exp(logP) plam = plam / np.sum(plam) lamM = np.exp(np.sum(plam * np.log(lam))) # # Dialling in the Smoothness Factor # SmFacLam = self.parameters["SmFacLam"].value if SmFacLam > 0: lamM = np.exp(np.log(lamM) + SmFacLam * (max(np.log(lam)) - np.log(lamM))) elif SmFacLam < 0: lamM = np.exp(np.log(lamM) + SmFacLam * (np.log(lamM) - min(np.log(lam)))) return lamM, lam, rho, eta, logP, Hlambda
[docs] def MaxwellModes(self, z, w, Gexp, isPlateau): """ Function: MaxwellModes(input) Solves the linear least squares problem to obtain the DRS Input: z = points distributed according to the density, t = n*1 vector contains times, Gexp = 2n*1 vector contains Gp and Gpp isPlateau = True if G0 \neq 0 Output: g, tau = spectrum (array) error = relative error between the input data and the G(t) inferred from the DRS condKp = condition number """ N = len(z) tau = np.exp(z) n = len(w) # # Prune small -ve weights g(i) # g, error, condKp = self.nnLLS(w, tau, Gexp, isPlateau) # first remove runaway modes outside window with potentially large weight izero = np.where( np.logical_or(max(w) * min(tau) < 0.02, min(w) * max(tau) > 50.0) ) tau = np.delete(tau, izero) g = np.delete(g, izero) # search for small weights (gi) if isPlateau: izero = np.where(g[:-1] / np.max(g[:-1]) < 1e-8) else: izero = np.where(g / np.max(g) < 1e-8) tau = np.delete(tau, izero) g = np.delete(g, izero) return g, tau, error, condKp
[docs] def nnLLS(self, w, tau, Gexp, isPlateau): """ # # Helper subfunction which does the actual LLS problem # helps MaxwellModes # """ n = int(len(Gexp) / 2) ntau = len(tau) S, W = np.meshgrid(tau, w) ws = S * W ws2 = ws**2 K = np.vstack((ws2 / (1 + ws2), ws / (1 + ws2))) # 2n * nmodes # K is n*ns [or ns+1] if isPlateau: K = np.hstack((K, np.ones((len(Gexp), 1)))) # G' needs some G0 K[n:, ntau] = 0.0 # G" doesn't have G0 contrib # # gets (Gst/GstE - 1)^2, instead of (Gst - GstE)^2 # Kp = np.dot(np.diag((1.0 / Gexp)), K) condKp = np.linalg.cond(Kp) g = nnls(Kp, np.ones(len(Gexp)))[0] GstM = np.dot(K, g) error = np.sum((GstM / Gexp - 1.0) ** 2) return g, error, condKp
[docs] def GetWeights(self, H, w, s, wb): """ % % Function: GetWeights(input) % % Finds the weight of "each" mode by taking a weighted average of its contribution % to Gp and Gpp, mixed with an even distribution given by `wb` % % Input: H = CRS (ns * 1) % w = n*1 vector contains times % s = relaxation modes (ns * 1) % wb = weightBaseDist % % Output: wt = weight of each mode % """ ns = len(s) n = len(w) hs = np.zeros(ns) wt = hs hs[0] = 0.5 * np.log(s[1] / s[0]) hs[ns - 1] = 0.5 * np.log(s[ns - 1] / s[ns - 2]) hs[1 : ns - 1] = 0.5 * (np.log(s[2:ns]) - np.log(s[0 : ns - 2])) S, W = np.meshgrid(s, w) ws = S * W ws2 = ws**2 kern = np.vstack((ws2 / (1 + ws2), ws / (1 + ws2))) wij = np.dot(kern, np.diag(hs * np.exp(H))) # 2n * ns K = np.dot(kern, hs * np.exp(H)) # 2n * 1, comparable with Gexp for i in np.arange(n): wij[i, :] = wij[i, :] / K[i] for j in np.arange(ns): wt[j] = np.sum(wij[:, j]) wt = wt / np.trapz(wt, np.log(s)) wt = (1.0 - wb) * wt + (wb * np.mean(wt)) * np.ones(len(wt)) return wt
[docs] def GridDensity(self, x, px, N): """# # PROGRAM: GridDensity(input) # # Takes in a PDF or density function, and spits out a bunch of points in # accordance with the PDF # # Input: # x = vector of points. It need *not* be equispaced, # px = vector of same size as x: probability distribution or # density function. It need not be normalized but has to be positive. # N = Number of points >= 3. The end points of "x" are included # necessarily, # # Output: # z = Points distributed according to the density # hz = width of the "intervals" - useful to apportion domain to points # if you are doing quadrature with the results, for example. # # (c) Sachin Shanbhag, November 11, 2015 #""" npts = 100 # can potentially change xi = np.linspace(min(x), max(x), npts) # reinterpolate on equi-spaced axis fint = interp1d(x, px, "cubic") # smoothen using cubic splines pint = fint(xi) # interpolation ci = cumtrapz(pint, xi, initial=0) pint = pint / ci[npts - 1] ci = ci / ci[npts - 1] # normalize ci alfa = 1.0 / (N - 1) # alfa/2 + (N-1)*alfa + alfa/2 zij = np.zeros(N + 1) # quadrature interval end marker z = np.zeros(N) # quadrature point z[0] = min(x) z[N - 1] = max(x) # # ci(Z_j,j+1) = (j - 0.5) * alfa # beta = np.arange(0.5, N - 0.5) * alfa zij[0] = z[0] zij[N] = z[N - 1] fint = interp1d(ci, xi, "cubic") zij[1:N] = fint(beta) h = np.diff(zij) # # Quadrature points are not the centroids, but rather the center of masses # of the quadrature intervals # beta = np.arange(1, N - 1) * alfa z[1 : N - 1] = fint(beta) return z, h
[docs] def mergeModes_magic(self, g, tau, imode): """merge modes imode and imode+1 into a single mode return gp and taup corresponding to this new mode used only when magic = True""" iniGuess = [g[imode] + g[imode + 1], 0.5 * (tau[imode] + tau[imode + 1])] res = minimize(self.costFcn_magic, iniGuess, args=(g, tau, imode)) newtau = np.delete(tau, imode + 1) newtau[imode] = res.x[1] newg = np.delete(g, imode + 1) newg[imode] = res.x[0] return newg, newtau
[docs] def normKern_magic(self, w, gn, taun, g1, tau1, g2, tau2): """helper function: for costFcn and mergeModes used only when magic = True""" wt = w * taun Gnp = gn * (wt**2) / (1.0 + wt**2) Gnpp = gn * wt / (1.0 + wt**2) wt = w * tau1 Gop = g1 * (wt**2) / (1.0 + wt**2) Gopp = g1 * wt / (1.0 + wt**2) wt = w * tau2 Gop += g2 * (wt**2) / (1.0 + wt**2) Gopp += g2 * wt / (1.0 + wt**2) return (Gnp / Gop - 1.0) ** 2 + (Gnpp / Gopp - 1.0) ** 2
[docs] def costFcn_magic(self, par, g, tau, imode): """ "helper function for mergeModes; establishes cost function to minimize used only when magic = True""" gn = par[0] taun = par[1] g1 = g[imode] g2 = g[imode + 1] tau1 = tau[imode] tau2 = tau[imode + 1] wmin = min(1.0 / tau1, 1.0 / tau2) / 10.0 wmax = max(1.0 / tau1, 1.0 / tau2) * 10.0 return quad( self.normKern_magic, wmin, wmax, args=(gn, taun, g1, tau1, g2, tau2) )[0]
[docs] def FineTuneSolution(self, tau, w, Gexp, isPlateau): """Given a spacing of modes tau, tries to do NLLS to fine tune it further If it fails, then it returns the old tau back Uses helper function: res_wG which computes residuals """ success = False initError = np.linalg.norm(self.res_wG(tau, w, Gexp, isPlateau)) try: res = least_squares( self.res_wG, tau, bounds=(0.02 / max(w), 50 / min(w)), args=(w, Gexp, isPlateau), ) tau = res.x tau0 = tau.copy() success = True except: pass g, tau, _, _ = self.MaxwellModes( np.log(tau), w, Gexp, isPlateau ) # Get g_i, taui finalError = np.linalg.norm(self.res_wG(tau, w, Gexp, isPlateau)) # keep fine tuned solution, only if it improves things if finalError > initError: success = False return success, g, tau
[docs] def res_wG(self, tau, wexp, Gexp, isPlateau): """ Helper function for final optimization problem """ g, _, _ = self.nnLLS(wexp, tau, Gexp, isPlateau) Gmodel = np.zeros(len(Gexp)) S, W = np.meshgrid(tau, wexp) ws = S * W ws2 = ws**2 K = np.vstack((ws2 / (1 + ws2), ws / (1 + ws2))) # 2n * nmodes # add G0 if isPlateau: Gmodel = np.dot(K, g[:-1]) n = int(len(Gexp) / 2) Gmodel[:n] += g[-1] else: Gmodel = np.dot(K, g) residual = Gmodel / Gexp - 1.0 return residual
[docs] def ShanBhagMaxwellModesFrequency(self, f=None): """Function that calculates the spectrum""" ft = f.data_table tt = self.tables[f.file_name_short] tt.num_columns = ft.num_columns # CALCULATE THE CONTINUOUS SPECTRUM FIRST self.Qprint("<b>CONTINUOUS SPECTRUM</b>") w = ft.data[:, 0] Gp = ft.data[:, 1] Gpp = ft.data[:, 2] # Remove points with w<=0 filt = w > 0 w = w[filt] Gp = Gp[filt] Gpp = Gpp[filt] # Sanitize the input: remove repeated values and space data homogeneously. Using linear interpolation w, indices = np.unique(w, return_index=True) Gp = Gp[indices] Gpp = Gpp[indices] fp = interp1d(w, Gp, fill_value="extrapolate") fpp = interp1d(w, Gpp, fill_value="extrapolate") w = np.logspace(np.log10(np.min(w)), np.log10(np.max(w)), max(len(w), 200)) Gp = fp(w) Gpp = fpp(w) Gexp = np.append(Gp, Gpp) n = len(w) self.n = n self.wfit = np.copy(w) tt.num_rows = len(w) tt.data = np.zeros((tt.num_rows, tt.num_columns)) tt.data[:, 0] = w ns = self.parameters["ns"].value wmin = w[0] wmax = w[n - 1] # determine frequency window if self.parameters["FreqEnd"].value == 1: smin = np.exp(-np.pi / 2) / wmax smax = np.exp(np.pi / 2) / wmin elif self.parameters["FreqEnd"].value == 2: smin = 1.0 / wmax smax = 1.0 / wmin elif self.parameters["FreqEnd"].value == 3: smin = np.exp(+np.pi / 2) / wmax smax = np.exp(-np.pi / 2) / wmin hs = (smax / smin) ** (1.0 / (ns - 1)) s = smin * hs ** np.arange(ns) kernMat = self.getKernMat(s, w) self.Qprint("Initial Set up...", end="") tic = time.time() plateau = self.parameters["plateau"].value if plateau: Hgs, G0 = self.InitializeH(Gexp, s, kernMat, np.min(Gexp)) else: Hgs = self.InitializeH(Gexp, s, kernMat) te = time.time() - tic self.Qprint("({0:.1f} s)".format(te)) tic = time.time() # Find Optimum Lambda with 'lcurve' self.Qprint("Building the L-curve ...", end="") if self.parameters["lamC"].value == 0: if plateau: lamC, lam, rho, eta, logP, Hlam = self.lcurve(Gexp, Hgs, kernMat, G0) else: lamC, lam, rho, eta, logP, Hlam = self.lcurve(Gexp, Hgs, kernMat) else: lamC = self.parameters["lamC"].value te = time.time() - tic self.Qprint("({0:.1f} s)".format(te)) tic = time.time() self.Qprint("lamC = {0:0.3e}".format(lamC)) # Get the best spectrum self.Qprint("Extracting CRS...", end="") if plateau: H, G0 = self.getH(lamC, Gexp, Hgs, kernMat, G0) self.Qprint("G0 = {0:0.3e} ...".format(G0)) else: H = self.getH(lamC, Gexp, Hgs, kernMat) te = time.time() - tic self.Qprint("done ({0:.1f} s)".format(te)) # Save inferred H(s) and Gw if self.parameters["lamC"].value != 0: if plateau: self.K = self.kernel_prestore(H, kernMat, G0) # np.savetxt('output/H.dat', np.c_[s, H], fmt='%e', header='G0 = {0:0.3e}'.format(G0)) else: self.K = self.kernel_prestore(H, kernMat) # np.savetxt('output/H.dat', np.c_[s, H], fmt='%e') else: plam = np.exp(logP) plam = plam / np.sum(plam) Hm = np.zeros(len(s)) Hm2 = np.zeros(len(s)) cnt = 0 for i in range(len(lam)): # ~ Hm += plam[i]*Hlam[:,i] # ~ Hm2 += plam[i]*Hlam[:,i]**2 # count all spectra within a threshold if plam[i] > 0.1: Hm += Hlam[:, i] Hm2 += Hlam[:, i] ** 2 cnt += 1 Hm = Hm / cnt dH = np.sqrt(Hm2 / cnt - Hm**2) if plateau: self.K = self.kernel_prestore(Hm, kernMat, G0) else: self.K = self.kernel_prestore(Hm, kernMat) # np.savetxt('output/Gfit.dat', np.c_[w, K[:n], K[n:]], fmt='%e') if self.prediction_mode == PredictionMode.cont: tt.data[:, 1] = self.K[:n] tt.data[:, 2] = self.K[n:] # Spectrum self.scont = s self.Hcont = H # GET DISCRETE SPECTRUM self.Qprint("<b>DISCRETE SPECTRUM</b>") # range of N scanned MaxNumModes = self.parameters["MaxNumModes"].value if MaxNumModes == 0: Nmax = min(np.floor(3.0 * np.log10(max(w) / min(w))), n / 4) # maximum N Nmin = max(np.floor(0.5 * np.log10(max(w) / min(w))), 3) # minimum N Nv = np.arange(Nmin, Nmax + 1).astype(int) else: Nv = np.arange(MaxNumModes, MaxNumModes + 1).astype(int) Cerror = 1.0 / (np.std(self.K / Gexp - 1.0)) # Cerror = 1.? npts = len(Nv) # range of wtBaseDist scanned deltaBaseWeightDist = self.parameters["deltaBaseWeightDist"].value wtBase = deltaBaseWeightDist * np.arange(1, int(1.0 / deltaBaseWeightDist)) AICbst = np.zeros(len(wtBase)) Nbst = np.zeros(len(wtBase)) # nominal number of modes nzNbst = np.zeros(len(wtBase)) # actual number of nonzero modes # main loop over wtBaseDist for ib, wb in enumerate(wtBase): # Find the distribution of nodes you need wt = self.GetWeights(H, w, s, wb) # Scan the range of number of Maxwell modes N = (Nmin, Nmax) ev = np.zeros(npts) nzNv = np.zeros(npts) # number of nonzero modes for i, N in enumerate(Nv): z, hz = self.GridDensity(np.log(s), wt, N) # select "tau" Points g, tau, ev[i], _ = self.MaxwellModes(z, w, Gexp, plateau) # get g_i nzNv[i] = len(g) # store the best solution for this particular wb AIC = 2.0 * Nv + 2.0 * Cerror * ev AICbst[ib] = min(AIC) Nbst[ib] = Nv[np.argmin(AIC)] nzNbst[ib] = nzNv[np.argmin(AIC)] # global best settings of wb and Nopt; note this is nominal Nopt (!= len(g) due to NNLS) Nopt = int(Nbst[np.argmin(AICbst)]) wbopt = wtBase[np.argmin(AICbst)] # # Recompute the best data-set stats # wt = self.GetWeights(H, w, s, wbopt) z, hz = self.GridDensity(np.log(s), wt, Nopt) # Select "tau" Points g, tau, error, cKp = self.MaxwellModes(z, w, Gexp, plateau) # Get g_i, taui succ, gf, tauf = self.FineTuneSolution(tau, w, Gexp, plateau) if succ: g = gf.copy() tau = tauf.copy() # # Check if modes are close enough to merge # indx = np.argsort(tau) tau = tau[indx] tauSpacing = tau[1:] / tau[:-1] itry = 0 if plateau: g[:-1] = g[indx] else: g = g[indx] minTauSpacing = self.parameters["minTauSpacing"].value while min(tauSpacing) < minTauSpacing and itry < 3: self.Qprint("Tau Spacing < minTauSpacing") imode = np.argmin(tauSpacing) # merge modes imode and imode + 1 g, tau = self.mergeModes_magic(g, tau, imode) succ, g, tau = self.FineTuneSolution(tau, w, Gexp, plateau) if succ: g = gf.copy() tau = tauf.copy() tauSpacing = tau[1:] / tau[:-1] itry += 1 if plateau: G0 = g[-1] g = g[:-1] self.Qprint("Number of optimum nodes = {0:d}".format(len(g))) # Spectrum self.sdisc = tau self.Hdisc = g S, W = np.meshgrid(tau, w) ws = S * W ws2 = ws**2 K = np.vstack((ws2 / (1 + ws2), ws / (1 + ws2))) self.GstM = np.dot(K, g) if plateau: self.GstM[:n] += G0 # TODO: DECIDE IF WE PLOT THE CONTINUOUS OR DISCRETE FIT TO G*(omega) if self.prediction_mode == PredictionMode.disc: tt.data[:, 1] = self.GstM[:n] tt.data[:, 2] = self.GstM[n:] self.Qprint("<b>Fit from discrete spectrum</b>") else: self.Qprint("<b>Fit from continuous spectrum</b>")
[docs] def do_fit(self, line=""): self.Qprint("Fitting not allowed in this theory")
[docs] def do_error(self, line): """Report the error of the current theory Report the error of the current theory on all the files, taking into account the current selected xrange and yrange. File error is calculated as the mean square of the residual, averaged over all points in the file. Total error is the mean square of the residual, averaged over all points in all files. """ total_error = 0 npoints = 0 view = self.parent_dataset.parent_application.current_view tools = self.parent_dataset.parent_application.tools # table='''<table border="1" width="100%">''' # table+='''<tr><th>File</th><th>Error (RSS)</th><th># Pts</th></tr>''' tab_data = [ ["%-18s" % "File", "%-18s" % "Error (RSS)", "%-18s" % "# Pts"], ] for f in self.theory_files(): if self.stop_theory_flag: break xexp, yexp, success = view.view_proc(f.data_table, f.file_parameters) tmp_dt = self.get_non_extended_th_table(f) xth, yth, success = view.view_proc(tmp_dt, f.file_parameters) yth2 = np.copy(yexp) for i in range(xexp.shape[1]): fint = interp1d( xth[:, i], yth[:, i], "linear" ) # Get the theory at the same points as the data yth2[:, i] = np.copy(fint(xexp[:, i])) xth = np.copy(xexp) yth = np.copy(yth2) if self.xrange.get_visible(): conditionx = (xexp > self.xmin) * (xexp < self.xmax) else: conditionx = np.ones_like(xexp, dtype=bool) if self.yrange.get_visible(): conditiony = (yexp > self.ymin) * (yexp < self.ymax) else: conditiony = np.ones_like(yexp, dtype=bool) conditionnaninf = ( (~np.isnan(xexp)) * (~np.isnan(yexp)) * (~np.isnan(xth)) * (~np.isnan(yth)) * (~np.isinf(xexp)) * (~np.isinf(yexp)) * (~np.isinf(xth)) * (~np.isinf(yth)) ) yexp = np.extract(conditionx * conditiony * conditionnaninf, yexp) yth = np.extract(conditionx * conditiony * conditionnaninf, yth) f_error = np.mean((yth - yexp) ** 2) npt = len(yth) total_error += f_error * npt npoints += npt # table+= '''<tr><td>%-18s</td><td>%-18.4g</td><td>%-18d</td></tr>'''% (f.file_name_short, f_error, npt) tab_data.append( ["%-18s" % f.file_name_short, "%-18.4g" % f_error, "%-18d" % npt] ) # table+='''</table><br>''' # self.Qprint(table) self.Qprint(tab_data)
[docs] def plot_theory_stuff(self): """Plot theory helpers""" if not self.view_modes: return # PLOT CONTINUOUS SPECTRUM view = self.parent_dataset.parent_application.current_view data_table_tmp = DataTable(self.axarr) data_table_tmp.num_columns = 3 ns = self.parameters["ns"].value data_table_tmp.num_rows = ns data_table_tmp.data = np.zeros((ns, 3)) data_table_tmp.data[:, 0] = np.reciprocal(self.scont) data_table_tmp.data[:, 1] = np.exp(self.Hcont) data_table_tmp.data[:, 2] = np.exp(self.Hcont) try: x, y, success = view.view_proc(data_table_tmp, None) except TypeError as e: print(e) return self.contspectrum.set_data(x[:, 0], y[:, 0]) data_table_tmp.num_columns = 3 nmodes = len(self.sdisc) data_table_tmp.num_rows = nmodes data_table_tmp.data = np.zeros((nmodes, 3)) data_table_tmp.data[:, 0] = np.reciprocal(self.sdisc) data_table_tmp.data[:, 1] = self.Hdisc data_table_tmp.data[:, 2] = self.Hdisc try: x, y, success = view.view_proc(data_table_tmp, None) except TypeError as e: print(e) return self.discspectrum.set_data(x[:, 0], y[:, 0])
################################################################################## # MAXWELL MODES TIME ##################################################################################
[docs] class TheoryShanbhagMaxwellModesTime(QTheory): """Extract continuous and discrete relaxation spectra from relaxation modulus G(t) * **Parameters** - plateau : is there a residual plateau in the data (default False). - ns : Number of grid points to represent the continuous spectrum (typical 50-100) - lamC : Specify lambda_C instead of using the one inferred from the L-curve (default 0, use the L-curve). - SmFacLam = Smoothing Factor. - MaxNumModes = Max Number of Modes (default 0, automatically determine the optimal number of modes). - lam_min = lower limit of lambda for lcurve calculation (default 1e-10). - lam_max = higher limit of lambda for lcurve calculation (default 1e3). - lamDensity = lambda density per decade (default 3, use 2 or more). - rho_cutoff = Threshold to avoid picking too small lambda for L-curve without (default 0). - deltaBaseWeightDist = how finely to sample BaseWeightDist (default 0.2). - minTauSpacing = how close do successive modes (tau2/tau1) have to be before we try to mege them (default 1.25). """ thname = "ReSpect" description = "Relaxation spectra from relaxation modulus" citations = ["Shanbhag, S., Macromolecular Theory and Simulations, 2019, 1900005"] doi = ["http://dx.doi.org/10.1002/mats.201900005"] html_help_file = "http://reptate.readthedocs.io/manual/Applications/Gt/Theory/theory.html#shanbhag-maxwell-modes" single_file = True def __init__(self, name="", parent_dataset=None, ax=None): """**Constructor**""" super().__init__(name, parent_dataset, ax) self.function = self.MaxwellModesTime self.has_modes = True self.view_modes = True self.parameters["plateau"] = Parameter( "plateau", False, "is there a residual plateau?", ParameterType.boolean, opt_type=OptType.const, display_flag=False, ) self.parameters["ns"] = Parameter( "ns", 100, "Number of grid points to represent the continuous spectrum (typical 50-100)", ParameterType.integer, opt_type=OptType.const, ) self.parameters["lamC"] = Parameter( "lamC", 0, "Specify lambda_C", ParameterType.real, opt_type=OptType.nopt ) self.parameters["SmFacLam"] = Parameter( name="SmFacLam", value=0, description="Smoothing Factor (between -1 and 1)", type=ParameterType.discrete_integer, opt_type=OptType.const, discrete_values=[-1, 0, 1], ) self.parameters["FreqEnd"] = Parameter( name="FreqEnd", value=1, description="Treatment of Frequency Window ends (1, 2 or 3)", type=ParameterType.discrete_integer, opt_type=OptType.const, discrete_values=[1, 2, 3], ) self.parameters["MaxNumModes"] = Parameter( "MaxNumModes", 0, "Max Number of Modes", ParameterType.integer, opt_type=OptType.const, ) self.parameters["lam_min"] = Parameter( "lam_min", 1e-10, "lower limit of lambda for lcurve calculation", ParameterType.real, opt_type=OptType.nopt, ) self.parameters["lam_max"] = Parameter( "lam_max", 1e3, "higher limit of lambda for lcurve calculation", ParameterType.real, opt_type=OptType.nopt, ) self.parameters["lamDensity"] = Parameter( "lamDensity", 2, "lambda density per decade", ParameterType.integer, opt_type=OptType.nopt, ) self.parameters["rho_cutoff"] = Parameter( "rho_cutoff", 0, "Threshold to avoid picking too small lambda", ParameterType.real, opt_type=OptType.nopt, ) self.parameters["deltaBaseWeightDist"] = Parameter( "deltaBaseWeightDist", 0.2, "how finely to sample BaseWeightDist", ParameterType.real, opt_type=OptType.nopt, ) self.parameters["minTauSpacing"] = Parameter( "minTauSpacing", 1.25, "how close modes (tau2/tau1) for merge", ParameterType.real, opt_type=OptType.nopt, ) self.autocalculate = False # GRAPHIC MODES self.graphicmodes = [] self.spectrum = [] ns = self.parameters["ns"].value self.scont = np.zeros(ns) self.Hcont = np.zeros(ns) self.sdisc = np.zeros(ns) self.Hdisc = np.zeros(ns) self.setup_graphic_modes() self.prediction_mode = PredictionMode.cont self.GtM = None self.K = None self.tfit = None # add widgets specific to the theory tb = QToolBar() tb.setIconSize(QSize(24, 24)) self.tbutpredmode = QToolButton() self.tbutpredmode.setPopupMode(QToolButton.MenuButtonPopup) menu = QMenu(self) self.cont_pred_action = menu.addAction( QIcon(":/Icon8/Images/new_icons/icons8-minimum-value.png"), "Fit from Spectrum", ) self.disc_pred_action = menu.addAction( QIcon(":/Icon8/Images/new_icons/icons8-scatter-plot.png"), "Fit from Discrete", ) if self.prediction_mode == PredictionMode.cont: self.tbutpredmode.setDefaultAction(self.cont_pred_action) else: self.tbutpredmode.setDefaultAction(self.disc_pred_action) self.tbutpredmode.setMenu(menu) tb.addWidget(self.tbutpredmode) self.modesaction = tb.addAction( QIcon(":/Icon8/Images/new_icons/icons8-visible.png"), "View modes" ) self.plateauaction = tb.addAction( QIcon(":/Icon8/Images/new_icons/icons8-flat-tire-80.png"), "is there a residual plateau?", ) self.save_modes_action = tb.addAction( QIcon(":/Icon8/Images/new_icons/icons8-save-Maxwell.png"), "Save Modes" ) self.save_spectrum_action = tb.addAction( QIcon(":/Icon8/Images/new_icons/icons8-save_Ht.png"), "Save Spectrum" ) self.modesaction.setCheckable(True) self.modesaction.setChecked(True) self.plateauaction.setCheckable(True) self.plateauaction.setChecked(False) self.thToolsLayout.insertWidget(0, tb) connection_id = self.modesaction.triggered.connect(self.modesaction_change) connection_id = self.plateauaction.triggered.connect(self.plateauaction_change) connection_id = self.save_modes_action.triggered.connect(self.save_modes) connection_id = self.save_spectrum_action.triggered.connect(self.save_spectrum) connection_id = self.cont_pred_action.triggered.connect(self.select_cont_pred) connection_id = self.disc_pred_action.triggered.connect(self.select_disc_pred)
[docs] def select_cont_pred(self): self.prediction_mode = PredictionMode.cont self.tbutpredmode.setDefaultAction(self.cont_pred_action) if len(self.tfit) > 0: th_files = self.theory_files() for f in self.parent_dataset.files: if f in th_files: tt = self.tables[f.file_name_short] tt.num_rows = len(self.tfit) tt.data = np.zeros((tt.num_rows, tt.num_columns)) tt.data[:, 0] = self.tfit tt.data[:, 1] = self.K self.Qprint("<b>Fit from continuous spectrum</b>") self.do_error("") self.do_plot("")
[docs] def select_disc_pred(self): self.prediction_mode = PredictionMode.disc self.tbutpredmode.setDefaultAction(self.disc_pred_action) if len(self.tfit) > 0: th_files = self.theory_files() for f in self.parent_dataset.files: if f in th_files: tt = self.tables[f.file_name_short] tt.num_rows = len(self.tfit) tt.data = np.zeros((tt.num_rows, tt.num_columns)) tt.data[:, 0] = self.tfit tt.data[:, 1] = self.GtM self.Qprint("<b>Fit from discrete spectrum</b>") self.do_error("") self.do_plot("")
[docs] def plateauaction_change(self, checked): self.set_param_value("plateau", checked)
[docs] def save_spectrum(self): """Save Spectrum to a text file""" fpath, _ = QFileDialog.getSaveFileName( self, "Save spectrum to a text file", os.path.join(RepTate.root_dir, "data"), "Text (*.txt)", ) if fpath == "": return with open(fpath, "w") as f: verdata = RepTate._version.get_versions() version = verdata["version"].split("+")[0] date = verdata["date"].split("T")[0] build = verdata["version"] header = "# Continuous spectrum\n" header += "# Generated with RepTate %s %s (build %s)\n" % ( version, date, build, ) header += "# At %s on %s\n" % ( time.strftime("%X"), time.strftime("%a %b %d, %Y"), ) f.write(header) f.write("\n#%15s\t%15s\n" % ("tau_i", "G_i")) n = len(self.scont) for i in range(n): f.write("%15g\t%15g\n" % (self.scont[i], np.exp(self.Hcont[i]))) f.write("\n#end") QMessageBox.information(self, "Success", 'Wrote spectrum "%s"' % fpath)
[docs] def Qhide_theory_extras(self, state): """Uncheck the modeaction button. Called when curent theory is changed""" self.modesaction.setChecked(state)
[docs] def modesaction_change(self, checked): """Change visibility of modes""" self.graphicmodes_visible(checked)
[docs] def setup_graphic_modes(self): """Setup graphic helpers""" self.contspectrum = self.ax.plot([], [])[0] self.contspectrum.set_marker("*") self.contspectrum.set_linestyle("--") self.contspectrum.set_visible(self.view_modes) self.contspectrum.set_color("green") self.contspectrum.set_linewidth(5) self.contspectrum.set_label("") # self.contspectrum.set_markerfacecolor('yellow') self.contspectrum.set_markeredgecolor("black") self.contspectrum.set_markeredgewidth(3) self.contspectrum.set_markersize(6) self.contspectrum.set_alpha(0.5) self.discspectrum = self.ax.plot([], [])[0] self.discspectrum.set_marker("D") self.discspectrum.set_visible(self.view_modes) self.discspectrum.set_label("") self.discspectrum.set_markerfacecolor("green") self.discspectrum.set_markeredgecolor("black") self.discspectrum.set_color("green") self.discspectrum.set_markeredgewidth(3) self.discspectrum.set_markersize(8) self.discspectrum.set_alpha(0.5) self.plot_theory_stuff()
[docs] def destructor(self): """Called when the theory tab is closed""" self.graphicmodes_visible(False) # self.ax.lines.remove(self.contspectrum) # self.ax.lines.remove(self.discspectrum) self.contspectrum.remove() self.discspectrum.remove()
[docs] def show_theory_extras(self, show=False): """Called when the active theory is changed""" self.Qhide_theory_extras(show) self.graphicmodes_visible(show)
[docs] def graphicmodes_visible(self, state): """Change visibility of modes""" self.view_modes = state self.contspectrum.set_visible(self.view_modes) self.discspectrum.set_visible(self.view_modes) self.parent_dataset.parent_application.update_plot()
[docs] def get_modes(self): """Get the values of Maxwell Modes from this theory""" nmodes = len(self.sdisc) tau = self.sdisc G = self.Hdisc return tau, G, True
[docs] def getKernMat(self, s, t): """furnish kerMat() which helps faster kernel evaluation given s, t generates hs * exp(-T/S) [n * ns matrix], where hs = wi = weights for trapezoidal rule integration. This matrix (K) times h = exp(H), Kh, is comparable with Gexp""" ns = len(s) hsv = np.zeros(ns) hsv[0] = 0.5 * np.log(s[1] / s[0]) hsv[ns - 1] = 0.5 * np.log(s[ns - 1] / s[ns - 2]) hsv[1 : ns - 1] = 0.5 * (np.log(s[2:ns]) - np.log(s[0 : ns - 2])) S, T = np.meshgrid(s, t) return np.exp(-T / S) * hsv
[docs] def kernel_prestore(self, H, kernMat, *argv): """ turbocharging kernel function evaluation by prestoring kernel matrix Function: kernel_prestore(input) returns K*h, where h = exp(H) Same as kernel, except prestoring hs, S, and T to improve speed 3x. outputs the n*1 dimensional vector K(H)(t) which is comparable to Gexp = Gt 3/11/2019: returning Kh + G0 Input: H = substituted CRS, kernMat = n*ns matrix [w * exp(-T/S)] """ if len(argv) > 0: G0 = argv[0] else: G0 = 0.0 return np.dot(kernMat, np.exp(H)) + G0
[docs] def InitializeH(self, Gexp, s, kernMat, *argv): """ Function: InitializeH(input) Input: Gexp = n*1 vector [Gt], s = relaxation modes, kernMat = matrix for faster kernel evaluation G0 = optional; if plateau is nonzero Output: H = guessed H G0 = optional guess if *argv is nonempty """ # # To guess spectrum, pick a negative Hgs and a large value of lambda to get a # solution that is most determined by the regularization # March 2019; a single guess is good enough now, because going from large lambda to small # lambda in lcurve. H = -5.0 * np.ones(len(s)) + np.sin(np.pi * s) lam = 1e0 if len(argv) > 0: G0 = argv[0] Hlam, G0 = self.getH(lam, Gexp, H, kernMat, G0) return Hlam, G0 else: Hlam = self.getH(lam, Gexp, H, kernMat) return Hlam
[docs] def getAmatrix(self, ns): """Generate symmetric matrix A = L' * L required for error analysis: helper function for lcurve in error determination""" # L is a ns*ns tridiagonal matrix with 1 -2 and 1 on its diagonal; nl = ns - 2 L = ( np.diag(np.ones(ns - 1), 1) + np.diag(np.ones(ns - 1), -1) + np.diag(-2.0 * np.ones(ns)) ) L = L[1 : nl + 1, :] return np.dot(L.T, L)
[docs] def getBmatrix(self, H, kernMat, Gexp, *argv): """get the Bmatrix required for error analysis; helper for lcurve() not explicitly accounting for G0 in Jr because otherwise I get underflow problems """ n = kernMat.shape[0] ns = kernMat.shape[1] nl = ns - 2 r = np.zeros(n) # vector of size (n); # furnish relevant portion of Jacobian and residual Kmatrix = np.dot((1.0 / Gexp).reshape(n, 1), np.ones((1, ns))) Jr = -self.kernelD(H, kernMat) * Kmatrix # if plateau then unfurl G0 if len(argv) > 0: G0 = argv[0] r = 1.0 - self.kernel_prestore(H, kernMat, G0) / Gexp else: r = 1.0 - self.kernel_prestore(H, kernMat) / Gexp B = np.dot(Jr.T, Jr) + np.diag(np.dot(r.T, Jr)) return B
[docs] def oldLamC(self, lam, rho, eta): # # 8/1/2018: Making newer strategy more accurate and robust: dividing by minimum rho/eta # which is not as sensitive to lam_min, lam_max. This makes lamC robust to range of lam explored # # er = rho/np.amin(rho) + eta/np.amin(eta); er = rho / np.amin(rho) + eta / (np.sqrt(np.amax(eta) * np.amin(eta))) # # Since rho v/s lambda is smooth, we can interpolate the coarse mesh to find minimum # # change 3/20/2019: Scipy 0.17 has a bug with extrapolation: so making lami tad smaller lami = np.logspace(np.log10(min(lam) + 1e-15), np.log10(max(lam) - 1e-15), 1000) erri = np.exp( interp1d( np.log(lam), np.log(er), kind="cubic", bounds_error=False, fill_value=(np.log(er[0]), np.log(er[-1])), )(np.log(lami)) ) ermin = np.amin(erri) eridx = np.argmin(erri) lamC = lami[eridx] # # 2/2: Copying 12/18 edit from pyReSpect-time; # for rough data have cutoff at rho = rho_cutoff? # rhoF = interp1d(lam, rho) rho_cutoff = self.parameters["rho_cutoff"].value if rhoF(lamC) <= rho_cutoff: try: eridx = (np.abs(rhoF(lami) - rho_cutoff)).argmin() if lami[eridx] > lamC: lamC = lami[eridx] except: pass return lamC
[docs] def lcurve(self, Gexp, Hgs, kernMat, *argv): """ Function: lcurve(input) Input: Gexp = n*1 vector [Gt], Hgs = guessed H, kernMat = matrix for faster kernel evaluation par = parameter dictionary G0 = optionally Output: lamC and 3 vectors of size npoints*1 contains a range of lambda, rho and eta. "Elbow" = lamC is estimated using a *NEW* heuristic AND by Hansen method March 2019: starting from large lambda to small cuts calculation time by a lot also gives an error estimate """ plateau = self.parameters["plateau"].value if plateau: G0 = argv[0] lamDensity = self.parameters["lamDensity"].value lam_max = self.parameters["lam_max"].value lam_min = self.parameters["lam_min"].value npoints = int(lamDensity * (np.log10(lam_max) - np.log10(lam_min))) hlam = (lam_max / lam_min) ** (1.0 / (npoints - 1.0)) lam = lam_min * hlam ** np.arange(npoints) eta = np.zeros(npoints) rho = np.zeros(npoints) logP = np.zeros(npoints) H = Hgs.copy() n = len(Gexp) ns = len(H) nl = ns - 2 logPmax = -np.inf # so nothing surprises me! Hlambda = np.zeros((ns, npoints)) # Error Analysis: Furnish A_matrix Amat = self.getAmatrix(len(H)) _, LogDetN = np.linalg.slogdet(Amat) # # This is the costliest step # for i in reversed(range(len(lam))): self.Qprint(".", end="") lamb = lam[i] if plateau: H, G0 = self.getH(lamb, Gexp, H, kernMat, G0) rho[i] = np.linalg.norm( (1.0 - self.kernel_prestore(H, kernMat, G0) / Gexp) ) Bmat = self.getBmatrix(H, kernMat, Gexp, G0) else: H = self.getH(lamb, Gexp, H, kernMat) rho[i] = np.linalg.norm((1.0 - self.kernel_prestore(H, kernMat) / Gexp)) Bmat = self.getBmatrix(H, kernMat, Gexp) eta[i] = np.linalg.norm(np.diff(H, n=2)) Hlambda[:, i] = H _, LogDetC = np.linalg.slogdet(lamb * Amat + Bmat) V = rho[i] ** 2 + lamb * eta[i] ** 2 # this assumes a prior exp(-lam) logP[i] = -V + 0.5 * (LogDetN + ns * np.log(lamb) - LogDetC) - lamb if logP[i] > logPmax: logPmax = logP[i] elif logP[i] < logPmax - 18: break # truncate all to significant lambda lam = lam[i:] logP = logP[i:] eta = eta[i:] rho = rho[i:] logP = logP - max(logP) Hlambda = Hlambda[:, i:] # # currently using both schemes to get optimal lamC # new lamM works better with actual experimental data # lamC = self.oldLamC(lam, rho, eta) plam = np.exp(logP) plam = plam / np.sum(plam) lamM = np.exp(np.sum(plam * np.log(lam))) # # Dialling in the Smoothness Factor # SmFacLam = self.parameters["SmFacLam"].value if SmFacLam > 0: lamM = np.exp(np.log(lamM) + SmFacLam * (max(np.log(lam)) - np.log(lamM))) elif SmFacLam < 0: lamM = np.exp(np.log(lamM) + SmFacLam * (np.log(lamM) - min(np.log(lam)))) return lamM, lam, rho, eta, logP, Hlambda
[docs] def getH(self, lam, Gexp, H, kernMat, *argv): """Purpose: Given a lambda, this function finds the H_lambda(s) that minimizes V(lambda) V(lambda) := ||Gexp - kernel(H)||^2 + lambda * ||L H||^2 Input : lambda = regularization parameter , Gexp = experimental data, H = guessed H, kernMat = matrix for faster kernel evaluation G0 = optional Output : H_lam, [G0] Default uses Trust-Region Method with Jacobian supplied by jacobianLM """ # send Hplus = [H, G0], on return unpack H and G0 if len(argv) > 0: Hplus = np.append(H, argv[0]) res_lsq = least_squares( self.residualLM, Hplus, jac=self.jacobianLM, args=(lam, Gexp, kernMat) ) return res_lsq.x[:-1], res_lsq.x[-1] # send normal H, and collect optimized H back else: res_lsq = least_squares( self.residualLM, H, jac=self.jacobianLM, args=(lam, Gexp, kernMat) ) return res_lsq.x
[docs] def residualLM(self, H, lam, Gexp, kernMat): """HELPER FUNCTION: Gets Residuals r Input: H = guessed H, lambda = regularization parameter , Gexp = experimental data, kernMat = matrix for faster kernel evaluation G0 = plateau Output: a set of n+nl residuals, the first n correspond to the kernel the last nl correspond to the smoothness criterion %""" n = kernMat.shape[0] ns = kernMat.shape[1] nl = ns - 2 r = np.zeros(n + nl) # if plateau then unfurl G0 if len(H) > ns: G0 = H[-1] H = H[:-1] r[0:n] = 1.0 - self.kernel_prestore(H, kernMat, G0) / Gexp # the Gt and else: r[0:n] = 1.0 - self.kernel_prestore(H, kernMat) / Gexp # the Gt and # the curvature constraint is not affected by G0 r[n : n + nl] = np.sqrt(lam) * np.diff(H, n=2) # second derivative return r
[docs] def jacobianLM(self, H, lam, Gexp, kernMat): """ HELPER FUNCTION for optimization: Get Jacobian J returns a (n+nl * ns) matrix Jr; (ns + 1) if G0 is also supplied. Jr_(i, j) = dr_i/dH_j It uses kernelD, which approximates dK_i/dH_j, where K is the kernel """ n = kernMat.shape[0] ns = kernMat.shape[1] nl = ns - 2 # L is a ns*ns tridiagonal matrix with 1 -2 and 1 on its diagonal; L = ( np.diag(np.ones(ns - 1), 1) + np.diag(np.ones(ns - 1), -1) + np.diag(-2.0 * np.ones(ns)) ) L = L[1 : nl + 1, :] # Furnish the Jacobian Jr (n+ns)*ns matrix Kmatrix = np.dot((1.0 / Gexp).reshape(n, 1), np.ones((1, ns))) if len(H) > ns: G0 = H[-1] H = H[:-1] Jr = np.zeros((n + nl, ns + 1)) Jr[0:n, 0:ns] = -self.kernelD(H, kernMat) * Kmatrix Jr[0:n, ns] = -1.0 / Gexp # column for dr_i/dG0 Jr[n : n + nl, 0:ns] = np.sqrt(lam) * L Jr[n : n + nl, ns] = np.zeros(nl) # column for dr_i/dG0 = 0 else: Jr = np.zeros((n + nl, ns)) Jr[0:n, 0:ns] = -self.kernelD(H, kernMat) * Kmatrix Jr[n : n + nl, 0:ns] = np.sqrt(lam) * L return Jr
[docs] def kernelD(self, H, kernMat): """ Function: kernelD(input) outputs the (n*ns) dimensional matrix DK(H)(t) It approximates dK_i/dH_j = K * e(H_j): Input: H = substituted CRS, kernMat = matrix for faster kernel evaluation Output: DK = Jacobian of H """ n = kernMat.shape[0] ns = kernMat.shape[1] # A n*ns matrix with all the rows = H' Hsuper = np.dot(np.ones((n, 1)), np.exp(H).reshape(1, ns)) DK = kernMat * Hsuper return DK
[docs] def MaxwellModes(self, z, t, Gt, isPlateau): """ Function: MaxwellModes(input) Solves the linear least squares problem to obtain the DRS Input: z = points distributed according to the density, [z = log(tau)] t = n*1 vector contains times, Gt = n*1 vector contains G(t), isPlateau = True if G0 \neq 0 Output: g, tau = spectrum (array) error = relative error between the input data and the G(t) inferred from the DRS condKp = condition number """ N = len(z) tau = np.exp(z) n = len(t) Gexp = Gt # # Prune small and -ve weights g(i) # g, error, condKp = self.nnLLS(t, tau, Gexp, isPlateau) # search for small if isPlateau: izero = np.where(g[:-1] / max(g[:-1]) < 1e-7) else: izero = np.where(g / max(g) < 1e-7) tau = np.delete(tau, izero) g = np.delete(g, izero) return g, tau, error, condKp
[docs] def nnLLS(self, t, tau, Gexp, isPlateau): """ # # Helper subfunction which does the actual LLS problem # helps MaxwellModes; relies on nnls # """ n = len(Gexp) S, T = np.meshgrid(tau, t) K = np.exp(-T / S) # n * nmodes # K is n*ns [or ns+1] if isPlateau: K = np.hstack((K, np.ones((len(Gexp), 1)))) # # gets (Gt/GtE - 1)^2, instead of (Gt - GtE)^2 # Kp = np.dot(np.diag((1.0 / Gexp)), K) condKp = np.linalg.cond(Kp) g = nnls(Kp, np.ones(len(Gexp)))[0] GtM = np.dot(K, g) error = np.sum((GtM / Gexp - 1.0) ** 2) return g, error, condKp
[docs] def GetWeights(self, H, t, s, wb): """ % % Function: GetWeights(input) % % Finds the weight of "each" mode by taking a weighted average of its contribution % to G(t) % % Input: H = CRS (ns * 1) % t = n*1 vector contains times % s = relaxation modes (ns * 1) % wb = weightBaseDist % % Output: wt = weight of each mode % """ ns = len(s) n = len(t) hs = np.zeros(ns) wt = hs hs[0] = 0.5 * np.log(s[1] / s[0]) hs[ns - 1] = 0.5 * np.log(s[ns - 1] / s[ns - 2]) hs[1 : ns - 1] = 0.5 * (np.log(s[2:ns]) - np.log(s[0 : ns - 2])) S, T = np.meshgrid(s, t) kern = np.exp(-T / S) # n * ns wij = np.dot(kern, np.diag(hs * np.exp(H))) # n * ns K = np.dot(kern, hs * np.exp(H)) # n * 1, comparable with Gexp for i in np.arange(n): wij[i, :] = wij[i, :] / K[i] for j in np.arange(ns): wt[j] = np.sum(wij[:, j]) wt = wt / np.trapz(wt, np.log(s)) wt = (1.0 - wb) * wt + (wb * np.mean(wt)) * np.ones(len(wt)) return wt
[docs] def GridDensity(self, x, px, N): """# # PROGRAM: GridDensity(input) # # Takes in a PDF or density function, and spits out a bunch of points in # accordance with the PDF # # Input: # x = vector of points. It need *not* be equispaced, # px = vector of same size as x: probability distribution or # density function. It need not be normalized but has to be positive. # N = Number of points >= 3. The end points of "x" are included # necessarily, # # Output: # z = Points distributed according to the density # hz = width of the "intervals" - useful to apportion domain to points # if you are doing quadrature with the results, for example. # # (c) Sachin Shanbhag, November 11, 2015 #""" npts = 100 # can potentially change xi = np.linspace(min(x), max(x), npts) # reinterpolate on equi-spaced axis fint = interp1d(x, px, "cubic") # smoothen using cubic splines pint = fint(xi) # interpolation ci = cumtrapz(pint, xi, initial=0) pint = pint / ci[npts - 1] ci = ci / ci[npts - 1] # normalize ci alfa = 1.0 / (N - 1) # alfa/2 + (N-1)*alfa + alfa/2 zij = np.zeros(N + 1) # quadrature interval end marker z = np.zeros(N) # quadrature point z[0] = min(x) z[N - 1] = max(x) # ci(Z_j,j+1) = (j - 0.5) * alfa beta = np.arange(0.5, N - 0.5) * alfa zij[0] = z[0] zij[N] = z[N - 1] fint = interp1d(ci, xi, "cubic") zij[1:N] = fint(beta) h = np.diff(zij) # Quadrature points are not the centroids, but rather the center of masses # of the quadrature intervals beta = np.arange(1, N - 1) * alfa z[1 : N - 1] = fint(beta) return z, h
[docs] def mergeModes_magic(self, g, tau, imode): """merge modes imode and imode+1 into a single mode return gp and taup corresponding to this new mode; 12/2018 - also tries finetuning before returning uses helper functions: - normKern_magic() - costFcn_magic() """ iniGuess = [g[imode] + g[imode + 1], 0.5 * (tau[imode] + tau[imode + 1])] res = minimize(self.costFcn_magic, iniGuess, args=(g, tau, imode)) newtau = np.delete(tau, imode + 1) newtau[imode] = res.x[1] return newtau
[docs] def normKern_magic(self, t, gn, taun, g1, tau1, g2, tau2): """helper function: for costFcn and mergeModes""" Gn = gn * np.exp(-t / taun) Go = g1 * np.exp(-t / tau1) + g2 * np.exp(-t / tau2) return (Gn / Go - 1.0) ** 2
[docs] def costFcn_magic(self, par, g, tau, imode): """ "helper function for mergeModes; establishes cost function to minimize""" gn = par[0] taun = par[1] g1 = g[imode] g2 = g[imode + 1] tau1 = tau[imode] tau2 = tau[imode + 1] tmin = min(tau1, tau2) / 10.0 tmax = max(tau1, tau2) * 10.0 return quad( self.normKern_magic, tmin, tmax, args=(gn, taun, g1, tau1, g2, tau2) )[0]
[docs] def FineTuneSolution(self, tau, t, Gexp, isPlateau, estimateError=False): """Given a spacing of modes tau, tries to do NLLS to fine tune it further If it fails, then it returns the old tau back Uses helper function: res_tG which computes residuals """ success = False try: res = least_squares( self.res_tG, tau, bounds=(0.0, np.inf), args=(t, Gexp, isPlateau) ) tau = res.x tau0 = tau.copy() # Error Estimate if estimateError: J = res.jac cov = np.linalg.pinv(J.T.dot(J)) * (res.fun**2).mean() dtau = np.sqrt(np.diag(cov)) success = True except: e = sys.exc_info()[0] self.Qprint("<p>Error: %s</p>" % e) # pass g, tau, _, _ = self.MaxwellModes( np.log(tau), t, Gexp, isPlateau ) # Get g_i, taui # # if mode has dropped out, then need to delete corresponding dtau mode # if estimateError and success: if len(tau) < len(tau0): nkill = 0 for i in range(len(tau0)): if np.min(np.abs(tau0[i] - tau)) > 1e-12 * tau0[i]: dtau = np.delete(dtau, i - nkill) nkill += 1 return g, tau, dtau elif estimateError: return g, tau, -1 * np.ones(len(tau)) else: return g, tau
[docs] def res_tG(self, tau, texp, Gexp, isPlateau): """ Helper function for final optimization problem """ g, _, _ = self.nnLLS(texp, tau, Gexp, isPlateau) Gmodel = np.zeros(len(texp)) for j in range(len(tau)): Gmodel += g[j] * np.exp(-texp / tau[j]) # add G0 if isPlateau: Gmodel += g[-1] residual = Gmodel / Gexp - 1.0 return residual
[docs] def MaxwellModesTime(self, f=None): """Calculate the theory""" ft = f.data_table tt = self.tables[f.file_name_short] tt.num_columns = ft.num_columns # CALCULATE THE CONTINUOUS SPECTRUM FIRST self.Qprint("<b>CONTINUOUS SPECTRUM</b>") t = ft.data[:, 0] Gexp = ft.data[:, 1] # Remove points with t<=0 filt = t > 0 t = t[filt] Gexp = Gexp[filt] # Sanitize the input: remove repeated values and space data homogeneously t, indices = np.unique(t, return_index=True) Gexp = Gexp[indices] f = interp1d(t, Gexp, fill_value="extrapolate") t = np.logspace(np.log10(np.min(t)), np.log10(np.max(t)), max(len(t), 100)) Gexp = f(t) self.tfit = np.copy(t) n = len(t) ns = self.parameters["ns"].value tmin = t[0] tmax = t[n - 1] # determine frequency window if self.parameters["FreqEnd"].value == 1: smin = np.exp(-np.pi / 2) * tmin smax = np.exp(np.pi / 2) * tmax elif self.parameters["FreqEnd"].value == 2: smin = tmin smax = tmax elif self.parameters["FreqEnd"].value == 3: smin = np.exp(+np.pi / 2) * tmin smax = np.exp(-np.pi / 2) * tmax hs = (smax / smin) ** (1.0 / (ns - 1)) s = smin * hs ** np.arange(ns) kernMat = self.getKernMat(s, t) tic = time.time() plateau = self.parameters["plateau"].value self.Qprint("Initial Set up...", end="") if plateau: Hgs, G0 = self.InitializeH(Gexp, s, kernMat, np.min(Gexp)) else: Hgs = self.InitializeH(Gexp, s, kernMat) te = time.time() - tic self.Qprint("({0:.1f} s)".format(te)) tic = time.time() # Find Optimum Lambda with 'lcurve' lamC = self.parameters["lamC"].value self.Qprint("Building the L-curve ...", end="") if lamC == 0: if plateau: lamC, lam, rho, eta, logP, Hlam = self.lcurve(Gexp, Hgs, kernMat, G0) else: lamC, lam, rho, eta, logP, Hlam = self.lcurve(Gexp, Hgs, kernMat) te = time.time() - tic self.Qprint("({0:.1f} s)".format(te)) tic = time.time() self.Qprint("lamC = {0:0.3e}".format(lamC)) # Get the best spectrum self.Qprint("Extracting CRS...", end="") if plateau: H, G0 = self.getH(lamC, Gexp, Hgs, kernMat, G0) self.Qprint("G0 = {0:0.3e} ...".format(G0)) else: H = self.getH(lamC, Gexp, Hgs, kernMat) te = time.time() - tic self.Qprint("done ({0:.1f} s)".format(te)) # Save inferred H(s) and Gw if lamC != 0: if plateau: self.K = self.kernel_prestore(H, kernMat, G0) # np.savetxt('output/H.dat', np.c_[s, H], fmt='%e', header='G0 = {0:0.3e}'.format(G0)) else: self.K = self.kernel_prestore(H, kernMat) # np.savetxt('output/H.dat', np.c_[s, H], fmt='%e') # np.savetxt('output/Gfit.dat', np.c_[w, K[:n], K[n:]], fmt='%e') if self.prediction_mode == PredictionMode.cont: tt.num_rows = len(t) tt.data = np.zeros((tt.num_rows, tt.num_columns)) tt.data[:, 0] = t tt.data[:, 1] = self.K # Spectrum self.scont = s self.Hcont = H # GET DISCRETE SPECTRUM self.Qprint("<b>DISCRETE SPECTRUM</b>") # range of N scanned MaxNumModes = self.parameters["MaxNumModes"].value if MaxNumModes == 0: Nmax = min(np.floor(3.0 * np.log10(max(t) / min(t))), n / 4) # maximum N Nmin = max(np.floor(0.5 * np.log10(max(t) / min(t))), 2) # minimum N Nv = np.arange(Nmin, Nmax + 1).astype(int) else: Nv = np.arange(MaxNumModes, MaxNumModes + 1).astype(int) Cerror = 1.0 / (np.std(self.K / Gexp - 1.0)) # Cerror = 1.? npts = len(Nv) # range of wtBaseDist scanned deltaBaseWeightDist = self.parameters["deltaBaseWeightDist"].value wtBase = deltaBaseWeightDist * np.arange(1, int(1.0 / deltaBaseWeightDist)) AICbst = np.zeros(len(wtBase)) Nbst = np.zeros(len(wtBase)) # nominal number of modes nzNbst = np.zeros(len(wtBase)) # actual number of nonzero modes # main loop over wtBaseDist for ib, wb in enumerate(wtBase): # Find the distribution of nodes you need wt = self.GetWeights(H, t, s, wb) # Scan the range of number of Maxwell modes N = (Nmin, Nmax) ev = np.zeros(npts) nzNv = np.zeros(npts) # number of nonzero modes for i, N in enumerate(Nv): z, hz = self.GridDensity(np.log(s), wt, N) # select "tau" Points g, tau, ev[i], _ = self.MaxwellModes(z, t, Gexp, plateau) # get g_i nzNv[i] = len(g) # store the best solution for this particular wb AIC = 2.0 * Nv + 2.0 * Cerror * ev # Fine-Tune the best in class-fit further by trying an NLLS optimization on it. # N = Nv[np.argmin(AIC)] z, hz = self.GridDensity(np.log(s), wt, N) # Select "tau" Points g, tau, error, cKp = self.MaxwellModes(z, t, Gexp, plateau) # Get g_i, taui AICbst[ib] = min(AIC) Nbst[ib] = Nv[np.argmin(AIC)] nzNbst[ib] = nzNv[np.argmin(AIC)] # global best settings of wb and Nopt; note this is nominal Nopt (!= len(g) due to NNLS) Nopt = int(Nbst[np.argmin(AICbst)]) wbopt = wtBase[np.argmin(AICbst)] # # Recompute the best data-set stats # wt = self.GetWeights(H, t, s, wbopt) z, hz = self.GridDensity(np.log(s), wt, Nopt) # Select "tau" Points g, tau, _, _ = self.MaxwellModes(z, t, Gexp, plateau) # Get g_i, taui g, tau, dtau = self.FineTuneSolution(tau, t, Gexp, plateau, estimateError=True) # # Check if modes are close enough to merge # indx = np.argsort(tau) tau = tau[indx] tauSpacing = tau[1:] / tau[:-1] itry = 0 if plateau: g[:-1] = g[indx] else: g = g[indx] minTauSpacing = self.parameters["minTauSpacing"].value while min(tauSpacing) < minTauSpacing and itry < 3: self.Qprint("Tau Spacing < minTauSpacing") imode = np.argmin(tauSpacing) # merge modes imode and imode + 1 tau = self.mergeModes_magic(g, tau, imode) g, tau, dtau = self.FineTuneSolution( tau, t, Gexp, plateau, estimateError=True ) if len(tau) == 1: break tauSpacing = tau[1:] / tau[:-1] itry += 1 if plateau: G0 = g[-1] g = g[:-1] self.Qprint("Number of optimum nodes = {0:d}".format(len(g))) self.Qprint( "log10(Condition number) of matrix equation: {0:.2f}".format(np.log10(cKp)) ) # Spectrum self.sdisc = tau self.Hdisc = g S, T = np.meshgrid(tau, t) K = np.exp(-T / S) self.GtM = np.dot(K, g) if plateau: self.GtM += G0 # TODO: DECIDE IF WE PLOT THE CONTINUOUS OR DISCRETE FIT TO G*(omega) if self.prediction_mode == PredictionMode.disc: self.Qprint("<b>Fit from discrete spectrum</b>") tt.num_rows = len(t) tt.data = np.zeros((tt.num_rows, tt.num_columns)) tt.data[:, 0] = t tt.data[:, 1] = self.GtM else: self.Qprint("<b>Fit from continuous spectrum</b>") try: gamma = float(f.file_parameters["gamma"]) if gamma == 0: gamma = 1 except: gamma = 1
[docs] def do_fit(self, line=""): self.Qprint("Fitting not allowed in this theory")
[docs] def do_error(self, line): """Report the error of the current theory Report the error of the current theory on all the files, taking into account the current selected xrange and yrange. File error is calculated as the mean square of the residual, averaged over all points in the file. Total error is the mean square of the residual, averaged over all points in all files. """ total_error = 0 npoints = 0 view = self.parent_dataset.parent_application.current_view tools = self.parent_dataset.parent_application.tools # table='''<table border="1" width="100%">''' # table+='''<tr><th>File</th><th>Error (RSS)</th><th># Pts</th></tr>''' tab_data = [ ["%-18s" % "File", "%-18s" % "Error (RSS)", "%-18s" % "# Pts"], ] for f in self.theory_files(): if self.stop_theory_flag: break xexp, yexp, success = view.view_proc(f.data_table, f.file_parameters) tmp_dt = self.get_non_extended_th_table(f) xth, yth, success = view.view_proc(tmp_dt, f.file_parameters) yth2 = np.copy(yexp) for i in range(xexp.shape[1]): fint = interp1d( xth[:, i], yth[:, i], "cubic", bounds_error=False ) # , fill_value=nan) # Get the theory at the same points as the data yth2[:, i] = np.copy(fint(xexp[:, i])) xth = np.copy(xexp) yth = np.copy(yth2) if self.xrange.get_visible(): conditionx = (xexp > self.xmin) * (xexp < self.xmax) else: conditionx = np.ones_like(xexp, dtype=bool) if self.yrange.get_visible(): conditiony = (yexp > self.ymin) * (yexp < self.ymax) else: conditiony = np.ones_like(yexp, dtype=bool) conditionnaninf = ( (~np.isnan(xexp)) * (~np.isnan(yexp)) * (~np.isnan(xth)) * (~np.isnan(yth)) * (~np.isinf(xexp)) * (~np.isinf(yexp)) * (~np.isinf(xth)) * (~np.isinf(yth)) ) yexp = np.extract(conditionx * conditiony * conditionnaninf, yexp) yth = np.extract(conditionx * conditiony * conditionnaninf, yth) f_error = np.mean((yth - yexp) ** 2) npt = len(yth) total_error += f_error * npt npoints += npt # table+= '''<tr><td>%-18s</td><td>%-18.4g</td><td>%-18d</td></tr>'''% (f.file_name_short, f_error, npt) tab_data.append( ["%-18s" % f.file_name_short, "%-18.4g" % f_error, "%-18d" % npt] ) # table+='''</table><br>''' # self.Qprint(table) self.Qprint(tab_data)
[docs] def plot_theory_stuff(self): """Plot theory helpers""" if not self.view_modes: return # PLOT CONTINUOUS SPECTRUM view = self.parent_dataset.parent_application.current_view data_table_tmp = DataTable(self.axarr) data_table_tmp.num_columns = 2 ns = self.parameters["ns"].value data_table_tmp.num_rows = ns data_table_tmp.data = np.zeros((ns, 2)) data_table_tmp.data[:, 0] = self.scont data_table_tmp.data[:, 1] = np.exp(self.Hcont) try: x, y, success = view.view_proc(data_table_tmp, None) except TypeError as e: print(e) return self.contspectrum.set_data(x, y) data_table_tmp.num_columns = 2 nmodes = len(self.sdisc) data_table_tmp.num_rows = nmodes data_table_tmp.data = np.zeros((nmodes, 2)) data_table_tmp.data[:, 0] = self.sdisc data_table_tmp.data[:, 1] = self.Hdisc try: x, y, success = view.view_proc(data_table_tmp, None) except TypeError as e: print(e) return self.discspectrum.set_data(x, y)