Source code for rubin_scheduler.utils.parallel_utils
__all__ = ("SharedNumpyArray",)
from multiprocessing import shared_memory
import numpy as np
[docs]
class SharedNumpyArray:
    """Hold a numpy array in shared memory so it can be shared with
    multiple processes without copying.

    Originally from:
    https://e-dorigatti.github.io/python/2020/06/19/multiprocessing-large-objects.html

    Parameters
    ----------
    array : `np.array`
        Array to copy into shared memory. Any input accepted by
        `np.asarray` is allowed, including zero-size arrays.
    """

    def __init__(self, array):
        # Accept array-likes (lists, tuples, ...) as well as ndarrays;
        # an ndarray is passed through unchanged.
        array = np.asarray(array)
        # Create the shared memory block. SharedMemory refuses to create
        # a zero-byte segment, so reserve at least one byte to keep
        # empty arrays working.
        self._shared = shared_memory.SharedMemory(create=True, size=max(array.nbytes, 1))
        # Save data type and shape, necessary to read the data correctly
        self._dtype, self._shape = array.dtype, array.shape
        # Create a numpy array backed by the shared memory and let numpy
        # copy the input into it in the correct format.
        res = np.ndarray(self._shape, dtype=self._dtype, buffer=self._shared.buf)
        np.copyto(res, array)

    def read(self):
        """Read array without copy.

        Returns
        -------
        array : `np.ndarray`
            Array with the stored shape and dtype that uses the shared
            memory block as its buffer, so writes to it modify the
            shared data.
        """
        return np.ndarray(self._shape, self._dtype, buffer=self._shared.buf)

    def copy(self):
        """Copy array.

        Returns
        -------
        array : `np.ndarray`
            Independent copy of the shared data.
        """
        return np.copy(self.read())

    def unlink(self):
        """Unlink when done with data.

        Closes this handle to the shared memory block and then unlinks
        the block. Drop every array obtained from `read` first: closing
        the block while views into its buffer are still alive raises
        `BufferError`.
        """
        self._shared.close()
        self._shared.unlink()