Source code for armi.operators.operator

# Copyright 2019 TerraPower, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The standard ARMI operator.

This builds and maintains the interface stack and loops through it for a
certain number of cycles with a certain number of timenodes per cycle.

This is analogous to a real reactor operating over some period of time,
often from initial startup, through the various cycles, and out to
the end of plant life.
"""
import collections
import os
import re
import shutil
import time
from typing import Tuple

from armi import context
from armi import interfaces
from armi import runLog
from armi.bookkeeping import memoryProfiler
from armi.bookkeeping.report import reportingUtils
from armi.operators import settingsValidation
from armi.operators.runTypes import RunTypes
from armi.physics.fuelCycle.settings import CONF_SHUFFLE_LOGIC
from armi.physics.neutronics.globalFlux.globalFluxInterface import (
    GlobalFluxInterfaceUsingExecuters,
)
from armi.settings.fwSettings.globalSettings import (
    CONF_TIGHT_COUPLING,
    CONF_TIGHT_COUPLING_MAX_ITERS,
    CONF_CYCLES_SKIP_TIGHT_COUPLING_INTERACTION,
    CONF_DEFERRED_INTERFACE_NAMES,
    CONF_DEFERRED_INTERFACES_CYCLE,
)
from armi.utils import codeTiming
from armi.utils import (
    pathTools,
    getPowerFractions,
    getAvailabilityFactors,
    getStepLengths,
    getCycleLengths,
    getBurnSteps,
    getMaxBurnSteps,
    getCycleNames,
)


class Operator:
    """
    Orchestrate an ARMI run, building all the pieces, looping through the interfaces,
    and manipulating the reactor.

    This Operator loops over a user-input number of cycles, each with a user-input
    number of subcycles (called time nodes). It calls a series of interaction hooks
    on each of the :py:class:`~armi.interfaces.Interface` in the Interface Stack.

    .. figure:: /.static/armi_general_flowchart.png
        :align: center

    **Figure 1.** The computational flow of the interface hooks in a Standard Operator

    .. note:: The :doc:`/developer/guide` has some additional narrative on this topic.

    .. impl:: An operator will have a reactor object to communicate between plugins.
        :id: I_ARMI_OPERATOR_COMM
        :implements: R_ARMI_OPERATOR_COMM

        A major design feature of ARMI is that the Operator orchestrates the
        simulation, and as part of that, the Operator has access to the Reactor data
        model. In code, this just means the reactor object is a mandatory attribute of
        an instance of the Operator. But conceptually, this means that while the
        Operator drives the simulation of the reactor, all code has access to the same
        copy of the reactor data model. This is a crucial idea that allows disparate
        external nuclear models to interact; they interact with the ARMI reactor data
        model.

    .. impl:: An operator is built from user settings.
        :id: I_ARMI_OPERATOR_SETTINGS
        :implements: R_ARMI_OPERATOR_SETTINGS

        A major design feature of ARMI is that a run is built from user settings. In
        code, this means that a case ``Settings`` object is passed into this class to
        initialize an Operator. Conceptually, this means that the Operator that
        controls a reactor simulation is defined by user settings. Because developers
        can create their own settings, the user can control an ARMI simulation with
        arbitrary granularity in this way. In practice, settings commonly control
        things like: how many cycles a reactor is being modeled for, how many
        timesteps are to be modeled per time node, the verbosity of the logging during
        the run, and which modeling steps (such as economics) will be run.

    Attributes
    ----------
    cs : Settings
        Global settings that define the run.
    cycleNames : list of str
        The name of each cycle. Cycles without a name are `None`.
    stepLengths : list of list of float
        A two-tiered list, where primary indices correspond to cycle and secondary
        indices correspond to the length of each intra-cycle step (in days).
    cycleLengths : list of float
        The duration of each individual cycle in a run (in days). This is the entire
        cycle, from startup to startup and includes outage time.
    burnSteps : list of int
        The number of sub-cycles in each cycle.
    availabilityFactors : list of float
        The fraction of time in a cycle that the plant is producing power. Note that
        capacity factor is always less than or equal to this, depending on the power
        fraction achieved during each cycle. Note that this is not a two-tiered list
        like stepLengths or powerFractions, because each cycle can have only one
        availabilityFactor.
    powerFractions : list of list of float
        A two-tiered list, where primary indices correspond to cycles and secondary
        indices correspond to the fraction of full rated capacity that the plant
        achieves during that step of the cycle. Zero power fraction can indicate
        decay-only cycles.
    interfaces : list
        The Interface objects that will operate upon the reactor
    """

    inspector = settingsValidation.Inspector

    def __init__(self, cs):
        """
        Build an operator from case settings; no reactor is attached yet.

        Parameters
        ----------
        cs : Settings
            Global settings that define the run.

        Raises
        ------
        OSError
            If unable to create the FAST_PATH directory.
        """
        self.r = None
        self.cs = cs
        runLog.LOG.startLog(self.cs.caseTitle)
        self.timer = codeTiming.getMasterTimer()

        # interface stack and restart bookkeeping
        self.interfaces = []
        self.restartData = []
        self.loadedRestartData = []

        # lazily-resolved cycle history; filled on first access of the matching property
        self._cycleNames = None
        self._stepLengths = None
        self._cycleLengths = None
        self._burnSteps = None
        self._maxBurnSteps = None
        self._powerFractions = None
        self._availabilityFactors = None

        self._convergenceSummary = None

        # Create the welcome headers for the case (case, input, machine, and some
        # basic reactor information)
        reportingUtils.writeWelcomeHeaders(self, cs)

        self._initFastPath()

    @property
    def burnSteps(self):
        """Burn steps per cycle, resolved from the settings on first use and validated."""
        if self._burnSteps:
            return self._burnSteps

        self._burnSteps = getBurnSteps(self.cs)
        # it is possible for there to be one cycle with zero burn up,
        # in which case burnSteps is an empty list and there is nothing to validate
        zeroBurnup = self._burnSteps == [] and self.cs["nCycles"] == 1
        if not zeroBurnup:
            self._checkReactorCycleAttrs({"burnSteps": self._burnSteps})

        return self._burnSteps

    @property
    def maxBurnSteps(self):
        """Result of ``getMaxBurnSteps`` for the case settings, resolved on first use."""
        if self._maxBurnSteps:
            return self._maxBurnSteps

        self._maxBurnSteps = getMaxBurnSteps(self.cs)
        return self._maxBurnSteps

    @property
    def stepLengths(self):
        """
        Calculate step lengths.

        .. impl:: Calculate step lengths from cycles and burn steps.
            :id: I_ARMI_FW_HISTORY
            :implements: R_ARMI_FW_HISTORY

            In all computational modeling of physical systems, it is necessary to
            break time into discrete chunks. In reactor modeling, it is common to
            first break the time a reactor is simulated for into the practical cycles
            the reactor runs. And then those cycles are broken down into smaller
            chunks called burn steps. The final step lengths this method returns is a
            two-tiered list, where primary indices correspond to the cycle and
            secondary indices correspond to the length of each intra-cycle step (in
            days).
        """
        if self._stepLengths:
            return self._stepLengths

        self._stepLengths = getStepLengths(self.cs)
        # it is possible for there to be one cycle with zero burn up,
        # in which case stepLengths is an empty list and there is nothing to validate
        zeroBurnup = self._stepLengths == [] and self.cs["nCycles"] == 1
        if not zeroBurnup:
            self._checkReactorCycleAttrs({"Step lengths": self._stepLengths})
        self._consistentPowerFractionsAndStepLengths()

        return self._stepLengths

    @property
    def cycleLengths(self):
        """Length of each cycle, resolved from the settings on first use and validated."""
        if self._cycleLengths:
            return self._cycleLengths

        self._cycleLengths = getCycleLengths(self.cs)
        self._checkReactorCycleAttrs({"cycleLengths": self._cycleLengths})
        return self._cycleLengths

    @property
    def powerFractions(self):
        """Per-cycle, per-step power fractions, resolved on first use and validated."""
        if self._powerFractions:
            return self._powerFractions

        self._powerFractions = getPowerFractions(self.cs)
        self._checkReactorCycleAttrs({"powerFractions": self._powerFractions})
        self._consistentPowerFractionsAndStepLengths()
        return self._powerFractions

    @property
    def availabilityFactors(self):
        """Availability factor of each cycle, resolved on first use and validated."""
        if self._availabilityFactors:
            return self._availabilityFactors

        self._availabilityFactors = getAvailabilityFactors(self.cs)
        self._checkReactorCycleAttrs({"availabilityFactors": self._availabilityFactors})
        return self._availabilityFactors

    @property
    def cycleNames(self):
        """Name of each cycle, resolved from the settings on first use and validated."""
        if self._cycleNames:
            return self._cycleNames

        self._cycleNames = getCycleNames(self.cs)
        self._checkReactorCycleAttrs({"Cycle names": self._cycleNames})
        return self._cycleNames

    @staticmethod
    def _initFastPath():
        """
        Create the FAST_PATH directory for fast local operations.

        Notes
        -----
        The FAST_PATH was once created at import-time in order to support modules
        that use FAST_PATH without operators (e.g. Database). However, we decided to
        leave FAST_PATH as the CWD in INTERACTIVE mode, so this should not be a
        problem anymore, and we can safely move FAST_PATH creation back into the
        Operator.

        If the operator is being used interactively (e.g. at a prompt) we will still
        use a temporary local fast path (in case the user is working on a slow network
        path).
        """
        context.activateLocalFastPath()
        try:
            os.makedirs(context.getFastPath())
        except OSError:
            # If FAST_PATH exists already that generally should be an error because
            # different processes will be stepping on each other.
            # The exception to this rule is in cases that instantiate multiple
            # operators in one process (e.g. unit tests that loadTestReactor). Since
            # the FAST_PATH is set at import, these will use the same path multiple
            # times. We tolerate the failure for that reason.
            if os.path.exists(context.getFastPath()):
                return
            # if it actually doesn't exist, that's an actual error. Re-raise
            raise

    def _checkReactorCycleAttrs(self, attrsDict):
        """Check that each list in ``attrsDict`` has nCycles number of elements."""
        for name, param in attrsDict.items():
            if len(param) == self.cs["nCycles"]:
                continue
            raise ValueError(
                "The `{}` setting did not have a length consistent with the number of cycles.\n"
                "Expected {} value(s), but only had {} defined.\n"
                "Current input: {}".format(name, self.cs["nCycles"], len(param), param)
            )

    def _consistentPowerFractionsAndStepLengths(self):
        """
        Check that the internally-resolved _powerFractions and _stepLengths have
        consistent shapes, if they both exist.
        """
        if not (self._powerFractions and self._stepLengths):
            # one of the two has not been resolved (or is empty); nothing to compare
            return

        for cycleIdx, cycleFractions in enumerate(self._powerFractions):
            if len(cycleFractions) != len(self._stepLengths[cycleIdx]):
                raise ValueError(
                    "The number of entries in lists for subcycle power "
                    f"fractions and sub-steps are inconsistent in cycle {cycleIdx}"
                )

    @property
    def atEOL(self):
        """
        Return whether we are approaching EOL.

        For the standard operator, this will return true when the current cycle is the
        last cycle (cs["nCycles"] - 1). Other operators may need to impose different
        logic.
        """
        currentCycle = self.r.p.cycle
        return currentCycle == self.cs["nCycles"] - 1
def initializeInterfaces(self, r):
    """
    Attach the reactor to the operator and initialize all interfaces.

    This does not occur in `__init__` so that the ARMI operator can be initialized
    before a reactor is created, which is useful for summarizing the case information
    quickly.

    Parameters
    ----------
    r : Reactor
        The Reactor object to attach to this Operator.
    """
    self.r = r
    r.o = self  # TODO: this is only necessary for fuel-handler hacking

    with self.timer.getTimer("Interface Creation"):
        self.createInterfaces()
        self._processInterfaceDependencies()

        if context.MPI_RANK != 0:
            # non-zero ranks only link their interfaces; no summary, no Init hooks
            self._attachInterfaces()
        else:
            runLog.header("=========== Interface Stack Summary ===========")
            runLog.info(reportingUtils.getInterfaceStackSummary(self))
            self.interactAllInit()

    self._loadRestartData()
def __repr__(self):
    """Return ``<ClassName runType settings>`` for debugging output."""
    return "<{} {} {}>".format(self.__class__.__name__, self.cs["runType"], self.cs)


def __enter__(self):
    """Context manager to enable interface-level error handling hooks."""
    return self


def __exit__(self, exception_type, exception_value, stacktrace):
    """
    Log the pending exception, if any, and give every interface its error hook.

    Nothing is returned, so an exception raised inside the ``with`` body still
    propagates to the caller after the hooks have run.
    """
    if any([exception_type, exception_value, stacktrace]):
        # A regular (non-raw) string is required here so that "\n" is a real line
        # break; the previous raw string logged a literal backslash-n.
        runLog.error(
            "{}\n{}\n{}".format(exception_type, exception_value, stacktrace)
        )
        self.interactAllError()
def operate(self):
    """
    Run the operation loop by delegating to ``_mainOperate``.

    See Also
    --------
    mainOperator : run the operator loop on the primary MPI node (for parallel runs)
    workerOperate : run the operator loop for the worker MPI nodes
    """
    mainLoop = self._mainOperate
    mainLoop()
def _mainOperate(self):
    """Main loop for a standard ARMI run. Steps through time interacting with the interfaces."""
    self.interactAllBOL()
    startingCycle = self.r.p.cycle  # may be starting at t != 0 in restarts
    for cycle in range(startingCycle, self.cs["nCycles"]):
        keepGoing = self._cycleLoop(cycle, startingCycle)
        if not keepGoing:
            break
    self.interactAllEOL()


def _cycleLoop(self, cycle, startingCycle):
    """
    Run the portion of the main loop that happens each cycle.

    Returns
    -------
    bool
        False if a BOC interaction requested a halt (no time nodes and no EOC
        interaction are run for this cycle), True otherwise.
    """
    self.r.p.cycleLength = self.cycleLengths[cycle]
    self.r.p.availabilityFactor = self.availabilityFactors[cycle]
    self.r.p.cycle = cycle
    self.r.core.p.coupledIteration = 0

    # only the first cycle of a (restart) run may begin at a non-zero time node
    if cycle == startingCycle:
        startingNode = self.r.p.timeNode
    else:
        startingNode = 0
    self.r.p.timeNode = startingNode

    halt = self.interactAllBOC(self.r.p.cycle)
    if halt:
        return False

    # read total core power from settings (power or powerDensity)
    basicPower = self.cs["power"] or (
        self.cs["powerDensity"] * self.r.core.getHMMass()
    )

    for timeNode in range(startingNode, int(self.burnSteps[cycle])):
        self.r.core.p.power = self.powerFractions[cycle][timeNode] * basicPower
        self.r.p.capacityFactor = (
            self.r.p.availabilityFactor * self.powerFractions[cycle][timeNode]
        )
        self.r.p.stepLength = self.stepLengths[cycle][timeNode]
        self._timeNodeLoop(cycle, timeNode)

    # Do one last node at the end using the same power as the previous node. The
    # loop above never breaks, so this always runs (it used to be a for/else).
    timeNode = self.burnSteps[cycle]
    if self.burnSteps[cycle] == 0:
        # this is a zero-burnup case
        powFrac = 1
    else:
        powFrac = self.powerFractions[cycle][timeNode - 1]
    self.r.core.p.power = powFrac * basicPower
    self._timeNodeLoop(cycle, timeNode)

    self.interactAllEOC(self.r.p.cycle)

    return True


def _timeNodeLoop(self, cycle, timeNode):
    """Run the portion of the main loop that happens each subcycle."""
    self.r.p.timeNode = timeNode
    self.interactAllEveryNode(cycle, timeNode)
    self._performTightCoupling(cycle, timeNode)


def _performTightCoupling(self, cycle: int, timeNode: int, writeDB: bool = True):
    """If requested, perform tight coupling and write out database.

    Parameters
    ----------
    cycle : int
        The cycle that is currently being run.
    timeNode : int
        The time node that is currently being run.
    writeDB : bool, optional
        If True, the database interface writes this node once coupling is done.

    Notes
    -----
    writeDB is False for OperatorSnapshots as the DB gets written at EOL.
    """
    if not self.couplingIsActive():
        # no coupling was requested
        return

    skipCycles = tuple(
        int(val) for val in self.cs[CONF_CYCLES_SKIP_TIGHT_COUPLING_INTERACTION]
    )
    if cycle in skipCycles:
        runLog.warning(
            f"interactAllCoupled disabled this cycle ({self.r.p.cycle}) due to "
            "`cyclesSkipTightCouplingInteraction` setting."
        )
    else:
        self._convergenceSummary = collections.defaultdict(list)
        maxIters = self.cs[CONF_TIGHT_COUPLING_MAX_ITERS]
        for coupledIteration in range(maxIters):
            self.r.core.p.coupledIteration = coupledIteration + 1
            if self.interactAllCoupled(coupledIteration):
                runLog.important(
                    f"Tight coupling iterations for c{cycle:02d}n{timeNode:02d} have converged!"
                )
                break
        else:
            # Reached when the loop finishes without converging, including the case
            # of zero allowed iterations (which previously raised a NameError on the
            # never-assigned convergence flag).
            runLog.warning(
                f"Tight coupling iterations for c{cycle:02d}n{timeNode:02d} have not converged!"
                f" The maximum number of iterations, {maxIters}, was reached."
            )

    if writeDB:
        # database has not yet been written, so we need to write it.
        dbi = self.getInterface("database")
        dbi.writeDBEveryNode(cycle, timeNode)


def _interactAll(self, interactionName, activeInterfaces, *args):
    """
    Loop over the supplied activeInterfaces and perform the supplied interaction on each.

    Parameters
    ----------
    interactionName : str
        Suffix of the hook to call; ``interact<interactionName>`` is looked up on
        each interface.
    activeInterfaces : list
        The interfaces to interact with, in order.
    *args
        Positional arguments forwarded to every hook (e.g. cycle and time node).

    Returns
    -------
    halt
        Falsy unless one of the hooks returned a truthy value.

    Notes
    -----
    This is the base method for the other ``interactAll`` methods.

    Because the halt flag is combined with ``or``, once a hook has returned a truthy
    value the hooks of the remaining interfaces are no longer invoked (their
    summaries, timers and debug writes still happen).
    """
    interactMethodName = "interact{}".format(interactionName)

    printMemUsage = self.cs["verbosity"] == "debug" and self.cs["debugMem"]
    if self.cs["debugDB"]:
        self._debugDB(interactionName, "start", 0)

    halt = False

    cycleNodeTag = self._expandCycleAndTimeNodeArgs(
        *args, interactionName=interactionName
    )
    runLog.header(
        "=========== Triggering {} Event ===========".format(
            interactionName + cycleNodeTag
        )
    )

    for statePointIndex, interface in enumerate(activeInterfaces, start=1):
        self.printInterfaceSummary(
            interface, interactionName, statePointIndex, *args
        )

        # maybe make this a context manager
        if printMemUsage:
            memBefore = memoryProfiler.PrintSystemMemoryUsageAction()
            memBefore.broadcast()
            memBefore.invoke(self, self.r, self.cs)

        interactionMessage = " {} interacting with {} ".format(
            interactionName, interface.name
        )
        with self.timer.getTimer(interactionMessage):
            interactMethod = getattr(interface, interactMethodName)
            halt = halt or interactMethod(*args)

        if self.cs["debugDB"]:
            self._debugDB(interactionName, interface.name, statePointIndex)

        if printMemUsage:
            memAfter = memoryProfiler.PrintSystemMemoryUsageAction()
            memAfter.broadcast()
            memAfter.invoke(self, self.r, self.cs)
            memAfter -= memBefore
            memAfter.printUsage(
                "after {:25s} {:15s} interaction".format(
                    interface.name, interactionName
                )
            )

    runLog.header(
        "=========== Completed {} Event ===========\n".format(
            interactionName + cycleNodeTag
        )
    )

    return halt
def printInterfaceSummary(self, interface, interactionName, statePointIndex, *args):
    """
    Log which interaction point is about to be executed.

    This looks better as multiple lines but it's a lot easier to grep as one line.
    We leverage newlines instead of long banners to save disk space.
    """
    eventLabel = interactionName + self._expandCycleAndTimeNodeArgs(
        *args, interactionName=interactionName
    )
    runLog.header(
        "=========== {:02d} - {:30s} {:15s} ===========".format(
            statePointIndex, interface.name, eventLabel
        )
    )
@staticmethod
def _expandCycleAndTimeNodeArgs(*args, interactionName):
    """Return text annotating information for current run event.

    Notes
    -----
    - Init, BOL, EOL: empty
    - Everynode: (cycle, time node)
    - BOC, EOC: cycle number
    - Coupling: iteration number
    """
    if not args:
        return ""

    if len(args) > 1:
        return f" - cycle {args[0]}, node {args[1]}"

    # exactly one argument: its meaning depends on the kind of event
    if interactionName == "Coupled":
        return f" - iteration {args[0]}"
    if interactionName in ("BOC", "EOC"):
        return f" - cycle {args[0]}"
    return ""


def _debugDB(self, interactionName, interfaceName, statePointIndex=0):
    """
    Write state to DB with a unique "statePointName", or label.

    Notes
    -----
    Used within _interactAll to write details between each physics interaction when
    cs['debugDB'] is enabled. Nothing is written when there is no database
    interface, or when its database is missing or not open.

    Parameters
    ----------
    interactionName : str
        name of the interaction (e.g. BOL, BOC, EveryNode)
    interfaceName : str
        name of the interface that is interacting (e.g. globalflux, lattice, th)
    statePointIndex : int (optional)
        used as a counter to make labels that increment throughout an _interactAll
        call. The result should be fed into the next call to ensure labels increment.
    """
    dbiForDetailedWrite = self.getInterface("database")
    db = dbiForDetailedWrite.database if dbiForDetailedWrite is not None else None

    if db is not None and db.isOpen():
        # looks something like "c00t00-BOL-01-main"; "{:0>2}" zero-pads to two
        # characters (the former "{:2<0}" spec had fill and width swapped and
        # therefore never padded anything)
        statePointName = "c{:0>2}t{:0>2}-{}-{:0>2}-{}".format(
            self.r.p.cycle,
            self.r.p.timeNode,
            interactionName,
            statePointIndex,
            interfaceName,
        )
        db.writeToDB(self.r, statePointName=statePointName)
def interactAllInit(self):
    """Call interactInit on all interfaces in the stack after they are initialized."""
    wholeStack = self.getInterfaces()
    self._interactAll("Init", wholeStack)
def interactAllBOL(self, excludedInterfaceNames=()):
    """
    Call interactBOL for all interfaces in the interface stack at beginning-of-life.

    All enabled or bolForce interfaces will be called excluding interfaces with
    excludedInterfaceNames.
    """
    self._interactAll(
        "BOL", self.getActiveInterfaces("BOL", excludedInterfaceNames)
    )
def interactAllBOC(self, cycle):
    """Interact at beginning of cycle of all enabled interfaces; return the halt flag."""
    bocInterfaces = self.getActiveInterfaces("BOC", cycle=cycle)
    halt = self._interactAll("BOC", bocInterfaces, cycle)
    return halt
def interactAllEveryNode(self, cycle, tn, excludedInterfaceNames=()):
    """
    Call the interactEveryNode hook for all enabled interfaces.

    All enabled interfaces will be called excluding interfaces with
    excludedInterfaceNames.

    Parameters
    ----------
    cycle : int
        The cycle that is currently being run. Starts at 0
    tn : int
        The time node that is currently being run (0 for BOC, etc.)
    excludedInterfaceNames : list, optional
        Names of interfaces that will not be interacted with.
    """
    self._interactAll(
        "EveryNode",
        self.getActiveInterfaces("EveryNode", excludedInterfaceNames),
        cycle,
        tn,
    )
def interactAllEOC(self, cycle, excludedInterfaceNames=()):
    """Interact end of cycle for all enabled interfaces not named in excludedInterfaceNames."""
    self._interactAll(
        "EOC", self.getActiveInterfaces("EOC", excludedInterfaceNames), cycle
    )
def interactAllEOL(self):
    """
    Run interactEOL for all enabled interfaces.

    Notes
    -----
    If the interfaces are flagged to be reversed at EOL, they are separated from the
    main stack and appended at the end in reverse order. This allows, for example,
    an interface that must run first to also run last.
    """
    eolInterfaces = self.getActiveInterfaces("EOL")
    self._interactAll("EOL", eolInterfaces)
def interactAllCoupled(self, coupledIteration):
    """
    Run all interfaces that are involved in tight physics coupling.

    .. impl:: Physics coupling is driven from Operator.
        :id: I_ARMI_OPERATOR_PHYSICS1
        :implements: R_ARMI_OPERATOR_PHYSICS

        This method runs all the interfaces that are defined as part of the tight
        physics coupling of the reactor. Then it returns if the coupling has
        converged or not.

        Tight coupling implies the operator has split iterations between two or more
        physics solvers at the same solution point in simulated time. For example, a
        flux solution might be computed, then a temperature solution, and then
        another flux solution based on updated temperatures (which updates densities,
        dimensions, and Doppler).

        This is distinct from loose coupling, which simply uses the temperature
        values from the previous timestep in the current flux solution. It's also
        distinct from full coupling where all fields are solved simultaneously. ARMI
        supports tight and loose coupling.
    """
    coupledInterfaces = self.getActiveInterfaces("Coupled")

    # Snapshot every coupler's current value before any Coupled hook runs, so that
    # convergence can later be judged against the previous iteration.
    for iface in coupledInterfaces:
        if iface.coupler is None:
            continue
        iface.coupler.storePreviousIterationValue(iface.getTightCouplingValue())

    self._interactAll("Coupled", coupledInterfaces, coupledIteration)

    return self._checkTightCouplingConvergence(coupledInterfaces)
def _checkTightCouplingConvergence(self, activeInterfaces: list):
    """Check if interfaces are converged.

    Parameters
    ----------
    activeInterfaces : list
        the list of active interfaces on the operator

    Returns
    -------
    bool
        True when every interface that has a coupler reports convergence (also True
        when no interface has a coupler).

    Notes
    -----
    This is split off from self.interactAllCoupled to accommodate testing
    """
    # Summarize the coupled results and the convergence status.
    convergenceFlags = []
    for iface in activeInterfaces:
        coupler = iface.coupler
        if coupler is None:
            continue
        summaryKey = f"{iface.name}: {coupler.parameter}"
        # query convergence first, then record the coupler's eps for the summary
        convergenceFlags.append(coupler.isConverged(iface.getTightCouplingValue()))
        self._convergenceSummary[summaryKey].append(coupler.eps)

    reportingUtils.writeTightCouplingConvergenceSummary(self._convergenceSummary)
    return all(convergenceFlags)
def interactAllError(self):
    """Interact when an error is raised by any other interface. Provides a wrap-up option on the way to a crash."""
    for iface in self.interfaces:
        message = "Error-interacting with {0}".format(iface.name)
        runLog.extra(message)
        iface.interactError()
def createInterfaces(self):
    """
    Dynamically discover all available interfaces and call their factories,
    potentially adding them to the stack.

    An operator contains an ordered list of interfaces. These communicate between
    the core ARMI structure and auxiliary computational modules and/or external
    codes. At specified interaction points in a run, the list of interfaces is
    executed.

    Each interface optionally defines interaction "hooks" for each of the
    interaction points. The normal interaction points are BOL, BOC, every node, EOC,
    and EOL. If an interface defines an interactBOL method, that will run at BOL,
    and so on.

    The majority of ARMI capabilities lie within interfaces, and this architecture
    provides much of the flexibility of ARMI.

    See Also
    --------
    addInterface : Adds a particular interface to the interface stack.
    armi.interfaces.STACK_ORDER : A system to determine the required order of interfaces.
    armi.interfaces.getActiveInterfaceInfo : Collects the interface classes from relevant
        packages.
    """
    runLog.header("=========== Creating Interfaces ===========")
    # each entry pairs an interface class with the keyword arguments for addInterface
    for interfaceClass, addKwargs in interfaces.getActiveInterfaceInfo(self.cs):
        newInterface = interfaceClass(self.r, self.cs)
        self.addInterface(newInterface, **addKwargs)
def addInterface(
    self,
    interface,
    index=None,
    reverseAtEOL=False,
    enabled=True,
    bolForce=False,
):
    """
    Attach an interface to this operator.

    Notes
    -----
    Order matters.

    Parameters
    ----------
    interface : Interface
        the interface to add
    index : int, optional.
        Will insert the interface at this index rather than appending it to the end
        of the list
    reverseAtEOL : bool, optional.
        The interactEOL hooks will run in reverse order if True. All interfaces with
        this flag will be run as a group after all other interfaces. This allows
        something to run first at BOL and last at EOL, etc.
    enabled : bool, optional
        If enabled, will run at all hooks. If not, won't run any (with possible
        exception at BOL, see bolForce). Whenever possible, Interfaces that are
        needed during runtime for some peripheral operation but not during the main
        loop should be instantiated by the part of the code that actually needs the
        interface.
    bolForce: bool, optional
        If true, will run at BOL hook even if disabled. This is often a sign that the
        interface in question should be ephemerally instantiated on demand rather
        than added to the interface stack at all.

    Raises
    ------
    RuntimeError
        If an interface of the same name or function is already attached to the
        Operator.
    """
    if self.getInterface(interface.name):
        raise RuntimeError(
            "An interface with name {0} is already attached.".format(interface.name)
        )

    existing = self.getInterface(function=interface.function)
    if existing:
        existingType = type(existing)
        incomingType = type(interface)
        if issubclass(existingType, incomingType):
            # the attached interface is the same class or a subclass: keep it
            runLog.info(
                "Ignoring Interface {newFunc} because existing interface {old} already "
                " more specific".format(newFunc=interface, old=existing)
            )
            return

        if not issubclass(incomingType, existingType):
            raise RuntimeError(
                "Cannot add {0}; the {1} already is designated "
                "as the {2} interface. Multiple interfaces of the same "
                "function is not supported.".format(
                    interface, existing, interface.function
                )
            )

        # the incoming interface is a subclass of the attached one: replace it
        self.removeInterface(existing)
        runLog.info(
            "Will Insert Interface {newFunc} because it is a subclass of {old} interface and "
            " more derived".format(newFunc=interface, old=existing)
        )

    runLog.debug("Adding {0}".format(interface))
    if index is not None:
        self.interfaces.insert(index, interface)
    else:
        self.interfaces.append(interface)

    if reverseAtEOL:
        interface.reverseAtEOL = True

    if not enabled:
        interface.enabled(False)

    interface.bolForce(bolForce)
    interface.attachReactor(self, self.r)
def _processInterfaceDependencies(self):
    """
    Check all interfaces' dependencies and adds missing ones.

    Raises
    ------
    RuntimeError
        If interfaces are still being added after five passes.

    Notes
    -----
    Order does not matter here because the interfaces added here are disabled and
    playing supporting role so it is not intended to run on the interface stack.
    They will be called by other interfaces.

    As mentioned in :py:meth:`addInterface`, it may be better to just instantiate
    utility code when its needed rather than rely on this system.
    """
    # Make multiple passes in case there's one added that depends on another.
    for _dependencyPass in range(5):
        countBefore = len(self.interfaces)
        # getInterfaces returns a copy, so it's ok to add things to the stack here
        for iface in self.getInterfaces():
            for dependency in iface.getDependencies(self.cs):
                alreadyAttached = self.getInterface(
                    dependency.name, function=dependency.function
                )
                if alreadyAttached:
                    continue
                runLog.extra(
                    "Attaching {} interface (disabled, BOL forced) due to dependency in {}".format(
                        dependency.name, iface.name
                    )
                )
                self.addInterface(
                    dependency(r=self.r, cs=self.cs), enabled=False, bolForce=True
                )

        if len(self.interfaces) == countBefore:
            # a full pass added nothing: every dependency is satisfied
            break
    else:
        raise RuntimeError("Interface dependency resolution did not converge.")
def removeAllInterfaces(self):
    """Detach every interface from the reactor and empty the interface stack."""
    for attached in self.interfaces:
        attached.detachReactor()

    self.interfaces = []
def removeInterface(self, interface=None, interfaceName=None):
    """
    Remove a single interface from the interface stack.

    Parameters
    ----------
    interface : Interface, optional
        An actual interface object to remove.
    interfaceName : str, optional
        The name of the interface to remove. When given, it takes precedence over
        ``interface``.

    Returns
    -------
    success : boolean
        True if the interface was removed
        False if it was not (because it wasn't there to be removed)
    """
    if interfaceName:
        interface = self.getInterface(interfaceName)

    if not (interface and interface in self.interfaces):
        runLog.warning(
            "Cannot remove interface {0} because it is not in the interface stack.".format(
                interface
            )
        )
        return False

    self.interfaces.remove(interface)
    interface.detachReactor()
    return True
def getInterface(self, name=None, function=None):
    """
    Returns a specific interface from the stack by its name or more generic function.

    Parameters
    ----------
    name : str, optional
        Interface name
    function : str
        Interface function (general, like 'globalFlux','th',etc.). This is useful
        when you need the ___ solver (e.g. globalFlux) but don't care which
        particular one is active (e.g. SERPENT vs. DIF3D)

    Returns
    -------
    Interface or None
        The single matching interface, or None when nothing matches.

    Raises
    ------
    RuntimeError
        If there is more than one interface of the given name or function.
    """
    matches = [
        candidate
        for candidate in self.interfaces
        if (name and candidate.name == name)
        or (function and candidate.function == function)
    ]

    if len(matches) > 1:
        raise RuntimeError(
            "Cannot retrieve a single interface as there are multiple "
            "interfaces with name {} or function {} attached. ".format(
                name, function
            )
        )

    return matches[0] if matches else None
def interfaceIsActive(self, name):
    """True if named interface exists and is enabled.

    Parameters
    ----------
    name : str
        Name of the interface to look up on the stack.

    Returns
    -------
    bool
        Always a real boolean: False when no interface of that name is attached
        (rather than the ``None`` returned by the lookup), otherwise the truth value
        of the interface's ``enabled()`` flag.

    Notes
    -----
    This logic is significantly simpler than getActiveInterfaces. This logic only
    touches the enabled() flag, but doesn't take into account the case settings.
    """
    i = self.getInterface(name)
    # coerce so callers get the documented True/False instead of None or a raw flag
    return bool(i and i.enabled())
def getInterfaces(self):
    """
    Get list of interfaces in interface stack.

    .. impl:: An operator will expose an ordered list of interfaces.
        :id: I_ARMI_OPERATOR_INTERFACES
        :implements: R_ARMI_OPERATOR_INTERFACES

        This method returns an ordered list of instances of the Interface class.
        This list is useful because at any time node in the reactor simulation,
        these interfaces will be called in sequence to perform various types of
        calculations. It is important to note that this Operator instance has a list
        of Plugins, and each of those Plugins potentially defines multiple
        Interfaces. And these Interfaces define their own order, separate from the
        ordering of the Plugins.

    Notes
    -----
    Returns a copy so you can manipulate the list in an interface, like dependencies.
    """
    stackCopy = self.interfaces[:]
    return stackCopy
def getActiveInterfaces(
    self,
    interactState: str,
    excludedInterfaceNames: Tuple[str] = (),
    cycle: int = 0,
):
    """Retrieve the interfaces which are active for a given interaction state.

    Parameters
    ----------
    interactState: str
        A string dictating which interaction state the interfaces should be pulled
        for.
    excludedInterfaceNames: Tuple[str]
        A tuple of strings dictating which interfaces should be manually skipped.
        Only honored for the BOL, EveryNode and EOC states.
    cycle: int
        The given cycle. 0 by default.

    Returns
    -------
    activeInterfaces: List[Interfaces]
        The interfaces deemed active for the given interactState.

    Raises
    ------
    ValueError
        If ``interactState`` is not one of the known interaction states.
    """
    # Validate the inputs
    if excludedInterfaceNames is None:
        excludedInterfaceNames = ()

    if interactState not in ("BOL", "BOC", "EveryNode", "EOC", "EOL", "Coupled"):
        raise ValueError(f"{interactState} is an unknown interaction state!")

    # Ensure the interface is enabled (or, at BOL only, forced to run).
    if interactState == "BOL":

        def isEnabled(iface):
            return iface.enabled() or iface.bolForce()

    else:

        def isEnabled(iface):
            return iface.enabled()

    # Ensure the name of the interface isn't in some exclusion list.
    if interactState in ("EveryNode", "EOC"):

        def nameAllowed(iface):
            return iface.name not in excludedInterfaceNames

    elif interactState == "BOC" and cycle < self.cs[CONF_DEFERRED_INTERFACES_CYCLE]:

        def nameAllowed(iface):
            return iface.name not in self.cs[CONF_DEFERRED_INTERFACE_NAMES]

    elif interactState == "BOL":

        def nameAllowed(iface):
            return (
                iface.name not in self.cs[CONF_DEFERRED_INTERFACE_NAMES]
                and iface.name not in excludedInterfaceNames
            )

    else:

        def nameAllowed(iface):
            return True

    # Finally, find the active interfaces.
    activeInterfaces = [
        iface for iface in self.interfaces if isEnabled(iface) and nameAllowed(iface)
    ]

    # Special Case: At EOL we reverse the order of some interfaces.
    if interactState == "EOL":
        forwardGroup = [ii for ii in activeInterfaces if not ii.reverseAtEOL]
        reversedGroup = [ii for ii in activeInterfaces if ii.reverseAtEOL]
        reversedGroup.reverse()
        activeInterfaces = forwardGroup + reversedGroup

    return activeInterfaces
def reattach(self, r, cs=None):
    """Add links to globally-shared objects to this operator and all interfaces.

    Parameters
    ----------
    r : Reactor
        The reactor to link to this operator and to every interface.
    cs : Settings, optional
        When given, also replaces the settings on the operator and every interface.

    Notes
    -----
    Could be a good opportunity for weakrefs.
    """
    replaceSettings = cs is not None

    self.r = r
    self.r.o = self
    if replaceSettings:
        self.cs = cs

    for iface in self.interfaces:
        iface.r = r
        iface.o = self
        if replaceSettings:
            iface.cs = cs
def detach(self):
    """
    Break links to globally-shared objects to this operator and all interfaces.

    May be required prior to copying these objects over the network. The operator's
    own settings are left in place; only the interfaces lose theirs.

    Notes
    -----
    Could be a good opportunity for weakrefs.
    """
    reactor = self.r
    if reactor:
        reactor.o = None
    self.r = None

    for iface in self.interfaces:
        iface.o = None
        iface.r = None
        iface.cs = None
def _attachInterfaces(self):
    """
    Links all the interfaces in the interface stack to the operator, reactor, and cs.

    Each interface's ``attachReactor`` is called with this operator and its
    reactor.

    See Also
    --------
    createInterfaces : creates all interfaces
    addInterface : adds a single interface to the stack
    """
    reactor = self.r
    for interface in self.interfaces:
        interface.attachReactor(self, reactor)


def _loadRestartData(self):
    """
    Read a restart.dat file which contains all the fuel management factorLists and cycle lengths.

    The file ``<caseTitle>.restart.dat`` is scanned line by line. Each line that
    matches the ``cycle=... time=... factorList=...`` pattern contributes one
    ``(cycle, time, factorList)`` tuple to ``self.restartData``. Nothing happens
    if the file does not exist.

    Notes
    -----
    This allows the ARMI to do the same shuffles that it did last time, assuming fuel management
    logic has not changed. Note, it would be better if the moves were just read from a table in the
    database.
    """
    restartName = self.cs.caseTitle + ".restart.dat"
    if not os.path.exists(restartName):
        return
    runLog.info("Loading restart data from {}".format(restartName))

    recordPattern = r"cycle=(\d+)\s+time=(\d+\.\d+[Ee+-]+\d+)\s+factorList=[\[\{](.+?)[\]\}]"
    keyedEntryPattern = r"'(\w+)':\s*(\d*\.?\d*)"

    with open(restartName, "r") as restart:
        for line in restart:
            match = re.search(recordPattern, line)
            if not match:
                continue

            cycleText, timeText, rawFactors = match.groups()
            keyedEntries = re.findall(keyedEntryPattern, line)
            if keyedEntries:
                # key-based factorList. load a dictionary.
                factorList = {key: float(val) for key, val in keyedEntries}
            else:
                # list based factorList. Load a list. (old style, backward compat)
                pieces = rawFactors.split(",")
                try:
                    factorList = [float(item) for item in pieces]
                except ValueError:
                    # non-numeric entries: keep the raw strings
                    factorList = pieces

            runLog.debug("loaded restart data for cycle %d" % float(cycleText))
            self.restartData.append((float(cycleText), float(timeText), factorList))

    runLog.info("loaded restart data for {0} cycles".format(len(self.restartData)))
def loadState(
    self, cycle, timeNode, timeStepName="", fileName=None, updateMassFractions=None
):
    """
    Convenience method reroute to the database interface state reload method.

    The call is forwarded to the ``loadState`` method of the interface
    registered under the name ``"database"``.

    Parameters
    ----------
    cycle
        Cycle to load; passed through to the database interface.
    timeNode
        Time node to load; passed through to the database interface.
    timeStepName : str, optional
        Passed through to the database interface.
    fileName : optional
        Passed through to the database interface.
    updateMassFractions : optional
        Deprecated and ignored; a warning is logged if a value is supplied.

    Raises
    ------
    RuntimeError
        If no database interface is available on this operator.

    See Also
    --------
    armi.bookkeeping.db.loadOperator:
        A method for loading an operator given a database. loadOperator does not
        require an operator prior to loading the state of the reactor. loadState
        does, and therefore armi.init must be called which requires access to the
        blueprints, settings, and geometry files. These files are stored
        implicitly on the database, so loadOperator creates the reactor first, and
        then attaches it to the operator. loadState should be used if you are in
        the middle of an ARMI calculation and need to load a different time step.
        If you are loading from a fresh ARMI session, either method is sufficient
        if you have access to all the input files.
    """
    databaseInterface = self.getInterface("database")
    if not databaseInterface:
        raise RuntimeError("Cannot load from snapshot without a database interface")

    deprecatedOptionUsed = updateMassFractions is not None
    if deprecatedOptionUsed:
        runLog.warning(
            "deprecated: updateMassFractions is no longer a valid option for loadState"
        )

    databaseInterface.loadState(cycle, timeNode, timeStepName, fileName)
def snapshotRequest(self, cycle, node, iteration=None):
    """
    Process a snapshot request at this time.

    This copies various physics input and output files to a special folder that
    follow-on analysis be executed upon later. The folder is named
    ``snapShot<cycle>_<node>`` and is created in the current working directory;
    any existing folder of that name is cleaned out first.

    Parameters
    ----------
    cycle : int
        Cycle of the snapshot; used in the folder name and copied file names.
    node : int
        Time node of the snapshot; used in the folder name and copied file names.
    iteration : int, optional
        When given, it is added to the copied cross-section input names, to the
        first ISOTXS file name tried, and to the global flux label.

    Notes
    -----
    This was originally used to produce MC2/DIF3D inputs for external parties (who didn't have
    ARMI) to review. Since then, the concept of snapshots has evolved with respect to the
    :py:class:`~armi.operators.snapshots.OperatorSnapshots`.
    """
    from armi.physics.neutronics.settings import CONF_LOADING_FILE

    runLog.info("Producing snapshot for cycle {0} node {1}".format(cycle, node))
    self.r.core.zones.summary()

    newFolder = "snapShot{0}_{1}".format(cycle, node)
    if os.path.exists(newFolder):
        runLog.important("Deleting existing snapshot data in {0}".format(newFolder))
        pathTools.cleanPath(newFolder)  # careful with cleanPath!
        # pause for a second before re-checking whether the folder is gone
        time.sleep(1)

    if not os.path.exists(newFolder):
        os.mkdir(newFolder)
    else:
        runLog.warning(
            "Deleting existing snapshot data in {0} failed".format(newFolder)
        )

    # Moving the cross section files to a snapshot directory is a reasonable
    # requirement, but these hard-coded names are not desirable. This is legacy
    # and should be updated to be more robust for users.
    #
    # The cycle and timenode (and iteration, when given) are added to the XS input
    # file names so that a rx-coeff case that runs in here won't overwrite them.
    # The template without "{4}" simply ignores the extra positional argument.
    if iteration is not None:
        xsNameTemplate = "{0}_{1:03d}_{2:d}_{4}{3}"
    else:
        xsNameTemplate = "{0}_{1:03d}_{2:d}{3}"

    for fileName in os.listdir("."):
        # NOTE(review): the "." before "inp" in this pattern is unescaped, so it
        # matches any character -- confirm whether a literal dot was intended.
        if "mcc" in fileName and re.search(r"[A-Z]AF?\d?.inp", fileName):
            base, ext = os.path.splitext(fileName)
            newFile = xsNameTemplate.format(base, cycle, node, ext, iteration)
            shutil.copy(fileName, os.path.join(newFolder, newFile))
        if "rzmflx" in fileName:
            pathTools.copyOrWarn("rzmflx for snapshot", fileName, newFolder)

    # Try the most specific ISOTXS name first; if none of the candidates exist,
    # the last one is still handed to copyOrWarn.
    isotxsCandidates = [
        f"ISOTXS-c{cycle}n{node}",
        f"ISOTXS-c{cycle}",
    ]
    if iteration is not None:
        isotxsCandidates.insert(0, f"ISOTXS-c{cycle}n{node}i{iteration}")
    isoFName = next(
        (name for name in isotxsCandidates if os.path.exists(name)),
        isotxsCandidates[-1],
    )
    pathTools.copyOrWarn(
        "ISOTXS for snapshot", isoFName, pathTools.armiAbsPath(newFolder, "ISOTXS")
    )

    globalFluxLabel = GlobalFluxInterfaceUsingExecuters.getLabel(
        self.cs.caseTitle, cycle, node, iteration
    )

    def remainingCopies():
        """Yield (description, source path) pairs lazily, one per remaining copy."""
        yield "DIF3D input for snapshot", globalFluxLabel + ".inp"
        yield "DIF3D output for snapshot", globalFluxLabel + ".out"
        yield "Shuffle logic for snapshot", self.cs[CONF_SHUFFLE_LOGIC]
        yield "Geometry file for snapshot", self.cs["geomFile"]
        yield "Loading definition for snapshot", self.cs[CONF_LOADING_FILE]
        yield "Flow history for snapshot", self.cs.caseTitle + ".flow_history.txt"
        yield (
            "Pressure history for snapshot",
            self.cs.caseTitle + ".pressure_history.txt",
        )

    for description, sourcePath in remainingCopies():
        pathTools.copyOrWarn(description, sourcePath, newFolder)
@staticmethod
def setStateToDefault(cs):
    """Update the state of ARMI to fit the kind of run this operator manages.

    Parameters
    ----------
    cs
        The settings object to derive from.

    Returns
    -------
    The result of ``cs.modified`` with ``runType`` set to ``RunTypes.STANDARD``.
    """
    standardRunSettings = {"runType": RunTypes.STANDARD}
    return cs.modified(newSettings=standardRunSettings)
def couplingIsActive(self):
    """True if any kind of physics coupling is active.

    Returns
    -------
    The value of the ``CONF_TIGHT_COUPLING`` setting on this operator's ``cs``.
    """
    tightCouplingSetting = self.cs[CONF_TIGHT_COUPLING]
    return tightCouplingSetting