""" Handlers for Content-Encoding. See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding """ import codecs import io import typing import zlib from ._exceptions import DecodingError try: import brotlicffi except ImportError: # pragma: nocover brotlicffi = None class ContentDecoder: def decode(self, data: bytes) -> bytes: raise NotImplementedError() # pragma: nocover def flush(self) -> bytes: raise NotImplementedError() # pragma: nocover class IdentityDecoder(ContentDecoder): """ Handle unencoded data. """ def decode(self, data: bytes) -> bytes: return data def flush(self) -> bytes: return b"" class DeflateDecoder(ContentDecoder): """ Handle 'deflate' decoding. See: https://stackoverflow.com/questions/1838699 """ def __init__(self) -> None: self.first_attempt = True self.decompressor = zlib.decompressobj() def decode(self, data: bytes) -> bytes: was_first_attempt = self.first_attempt self.first_attempt = False try: return self.decompressor.decompress(data) except zlib.error as exc: if was_first_attempt: self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS) return self.decode(data) raise DecodingError(str(exc)) from exc def flush(self) -> bytes: try: return self.decompressor.flush() except zlib.error as exc: # pragma: nocover raise DecodingError(str(exc)) from exc class GZipDecoder(ContentDecoder): """ Handle 'gzip' decoding. See: https://stackoverflow.com/questions/1838699 """ def __init__(self) -> None: self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16) def decode(self, data: bytes) -> bytes: try: return self.decompressor.decompress(data) except zlib.error as exc: raise DecodingError(str(exc)) from exc def flush(self) -> bytes: try: return self.decompressor.flush() except zlib.error as exc: # pragma: nocover raise DecodingError(str(exc)) from exc class BrotliDecoder(ContentDecoder): """ Handle 'brotli' decoding. Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/ or `pip install brotli`. See https://github.com/google/brotli Supports both 'brotlipy' and 'Brotli' packages since they share an import name. The top branches are for 'brotlipy' and bottom branches for 'Brotli' """ def __init__(self) -> None: if brotlicffi is None: # pragma: nocover raise ImportError( "Using 'BrotliDecoder', but the 'brotlicffi' library " "is not installed." "Make sure to install httpx using `pip install httpx[brotli]`." ) from None self.decompressor = brotlicffi.Decompressor() self.seen_data = False if hasattr(self.decompressor, "decompress"): self._decompress = self.decompressor.decompress else: self._decompress = self.decompressor.process # pragma: nocover def decode(self, data: bytes) -> bytes: if not data: return b"" self.seen_data = True try: return self.decompressor.decompress(data) except brotlicffi.Error as exc: raise DecodingError(str(exc)) from exc def flush(self) -> bytes: if not self.seen_data: return b"" try: if hasattr(self.decompressor, "finish"): self.decompressor.finish() return b"" except brotlicffi.Error as exc: # pragma: nocover raise DecodingError(str(exc)) from exc class MultiDecoder(ContentDecoder): """ Handle the case where multiple encodings have been applied. """ def __init__(self, children: typing.Sequence[ContentDecoder]) -> None: """ 'children' should be a sequence of decoders in the order in which each was applied. """ # Note that we reverse the order for decoding. self.children = list(reversed(children)) def decode(self, data: bytes) -> bytes: for child in self.children: data = child.decode(data) return data def flush(self) -> bytes: data = b"" for child in self.children: data = child.decode(data) + child.flush() return data class ByteChunker: """ Handles returning byte content in fixed-size chunks. """ def __init__(self, chunk_size: int = None) -> None: self._buffer = io.BytesIO() self._chunk_size = chunk_size def decode(self, content: bytes) -> typing.List[bytes]: if self._chunk_size is None: return [content] self._buffer.write(content) if self._buffer.tell() >= self._chunk_size: value = self._buffer.getvalue() chunks = [ value[i : i + self._chunk_size] for i in range(0, len(value), self._chunk_size) ] if len(chunks[-1]) == self._chunk_size: self._buffer.seek(0) self._buffer.truncate() return chunks else: self._buffer.seek(0) self._buffer.write(chunks[-1]) self._buffer.truncate() return chunks[:-1] else: return [] def flush(self) -> typing.List[bytes]: value = self._buffer.getvalue() self._buffer.seek(0) self._buffer.truncate() return [value] if value else [] class TextChunker: """ Handles returning text content in fixed-size chunks. """ def __init__(self, chunk_size: int = None) -> None: self._buffer = io.StringIO() self._chunk_size = chunk_size def decode(self, content: str) -> typing.List[str]: if self._chunk_size is None: return [content] self._buffer.write(content) if self._buffer.tell() >= self._chunk_size: value = self._buffer.getvalue() chunks = [ value[i : i + self._chunk_size] for i in range(0, len(value), self._chunk_size) ] if len(chunks[-1]) == self._chunk_size: self._buffer.seek(0) self._buffer.truncate() return chunks else: self._buffer.seek(0) self._buffer.write(chunks[-1]) self._buffer.truncate() return chunks[:-1] else: return [] def flush(self) -> typing.List[str]: value = self._buffer.getvalue() self._buffer.seek(0) self._buffer.truncate() return [value] if value else [] class TextDecoder: """ Handles incrementally decoding bytes into text """ def __init__(self, encoding: typing.Optional[str] = None): self.decoder: typing.Optional[codecs.IncrementalDecoder] = None if encoding is not None: self.decoder = codecs.getincrementaldecoder(encoding)(errors="strict") def decode(self, data: bytes) -> str: """ If an encoding is explicitly specified, then we use that. Otherwise our strategy is to attempt UTF-8, and fallback to Windows 1252. Note that UTF-8 is a strict superset of ascii, and Windows 1252 is a superset of the non-control characters in iso-8859-1, so we essentially end up supporting any of ascii, utf-8, iso-8859-1, cp1252. Given that UTF-8 is now by *far* the most widely used encoding, this should be a pretty robust strategy for cases where a charset has not been explicitly included. Useful stats on the prevalence of different charsets in the wild... * https://w3techs.com/technologies/overview/character_encoding * https://w3techs.com/technologies/history_overview/character_encoding The HTML5 spec also has some useful guidelines, suggesting defaults of either UTF-8 or Windows 1252 in most cases... * https://dev.w3.org/html5/spec-LC/Overview.html """ if self.decoder is None: # If this is the first decode pass then we need to determine which # encoding to use by attempting UTF-8 and raising any decode errors. attempt_utf_8 = codecs.getincrementaldecoder("utf-8")(errors="strict") try: attempt_utf_8.decode(data) except UnicodeDecodeError: # Could not decode as UTF-8. Use Windows 1252. self.decoder = codecs.getincrementaldecoder("cp1252")(errors="replace") else: # Can decode as UTF-8. Use UTF-8 with lenient error settings. self.decoder = codecs.getincrementaldecoder("utf-8")(errors="replace") return self.decoder.decode(data) def flush(self) -> str: if self.decoder is None: return "" return self.decoder.decode(b"", True) class LineDecoder: """ Handles incrementally reading lines from text. Uses universal line decoding, supporting any of `\n`, `\r`, or `\r\n` as line endings, normalizing to `\n`. """ def __init__(self) -> None: self.buffer = "" def decode(self, text: str) -> typing.List[str]: lines = [] if text and self.buffer and self.buffer[-1] == "\r": if text.startswith("\n"): # Handle the case where we have an "\r\n" split across # our previous input, and our new chunk. lines.append(self.buffer[:-1] + "\n") self.buffer = "" text = text[1:] else: # Handle the case where we have "\r" at the end of our # previous input. lines.append(self.buffer[:-1] + "\n") self.buffer = "" while text: num_chars = len(text) for idx in range(num_chars): char = text[idx] next_char = None if idx + 1 == num_chars else text[idx + 1] if char == "\n": lines.append(self.buffer + text[: idx + 1]) self.buffer = "" text = text[idx + 1 :] break elif char == "\r" and next_char == "\n": lines.append(self.buffer + text[:idx] + "\n") self.buffer = "" text = text[idx + 2 :] break elif char == "\r" and next_char is not None: lines.append(self.buffer + text[:idx] + "\n") self.buffer = "" text = text[idx + 1 :] break elif next_char is None: self.buffer += text text = "" break return lines def flush(self) -> typing.List[str]: if self.buffer.endswith("\r"): # Handle the case where we had a trailing '\r', which could have # been a '\r\n' pair. lines = [self.buffer[:-1] + "\n"] elif self.buffer: lines = [self.buffer] else: lines = [] self.buffer = "" return lines SUPPORTED_DECODERS = { "identity": IdentityDecoder, "gzip": GZipDecoder, "deflate": DeflateDecoder, "br": BrotliDecoder, } if brotlicffi is None: SUPPORTED_DECODERS.pop("br") # pragma: nocover