# Source code for ndsharray.ndsharray

# python packages
import os
if os.name == "posix":
    from mmap import MAP_SHARED, PROT_READ
elif os.name == "nt":
    from mmap import ACCESS_READ, ACCESS_WRITE
import mmap
import uuid
import time
import sys
from typing import Union, Tuple
import struct
import warnings

# external python packages
import numpy as np

#: supported numpy dtypes
#: NOTE: the position of a dtype inside this list is the dtype index which NdShArray writes into the header of
#: the shared buffer (see ``supported_types.index(...)`` in ``_array_to_bytes``), so the order must stay as it is
supported_types = [
    # signed integers
    np.int8, np.int16, np.int32, np.int64,
    # unsigned integers
    np.uint8, np.uint16, np.uint32, np.uint64,
    # floating point numbers
    np.float16, np.float32, np.float64,
    # complex numbers
    np.complex64, np.complex128,
    # boolean
    bool,
]

#: number of bytes needed to store ``sys.maxsize``; this is the width used for every integer field of the
#: shared-memory header and it depends on the python build / platform
n_bytes_for_int = (sys.maxsize.bit_length() + 7) // 8


def int_to_bytes(i: int, *, signed: bool = False) -> bytes:
    """
    encodes an integer as a big-endian byte string with a fixed width of ``n_bytes_for_int`` bytes

    :param i: integer which shall be encoded
    :param signed: if True, two's complement is used so that negative values can be encoded
    :return: byte string with a length of ``n_bytes_for_int``
    """
    width = n_bytes_for_int
    return i.to_bytes(width, "big", signed=signed)


def bytes_to_int(b: bytes, *, signed: bool = False) -> int:
    """
    decodes a big-endian byte string into an integer

    :param b: byte string which shall be decoded
    :param signed: if True, the bytes are interpreted as two's complement
    :return: decoded integer
    """
    value = int.from_bytes(b, "big", signed=signed)
    return value


def str_to_bytes(s: str) -> bytes:
    """
    encodes a string with utf-8

    :param s: string which shall be encoded
    :return: the utf-8 encoded string; note that the returned object is a (mutable) bytearray
    """
    encoded = s.encode("utf-8")
    return bytearray(encoded)


def bytes_to_str(b: bytes) -> str:
    """
    decodes utf-8 encoded bytes into a string

    :param b: utf-8 encoded bytes
    :return: decoded string
    """
    text = b.decode(encoding="utf-8")
    return text


class NdShArray(object):
    """
    sharing numpy array between different processes

    Two memory maps are used:

    - a small mmap tagged with ``name`` which only holds the tag name of the mmap currently holding the ndarray
    - the ndarray mmap tagged with ``<name>_<uuid>`` which holds the encoded numpy array (see
      ``_array_to_bytes`` for the protocol); the writer creates a new one with a new uuid every time the dtype,
      dimension or shape of the written array changes
    """

    def __init__(self, name: str, array: np.ndarray = np.ndarray((0, ), dtype=np.uint8),
                 r_w: Union[str, None] = None):
        """
        :param name: tag name of the mmap, serves as identifier for other processes
        :param array: numpy array which defines the initial dtype, dimension and shape; a writer writes it
                      immediately into the shared memory
        :param r_w: 'r' or 'w' for 'read' or 'write' functionality, must be specified
        :raises ValueError: if r_w is neither 'r' nor 'w' (case insensitive)
        """
        object.__init__(self)

        # save numpy array and its properties, this array is used for reading and writing
        self._array: np.ndarray = array

        # save the name for the mmap
        self._name: str = name

        # save the read / write property
        if r_w in ("w", "r", "W", "R"):
            self._access: str = r_w.lower()
        else:
            raise ValueError("'r' or 'w' must be specified for input argument 'r_w'.")

        # initialize last read time, set to zero to get in every case the next numpy array
        self._last_write_time: float = 0.0
        self._read_time_ms: float = 0.0

        # write time will be saved to make sure it is increasing and unique
        self._write_time: float = 0.0

        # unique identifier for the ndarray mmap name
        self._uuid: str = uuid.uuid4().hex

        # holds just the tag-name of the mmap of the ndarray
        # (annotation only: the attribute does not exist until _create_mmap succeeded, __del__ relies on that)
        self._mmap: mmap.mmap
        self._fd: Union[None, int] = None

        # holds the numpy ndarray
        self._ndarray_mmap: Union[None, mmap.mmap] = None
        self._ndarray_fd: Union[None, int] = None
        self._is_valid = False

        # buffer size of the ndarray mmap; encoding the array also validates its type and dtype
        self._buffer_size: int = len(self._array_to_bytes(self._array))

        # create the mmap which holds the name of the ndarray mmap
        self._mmap, self._fd = self._create_mmap(self._name, len(self.ndarray_mmap_name), r_w=self._access)

        # create ndarray mmap
        self._is_valid = self._create_ndarray_mmap()

        if self._access == "w":
            self.write(array)  # call write to force saving the array via mmap.flush!

    def __del__(self):
        # closing the mmap which holds the name (the attribute is missing if __init__ failed early)
        if hasattr(self, "_mmap"):
            self._close_mmap(self._mmap, self._fd)

        # closing the ndarray mmap
        if hasattr(self, "_ndarray_mmap"):
            self._close_mmap(self._ndarray_mmap, self._ndarray_fd)

    @property
    def name(self) -> str:
        """
        unique name of the mmap memory, serves as identifier for other processes
        the name must be declared at class instantiation and is read only after instantiation

        :return name:
        """
        return self._name

    @property
    def is_valid(self) -> bool:
        """
        shows if the ndarray mmap could be set up: for a reader the mmap name and the header of the numpy array
        could be read, for a writer the ndarray mmap has been created

        :return:
        """
        return self._is_valid

    @property
    def ndarray_mmap_name(self) -> str:
        """
        returns the name of the mmap which holds the current ndarray
        ndarray_mmap_name consists of the name and an uuid which is generated for each new ndarray size (changes in
        dtype, shape or dimension does a change in size)

        :return ndarray_mmap_name:
        """
        return "%s_%s" % (self._name, self._uuid)

    @property
    def access(self) -> str:
        """
        access of the ndsharray; either 'w' for only writeable or 'r' for only readable

        :return access:
        """
        return self._access

    @property
    def read_time_ms(self) -> float:
        """
        returns the write-read time of the two processes in milliseconds

        :return:
        """
        return self._read_time_ms

    def _array_to_bytes(self, array: np.ndarray) -> bytes:
        """
        encodes a numpy array to bytes using an own protocol

        protocol usage:
            - write-time (8 bytes, double packed with struct format "d")
            - numpy dtype index (integer, 8 bytes), index into supported_types
            - number of dimension (integer, 8 bytes)
            - length of axis (array dimension) 0 (integer, 8 bytes)
            - length of axis (array dimension) 1 (integer, 8 bytes)
            - ...
            - length of axis (array dimension) n (integer, 8 bytes)
            - bytes of numpy array
            - write-time (8 bytes), identical to the first write-time

        note: size of integer may differ because the maximum integer size sys.maxsize will be used (on python3, amd64
        it is 8 byte)

        :param array: numpy array which shall be encoded
        :return: byte-encoded numpy array using an own protocol
        :raises TypeError: if array is not a numpy.ndarray
        :raises NotImplementedError: if the dtype of array is not listed in supported_types
        """
        if not isinstance(array, np.ndarray):
            raise TypeError("array must be from type np.ndarray.")

        if array.dtype not in supported_types:
            raise NotImplementedError("%s is a numpy.dtype which is not supported. "
                                      "The following numpy.dtypes are supported: %s" %
                                      (str(array.dtype), str([_t.__name__ for _t in supported_types])))

        # the time stamp must be strictly increasing, otherwise a reader would not detect the new data
        _now = time.monotonic()
        if _now <= self._write_time:
            _now = float(np.nextafter(self._write_time, float('inf')))  # +1 ULP
        self._write_time = _now
        _time = struct.pack("d", self._write_time)

        _parts = [_time,
                  int_to_bytes(supported_types.index(array.dtype)),
                  int_to_bytes(int(array.ndim))]
        _parts.extend(int_to_bytes(int(_length)) for _length in array.shape)
        _parts.append(array.tobytes())
        # the same time stamp at both ends lets the reader detect a half written buffer
        _parts.append(_time)

        return b"".join(_parts)

    def _bytes_to_array(self, _bytes: bytes) -> Tuple[bool, bool, np.ndarray]:
        """
        decodes a buffer which has been encoded with _array_to_bytes

        :param _bytes: the complete content of the ndarray mmap
        :return mmap_correct: boolean shows, if the mmap does fit to the size of the numpy ndarray, if it is not
                              correct, the mmap should be re-initialized and the buffer should be read out again
                              if mmap_correct is False, validity will be also False and the numpy array will be
                              empty
        :return validity: boolean displaying if the numpy array is corrupt or not (e.g. mixed numpy ndarray from
                          previous writing)
        :return array: numpy.ndarray, mmap_correct and validity must be True, otherwise this array contains corrupt
                       data
        """
        _empty = np.ndarray((0, ))

        idx = 0
        _time_start = struct.unpack("d", _bytes[idx:8])[0]
        idx += 8

        _np_dtype = supported_types[bytes_to_int(_bytes[idx:idx + n_bytes_for_int])]
        idx += n_bytes_for_int
        if _np_dtype != self._array.dtype:
            return False, False, _empty

        _np_dim = bytes_to_int(_bytes[idx:idx + n_bytes_for_int])
        idx += n_bytes_for_int
        if _np_dim != self._array.ndim:
            return False, False, _empty

        _np_shape = []
        for _ in range(_np_dim):
            _np_shape.append(bytes_to_int(_bytes[idx:idx + n_bytes_for_int]))
            idx += n_bytes_for_int
        _np_shape = tuple(_np_shape)
        if _np_shape != self._array.shape:
            return False, False, _empty

        _byte_array = _bytes[idx:-8]

        try:
            _time_end = struct.unpack("d", _bytes[len(_bytes) - 8:])[0]
        except struct.error:  # struct.unpack raises struct.error (not ValueError) on a wrong buffer length
            _time_end = 0

        # different time stamps at start and end: the writer was in the middle of a write
        _validity = _time_start == _time_end

        _array = np.frombuffer(_byte_array, dtype=_np_dtype).reshape(_np_shape)

        return True, _validity, _array

    def write(self, array: np.ndarray) -> None:
        """
        write a numpy array into the mmap file, it might be from any type, shape or dimension

        Important Note: a mmap will be silently re-created if type, dimension or shape will be changed. the other
        process will read the first line of the mmap and will also re-create its mmap. Re-creating the mmap needs
        more time than a normal read.

        :param array: a numpy.ndarray which shall be saved in mmap
        :return None:
        """
        _bytes = self._array_to_bytes(array)

        # check, if a new mmap has to be generated
        if self._array.dtype != array.dtype or self._array.ndim != array.ndim or self._array.shape != array.shape:
            self._array = array
            self._buffer_size = len(_bytes)
            self._create_ndarray_mmap()

        self._ndarray_mmap.seek(0)
        self._ndarray_mmap.write(_bytes)
        self._ndarray_mmap.flush()

        # write name of ndarray mmap into mmap; done after the data has been written, so the name always points
        # to a completely written buffer
        self._mmap.seek(0)
        self._mmap.write(str_to_bytes(self.ndarray_mmap_name))
        self._mmap.flush()

    def read(self) -> Tuple[bool, np.ndarray]:
        """
        reading the shared memory with mmap and numpy's frombuffer, which returns a view of the buffer and not a
        copy.

        Citing the documentation from numpy.frombuffer:
        "This function creates a view into the original object. This should be safe in general, but it may make sense
        to copy the result when the original object is mutable or untrusted."
        Source: https://numpy.org/doc/stable/reference/generated/numpy.frombuffer.html

        :return validity: boolean displaying if the numpy array is ok or if it is either old or corrupt or not
                          (e.g. mixed numpy ndarray from previous writing).
                          Note: validity is checked by checking if buffer[0] and buffer[-1] have the same time stamp!
        :return array: numpy.ndarray
        """
        _recreated_map = False
        _numpy_array = self._array

        # get the ndarray mmap name
        self._mmap.seek(0)
        _ndarray_mmap_name = bytes_to_str(self._mmap.read(len(self._name) + 33))
        if _ndarray_mmap_name != self.ndarray_mmap_name:
            self._create_ndarray_mmap()
            _recreated_map = True

        if not self._is_valid:
            return False, _numpy_array

        # first stage of checking if new data have been arrived
        self._ndarray_mmap.seek(0)
        _bytes = self._ndarray_mmap.read(8)
        try:
            _write_time = struct.unpack("d", _bytes)[0]
        except struct.error:  # struct.unpack raises struct.error (not ValueError) on a wrong buffer length
            _write_time = 0
        if _write_time <= self._last_write_time and not _recreated_map:
            return False, _numpy_array

        # without checking, read the whole buffer
        _bytes += self._ndarray_mmap.read()

        if len(_bytes) != self._buffer_size:
            self._create_ndarray_mmap()

        _mmap_correct, _validity, _numpy_array = self._bytes_to_array(_bytes)

        if not _mmap_correct:
            warnings.warn("The mmap of the ndarray seems to be corrupt and the used protocol does not fit.",
                          BytesWarning)

        # for efficiency
        self._array = _numpy_array

        # for debug purpose
        self._read_time_ms = (time.monotonic() - _write_time) * 1000.0

        self._last_write_time = _write_time

        return _validity, _numpy_array

    def _read_ndarray_header(self) -> Tuple[type, Tuple[int, ...]]:
        """
        reads dtype and shape out of the header of the ndarray mmap named ndarray_mmap_name

        The header is read in two stages with temporary read-only mmaps, because the length of the shape
        section is only known after the number of dimensions has been read. The temporary mmaps are closed
        even if reading or decoding fails.

        :return: the dtype (an entry of supported_types) and the shape of the stored array
        """
        _fixed_size = 8 + 2 * n_bytes_for_int  # write-time + dtype index + number of dimensions

        # first stage: dtype and number of dimensions
        _tmp_mmap, _tmp_fd = self._create_mmap(self.ndarray_mmap_name, _fixed_size, r_w="r")
        try:
            _tmp_mmap.seek(0)
            _bytes = _tmp_mmap.read(_fixed_size)
        finally:
            self._close_mmap(_tmp_mmap, _tmp_fd)

        idx = 8  # skip the time
        _np_dtype = supported_types[bytes_to_int(_bytes[idx:idx + n_bytes_for_int])]
        idx += n_bytes_for_int
        _np_dim = bytes_to_int(_bytes[idx:idx + n_bytes_for_int])

        # second stage: the shape
        _shape_size = _np_dim * n_bytes_for_int
        _tmp_mmap, _tmp_fd = self._create_mmap(self.ndarray_mmap_name, _fixed_size + _shape_size, r_w="r")
        try:
            _tmp_mmap.seek(_fixed_size)  # skip the time, dtype and dimension
            _shape_bytes = _tmp_mmap.read(_shape_size)
        finally:
            self._close_mmap(_tmp_mmap, _tmp_fd)

        _np_shape = tuple(bytes_to_int(_shape_bytes[_pos:_pos + n_bytes_for_int])
                          for _pos in range(0, _shape_size, n_bytes_for_int))

        return _np_dtype, _np_shape

    def _create_ndarray_mmap(self) -> bool:
        """
        (re-)creates the mmap of the ndarray

        - a writer generates a new uuid and creates a new ndarray mmap with the current buffer size
        - a reader reads the name of the current ndarray mmap out of the mmap with tag 'name', reads dtype and
          shape out of the header of the ndarray mmap and maps the ndarray mmap with the resulting size

        :return: True if the ndarray mmap is ready to use, otherwise False
        """
        # release the previous mmap and forget its handles, so that a failure below can never lead to closing
        # the same file descriptor a second time
        self._close_mmap(self._ndarray_mmap, self._ndarray_fd)
        self._ndarray_mmap = None
        self._ndarray_fd = None

        if self._access == "w":
            # create new uuid
            self._uuid = uuid.uuid4().hex

            self._ndarray_mmap, self._ndarray_fd = self._create_mmap(self.ndarray_mmap_name,
                                                                      self._buffer_size,
                                                                      r_w=self._access)
            self._is_valid = True

        elif self._access == "r":
            self._mmap.seek(0)
            _ndarray_mmap_name = bytes_to_str(self._mmap.read(len(self._name) + 33))
            self._uuid = _ndarray_mmap_name[-32:]

            # the uuid must be a hex number, otherwise the writer did not publish a name yet
            try:
                int(self._uuid, 16)
                self._is_valid = True
            except ValueError:
                self._is_valid = False

            if self._is_valid:
                try:
                    _np_dtype, _np_shape = self._read_ndarray_header()

                    # rebuild _array, it is used to compare dtype, dimension and shape while reading
                    self._array = np.ndarray(_np_shape, dtype=_np_dtype)

                    # same length as len(self._array_to_bytes(self._array)), but without copying the array:
                    # time + dtype index + ndim + shape + array bytes + time
                    self._buffer_size = (8 + (2 + len(_np_shape)) * n_bytes_for_int
                                         + self._array.nbytes + 8)

                    self._ndarray_mmap, self._ndarray_fd = self._create_mmap(self.ndarray_mmap_name,
                                                                              self._buffer_size,
                                                                              r_w=self._access)
                except Exception:
                    # best effort: the ndarray mmap is not available (yet) or its header is not readable
                    self._is_valid = False

        return self._is_valid

    @staticmethod
    def _create_mmap(name: str, buffer_size: int, r_w: str) -> Tuple[mmap.mmap, Union[None, int]]:
        """
        static helper function to create a mmap for a specific operating system

        :param name: tag name of the mmap (on posix: name of the file inside /dev/shm)
        :param buffer_size: size of the mmap in bytes
        :param r_w: 'w' creates a writeable mmap, 'r' opens a read-only mmap
        :return: the mmap and, on posix, the file descriptor of the underlying file (otherwise None);
                 both are None if r_w is neither 'r' nor 'w'
        :raises OSError: if the operating system is neither 'nt' nor 'posix'
        """
        _mmap: Union[None, mmap.mmap] = None
        _fd: Union[None, int] = None

        if os.name == "nt":
            if r_w == "w":
                _mmap = mmap.mmap(-1, buffer_size, name, access=mmap.ACCESS_WRITE)
                _mmap.flush()
            elif r_w == "r":
                _mmap = mmap.mmap(-1, buffer_size, name, access=mmap.ACCESS_READ)
        elif os.name == "posix":
            _path = "/dev/shm/%s" % name
            if r_w == "w":
                _fd = os.open(_path, os.O_CREAT | os.O_TRUNC | os.O_RDWR)
                try:
                    os.ftruncate(_fd, buffer_size)  # resize file
                    _mmap = mmap.mmap(_fd, buffer_size, mmap.MAP_SHARED)
                    _mmap.flush()
                except Exception:
                    os.close(_fd)  # do not leak the file descriptor if the mapping fails
                    raise
            elif r_w == "r":
                _fd = os.open(_path, os.O_RDONLY)
                try:
                    _mmap = mmap.mmap(_fd, buffer_size, mmap.MAP_SHARED, mmap.PROT_READ)
                except Exception:
                    os.close(_fd)  # do not leak the file descriptor if the mapping fails
                    raise
        else:
            raise OSError("%s is not supported." % os.name)

        return _mmap, _fd

    @staticmethod
    def _close_mmap(_mmap: Union[mmap.mmap, None], _fd: Union[None, int]) -> None:
        """
        static helper function to close a mmap for a specific operating system

        :param _mmap: the mmap which shall be closed, None is ignored
        :param _fd: file descriptor belonging to the mmap (only used on posix), None is ignored
        :return:
        """
        # closing the mmap, mmap.close() is synchronous
        if _mmap is not None:
            _mmap.close()

        # closing the file of the mmap
        if os.name == "posix" and _fd is not None:
            os.close(_fd)