Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 76 additions & 108 deletions Lib/multiprocessing/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,7 @@ def unlink(self):
_posixshmem.shm_unlink(self._name)
resource_tracker.unregister(self._name, "shared_memory")


_encoding = "utf8"

class ShareableList:
"""Pattern for a mutable list-like object shareable via a shared
memory block. It differs from the built-in list type in that these
Expand All @@ -258,13 +256,11 @@ class ShareableList:

# The shared memory area is organized as follows:
# - 8 bytes: number of items (N) as a 64-bit integer
# - (N + 1) * 8 bytes: offsets of each element from the start of the
# data area
# - (2 * N + 1) * 8 bytes: offsets from the start of the data
# area, the `struct` format string and the index
# into _back_transforms_mapping for each elements
# - K bytes: the data area storing item values (with encoding and size
# depending on their respective types)
# - N * 8 bytes: `struct` format string for each element
# - N bytes: index into _back_transforms_mapping for each element
# (for reconstructing the corresponding Python value)
_types_mapping = {
int: "q",
float: "d",
Expand Down Expand Up @@ -295,14 +291,25 @@ def _extract_recreation_code(value):
else:
return 3 # NoneType

@staticmethod
def _encode_if_string(value):
"""
Encode the value into bytes if the value is a string
"""
return value.encode(_encoding) if isinstance(value, str) else value

def __init__(self, sequence=None, *, name=None):
if name is None or sequence is not None:
sequence = sequence or ()
_formats = [
self._types_mapping[type(item)]
if not isinstance(item, (str, bytes))
else self._types_mapping[type(item)] % (
self._alignment * (len(item) // self._alignment + 1),
self._alignment
* max(
((len(self._encode_if_string(item)) - 1)
// self._alignment + 1),
1),
)
for item in sequence
]
Expand All @@ -312,149 +319,124 @@ def __init__(self, sequence=None, *, name=None):
# The offsets of each list element into the shared memory's
# data area (0 meaning the start of the data area, not the start
# of the shared memory area).
self._allocated_offsets = [0]
for fmt in _formats:
offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
self._allocated_offsets.append(offset)
_recreation_codes = [
self._extract_recreation_code(item) for item in sequence
]
_metainfo = [0]
for item, fmt in zip(sequence, _formats):
fmt_str = fmt[-2:] if fmt[-1] != "s" else fmt[-1]
fmt_count = -1 if fmt_str != "s" else int(fmt[:-1])
offset += self._alignment if fmt_str != "s" else fmt_count
_metainfo.append(fmt_count)
_metainfo.append(fmt_str.encode(_encoding))
_metainfo.append(self._extract_recreation_code(item))
_metainfo.append(offset)

requested_size = struct.calcsize(
"q" + self._format_size_metainfo +
"".join(_formats) +
self._format_packing_metainfo +
self._format_back_transform_codes
"q" + self._format_metainfo +
"".join(_formats)
)

self.shm = SharedMemory(name, create=True, size=requested_size)
else:
self.shm = SharedMemory(name)

if sequence is not None:
_enc = _encoding
self._data_size = _metainfo[-1]
struct.pack_into(
"q" + self._format_size_metainfo,
"q" + self._format_metainfo,
self.shm.buf,
0,
self._list_len,
*(self._allocated_offsets)
*(_metainfo)
)
struct.pack_into(
"".join(_formats),
self.shm.buf,
self._offset_data_start,
*(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
)
struct.pack_into(
self._format_packing_metainfo,
self.shm.buf,
self._offset_packing_formats,
*(v.encode(_enc) for v in _formats)
)
struct.pack_into(
self._format_back_transform_codes,
self.shm.buf,
self._offset_back_transform_codes,
*(_recreation_codes)
*(self._encode_if_string(v) for v in sequence)
)

else:
self._list_len = len(self) # Obtains size from offset 0 in buffer.
self._allocated_offsets = list(
struct.unpack_from(
self._format_size_metainfo,
self.shm.buf,
1 * 8
)
)
self._data_size = struct.unpack_from(
"q",
self.shm.buf,
(2 * self._list_len + 1) * 8
)[0]

def _get_packing_format(self, position):
def _get_metainfo(self, position):
"Gets the packing format for a single value stored in the list."
position = position if position >= 0 else position + self._list_len
if (position >= self._list_len) or (self._list_len < 0):
raise IndexError("Requested position out of range.")

v = struct.unpack_from(
"8s",
offset, fmt_count, fmt, transform_code = struct.unpack_from(
"qi2sb",
self.shm.buf,
self._offset_packing_formats + position * 8
)[0]
fmt = v.rstrip(b'\x00')
(2 * position + 1) * 8
)
fmt = fmt.rstrip(b'\x00')
fmt_as_str = fmt.decode(_encoding)

return fmt_as_str

def _get_back_transform(self, position):
"Gets the back transformation function for a single value."

if (position >= self._list_len) or (self._list_len < 0):
raise IndexError("Requested position out of range.")

transform_code = struct.unpack_from(
"b",
self.shm.buf,
self._offset_back_transform_codes + position
)[0]
if fmt_as_str == "s":
fmt_as_str = f"{fmt_count}s"
elif "?" in fmt_as_str:
fmt_as_str = "xxxxxx" + fmt_as_str
transform_function = self._back_transforms_mapping[transform_code]
return offset, fmt_as_str, transform_function

return transform_function
def _get_packing_format(self, position):
return self._get_metainfo(position)[1]

def _set_packing_format_and_transform(self, position, fmt_as_str, value):
"""Sets the packing format and back transformation code for a
single value in the list at the specified position."""

if (position >= self._list_len) or (self._list_len < 0):
raise IndexError("Requested position out of range.")
transform_code = self._extract_recreation_code(value)

struct.pack_into(
"8s",
self.shm.buf,
self._offset_packing_formats + position * 8,
fmt_as_str.encode(_encoding)
)
fmt_str = fmt_as_str[-2:] if fmt_as_str[-1] != "s" else fmt_as_str[-1]
fmt_count = -1 if fmt_str != "s" else int(fmt_as_str[:-1])

transform_code = self._extract_recreation_code(value)
struct.pack_into(
"b",
"i2sb",
self.shm.buf,
self._offset_back_transform_codes + position,
(2 * position + 2) * 8,
fmt_count,
fmt_str.encode(_encoding),
transform_code
)

def __getitem__(self, position):
position = position if position >= 0 else position + self._list_len
try:
offset = self._offset_data_start + self._allocated_offsets[position]
item_offset, format, back_transform = self._get_metainfo(position)
offset = self._offset_data_start + item_offset
(v,) = struct.unpack_from(
self._get_packing_format(position),
format,
self.shm.buf,
offset
)
except IndexError:
raise IndexError("index out of range")

back_transform = self._get_back_transform(position)
v = back_transform(v)

return v

def __setitem__(self, position, value):
position = position if position >= 0 else position + self._list_len
try:
item_offset = self._allocated_offsets[position]
item_offset, current_format, _ = self._get_metainfo(position)
offset = self._offset_data_start + item_offset
current_format = self._get_packing_format(position)
except IndexError:
raise IndexError("assignment index out of range")

if not isinstance(value, (str, bytes)):
new_format = self._types_mapping[type(value)]
encoded_value = value
else:
allocated_length = self._allocated_offsets[position + 1] - item_offset
if position + 1 == self._list_len:
next_item_offset = self._data_size
else:
next_item_offset, _, _ = self._get_metainfo(position + 1)
allocated_length = next_item_offset - item_offset

encoded_value = (value.encode(_encoding)
if isinstance(value, str) else value)
encoded_value = self._encode_if_string(value)
if len(encoded_value) > allocated_length:
raise ValueError("bytes/str item exceeds available storage")
if current_format[-1] == "s":
Expand Down Expand Up @@ -484,37 +466,23 @@ def __repr__(self):
def format(self):
"The struct packing format used by all currently stored items."
return "".join(
self._get_packing_format(i) for i in range(self._list_len)
self._get_metainfo(i)[1] for i in range(self._list_len)
)

@property
def _format_size_metainfo(self):
"The struct packing format used for the items' storage offsets."
return "q" * (self._list_len + 1)

@property
def _format_packing_metainfo(self):
"The struct packing format used for the items' packing formats."
return "8s" * self._list_len

@property
def _format_back_transform_codes(self):
"The struct packing format used for the items' back transforms."
return "b" * self._list_len
def _format_metainfo(self):
"""
The struct packing format used for the items' storage offsets,
recreation code and packing formats.
"""
return "qi2sbx" * self._list_len + "q"

@property
def _offset_data_start(self):
# - 8 bytes for the list length
# - (N + 1) * 8 bytes for the element offsets
return (self._list_len + 2) * 8

@property
def _offset_packing_formats(self):
return self._offset_data_start + self._allocated_offsets[-1]

@property
def _offset_back_transform_codes(self):
return self._offset_packing_formats + self._list_len * 8
# - (2 * N + 1) * 8 bytes for the element offsets, packing format,
# and recreation code
return (self._list_len * 2 + 2) * 8

def count(self, value):
"L.count(value) -> integer -- return number of occurrences of value."
Expand Down
10 changes: 10 additions & 0 deletions Misc/NEWS.d/next/Library/2022-02-20-09-03-53.bpo-46799.BgnVIE.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Fix :class:`multiprocessing.shared_memory.ShareableList` memory bloat by
reading the offsets directly from the shared memory. Improve
:class:`multiprocessing.shared_memory.ShareableList` performance by merging
the area in shared memory dedicated to offsets and packing formats together.
This allows a single :func:`struct.unpack_from` call to retrieve both the
offset and the packing format of a sinlge entry Fix UnicodeDecodeError with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small typo sinlge

multibyte utf8 characters in
:class:`multiprocessing.shared_memory.ShareableList` by allocating the
shared memory using the length of the utf8 encoded string rather than the
length of the string.