# Copyright 2023 TerraPower, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of the MPI portion of the Parameters class."""
from distutils.spawn import find_executable
import shutil
import unittest

from armi import context
from armi.reactor import composites
from armi.reactor import parameters
# Record which MPI launcher, if any, can be found on the PATH; the parallel-only tests in this
# module are skipped while this stays None. The Windows executable name is probed first.
# shutil.which is used instead of distutils.spawn.find_executable because distutils is deprecated
# (PEP 632) and no longer ships with the standard library as of Python 3.12.
MPI_EXE = None
if shutil.which("mpiexec.exe") is not None:
    MPI_EXE = "mpiexec.exe"
elif shutil.which("mpiexec") is not None:
    MPI_EXE = "mpiexec"
class MockSyncPC(parameters.ParameterCollection):
    """Minimal parameter collection used by the MPI synchronization tests.

    Three parameters, ``param1`` through ``param3``, are declared through a single builder that is
    created with ``default=0.0`` and the ``AVERAGE`` parameter location. Each parameter is placed
    in its own category (``cat1`` through ``cat3``) so a test can select one of them by category.
    """

    # Definitions are registered on this collection while the builder context is open.
    pDefs = parameters.ParameterDefinitionCollection()
    with pDefs.createBuilder(default=0.0, location=parameters.ParamLocation.AVERAGE) as pb:
        pb.defParam("param1", "units", "p1 description", categories=["cat1"])
        pb.defParam("param2", "units", "p2 description", categories=["cat2"])
        pb.defParam("param3", "units", "p3 description", categories=["cat3"])
def makeComp(name):
    """Helper for the MPI sync tests: mock up a Composite with a minimal parameter collection.

    Parameters
    ----------
    name : str
        Name handed to the ``composites.Composite`` constructor.

    Returns
    -------
    composites.Composite
        The new composite, whose ``p`` attribute has been replaced with a fresh ``MockSyncPC``.
    """
    comp = composites.Composite(name)
    # Swap in the small mock collection so the tests only deal with param1..param3.
    comp.p = MockSyncPC()
    return comp
class SynchronizationTests(unittest.TestCase):
    """Some tests that must be run with mpirun instead of the standard unittest system.

    Every test is skipped unless more than one MPI process is active (``context.MPI_SIZE > 1``)
    and an ``mpiexec`` launcher was found when this module was imported.
    """

    def setUp(self):
        """Build a mock reactor -> core -> assemblies -> blocks tree out of ``makeComp`` objects.

        The number of assemblies is three times ``context.MPI_SIZE`` and each assembly gets three
        blocks. ``self.comps`` holds the core followed by all of its descendants.
        """
        self.r = makeComp("reactor")
        self.r.core = makeComp("core")
        self.r.add(self.r.core)
        for ai in range(context.MPI_SIZE * 3):
            a = makeComp("assembly{}".format(ai))
            self.r.core.add(a)
            for bi in range(3):
                a.add(makeComp("block{}-{}".format(ai, bi)))

        self.comps = [self.r.core] + self.r.core.getChildren(deep=True)

    @unittest.skipIf(context.MPI_SIZE <= 1 or MPI_EXE is None, "Parallel test only")
    def test_noConflicts(self):
        """Make sure sync works across processes.

        .. test:: Synchronize a reactor's state across processes.
            :id: T_ARMI_CMP_MPI0
            :tests: R_ARMI_CMP_MPI
        """
        # Initial sync before any rank-specific edits; the returned count is not needed here.
        self.r.syncMpiState()

        # Each rank writes param1 only on the components whose index maps to that rank, so no two
        # ranks touch the same component.
        for ci, comp in enumerate(self.comps):
            if ci % context.MPI_SIZE == context.MPI_RANK:
                comp.p.param1 = (context.MPI_RANK + 1) * 30.0
            else:
                self.assertNotEqual((context.MPI_RANK + 1) * 30.0, comp.p.param1)

        syncCount = self.r.syncMpiState()
        self.assertEqual(len(self.comps), syncCount)

        # After the sync every component should carry the value written by its owning rank.
        for ci, comp in enumerate(self.comps):
            self.assertEqual((ci % context.MPI_SIZE + 1) * 30.0, comp.p.param1)

    @unittest.skipIf(context.MPI_SIZE <= 1 or MPI_EXE is None, "Parallel test only")
    def test_withConflicts(self):
        """Test conflicts arise correctly if we force a conflict.

        .. test:: Raise errors when there are conflicts across processes.
            :id: T_ARMI_CMP_MPI1
            :tests: R_ARMI_CMP_MPI
        """
        # The value depends on MPI_RANK, so each rank writes a different number to the same
        # parameter on the same component.
        self.r.core.p.param1 = (context.MPI_RANK + 1) * 99.0
        with self.assertRaises(ValueError):
            self.r.syncMpiState()

    @unittest.skipIf(context.MPI_SIZE <= 1 or MPI_EXE is None, "Parallel test only")
    def test_withConflictsButSameValue(self):
        """Test that conflicts are ignored if the values are the same.

        .. test:: Don't raise errors when multiple processes make the same changes.
            :id: T_ARMI_CMP_MPI2
            :tests: R_ARMI_CMP_MPI
        """
        # The value is built from MPI_SIZE rather than MPI_RANK; presumably MPI_SIZE is identical
        # on every process, so all ranks write the same number.
        self.r.core.p.param1 = (context.MPI_SIZE + 1) * 99.0
        self.r.syncMpiState()
        self.assertEqual((context.MPI_SIZE + 1) * 99.0, self.r.core.p.param1)

    @unittest.skipIf(context.MPI_SIZE <= 1 or MPI_EXE is None, "Parallel test only")
    def test_conflictsMaintainWithStateRetainer(self):
        """Test that the state retainer fails correctly when it should."""
        with self.r.retainState(parameters.inCategory("cat2")):
            # Rank-dependent value: every rank writes a different param2 on every component.
            for comp in self.comps:
                comp.p.param2 = 99 * context.MPI_RANK

        # NOTE(review): the sync is checked after the retainState block has exited; confirm this
        # nesting against the intended test design.
        with self.assertRaises(ValueError):
            self.r.syncMpiState()