Source code for xonsh.lazyjson

"""Implements a lazy JSON file class that wraps around json data."""

import collections.abc as cabc
import contextlib
import io
import weakref

try:
    import ujson as json
except ImportError:
    import json  # type: ignore


def _to_json_with_size(obj, offset=0, sort_keys=False):
    """Serializes an object to JSON text while recording offsets and sizes.

    Parameters
    ----------
    obj : str, mapping, sequence, or other JSON-serializable value
        Object to serialize.
    offset : int, optional
        Position at which the text for ``obj`` starts.
    sort_keys : bool, optional
        Whether mapping items are emitted in sorted order.

    Returns
    -------
    s : str
        JSON text for ``obj``. Containers are terminated by a newline.
    o : int, dict, or list
        Offset of ``obj``. For containers this mirrors the container's
        structure and additionally stores the container's own offset under
        the ``"__total__"`` key (mappings) or as the last element (sequences).
    n : int
        Length of ``s`` (encoded byte length for strings).
    size : int, dict, or list
        Sizes, laid out the same way as ``o``.
    """
    if isinstance(obj, str):
        text = json.dumps(obj)
        nbytes = len(text.encode())  # strings are measured in encoded bytes
        return text, offset, nbytes, nbytes

    if isinstance(obj, cabc.Mapping):
        offsets = {}
        sizes = {}
        chunks = []
        cursor = offset + 1  # step over the opening brace
        pairs = obj.items()
        if sort_keys:
            pairs = sorted(pairs)
        for key, val in pairs:
            key_text, _, key_len, _ = _to_json_with_size(
                key, offset=cursor, sort_keys=sort_keys
            )
            cursor += key_len + 2  # the key plus ": "
            val_text, val_offsets, val_len, val_sizes = _to_json_with_size(
                val, offset=cursor, sort_keys=sort_keys
            )
            cursor += val_len + 2  # the value plus ", "
            offsets[key] = val_offsets
            sizes[key] = val_sizes
            chunks.append(key_text + ": " + val_text)
        text = "{" + ", ".join(chunks) + "}\n"
        total = len(text)
        # bookkeeping entry describing the mapping as a whole
        offsets["__total__"] = offset
        sizes["__total__"] = total
        return text, offsets, total, sizes

    if isinstance(obj, cabc.Sequence):
        offsets = []
        sizes = []
        chunks = []
        cursor = offset + 1  # step over the opening bracket
        for item in obj:
            item_text, item_offsets, item_len, item_sizes = _to_json_with_size(
                item, offset=cursor, sort_keys=sort_keys
            )
            cursor += item_len + 2  # the item plus ", "
            offsets.append(item_offsets)
            sizes.append(item_sizes)
            chunks.append(item_text)
        text = "[" + ", ".join(chunks) + "]\n"
        total = len(text)
        # the last element describes the sequence as a whole
        offsets.append(offset)
        sizes.append(total)
        return text, offsets, total, sizes

    # numbers, booleans, None, and anything else json can serialize directly
    text = json.dumps(obj, sort_keys=sort_keys)
    total = len(text)
    return text, offset, total, total


def index(obj, sort_keys=False):
    """Creates an index for a JSON file.

    Parameters
    ----------
    obj : JSON-serializable object
        Object to serialize and index.
    sort_keys : bool, optional
        Whether mapping keys are written in sorted order.

    Returns
    -------
    s : str
        Serialized JSON text for ``obj``.
    idx : dict
        Index with an ``"offsets"`` entry and a ``"sizes"`` entry, each
        mirroring the structure of ``obj``.
    """
    text, offsets, _, sizes = _to_json_with_size(obj, sort_keys=sort_keys)
    return text, {"offsets": offsets, "sizes": sizes}
# Template for a lazy JSON file. The four "locs" fields are right-aligned to a
# fixed width of 10 characters, so the header has a constant size: the index
# always begins at character 69 and the data begins 11 characters after the
# index ends. The line breaks are part of that layout and must be preserved.
JSON_FORMAT = """{{"locs": [{iloc:>10}, {ilen:>10}, {dloc:>10}, {dlen:>10}],
 "index": {index},
 "data": {data}
}}
"""
def dumps(obj, sort_keys=False):
    """Dumps an object to JSON with an index.

    Parameters
    ----------
    obj : JSON-serializable object
        Object to serialize.
    sort_keys : bool, optional
        Whether mapping keys are written in sorted order.

    Returns
    -------
    s : str
        ``JSON_FORMAT`` filled in with the serialized data, its index, and
        the locations and lengths of both.
    """
    data, idx = index(obj, sort_keys=sort_keys)
    jdx = json.dumps(idx, sort_keys=sort_keys)
    # Measure the positions on the template itself instead of hard-coding
    # them, so the recorded locations cannot drift from the actual layout.
    head, _, tail = JSON_FORMAT.partition("{index}")
    gap = tail.partition("{data}")[0]
    # The "locs" fields are fixed width, so placeholder zeros give the real
    # header length.
    iloc = len(head.format(iloc=0, ilen=0, dloc=0, dlen=0))
    ilen = len(jdx)
    dloc = iloc + ilen + len(gap.format())
    dlen = len(data)
    return JSON_FORMAT.format(
        index=jdx, data=data, iloc=iloc, ilen=ilen, dloc=dloc, dlen=dlen
    )
def ljdump(obj, fp, sort_keys=False):
    """Dumps an object to JSON file.

    Parameters
    ----------
    obj : JSON-serializable object
        Object to serialize.
    fp : file-like
        Destination; only its ``write()`` method is used.
    sort_keys : bool, optional
        Whether mapping keys are written in sorted order.
    """
    fp.write(dumps(obj, sort_keys=sort_keys))
class LJNode(cabc.Mapping, cabc.Sequence):
    """A proxy node for JSON nodes. Acts as both sequence and mapping."""

    def __init__(self, offsets, sizes, root):
        """Parameters
        ----------
        offsets : dict, list, or int
            offsets of corresponding data structure, in bytes
        sizes : dict, list, or int
            sizes of corresponding data structure, in bytes
        root : weakref.proxy of LazyJSON
            weakref back to root node, which should be a LazyJSON object.
        """
        self.offsets = offsets
        self.sizes = sizes
        self.root = root
        self.is_mapping = isinstance(self.offsets, cabc.Mapping)
        self.is_sequence = isinstance(self.offsets, cabc.Sequence)

    def __len__(self):
        # recall that for maps, the '__total__' key is added and for
        # sequences the last element represents the total size/offset.
        return len(self.sizes) - 1

    def load(self):
        """Returns the Python data structure represented by the node.

        Raises
        ------
        TypeError
            If the node's offsets are not a mapping, a sequence, or an int.
        """
        if self.is_mapping:
            offset = self.offsets["__total__"]
            size = self.sizes["__total__"]
        elif self.is_sequence:
            offset = self.offsets[-1]
            size = self.sizes[-1]
        elif isinstance(self.offsets, int):
            offset = self.offsets
            size = self.sizes
        else:
            raise TypeError("incorrect types for offset node")
        return self._load_or_node(offset, size)

    def _load_or_node(self, offset, size):
        """Reads and parses a leaf value, or wraps a container in a new node."""
        if isinstance(offset, int):
            with self.root._open(newline="\n") as f:
                f.seek(self.root.dloc + offset)
                s = f.read(size)
            return json.loads(s)
        if isinstance(offset, (cabc.Mapping, cabc.Sequence)):
            return LJNode(offset, size, self.root)
        raise TypeError("incorrect types for offset node")

    def _getitem_mapping(self, key):
        """Looks up ``key`` in a mapping node."""
        if key == "__total__":
            raise KeyError('"__total__" is a special LazyJSON key!')
        return self._load_or_node(self.offsets[key], self.sizes[key])

    def _getitem_sequence(self, key):
        """Looks up an integer index or a slice in a sequence node."""
        if isinstance(key, int):
            n = len(self)
            idx = key + n if key < 0 else key
            # The last stored entry describes the whole sequence, so the
            # index must be resolved against len(self); otherwise -1 or
            # len(self) would silently return the entire container.
            if not 0 <= idx < n:
                raise IndexError("LJNode index out of range")
            return self._load_or_node(self.offsets[idx], self.sizes[idx])
        if isinstance(key, slice):
            key = slice(*key.indices(len(self)))
            return list(map(self._load_or_node, self.offsets[key], self.sizes[key]))
        raise TypeError("only integer indexing available")

    def __getitem__(self, key):
        if self.is_mapping:
            return self._getitem_mapping(key)
        if self.is_sequence:
            return self._getitem_sequence(key)
        raise NotImplementedError

    def __iter__(self):
        if self.is_mapping:
            # Walk the index directly so keys keep their stored order; only
            # the bookkeeping entry is skipped.
            yield from (key for key in self.offsets if key != "__total__")
        elif self.is_sequence:
            for i in range(len(self)):
                yield self._load_or_node(self.offsets[i], self.sizes[i])
        else:
            raise NotImplementedError
class LazyJSON(LJNode):
    """Represents a lazy json file. Can be used like a normal Python
    dict or list.
    """

    def __init__(self, f, reopen=True):
        """Parameters
        ----------
        f : file handle or str
            JSON file to open.
        reopen : bool, optional
            Whether new file handle should be opened for each load.
        """
        self._f = f
        self.reopen = reopen
        opened_here = not reopen and isinstance(f, str)
        if opened_here:
            self._f = open(f, newline="\n")
        try:
            self._load_index()
        except BaseException:
            # Do not leak a handle this constructor opened when the index
            # cannot be read.
            if opened_here:
                self._f.close()
            raise
        self.root = weakref.proxy(self)
        self.is_mapping = isinstance(self.offsets, cabc.Mapping)
        self.is_sequence = isinstance(self.offsets, cabc.Sequence)

    def __del__(self):
        self.close()

    def close(self):
        """Close the file handle, if appropriate."""
        if not self.reopen and isinstance(self._f, io.IOBase):
            # best effort: errors raised while closing are ignored
            with contextlib.suppress(OSError):
                self._f.close()

    @contextlib.contextmanager
    def _open(self, *args, **kwargs):
        """Yields a file handle for reading.

        When ``reopen`` is set and the source is a path, a fresh handle is
        opened with the given arguments and closed on exit, including when
        the body of the ``with`` statement raises. Otherwise the stored
        handle is yielded and left open.
        """
        if self.reopen and isinstance(self._f, str):
            with open(self._f, *args, **kwargs) as f:
                yield f
        else:
            yield self._f

    def _load_index(self):
        """Loads the index from the start of the file."""
        with self._open(newline="\n") as f:
            # read in the location data
            f.seek(9)
            locs = json.loads(f.read(48))
            self.iloc, self.ilen, self.dloc, self.dlen = locs
            # read in the index
            f.seek(self.iloc)
            idx = json.loads(f.read(self.ilen))
        self.offsets = idx["offsets"]
        self.sizes = idx["sizes"]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()