# Copyright 2019 TerraPower, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ARMI Database implementation, version 3.
This implementation of the database is a significant departure from the previous one. One of
the foundational concepts in this version is that a reactor model should be fully
recoverable from the database itself; all the way down to the component level. As a
result, the structure of the underlying data is bound to the hierarchical Composite
Reactor Model, rather than an ad hoc collection of Block parameter fields and other
parameters. Furthermore, this format is intended to be more dynamic, permitting as-yet
undeveloped levels and classes in the Composite Reactor Model to be supported as they
are added. More high-level discussion is contained in :doc:`/user/outputs/database`.
The most important contents of this module are the :py:class:`DatabaseInterface`, the
:py:class:`Database3` class, the :py:class:`Layout` class, and the special data
packing/unpacking functions. The ``Database3`` class contains most of the functionality
for interacting with the underlying data. This includes things like dumping a Reactor
state to the database and loading it back again, as well as extracting historical data
for a given object or collection of objects from the database file. When interacting with
the database file, the ``Layout`` class is used to help map the hierarchical Composite
Reactor Model to the flat representation in the database.
Refer to :py:mod:`armi.bookkeeping.db` for notes about versioning.
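As a rough sketch of the resulting file structure (the file and parameter names below are
only illustrative), each time node is stored in an HDF5 group named like ``c00n02``
(cycle 0, time node 2), containing one sub-group per object type with one dataset per
parameter, plus a ``layout`` group describing the hierarchy::

    import h5py

    with h5py.File("myCase.h5", "r") as f:
        blockPower = f["c00n02/HexBlock/power"][:]
        serialNums = f["c00n02/layout/serialNum"][:]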
Minor revision changelog
------------------------
- 3.1: Improve the handling of reading/writing grids.
- 3.2: Change the strategy for storing large attributes from using an Object Reference
to an external dataset to using a special string starting with an "@" symbol (e.g.,
"@/c00n00/attrs/5_linkedDims"). This was done to support copying time node datasets
from one file to another without invalidating the references. Support is maintained
for reading previous versions, and for performing a ``mergeHistory()`` and converting
to the new reference strategy, but the old version cannot be written.
- 3.3: Compress the way locations are stored in the database and allow MultiIndex
locations to be read and written.
- 3.4: Modified the way that locations are stored in the database to include complete
indices for indices that can be composed from multiple grids. This was done since the
space is already being used to be able to store them, and because having complete
indices allows for more efficient means of extracting information based on location
without having to compose the full model.
"""
import collections
import copy
import io
import itertools
import os
import pathlib
import re
import shutil
import subprocess
import sys
import time
from platform import uname
from typing import (
Optional,
Tuple,
Type,
Dict,
Any,
List,
Sequence,
MutableSequence,
Generator,
)
import h5py
import numpy
from armi import context
from armi import getApp
from armi import interfaces
from armi import meta
from armi import runLog
from armi import settings
from armi.reactor import parameters
from armi.reactor.parameters import parameterCollections
from armi.reactor.parameters import parameterDefinitions
from armi.reactor.flags import Flags
from armi.reactor.reactors import Reactor, Core
from armi.reactor import assemblies
from armi.reactor.assemblies import Assembly
from armi.reactor.blocks import Block
from armi.reactor.components import Component
from armi.reactor.composites import ArmiObject
from armi.reactor import grids
from armi.bookkeeping.db.typedefs import History, Histories
from armi.reactor import systemLayoutInput
from armi.utils import getPreviousTimeNode, getStepLengths
from armi.utils.textProcessors import resolveMarkupInclusions
from armi.nucDirectory import nuclideBases
from armi.settings.fwSettings.databaseSettings import (
CONF_SYNC_AFTER_WRITE,
CONF_FORCE_DB_PARAMS,
)
ORDER = interfaces.STACK_ORDER.BOOKKEEPING
DB_MAJOR = 3
DB_MINOR = 4
DB_VERSION = f"{DB_MAJOR}.{DB_MINOR}"
ATTR_LINK = re.compile("^@(.*)$")
_SERIALIZER_NAME = "serializerName"
_SERIALIZER_VERSION = "serializerVersion"
LOC_NONE = "N"
LOC_COORD = "C"
LOC_INDEX = "I"
LOC_MULTI = "M:"
LOCATION_TYPE_LABELS = {
type(None): LOC_NONE,
grids.CoordinateLocation: LOC_COORD,
grids.IndexLocation: LOC_INDEX,
grids.MultiIndexLocation: LOC_MULTI,
}
def getH5GroupName(cycle, timeNode, statePointName=None):
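"""Get the database group name for a time node, e.g. ``getH5GroupName(0, 2)`` -> ``"c00n02"``."""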
return "c{:0>2}n{:0>2}{}".format(cycle, timeNode, statePointName or "")
def describeInterfaces(cs):
"""Function for exposing interface(s) to other code."""
return (DatabaseInterface, {"enabled": cs["db"]})
def updateGlobalAssemblyNum(r):
assemNum = r.core.p.maxAssemNum
if assemNum is not None:
assemblies.setAssemNumCounter(int(assemNum + 1))
else:
raise ValueError("Could not load maxAssemNum from the database")
class DatabaseInterface(interfaces.Interface):
"""
Handles interactions between the ARMI data model and the persistent data storage
system.
This reads/writes the ARMI state to/from the database and helps reconstruct state
information that can be derived from the stored data.
"""
name = "database"
def __init__(self, r, cs):
interfaces.Interface.__init__(self, r, cs)
self._db = None
self._dbPath: Optional[pathlib.Path] = None
if cs[CONF_FORCE_DB_PARAMS]:
toSet = {paramName: set() for paramName in cs[CONF_FORCE_DB_PARAMS]}
for (name, _), pDef in parameterDefinitions.ALL_DEFINITIONS.items():
if name in toSet.keys():
toSet[name].add(pDef)
for name, pDefs in toSet.items():
runLog.info(
"Forcing parameter {} to be written to the database, per user "
"input".format(name)
)
for pDef in pDefs:
pDef.saveToDB = True
def __repr__(self):
return "<{} '{}' {} >".format(
self.__class__.__name__, self.name, repr(self._db)
)
@property
def database(self):
"""
Presents the internal database object, if it exists.
"""
if self._db is not None:
return self._db
else:
raise RuntimeError(
"The Database interface has not yet created a database "
"object. InteractBOL or loadState must be called first."
)
def interactBOL(self):
"""Initialize the database if the main interface was not available. (Beginning of Life)"""
if not self._db:
self.initDB()
def initDB(self, fName: Optional[os.PathLike] = None):
"""
Open the underlying database to be written to, and write input files to DB.
Notes
-----
Main Interface calls this so that the database is available as early as
possible in the run. The database interface interacts near the end of the
interface stack (so that all the parameters have been updated) while the Main
Interface interacts first.
"""
if fName is None:
self._dbPath = pathlib.Path(self.cs.caseTitle + ".h5")
else:
self._dbPath = pathlib.Path(fName)
if self.cs["reloadDBName"].lower() == str(self._dbPath).lower():
raise ValueError(
"It appears that reloadDBName is the same as the case "
"title. This could lead to data loss! Rename the reload DB or the "
"case."
)
self._db = Database3(self._dbPath, "w")
self._db.open()
# Grab geomString here because the DB-level has no access to the reactor or
# blueprints or anything.
# There's not always a geomFile; we are moving towards the core grid definition
# living in the blueprints themselves. In this case, the db doesn't need to store
# a geomFile at all.
if self.cs["geomFile"]:
with open(os.path.join(self.cs.inputDirectory, self.cs["geomFile"])) as f:
geomString = f.read()
else:
geomString = ""
self._db.writeInputsToDB(self.cs, geomString=geomString)
def interactEveryNode(self, cycle, node):
"""
Write to database.
DBs should receive the state information of the run at each node.
"""
# skip writing for last burn step since it will be written at interact EOC
if node < self.o.burnSteps[cycle]:
self.r.core.p.minutesSinceStart = (
time.time() - self.r.core.timeOfStart
) / 60.0
self._db.writeToDB(self.r)
if self.cs[CONF_SYNC_AFTER_WRITE]:
self._db.syncToSharedFolder()
def interactEOC(self, cycle=None):
"""In case anything changed since last cycle (e.g. rxSwing), update DB. (End of Cycle)"""
# We cannot presume whether we are at EOL based on cycle and cs["nCycles"],
# since cs["nCycles"] is not a difinitive indicator of EOL; ultimately the
# Operator has the final say.
if not self.o.atEOL:
self.r.core.p.minutesSinceStart = (
time.time() - self.r.core.timeOfStart
) / 60.0
self._db.writeToDB(self.r)
def interactEOL(self):
"""DB's should be closed at run's end. (End of Life)"""
# minutesSinceStart should include as much of the ARMI run as possible, so it
# needs to be updated at EOL as well.
self.r.core.p.minutesSinceStart = (time.time() - self.r.core.timeOfStart) / 60.0
self._db.writeToDB(self.r)
self._db.close(True)
def interactError(self):
r"""Get shutdown state information even if the run encounters an error."""
try:
self.r.core.p.minutesSinceStart = (
time.time() - self.r.core.timeOfStart
) / 60.0
# this can result in a double-error if the error occurred in the database
# writing
self._db.writeToDB(self.r, "error")
self._db.close(False)
except: # pylint: disable=bare-except; we're already responding to an error
pass
def interactDistributeState(self) -> None:
"""
Reconnect to pre-existing database.
DB is created and managed by the primary node only but we can still connect to it
from workers to enable things like history tracking.
"""
if context.MPI_RANK > 0:
# DB may not exist if distribute state is called early.
if self._dbPath is not None and os.path.exists(self._dbPath):
self._db = Database3(self._dbPath, "r")
self._db.open()
def distributable(self):
return self.Distribute.SKIP
def prepRestartRun(self):
"""
Load the data history from the database requested in the case setting
`reloadDBName`.
Reactor state is put at the cycle/node requested in the case settings
`startCycle` and `startNode`, having loaded the state from all cycles prior
to that in the requested database.
Notes
-----
Mixing the use of simple vs detailed cycles settings is allowed, provided
that the cycle histories prior to `startCycle`/`startNode` are equivalent.
"""
reloadDBName = self.cs["reloadDBName"]
runLog.info(
f"Merging database history from {reloadDBName} for restart analysis."
)
startCycle = self.cs["startCycle"]
startNode = self.cs["startNode"]
with Database3(reloadDBName, "r") as inputDB:
loadDbCs = inputDB.loadCS()
# pull the history up to the cycle/node prior to `startCycle`/`startNode`
dbCycle, dbNode = getPreviousTimeNode(
startCycle,
startNode,
self.cs,
)
# check that cycle histories are equivalent up to this point
self._checkThatCyclesHistoriesAreEquivalentUpToRestartTime(
loadDbCs, dbCycle, dbNode
)
self._db.mergeHistory(inputDB, startCycle, startNode)
self.loadState(dbCycle, dbNode)
def _checkThatCyclesHistoriesAreEquivalentUpToRestartTime(
self, loadDbCs, dbCycle, dbNode
):
dbStepLengths = getStepLengths(loadDbCs)
currentCaseStepLengths = getStepLengths(self.cs)
dbStepHistory = []
currentCaseStepHistory = []
try:
for cycleIdx in range(dbCycle + 1):
if cycleIdx == dbCycle:
# truncate it at dbNode
dbStepHistory.append(dbStepLengths[cycleIdx][:dbNode])
currentCaseStepHistory.append(
currentCaseStepLengths[cycleIdx][:dbNode]
)
else:
dbStepHistory.append(dbStepLengths[cycleIdx])
currentCaseStepHistory.append(currentCaseStepLengths[cycleIdx])
except IndexError:
runLog.error(
f"DB cannot be loaded to this time: cycle={dbCycle}, node={dbNode}"
)
raise
if dbStepHistory != currentCaseStepHistory:
raise ValueError(
"The cycle history up to the restart cycle/node must be equivalent."
)
# TODO: The use of "yield" here is suspect.
def _getLoadDB(self, fileName):
"""
Return the database to load from in order of preference.
Notes
-----
If ``fileName`` is given, only one database is yielded, since we were specifically
instructed to load from that database.
"""
if fileName is not None:
# only yield 1 database if the file name is specified
if self._db is not None and fileName == self._db._fileName:
yield self._db
elif os.path.exists(fileName):
yield Database3(fileName, "r")
else:
if self._db is not None:
yield self._db
if os.path.exists(self.cs["reloadDBName"]):
yield Database3(self.cs["reloadDBName"], "r")
def loadState(
self, cycle, timeNode, timeStepName="", fileName=None, updateGlobalAssemNum=True
):
"""
Loads a fresh reactor and applies it to the Operator.
Notes
-----
Will load preferentially from the `fileName` if passed. Otherwise will load from
existing database in memory or `cs["reloadDBName"]` in that order.
Raises
------
RuntimeError
If fileName is specified and that file does not have the time step.
If fileName is not specified and neither the database in memory, nor the
`cs["reloadDBName"]` have the time step specified.
"""
for potentialDatabase in self._getLoadDB(fileName):
with potentialDatabase as loadDB:
if loadDB.hasTimeStep(cycle, timeNode, statePointName=timeStepName):
newR = loadDB.load(
cycle,
timeNode,
statePointName=timeStepName,
cs=self.cs,
allowMissing=True,
updateGlobalAssemNum=updateGlobalAssemNum,
)
self.o.reattach(newR, self.cs)
break
else:
# reactor was never set so fail
if fileName:
raise RuntimeError(
"Cannot load state from specified file {} @ {}".format(
fileName, getH5GroupName(cycle, timeNode, timeStepName)
)
)
raise RuntimeError(
"Cannot load state from <unspecified file> @ {}".format(
getH5GroupName(cycle, timeNode, timeStepName)
)
)
def getHistory(
self,
comp: ArmiObject,
params: Optional[Sequence[str]] = None,
timeSteps: Optional[MutableSequence[Tuple[int, int]]] = None,
byLocation: bool = False,
) -> History:
"""
Get historical parameter values for a single object.
This is mostly a wrapper around the same function on the ``Database3`` class,
but knows how to return the current value as well.
See Also
--------
Database3.getHistory
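
Examples
--------
A minimal sketch of typical use; the interface lookup, block selection, and parameter
name are illustrative rather than prescriptive::

    dbi = o.getInterface("database")  # o is the Operator
    b = o.r.core.getFirstBlock()
    hist = dbi.getHistory(b, params=["power"])
    # hist["power"] maps (cycle, node) -> value, including the current time node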
"""
# make a copy so that we can potentially remove timesteps without affecting the
# caller
timeSteps = copy.copy(timeSteps)
now = (self.r.p.cycle, self.r.p.timeNode)
nowRequested = timeSteps is None
if timeSteps is not None and now in timeSteps:
nowRequested = True
timeSteps.remove(now)
if byLocation:
history = self.database.getHistoryByLocation(comp, params, timeSteps)
else:
history = self.database.getHistory(comp, params, timeSteps)
if nowRequested:
for param in params or history.keys():
if param == "location":
history[param][now] = tuple(comp.spatialLocator.indices)
else:
history[param][now] = comp.p[param]
return history
def getHistories(
self,
comps: Sequence[ArmiObject],
params: Optional[Sequence[str]] = None,
timeSteps: Optional[MutableSequence[Tuple[int, int]]] = None,
byLocation: bool = False,
) -> Histories:
"""
Get historical parameter values for one or more objects.
This is mostly a wrapper around the same function on the ``Database3`` class,
but knows how to return the current value as well.
See Also
--------
Database3.getHistories
"""
now = (self.r.p.cycle, self.r.p.timeNode)
nowRequested = timeSteps is None
if timeSteps is not None:
# make a copy so that we can potentially remove timesteps without affecting
# the caller
timeSteps = copy.copy(timeSteps)
if timeSteps is not None and now in timeSteps:
nowRequested = True
timeSteps.remove(now)
if byLocation:
histories = self.database.getHistoriesByLocation(comps, params, timeSteps)
else:
histories = self.database.getHistories(comps, params, timeSteps)
if nowRequested:
for c in comps:
for param in params or histories[c].keys():
if param == "location":
histories[c][param][now] = c.spatialLocator.indices
else:
histories[c][param][now] = c.p[param]
return histories
class Database3:
"""
Version 3 of the ARMI Database, handling serialization and loading of Reactor states.
This implementation of the database pushes all objects in the Composite Reactor
Model into the database. This process is aided by the ``Layout`` class, which
handles the packing and unpacking of the structure of the objects, their
relationships, and their non-parameter attributes.
See Also
--------
`doc/user/outputs/database` for more details.
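
Examples
--------
A minimal sketch of read access, assuming ARMI has already been configured (e.g., via
``armi.configure()``) and that ``myCase.h5`` was produced by a prior run; the file name
and time node are illustrative::

    db = Database3("myCase.h5", "r")
    with db:
        print(list(db.genTimeSteps()))  # [(0, 0), (0, 1), ...]
        r = db.load(cycle=0, node=1, allowMissing=True)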
"""
timeNodeGroupPattern = re.compile(r"^c(\d\d)n(\d\d)$")
def __init__(self, fileName: os.PathLike, permission: str):
"""
Create a new Database3 object.
Parameters
----------
fileName:
name of the file
permission:
file permissions, write ("w") or read ("r")
"""
self._fileName = fileName
# No full path yet; we will determine this based on FAST_PATH and permissions
self._fullPath: Optional[str] = None
self._permission = permission
self.h5db: Optional[h5py.File] = None
# Allows context management on open files.
# If context management is used on a file that is already open, it will not reopen
# and it will also not close after leaving that context.
# This allows all databases to be treated the same, whether they are open or
# closed.
self._openCount: int = 0
if permission == "w":
self.version = DB_VERSION
else:
# will be set upon read
self._version = None
self._versionMajor = None
self._versionMinor = None
@property
def version(self) -> str:
return self._version
@version.setter
def version(self, value: str):
self._version = value
self._versionMajor, self._versionMinor = (int(v) for v in value.split("."))
@property
def versionMajor(self):
return self._versionMajor
@property
def versionMinor(self):
return self._versionMinor
def __repr__(self):
return "<{} {}>".format(
self.__class__.__name__, repr(self.h5db).replace("<", "").replace(">", "")
)
def open(self):
if self.h5db is not None:
raise ValueError(
"This database is already open; make sure to close it "
"before trying to open it again."
)
filePath = self._fileName
self._openCount += 1
if self._permission in {"r", "a"}:
self._fullPath = os.path.abspath(filePath)
self.h5db = h5py.File(filePath, self._permission)
self.version = self.h5db.attrs["databaseVersion"]
return
if self._permission == "w":
# assume fast path!
filePath = os.path.join(context.getFastPath(), filePath)
self._fullPath = os.path.abspath(filePath)
else:
runLog.error("Unrecognized file permissions `{}`".format(self._permission))
raise ValueError(
"Cannot open database with permission `{}`".format(self._permission)
)
runLog.info("Opening database file at {}".format(os.path.abspath(filePath)))
self.h5db = h5py.File(filePath, self._permission)
self.h5db.attrs["successfulCompletion"] = False
self.h5db.attrs["version"] = meta.__version__
self.h5db.attrs["databaseVersion"] = self.version
self.h5db.attrs["user"] = context.USER
self.h5db.attrs["python"] = sys.version
self.h5db.attrs["armiLocation"] = os.path.dirname(context.ROOT)
self.h5db.attrs["startTime"] = context.START_TIME
self.h5db.attrs["machines"] = numpy.array(context.MPI_NODENAMES).astype("S")
# store platform data
platform_data = uname()
self.h5db.attrs["platform"] = platform_data.system
self.h5db.attrs["hostname"] = platform_data.node
self.h5db.attrs["platformRelease"] = platform_data.release
self.h5db.attrs["platformVersion"] = platform_data.version
self.h5db.attrs["platformArch"] = platform_data.processor
# store app and plugin data
app = getApp()
self.h5db.attrs["appName"] = app.name
plugins = app.pluginManager.list_name_plugin()
ps = [
(os.path.abspath(sys.modules[p[1].__module__].__file__), p[1].__name__)
for p in plugins
]
ps = numpy.array([str(p[0]) + ":" + str(p[1]) for p in ps]).astype("S")
self.h5db.attrs["pluginPaths"] = ps
# store the commit hash of the local repo
self.h5db.attrs["localCommitHash"] = Database3.grabLocalCommitHash()
@staticmethod
def grabLocalCommitHash():
"""
Try to determine the local Git commit.
We have to be sure to handle the errors that arise when the code is run on a system
that doesn't have Git installed, or when it is simply not run from inside a repo.
Returns
-------
str
The commit hash if it exists, otherwise "unknown".
"""
unknown = "unknown"
if not shutil.which("git"):
# no git available. cannot check git info
return unknown
repo_exists = (
subprocess.run(
"git rev-parse --git-dir".split(),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
).returncode
== 0
and subprocess.run(
["git", "describe"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
).returncode
== 0
)
if repo_exists:
try:
commit_hash = subprocess.check_output(["git", "describe"])
return commit_hash.decode("utf-8").strip()
except:
return unknown
else:
return unknown
def close(self, completedSuccessfully=False):
"""Close the DB and perform cleanups and auto-conversions."""
self._openCount = 0
if self.h5db is None:
return
if self._permission == "w":
self.h5db.attrs["successfulCompletion"] = completedSuccessfully
# a bit redundant to call flush, but with unreliable IO issues, why not?
self.h5db.flush()
self.h5db.close()
self.h5db = None
if self._permission == "w":
# move out of the FAST_PATH and into the working directory
shutil.move(self._fullPath, self._fileName)
def splitDatabase(
self, keepTimeSteps: Sequence[Tuple[int, int]], label: str
) -> str:
"""
Discard all data except for specific time steps, retaining old data in a separate file.
This is useful when performing more exotic analyses, where each "time step" may
not represent a specific point in time, but something more nuanced. For example,
equilibrium cases store a new "cycle" for each iteration as it attempts to
converge the equilibrium cycle. At the end of the run, the last "cycle" is the
converged equilibrium cycle, whereas the previous cycles constitute the path to
convergence, which we typically wish to discard before further analysis.
Parameters
----------
keepTimeSteps
A collection of the time steps to retain
label
An informative label for the backed-up database. Usually something like
"-all-iterations". Will be interposed between the source name and the ".h5"
extension.
Returns
-------
str
The name of the new, backed-up database file.
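
Examples
--------
A hedged sketch; the retained time steps and label are illustrative::

    backupName = db.splitDatabase([(0, 0), (0, 1)], "-all-iterations")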
"""
if self.h5db is None:
raise ValueError("There is no open database to split.")
self.h5db.close()
backupDBPath = os.path.abspath(label.join(os.path.splitext(self._fileName)))
runLog.info("Retaining full database history in {}".format(backupDBPath))
if self._fullPath is not None:
shutil.move(self._fullPath, backupDBPath)
self.h5db = h5py.File(self._fullPath, self._permission)
dbOut = self.h5db
with h5py.File(backupDBPath, "r") as dbIn:
dbOut.attrs.update(dbIn.attrs)
# Copy everything except time node data
timeSteps = set()
for groupName, _ in dbIn.items():
m = self.timeNodeGroupPattern.match(groupName)
if m:
timeSteps.add((int(m.group(1)), int(m.group(2))))
else:
dbIn.copy(groupName, dbOut)
if not set(keepTimeSteps).issubset(timeSteps):
raise ValueError(
"Not all desired time steps ({}) are even present in the "
"database".format(keepTimeSteps)
)
minCycle = next(iter(sorted(keepTimeSteps)))[0]
for cycle, node in keepTimeSteps:
offsetCycle = cycle - minCycle
offsetGroupName = getH5GroupName(offsetCycle, node)
dbIn.copy(getH5GroupName(cycle, node), dbOut, name=offsetGroupName)
dbOut[offsetGroupName + "/Reactor/cycle"][()] = offsetCycle
return backupDBPath
@property
def fileName(self):
return self._fileName
@fileName.setter
def fileName(self, fName):
if self.h5db is not None:
raise RuntimeError("Cannot change Database file name while it's opened!")
self._fileName = fName
def loadCS(self):
"""Attempt to load settings from the database file.

Notes
-----
There are no guarantees here. If the database was written by a different version of ARMI
than the one you are using, these results may not be usable. For instance, the database
could have been written by a much older or newer version of ARMI.
"""
cs = settings.Settings()
cs.caseTitle = os.path.splitext(os.path.basename(self.fileName))[0]
try:
cs.loadFromString(self.h5db["inputs/settings"].asstr()[()])
except KeyError:
# not all paths to writing a database require inputs to be written to the
# database. Technically, settings do affect some of the behavior of database
# reading, so not having the settings that made the reactor that went into
# the database is not ideal. However, this isn't the right place to crash
# into it. Ideally, there would be no way to not have the settings in the
# database (force writing in writeToDB), or to make reading invariant to
# settings.
pass
return cs
def loadBlueprints(self):
"""Attempt to load reactor blueprints from the database file.

Notes
-----
There are no guarantees here. If the database was written by a different version of ARMI
than the one you are using, these results may not be usable. For instance, the database
could have been written by a much older or newer version of ARMI.
"""
# Blueprints use the yamlize package, which uses class attributes to define much of the class's behavior
# through metaclassing. Therefore, we need to be able to import all plugins *before* importing blueprints.
from armi.reactor.blueprints import (
Blueprints,
) # pylint: disable=import-outside-toplevel
bpString = None
try:
bpString = self.h5db["inputs/blueprints"].asstr()[()]
except KeyError:
# not all reactors need to be created from blueprints, so they may not exist
pass
if not bpString:
# looks like no blueprints contents
return None
stream = io.StringIO(bpString)
stream = Blueprints.migrate(stream)
bp = Blueprints.load(stream)
return bp
def loadGeometry(self):
"""
This is primarily just used for migrations.
The "geometry files" were replaced by ``systems:`` and ``grids:`` sections of ``Blueprints``.
"""
geom = systemLayoutInput.SystemLayoutInput()
geom.readGeomFromStream(io.StringIO(self.h5db["inputs/geomFile"].asstr()[()]))
return geom
def mergeHistory(self, inputDB, startCycle, startNode):
"""
Copy time step data up to, but not including the passed cycle and node.
Notes
-----
This is used for restart runs with the standard operator for example.
The current time step (being loaded from) should not be copied, as that
time step's data will be written at the end of the time step.
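
Examples
--------
A minimal sketch of a restart-style merge; the file name and time node are illustrative::

    with Database3("oldCase.h5", "r") as inputDB:
        db.mergeHistory(inputDB, startCycle=2, startNode=0)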
"""
# iterate over the top level H5Groups and copy
for time, h5ts in zip(inputDB.genTimeSteps(), inputDB.genTimeStepGroups()):
cyc, tn = time
if cyc == startCycle and tn == startNode:
# all data up to current state are merged
return
self.h5db.copy(h5ts, h5ts.name)
if inputDB.versionMinor < 2:
# The source database may have object references in some attributes.
# make sure to link those up using our manual path strategy.
references = []
def findReferences(name, obj):
for key, attr in obj.attrs.items():
if isinstance(attr, h5py.h5r.Reference):
references.append((name, key, inputDB.h5db[attr].name))
h5ts.visititems(findReferences)
for key, attr, path in references:
destTs = self.h5db[h5ts.name]
destTs[key].attrs[attr] = "@{}".format(path)
def __enter__(self):
"""Context management support"""
if self._openCount == 0:
# open also increments _openCount
self.open()
else:
self._openCount += 1
return self
def __exit__(self, type, value, traceback):
"""Typically we don't care why it broke but we want the DB to close"""
self._openCount -= 1
# always close if there is a traceback.
if self._openCount == 0 or traceback:
self.close(all(i is None for i in (type, value, traceback)))
def __del__(self):
if self.h5db is not None:
self.close(False)
def __delitem__(self, tn: Tuple[int, int, Optional[str]]):
cycle, timeNode, statePointName = tn
name = getH5GroupName(cycle, timeNode, statePointName)
if self.h5db is not None:
del self.h5db[name]
def genTimeStepGroups(
self, timeSteps: Optional[Sequence[Tuple[int, int]]] = None
) -> Generator[h5py._hl.group.Group, None, None]:
"""
Returns a generator of HDF5 Groups for all time nodes, or for the passed selection.
"""
assert (
self.h5db is not None
), "Must open the database before calling genTimeStepGroups"
if timeSteps is None:
for groupName, h5TimeNodeGroup in sorted(self.h5db.items()):
match = self.timeNodeGroupPattern.match(groupName)
if match:
yield h5TimeNodeGroup
else:
for step in timeSteps:
yield self.h5db[getH5GroupName(*step)]
def getLayout(self, cycle, node):
"""
Return a Layout object representing the requested cycle and time node.
"""
version = (self._versionMajor, self._versionMinor)
timeGroupName = getH5GroupName(cycle, node)
return Layout(version, self.h5db[timeGroupName])
def genTimeSteps(self) -> Generator[Tuple[int, int], None, None]:
"""
Returns a generator of (cycle, node) tuples that are present in the DB.
"""
assert (
self.h5db is not None
), "Must open the database before calling genTimeSteps"
for groupName in sorted(self.h5db.keys()):
match = self.timeNodeGroupPattern.match(groupName)
if match:
cycle = int(match.groups()[0])
node = int(match.groups()[1])
yield (cycle, node)
def genAuxiliaryData(self, ts: Tuple[int, int]) -> Generator[str, None, None]:
"""
Returns a generator of names of auxiliary data on the requested time point.
"""
assert (
self.h5db is not None
), "Must open the database before calling genAuxiliaryData"
cycle, node = ts
groupName = getH5GroupName(cycle, node)
timeGroup = self.h5db[groupName]
exclude = set(ArmiObject.TYPES.keys())
exclude.add("layout")
return (groupName + "/" + key for key in timeGroup.keys() if key not in exclude)
@staticmethod
def getAuxiliaryDataPath(ts: Tuple[int, int], name: str) -> str:
return getH5GroupName(*ts) + "/" + name
def keys(self):
return (g.name for g in self.genTimeStepGroups())
def getH5Group(self, r, statePointName=None):
"""
Get the H5Group for the current ARMI timestep.
This method can be used to allow other interfaces to place data into the database
at the correct timestep.
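
For example, another interface might stash auxiliary results at the current time node;
the dataset name and array below are illustrative::

    group = db.getH5Group(r)
    group.create_dataset("myPluginResults", data=myArray, compression="gzip")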
"""
groupName = getH5GroupName(r.p.cycle, r.p.timeNode, statePointName)
if groupName in self.h5db:
return self.h5db[groupName]
else:
group = self.h5db.create_group(groupName)
group.attrs["cycle"] = r.p.cycle
group.attrs["timeNode"] = r.p.timeNode
return group
def hasTimeStep(self, cycle, timeNode, statePointName=""):
"""
Returns True if (cycle, timeNode, statePointName) is contained in the database.
"""
return getH5GroupName(cycle, timeNode, statePointName) in self.h5db
def writeToDB(self, reactor, statePointName=None):
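"""
Write the state of the given reactor to the database, under the group for its
current cycle/time node (and optional ``statePointName``).
"""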
assert self.h5db is not None, "Database must be open before writing."
# _createLayout is recursive
h5group = self.getH5Group(reactor, statePointName)
runLog.info("Writing to database for statepoint: {}".format(h5group.name))
layout = Layout((self.versionMajor, self.versionMinor), comp=reactor)
layout.writeToDB(h5group)
groupedComps = layout.groupedComps
for comps in groupedComps.values():
self._writeParams(h5group, comps)
def syncToSharedFolder(self):
"""
Copy DB to run working directory.
Needed when multiple MPI processes need to read the same db, for example
when a history is needed from independent runs (e.g. for fuel performance on
a variety of assemblies).
Notes
-----
At some future point, we may implement a client-server like DB system which
would render this kind of operation unnecessary.
"""
runLog.extra("Copying DB to shared working directory.")
self.h5db.flush()
shutil.copy(self._fullPath, self._fileName)
def load(
self,
cycle,
node,
cs=None,
bp=None,
statePointName=None,
allowMissing=False,
updateGlobalAssemNum=True,
):
"""Load a new reactor from (cycle, node).
Case settings and blueprints can be provided by the client, or read from the database itself.
Providing these from the client could be useful when performing snapshot runs
or where it is expected to use results from a run using different settings and
continue with new settings (or if blueprints are not on the database).
Geom is read from the database itself.
Parameters
----------
cycle : int
cycle number
node : int
time node
cs : armi.settings.Settings (optional)
if not provided one is read from the database
bp : armi.reactor.Blueprints (optional)
if not provided one is read from the database
statePointName : str
Optional arbitrary statepoint name (e.g., "special" for "c00n00-special/")
allowMissing : bool
Whether to emit a warning, rather than crash if reading a database
with undefined parameters. Default False.
updateGlobalAssemNum : bool
Whether to update the global assembly number to the value stored in
r.core.p.maxAssemNum. Default True.
Returns
-------
root : ArmiObject
The top-level object stored in the database; usually a Reactor.
"""
runLog.info("Loading reactor state for time node ({}, {})".format(cycle, node))
cs = cs or self.loadCS()
# apply to avoid defaults in getMasterCs calls
settings.setMasterCs(cs)
bp = bp or self.loadBlueprints()
h5group = self.h5db[getH5GroupName(cycle, node, statePointName)]
layout = Layout((self.versionMajor, self.versionMinor), h5group=h5group)
comps, groupedComps = layout._initComps(cs.caseTitle, bp)
# populate data onto initialized components
for compType, compTypeList in groupedComps.items():
self._readParams(h5group, compType, compTypeList, allowMissing=allowMissing)
# assign params from blueprints
if bp is not None:
self._assignBlueprintsParams(bp, groupedComps)
# stitch together
self._compose(iter(comps), cs)
# also, make sure to update the global serial number so we don't re-use a number
parameterCollections.GLOBAL_SERIAL_NUM = max(
parameterCollections.GLOBAL_SERIAL_NUM, layout.serialNum.max()
)
root = comps[0][0]
# ensure the max assembly number is correct, unless the user says no
if updateGlobalAssemNum:
updateGlobalAssemblyNum(root)
# usually a reactor object
return root
@staticmethod
def _assignBlueprintsParams(blueprints, groupedComps):
for compType, designs in (
(Block, blueprints.blockDesigns),
(Assembly, blueprints.assemDesigns),
):
paramsToSet = {
pDef.name
for pDef in compType.pDefs.inCategory(
parameters.Category.assignInBlueprints
)
}
for comp in groupedComps[compType]:
design = designs[comp.p.type]
for pName in paramsToSet:
val = getattr(design, pName)
if val is not None:
comp.p[pName] = val
def _compose(self, comps, cs, parent=None):
"""Given a flat collection of all of the ArmiObjects in the model, reconstitute the hierarchy."""
comp, _, numChildren, location = next(comps)
# attach the parent early, if provided; some cases need the parent attached for
# the rest of _compose to work properly.
comp.parent = parent
# The Reactor adds a Core child by default; this is not ideal.
for spontaneousChild in list(comp):
comp.remove(spontaneousChild)
if isinstance(comp, Core):
pass
elif isinstance(comp, Assembly):
# Assemblies force their name to be something based on assemNum. When the
# assembly is created it gets a new assemNum, and throws out the correct
# name that we read from the DB
comp.name = comp.makeNameFromAssemNum(comp.p.assemNum)
comp.lastLocationLabel = Assembly.DATABASE
# set the spatialLocators on each component
if location is not None:
if parent is not None and parent.spatialGrid is not None:
comp.spatialLocator = parent.spatialGrid[location]
else:
comp.spatialLocator = grids.CoordinateLocation(
location[0], location[1], location[2], None
)
# Need to keep a collection of Component instances for linked dimension
# resolution, before they can be add()ed to their parents. Not just filtering
# out of `children`, since resolveLinkedDims() needs a dict
childComponents = collections.OrderedDict()
children = []
for _ in range(numChildren):
child = self._compose(comps, cs, parent=comp)
children.append(child)
if isinstance(child, Component):
childComponents[child.name] = child
for _childName, child in childComponents.items():
child.resolveLinkedDims(childComponents)
for child in children:
comp.add(child)
if isinstance(comp, Core):
# TODO: This is also an issue related to geoms and which core is "The Core".
# We only have a good geom for the main core, so can't do process loading on
# the SFP, etc.
if comp.hasFlags(Flags.CORE):
comp.processLoading(cs, dbLoad=True)
elif isinstance(comp, Assembly):
comp.calculateZCoords()
return comp
def _writeParams(self, h5group, comps):
c = comps[0]
groupName = c.__class__.__name__
if groupName not in h5group:
# Only create the group if it doesn't already exist. This happens when
# re-writing params in the same time node (e.g. something changed between
# EveryNode and EOC)
g = h5group.create_group(groupName)
else:
g = h5group[groupName]
for paramDef in c.p.paramDefs.toWriteToDB():
attrs = {}
if hasattr(c, "DIMENSION_NAMES") and paramDef.name in c.DIMENSION_NAMES:
linkedDims = []
data = []
for _, c in enumerate(comps):
val = c.p[paramDef.name]
if isinstance(val, tuple):
linkedDims.append("{}.{}".format(val[0].name, val[1]))
data.append(val[0].getDimension(val[1]))
else:
linkedDims.append("")
data.append(val)
data = numpy.array(data)
if any(linkedDims):
attrs["linkedDims"] = numpy.array(linkedDims).astype("S")
else:
# XXX: side effect is that, after loading, previously unset values will be
# the default
temp = [c.p.get(paramDef.name, paramDef.default) for c in comps]
if paramDef.serializer is not None:
data, sAttrs = paramDef.serializer.pack(temp)
assert (
data.dtype.kind != "O"
), "{} failed to convert {} to a numpy-supported type.".format(
paramDef.serializer.__name__, paramDef.name
)
attrs.update(sAttrs)
attrs[_SERIALIZER_NAME] = paramDef.serializer.__name__
attrs[_SERIALIZER_VERSION] = paramDef.serializer.version
else:
data = numpy.array(temp)
del temp
# Convert Unicode to byte-string
if data.dtype.kind == "U":
data = data.astype("S")
if data.dtype.kind == "O":
# Something was added to the data array that caused numpy to want to
# treat it as a general-purpose Object array. This usually happens
# because:
# - the data contain NoDefaults
# - the data contain one or more Nones,
# - the data contain special types like tuples, dicts, etc
# - the data are composed of arrays that numpy would otherwise happily
# convert to a higher-order array, but the dimensions of the sub-arrays
# are inconsistent ("jagged")
# - there is some sort of honest-to-goodness weird object
# We want to support the first two cases with minimal intrusion, since
# these should be pretty easy to faithfully represent in the db. The
# jagged case should be supported as well, but may require a less
# faithful representation (e.g. flattened), but the last case isn't
# really worth supporting.
# Here is one proposal:
# - Check to see if the array is jagged. all(shape == shape[0]). If not,
# flatten, store the data offsets and array shapes, and None locations
# as attrs
# - If not jagged, all top-level ndarrays are the same shape, so it is
# easier to replace Nones with ndarrays filled with special values.
if parameters.NoDefault in data:
data = None
else:
data, specialAttrs = packSpecialData(data, paramDef.name)
attrs.update(specialAttrs)
if data is None:
continue
try:
if paramDef.name in g:
raise ValueError(
"`{}` was already in `{}`. This time node "
"should have been empty".format(paramDef.name, g)
)
dataset = g.create_dataset(paramDef.name, data=data, compression="gzip")
if any(attrs):
_writeAttrs(dataset, h5group, attrs)
except Exception:
runLog.error(
"Failed to write {} to database. Data: "
"{}".format(paramDef.name, data)
)
raise
if isinstance(c, Block):
self._addHomogenizedNumberDensityParams(comps, g)
@staticmethod
def _addHomogenizedNumberDensityParams(blocks, h5group):
"""
Create on-the-fly block homog. number density params for XTVIEW viewing.
See also
--------
collectBlockNumberDensities
"""
nDens = collectBlockNumberDensities(blocks)
for nucName, numDens in nDens.items():
h5group.create_dataset(nucName, data=numDens, compression="gzip")
@staticmethod
def _readParams(h5group, compTypeName, comps, allowMissing=False):
g = h5group[compTypeName]
renames = getApp().getParamRenames()
pDefs = comps[0].pDefs
# this can also be made faster by specializing the method by type
for paramName, dataSet in g.items():
# Honor historical databases where the parameters may have changed names
# since.
while paramName in renames:
paramName = renames[paramName]
try:
pDef = pDefs[paramName]
except KeyError:
if re.match(r"^n[A-Z][a-z]?\d*", paramName):
# This is a temporary viz param (number density) made by
# _addHomogenizedNumberDensityParams; ignore it safely
continue
else:
# If a parameter exists in the database but not in the application
# reading it, we can technically keep going. Since this may lead to
# potential correctness issues, raise a warning
if allowMissing:
runLog.warning(
"Found `{}` parameter `{}` in the database, which is not defined. "
"Ignoring it.".format(compTypeName, paramName)
)
continue
else:
raise
data = dataSet[:]
attrs = _resolveAttrs(dataSet.attrs, h5group)
if pDef.serializer is not None:
assert _SERIALIZER_NAME in dataSet.attrs
assert dataSet.attrs[_SERIALIZER_NAME] == pDef.serializer.__name__
assert _SERIALIZER_VERSION in dataSet.attrs
data = numpy.array(
pDef.serializer.unpack(
data, dataSet.attrs[_SERIALIZER_VERSION], attrs
)
)
if data.dtype.type is numpy.string_:
data = numpy.char.decode(data)
if attrs.get("specialFormatting", False):
data = unpackSpecialData(data, attrs, paramName)
linkedDims = []
if "linkedDims" in attrs:
linkedDims = numpy.char.decode(attrs["linkedDims"])
# iterating over numpy arrays is not fast...
for c, val, linkedDim in itertools.zip_longest(
comps, data.tolist(), linkedDims, fillvalue=""
):
try:
if linkedDim != "":
c.p[paramName] = linkedDim
else:
c.p[paramName] = val
except AssertionError as ae:
# happens when a param was deprecated but being loaded from old DB
runLog.warning(
f"{str(ae)}\nSkipping load of invalid param `{paramName}`"
" (possibly loading from old DB)\n"
)
def getHistoryByLocation(
self,
comp: ArmiObject,
params: Optional[List[str]] = None,
timeSteps: Optional[Sequence[Tuple[int, int]]] = None,
) -> History:
"""Get the parameter histories at a specific location."""
return self.getHistoriesByLocation([comp], params=params, timeSteps=timeSteps)[
comp
]
def getHistoriesByLocation(
self,
comps: Sequence[ArmiObject],
params: Optional[List[str]] = None,
timeSteps: Optional[Sequence[Tuple[int, int]]] = None,
) -> Histories:
"""
Get the parameter histories at specific locations.
This has a number of restrictions, which in practice should not be too limiting:
- The passed objects must have IndexLocations. This type of operation doesn't
make much sense otherwise.
- The passed objects must exist in a hierarchy that leads to a Core
object, which serves as an anchor that can fully define all index locations.
This could possibly be made more general by extending grids, but that gets a
little more complicated.
- All requested objects must exist under the **same** anchor object, and at the
same depth below it.
- All requested objects must have the same type.
Parameters
----------
comps : list of ArmiObject
The components/composites that currently occupy the location that you want
histories at. ArmiObjects are passed, rather than locations, because this
makes it easier to figure out things related to layout.
params : List of str, optional
The parameter names for the parameters that we want the history of. If None,
all parameter history is given
timeSteps : List of (cycle, node) tuples, optional
The time nodes that you want history for. If None, all available time nodes
will be returned.
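
Examples
--------
A hedged sketch; the component selection and parameter name are illustrative::

    blocks = [a[0] for a in r.core]  # first block of each assembly
    hists = db.getHistoriesByLocation(blocks, params=["power"])
    powerAtFirstLocation = hists[blocks[0]]["power"]  # {(cycle, node): value}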
"""
if self.versionMinor < 4:
raise ValueError(
"Location-based histories are only supported for db "
"version 3.4 and greater. This database is version "
f"{self.versionMajor}.{self.versionMinor}."
)
locations = [c.spatialLocator.getCompleteIndices() for c in comps]
histData: Histories = {
c: collections.defaultdict(collections.OrderedDict) for c in comps
}
# Check our assumptions about the passed locations:
# All locations must have the same parent and bear the same relationship to the
# anchor object
anchors = {
obj.getAncestorAndDistance(lambda a: isinstance(a, Core)) for obj in comps
}
if len(anchors) != 1:
raise ValueError(
"The passed objects do not have the same anchor or distance to that "
"anchor; encountered the following: {}".format(anchors)
)
anchorInfo = anchors.pop()
if anchorInfo is not None:
anchor, anchorDistance = anchorInfo
else:
raise ValueError(
"Could not determine an anchor object for the passed components"
)
anchorSerialNum = anchor.p.serialNum
# All objects of the same type
objectTypes = {type(obj) for obj in comps}
if len(objectTypes) != 1:
raise TypeError(
"The passed objects must be the same type; got objects of "
"types `{}`".format(objectTypes)
)
compType = objectTypes.pop()
objClassName = compType.__name__
locToComp = {c.spatialLocator.getCompleteIndices(): c for c in comps}
for h5TimeNodeGroup in self.genTimeStepGroups(timeSteps):
if "layout" not in h5TimeNodeGroup:
# layout hasn't been written for this time step, so we can't get anything
# useful here. Perhaps the current value is of use, in which case the
# DatabaseInterface should be used.
continue
cycle = h5TimeNodeGroup.attrs["cycle"]
timeNode = h5TimeNodeGroup.attrs["timeNode"]
layout = Layout(
(self.versionMajor, self.versionMinor), h5group=h5TimeNodeGroup
)
ancestors = layout.computeAncestors(
layout.serialNum, layout.numChildren, depth=anchorDistance
)
lLocation = layout.location
# filter for objects that live under the desired ancestor and at a desired location
objectIndicesInLayout = numpy.array(
[
i
for i, (ancestor, loc) in enumerate(zip(ancestors, lLocation))
if ancestor == anchorSerialNum and loc in locations
]
)
# This could also be way more efficient if lLocation were a numpy array
objectLocationsInLayout = [lLocation[i] for i in objectIndicesInLayout]
objectIndicesInData = numpy.array(layout.indexInData)[
objectIndicesInLayout
].tolist()
try:
h5GroupForType = h5TimeNodeGroup[objClassName]
except KeyError as ee:
runLog.error(
"{} not found in {} of {}".format(
objClassName, h5TimeNodeGroup, self
)
)
raise ee
for paramName in params or h5GroupForType.keys():
if paramName == "location":
# location is special, since it is stored in layout/
data = numpy.array(layout.location)[objectIndicesInLayout]
elif paramName in h5GroupForType:
dataSet = h5GroupForType[paramName]
try:
data = dataSet[objectIndicesInData]
except:
runLog.error(
"Failed to load index {} from {}@{}".format(
objectIndicesInData, dataSet, (cycle, timeNode)
)
)
raise
if data.dtype.type is numpy.string_:
data = numpy.char.decode(data)
if dataSet.attrs.get("specialFormatting", False):
if dataSet.attrs.get("nones", False):
data = replaceNonsenseWithNones(data, paramName)
else:
raise ValueError(
"History tracking for non-None, "
"special-formatted parameters is not supported: "
"{}, {}".format(
paramName, {k: v for k, v in dataSet.attrs.items()}
)
)
else:
# Nothing in the database for this param, so use the default value
data = numpy.repeat(
parameters.byNameAndType(paramName, compType).default,
len(comps),
)
# store data to the appropriate comps. This is where taking components
# as the argument (rather than locations) is a little bit peculiar.
#
# At this point, `data` are arranged by the order of elements in
# `objectIndicesInData`, which corresponds to the order of
# `objectIndicesInLayout`
for loc, val in zip(objectLocationsInLayout, data.tolist()):
comp = locToComp[loc]
histData[comp][paramName][cycle, timeNode] = val
return histData
def getHistory(
self,
comp: ArmiObject,
params: Optional[Sequence[str]] = None,
timeSteps: Optional[Sequence[Tuple[int, int]]] = None,
) -> History:
"""
Get parameter history for a single ARMI Object.
Parameters
----------
comp
An individual ArmiObject
params
parameters to gather
timeSteps
Selection of time nodes to get data for. If omitted, return the full history.

Returns
-------
dict
Dictionary mapping parameter name to {(cycle, node): value} pairs.
"""
return self.getHistories([comp], params, timeSteps)[comp]
def getHistories(
self,
comps: Sequence[ArmiObject],
params: Optional[Sequence[str]] = None,
timeSteps: Optional[Sequence[Tuple[int, int]]] = None,
) -> Histories:
"""
Get the parameter histories for a sequence of ARMI Objects.
This implementation is unaware of the state of the reactor outside of the
database itself, and is therefore not usually what client code should be calling
directly during normal ARMI operation. It only knows about historical data that
have actually been written to the database. Usually one wants to be able to get
historical, plus current data, for which the similar method on the
DatabaseInterface may be more useful.
Parameters
----------
comps
Something that is iterable multiple times
params
parameters to gather.
timeSteps
Selection of time nodes to get data for. If omitted, return full history
Returns
-------
dict
Dictionary mapping each input ArmiObject to a dict of parameter name ->
{(cycle, node): value}.
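
Examples
--------
A hedged sketch; the assembly selection, parameter name, and time steps are illustrative::

    assems = r.core.getAssemblies()
    hists = db.getHistories(assems, params=["chargeTime"], timeSteps=[(0, 0), (1, 0)])
    value = hists[assems[0]]["chargeTime"][(1, 0)]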
"""
histData: Histories = {
c: collections.defaultdict(collections.OrderedDict) for c in comps
}
types = {c.__class__ for c in comps}
compsByTypeThenSerialNum: Dict[Type[ArmiObject], Dict[int, ArmiObject]] = {
t: dict() for t in types
}
for c in comps:
compsByTypeThenSerialNum[c.__class__][c.p.serialNum] = c
for h5TimeNodeGroup in self.genTimeStepGroups(timeSteps):
if "layout" not in h5TimeNodeGroup:
# Layout hasn't been written for this time step, so whatever is in there
# didn't come from the DatabaseInterface. Probably because it's the
# current time step and something has created the group to store aux
# data
continue
cycle = h5TimeNodeGroup.attrs["cycle"]
timeNode = h5TimeNodeGroup.attrs["timeNode"]
layout = Layout(
(self.versionMajor, self.versionMinor), h5group=h5TimeNodeGroup
)
for compType, compsBySerialNum in compsByTypeThenSerialNum.items():
compTypeName = compType.__name__
try:
h5GroupForType = h5TimeNodeGroup[compTypeName]
except KeyError as ee:
runLog.error(
"{} not found in {} of {}".format(
compTypeName, h5TimeNodeGroup, self
)
)
raise ee
layoutIndicesForType = numpy.where(layout.type == compTypeName)[0]
serialNumsForType = layout.serialNum[layoutIndicesForType].tolist()
layoutIndexInData = layout.indexInData[layoutIndicesForType].tolist()
indexInData = []
reorderedComps = []
for ii, sn in zip(layoutIndexInData, serialNumsForType):
d = compsBySerialNum.get(sn, None)
if d is not None:
indexInData.append(ii)
reorderedComps.append(d)
if not indexInData:
continue
# note this is very similar to _readParams, but there are some important
# differences.
# 1) we are not assigning to p[paramName]
# 2) not using linkedDims at all
# 3) not performing parameter renaming. This may become necessary
for paramName in params or h5GroupForType.keys():
if paramName == "location":
# cast to a numpy array so that we can use list indices
data = numpy.array(layout.location)[layoutIndicesForType][
indexInData
]
elif paramName in h5GroupForType:
dataSet = h5GroupForType[paramName]
try:
data = dataSet[indexInData]
except:
runLog.error(
"Failed to load index {} from {}@{}".format(
indexInData, dataSet, (cycle, timeNode)
)
)
raise
if data.dtype.type is numpy.string_:
data = numpy.char.decode(data)
if dataSet.attrs.get("specialFormatting", False):
if dataSet.attrs.get("nones", False):
data = replaceNonsenseWithNones(data, paramName)
else:
raise ValueError(
"History tracking for non-none special formatting "
"not supported: {}, {}".format(
paramName,
{k: v for k, v in dataSet.attrs.items()},
)
)
else:
# Nothing in the database, so use the default value
data = numpy.repeat(
parameters.byNameAndType(paramName, compType).default,
len(reorderedComps),
)
# iterating over numpy arrays is not fast...
for c, val in zip(reorderedComps, data.tolist()):
if isinstance(val, list):
val = numpy.array(val)
histData[c][paramName][cycle, timeNode] = val
r = comps[0].getAncestorWithFlags(Flags.REACTOR)
cycleNode = r.p.cycle, r.p.timeNode
for c, paramHistories in histData.items():
for paramName, hist in paramHistories.items():
if cycleNode not in hist:
try:
hist[cycleNode] = c.p[paramName]
except:
if paramName == "location":
hist[cycleNode] = c.spatialLocator.indices
return histData
def _packLocations(
locations: List[grids.LocationBase], minorVersion: int = DB_MINOR
) -> Tuple[List[str], List[Tuple[int, int, int]]]:
"""
Extract information from a location needed to write it to this DB.
Each locator has one locationType and up to N location-defining datums,
where N is the number of entries in a possible multiindex, or just 1
for everything else.
Shrink grid locator names for storage efficiency.
Notes
-----
Contains some conditionals to still load databases written before DB version 3.3;
these can be removed once no users care about those DBs anymore.
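
As a rough sketch of the current (v3.4) encoding, an ``IndexLocation`` with complete
indices ``(2, 3, 0)`` followed by a ``MultiIndexLocation`` holding two sub-locations
would be packed as (values illustrative)::

    locationTypes = ["I", "M:2"]
    locationData = [(2, 3, 0), (1, 0, 0), (2, 0, 0)]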
"""
if minorVersion <= 2:
locationTypes, locationData = _packLocationsV1(locations)
elif minorVersion == 3:
locationTypes, locationData = _packLocationsV2(locations)
elif minorVersion > 3:
locationTypes, locationData = _packLocationsV3(locations)
else:
raise ValueError("Unsupported minor version: {}".format(minorVersion))
return locationTypes, locationData
def _packLocationsV1(
locations: List[grids.LocationBase],
) -> Tuple[List[str], List[Tuple[int, int, int]]]:
"""Delete when reading v <=3.2 DB's no longer wanted."""
locTypes = []
locData: List[Tuple[int, int, int]] = []
for loc in locations:
locationType = loc.__class__.__name__
if loc is None:
locationType = "None"
locDatum = [(0.0, 0.0, 0.0)]
elif isinstance(loc, grids.IndexLocation):
locDatum = [loc.indices]
else:
raise ValueError(f"Invalid location type: {loc}")
locTypes.append(locationType)
locData.extend(locDatum)
return locTypes, locData
def _packLocationsV2(
locations: List[grids.LocationBase],
) -> Tuple[List[str], List[Tuple[int, int, int]]]:
"""
Location packing implementation for minor version 3. See release notes above.
"""
locTypes = []
locData: List[Tuple[int, int, int]] = []
for loc in locations:
locationType = LOCATION_TYPE_LABELS[type(loc)]
if loc is None:
locDatum = [(0.0, 0.0, 0.0)]
elif loc.__class__ is grids.CoordinateLocation:
locDatum = [loc.indices]
elif loc.__class__ is grids.IndexLocation:
locDatum = [loc.indices]
elif loc.__class__ is grids.MultiIndexLocation:
# encode number of sub-locations to allow in-line unpacking.
locationType += f"{len(loc)}"
locDatum = [subloc.indices for subloc in loc]
else:
raise ValueError(f"Invalid location type: {loc}")
locTypes.append(locationType)
locData.extend(locDatum)
return locTypes, locData
def _packLocationsV3(
locations: List[grids.LocationBase],
) -> Tuple[List[str], List[Tuple[int, int, int]]]:
"""
Location packing implementation for minor version 4. See release notes above.
"""
locTypes = []
locData: List[Tuple[int, int, int]] = []
for loc in locations:
locationType = LOCATION_TYPE_LABELS[type(loc)]
if loc is None:
locDatum = [(0.0, 0.0, 0.0)]
elif type(loc) is grids.IndexLocation:
locDatum = [loc.getCompleteIndices()]
elif type(loc) is grids.CoordinateLocation:
# CoordinateLocations do not implement getCompleteIndices properly, and we
# do not really have a motivation to store them as we do with index
# locations.
locDatum = [loc.indices]
elif type(loc) is grids.MultiIndexLocation:
locationType += f"{len(loc)}"
locDatum = [subloc.indices for subloc in loc]
else:
raise ValueError(f"Invalid location type: {loc}")
locTypes.append(locationType)
locData.extend(locDatum)
return locTypes, locData
def _unpackLocations(locationTypes, locData, minorVersion: int = DB_MINOR):
"""
Convert location data as read from DB back into data structure for building reactor model.
location and locationType will only have different lengths when multiindex locations
are used.
"""
if minorVersion < 3:
return _unpackLocationsV1(locationTypes, locData)
else:
return _unpackLocationsV2(locationTypes, locData)
def _unpackLocationsV1(locationTypes, locData):
"""Delete when reading v <=3.2 DB's no longer wanted."""
locsIter = iter(locData)
unpackedLocs = []
for lt in locationTypes:
if lt == "None":
loc = next(locsIter)
unpackedLocs.append(None)
elif lt == "IndexLocation":
loc = next(locsIter)
# the data is stored as float, so cast back to int
unpackedLocs.append(tuple(int(i) for i in loc))
else:
loc = next(locsIter)
unpackedLocs.append(tuple(loc))
return unpackedLocs
def _unpackLocationsV2(locationTypes, locData):
"""
Location unpacking implementation for minor version 3+. See release notes above.
"""
locsIter = iter(locData)
unpackedLocs = []
for lt in locationTypes:
if lt == LOC_NONE:
loc = next(locsIter)
unpackedLocs.append(None)
elif lt == LOC_INDEX:
loc = next(locsIter)
# the data is stored as float, so cast back to int
unpackedLocs.append(tuple(int(i) for i in loc))
elif lt == LOC_COORD:
loc = next(locsIter)
unpackedLocs.append(tuple(loc))
elif lt.startswith(LOC_MULTI):
# extract number of sublocations from e.g. "M:345" string.
numSubLocs = int(lt.split(":")[1])
multiLocs = []
for _ in range(numSubLocs):
subLoc = next(locsIter)
# All sub-locations of a multi-index location are index locations
multiLocs.append(tuple(int(i) for i in subLoc))
unpackedLocs.append(multiLocs)
else:
raise ValueError(f"Read unknown location type {lt}. Invalid DB.")
return unpackedLocs
class Layout:
"""
The Layout class describes the hierarchical layout of the composite Reactor model in a flat representation.
A Layout is built up by starting at the root of a composite tree and recursively
appending each node in the tree to the list of data. So for a typical Reactor model,
the data will be ordered by depth-first search: [r, c, a1, a1b1, a1b1c1, a1b1c2, a1b2,
a1b2c1, ..., a2, ...].
The layout is also responsible for storing Component attributes, like location,
material, and temperatures (from blueprints), which aren't stored as Parameters.
Temperatures, specifically, are rather complicated beasts in ARMI, and more
fundamental changes to how we deal with them may allow us to remove them from
Layout.
Notes
-----
As this format is liable to be consumed by other code, it is important to specify
its structure so that code attempting to read/write Layouts can make safe
assumptions. Below is a list of things to be aware of. More will be added as issues
arise or things become more precise:
* Elements in Layout are stored in depth-first order. This permits use of
algorithms such as Pre-Order Tree Traversal for efficient traversal of regions of
the model.
* ``indexInData`` increases monotonically within each object ``type``. This means
that, for instance, the data for all ``HexBlock`` children of a given parent
are stored contiguously within the ``HexBlock`` group, and will not be
interleaved with data from the ``HexBlock`` children of any of the parent's
siblings.
    * Aside from the hierarchy itself, there is no guarantee what order objects are
      stored in the layout. The ``Core`` is not necessarily the first child of the
      ``Reactor``, and is not guaranteed to use the zeroth grid.
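
    As a hedged illustration of the ``indexInData`` convention, one could read a single
    object's parameter value straight out of a database file (the time-node group and
    parameter name here are hypothetical)::

        import h5py
        import numpy

        with h5py.File("case.h5", "r") as h5:
            layout = h5["c00n00/layout"]
            types = numpy.char.decode(layout["type"][:])
            indexInData = layout["indexInData"][:]
            # if the i-th layout entry is a HexBlock, its "power" value would live at
            # h5["c00n00/HexBlock/power"][indexInData[i]]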
"""
def __init__(self, version: Tuple[int, int], h5group=None, comp=None):
self.type: List[str] = []
self.name: List[str] = []
self.serialNum: List[int] = []
# The index into the parameter datasets corresponding to each object's class.
# E.g., the 5th HexBlock object in the tree would get 5; to look up its
# "someParameter" value, you would extract cXXnYY/HexBlock/someParameter[5].
self.indexInData: List[int] = []
# The number of direct children this object has.
self.numChildren: List[int] = []
# The type of location that specifies the object's physical location; see the
# associated pack/unpackLocation functions for more information about how
# locations are handled.
self.locationType: List[str] = []
        # There is a minor asymmetry here in that before writing to the DB, this is
        # truly a flat list of tuples. However, when reading, this may contain lists of
        # tuples, which represent MultiIndex locations. This comes from the fact that we
        # map the tuples to Location objects in Database3._compose, but map from
        # Locations to tuples in Layout._createLayout. Ideally we would handle both
        # directions in the same place so this can be less surprising. Resolving this
        # would require changing the interface of the various pack/unpack functions,
        # which have multiple versions, so the update would need to be done with care.
self.location: List[Tuple[int, int, int]] = []
# Which grid, as stored in the database, this object uses to arrange its
# children
self.gridIndex: List[int] = []
self.temperatures: List[float] = []
self.material: List[str] = []
        # Used to cache all of the spatial locators so that we can pack them all at
        # once. The benefit is that the version checking can happen up front and there
        # is less branching below.
self._spatialLocators: List[grids.LocationBase] = []
# set of grid parameters that have been seen in _createLayout. For efficient
# checks for uniqueness
self._seenGridParams: Dict[Any, Any] = dict()
# actual list of grid parameters, with stable order for safe indexing
self.gridParams: List[Any] = []
self.version = version
self.groupedComps: Dict[
Type[ArmiObject], List[ArmiObject]
] = collections.defaultdict(list)
        # one of the two inputs must be non-None: either comp or h5group
if comp is not None:
self._createLayout(comp)
self.locationType, self.location = _packLocations(self._spatialLocators)
else:
self._readLayout(h5group)
self._snToLayoutIndex = {sn: i for i, sn in enumerate(self.serialNum)}
def __getitem__(self, sn):
layoutIndex = self._snToLayoutIndex[sn]
return (
self.type[layoutIndex],
self.name[layoutIndex],
self.serialNum[layoutIndex],
self.indexInData[layoutIndex],
self.numChildren[layoutIndex],
self.locationType[layoutIndex],
self.location[layoutIndex],
self.temperatures[layoutIndex],
self.material[layoutIndex],
)
def _createLayout(self, comp):
"""
Populate a hierarchical representation and group the reactor model items by type.
This is used when writing a reactor model to the database.
Notes
-----
This is recursive.
See Also
--------
_readLayout : does the opposite
"""
compList = self.groupedComps[type(comp)]
compList.append(comp)
self.type.append(comp.__class__.__name__)
self.name.append(comp.name)
self.serialNum.append(comp.p.serialNum)
self.indexInData.append(len(compList) - 1)
self.numChildren.append(len(comp))
        # determine the grid index; if this grid has not been seen before, record its
        # parameters so it can be written to the database
if comp.spatialGrid is not None:
gridType = type(comp.spatialGrid).__name__
gridParams = (gridType, comp.spatialGrid.reduce())
if gridParams not in self._seenGridParams:
self._seenGridParams[gridParams] = len(self.gridParams)
self.gridParams.append(gridParams)
self.gridIndex.append(self._seenGridParams[gridParams])
else:
self.gridIndex.append(None)
self._spatialLocators.append(comp.spatialLocator)
# set the materials and temperatures
try:
self.temperatures.append((comp.inputTemperatureInC, comp.temperatureInC))
self.material.append(comp.material.__class__.__name__)
        except Exception:
self.temperatures.append((-900, -900)) # an impossible temperature
self.material.append("")
try:
comps = sorted(list(comp))
except ValueError:
runLog.error(
"Failed to sort some collection of ArmiObjects for database output: {} "
"value {}".format(type(comp), list(comp))
)
raise
# depth-first search recursion of all components
for c in comps:
self._createLayout(c)
def _readLayout(self, h5group):
"""
Populate a hierarchical representation and group the reactor model items by type.
This is used when reading a reactor model from a database.
See Also
--------
_createLayout : does the opposite
"""
try:
            # location is either an index or a point; convert to a plain list since
            # iterating over a list is faster than iterating over the h5py dataset
locations = h5group["layout/location"][:].tolist()
self.locationType = numpy.char.decode(
h5group["layout/locationType"][:]
).tolist()
self.location = _unpackLocations(
self.locationType, locations, self.version[1]
)
self.type = numpy.char.decode(h5group["layout/type"][:])
self.name = numpy.char.decode(h5group["layout/name"][:])
self.serialNum = h5group["layout/serialNum"][:]
self.indexInData = h5group["layout/indexInData"][:]
self.numChildren = h5group["layout/numChildren"][:]
self.material = numpy.char.decode(h5group["layout/material"][:])
self.temperatures = h5group["layout/temperatures"][:]
self.gridIndex = replaceNonsenseWithNones(
h5group["layout/gridIndex"][:], "layout/gridIndex"
)
gridGroup = h5group["layout/grids"]
gridTypes = [t.decode() for t in gridGroup["type"][:]]
self.gridParams = []
for iGrid, gridType in enumerate(gridTypes):
thisGroup = gridGroup[str(iGrid)]
unitSteps = thisGroup["unitSteps"][:]
bounds = []
for ibound in range(3):
boundName = "bounds_{}".format(ibound)
if boundName in thisGroup:
bounds.append(thisGroup[boundName][:])
else:
bounds.append(None)
unitStepLimits = thisGroup["unitStepLimits"][:]
offset = thisGroup["offset"][:] if thisGroup.attrs["offset"] else None
geomType = (
thisGroup["geomType"].asstr()[()]
if "geomType" in thisGroup
else None
)
symmetry = (
thisGroup["symmetry"].asstr()[()]
if "symmetry" in thisGroup
else None
)
self.gridParams.append(
(
gridType,
grids.GridParameters(
unitSteps,
bounds,
unitStepLimits,
offset,
geomType,
symmetry,
),
)
)
except KeyError as e:
runLog.error(
"Failed to get layout information from group: {}".format(h5group.name)
)
raise e
def _initComps(self, caseTitle, bp):
comps = []
groupedComps = collections.defaultdict(list)
for (
compType,
name,
serialNum,
numChildren,
location,
material,
temperatures,
gridIndex,
) in zip(
self.type,
self.name,
self.serialNum,
self.numChildren,
self.location,
self.material,
self.temperatures,
self.gridIndex,
):
Klass = ArmiObject.TYPES[compType]
if issubclass(Klass, Reactor):
comp = Klass(caseTitle, bp)
elif issubclass(Klass, Core):
comp = Klass(name)
elif issubclass(Klass, Component):
                # XXX: initialize all dimensions to 0; they will be assigned actual
                # values after loading
kwargs = dict.fromkeys(Klass.DIMENSION_NAMES, 0)
kwargs["material"] = material
kwargs["name"] = name
kwargs["Tinput"] = temperatures[0]
kwargs["Thot"] = temperatures[1]
comp = Klass(**kwargs)
else:
comp = Klass(name)
if gridIndex is not None:
gridParams = self.gridParams[gridIndex]
comp.spatialGrid = GRID_CLASSES[gridParams[0]](
*gridParams[1], armiObject=comp
)
comps.append((comp, serialNum, numChildren, location))
groupedComps[compType].append(comp)
return comps, groupedComps
    def writeToDB(self, h5group):
if "layout/type" in h5group:
# It looks like we have already written the layout to DB, skip for now
return
try:
h5group.create_dataset(
"layout/type",
data=numpy.array(self.type).astype("S"),
compression="gzip",
)
h5group.create_dataset(
"layout/name",
data=numpy.array(self.name).astype("S"),
compression="gzip",
)
h5group.create_dataset(
"layout/serialNum", data=self.serialNum, compression="gzip"
)
h5group.create_dataset(
"layout/indexInData", data=self.indexInData, compression="gzip"
)
h5group.create_dataset(
"layout/numChildren", data=self.numChildren, compression="gzip"
)
h5group.create_dataset(
"layout/location", data=self.location, compression="gzip"
)
h5group.create_dataset(
"layout/locationType",
data=numpy.array(self.locationType).astype("S"),
compression="gzip",
)
h5group.create_dataset(
"layout/material",
data=numpy.array(self.material).astype("S"),
compression="gzip",
)
h5group.create_dataset(
"layout/temperatures", data=self.temperatures, compression="gzip"
)
h5group.create_dataset(
"layout/gridIndex",
data=replaceNonesWithNonsense(
numpy.array(self.gridIndex), "layout/gridIndex"
),
compression="gzip",
)
gridsGroup = h5group.create_group("layout/grids")
gridsGroup.attrs["nGrids"] = len(self.gridParams)
gridsGroup.create_dataset(
"type", data=numpy.array([gp[0] for gp in self.gridParams]).astype("S")
)
for igrid, gridParams in enumerate(gp[1] for gp in self.gridParams):
thisGroup = gridsGroup.create_group(str(igrid))
thisGroup.create_dataset("unitSteps", data=gridParams.unitSteps)
for ibound, bound in enumerate(gridParams.bounds):
if bound is not None:
bound = numpy.array(bound)
thisGroup.create_dataset("bounds_{}".format(ibound), data=bound)
thisGroup.create_dataset(
"unitStepLimits", data=gridParams.unitStepLimits
)
offset = gridParams.offset
thisGroup.attrs["offset"] = offset is not None
if offset is not None:
thisGroup.create_dataset("offset", data=offset)
thisGroup.create_dataset("geomType", data=gridParams.geomType)
thisGroup.create_dataset("symmetry", data=gridParams.symmetry)
except RuntimeError:
runLog.error("Failed to create datasets in: {}".format(h5group))
raise
    @staticmethod
def computeAncestors(serialNum, numChildren, depth=1) -> List[Optional[int]]:
"""
Return a list containing the serial number of the parent corresponding to each
object at the given depth.
Depth in this case means how many layers to reach up to find the desired
ancestor. A depth of 1 will yield the direct parent of each element, depth of 2
        would yield each element's parent's parent, and so on.
The zero-th element will always be None, as the first object is the root element
and so has no parent. Subsequent depths will result in more Nones.
This function is useful for forming a lightweight sense of how the database
contents stitch together, without having to go to the trouble of fully unpacking
the Reactor model.
Parameters
----------
serialNum : List of int
List of serial numbers for each object/element, as laid out in Layout
numChildren : List of int
List of numbers of children for each object/element, as laid out in Layout
        Notes
        -----
This is not using a recursive approach for a couple of reasons. First, the
iterative form isn't so bad; we just need two stacks. Second, the interface of
the recursive function would be pretty unwieldy. We are progressively
consuming two lists, of which we would need to keep passing down with an
index/cursor, or progressively slice them as we go, which would be pretty
inefficient.
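
        Examples
        --------
        A small, made-up tree in which object 1 has children 2 and 4, and object 2 has
        a single child, 3:

        >>> Layout.computeAncestors([1, 2, 3, 4], [2, 1, 0, 0])
        [None, 1, 2, 1]
        >>> Layout.computeAncestors([1, 2, 3, 4], [2, 1, 0, 0], depth=2)
        [None, None, 1, None]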
"""
ancestors: List[Optional[int]] = [None]
snStack = [serialNum[0]]
ncStack = [numChildren[0]]
for sn, nc in zip(serialNum[1:], numChildren[1:]):
ncStack[-1] -= 1
if nc > 0:
ancestors.append(snStack[-1])
snStack.append(sn)
ncStack.append(nc)
else:
ancestors.append(snStack[-1])
while ncStack and ncStack[-1] == 0:
snStack.pop()
ncStack.pop()
if depth > 1:
# handle deeper scenarios. This is a bit tricky. Store the original
# ancestors for the first generation, since that ultimately contains all of
# the information that we need. Then in a loop, keep hopping one more layer
            # of indirection, and indexing into the corresponding location in the
# original ancestor array
indexMap = {sn: i for i, sn in enumerate(serialNum)}
origAncestors = ancestors
for _ in range(depth - 1):
ancestors = [
origAncestors[indexMap[ia]] if ia is not None else None
for ia in ancestors
]
return ancestors
def allSubclasses(cls):
"""This currently include Materials... and it should not."""
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in allSubclasses(c)]
)
# TODO: This will likely become an issue with extensibility via plugins. There are a
# couple of options to resolve this:
# - Perform this operation each time we make a Layout. Wasteful, but robust
# - Scrape all of these names off of a set of Composites that register with a base
# metaclass. Less wasteful, but probably equally robust. Downside is it's metaclassy
# and Madjickal.
GRID_CLASSES = {c.__name__: c for c in allSubclasses(grids.Grid)}
GRID_CLASSES["Grid"] = grids.Grid
NONE_MAP = {float: float("nan"), str: "<!None!>"}
# XXX: we're going to assume no one assigns min(int)+2 as a meaningful value
NONE_MAP.update(
{
intType: numpy.iinfo(intType).min + 2
for intType in (
int,
numpy.int8,
numpy.int16,
numpy.int32,
numpy.int64,
)
}
)
NONE_MAP.update(
{
intType: numpy.iinfo(intType).max - 2
for intType in (
numpy.uint,
numpy.uint8,
numpy.uint16,
numpy.uint32,
numpy.uint64,
)
}
)
NONE_MAP.update({floatType: floatType("nan") for floatType in (float, numpy.float64)})
def packSpecialData(
data: numpy.ndarray, paramName: str
) -> Tuple[Optional[numpy.ndarray], Dict[str, Any]]:
"""
Reduce data that wouldn't otherwise play nicely with HDF5/numpy arrays to a format
that will.
This is the main entry point for conforming "strange" data into something that will
both fit into a numpy array/HDF5 dataset, and be recoverable to its original-ish
state when reading it back in. This is accomplished by detecting a handful of known
offenders and using various HDF5 attributes to store necessary auxiliary data. It is
important to keep in mind that the data that is passed in has already been converted
    to a numpy array, so the top dimension always represents the collection of
composites that are storing the parameters. For instance, if we are dealing with a
Block parameter, the first index in the numpy array of data is the block index; so
if each block has a parameter that is a dictionary, ``data`` would be a ndarray,
where each element is a dictionary. This routine supports a number of different
"strange" things:
* Dict[str, float]: These are stored by finding the set of all keys for all
instances, and storing those keys as a list in an attribute. The data themselves
are stored as arrays indexed by object, then key index. Dictionaries lacking data
      for a key store a nan in its place. This will work well in instances where most
objects have data for most keys.
    * Jagged arrays: These are stored by concatenating all of the data into a single,
      one-dimensional array, and storing attributes to describe the shapes of each
      object's data, and an offset into the beginning of each object's data (see the
      sketch after this list).
* Arrays with ``None`` in them: These are stored by replacing each instance of
``None`` with a magical value that shouldn't be encountered in realistic
scenarios.
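
    For example, a rough sketch of the jagged-array case (the exact attribute contents
    come from the implementation below)::

        data = numpy.array([numpy.arange(3), numpy.arange(2), None], dtype=object)
        flat, attrs = packSpecialData(data, "someParam")
        # flat is roughly array([0, 1, 2, 0, 1]), and attrs carries what is needed to
        # undo the flattening:
        #   attrs["jagged"]        -> True
        #   attrs["offsets"]       -> [0, 3, 5]
        #   attrs["shapes"]        -> [[3], [2], [0]]
        #   attrs["noneLocations"] -> [2]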
Parameters
----------
data
An ndarray storing the data that we want to stuff into the database. These are
usually dtype=Object, which is how we usually end up here in the first place.
paramName
The parameter name that we are trying to store data for. This is mostly used for
diagnostics.
See Also
--------
unpackSpecialData
"""
# Check to make sure that we even need to do this. If the numpy data type is
# not "O", chances are we have nice, clean data.
if data.dtype != "O":
return data, {}
attrs: Dict[str, Any] = {"specialFormatting": True}
# make a copy of the data, so that the original is unchanged
data = copy.copy(data)
# find locations of Nones. The below works for ndarrays, whereas `data == None`
# gives a single True/False value
nones = numpy.where([d is None for d in data])[0]
if len(nones) == data.shape[0]:
# Everything is None, so why bother?
return None, attrs
if len(nones) > 0:
attrs["nones"] = True
# XXX: this whole if/then/elif/else can be optimized by looping once and then
# determining the correct action
# A robust solution would need
# to do this on a case-by-case basis, and re-do it any time we want to
# write, since circumstances may change. Not only that, but we may need
    # to perform more than one of these operations to get to an array
# that we want to put in the database.
if any(isinstance(d, dict) for d in data):
# we're assuming that a dict is {str: float}. We store the union of
# all of the keys for all of the objects as a special "keys"
# attribute, and store a value for all of those keys for all
# objects, whether or not there is actually data associated with
# that key (storing a nan when no data). This makes for a simple
# approach that is somewhat digestible just looking at the db, and
# should be quite efficient in the case where most objects have data
# for most keys.
attrs["dict"] = True
keys = sorted({k for d in data for k in d})
data = numpy.array([[d.get(k, numpy.nan) for k in keys] for d in data])
if data.dtype == "O":
# The data themselves are nasty. We could support this, but best to wait for
# a credible use case.
raise TypeError(
"Unable to coerce dictionary data into usable numpy array for "
"{}".format(paramName)
)
attrs["keys"] = numpy.array(keys).astype("S")
return data, attrs
# conform non-numpy arrays to numpy
for i, val in enumerate(data):
if isinstance(val, (list, tuple)):
data[i] = numpy.array(val)
if not any(isinstance(d, numpy.ndarray) for d in data):
# looks like 1-D plain-old-data
data = replaceNonesWithNonsense(data, paramName, nones)
return data, attrs
# check if data is jagged
candidate = next((d for d in data if d is not None))
shape = candidate.shape
ndim = candidate.ndim
isJagged = (
not all(d.shape == shape for d in data if d is not None) or candidate.size == 0
)
if isJagged:
assert all(
val.ndim == ndim for val in data if val is not None
), "Inconsistent dimensions in jagged array for: {}\nDimensions: {}".format(
paramName, [val.ndim for val in data if val is not None]
)
attrs["jagged"] = True
# offsets[i] is the index of the zero-th element of sub-array i
offsets = numpy.array(
[0]
+ list(
itertools.accumulate(val.size if val is not None else 0 for val in data)
)[:-1]
)
# shapes[i] is the shape of the i-th sub-array. Nones are represented by all
# zeros
shapes = numpy.array(
list(val.shape if val is not None else ndim * (0,) for val in data)
)
data = numpy.delete(data, nones)
data = numpy.concatenate(data, axis=None)
attrs["offsets"] = offsets
attrs["shapes"] = shapes
attrs["noneLocations"] = nones
return data, attrs
if any(isinstance(d, (tuple, list, numpy.ndarray)) for d in data):
data = replaceNonesWithNonsense(data, paramName, nones)
return data, attrs
if len(nones) == 0:
raise TypeError(
"Cannot write {} to the database, it did not resolve to a numpy/HDF5 "
"type.".format(paramName)
)
runLog.error("Data unable to find special none value: {}".format(data))
raise TypeError("Failed to process special data for {}".format(paramName))
def unpackSpecialData(data: numpy.ndarray, attrs, paramName: str) -> numpy.ndarray:
"""
Extract data from a specially-formatted HDF5 dataset into a numpy array.
This should invert the operations performed by :py:func:`packSpecialData`.
Parameters
----------
data
Specially-formatted data array straight from the database.
attrs
The attributes associated with the dataset that contained the data.
paramName
The name of the parameter that is being unpacked. Only used for diagnostics.
Returns
-------
numpy.ndarray
An ndarray containing the closest possible representation of the data that was
originally written to the database.
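
    A minimal round-trip sketch (``original`` is any parameter array that needed
    special packing)::

        packed, attrs = packSpecialData(original, "someParam")
        recovered = unpackSpecialData(packed, attrs, "someParam")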
See Also
--------
packSpecialData
"""
if not attrs.get("specialFormatting", False):
# The data were not subjected to any special formatting; short circuit.
assert data.dtype != "O"
return data
unpackedData: List[Any]
if attrs.get("nones", False) and not attrs.get("jagged", False):
data = replaceNonsenseWithNones(data, paramName)
return data
if attrs.get("jagged", False):
offsets = attrs["offsets"]
shapes = attrs["shapes"]
ndim = len(shapes[0])
emptyArray = numpy.ndarray(ndim * (0,), dtype=data.dtype)
unpackedJaggedData: List[Optional[numpy.ndarray]] = []
for offset, shape in zip(offsets, shapes):
if tuple(shape) == ndim * (0,):
# Start with an empty array. This may be replaced with a None later
unpackedJaggedData.append(emptyArray)
else:
unpackedJaggedData.append(
numpy.ndarray(shape, dtype=data.dtype, buffer=data[offset:])
)
for i in attrs["noneLocations"]:
unpackedJaggedData[i] = None
return numpy.array(unpackedJaggedData, dtype=object)
if attrs.get("dict", False):
keys = numpy.char.decode(attrs["keys"])
unpackedData = []
assert data.ndim == 2
for d in data:
unpackedData.append(
{key: value for key, value in zip(keys, d) if not numpy.isnan(value)}
)
return numpy.array(unpackedData)
raise ValueError(
"Do not recognize the type of special formatting that was applied "
"to {}. Attrs: {}".format(paramName, {k: v for k, v in attrs.items()})
)
def replaceNonsenseWithNones(data: numpy.ndarray, paramName: str) -> numpy.ndarray:
"""
Replace special nonsense values with ``None``.
This essentially reverses the operations performed by
:py:func:`replaceNonesWithNonsense`.
Parameters
----------
data
The array from the database that contains special ``None`` nonsense values.
paramName
        The parameter name whose data we are dealing with. Only used for diagnostics.
See Also
--------
replaceNonesWithNonsense
"""
# TODO: This is super closely-related to the NONE_MAP collection, and should
# probably use it somehow.
if numpy.issubdtype(data.dtype, numpy.floating):
isNone = numpy.isnan(data)
elif numpy.issubdtype(data.dtype, numpy.integer):
isNone = data == numpy.iinfo(data.dtype).min + 2
elif numpy.issubdtype(data.dtype, numpy.str_):
isNone = data == "<!None!>"
else:
raise TypeError(
"Unable to resolve values that should be None for `{}`".format(paramName)
)
if data.ndim > 1:
result = numpy.ndarray(data.shape[0], dtype=numpy.dtype("O"))
for i in range(data.shape[0]):
if isNone[i].all():
result[i] = None
elif isNone[i].any():
# TODO: This is not symmetric with the replaceNonesWithNonsense impl.
# That one assumes that Nones apply only at the highest dimension, and
# that the lower dimensions will be filled with the magic None value.
# Non-none entries below the top level fail to coerce to a serializable
# numpy array and would raise an exception when trying to write. TL;DR:
# this is a dead branch until the replaceNonesWithNonsense impl is more
# sophisticated.
result[i] = numpy.array(data[i], dtype=numpy.dtype("O"))
result[i][isNone[i]] = None
else:
result[i] = data[i]
else:
result = numpy.ndarray(data.shape, dtype=numpy.dtype("O"))
result[:] = data
result[isNone] = None
return result
def replaceNonesWithNonsense(
data: numpy.ndarray, paramName: str, nones: numpy.ndarray = None
) -> numpy.ndarray:
"""
Replace instances of ``None`` with nonsense values that can be detected/recovered
when reading.
Parameters
----------
data
The numpy array containing ``None`` values that need to be replaced.
paramName
        The name of the parameter whose data we are treating. Only used for diagnostics.
nones
        An array containing the index locations of the ``None`` elements. It is a
        little strange to pass these in, but we already find these indices to determine
        whether we need to call this function in the first place, so we might as well
        pass them in rather than perform the operation again.
Notes
-----
    This only supports situations where the data is a straight-up ``None``, or a valid,
    database-storable numpy array (or something easily convertible to one, e.g.,
    tuples/lists with numerical values). This does not support, for instance, a numpy
    ndarray with some Nones in it.
    For example, the following is supported::

        [[1, 2, 3], None, [7, 8, 9]]

    However, the following is not::

        [[1, 2, 3], [4, None, 6], [7, 8, 9]]
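
    A rough sketch of the substitution itself; the sentinel comes from ``NONE_MAP``
    (NaN for floats, and the parameter name is just an illustrative label)::

        data = numpy.array([1.0, None, 3.0], dtype=object)
        replaceNonesWithNonsense(data, "someParam")
        # -> roughly array([1., nan, 3.]), now with a plain float dtype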
See Also
--------
replaceNonsenseWithNones
Reverses this operation.
"""
if nones is None:
nones = numpy.where([d is None for d in data])[0]
try:
# loop to find what the default value should be. This is the first non-None
# value that we can find.
defaultValue = None
realType = None
val = None
for val in data:
if isinstance(val, numpy.ndarray):
# if multi-dimensional, val[0] could still be an array, val.flat is
# a flattened iterator, so next(val.flat) gives the first value in
# an n-dimensional array
realType = type(next(val.flat))
if realType is type(None):
continue
defaultValue = numpy.reshape(
numpy.repeat(NONE_MAP[realType], val.size), val.shape
)
break
else:
realType = type(val)
if realType is type(None):
continue
defaultValue = NONE_MAP[realType]
break
else:
# Couldn't find any non-None entries, so it really doesn't matter what type we
# use. Using float, because NaN is nice.
realType = float
defaultValue = NONE_MAP[realType]
if isinstance(val, numpy.ndarray):
data = numpy.array([d if d is not None else defaultValue for d in data])
else:
data[nones] = defaultValue
except Exception as ee:
runLog.error(
"Error while attempting to determine default for {}.\nvalue: {}\nError: {}".format(
paramName, val, ee
)
)
raise TypeError(
"Could not determine None replacement for {} with type {}, val {}, default {}".format(
paramName, realType, val, defaultValue
)
)
try:
data = data.astype(realType)
    except Exception:
raise ValueError(
"Could not coerce data for {} to {}, data:\n{}".format(
paramName, realType, data
)
)
if data.dtype.kind == "O":
raise TypeError(
"Failed to convert data to valid HDF5 type {}, data:{}".format(
paramName, data
)
)
return data
def _writeAttrs(obj, group, attrs):
"""
Handle safely writing attributes to a dataset, handling large data if necessary.
This will attempt to store attributes directly onto an HDF5 object if possible,
falling back to proper datasets and reference attributes if necessary. This is
needed because HDF5 tries to fit attributes into the object header, which has
limited space. If an attribute is too large, h5py raises a RuntimeError.
In such cases, this will store the attribute data in a proper dataset and
place a reference to that dataset in the attribute instead.
In practice, this takes ``linkedDims`` attrs from a particular component type (like
``c00n00/Circle/id``) and stores them in new datasets (like
``c00n00/attrs/1_linkedDims``, ``c00n00/attrs/2_linkedDims``) and then sets the
object's attrs to links to those datasets.
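
    A rough sketch of the resulting structure, using the dataset names from the example
    above::

        h5["c00n00/Circle/id"].attrs["linkedDims"]  # -> "@/c00n00/attrs/1_linkedDims"
        h5["c00n00/attrs/1_linkedDims"][()]         # -> the actual attribute data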
"""
for key, value in attrs.items():
try:
obj.attrs[key] = value
except RuntimeError as err:
if "object header message is too large" not in err.args[0]:
raise
runLog.info(
"Storing attribute `{}` for `{}` into it's own dataset within "
"`{}/attrs`".format(key, obj, group)
)
if "attrs" not in group:
attrGroup = group.create_group("attrs")
else:
attrGroup = group["attrs"]
dataName = str(len(attrGroup)) + "_" + key
attrGroup[dataName] = value
            # storing a path string (rather than an HDF5 object reference) allows us to
            # cheaply copy time nodes without needing to crawl through and update
            # object references.
linkName = attrGroup[dataName].name
obj.attrs[key] = "@{}".format(linkName)
def _resolveAttrs(attrs, group):
"""
Reverse the action of _writeAttrs.
    This reads the attrs and, for any that are links, pulls the real data out of the
    datasets that the attrs point to.
"""
resolved = {}
for key, val in attrs.items():
try:
if isinstance(val, h5py.h5r.Reference):
# Old style object reference. If this cannot be dereferenced, it is
# likely because mergeHistory was used to get the current database,
# which does not preserve references.
resolved[key] = group[val]
elif isinstance(val, str):
m = ATTR_LINK.match(val)
if m:
# dereference the path to get the data out of the dataset.
resolved[key] = group[m.group(1)][()]
else:
resolved[key] = val
else:
resolved[key] = val
except ValueError:
runLog.error(f"HDF error loading {key} : {val}\nGroup: {group}")
raise
return resolved
def collectBlockNumberDensities(blocks) -> Dict[str, numpy.ndarray]:
"""
Collect block-by-block homogenized number densities for each nuclide.
Long ago, composition was stored on block params. No longer; they are on the
    component numberDensity params. These block-level params are still useful to see
compositions in some visualization tools. Rather than keep them on the reactor
model, we dynamically compute them here and slap them in the database. These are
ignored upon reading and will not affect the results.
Remove this once a better viz tool can view composition distributions. Also remove
    the try/except in ``_readParams``.
"""
nucNames = sorted(list(set(nucName for b in blocks for nucName in b.getNuclides())))
nucBases = [nuclideBases.byName[nn] for nn in nucNames]
# it's faster to loop over blocks first and get all number densities from each
# than it is to get one nuclide at a time from each block because of area fraction
# calculations. So we use some RAM here instead.
nucDensityMatrix = []
for block in blocks:
nucDensityMatrix.append(block.getNuclideNumberDensities(nucNames))
nucDensityMatrix = numpy.array(nucDensityMatrix)
dataDict = dict()
for ni, nb in enumerate(nucBases):
# the nth column is a vector of nuclide densities for this nuclide across all blocks
dataDict[nb.getDatabaseName()] = nucDensityMatrix[:, ni]
return dataDict