Source code for armi.bookkeeping.historyTracker

# Copyright 2019 TerraPower, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The History Tracker is a bookkeeping interface that accesses and reports time-dependent state
information from the database.

At the end of a run, these write text files to
show the histories for various follow-on mechanical analysis,
fuel performance analysis, etc.

Other interfaces may find this useful as well, to get an assembly history
for fuel performance analysis, etc. This is particularly useful in equilibrium runs,
where the ``EqHistoryTrackerInterface`` will unravel the full history from a single
equilibrium cycle.

Getting history information
---------------------------
Loop over blocks, keys, and timesteps of interest and use commands like this::

    history.getBlockHistoryVal(armiBlock.getName(), key, ts)

Using the database-based history trackers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can pre-load information before gathering it to get much better performance::

    history.preloadBlockHistoryVals(blockNames, historyKeys, timeSteps)

This is essential for performance when history information is going to be accessed
in loops over assemblies or blocks. Reading each param directly from the database
individually in loops is paralyzingly slow.

Specifying parameters to add to the EOL history report
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To add state parameters to the list of things that get their history reported, you need to define an interface
method called `getHistoryParams`. It should return a list of block parameters that will become available. For example::

    def getHistoryParams(self):
        return ['flux', 'percentBu']

When you'd like to access history information, you need to grab the history interface. The history interfaces is
present by default in your interface stack. To get it, just call::

    history = self.getInterface('history')

Now you can do a few things, such as::

    # get some info about what's stored in the history
    assemsWithHistory = history.getDetailAssemblies()
    timeStepsAvailable = history.getTimeIndices()

    # now go out and get some time-dependent block params:
    fluxAtTimeStep3 = history.getBlockHistoryVal('B1003A', 'flux', 3)

Specifying blocks and assemblies to track
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
See :ref:`detail-assems`.

"""
import re
import os
from collections import OrderedDict
from typing import Tuple

import numpy
from matplotlib import pyplot
import tabulate

from armi import interfaces
from armi import runLog
from armi import utils
from armi import operators
from armi.utils import textProcessors
from armi.reactor.flags import Flags
from armi.reactor import grids

ORDER = 2 * interfaces.STACK_ORDER.BEFORE + interfaces.STACK_ORDER.BOOKKEEPING


[docs]def describeInterfaces(cs): """Function for exposing interface(s) to other code""" if cs["runType"] not in (operators.RunTypes.EQUILIBRIUM): klass = HistoryTrackerInterface return (klass, {})
[docs]class HistoryTrackerInterface(interfaces.Interface): """ Makes reports of the state that individual assemblies encounter. Attributes ---------- detailAssemblyNames : list List of detail assembly names in the reactor time : list list of reactor time in years """ name = "history" def __init__(self, r, cs): """ HistoryTracker that uses the database to look up parameter history rather than storing them in memory. Warning ------- If the current timestep history is requested and the database has not yet been written this timestep, the current value of the requested parameter is provided. It is possible that this is not the value that will be written to the database during this time step since many interfaces that change parameters may interact between this call and the database write. """ interfaces.Interface.__init__(self, r, cs) self.detailAssemblyNames = [] self.time = [] # time in years self.fullCoreLocations = {} self.xsHistory = {} self._preloadedBlockHistory = None
[docs] def interactBOL(self): self.addDetailAssembliesBOL()
[docs] def interactBOC(self, cycle=None): """Look for any new assemblies that are asked for and add them to tracking.""" self.addDetailAssemsByAssemNums() if self.cs["detailAllAssems"]: self.addAllFuelAssems()
[docs] def interactEOL(self): """Generate the history reports.""" self._writeDetailAssemblyHistories() self.printFullCoreLocations()
[docs] def addDetailAssembliesBOL(self): """ Find and activate assemblies that the user requested detailed treatment of. """ if self.cs["detailAssemLocationsBOL"]: for locLabel in self.cs["detailAssemLocationsBOL"]: ring, pos, _axial = grids.locatorLabelToIndices(locLabel) i, j = self.r.core.spatialGrid.getIndicesFromRingAndPos(ring, pos) aLoc = self.r.core.spatialGrid[i, j, 0] try: a = self.r.core.childrenByLocator[aLoc] except KeyError: runLog.error( "Detail assembly in location {} (requested via " "`detailAssemLocationsBOL`) is not in core. " "Update settings.".format(locLabel) ) raise self.addDetailAssembly(a) if self.cs["detailAllAssems"]: self.addAllFuelAssems() # This also gets called at BOC but we still # do it here for operators that do not call BOC. self.addDetailAssemsByAssemNums()
[docs] def addAllFuelAssems(self): """Add all fuel assems as detail assems.""" for a in self.r.core: if a.hasFlags(Flags.FUEL): self.addDetailAssembly(a)
[docs] def addDetailAssemsByAssemNums(self): """ Activate detail assemblies from input based on assembly number. This is used to activate detail assembly tracking on assemblies that are not present in the core at BOL. See Also -------- addDetailAssembliesBOL : Similar but for BOL """ detailAssemNums = self.cs["detailAssemNums"] if not detailAssemNums: return for a in self.r.core: thisNum = a.getNum() # check for new detail assemblies if thisNum in detailAssemNums: self.addDetailAssembly(a)
[docs] def _writeDetailAssemblyHistories(self): """ Write data file with assembly histories """ for a in self.getDetailAssemblies(): self.writeAssemHistory(a)
[docs] def _getAssemHistoryFileName(self, assem): return self._getHistoryFileName(assem.getName(), "a")
[docs] def _getBlockHistoryFileName(self, block): """Get name for block.""" return self._getHistoryFileName( block.getName() + "{}".format(block.spatialLocator.k), "b" )
[docs] def _getLocationHistoryFileName(self, location): return self._getHistoryFileName( str(location) + "{}".format(location.axial), "l" )
[docs] def _getHistoryFileName(self, label, letter): return "{0}-{1}-{2}Hist.txt".format(self.cs.caseTitle, label, letter)
[docs] def getTrackedParams(self): """ Give the list of block parameters that are being tracked. """ trackedParams = {"residence", "ztop", "zbottom"} # loop through interfaces to allow them to add custom params. for i in self.o.getInterfaces(): for newParam in i.getHistoryParams(): if newParam not in trackedParams: trackedParams.add(newParam) return sorted(trackedParams)
[docs] def addDetailAssembly(self, a): """Track the name of assemblies that are flagged for detailed treatment.""" aName = a.getName() if aName not in self.detailAssemblyNames: self.detailAssemblyNames.append(aName)
[docs] def getDetailAssemblies(self): r"""returns the assemblies that have been signaled as detail assemblies.""" assems = [] if not self.detailAssemblyNames: runLog.info("No detail assemblies HistoryTrackerInterface") for name in self.detailAssemblyNames: try: assems.append(self.r.core.getAssemblyByName(name)) except KeyError: if name in {a.name for a in self.r.core}: raise Exception("Found it") runLog.warning( "Cannot find detail assembly {} in assemblies-by-name lookup table, which has {} entries" "".format(name, len(self.r.core.assembliesByName)) ) return assems
[docs] def getDetailBlocks(self): """Get all blocks in all detail assemblies.""" return [block for a in self.getDetailAssemblies() for block in a]
[docs] def filterTimeIndices(self, timeIndices, boc=False, moc=False, eoc=False): """Takes a list of time indices and filters them down to boc moc or eoc.""" filtered = [] steps = self.cs["burnSteps"] + 1 for i in timeIndices: if boc and i % steps == 0: filtered.append(i) if moc and i % steps == steps // 2: filtered.append(i) if eoc and i % steps == steps - 1: filtered.append(i) if not boc and not moc and not eoc: filtered.append(i) return filtered
[docs] def getTimeIndices(self, a=None, boc=False, moc=False, eoc=False): r""" Generate a list of timestep indices where valid history data exist for the given criteria. Parameters ---------- a : Assembly, optional If given, only generate time indices where the assembly `a` is in the core. Default: All assemblies. boc, moc, eoc : bool, optional Will return boc/moc/eoc timenodes in every cycle. If any of these are true, allNodes becomes False Returns ------- timeIndices : list A list of integers where history data exists. Examples -------- If there are 5 nodes per cycle (burnSteps = 4), 0 1 2 3 4 | 5 6 7 8 9 | 10 11 12 13 14 | ...: >>> getTimeIndices(moc=True): [2, 7, 12, ...] Warning ------- This is no longer functional, as much of the old history tracking was based on implementation details of the Database, version 2. We now directly support history tracking through the Database, version 3. At some point this code should be removed. See Also -------- getTimeSteps : gets time in years where the assembly is in the core """ timeIndices = [] coreGrid = self.r.core.spatialGrid for globalNode in range( utils.getTimeStepNum(self.r.p.cycle, self.r.p.timeNode, self.cs) + 1 ): if a is None: timeIndices.append(globalNode) else: fBlock = a.getFirstBlock(Flags.FUEL) if fBlock is None: blockLocationLabel = None else: blockLocationLabel = self._blockLocationAtTimenode( fBlock, globalNode ) runLog.info( "Location label of {} at timestep {} is {}".format( fBlock, globalNode, blockLocationLabel ) ) # only add this timestep if it's around for this assembly. if blockLocationLabel is not None: # this label doesn't actually properly correspond to the block # location label determined by _blockLocationAtTimenode. # blockLocationLabel is supposed to be coming from a previous time # state in the database. if a.spatialLocator.grid is coreGrid: timeIndices.append(globalNode) return self.filterTimeIndices(timeIndices, boc, moc, eoc)
[docs] def getBOCEOCTimeIndices(self, assem=None): r"""returns a list of time step indices that only include BOC and EOC, no intermediate ones.""" tIndices = self.getTimeIndices(assem) # list of times in years counter = 0 filtered = [] for tIndex in tIndices: if counter == 0: # boc. add it. filtered.append(tIndex) counter += 1 elif counter == self.cs["burnSteps"]: # eoc. add it. filtered.append(tIndex) counter = 0 else: # not boc or eoc. Just increment counter. tick, tick, tick counter += 1 return filtered
[docs] def getAssemParamHistory(self, a, neededParams): """Gets the history typically used for the Alchemy Writer Returns ------- assemHistory : dict, nested with 3 levels, e.g. assemHistory[block][time_step][parameter] = value of parameter at time step on block Raises ------ RuntimeError When the assembly has no history. """ timeSteps = self.getTimeIndices(a) if not timeSteps: raise RuntimeError( "Time steps empty. Cannot get assembly history for {}".format(a) ) # assemHistory[block][time_step][parameter] = value of parameter at time step on block assemHistory = OrderedDict([(block, None) for block in a.getBlocks(Flags.FUEL)]) for block in assemHistory: assemHistory[block] = OrderedDict([(ts, None) for ts in timeSteps]) for ts in assemHistory[block]: assemHistory[block][ts] = OrderedDict( [(param, None) for param in neededParams] ) for param in assemHistory[block][ts]: val = self.getBlockHistoryVal(block.getName(), param, ts) assemHistory[block][ts][param] = val assemHistory[block][ts]["location"] = self.getBlockHistoryVal( block.getName(), "loc", ts ) return assemHistory
[docs] def writeAssemHistory(self, a, fName=""): """Write the assembly history report to a text file.""" fName = fName or self._getAssemHistoryFileName(a) dbi = self.getInterface("database") times = dbi.getHistory(self.r, ["time"])["time"] with open(fName, "w") as out: # ts is a tuple, remove the spaces from the string representation so it is easy to load # into a spreadsheet or whatever headers = [str(ts).replace(" ", "") for ts in times.keys()] out.write( tabulate.tabulate( headers=headers, tabular_data=(times.values(),), tablefmt="plain", floatfmt="11.5E", ) ) out.write("\n") params = self.getTrackedParams() blocks = [ b for bi, b in enumerate(a) if bi not in self.cs["stationaryBlocks"] ] blockHistories = dbi.getHistories(blocks, params) for param in params: out.write("\n\nkey: {0}\n".format(param)) data = [blockHistories[b][param].values() for b in blocks] out.write(tabulate.tabulate(data, tablefmt="plain", floatfmt="11.5E")) out.write("\n") # loc is a tuple, remove the spaces from the string representation so it is easy to load # into a spreadsheet or whatever location = [ str(loc).replace(" ", "") for loc in dbi.getHistory(a, ["location"])["location"].values() ] out.write("\n\nkey: location\n") out.write(tabulate.tabulate((location,), tablefmt="plain")) out.write("\n\n\n") headers = "EOL bottom top center".split() data = [("", b.p.zbottom, b.p.ztop, b.p.z) for b in blocks] out.write( tabulate.tabulate( data, headers=headers, tablefmt="plain", floatfmt="10.3f" ) ) out.write("\n\n\nAssembly info\n") out.write("{0} {1}\n".format(a.getName(), a.getType())) for b in blocks: out.write('"{}" {} {}\n'.format(b.getType(), b.p.xsType, b.p.buGroup))
[docs] def printFullCoreLocations(self): """ Print a report showing the locations of each assembly as functions of time. This is useful for third-party follow-on analysis of fuel management. """ aNameList = [] # NWT: Have to read this from the DB. ofile = open(self.cs.caseTitle + ".locationHistory.txt", "w") # MORE data files ofile.write( " ".join( ["Assem"] + ["{:5d}".format(c) for c in range(self.cs["nCycles"])] + ["\n"] ) ) for aName in aNameList: # print the assembly number and then all the locations it was ever in. line = [aName[1:] + " "] for cycle in range(self.cs["nCycles"]): row, loc = self.fullCoreLocations.get((aName, cycle), (None, None)) if row: val1 = "{0:02d}{1:03d}".format(row, loc) else: # none returned val1 = " " line.append(val1) line.append("\n") ofile.write(" ".join(line)) ofile.close()
[docs] def preloadBlockHistoryVals(self, names, keys, timesteps): """ Pre-load block data so it can be more quickly accessed in the future. Notes ----- Pre-loading has value because the database is organized in a fashion that is easy/inexpensive to look up data for many of time steps simultaneously. These can then be stored and provided when the specific timestep is requested. The method ``getBlockHistoryVal`` still looks at the database if the preloaded values don't have the needed data, so the same results should be given if this method is not called. """ try: dbi = self.getInterface("database") blocks = [self.r.core.getBlockByName(name) for name in names] # weird special stuff for loc, just leave it be. keys = [key for key in keys if key != "loc"] data = dbi.getHistories(blocks, keys, timesteps) self._preloadedBlockHistory = data except: # pylint: disable=bare-except # fails during the beginning of standard runs, but that's ok runLog.info( f"Unable to pre-load block history values due to error:" "\n{traceback.format_exc()}" ) self.unloadBlockHistoryVals()
[docs] def unloadBlockHistoryVals(self): """Remove all cached db reads.""" self._preloadedBlockHistory = None
[docs] def getBlockHistoryVal(self, name: str, paramName: str, ts: Tuple[int, int]): """ Use the database interface to return the parameter values for the supplied block names, and timesteps. Notes ----- If the current timestep history is requested and the database has not yet been written this timestep, the current value of the requested parameter is returned. Parameters ---------- name name of block paramName parameter keys of interest ts cycle and node from which to load data Raises ------ KeyError When param not found in database. """ block = self.r.core.getBlockByName(name) if paramName == "loc": # special behavior for location param. return self._blockLocationAtTimenode(block, ts) if self._isCurrentTimeStep(ts) and not self._databaseHasDataForTimeStep(ts): # current timenode may not have been written to the DB. Use the current # value in the param system. works for fuel performance, for some params, # e.g. burnup, dpa. return block.p[paramName] try: val = self._preloadedBlockHistory[block][paramName][ts] # not in preloaded or preloaded failed except (TypeError, ValueError, KeyError, IndexError): dbi = self.getInterface("database") try: data = dbi.database.getHistory(block, [paramName], [ts]) val = data[paramName][ts] except KeyError: runLog.error( "No value in DB. param name: {} requested index: {}" "".format(paramName, ts) ) raise return val
[docs] def _isCurrentTimeStep(self, ts: Tuple[int, int]): """Return True if the timestep requested is the current time step.""" return ts == (self.r.p.cycle, self.r.p.timeNode)
[docs] def _databaseHasDataForTimeStep(self, ts): """Return True if the database has data for the requested time step.""" dbi = self.getInterface("database") return ts in dbi.database.genTimeSteps()
[docs] def getTimeSteps(self, a=None): r""" return list of time steps values (in years) that are available. Parameters ---------- a : Assembly object, optional An assembly object designated a detail assem. If passed, only timesteps where this assembly is in the core will be tracked. Returns ------- timeSteps : list times in years that are available in the history See Also -------- getTimeIndices : gets indices where an assembly is in the core """ dbi = self.getInterface("database") timeInYears = dbi.getHistory(self.r, ["time"])["time"] # remove the time step info. Clients don't want it timeInYears = [t[1] for t in timeInYears] if a: b = self._getBlockInAssembly(a) ids = dbi.getHistory(["id"])["id"] timeInYears = [time for time, ids in zip(timeInYears, ids) if b.p.id in ids] return timeInYears
[docs] @staticmethod def _getBlockInAssembly(a): """Get a representative block from an assembly.""" b = a.getFirstBlock(Flags.FUEL) if not b: # there is a problem, it doesn't look like we have a fueled assembly # but that is all we track... what is it? Throw an error runLog.warning("Assembly {} does not contain fuel".format(a)) for b in a: runLog.warning("Block {}".format(b)) raise RuntimeError( "A tracked assembly does not contain fuel and has caused this error, see the details in stdout." ) return b
[docs] def _blockLocationAtTimenode(self, block, timeNode): """ Find block location label at a specific timenode. Warning ------- This fuction no longer functions, as it relies on implmentation details of Database version 2, which is no longer used. Retaining for historical purposes, but this should be removed soon. """ dbi = self.getInterface("database") ids = dbi.database.readBlockParam("id", timeNode) locs = dbi.database.lookupGeometry() if ids is None: return None ids = ids.tolist() try: blockIndex = ids.index(block.p.id) return locs[blockIndex] except ValueError: return None
[docs]class HistoryFile: r""" A general history file that contains the parameter history of an object. The object may be a block or assembly. This tracks them through time Originally, these files were just created by the history interface, but it became necessary to read them and post-process them for statistical needs (stats for individual assembly types) so it became an object They were typically named A234-ahist.txt or so. """
[docs]class AssemblyHistory(HistoryFile): """History report of a single assembly.""" def __repr__(self): return "<AssemHistory {0}>".format(self.assemName)
[docs] def read(self, fName): r""" Reads an assembly history file into memory. Parameters ---------- fName : str The filename to read Creates a blockStack list where each entry is a dictionary of [param,ts]=val maps """ f = textProcessors.TextProcessor(fName) timeSteps = map(int, f.f.next().split()) # first line is timestep integers _timeYears = map(float, f.f.next().split()) # second line is times in years # now there is a loop over all params blockStack = ( [] ) # will assign to block names once they are read in (at end of file) while True: # expect a line like: "key: burnup" line = f.fsearch("key:") paramName = line.split()[1] # pylint: disable=no-member if paramName == "location": operation = str else: operation = float # expect values for each timestep on the next few lines for line in f.f: line = line.strip() # read arbitrary number of blocks if ( not line or "EOL bottom" in line ): # detect axial info to(b/c we used to not have blank lines) # end on blank line break vals = map(operation, line.split()) blockVals = {} for ts, val in zip(timeSteps, vals): blockVals[paramName, ts] = val blockStack.append(blockVals) if paramName == "location": # flags the end of the params. break # skip the EOL axial information (for now) f.fsearch("Assembly info") assemblyInfoLine = next(f.f) assemblyInfo = assemblyInfoLine.split() self.assemName = assemblyInfo[0] if len(assemblyInfo) > 1: self.assemType = " ".join(assemblyInfo[1:]).lower() else: self.assemType = None blockTypes = [] for line in f.f: match = re.search(r'"(.+)"\s(\S)\s(\S)', line) if match: blockTypes.append(match.group(1)) f.f.close() self.blockStack = blockStack
[docs] def readFromArmi(self, blockName, historyInterface): r""" Loads up a working AssemblyHistory object from the history interface """ pass
[docs] def computeBounds(self): r""" Finds the min and max values of all params in this assembly history Returns ------- mins : dict Keys are param names, vals are minimum values for that param maxes : dict Keys are param names, vals are maximum values for that param """ mins = {} maxes = {} for blockVals in self.blockStack: for (paramName, _ts), val in blockVals.items(): if val < mins.get(paramName, float("inf")): mins[paramName] = val if val > maxes.get(paramName, -float("inf")): maxes[paramName] = val return mins, maxes