# Source code for nbtparse.syntax.tags

"""Types for the various NBT tags.

Every tag type has a corresponding Pythonic class.  Most of these are
subclasses of various built-in classes with additional NBT encoding and
decoding functionality.  Generally speaking, ``TAG_Foo``'s counterpart will be
called ``FooTag``.  Most tags are immutable and hashable, and use a
values-based definition of equality.  They generally mimic the behavior of the
corresponding built-in class closely if not actually inheriting from it.

The tags all inherit from :class:`AbstractTag`, an abstract mixin class.
:class:`AbstractTag` provides some method implementations and a great deal of
high-level documentation; if a particular tag's documentation is unclear,
consult :class:`AbstractTag` as well.

This module will also log the encoding and decoding process via
:mod:`logging`; the logger has the same name as the module.  Since encoding
and decoding are generally very low-level processes, nearly everything is
logged at the ``DEBUG`` level; some irregularities when decoding are logged at
``WARNING``, and irregularities while encoding will instead generate ordinary
warnings (i.e.  :func:`warnings.warn`).  See the :mod:`logging` documentation
for instructions on how to access this data or ignore it.

"""

import abc
import math
import numbers
import operator
import struct
import warnings
from functools import total_ordering
import io
import logging

from . import ids
from .. import exceptions


logger = logging.getLogger(__name__)


def decode_named(input: io.BufferedIOBase, errors: str='strict'):
    """Read one named tag from ``input`` and return it as ``(name, tag)``.

    The tag ID is read first.  For a ``TAG_End`` there is neither a name nor
    a payload, so ``(None, EndTag())`` is returned immediately.  For every
    other ID the name is read as a ``TAG_String`` payload and the tag body is
    read by :func:`decode_payload`.

    :obj:`errors` is handed to the Unicode decoder for both the name and the
    payload.

    .. note::

       :meth:`input.read` must do whatever buffering the underlying I/O
       needs; a read that returns less data than requested is treated as
       EOF.  :class:`io.BufferedIOBase` and its subclasses satisfy this
       in non-interactive mode, but may raise :exc:`BlockingIOError` in
       non-blocking mode.  For non-blocking use, consider running this
       blocking call in an executor on a separate thread or process.

    """
    logger.debug("Decoding named tag...")
    tag_id = int(ByteTag._decode_payload(input, errors))
    if tag_id != ids.TAG_End:
        name = StringTag._decode_payload(input, errors)
        tag = decode_payload(input, tag_id, errors)
        logger.debug("Decoded tag named %s.", name)
        return (name, tag)
    # TAG_End is the one special case: it carries no name and no payload.
    end_tag = EndTag()
    logger.debug("Decoded tag %s.", repr(end_tag))
    return (None, end_tag)
class AbstractTag(metaclass=abc.ABCMeta):
    """Abstract mixin shared by every NBT tag class.

    Concrete tags must supply :meth:`_encode_payload`,
    :meth:`_decode_payload` and the :attr:`tag_id` property;
    :meth:`encode_named` is implemented here in terms of those.

    """

    __slots__ = ()

    @abc.abstractmethod
    def _encode_payload(self, output: io.BufferedIOBase,
                        errors: str='strict') -> int:
        """Write this tag's payload to ``output``; return the byte count.

        ``output`` only needs a :meth:`write` method.  Tags that encode text
        pass :obj:`errors` to the Unicode encoder; other tags ignore it.  A
        value that is out of range may raise :exc:`OverflowError`.  The
        buffering caveats described on :meth:`encode_named` apply here too.

        """
        raise NotImplementedError(
            'AbstractTag does not implement _encode_payload.')

    @property
    @abc.abstractmethod
    def tag_id(self) -> int:
        """The numeric ID of this tag (e.g. 1 for a ``TAG_Byte``)."""
        raise NotImplementedError("AbstractTag does not implement tag_id.")

    def encode_named(self, name: str, output: io.BufferedIOBase,
                     errors: str='strict') -> int:
        """Write this tag with a name, as it appears in a ``TAG_Compound``.

        Three pieces are written in order: the tag ID as a ``TAG_Byte``
        payload, ``name`` as a ``TAG_String`` payload, and finally this tag's
        own payload.  :obj:`errors` is used when encoding the name and the
        payload.  Returns the total number of bytes written.

        .. note::

           :meth:`output.write` must do whatever buffering the underlying
           I/O needs and should write its whole argument unless something
           has gone wrong.  :class:`io.BufferedIOBase` and its subclasses
           satisfy this in non-interactive mode, but may raise
           :exc:`BlockingIOError` in non-blocking mode.  For non-blocking
           use, consider running this blocking call in an executor on a
           separate thread or process.

        """
        id_bytes = ByteTag(self.tag_id)._encode_payload(output)
        name_bytes = StringTag(name)._encode_payload(output, errors)
        payload_bytes = self._encode_payload(output, errors)
        total_length = id_bytes + name_bytes + payload_bytes
        logger.debug("Encoded named tag '%s' to output: %i bytes.", name,
                     total_length)
        return total_length

    @classmethod
    @abc.abstractmethod
    def _decode_payload(cls, input: io.BufferedIOBase,
                        errors: str='strict'):
        """Read a payload from ``input`` and return a new instance.

        ``input`` only needs a :meth:`read` method.  Tags that decode text
        pass :obj:`errors` to the Unicode decoder; other tags ignore it.

        """
        raise NotImplementedError(
            'AbstractTag does not implement _decode_payload.')
@total_ordering
class EndTag(AbstractTag):
    """Represents a ``TAG_End``.

    All instances compare equal to each other, hash to the same value, and
    are falsy.  In practice an ``EndTag`` plays the role :obj:`None` plays
    for ordinary objects.  It mostly appears as the sentinel that terminates
    a ``TAG_Compound``, which :class:`CompoundTag` takes care of itself, so
    direct use is rarely needed.  Subclassing is discouraged.

    """

    __slots__ = ()

    def __repr__(self):
        return "EndTag()"

    def __hash__(self):
        # Depends only on the type, so every EndTag hashes identically.
        return hash((None, type(self)))

    def __eq__(self, other):
        # Equality is decided purely by exact type.
        return type(other) is type(self)

    def __ne__(self, other):
        return not self == other

    def __lt__(self, other):
        # An EndTag is never less than another EndTag; anything else is
        # not comparable.
        return False if self == other else NotImplemented

    def __bool__(self):
        return False

    def _encode_payload(self, output: io.BufferedIOBase,
                        errors: str='strict') -> int:
        """Write nothing and return 0: ``TAG_End`` has no payload."""
        return 0

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_End`."""
        return ids.TAG_End

    def encode_named(self, name: str, output: io.BufferedIOBase,
                     errors: str='strict') -> int:
        """Write a single null byte to :obj:`output` and return 1.

        ``name`` and ``errors`` are ignored.

        """
        output.write(b'\x00')
        return 1

    @classmethod
    def _decode_payload(cls, input: io.BufferedIOBase,
                        errors: str='strict'):
        """Return a new :class:`EndTag` without reading from :obj:`input`."""
        return cls()
def _make_special_methods(operation):
    """Build the forward and reflected methods for a binary operator.

    Returns a ``(left, right)`` pair of functions suitable for assignment to
    e.g. ``__add__`` and ``__radd__``.

    * ``left`` returns a new tag of the same type when both operands have
      exactly the same type; otherwise it applies ``operation`` to the raw
      value and ``other`` and returns whatever that produces.
    * ``right`` always applies ``operation(other, self.value)`` and returns
      the raw result.

    """
    def left_magic_method(self, other):
        if type(self) is not type(other):
            return operation(self.value, other)
        result_value = operation(self.value, other.value)
        return type(self)(result_value)

    def right_magic_method(self, other):
        return operation(other, self.value)

    return left_magic_method, right_magic_method


def _make_unary_special_method(operation):
    """Build a method for a unary operator (e.g. ``__neg__``).

    The generated method applies ``operation`` to the raw value and wraps the
    result in a new tag of the same type as ``self``.

    """
    def magic_method(self):
        return type(self)(operation(self.value))

    # Without this return, every unary operator built by this factory would
    # be bound to None on the class.
    return magic_method


class _RealTag(AbstractTag, numbers.Real):
    """Shared implementation for the fixed-width numeric tags.

    Subclasses set :attr:`_struct_code` and :attr:`_byte_width`; encoding and
    decoding are then done with :mod:`struct`.

    """

    __slots__ = ('_value',)

    # Format string passed to struct to encode/decode this type.
    _struct_code = None  # type: str

    # Number of bytes read from the input when decoding this type.
    _byte_width = None  # type: int

    def __init__(self, value):
        """Store ``value`` after checking that it can be encoded.

        Non-real values are converted with :func:`float`; another tag is
        unwrapped to its raw value.  Raises :exc:`OverflowError` if
        :func:`struct.pack` rejects the value.

        """
        if not isinstance(value, numbers.Real):
            value = float(value)
        if isinstance(value, _RealTag):
            value = value.value
        try:
            # Trial encode so that bad values fail here, not at write time.
            struct.pack(self._struct_code, value)
        except struct.error as exc:
            raise OverflowError('{} is too far from zero to encode.'
                                .format(value)) from exc
        self._value = value

    def __float__(self):
        return float(self.value)

    def __int__(self):
        return int(self.value)

    def _encode_payload(self, output: io.BufferedIOBase, errors='strict'):
        """Encode a fixed-width value to output; return bytes written."""
        try:
            raw = struct.pack(self._struct_code, self.value)
        except struct.error:
            assert False, 'This should have been caught in __init__()'
            raise  # in case someone passed -O to python
        output.write(raw)
        total_length = len(raw)
        logger.debug('Encoded %r: %i bytes.', self, total_length)
        return total_length

    @classmethod
    def _decode_payload(cls, input: io.BufferedIOBase,
                        errors: str='strict'):
        """Decode a fixed-width value from input.

        Keeps reading until :attr:`_byte_width` bytes have been collected.
        An empty read is treated as EOF and raises
        :exc:`~nbtparse.exceptions.IncompleteSequenceError`.

        """
        buf = io.BytesIO()
        while buf.tell() < cls._byte_width:
            raw = input.read(cls._byte_width - buf.tell())
            if not raw:
                # buf.tell() is the number of bytes collected so far.
                raise exceptions.IncompleteSequenceError(
                    'Needed {} bytes, got {}.'.format(cls._byte_width,
                                                      buf.tell()))
            buf.write(raw)
        raw = buf.getvalue()
        assert len(raw) == cls._byte_width, (
            '{!r} != {!r}'.format(len(raw), cls._byte_width))
        (value,) = struct.unpack(cls._struct_code, raw)
        result = cls(value)
        logger.debug("Decoded fixed-width tag: %r.", result)
        return result

    def __repr__(self):
        return '{}({!r})'.format(type(self).__name__, self.value)

    @property
    def value(self):
        """The numerical value of this tag.

        Returns the value stored by the constructor, without the conversions
        that ``int(self)`` or ``float(self)`` would apply.

        """
        return self._value

    __abs__ = _make_unary_special_method(operator.abs)
    __add__, __radd__ = _make_special_methods(operator.add)
    __floordiv__, __rfloordiv__ = _make_special_methods(operator.floordiv)
    __mod__, __rmod__ = _make_special_methods(operator.mod)
    __mul__, __rmul__ = _make_special_methods(operator.mul)
    __neg__ = _make_unary_special_method(operator.neg)
    __pos__ = _make_unary_special_method(operator.pos)
    __pow__, __rpow__ = _make_special_methods(operator.pow)
    __sub__, __rsub__ = _make_special_methods(operator.sub)
    __truediv__, __rtruediv__ = _make_special_methods(operator.truediv)
    __ceil__ = _make_unary_special_method(math.ceil)
    __floor__ = _make_unary_special_method(math.floor)
    __trunc__ = _make_unary_special_method(math.trunc)

    def __round__(self, n=None):
        # With ndigits the result is re-wrapped; without, the raw rounded
        # value is returned.
        if n is not None:
            return type(self)(round(self.value, n))
        else:
            return round(self.value)

    def __bool__(self):
        return bool(self.value)

    def __eq__(self, other):
        return other == self.value

    def __hash__(self):
        return hash(self.value)

    # functools.total_ordering won't override abstract methods, so do it by
    # hand.  Note that we always put other on the left so it has an
    # opportunity to further delegate before the value type gets involved:

    def __lt__(self, other):
        return other > self.value

    def __le__(self, other):
        return other >= self.value

    def __gt__(self, other):
        return other < self.value

    def __ge__(self, other):
        return other <= self.value


class _IntegralTag(_RealTag, numbers.Integral):
    """Shared implementation for the integer tags.

    Adds the bitwise operators and ``__index__`` on top of :class:`_RealTag`.

    """

    __slots__ = ()

    def __init__(self, value):
        """Coerce non-integral values with :func:`int`, then validate."""
        if not isinstance(value, numbers.Integral):
            value = int(value)
        super().__init__(value)

    __and__, __rand__ = _make_special_methods(operator.and_)
    __invert__ = _make_unary_special_method(operator.inv)
    __lshift__, __rlshift__ = _make_special_methods(operator.lshift)
    __or__, __ror__ = _make_special_methods(operator.or_)
    __rshift__, __rrshift__ = _make_special_methods(operator.rshift)
    __xor__, __rxor__ = _make_special_methods(operator.xor)
    __index__ = _RealTag.__int__
class ByteTag(_IntegralTag):
    """Represents a ``TAG_Byte``.

    Behaves like a number in every respect, except that it does not pass
    explicit type checks against the built-in numeric types.

    """

    __slots__ = ()

    # One byte, packed with struct format '>b'.
    _byte_width = 1
    _struct_code = '>b'

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`ids.TAG_Byte`."""
        return ids.TAG_Byte
class ShortTag(_IntegralTag):
    """Represents a ``TAG_Short``.

    Behaves like a number in every respect, except that it does not pass
    explicit type checks against the built-in numeric types.

    """

    __slots__ = ()

    # Two bytes, packed with struct format '>h'.
    _byte_width = 2
    _struct_code = '>h'

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`ids.TAG_Short`."""
        return ids.TAG_Short
class IntTag(_IntegralTag):
    """Represents a ``TAG_Int``.

    Behaves like a number in every respect, except that it does not pass
    explicit type checks against the built-in numeric types.

    """

    __slots__ = ()

    # Four bytes, packed with struct format '>i'.
    _byte_width = 4
    _struct_code = '>i'

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_Int`."""
        return ids.TAG_Int
class LongTag(_IntegralTag):
    """Represents a ``TAG_Long``.

    Behaves like a number in every respect, except that it does not pass
    explicit type checks against the built-in numeric types.

    """

    __slots__ = ()

    # Eight bytes, packed with struct format '>q'.
    _byte_width = 8
    _struct_code = '>q'

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_Long`."""
        return ids.TAG_Long
class FloatTag(_RealTag):
    """Represents a ``TAG_Float``.

    Behaves like a number in every respect, except that it does not pass
    explicit type checks against the built-in numeric types.

    """

    __slots__ = ()

    # Four bytes, packed with struct format '>f'.
    _byte_width = 4
    _struct_code = '>f'

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_Float`."""
        return ids.TAG_Float
class DoubleTag(_RealTag):
    """Represents a ``TAG_Double``.

    Behaves like a number in every respect, except that it does not pass
    explicit type checks against the built-in numeric types.

    """

    __slots__ = ()

    # Eight bytes, packed with struct format '>d'.
    _byte_width = 8
    _struct_code = '>d'

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_Double`."""
        return ids.TAG_Double
class ByteArrayTag(AbstractTag, bytes):
    """Represents a ``TAG_Byte_Array``.

    Derives from :class:`bytes`, and can be used anywhere that :class:`bytes`
    would be valid.

    Note that this is generally not used to represent text because it lacks
    encoding information; see :class:`StringTag` for that.

    """

    __slots__ = ()

    def __repr__(self):
        return "ByteArrayTag({})".format(super().__repr__())

    def __str__(self):
        return super().__str__()

    def _encode_payload(self, output: io.BufferedIOBase,
                        errors: str='strict') -> int:
        """Writes this tag as a sequence of raw bytes to output.

        The length is written first as a ``TAG_Int`` payload, followed by the
        bytes themselves.  Returns the total number of bytes written,
        including the length.

        """
        logger.debug("Encoding TAG_Byte_Array: len = %i.", len(self))
        total_length = IntTag(len(self))._encode_payload(output, errors)
        total_length += len(self)
        output.write(self)
        logger.debug("Encoded TAG_Byte_Array: %i bytes.", total_length)
        return total_length

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_Byte_Array`."""
        return ids.TAG_Byte_Array

    @classmethod
    def _decode_payload(cls, input: io.BufferedIOBase,
                        errors: str='strict'):
        """Read a ``TAG_Byte_Array`` payload into a new :class:`ByteArrayTag`.

        Reads a ``TAG_Int`` length and then that many bytes.  Raises
        :exc:`~nbtparse.exceptions.IncompleteSequenceError` if fewer bytes
        than announced are available.

        """
        logger.debug("Decoding TAG_Byte_Array...")
        array_len = int(IntTag._decode_payload(input, errors))
        raw = input.read(array_len)
        if len(raw) < array_len:
            # Expected count first, actual count second.
            raise exceptions.IncompleteSequenceError(
                "Expected {} bytes, got {}".format(array_len, len(raw)))
        result = cls(raw)
        logger.debug("Decoded TAG_Byte_Array: len = %i.", len(result))
        return result
class StringTag(AbstractTag, str):
    """Represents a ``TAG_String``.

    Derives from :class:`str` and can be used anywhere that :class:`str` is
    valid.

    """

    __slots__ = ()

    def __repr__(self):
        return "StringTag({})".format(super().__repr__())

    def __str__(self):
        return super().__str__()

    def _encode_payload(self, output: io.BufferedIOBase,
                        errors: str='strict') -> int:
        """Writes this tag as UTF-8 to output.

        The encoded length is written first as a ``TAG_Short`` payload,
        followed by the encoded bytes.  Returns total bytes written,
        including the length.

        Errors is passed to the Unicode encoder.  The default value of
        ``'strict'`` will cause any problems (e.g. invalid surrogates) to
        raise a :exc:`UnicodeError`.

        """
        logger.debug("Encoding TAG_String: %r.", self)
        raw = self.encode('utf_8', errors)
        total_length = ShortTag(len(raw))._encode_payload(output, errors)
        total_length += len(raw)
        output.write(raw)
        logger.debug("Encoded TAG_String: %i bytes.", total_length)
        return total_length

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_String`."""
        return ids.TAG_String

    @classmethod
    def _decode_payload(cls, input: io.BufferedIOBase,
                        errors: str='strict'):
        """Reads a TAG_String payload into a new StringTag.

        Reads a ``TAG_Short`` length and then that many bytes, which are
        decoded as UTF-8.  Raises
        :exc:`~nbtparse.exceptions.IncompleteSequenceError` if fewer bytes
        than announced are available.

        Errors is passed to the Unicode decoder.  The default value of
        ``'strict'`` will cause any problems (e.g. invalid UTF-8) to raise a
        :exc:`UnicodeError`.

        """
        logger.debug("Decoding TAG_String...")
        length = int(ShortTag._decode_payload(input, errors))
        raw = input.read(length)
        if len(raw) < length:
            # Expected count first, actual count second.
            raise exceptions.IncompleteSequenceError(
                "Expected {} bytes, got {}".format(length, len(raw)))
        result = cls(raw, 'utf_8', errors)
        logger.debug("Decoded TAG_String: %r.", result)
        return result
class ListTag(AbstractTag, list):
    """Represents a ``TAG_List``.

    Unlike most other tags, this tag is mutable and unhashable.

    :obj:`instance.content_id<content_id>` identifies the type of the tags
    listed in this tag.  During initialization, ListTag will attempt to guess
    content_id if it is not provided.  If the list is empty, it defaults to
    None and the list will not be encodable.

    """

    __slots__ = ('_content_id',)

    def __init__(self, iterable=None, content_id=None):
        """Build the list and establish (or verify) its ``content_id``.

        Raises :exc:`TypeError` if any item's ``tag_id`` differs from
        ``content_id`` (or from the first item's ``tag_id`` when
        ``content_id`` is not given).

        """
        if iterable is None:
            self._content_id = content_id
            super().__init__()
            return
        super().__init__(iterable)
        for tag in self:
            if content_id is None:
                # Guess from the first item.
                content_id = tag.tag_id
            elif tag.tag_id != content_id:
                raise TypeError("{} has id {}, not {}.".format(repr(tag),
                                                               tag.tag_id,
                                                               content_id))
        self._content_id = content_id  # Bypass property since we just checked

    @property
    def content_id(self) -> int:
        """Identifies the tag id of the tags listed in this ``TAG_List``.

        Starts at :obj:`None` if the list was initially empty and a content_id
        was not provided.  While this is :obj:`None`, the tag cannot be
        encoded.  Assigning a value that does not match every current item
        raises :exc:`TypeError`.

        """
        return self._content_id

    @content_id.setter
    def content_id(self, value):
        for tag in self:
            if tag.tag_id != value:
                raise TypeError("{} has id {}, not {}.".format(repr(tag),
                                                               tag.tag_id,
                                                               value))
        self._content_id = value

    def __repr__(self):
        return 'ListTag({}, {})'.format(super().__repr__(),
                                        repr(self.content_id))

    def __str__(self):
        return super().__str__()

    def __eq__(self, other):
        return (super().__eq__(other) and hasattr(other, "content_id") and
                self.content_id == other.content_id)

    def __ne__(self, other):
        return not self == other

    def __lt__(self, other):
        # Order by list contents first, then break ties on content_id.
        contents_less = super().__lt__(other)
        if contents_less is NotImplemented:
            # NotImplemented is truthy; it must be propagated, not tested.
            return NotImplemented
        if contents_less:
            return True
        if super().__eq__(other):
            if not hasattr(other, "content_id"):
                return NotImplemented
            if self.content_id == other.content_id:
                # Fully equal (also covers both ids being None, where `<`
                # would raise TypeError).
                return False
            return self.content_id < other.content_id
        return False

    # functools.total_ordering won't override list.__gt__ etc.
    # so do it by hand:

    def __gt__(self, other):
        return not self == other and not self < other

    def __ge__(self, other):
        return self > other or self == other

    def __le__(self, other):
        return self < other or self == other

    def _encode_payload(self, output: io.BufferedIOBase,
                        errors: str='strict') -> int:
        """Encodes a series of tag payloads to :obj:`output`.

        Writes the content id as a ``TAG_Byte`` payload, the item count as a
        ``TAG_Int`` payload, and then each item's payload.  Returns the total
        number of bytes written, including metadata.

        Raises :exc:`ValueError` if :attr:`content_id` is :obj:`None` and
        :exc:`TypeError` if an item's ``tag_id`` does not match it.  Both
        checks happen before anything is written.  This method does not
        modify :attr:`content_id`.

        """
        logger.debug("Encoding TAG_List: %i items.", len(self))
        content_id = self.content_id
        if content_id is None:
            raise ValueError("No content_id specified.")
        # Validate everything up front so a bad item cannot leave a
        # half-written list in the output.
        for tag in self:
            if tag.tag_id != content_id:
                raise TypeError("{} has id {}, not {}."
                                .format(repr(tag), tag.tag_id, content_id))
        total_length = ByteTag(content_id)._encode_payload(output, errors)
        total_length += IntTag(len(self))._encode_payload(output, errors)
        for tag in self:
            total_length += tag._encode_payload(output, errors)
        logger.debug("Encoded TAG_List: %i bytes.", total_length)
        return total_length

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_List`."""
        return ids.TAG_List

    @classmethod
    def _decode_payload(cls, input: io.BufferedIOBase,
                        errors: str='strict'):
        """Decode a list of tags.

        Reads the content id (``TAG_Byte``), the item count (``TAG_Int``) and
        then that many payloads via :func:`decode_payload`.

        """
        logger.debug("Decoding TAG_List...")
        content_id = int(ByteTag._decode_payload(input, errors))
        length = int(IntTag._decode_payload(input, errors))
        result = cls(content_id=content_id)
        for _ in range(length):
            next_item = decode_payload(input, content_id, errors)
            result.append(next_item)
        logger.debug("Decoded TAG_List: %i items.", len(result))
        return result
class CompoundTag(AbstractTag, dict):
    """Represents a ``TAG_Compound``.

    Unlike most other tags, this tag is mutable and unhashable.

    Derives from :class:`dict` and may be used in place of one.

    Keys are names, values are tags.

    The terminating ``TAG_End`` is handled automatically; you do not need to
    worry about it.

    This implementation does not preserve the order of the tags; this is
    explicitly permitted under the NBT standard.

    """

    __slots__ = ()

    def __repr__(self):
        return 'CompoundTag({})'.format(super().__repr__())

    def __str__(self):
        return super().__str__()

    def _encode_payload(self, output: io.BufferedIOBase,
                        errors: str='strict') -> int:
        """Encodes contents as a series of named tags.

        Tags are fully formed, including ids and names.  A terminating
        ``TAG_End`` is appended.  Any value equal to ``EndTag()`` is skipped
        with a :exc:`~nbtparse.exceptions.ValueWarning`, since writing it
        would terminate the compound early.

        Errors is passed to the Unicode encoder for encoding names, and to the
        individual tag encoders.  Returns the total number of bytes written.

        """
        logger.debug("Encoding TAG_Compound: %i entries.", len(self))
        total_length = 0
        for name, tag in self.items():
            if tag == EndTag():
                warnings.warn("Skipping EndTag() in {!r}".format(self),
                              category=exceptions.ValueWarning,
                              stacklevel=2)
                continue
            total_length += tag.encode_named(name, output, errors)
        total_length += EndTag().encode_named(None, output, errors)
        logger.debug("Encoded TAG_Compound: %i bytes.", total_length)
        return total_length

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_Compound`."""
        return ids.TAG_Compound

    @classmethod
    def _decode_payload(cls, input: io.BufferedIOBase,
                        errors: str='strict'):
        """Decodes a series of named tags into a new :class:`CompoundTag`.

        Named tags are read until a ``TAG_End`` is found.  If a name occurs
        more than once, the first occurrence is kept and later ones are
        logged at ``WARNING`` and discarded.

        """
        logger.debug("Decoding TAG_Compound...")
        result = cls()
        sentinel = EndTag()
        while True:
            # Read inside the loop so that every path, including the
            # duplicate-name `continue`, advances the input.
            new_name, new_tag = decode_named(input, errors)
            if new_tag == sentinel:
                break
            if new_name in result:
                logger.warning("Found duplicate %s in TAG_Compound, "
                               "ignoring.", new_name)
                continue
            result[new_name] = new_tag
        logger.debug("Decoded TAG_Compound: %i entries.", len(result))
        return result
class IntArrayTag(AbstractTag, list):
    """Represents a ``TAG_Int_Array``.

    This tag is mutable and unhashable, unlike most of the others.  It is a
    :class:`list` subclass and may be used wherever a list is accepted.

    """

    __slots__ = ()

    def __repr__(self):
        return 'IntArrayTag({})'.format(super().__repr__())

    def __str__(self):
        return super().__str__()

    def _encode_payload(self, output, errors='strict'):
        """Write the item count, then every item as a ``TAG_Int`` payload.

        All items are converted to :class:`IntTag` before anything is
        written.  Returns the total number of bytes written.

        """
        logger.debug("Encoding TAG_Int_Array: %i integers.", len(self))
        int_tags = [IntTag(item) for item in self]
        total_length = IntTag(len(int_tags))._encode_payload(output, errors)
        total_length += sum(int_tag._encode_payload(output, errors)
                            for int_tag in int_tags)
        logger.debug("Encoded TAG_Int_Array: %i bytes.", total_length)
        return total_length

    @property
    def tag_id(self) -> int:
        """Equal to :obj:`.ids.TAG_Int_Array`."""
        return ids.TAG_Int_Array

    @classmethod
    def _decode_payload(cls, input: io.BufferedIOBase,
                        errors: str='strict'):
        """Read a count and that many integers into a new instance.

        The items of the returned :class:`IntArrayTag` are :class:`IntTag`
        objects.

        """
        logger.debug("Decoding TAG_Int_Array...")
        decoded = cls()
        count = IntTag._decode_payload(input, errors)
        decoded.extend(IntTag._decode_payload(input, errors)
                       for _ in range(count))
        logger.debug("Decoded TAG_Int_Array: %i integers.", len(decoded))
        return decoded
def decode_payload(input, tag_id: int, errors: str='strict') -> AbstractTag:
    """Decode a payload whose type is given by :obj:`tag_id`.

    Looks up the tag class registered for ``tag_id`` and delegates to its
    :meth:`~AbstractTag._decode_payload` classmethod.  An unknown ``tag_id``
    raises :exc:`KeyError`.

    """
    tag_types = {
        ids.TAG_End: EndTag,
        ids.TAG_Byte: ByteTag,
        ids.TAG_Short: ShortTag,
        ids.TAG_Int: IntTag,
        ids.TAG_Long: LongTag,
        ids.TAG_Float: FloatTag,
        ids.TAG_Double: DoubleTag,
        ids.TAG_Byte_Array: ByteArrayTag,
        ids.TAG_String: StringTag,
        ids.TAG_List: ListTag,
        ids.TAG_Compound: CompoundTag,
        ids.TAG_Int_Array: IntArrayTag,
    }
    decoder = tag_types[tag_id]._decode_payload
    return decoder(input, errors)