Source code for xonsh.xoreutils.cat

"""Implements a cat command for xonsh."""

import os
import sys
import time

import xonsh.procs.pipelines as xpp
from xonsh.built_ins import XSH
from xonsh.xoreutils.util import arg_handler, run_alias


def _cat_line(
    f, sep, last_was_blank, line_count, opts, out, enc, enc_errors, read_size
):
    _r = r = f.readline(80)
    restore_newline = False
    if isinstance(_r, str):
        _r = r = _r.encode(enc, enc_errors)
    if r == b"":
        return last_was_blank, line_count, read_size, True
    if r.endswith(sep):
        _r = _r[: -len(sep)]
        restore_newline = True
    this_one_blank = _r == b""
    if last_was_blank and this_one_blank and opts["squeeze_blank"]:
        return last_was_blank, line_count, read_size, False
    last_was_blank = this_one_blank
    if opts["number_all"] or (opts["number_nonblank"] and not this_one_blank):
        start = ("%6d " % line_count).encode(enc, enc_errors)
        _r = start + _r
        line_count += 1
    if opts["show_ends"]:
        _r = _r + b"$"
    if restore_newline:
        _r = _r + sep
    out.buffer.write(_r)
    out.flush()
    read_size += len(r)
    return last_was_blank, line_count, read_size, False


def _cat_single_file(opts, fname, stdin, out, err, line_count=1):
    env = XSH.env
    enc = env.get("XONSH_ENCODING")
    enc_errors = env.get("XONSH_ENCODING_ERRORS")
    read_size = 0
    file_size = fobj = None
    if fname == "-":
        f = stdin or sys.stdin
    elif os.path.isdir(fname):
        print(f"cat: {fname}: Is a directory.", file=err)
        return True, line_count
    elif not os.path.exists(fname):
        print(f"cat: No such file or directory: {fname}", file=err)
        return True, line_count
    else:
        file_size = os.stat(fname).st_size
        if file_size == 0:
            file_size = None
        fobj = open(fname, "rb")
        f = xpp.NonBlockingFDReader(fobj.fileno(), timeout=0.1)
    sep = os.linesep.encode(enc, enc_errors)
    last_was_blank = False
    while file_size is None or read_size < file_size:
        try:
            last_was_blank, line_count, read_size, endnow = _cat_line(
                f,
                sep,
                last_was_blank,
                line_count,
                opts,
                out,
                enc,
                enc_errors,
                read_size,
            )
            if endnow:
                break
            if last_was_blank:
                time.sleep(1e-3)
        except KeyboardInterrupt:
            print("got except", flush=True, file=out)
            break
        except Exception as e:
            print("xonsh:", e, flush=True, file=out)
            pass
    if fobj is not None:
        fobj.close()
    return False, line_count


[docs] def cat(args, stdin, stdout, stderr): """A cat command for xonsh.""" opts = _cat_parse_args(args) if opts is None: print(CAT_HELP_STR, file=stdout) return 0 line_count = 1 errors = False if len(args) == 0: args = ["-"] for i in args: o = _cat_single_file(opts, i, stdin, stdout, stderr, line_count) if o is None: return -1 _e, line_count = o errors = _e or errors return int(errors)
def _cat_parse_args(args): out = { "number_nonblank": False, "number_all": False, "squeeze_blank": False, "show_ends": False, } if "--help" in args: return arg_handler(args, out, "-b", "number_nonblank", True, "--number-nonblank") arg_handler(args, out, "-n", "number_all", True, "--number") arg_handler(args, out, "-E", "show_ends", True, "--show-ends") arg_handler(args, out, "-s", "squeeze_blank", True, "--squeeze-blank") arg_handler(args, out, "-T", "show_tabs", True, "--show-tabs") return out CAT_HELP_STR = """This version of cat was written in Python for the xonsh project: http://xon.sh Based on cat from GNU coreutils: http://www.gnu.org/software/coreutils/ Usage: cat [OPTION]... [FILE]... Concatenate FILE(s), or standard input, to standard output. -b, --number-nonblank number nonempty output lines, overrides -n -E, --show-ends display $ at end of each line -n, --number number all output lines -s, --squeeze-blank suppress repeated empty output lines -T, --show-tabs display TAB characters as ^I -u (ignored) --help display this help and exit With no FILE, or when FILE is -, read standard input. Examples: cat f - g Output f's contents, then standard input, then g's contents. cat Copy standard input to standard output.""" # NOT IMPLEMENTED: # -A, --show-all equivalent to -vET # -e equivalent to -vE # -t equivalent to -vT # -v, --show-nonprinting use ^ and M- notation, except for LFD and TAB # --version output version information and exit"""
[docs] def main(args=None): run_alias("cat", args)
if __name__ == "__main__": main()