pytex.py 12.7 KB
#!/usr/bin/env python

import sys, os, os.path, argparse, inspect, ast, glob
import SocketServer, socket, multiprocessing, threading
import psutil

try :
    import pygments, pygments.lexers, pygments.formatters
except :
    pygments = None

def log (message) :
    sys.stdout.write(message)
    sys.stdout.flush()

class Interpreter (object) :
    def __init__ (self, jobname) :
        self._t = []
        if pygments is None :
            self.do_pygmentize = self._do_not_pygmentize
        self.base = jobname + "-pygments"
        self.do_reset()
    def do_reset (self) :
        self._t, self._g, self._l = [], {}, {}
        for name, method in inspect.getmembers(self, inspect.ismethod) :
            if name.startswith("do_") :
                self._g[name[3:]] = method
        if pygments :
            source = pygments.highlight(
                "pass",
                pygments.lexers.PythonLexer(),
                pygments.formatters.LatexFormatter(full=True, encoding="utf8"))
            with open(self.base + ".tex", "w") as out :
                for line in source.splitlines(True) :
                    if line.strip().startswith(r"\documentclass{") :
                        pass
                    elif line.strip() == r"\begin{document}" :
                        break
                    elif line.strip() == r"\usepackage[utf8]{inputenc}" :
                        pass
                    else :
                        out.write(line)
            self.do_tex(r"\input{%s.tex}" % self.base)
    def _do_not_pygmentize (self, path, include=True, inline=False) :
        outfile = os.path.splitext(path)[0] + ".tex"
        source = open(path).read()
        if inline :
            source = (r"\Verb$"
                      + "".join(l.rstrip() for l in
                                source.splitlines()).replace("$", r"$\Verb|$|\Verb$")
                      + r"$\endinput")
        with open(outfile, "w") as out :
            out.write(source)
        if include :
            self.do_tex(r"\input{%s}" % outfile)
    def do_pygmentize (self, path, include=True, inline=False) :
        outfile = os.path.splitext(path)[0] + ".tex"
        source = pygments.highlight(open(path).read(),
                                    pygments.lexers.PythonLexer(),
                                    pygments.formatters.LatexFormatter())
        if inline :
            lines = source.splitlines(True)
            lines[0] = r"\fvset{%s}\Verb$" % lines[0].split("[", 1)[-1].split("]", 1)[0]
            lines.pop(-1)
            source = "".join(l.rstrip() for l in lines) + r"$\endinput"
        with open(outfile, "w") as out :
            out.write(source)
        if include :
            self.do_tex(r"\input{%s}" % outfile)
    def do_makedef (self, name, path) :
        out = open(path + ".out").read().rsplit("\\", 1)[0]
        self.do_tex(r"\def\%s{%s}" % (name, self.do_escape(out)))
    def do_escape (self, text) :
        escape = {"\\" : "{\\textbackslash}",
                  "^"  : "{\\textasciicircum}",
                  "~"  : "{\\textasciitilde}"}
        return "".join(c if c not in "{}$%#&_\\^~" else escape.get(c, "\\" + c)
                       for c in str(text))
    def _format (self, text, *args, **opt) :
        char = opt.pop("char", "@")
        if opt :
            raise ValueError("unexpected option %r" % iter(opt.keys()).next())
        if not char :
            raise ValueError("empty separator")
        char2, clen, c2len = 2*char, len(char), 2*len(char)
        data = []
        pos = 0
        for i, c in enumerate(text) :
            if c[i:i+c2len] == char2 :
                data.append(char)
            elif c[i:i+clen] == char :
                data.append(self.do_escape(args[pos]))
                pos += 1
            else :
                data.append(c)
        return "".join(data)
    def do_tex (self, text, *args, **opt) :
        if args or opt :
            self._t.append(self._format(text, *args, **opt))
        else :
            self._t.append(text)
    def _clean (self, source) :
        lines = source.splitlines(True)
        first = lines[0]
        indent = len(first) - len(first.lstrip())
        return "".join(l[indent:] for l in lines)
    def out (self, text) :
        with open(self.base + ".out", "w") as out :
            out.write(text + r"\endinput")
    def __call__ (self, path, mode) :
        self.path = path
        self.base = os.path.splitext(path)[0]
        source = self._clean(open(path).read())
        self._t = []
        if mode == "exec" :
            exec(source, self._g, self._l)
            self.out("".join(self._t))
        elif mode == "eval" :
            out = eval(source, self._g, self._l)
            self.out("".join(self._t) + self.do_escape(str(out)))
        else :
            raise ValueError("invalid mode %r" % mode)

class Handler (SocketServer.BaseRequestHandler) :
    def handle (self) :
        data, sock = self.request
        try :
            req = ast.literal_eval(data)
        except :
            sock.sendto(repr({"status" : "error",
                              "message" : "cannot parse request"}),
                        self.client_address)
            return
        if "method" not in req :
            sock.sendto(repr({"status" : "error",
                              "message" : "no method given"}),
                        self.client_address)
            return
        elif req["method"] not in ["exec", "eval", "quit"] :
            sock.sendto(repr({"status" : "error",
                              "message" : "unknown method"}),
                        self.client_address)
            return
        elif req["method"] == "quit" :
            sock.sendto(repr({"status" : "ok",
                              "message" : "server is shutting down",
                              "pid" : os.getpid()}),
                        self.client_address)
            threading.Thread(target=self.server.shutdown).start()
            return
        elif "path" not in req :
            sock.sendto(repr({"status" : "error",
                              "message" : "no path given"}),
                        self.client_address)
            return
        elif not os.path.isfile(req["path"]) :
            sock.sendto(repr({"status" : "error",
                              "message" : "file not found %r" % req["path"]}),
                        self.client_address)
            return
        try :
            self.server.py(req["path"], req["method"])
        except Exception as err :
            sock.sendto(repr({"status" : "error",
                              "message" : "raised %s: %s"
                              % (err.__class__.__name__, err)}),
                        self.client_address)
            return
        sock.sendto(repr({"status" : "ok",
                          "message" : "<%(method)s %(path)s>" % req}),
                    self.client_address)

class Server (multiprocessing.Process) :
    @classmethod
    def daemonize (cls, port, jobname) :
        child = cls(port, jobname)
        child.start()
        multiprocessing.process._current_process._children.discard(child)
        log("<server started at %s>" % child.pid)
    def __init__ (self, port, jobname) :
        multiprocessing.Process.__init__(self)
        self.port = port
        self.jobname = jobname
    def run (self) :
        self.server = SocketServer.UDPServer(("", self.port), Handler)
        self.server.py = Interpreter(self.jobname)
        self.server.serve_forever()
    @classmethod
    def quit (cls, port) :
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.settimeout(1)
        sock.sendto(repr({"method": "quit"}), ("127.0.0.1", port))
        return sock.recv(1024)
    @classmethod
    def process (cls, port, mode, path) :
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.settimeout(1)
        sock.sendto(repr({"method": mode, "path" : path}), ("127.0.0.1", port))
        return sock.recv(1024)

def _getjobs (path) :
    return [l.strip().split(None, 1) for l in open(path) if l.strip()]

def getjobs (jobname) :
    if os.path.exists(jobname + ".pytex") :
        return jobname, _getjobs(jobname + ".pytex")
    _jobname = os.path.splitext(jobname)[0]
    if os.path.exists(_jobname + ".pytex") :
        return jobname, _getjobs(_jobname + ".pytex")
    return jobname, None

def main () :
    VERSION = "1.0"
    parser = argparse.ArgumentParser(prog="pytex",
                                     description="companion to pytex LaTeX package")
    parser.add_argument("-v", "--version", action="version",
                        version="%%(prog)s %s" % VERSION)
    parser.add_argument("-b", "--base", action="store", default=None,
                        dest="base", help="base DIR/NAME for temporary files")
    server = parser.add_mutually_exclusive_group()
    server.add_argument("--clean", action="store_true", default=False,
                        dest="clean", help="remove all temporary and auxiliary files")
    server.add_argument("--listen", action="store", default=None, type=int,
                        metavar="PORT", dest="listen", help="start server on PORT")
    server.add_argument("--call", action="store", default=None, type=int,
                        metavar="PORT", dest="call",
                        help="run file calling server on PORT")
    server.add_argument("--shutdown", action="store", default=None, type=int,
                        metavar="PORT", dest="shutdown", help="shutdown server on PORT")
    parser.add_argument("args", nargs="*", metavar="ARG",
                        help="LaTeX jobname (or other arguments depending on options)")
    args = parser.parse_args([a.strip("{}") for a in sys.argv[1:]])
    if args.listen :
        if len(args.args) != 1 :
            log("usage: pytex [-b BASE] --listen PORT JOBNAME")
            sys.exit(1)
        Server.daemonize(args.listen, *args.args)
        return
    elif args.call :
        if len(args.args) != 2 :
            log("usage: pytex [-b BASE] --call PORT MODE FILE")
            sys.exit(1)
        elif args.args[0] not in ("eval", "exec") :
            log("pytex: expected 'exec' or 'eval', but got %r" % args.args[0])
            sys.exit(1)
        resp = Server.process(args.call, *args.args)
        try :
            resp = ast.literal_eval(resp)
        except :
            resp = {"message": "<invalid answer from server>"}
        log(resp.get("message", "<invalid answer from server>"))
        return
    elif args.shutdown :
        if len(args.args) > 0 :
            log("usage: pytex [-b BASE] --shutdown PORT")
            sys.exit(1)
        try :
            resp = Server.quit(args.shutdown)
        except socket.timeout :
            log("<could not reach server to shutdown>")
            sys.exit(1)
        try :
            resp = ast.literal_eval(resp)
            pid = int(resp["pid"])
        except :
            log("<invalid answer from server>")
            sys.exit(1)
        child = psutil.Process(pid)
        try :
            child.wait(1)
            log("<server at %s has shutdown>" % child.pid)
            return
        except :
            child.terminate()
        try :
            child.wait(1)
            log("<server at %s terminated>" % child.pid)
            return
        except :
            child.kill()
            log("<server at %s killed>" % child.pid)
        return
    elif len(args.args) != 1 :
        log("usage: pytex [-b BASE] [--clean] JOBNAME")
        sys.exit(1)
    try :
        jobname, jobs = getjobs(args.args[0])
    except :
        log("pytex: invalid file %r" % args.args[0])
        sys.exit(2)
    if args.base is None :
        if jobs :
            first = jobs[0][1]
            args.base = first[:-4]
        else :
            args.base = jobname + ".pytmp" + os.sep
            jobs = []
            if not os.path.isdir(jobname + ".pytmp") :
                os.mkdir(jobname + ".pytmp")
            if not os.path.exists(os.path.join(jobname + ".pytmp", jobname)) :
                open(os.path.join(jobname + ".pytmp", jobname), "w").close()
    if args.clean :
        for path in ([jobname + ".pytex", jobname + "-pygments.tex"]
                     + glob.glob(args.base + "*.py")
                     + glob.glob(args.base + "*.out")
                     + glob.glob(args.base + "*.tex")) :
            try :
                os.unlink(path)
            except :
                pass
        return
    py = Interpreter(jobname)
    for mode, path in jobs :
        if not os.path.exists(path) :
            log("not found: %s" % path)
        elif mode in ("eval", "exec") :
            log("%s: %s" % (mode, path))
            py(path, mode)
        elif mode == "code" :
            log("skip: %s" % path)
        else :
            log("pytex: invalid mode %r in file %r" % (mode, jobname + ".pytex"))

if __name__ == "__main__" :
    main()