Source code for nbtparse.minecraft.terrain._voxel

"""Terrain editing at the level of individual blocks.

.. note::

    This is the pure Python version of the module.  Pass ``--cythonize`` to
    ``setup.py`` to build the Cython version.  Once it's built, ``setup.py``
    will install it automatically.

"""

import array
import collections
from collections import abc as cabc
import itertools
import logging
import struct
import sys
import warnings

from ...semantics import nbtobject, fields
from ... import exceptions
from . import tile

name = __name__.split('.')
name[-1] = 'voxel'
logging_name = '.'.join(name)

logger = logging.getLogger(logging_name)

del name
del logging_name


[docs]class Block: """A basic block, not including tile entity data. Blocks are not :class:`~.nbtobject.NBTObject`\ s because Minecraft does not store them that way. They cannot be serialized directly to NBT. Instead, Minecraft stores blocks in groups called chunks. id is the block id, a nonnegative integer. data is the damage value, also a nonnegative integer. Blocks compare equal if they have the same id and data. Blocks are immutable and hashable. """ def __init__(self, id: int, data: int=0): self._id = id self._data = data @property def id(self): """The block ID.""" return self._id @property def data(self): """The block damage value.""" return self._data def __repr__(self): return 'Block({!r}, {!r})'.format(self.id, self.data) def __eq__(self, other): if type(self) is not type(other): return False return self.id == other.id and self.data == other.data def __ne__(self, other): return not self == other def __hash__(self): return hash((self.id, self.data, type(self)))
[docs]class VoxelBuffer: """A 3D buffer of blocks. Stores block IDs and damage values in much the same way as Minecraft: a "packed" format involving 8-bit strings (i.e. :class:`bytearray`). Basic constructor will create a buffer full of air (block ID 0). Blocks may be retrieved or changed with standard subscription. Slicing is also acceptable, and assigning a block to a slice is interpreted as filling the slice with the block. However, it is not possible to resize a buffer once created. Replacing a single block with subscription will erase the accompanying tile entity. Slicing is a shallow copy, and will only duplicate references to tile entities. Slicing a VoxelBuffer is a potentially expensive operation. It is linear in the total volume sliced, which sounds fine until we realize that the volume of a cube is (logically enough) cubic in the edge length. Slices covering an entire VoxelBuffer are very fast, as they are optimized into a single memory copy. This currently does not apply to any other slices. Manipulating individual blocks is relatively fast, however. VoxelBuffers can be iterated. This is done in the rather esoteric YZX order rather than the conventional XYZ order because the former is more natural in practice. The :meth:`VoxelBuffer.enumerate` method may be used to prepend (x, y, z) coordinates in much the same way as the builtin :func:`~python:enumerate` does. Standard iteration is marginally faster than the following code, because it skips some unnecessary sanity checks:: for y in range(vb.height): for z in range(vb.width): for x in range(vb.length): yield vb[x, y, z] If you want to iterate in the XYZ ordering instead, use :meth:`xyz`. .. note:: Do not write this, as it does not work:: vb = VoxelBuffer(length, height, width) block = vb[x][y][z] Instead, write this:: vb = VoxelBuffer(length, height, width) block = vb[x, y, z] """ def __init__(self, length: int, height: int, width: int): logger.debug('Creating empty VoxelBuffer %r x %r x %r.', length, height, width) self._length = length self._height = height self._width = width volume = length * width * height self._contents = array.array('H', itertools.repeat(0, volume)) assert len(self._contents) == volume self._tilemap = {} # from relative coordinates to TileEntities self._observers = set() # of callable objects self._shape = (length, height, width) sizeof_short = self._contents.itemsize self._strides = (sizeof_short, length * width * sizeof_short, length * sizeof_short) @property def length(self): """The length (X-axis) of the buffer. .. note:: This should not be confused with the :func:`len` of the buffer, which is the product of the length, width, and height (i.e. the volume), for consistency with iteration. """ return self._length @property def width(self): """The width (Z-axis) of the buffer.""" return self._width @property def height(self): """The height (Y-axis) of the buffer.""" return self._height @property def tilemap(self) -> cabc.MutableMapping: """Mutable mapping from coordinates to tile entities. Overwriting a single block with subscription will delete the corresponding entry from this mapping, even if the block is otherwise unchanged. Overwriting a slice of blocks will replace the entries with the equivalent entries from the other buffer. Example code:: vb.tilemap[(x, y, z)] = tile.TileEntity(...) The following is exactly equivalent and, in the opinion of the author, more readable:: vb.tilemap[x, y, z] = tile.TileEntity(...) .. note:: This is a mapping, not a 3D array or similar. It cannot be sliced and negative indices will not be converted into positive indices. .. note:: Coordinates are always relative to the origin of the VoxelBuffer, as with subscription. This means they do not correspond to the :attr:`.TileEntity.coords` attribute. Furthermore, said attribute will be ignored and overwritten with the coordinates provided to this mapping when the chunk is placed into a region. """ return self._tilemap def __len__(self): return self.length * self.width * self.height def __eq__(self, other): if type(self) is not type(other): return False if (self.length != other.length or self.width != other.width or self.height != other.height): return False return self._contents == other._contents def __ne__(self, other): return not self == other def __repr__(self): return '<VoxelBuffer: {!r} x {!r} x {!r} blocks>'.format(self.length, self.height, self.width) def __array__(self): """Return a NumPy ndarray with the same contents as this VoxelBuffer. The 12 least significant bits of each element are the block ID, and the 4 next-least significant bits are the data ("damage") value. Tile entities are not included. To the greatest extent possible, we try to avoid unnecessary memory copying, so if you need an actual copy, you should make one explicitly. """ import numpy as np assert self._contents.itemsize == np.ushort.itemsize, ( "sizeof(unsigned short) is inconsistent between Python and NumPy") shape = self._shape strides = self._strides result = np.ndarray(shape=shape, buffer=self._contents, dtype=np.ushort, strides=strides) return result
[docs] def watch(self, observer: callable): """Register an observer. The observer will be called with three arguments every time the buffer is modified. The arguments will be one of the following: * The (x, y, z) coordinates of the single block which has changed. * Three :class:`range` objects, describing the set of blocks which have changed (the range of X coordinates, Y coordinates, and Z coordinates). The observer must be hashable. If this is a problem, wrap it in a lambda expression. Changes to :attr:`tilemap` will not trigger notifications. """ self._observers.add(observer)
[docs] def unwatch(self, observer: callable): """Unregister a previously-registered observer. Undoes a previous call to :meth:`watch`. :exc:`KeyError` is raised if the observer was never registered. Observers are not automatically unregistered, so it is important to call this method manually if you intend the VoxelBuffer to outlive the observers. """ self._observers.remove(observer)
def _notify_all(self, arg_x, arg_y, arg_z): """Notify all observers.""" for observer in self._observers: observer(arg_x, arg_y, arg_z) def _get_single_block(self, index: (int, int, int)) -> Block: """Helper function for VoxelBuffer subscription. Directly retrieves block number index=(x, y, z). No slicing, no sanity checks. .. warning:: Do not invoke directly. Instead use :meth:`VoxelBuffer.__getitem__`. If non-"sane" parameters are passed to this function, the result is undefined behavior. Furthermore, this function's standards are much stricter than what's acceptable to :meth:`VoxelBuffer.__getitem__`. """ x, y, z = index raw_index = (y * (self.length * self.width) + z * (self.length) + x) raw_data = self._contents[raw_index] block_id = raw_data & 0xFFF block_data = raw_data >> 12 return Block(block_id, block_data) def _set_single_block(self, index: (int, int, int), value: Block): """Helper function for VoxelBuffer subscription. Directly replaces block number index=(x, y, z). No slicing, no sanity checks. .. warning:: Do not invoke directly. Instead use :meth:`VoxelBuffer.__setitem__`. If non-"sane" parameters are passed to this function, the result is undefined behavior. Furthermore, this function's standards are much stricter than what's acceptable to :meth:`VoxelBuffer.__setitem__`. """ x, y, z = index raw_index = (y * (self.length * self.width) + z * (self.length) + x) raw_data = (value.id & 0xFFF) | (value.data << 12) self._contents[raw_index] = raw_data self._tilemap.pop(index, None) def _fix_index(self, index: tuple) -> tuple: """Do basic sanitization on :obj:`index` for n-dimensional indexing. :obj:`index` will be converted into a tuple of integers or slice objects via :obj:`Ellipsis` expansion. If this is not possible or :obj:`index` does not have a "sensible" value, a :exc:`TypeError` is raised. Additionally, negative integers are converted into nonnegative integers, if all items in the tuple are integers. If the tuple contains slices, all elements are converted into slices. """ PROPER_INDEX_LENGTH = 3 # Number of dimensions we want to end up with logger.debug('Sanitizing index.') if index is Ellipsis: index = (slice(None),) * PROPER_INDEX_LENGTH logger.debug('Expanded "..." to %r', index) if type(index) is not tuple: raise TypeError('Index must be a tuple or "..."') if len(index) > PROPER_INDEX_LENGTH: raise TypeError('Index must be a {}-tuple or shorter.' .format(PROPER_INDEX_LENGTH)) elif len(index) < PROPER_INDEX_LENGTH: try: ellipsis_loc = index.index(Ellipsis) # If there is no Ellipsis, the user gave us too few items except ValueError as exc: new_exc = TypeError('Index must be expandable (via "...") ' 'into a {}-tuple.' .format(PROPER_INDEX_LENGTH)) raise new_exc from exc before = index[:ellipsis_loc] # Everything before the Ellipsis after = index[ellipsis_loc+1:] # Everything after it # We only cut out one element, so: assert len(before) + 1 + len(after) == len(index) # Equivalently, len(before) + len(after) == len(index) - 1 # By the above math, the number of slices to expand into is as # follows: middle = (slice(None),) * (PROPER_INDEX_LENGTH - (len(index) - 1)) logger.debug('Expanded "..." to %r', middle) # Rebuild the index with the new slices: index = tuple(itertools.chain(before, middle, after)) assert len(index) == PROPER_INDEX_LENGTH if not all(type(x) is int or type(x) is slice for x in index): raise TypeError('Index should consist of ints and slices after ' '"..." expansion.') if all(type(x) is int for x in index): logger.debug('Index is numeric, doing range checking.') x, y, z = index if x < 0: x += self.length if y < 0: y += self.height if z < 0: z += self.width if not (0 <= x < self.length): raise IndexError('X coordinate is out of range') if not (0 <= y < self.height): raise IndexError('Y coordinate is out of range') if not (0 <= z < self.width): raise IndexError('Z coordinate is out of range') index = (x, y, z) elif any(type(x) is int for x in index): # Convert relevant items into slices warnings.warn('Mixture of slices and integers in index.', category=exceptions.SliceWarning, stacklevel=3) index = tuple(i if type(i) is slice else slice(i, i+1, None) for i in index) logger.debug('Index sanitized to %r.', index) return index def __getitem__(self, index: tuple): logger.info('Retrieving index = %r from %r.', index, self) index = self._fix_index(index) if all(type(x) is int for x in index): logger.info('Retrieving single block %r.', index) return self._get_single_block(index) # Create a new VoxelBuffer with the required values logger.info('Retrieving slice %r.', index) if index == (slice(None), slice(None), slice(None)): # Special case foo[...]; let Python do the heavy lifting result = type(self)(self.length, self.height, self.width) result._contents = self._contents[:] return result x_slice, y_slice, z_slice = index x_range = range(*x_slice.indices(self.length)) y_range = range(*y_slice.indices(self.height)) z_range = range(*z_slice.indices(self.width)) result = type(self)(len(x_range), len(y_range), len(z_range)) for x_count, x in enumerate(x_range): for y_count, y in enumerate(y_range): for z_count, z in enumerate(z_range): my_index = (x, y, z) their_index = (x_count, y_count, z_count) result._set_single_block(their_index, self._get_single_block(my_index)) for (x, y, z), tile in self._tilemap.items(): if x in x_range and y in y_range and z in z_range: my_index = (x, y, z) their_index = ((x - x_range.start) // x_range.step, (y - y_range.start) // y_range.step, (z - z_range.start) // z_range.step) result._tilemap[their_index] = tile return result def __setitem__(self, index: tuple, value): logger.info('Retrieving index = %r from %r.', index, self) index = self._fix_index(index) if all(type(x) is int for x in index): logger.info('Setting single block %r to %r.', index, value) if value.id < 0: raise ValueError('Cannot use block with negative ID') self._set_single_block(index, value) self._notify_all(*index) return logger.info('Setting slice %r to %r.', index, value) x_slice, y_slice, z_slice = index x_range = range(*x_slice.indices(self.length)) y_range = range(*y_slice.indices(self.height)) z_range = range(*z_slice.indices(self.width)) if type(value) is Block: # Fill the slice with value if value.id < 0: raise ValueError('Cannot ues block with negative ID') blk = value volume = len(x_range) * len(y_range) * len(z_range) value = VoxelBuffer(len(x_range), len(y_range), len(z_range)) raw_value = (blk.id & 0xFFF) | ((blk.data & 0xF) << 12) raw_data = struct.pack('H', raw_value) value._contents = array.array('H', raw_data*volume) if len(x_range) != value.length: raise ValueError('Cannot resize (in the X direction) via slicing') if len(y_range) != value.height: raise ValueError('Cannot resize (in the Y direction) via slicing') if len(z_range) != value.width: raise ValueError('Cannot resize (in the Z direction) via slicing') if index == (slice(None), slice(None), slice(None)): # Optimize foo[...] = bar self._contents = value._contents[:] self._notify_all(x_range, y_range, z_range) return if value is self: # Make a copy to guarantee sanity value = value[...] try: get_block = value._get_single_block except AttributeError: get_block = lambda index: value[index] for x_count, x in enumerate(x_range): for y_count, y in enumerate(y_range): for z_count, z in enumerate(z_range): my_index = (x, y, z) their_index = (x_count, y_count, z_count) self._set_single_block(my_index, get_block(their_index)) for (x, y, z), tile in list(self._tilemap.items()): if x in x_range and y in y_range and z in z_range: del self._tilemap[x, y, z] for (x, y, z), tile in value._tilemap.items(): their_index = (x, y, z) my_index = (x_range[x], y_range[y], z_range[z]) self._tilemap[my_index] = tile self._notify_all(x_range, y_range, z_range) @classmethod
[docs] def from_raw(cls, ids: bytes, addids: bytes, damages: bytes, length: int, height: int, width: int): """Create a new VoxelBuffer from the provided buffers. These buffers should be :class:`bytes` objects (or acceptable to :func:`bytes`). Each byte or nybble should correspond to a block, in the same format as minecraft stores terrain data. No tile entities will be attached to the terrain. You must do this manually. Do not use this to duplicate a VoxelBuffer. Instead, take a slice of the entire buffer. """ volume = length * height * width nybble_volume = (volume // 2) + (volume % 2) # Make copies and do type-checking: ids = bytearray(ids) addids = bytearray(addids) damages = bytearray(damages) if len(ids) != volume: raise ValueError("Wrong number of bytes in ids.") if len(addids) != nybble_volume: raise ValueError("Wrong number of bytes in addids.") if len(damages) != nybble_volume: raise ValueError("Wrong number of bytes in damages.") new = cls(length, height, width) for y in range(height): for z in range(width): for x in range(length): index = y*width*length + z*length + x nybble_index = index // 2 # Odd-numbered damage values are in the high nybble: nybble_shift = 4 * (index%2) block_id = ids[index] add_id = addids[nybble_index] >> nybble_shift add_id &= 0xF block_id |= add_id << 8 block_data = damages[nybble_index] >> nybble_shift block_data &= 0xF new._set_single_block((x, y, z), Block(block_id, block_data)) return new
[docs] def to_raw(self) -> (bytes, bytes, bytes): """Return the raw buffers as used by Minecraft.""" volume = len(self) nybble_volume = (volume // 2) + (volume % 2) base_ids = bytearray(volume) add_ids = bytearray(nybble_volume) damages = bytearray(nybble_volume) length = self.length height = self.height width = self.width for y in range(height): for z in range(width): for x in range(length): index = y*width*length + z*length + x nybble_index = index // 2 # Odd-numbered damage values are in the high nybble: nybble_shift = 4 * (index%2) blk = self._get_single_block((x, y, z)) base_id = blk.id & 0xFF add_id = blk.id >> 8 damage = blk.data base_ids[index] = base_id add_ids[nybble_index] |= add_id << nybble_shift damages[nybble_index] |= damage << nybble_shift return bytes(base_ids), bytes(add_ids), bytes(damages)
[docs] def empty(self) -> bool: """Return True if every block is equal to Block(0, 0).""" zero_block = Block(0, 0) return all(block == zero_block for block in self)
[docs] def enumerate(self): """Iterate over the blocks in the buffer, with coordinates. Produces the (x, y, z) coordinates of each block in addition to the block itself:: for (x, y, z), block in vb.enumerate(): # block is equivalent to vb[x, y, z] """ for y in range(self.height): for z in range(self.width): for x in range(self.length): yield ((x, y, z), self._get_single_block((x, y, z)))
[docs] def xyz(self): """Iterate over the blocks in the buffer, in XYZ order. Produces coordinates, just like :meth:`VoxelBuffer.enumerate`. """ for x in range(self.length): for y in range(self.height): for z in range(self.width): yield ((x, y, z), self._get_single_block((x, y, z)))
def __iter__(self): for _, block in self.enumerate(): yield block def __reversed__(self): for y in reversed(range(self.height)): for z in reversed(range(self.width)): for x in reversed(range(self.length)): yield self._get_single_block((x, y, z))