Source code for armi.utils.pathTools

# Copyright 2019 TerraPower, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains commonly used functions relating to directories, files and path
manipulations.
"""
from time import sleep
import importlib
import os
import pathlib
import shutil

from armi import context
from armi import runLog

# Lower-case path fragments consulted by ``cleanPath``. Despite the name, ``cleanPath``
# treats this list as an allow-list: a path is only considered safe to delete when its
# lower-cased form contains one of these substrings (or it lives under
# ``context.APP_DATA``).
DO_NOT_CLEAN_PATHS = [
    "armiruns",
    "failedruns",
    "mc2run",
    "mongoose",
    "shufflebranches",
    "snapshot",
    "tests",
]


def armiAbsPath(*pathParts):
    """
    Join path components into an absolute path, avoiding drive letters when possible.

    Drive letters are not consistent between Windows machines, so when the optional
    ``ccl`` package can be imported the absolute path is converted to a UNC path.
    If that import or the conversion fails for any reason, the plain absolute path
    is returned instead.

    Parameters
    ----------
    *pathParts : str
        Path components, joined in order with ``os.path.join``.

    Returns
    -------
    str
        The UNC form of the absolute path when available, else the absolute path.
    """
    absolutePath = os.path.abspath(os.path.join(*pathParts))
    try:
        # Deliberately imported at call time rather than at module import time;
        # the original authors noted a module-level import caused cluster failures.
        from ccl import common_operations as cclOperations

        uncPath = cclOperations.convert_to_unc_path(absolutePath)
    except Exception:
        # Best effort only: no ccl (or a failed conversion) means no UNC path.
        return absolutePath
    return uncPath
def copyOrWarn(fileDescription, sourcePath, destinationPath):
    """
    Copy a file, logging a warning instead of raising if the copy fails.

    Parameters
    ----------
    fileDescription : str
        A description of the file and/or operation being performed; used only in
        the log messages.
    sourcePath : str
        Path of the file to be copied.
    destinationPath : str
        Path for the copied file.

    Notes
    -----
    Copying a file onto itself (``shutil.SameFileError``) is silently ignored.
    Every other failure is reported through ``runLog.warning``.
    """
    try:
        shutil.copy(sourcePath, destinationPath)
        successMessage = "Copied {}: {} -> {}".format(
            fileDescription, sourcePath, destinationPath
        )
        runLog.debug(successMessage)
    except shutil.SameFileError:
        # Source and destination are the same file; nothing to do.
        return
    except Exception as copyError:
        failureMessage = "Could not copy {} from {} to {}\nError was: {}".format(
            fileDescription, sourcePath, destinationPath, copyError
        )
        runLog.warning(failureMessage)
def isFilePathNewer(path1, path2):
    """
    Report whether ``path1`` was modified more recently than ``path2``.

    Parameters
    ----------
    path1 : str
        Path that must exist.
    path2 : str
        Path to compare against; it may be missing.

    Returns
    -------
    bool
        True if ``path1`` has a later modification time than ``path2``, or if
        ``path1`` exists and ``path2`` does not. False otherwise.

    Raises
    ------
    IOError
        If ``path1`` does not exist.
    """
    firstExists = os.path.exists(path1)
    secondExists = os.path.exists(path2)

    if not firstExists:
        raise IOError("Path 1 does not exist: {}".format(path1))
    if not secondExists:
        # Anything that exists counts as newer than something that does not.
        return True

    firstModified = os.stat(path1).st_mtime
    secondModified = os.stat(path2).st_mtime
    return firstModified > secondModified
def isAccessible(path):
    """
    Check whether a given path can be seen by the current user.

    Parameters
    ----------
    path : str
        A directory or file.

    Returns
    -------
    bool
        The result of ``os.path.exists(path)``; no further permission checks
        are performed.
    """
    pathExists = os.path.exists(path)
    return pathExists
def separateModuleAndAttribute(pathAttr):
    """
    Split a ``path:attribute`` string into its module path and attribute name.

    Parameters
    ----------
    pathAttr : str
        Path to a python module followed by the desired attribute.
        e.g.: `/path/to/my/thing.py:MyClass`

    Returns
    -------
    tuple of (str, str)
        The text before the last ``:`` (the module path) and the text after it
        (the attribute name).

    Notes
    -----
    The attribute of the module could be a class, function, variable, etc.
    Neither the path nor the attribute is checked for existence here.

    Raises
    ------
    ValueError:
        If there is no `:` separating the path and attr.
    """
    # Split on the LAST colon: an earlier one may belong to a Windows drive
    # letter (e.g. ``C:\path\thing.py:MyClass``).
    modulePath, separator, attributeName = pathAttr.rpartition(":")
    if not separator:
        # there should be at least 1 colon
        raise ValueError("substring not found")
    return modulePath, attributeName
def importCustomPyModule(modulePath):
    """
    Dynamically import a custom module.

    Parameters
    ----------
    modulePath : str
        Path to a python module.

    Returns
    -------
    userSpecifiedModule : module
        The imported python module. Its name is the file name without extension.

    Raises
    ------
    IOError
        If ``modulePath`` is not an existing file, or if no import spec/loader can
        be built for it (e.g. the file does not have a recognized python extension).

    Notes
    -----
    The module's code is executed as part of the import, so any exception raised by
    the module itself propagates to the caller.
    """
    # ``import importlib`` alone does not guarantee that the ``util`` submodule has
    # been loaded, so import it explicitly before using it.
    import importlib.util

    modulePath = pathlib.Path(modulePath)
    # is_file() is already False for a path that does not exist
    if not modulePath.is_file():
        raise IOError(f"Cannot import module from the given path: `{modulePath}`")

    moduleName = os.path.splitext(modulePath.name)[0]  # take off the extension
    spec = importlib.util.spec_from_file_location(moduleName, modulePath)
    if spec is None or spec.loader is None:
        # No loader recognizes this file; fail clearly instead of with an
        # AttributeError on None further down.
        raise IOError(f"Cannot import module from the given path: `{modulePath}`")

    userSpecifiedModule = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(userSpecifiedModule)
    return userSpecifiedModule
def moduleAndAttributeExist(pathAttr):
    """
    Check that a python module file exists and defines the requested attribute.

    Parameters
    ----------
    pathAttr : str
        Path to a python module followed by the desired attribute.
        e.g.: `/path/to/my/thing.py:MyClass`

    Returns
    -------
    bool
        True only if the string contains a ``:``, the module path is a file, the
        module imports without error, and the attribute name is present in the
        module's namespace. False in every other case.

    Notes
    -----
    The attribute of the module could be a class, function, variable, etc.
    The module is actually imported (and therefore executed) to perform the check.
    """
    try:
        rawModulePath, attributeName = separateModuleAndAttribute(pathAttr)
    except ValueError:
        # no ":" separator, so this cannot be a module/attribute pair
        return False

    moduleFile = pathlib.Path(rawModulePath)
    if not moduleFile.is_file():
        return False

    try:
        loadedModule = importCustomPyModule(moduleFile)
    except Exception:
        # Blanket except is okay: the whole point is to see if the import works.
        return False

    return attributeName in vars(loadedModule)
def cleanPath(path, mpiRank=0):
    """
    Recursively delete a path.

    !!! Be careful with this !!! It can delete the entire cluster.

    Existence is re-checked repeatedly because several MPI processes may be trying
    to delete the same thing at the same time. Always check filenames for some
    special flag when calling this, especially with full permissions on the
    cluster. You could accidentally delete everyone's work with one misplaced
    line! This doesn't ask questions.

    As a safety net, a path is only deleted if its lower-cased form contains one of
    the ``DO_NOT_CLEAN_PATHS`` fragments or if it lies under ``context.APP_DATA``.

    This makes use of shutil.rmtree and os.remove

    Parameters
    ----------
    path : str
        File or directory to delete.
    mpiRank : int, optional
        Only the process whose ``context.MPI_RANK`` equals this value performs the
        deletion; every process waits for the path to disappear.

    Returns
    -------
    success : bool
        True if the path no longer exists when the function returns (including
        when it never existed). False if it is still present.

    Raises
    ------
    Exception
        If the path passes neither safety check.
    """
    if not os.path.exists(path):
        return True

    loweredPath = path.lower()
    hasAllowedFragment = any(
        fragment in loweredPath for fragment in DO_NOT_CLEAN_PATHS
    )
    isUnderAppData = pathlib.Path(context.APP_DATA) in pathlib.Path(path).parents

    if not (hasAllowedFragment or isUnderAppData):
        raise Exception(
            "You tried to delete {0}, but it does not seem safe to do so.".format(path)
        )

    # delete the file/directory from only one process
    if mpiRank == context.MPI_RANK:
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            # it's just a file. Delete it.
            os.remove(path)

    # Potentially, wait for the deletion to finish: poll a bounded number of times.
    maxLoops = 6
    waitTime = 0.5
    for _ in range(maxLoops):
        if not os.path.exists(path):
            break
        sleep(waitTime)

    return not os.path.exists(path)