#
# @ 2023. Triad National Security, LLC. All rights reserved.
#
# This program was produced under U.S. Government contract 89233218CNA000001
# for Los Alamos National Laboratory (LANL), which is operated by Triad
# National Security, LLC for the U.S. Department of Energy/National Nuclear
# Security Administration. All rights in the program are reserved by Triad
# National Security, LLC, and the U.S. Department of Energy/National Nuclear
# Security Administration. The Government is granted for itself and others acting
# on its behalf a nonexclusive, paid-up, irrevocable worldwide license in this
# material to reproduce, prepare derivative works, distribute copies to the
# public, perform publicly and display publicly, and to permit others to do so.
#
# Author: Yu Zhang <zhy@lanl.gov>
#
# modified from a fdtd code (ref: )
r"""
Selects the backend for the openms-package.
The `openms` allows to choose a backend. The ``numpy`` backend is the
default one, but there are also several additional PyTorch backends:
- ``numpy`` (defaults to float64 arrays)
- ``torch`` (defaults to float64 tensors)
- ``torch.float32``
- ``torch.float64``
- ``torch.cuda`` (defaults to float64 tensors)
- ``torch.cuda.float32``
- ``torch.cuda.float64``
- cutensor (todo?)
For example, this is how to choose the `"torch"` backend: ::
openms.set_backend("torch")
In general, the ``numpy`` backend is preferred for standard CPU calculations
with `"float64"` precision. In general, ``float64`` precision is always
preferred over ``float32`` for FDTD simulations, however, ``float32`` might
give a significant performance boost.
The ``cuda`` backends are only available for computers with a GPU.
"""
## Imports
# Numpy Backend
import numpy # numpy has to be present
from functools import wraps
# used only by test runner.
# default must be idx 0.
backend_names = [
dict(backends="numpy"),
dict(backends="torch.float32"),
dict(backends="torch.float64"),
dict(backends="torch.cuda.float32"),
dict(backends="torch.cuda.float64"),
]
numpy_float_dtypes = {
getattr(numpy, "float_", numpy.float64),
getattr(numpy, "float16", numpy.float64),
getattr(numpy, "float32", numpy.float64),
getattr(numpy, "float64", numpy.float64),
getattr(numpy, "float128", numpy.float64),
}
# cupy backend
try:
import cupy
CUPY_AVAILABLE = True
except ImportError:
CUPY_AVAILABLE = False
#=================================================
# Torch Backends (and flags)
try:
import torch
torch.set_default_dtype(torch.float64) # we need more precision for FDTD
try: # we don't need gradients (for now)
torch._C.set_grad_enabled(False) # type: ignore
except AttributeError:
torch._C._set_grad_enabled(False)
TORCH_AVAILABLE = True
TORCH_CUDA_AVAILABLE = torch.cuda.is_available()
except ImportError:
TORCH_AVAILABLE = False
TORCH_CUDA_AVAILABLE = False
# TiledArray Backends (and flags)
try:
import tiledarray as TA
TA_AVAILABLE = True
# TA_CUDA_AVAILABLE = TA.cuda_available() # (todo)
except ImportError:
TA_AVAILABLE = False
TA_CUDA_AVAILABLE = False
# Base Class
[docs]class Backend:
"""Backend Base Class"""
# constants
pi = numpy.pi
def __repr__(self):
return self.__class__.__name__
def _replace_float(func):
"""replace the default dtype a function is called with"""
@wraps(func)
def new_func(self, *args, **kwargs):
result = func(*args, **kwargs)
if result.dtype in numpy_float_dtypes:
result = numpy.asarray(result, dtype=self.float)
return result
return new_func
# Numpy Backend
class NumpyBackend(Backend):
"""Numpy Backend"""
# cupy backend functions
def to_host_cpu(ndarray):
return ndarray
def to_host_gpu(mdarray):
return cupy.asnumpy(ndarray)
def get_cpu_free_memory():
try:
return os.sysconf("SC_PHYS_PAGES") * os.sysconf("SC_PAGE_SIZE") / 1024**3.0
except:
return 0.0
def get_gpu_free_memory():
free_bytes, total_bytes = cupy.cuda.Device().mem_info
used_bytes = total_bytes - free_bytes
return used_bytes, total_bytes
def synchronize_cpu():
pass
def synchronize_gpu():
cupy.cuda.stream.get_current_stream().synchronize()
# Numpy Backend
[docs]class NumpyBackend(Backend):
"""Numpy Backend"""
# types
int = numpy.int64
""" integer type for array"""
float = numpy.float64
""" floating type for array """
# methods
asarray = _replace_float(numpy.asarray)
exp = staticmethod(numpy.exp)
""" exponential of all elements in array """
sin = staticmethod(numpy.sin)
""" sine of all elements in array """
cos = staticmethod(numpy.cos)
""" cosine of all elements in array """
sum = staticmethod(numpy.sum)
""" sum elements in array """
max = staticmethod(numpy.max)
""" max element in array """
rand = staticmethod(numpy.random.rand)
stack = staticmethod(numpy.stack)
""" stack multiple arrays """
transpose = staticmethod(numpy.transpose)
""" transpose array by flipping two dimensions """
reshape = staticmethod(numpy.reshape)
""" reshape array into given shape """
squeeze = staticmethod(numpy.squeeze)
""" remove dim-1 dimensions """
broadcast_arrays = staticmethod(numpy.broadcast_arrays)
""" broadcast arrays """
broadcast_to = staticmethod(numpy.broadcast_to)
""" broadcast array into shape """
einsum = staticmethod(numpy.einsum)
allclose = staticmethod(numpy.allclose)
norm = staticmethod(numpy.linalg.norm)
[docs] @staticmethod
def bmm(arr1, arr2):
"""batch matrix multiply two arrays"""
return numpy.einsum("ijk,ikl->ijl", arr1, arr2)
[docs] @staticmethod
def is_array(arr):
"""check if an object is an array"""
return isinstance(arr, numpy.ndarray)
# constructors
array = _replace_float(numpy.array)
""" create an array from an array-like sequence """
ones = _replace_float(numpy.ones)
""" create an array filled with ones """
zeros = _replace_float(numpy.zeros)
""" create an array filled with zeros """
zeros_like = staticmethod(numpy.zeros_like)
""" create an array filled with zeros """
linspace = _replace_float(numpy.linspace)
""" create a linearly spaced array between two points """
arange = _replace_float(numpy.arange)
""" create a range of values """
pad = staticmethod(numpy.pad)
fftfreq = staticmethod(numpy.fft.fftfreq)
fft = staticmethod(numpy.fft.fft)
exp = staticmethod(numpy.exp)
divide = staticmethod(numpy.divide)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# beware to future people:
# because this line *redefines numpy*,
# you have to add your new staticmethods /above/ this line to avoid mystification.
# <3 <3 <3 <3
#
# could this (and below) perhaps be changed to "to_numpy()"
# or maybe "check_numpy" ?
numpy = _replace_float(numpy.asarray)
""" convert the array to numpy array """
# Torch Backend
if TORCH_AVAILABLE:
import torch
class TorchBackend(Backend):
"""Torch Backend"""
# types
int = torch.int64
""" integer type for array"""
float = torch.get_default_dtype()
""" floating type for array """
# methods
asarray = staticmethod(torch.as_tensor)
""" create an array """
exp = staticmethod(torch.exp)
""" exponential of all elements in array """
sin = staticmethod(torch.sin)
""" sine of all elements in array """
cos = staticmethod(torch.cos)
""" cosine of all elements in array """
@staticmethod
def einsum(expr, *args):
return torch.einsum(expr, [*args])
allclose = staticmethod(torch.allclose)
norm = staticmethod(torch.linalg.norm)
sum = staticmethod(torch.sum)
""" sum elements in array """
max = staticmethod(torch.max)
""" max element in array """
rand = staticmethod(torch.rand)
stack = staticmethod(torch.stack)
""" stack multiple arrays """
@staticmethod
def transpose(arr, axes=None):
"""transpose array by flipping two dimensions"""
if axes is None:
axes = tuple(range(len(arr.shape) - 1, -1, -1))
return arr.permute(*axes)
squeeze = staticmethod(torch.squeeze)
""" remove dim-1 dimensions """
broadcast_arrays = staticmethod(torch.broadcast_tensors)
""" broadcast arrays """
broadcast_to = staticmethod(torch.broadcast_to)
""" broadcast array into shape """
reshape = staticmethod(torch.reshape)
""" reshape array into given shape """
bmm = staticmethod(torch.bmm)
""" batch matrix multiply two arrays """
@staticmethod
def is_array(arr):
"""check if an object is an array"""
# is this a reasonable implemenation?
return isinstance(arr, numpy.ndarray) or torch.is_tensor(arr)
def array(self, arr, dtype=None):
"""create an array from an array-like sequence"""
if dtype is None:
dtype = torch.get_default_dtype()
if torch.is_tensor(arr):
return arr.clone().to(device="cpu", dtype=dtype)
return torch.tensor(arr, device="cpu", dtype=dtype)
# constructors
ones = staticmethod(torch.ones)
""" create an array filled with ones """
zeros = staticmethod(torch.zeros)
""" create an array filled with zeros """
def linspace(self, start, stop, num=50, endpoint=True):
"""create a linearly spaced array between two points"""
delta = (stop - start) / float(num - float(endpoint))
if not delta:
return self.array([start] * num)
return torch.arange(start, stop + 0.5 * float(endpoint) * delta, delta)
arange = staticmethod(torch.arange)
""" create a range of values """
pad = staticmethod(torch.nn.functional.pad) # type: ignore
fftfreq = staticmethod(numpy.fft.fftfreq)
fft = staticmethod(torch.fft) # type: ignore
divide = staticmethod(torch.div)
exp = staticmethod(torch.exp)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# The same warning applies here.
# <3 <3 <3 <3
def numpy(self, arr):
"""convert the array to numpy array"""
if torch.is_tensor(arr):
return arr.numpy()
else:
return numpy.asarray(arr)
# Torch Cuda Backend
if TORCH_CUDA_AVAILABLE:
class TorchCudaBackend(TorchBackend):
"""Torch Cuda Backend"""
def ones(self, shape):
"""create an array filled with ones"""
return torch.ones(shape, device="cuda")
def zeros(self, shape):
"""create an array filled with zeros"""
return torch.zeros(shape, device="cuda")
def array(self, arr, dtype=None):
"""create an array from an array-like sequence"""
if dtype is None:
dtype = torch.get_default_dtype()
if torch.is_tensor(arr):
return arr.clone().to(device="cuda", dtype=dtype)
return torch.tensor(arr, device="cuda", dtype=dtype)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# The same warning applies here.
def numpy(self, arr):
"""convert the array to numpy array"""
if torch.is_tensor(arr):
return arr.cpu().numpy()
else:
return numpy.asarray(arr)
def linspace(self, start, stop, num=50, endpoint=True):
"""convert a linearly spaced interval of values"""
delta = (stop - start) / float(num - float(endpoint))
if not delta:
return self.array([start] * num)
return torch.arange(
start, stop + 0.5 * float(endpoint) * delta, delta, device="cuda"
)
[docs]class cuTensorBackend(Backend):
int = int
## Default Backend
# this backend object will be used for all array/tensor operations.
# the backend is changed by changing the class of the backend
# using the "set_backend" function. This "monkeypatch" will replace all the methods
# of the backend object by the methods supplied by the new class.
backend = NumpyBackend()
## Set backend
[docs]def set_backend(name: str):
"""Set the backend for the FDTD simulations
This function monkeypatches the backend object by changing its class.
This way, all methods of the backend object will be replaced.
Args:
name: name of the backend. Allowed backend names:
- ``numpy`` (defaults to float64 arrays)
- ``numpy.float16``
- ``numpy.float32``
- ``numpy.float64``
- ``numpy.float128``
- ``torch`` (defaults to float64 tensors)
- ``torch.float16``
- ``torch.float32``
- ``torch.float64``
- ``torch.cuda`` (defaults to float64 tensors)
- ``torch.cuda.float16``
- ``torch.cuda.float32``
- ``torch.cuda.float64``
- ``TiledArray``
- ``TiledArray.sparse``
- ``TiledArray.cuda``
- ``TiledArray.cuda.sparse``
"""
# perform checks
if name.startswith("torch") and not TORCH_AVAILABLE:
raise RuntimeError("Torch backend is not available. Is PyTorch installed?")
if name.startswith("torch.cuda") and not TORCH_CUDA_AVAILABLE:
raise RuntimeError(
"Torch cuda backend is not available.\n"
"Do you have a GPU on your computer?\n"
"Is PyTorch with cuda support installed?"
)
if name.startswith("TiledArray") and not TA_AVAILABLE:
raise RuntimeError(
"TiledArray backend is not available. Is TiledArray installed?"
)
if name.count(".") == 0:
dtype, device = "float64", "cpu"
elif name.count(".") == 1:
name, dtype = name.split(".")
if dtype == "cuda":
device, dtype = "cuda", "float64"
else:
device = "cpu"
elif name.count(".") == 2:
name, device, dtype = name.split(".")
else:
raise ValueError(f"Unknown backend '{name}'")
if name == "numpy":
if device == "cpu":
backend.__class__ = NumpyBackend
backend.float = getattr(numpy, dtype)
elif device == "cuda":
raise ValueError(
"Device 'cuda' not available for numpy backend. Use 'torch' backend in stead."
)
else:
raise ValueError(
"Unknown device '{device}'. Available devices: 'cpu', 'cuda'"
)
elif name == "cupy":
backend.__class__ = CupyBackend
backend.float = getattr(cupy, dtype)
elif name == "torch":
if device == "cpu":
backend.__class__ = TorchBackend
backend.float = getattr(torch, dtype)
elif device == "cuda":
backend.__class__ = TorchCudaBackend
backend.float = getattr(torch, dtype)
else:
raise ValueError(
"Unknown device '{device}'. Available devices: 'cpu', 'cuda'"
)
elif name == "TiledArray":
if device == "cpu":
backend.__class__ = TABackend
# backend.float = getattr(TA, dtype)
# elif device == "cuda":
# backend.__class__ = TACudaBackend
# backend.float = getattr(TA, dtype)
else:
raise ValueError(
"Unknown device '{device}'. Available devices: 'cpu', 'cuda'"
)
else:
raise ValueError(
"Unknown backend '{name}'. Available backends: 'numpy', 'torch', 'TiledArray'"
)
# move data to gpu if needed (todo)
[docs]def gpu_allocation(*args):
raise NotImplementedError
#### TA Array helpers
# TA Backend
if TA_AVAILABLE:
import tiledarray as TA
Array = TA.TArray
class TABackend(Backend):
"""TA Backend"""
blksize = 1
def __init__(self):
"""
storre world in the class?
"""
self.world = TA.get_default_world()
@staticmethod
def rand(size, block=None, world=None, device=None):
"""
generate a random tensor
"""
if world is None:
world = TA.get_default_world()
if block is None:
block = min(min(size), max(TABackend.blksize, 1))
op = lambda r: numpy.random.rand(*r.shape)
a = Array(size, block=block, world=world, op=op)
world.fence()
return a
def zeros(size, block=1, world=None, device=None):
"""
dtype: data type (TBA)
zero tensor:
size: array, size of of each dimension, [4,5] or [2,3,4] for example
"""
if world is None:
world = TA.get_default_world()
a = Array(size, block, world)
a.fill(0.0, False)
world.fence()
return a
def ones(size, block=1, world=None, device=None):
"""
unit tensor
"""
if world is None:
world = TA.get_default_world()
a = Array(size, block, world)
a.fill(1.0, False)
world.fence()
return a
def cos():
raise NotImplementedError
def sin():
raise NotImplementedError
def exp():
raise NotImplementedError
""" sum elements in array """
def sum():
raise NotImplementedError
def to_numpy(tensor):
""" """
a = numpy.zeros(tensor.shape)
for tile in tensor:
start = tile.range.start
stop = tile.range.stop
slices = tuple(slice(b, c) for b, c in zip(start, stop))
# print('tile.range=', tile.range)
# print('subarray=', tensor[slices])
a[slices] = tile.data
return a
def from_numpy(tensor):
"""
initialize a TA tensor from numpy tensor
"""
if world is None:
world = TA.get_default_world()
size = tensor.shape
a = Array(size, block, world=world)
for i, tile in enumerate(a):
start = tile.range.start
stop = tile.range.stop
slices = tuple(slice(b, c) for b, c in zip(start, stop))
# print('tile.range=', tile.range)
# print('subarray=', tensor[slices])
tile.data = tensor[slices]
return a
@staticmethod
def einsum(expr, *args):
"""
first, we need to figure out the shape of the output
get the block and world from the arguments:
ik, kj --> ij
"""
# input_shapes = [arg.shape for arg in args]
# output_shape = einsum_output_shape(expr, *args)
# print("output_shape=", output_shape)
cout = Array()
TA.einsum(expr, *args, cout)
return cout
# jax backend for autodiff
try:
import jax
JAX_AVAILABLE = True
except ImportError:
JAX_AVAILABLE = False