Source code for armi.bookkeeping.tests.test_historyTracker

# Copyright 2019 TerraPower, LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

Tests for the history tracker interface.

These tests actually run a jupyter notebook that's in the documentation to build
a valid HDF5 file to load from as a test fixtures. Thus they take a little longer
than usual.
import os
import pathlib
import shutil
import unittest

from armi import settings
from armi import utils
from armi.bookkeeping import historyTracker
from armi.bookkeeping.tests._constants import TUTORIAL_FILES
from armi.cases import case
from armi.context import ROOT
from armi.reactor import blocks
from armi.reactor import grids
from armi.reactor.flags import Flags
from armi.tests import ArmiTestHelper
from armi.utils.directoryChangers import TemporaryDirectoryChanger

CASE_TITLE = "anl-afci-177"
THIS_DIR = os.path.dirname(__file__)  # b/c tests don't run in this folder
TUTORIAL_DIR = os.path.join(ROOT, "tests", "tutorials")

[docs]def runTutorialNotebook(): import nbformat from nbconvert.preprocessors import ExecutePreprocessor with open("data_model.ipynb") as f: nb =, as_version=4) ep = ExecutePreprocessor(timeout=600, kernel_name="python3") ep.preprocess(nb, {})
[docs]class TestHistoryTracker(ArmiTestHelper): """History tracker tests that require a Reactor Model.""" @classmethod def setUpClass(cls): # Do this work in a temp dir, to avoid race conditions. cls.dirChanger = TemporaryDirectoryChanger() cls.dirChanger.__enter__() os.mkdir("tutorials") os.mkdir(CASE_TITLE) for filePath in TUTORIAL_FILES: dirName = CASE_TITLE if CASE_TITLE in filePath else "tutorials" outFile = os.path.join( cls.dirChanger.destination, dirName, os.path.basename(filePath) ) shutil.copyfile(filePath, outFile) os.chdir(os.path.join(cls.dirChanger.destination, "tutorials")) runTutorialNotebook() @classmethod def tearDownClass(cls): cls.dirChanger.__exit__(None, None, None) def setUp(self): cs = settings.Settings(f"../{CASE_TITLE}/{CASE_TITLE}.yaml") newSettings = {} newSettings["db"] = True newSettings["nCycles"] = 2 newSettings["detailAssemLocationsBOL"] = ["001-001"] newSettings["loadStyle"] = "fromDB" newSettings["reloadDBName"] = pathlib.Path(f"{CASE_TITLE}.h5").absolute() newSettings["startNode"] = 1 cs = cs.modified(newSettings=newSettings) = TemporaryDirectoryChanger() c = case.Case(cs) case2 = c.clone(title="armiRun") self.o = case2.initializeOperator() self.r = self.o.r self.o.getInterface("main").interactBOL() dbi = self.o.getInterface("database") # Get to the database state at the end of stack of time node 1. # The end of the interface stack is when history tracker tends to run. dbi.loadState(0, 1) def tearDown(self): self.o.getInterface("database").database.close() self.r = None self.o = None, None, None)
[docs] def test_calcMGFluence(self): r""" This test confirms that mg flux has many groups when loaded with the history tracker. armi.bookeeping.db.hdf.hdfDB.readBlocksHistory requires historical_values\[historical_indices\] to be cast as a list to read more than the first energy group. This test shows that this behavior is preserved. .. test:: Demonstrate that a parameter stored at differing time nodes can be recovered. :id: T_ARMI_HIST_TRACK0 :tests: R_ARMI_HIST_TRACK """ o = self.o b = o.r.core.childrenByLocator[o.r.core.spatialGrid[0, 0, 0]].getFirstBlock( Flags.FUEL ) bVolume = b.getVolume() bName = # duration is None in this DB hti = o.getInterface("history") timesInYears = [duration or 1.0 for duration in hti.getTimeSteps()] timeStepsToRead = [ utils.getCycleNodeFromCumulativeNode(i, self.o.cs) for i in range(len(timesInYears)) ] hti.preloadBlockHistoryVals([bName], ["mgFlux"], timeStepsToRead) mgFluence = None for ts, years in enumerate(timesInYears): cycle, node = utils.getCycleNodeFromCumulativeNode(ts, self.o.cs) # b.p.mgFlux is vol integrated mgFlux = hti.getBlockHistoryVal(bName, "mgFlux", (cycle, node)) / bVolume timeInSec = years * 365 * 24 * 3600 if mgFluence is None: mgFluence = timeInSec * mgFlux else: mgFluence += timeInSec * mgFlux self.assertTrue(len(mgFluence) > 1, "mgFluence should have more than 1 group") # test that unloadBlockHistoryVals() is working self.assertIsNotNone(hti._preloadedBlockHistory) hti.unloadBlockHistoryVals() self.assertIsNone(hti._preloadedBlockHistory)
[docs] def test_historyParameters(self): """Retrieve various paramaters from the history. .. test:: Demonstrate that various parameters stored at differing time nodes can be recovered. :id: T_ARMI_HIST_TRACK1 :tests: R_ARMI_HIST_TRACK """ o = self.o b = o.r.core.childrenByLocator[o.r.core.spatialGrid[0, 0, 0]].getFirstBlock( Flags.FUEL ) b.getVolume() bName = # duration is None in this DB hti = o.getInterface("history") timesInYears = [duration or 1.0 for duration in hti.getTimeSteps()] timeStepsToRead = [ utils.getCycleNodeFromCumulativeNode(i, self.o.cs) for i in range(len(timesInYears)) ] hti.preloadBlockHistoryVals([bName], ["power"], timeStepsToRead) # read some parameters params = {} for param in ["height", "pdens", "power"]: params[param] = [] for ts, years in enumerate(timesInYears): cycle, node = utils.getCycleNodeFromCumulativeNode(ts, self.o.cs) params[param].append( hti.getBlockHistoryVal(bName, param, (cycle, node)) ) # verify the height parameter doesn't change over time self.assertGreater(params["height"][0], 0) self.assertEqual(params["height"][0], params["height"][1]) # verify the power parameter is retrievable from the history self.assertEqual(o.cs["power"], 1000000000.0) self.assertAlmostEqual(params["power"][0], 360, delta=0.1) # assembly was moved to the central location with 1/3rd symmetry self.assertEqual(params["power"][0] / 3, params["power"][1]) # verify the power density parameter is retrievable from the history self.assertAlmostEqual(params["pdens"][0], 0.0785, delta=0.001) self.assertEqual(params["pdens"][0], params["pdens"][1]) # test that unloadBlockHistoryVals() is working self.assertIsNotNone(hti._preloadedBlockHistory) hti.unloadBlockHistoryVals() self.assertIsNone(hti._preloadedBlockHistory)
[docs] def test_historyReport(self): """ Test generation of history report. This does a swap for 5 timesteps:: | TS 0 1 2 3 4 |LOC (1,1) (2,1) (3,1) (4,1) SFP """ history = self.o.getInterface("history") history.interactBOL() history.interactEOL() testLoc = self.o.r.core.spatialGrid[0, 0, 0] testAssem = self.o.r.core.childrenByLocator[testLoc] fileName = history._getAssemHistoryFileName(testAssem) actualFilePath = os.path.join(THIS_DIR, fileName) expectedFileName = os.path.join(THIS_DIR, fileName.replace(".txt", "-ref.txt")) # copy from fast path so the file is retrievable. shutil.move(fileName, os.path.join(THIS_DIR, fileName)) self.compareFilesLineByLine(expectedFileName, actualFilePath) # test that detailAssemblyNames() is working self.assertEqual(len(history.detailAssemblyNames), 1) history.addAllDetailedAssems() self.assertEqual(len(history.detailAssemblyNames), 54)
[docs] def test_getBlockInAssembly(self): history = self.o.getInterface("history") aFuel = self.o.r.core.getFirstAssembly(Flags.FUEL) b = history._getBlockInAssembly(aFuel) self.assertGreater(b.p.height, 1.0) self.assertEqual(b.getType(), "fuel") with self.assertRaises(RuntimeError): aShield = self.o.r.core.getFirstAssembly(Flags.SHIELD) history._getBlockInAssembly(aShield)
[docs]class TestHistoryTrackerNoModel(unittest.TestCase): """History tracker tests that do not require a Reactor Model.""" def setUp(self): cs = settings.Settings() self.history = historyTracker.HistoryTrackerInterface(None, cs=cs) self._origCaseTitle = ( self.history.cs.caseTitle ) # to avoid parallel test interference. self.history.cs.caseTitle = self._testMethodName + self._origCaseTitle def tearDown(self): self.history.cs.caseTitle = self._origCaseTitle
[docs] def test_timestepFiltering(self): times = range(30) self.history.cs = self.history.cs.modified(newSettings={"burnSteps": 2}) inputs = [ {"boc": True}, {"moc": True}, {"eoc": True}, {"boc": True, "eoc": True}, ] results = [ [0, 3, 6, 9, 12, 15, 18, 21, 24, 27], [1, 4, 7, 10, 13, 16, 19, 22, 25, 28], [2, 5, 8, 11, 14, 17, 20, 23, 26, 29], [0, 2, 3, 5, 6, 8, 9, 11, 12, 14, 15, 17, 18, 20, 21, 23, 24, 26, 27, 29], ] for i, expectedResults in zip(inputs, results): runResults = self.history.filterTimeIndices(times, **i) self.assertEqual(runResults, expectedResults)
[docs] def test_timestepFilteringWithGap(self): times = list(range(10)) + list(range(15, 20)) self.history.cs = self.history.cs.modified(newSettings={"burnSteps": 2}) runResults = self.history.filterTimeIndices(times, boc=True) self.assertEqual(runResults, [0, 3, 6, 9, 15, 18])
[docs] def test_blockName(self): block = blocks.HexBlock("blockName") block.spatialLocator = grids.IndexLocation(0, 0, 7, None) self.assertEqual( self.history._getBlockHistoryFileName(block), "{}-blockName7-bHist.txt".format(self.history.cs.caseTitle), )