Jordan de la Houssaye

integrates andy's frontend and backend

# entities: tuple of name of the entities, initial level, tuple of decays 0
# denotes unbounded decay (omega)
# examples:
# entities = ( ('B',4, (0,2,2,2,3)), ('P',0, (0,0)), ('C',0, (0,0)),
# ('G',0, (0,0)) )
# entities = ( ('Sugar',1, (0,2)), ('Aspartame',0, (0,2)),
# ('Glycemia',2, (0,2,2,2)), ('Glucagon',0, (0,2)),
# ('Insulin',0,(0,2,2)) )
entities = ( ('s1',0, (0,1)), ('s2',0, (0,1)), ('s3',0, (0,1)) )
# Activities: Tuple of (activators, inhibitors, results, duration)
# activators, inhibitors are dictionaries of pairs
# (entity, level)
# results are dictionaries of pairs (entity, +z)
# potential activities examples:
# potential = ( (dict([('P',0)]),dict([('P',1)]),dict([('P',1)]),0),
# (dict([('P',1)]),dict(),dict([('P',-1)]),0),
# (dict([('C',0)]),dict([('C',1)]),dict([('C',1)]),0),
# (dict([('C',1)]),dict(),dict([('C',-1)]),0),
# (dict([('G',0)]),dict([('G',1)]),dict([('G',1)]),0),
# (dict([('G',1)]),dict(),dict([('G',-1)]),0) )
# potential = ( (dict([('Sugar',1)]),dict(),
# dict([('Insulin',1),('Glycemia',1)]),0),
# (dict([('Aspartame',1)]),dict(),dict([('Insulin',1)]),0),
# (dict(),dict([('Glycemia',1)]),dict([('Glucagon',1)]),0),
# (dict([('Glycemia',3)]),dict(),dict([('Insulin',1)]),0),
# (dict([('Insulin',2)]),dict(),dict([('Glycemia',-1)]),0),
# (dict([('Insulin',1),('Glycemia',3)]), dict(),
# dict([('Glycemia',-1)]),0),
# (dict([('Insulin',1)]),dict([('Glycemia',2)]),
# dict([('Glycemia',-1)]),0),
# (dict([('Glucagon',1)]),dict(),dict([('Glycemia',+1)]),0)
# )
potential = ( (dict(), dict([('s1',1)]), dict([('s2',1)]), 1),
(dict(), dict([('s2',1)]), dict([('s3',1)]), 1),
(dict(), dict([('s3',1)]), dict([('s1',1)]), 1) )
# obligatory activities examples:
# obligatory = ( (dict([('P',1)]),dict(),dict([('B',1)]),1),
# (dict([('C',1)]),dict(),dict([('B',-1)]),3),
# (dict([('G',1)]),dict(),dict([('B',-2)]),3))
obligatory = ()
"""This package features the ABCD compiler, this is mainly a
"""This package features the Andy compiler, this is mainly a
command-line tool but it can be called also from Python. The API is
very simple and mimics the command line interface
### Function `snakes.utils.abcd.main.main` ###
### Function `snakes.utils.andy.main.main` ###
:::python
def main (args=sys.argv[1:], src=None) : ...
def main (args=sys.argv[1:]) : ...
Entry point of the compiler
##### Call API #####
* `list args`:
* `str src`:
* `return PetriNet`:
##### Exceptions #####
......
import snakes.plugins
snakes.plugins.load(['gv', 'ops'], 'snakes.nets', 'nets')
from nets import *
###############################################################################
## AUXILIARY FUNCTIONS ########################################################
......@@ -132,14 +127,14 @@ def potentialt (name, lp, up, lambdap, R) :
###############################################################################
## MAIN #######################################################################
def andy2snakes(entities, potential, obligatory):
def andy2snakes(snk, entities, potential, obligatory):
# compute maximal duration of activities
D=0
for alpha in potential : D = max(D, alpha[3])
for alpha in obligatory : D = max(D, alpha[3])
n = PetriNet('andy')
n = snk.PetriNet('andy')
n.globals["obligatory"] = obligatory
n.globals["D"] = D
......@@ -153,13 +148,13 @@ def andy2snakes(entities, potential, obligatory):
level = entities[i][1]
deltas = entities[i][2]
vector = [0]*len(deltas)
n.add_place(Place(name, [(level,0, tuple(vector))]))
n.add_place(snk.Place(name, [(level,0, tuple(vector))]))
################# clock transition
inputlist = dict()
n.globals["inputlist"] = inputlist
n.add_transition(Transition('tc'))
n.add_transition(snk.Transition('tc'))
# connect all obligatory clocks
......@@ -167,7 +162,7 @@ def andy2snakes(entities, potential, obligatory):
# transition name
obname = 'ob'+str(i)
# for every obligatory activity connect corresponding place to clock
n.add_place(Place('p'+obname, [0]))
n.add_place(snk.Place('p'+obname, [0]))
n.add_input('p'+obname, 'tc', Variable('w'+obname))
inputlist.update({obname:'w'+obname})
......@@ -178,20 +173,20 @@ def andy2snakes(entities, potential, obligatory):
deltas = entities[i][2]
n.globals["deltas"+name] = deltas
n.globals[name] = name
n.add_input(name, 'tc', Tuple([Variable('l'+name), Variable('u'+name), Variable('lambda'+name) ]))
n.add_input(name, 'tc', snk.Tuple([snk.Variable('l'+name), snk.Variable('u'+name), snk.Variable('lambda'+name) ]))
inputlist.update({name:['l'+name, 'u'+name, 'lambda'+name ]})
for i in range(0,len(entities)):
name=entities[i][0]
n.add_output(name, 'tc', Expression("clockt(obligatory,"+name+",l"+name+',u'+name+',lambda'+name+',deltas'+name+',inputlist,D)'))
n.add_output(name, 'tc', snk.Expression("clockt(obligatory,"+name+",l"+name+',u'+name+',lambda'+name+',deltas'+name+',inputlist,D)'))
for i in range(0,len(obligatory)):
obname = 'ob'+str(i)
# for every obligatory activity connect corresponding place to clock
n.add_output('p'+obname, 'tc', Expression("clockbetat(obligatory,"+str(i)+',w'+obname+',inputlist,D)'))
n.add_output('p'+obname, 'tc', snk.Expression("clockbetat(obligatory,"+str(i)+',w'+obname+',inputlist,D)'))
## potential activities
......@@ -200,15 +195,15 @@ def andy2snakes(entities, potential, obligatory):
trname = 'tr'+str(i)
# for every potential activity connect corresponding place to clock
n.add_place(Place('p'+trname, [0]))
n.add_input('p'+trname, 'tc', Variable('w'+trname))
n.add_output('p'+trname, 'tc', Expression('min(D,w'+trname+'+1)'))
n.add_place(snk.Place('p'+trname, [0]))
n.add_input('p'+trname, 'tc', snk.Variable('w'+trname))
n.add_output('p'+trname, 'tc', snk.Expression('min(D,w'+trname+'+1)'))
activators = potential[i][0]
inhibitors = potential[i][1]
results = potential[i][2]
print results
#print results
n.globals["results"+trname] = results
duration = potential[i][3]
......@@ -240,14 +235,14 @@ def andy2snakes(entities, potential, obligatory):
level = str(inhibitors[nameinhib[j]])
guard += ' and l'+spec+'< '+ level + ' and lambda' +spec+'['+level+']>='+str(duration)
n.add_transition(Transition(trname, Expression(guard)))
n.add_input('p'+trname, trname, Variable('w'))
n.add_output('p'+trname, trname, Expression('0'))
n.add_transition(snk.Transition(trname, snk.Expression(guard)))
n.add_input('p'+trname, trname, snk.Variable('w'))
n.add_output('p'+trname, trname, snk.Expression('0'))
# arcs of the transition from and to involved entities
for j in range(0,len(names)) :
n.add_input(names[j], trname, Tuple([Variable('l'+names[j]), Variable('u'+names[j]), Variable('lambda'+names[j]) ]))
n.add_output(names[j], trname, Expression("potentialt(" +names[j]+",l"+names[j]+',u'+names[j]+',lambda'+names[j]+', results'+trname+')'))
n.add_input(names[j], trname, snk.Tuple([snk.Variable('l'+names[j]), snk.Variable('u'+names[j]), snk.Variable('lambda'+names[j]) ]))
n.add_output(names[j], trname, snk.Expression("potentialt(" +names[j]+",l"+names[j]+',u'+names[j]+',lambda'+names[j]+', results'+trname+')'))
return n
......@@ -255,7 +250,7 @@ def andy2snakes(entities, potential, obligatory):
def draw_net(net, out_name='repress'):
net.draw(out_name+'.ps')
def draw_stategraph(net, entities_names, out_name='repressgraph',
def draw_stategraph(snk, net, entities_names, out_name='repressgraph',
with_dot=True):
def node_attr (state, graph, attr) :
marking = graph[state]
......@@ -264,7 +259,7 @@ def draw_stategraph(net, entities_names, out_name='repressgraph',
def edge_attr (trans, mode, attr) :
attr["label"] = trans.name
s = StateGraph(net)
s = snk.StateGraph(net)
s.build()
s.draw(out_name+'.ps', node_attr=node_attr, edge_attr=edge_attr,
......@@ -278,6 +273,9 @@ def draw_stategraph(net, entities_names, out_name='repressgraph',
g.render(out_name+"-layout.dot", engine="dot")
if __name__=='__main__':
import snakes.plugins
snakes.plugins.load(['gv', 'ops'], 'snakes.nets', 'snk')
# entities: tuple of name of the entities, initial level, tuple of decays 0
# denotes unbounded decay (omega)
# examples:
......@@ -325,7 +323,7 @@ if __name__=='__main__':
obligatory = ()
net = andy2snakes(entities, potential, obligatory)
net = andy2snakes(snk, entities, potential, obligatory)
draw_net(net, out_name="repress")
draw_stategraph(net, ("s1", "s2", "s3"), out_name="repressgraph")
draw_stategraph(snk, net, ("s1", "s2", "s3"), out_name="repressgraph")
......
import sys, operator, inspect, re, collections
from snakes.utils.abcd import CompilationError, DeclarationError
from snakes.lang.abcd.parser import ast
from snakes.lang import unparse
import snakes.utils.abcd.transform as transform
from snakes.data import MultiSet
from snakes import *
class Decl (object) :
OBJECT = "object"
TYPE = "type"
BUFFER = "buffer"
SYMBOL = "symbol"
CONST = "const"
NET = "net"
TASK = "task"
IMPORT = "import"
def __init__ (self, node, kind=None, **data) :
self.node = node
classname = node.__class__.__name__
if kind is not None :
self.kind = kind
elif classname == "AbcdTypedef" :
self.kind = self.TYPE
elif classname == "AbcdBuffer" :
self.kind = self.BUFFER
elif classname == "AbcdSymbol" :
self.kind = self.SYMBOL
elif classname == "AbcdConst" :
self.kind = self.CONST
elif classname == "AbcdNet" :
self.kind = self.NET
elif classname == "AbcdTask" :
self.kind = self.TASK
elif classname in ("Import", "ImportFrom") :
self.kind = self.IMPORT
else :
self.kind = self.OBJECT
for key, val in data.items() :
setattr(self, key, val)
class GetInstanceArgs (object) :
"""Bind arguments for a net instance
"""
def __init__ (self, node) :
self.argspec = []
self.arg = {}
self.buffer = {}
self.net = {}
self.task = {}
seen = set()
for a in node.args.args + node.args.kwonlyargs :
if a.arg in seen :
self._raise(CompilationError,
"duplicate argument %r" % a.arg)
seen.add(a.arg)
if a.annotation is None :
self.argspec.append((a.arg, "arg"))
else :
self.argspec.append((a.arg, a.annotation.id))
def __call__ (self, *args) :
self.arg.clear()
self.buffer.clear()
self.net.clear()
self.task.clear()
for (name, kind), value in zip(self.argspec, args) :
getattr(self, kind)[name] = value
return self.arg, self.buffer, self.net, self.task
class Builder (object) :
def __init__ (self, snk, path=[], up=None) :
self.snk = snk
self.path = path
self.up = up
self.env = {"True": Decl(None, kind=Decl.CONST, value=True),
"False": Decl(None, kind=Decl.CONST, value=False),
"None": Decl(None, kind=Decl.CONST, value=None),
"dot": Decl(None, kind=Decl.CONST, value=self.snk.dot),
"BlackToken": Decl(None, kind=Decl.TYPE,
type=self.snk.Instance(self.snk.BlackToken))}
self.stack = []
if up :
self.globals = up.globals
else :
self.globals = snk.Evaluator(dot=self.snk.dot,
BlackToken=self.snk.BlackToken)
self.instances = MultiSet()
# utilities
def _raise (self, error, message) :
"""raise an exception with appropriate location
"""
if self.stack :
pos = "[%s:%s]: " % (self.stack[-1].lineno,
self.stack[-1].col_offset)
else :
pos = ""
raise error(pos+message)
def _eval (self, expr, *largs, **kwargs) :
env = self.globals.copy()
if isinstance(expr, ast.AST) :
expr = unparse(expr)
return env(expr, dict(*largs, **kwargs))
# declarations management
def __setitem__ (self, name, value) :
if name in self.env :
self._raise(DeclarationError, "duplicated declaration of %r" % name)
self.env[name] = value
def __getitem__ (self, name) :
if name in self.env :
return self.env[name]
elif self.up is None :
self._raise(DeclarationError, "%r not declared" % name)
else :
return self.up[name]
def __contains__ (self, name) :
if name in self.env :
return True
elif self.up is None :
return False
else :
return name in self.up
def goto (self, name) :
if name in self.env :
return self
elif self.up is None :
self._raise(DeclarationError, "%r not declared" % name)
else :
return self.up.goto(name)
def get_buffer (self, name) :
if name not in self :
self._raise(DeclarationError,
"buffer %r not declared" % name)
decl = self[name]
if decl.kind != Decl.BUFFER :
self._raise(DeclarationError,
"%r declared as %s but used as buffer"
% (name, decl.kind))
elif decl.capacity is not None :
pass
#self._raise(NotImplementedError, "capacities not (yet) supported")
return decl
def get_net (self, name) :
if name not in self :
self._raise(DeclarationError,
"net %r not declared" % name)
decl = self[name]
if decl.kind != Decl.NET :
self._raise(DeclarationError,
"%r declared as %s but used as net"
% (name, decl.kind))
return decl
def get_task (self, name) :
if name not in self :
self._raise(DeclarationError,
"task %r not declared" % name)
decl = self[name]
if decl.kind != Decl.TASK :
self._raise(DeclarationError,
"%r declared as %s but used as task"
% (name, decl.kind))
return decl
# main compiler entry point
def build (self, node, prefix="", fallback=None) :
self.stack.append(node)
if prefix :
prefix += "_"
method = "build_" + prefix + node.__class__.__name__
visitor = getattr(self, method, fallback or self.build_fail)
try :
return visitor(node)
finally :
self.stack.pop(-1)
def build_fail (self, node) :
self._raise(CompilationError, "do not know how to compile %s"
% node.__class__.__name__)
def build_arc (self, node) :
return self.build(node, "arc", self.build_arc_expr)
# specification
def build_AbcdSpec (self, node) :
for decl in node.context :
self.build(decl)
tasks = [self._build_TaskNet(decl.node)
for name, decl in self.env.items()
if decl.kind == Decl.TASK and decl.used]
net = reduce(operator.or_, tasks, self.build(node.body))
# set local buffers marking, and hide them
for name, decl in ((n, d) for n, d in self.env.items()
if d.kind == Decl.BUFFER) :
status = self.snk.buffer(name)
for place in net.status(status) :
place = net.place(place)
try :
place.reset(decl.marking)
except ValueError as err :
self._raise(CompilationError,
"invalid initial marking (%s)" % err)
if decl.capacity is None :
cap = None
else :
#cap = [c.n if c else None for c in decl.capacity]
# TODO: accept more than integers as capacities
cap = []
for c in decl.capacity :
if c is None :
cap.append(None)
else :
try :
cap.append(self._eval(c))
except :
err = sys.exc_info()[1]
self._raise(CompilationError,
"could not evaluate %r, %s"
% (unparse(c), err))
place.label(path=self.path,
capacity=cap)
# TODO: check capacity
net.hide(status)
if self.up is None :
# set entry marking
for place in net.status(self.snk.entry) :
net.place(place).reset(self.snk.dot)
# rename nodes
self._rename_nodes(net)
# copy global declarations
net.globals.update(self.globals)
# add info about source file
net.label(srcfile=str(node.st.text.filename))
# add assertions
net.label(asserts=node.asserts)
return net
def _build_TaskNet (self, node) :
self._raise(NotImplementedError, "tasks not (yet) supported")
def _rename_nodes (self, net) :
# generate unique names
total = collections.defaultdict(int)
count = collections.defaultdict(int)
def ren (node) :
if net.has_transition(node.name) :
status = node.label("srctext")
else :
if node.status == self.snk.entry :
status = "e"
elif node.status == self.snk.internal :
status = "i"
elif node.status == self.snk.exit :
status = "x"
else :
status = node.label("buffer")
name = ".".join(node.label("path") + [status])
if total[name] > 1 :
count[name] += 1
name = "%s#%s" % (name, count[name])
return name
# count occurrences of each name base
_total = collections.defaultdict(int)
for node in net.node() :
_total[ren(node)] += 1
total = _total
# rename nodes using a depth-first traversal
done = set(net.status(self.snk.entry))
todo = [net.node(n) for n in done]
while todo :
node = todo.pop(-1)
new = ren(node)
if new != node.name :
net.rename_node(node.name, new)
done.add(new)
for n in net.post(new) - done :
todo.append(net.node(n))
done.add(n)
# rename isolated nodes
for letter, method in (("p", net.place), ("t", net.transition)) :
for node in method() :
if node.name not in done :
net.rename_node(node.name, ren(node))
# declarations
def build_AbcdTypedef (self, node) :
"""
>>> import snakes.nets
>>> b = Builder(snakes.nets)
>>> b.build(ast.AbcdTypedef(name='number', type=ast.UnionType(types=[ast.NamedType(name='int'), ast.NamedType(name='float')])))
>>> b.env['number'].type
(Instance(int) | Instance(float))
>>> b.build(ast.ImportFrom(module='inspect', names=[ast.alias(name='isbuiltin')]))
>>> b.build(ast.AbcdTypedef(name='builtin', type=ast.NamedType(name='isbuiltin')))
>>> b.env['builtin'].type
TypeCheck(inspect.isbuiltin)
"""
self[node.name] = Decl(node, type=self.build(node.type))
def build_AbcdBuffer (self, node) :
self[node.name] = Decl(node,
type=self.build(node.type),
capacity=node.capacity,
marking=self._eval(node.content))
def build_AbcdSymbol (self, node) :
for name in node.symbols :
value = self.snk.Symbol(name, False)
self[name] = Decl(node, value=value)
self.globals[name] = value
def build_AbcdConst (self, node) :
value = self._eval(node.value)
self[node.name] = Decl(node, value=value)
self.globals[node.name] = value
def build_AbcdNet (self, node) :
self[node.name] = Decl(node, getargs=GetInstanceArgs(node))
def build_AbcdTask (self, node) :
self._raise(NotImplementedError, "tasks not (yet) supported")
self[node.name] = Decl(node, used=False)
def build_Import (self, node) :
for alias in node.names :
self[alias.asname or alias.name] = Decl(node)
self.globals.declare(unparse(node))
def build_ImportFrom (self, node) :
self.build_Import(node)
# processes
def build_AbcdAction (self, node) :
if node.guard is True :
return self._build_True(node)
elif node.guard is False :
return self._build_False(node)
else :
return self._build_action(node)
def _build_True (self, node) :
net = self.snk.PetriNet("true")
e = self.snk.Place("e", [], self.snk.tBlackToken,
status=self.snk.entry)
e.label(path=self.path)
net.add_place(e)
x = self.snk.Place("x", [], self.snk.tBlackToken,
status=self.snk.exit)
x.label(path=self.path)
net.add_place(x)
t = self.snk.Transition("t")
t.label(srctext=node.st.source(),
srcloc=(node.st.srow, node.st.scol,
node.st.erow, node.st.ecol),
path=self.path)
net.add_transition(t)
net.add_input("e", "t", self.snk.Value(self.snk.dot))
net.add_output("x", "t", self.snk.Value(self.snk.dot))
return net
def _build_False (self, node) :
net = self.snk.PetriNet("false")
e = self.snk.Place("e", [], self.snk.tBlackToken,
status=self.snk.entry)
e.label(path=self.path)
net.add_place(e)
x = self.snk.Place("x", [], self.snk.tBlackToken,
status=self.snk.exit)
x.label(path=self.path)
net.add_place(x)
return net
def _build_action (self, node) :
net = self.snk.PetriNet("flow")
e = self.snk.Place("e", [], self.snk.tBlackToken,
status=self.snk.entry)
e.label(path=self.path)
net.add_place(e)
x = self.snk.Place("x", [], self.snk.tBlackToken,
status=self.snk.exit)
x.label(path=self.path)
net.add_place(x)
t = self.snk.Transition("t", self.snk.Expression(unparse(node.guard)),
status=self.snk.tick("action"))
t.label(srctext=node.st.source(),
srcloc=(node.st.srow, node.st.scol,
node.st.erow, node.st.ecol),
path=self.path)
net.add_transition(t)
net.add_input("e", "t", self.snk.Value(self.snk.dot))
net.add_output("x", "t", self.snk.Value(self.snk.dot))
net = reduce(operator.or_, [self.build(a) for a in node.accesses],
net)
net.hide(self.snk.tick("action"))
return net
def build_AbcdFlowOp (self, node) :
return self.build(node.op)(self.build(node.left),
self.build(node.right))
def _get_instance_arg (self, arg) :
if arg.__class__.__name__ == "Name" and arg.id in self :
return self[arg.id]
else :
try :
self._eval(arg)
except :
self._raise(CompilationError,
"could not evaluate argument %r"
% arg.st.source())
return arg
def build_AbcdInstance (self, node) :
if node.net not in self :
self._raise(DeclarationError, "%r not declared" % node.net)
elif node.starargs :
self._raise(CompilationError, "* argument not allowed here")
elif node.kwargs :
self._raise(CompilationError, "** argument not allowed here")
decl = self[node.net]
if decl.kind != Decl.NET :
self._raise(DeclarationError,
"%r declared as %s but used as net"
% (name, decl.kind))
# unpack args
posargs, kwargs = [], {}
for arg in node.args :
posargs.append(self._get_instance_arg(arg))
for kw in node.keywords :
kwargs[kw.arg] = self._get_instance_arg(kw.value)
# bind args
try :
args, buffers, nets, tasks = decl.getargs(*posargs, **kwargs)
except TypeError :
c, v, t = sys.exc_info()
self._raise(CompilationError, str(v))
for d, kind in ((buffers, Decl.BUFFER),
(nets, Decl.NET),
(tasks, Decl.TASK)) :
for k, v in d.items() :
if v.kind != kind :
self._raise(DeclarationError,
"%r declared as %s but used as %s"
% (k, v.kind, kind))
d[k] = v.node.name
# build sub-net
binder = transform.ArgsBinder(args, buffers, nets, tasks)
spec = binder.visit(decl.node.body)
if node.asname :
name = str(node.asname)
else :
name = node.st.source()
if name in self.instances :
name = "%s#%s" % (name, self.instances(name))
self.instances.add(name)
path = self.path + [name]
builder = self.__class__(self.snk, path, self)
net = builder.build(spec)
src = (node.st.source(),
node.st.srow, node.st.scol,
node.st.erow, node.st.ecol)
for trans in net.transition() :
try :
lbl = trans.label("instances")
trans.label(instances=[src] + lbl)
except KeyError :
trans.label(instances=[src])
for place in net.place() :
if place.status == self.snk.Status(None) :
try :
lbl = place.label("instances")
place.label(instances=[src] + lbl)
except KeyError :
place.label(instances=[src])
return net
# control flow operations
def build_Sequence (self, node) :
return self.snk.PetriNet.__and__
def build_Choice (self, node) :
return self.snk.PetriNet.__add__
def build_Parallel (self, node) :
return self.snk.PetriNet.__or__
def build_Loop (self, node) :
return self.snk.PetriNet.__mul__
# accesses :
def build_SimpleAccess (self, node) :
decl = self.get_buffer(node.buffer)
net = self.snk.PetriNet("access")
net.add_transition(self.snk.Transition("t", status=self.snk.tick("action")))
b = self.snk.Place(str(node.buffer), [], decl.type,
status=self.snk.buffer(node.buffer))
b.label(path=self.path,
buffer=str(node.buffer),
srctext=decl.node.st.source(),
srcloc=(decl.node.st.srow, decl.node.st.scol,
decl.node.st.erow, decl.node.st.ecol))
net.add_place(b)
self.build(node.arc)(net, node.buffer, "t", self.build_arc(node.tokens))
return net
def build_FlushAccess (self, node) :
decl = self.get_buffer(node.buffer)
net = self.snk.PetriNet("access")
net.add_transition(self.snk.Transition("t", status=self.snk.tick("action")))
b = self.snk.Place(str(node.buffer), [], decl.type,
status=self.snk.buffer(node.buffer))
b.label(path=self.path,
buffer=str(node.buffer),
srctext=decl.node.st.source(),
srcloc=(decl.node.st.srow, decl.node.st.scol,
decl.node.st.erow, decl.node.st.ecol))
net.add_place(b)
net.add_input(node.buffer, "t", self.snk.Flush(node.target))
return net
def build_SwapAccess (self, node) :
decl = self.get_buffer(node.buffer)
net = self.snk.PetriNet("access")
net.add_transition(self.snk.Transition("t", status=self.snk.tick("action")))
b = self.snk.Place(node.buffer, [], decl.type,
status=self.snk.buffer(node.buffer))
b.label(path=self.path,
buffer=str(node.buffer),
srctext=decl.node.st.source(),
srcloc=(decl.node.st.srow, decl.node.st.scol,
decl.node.st.erow, decl.node.st.ecol))
net.add_place(b)
net.add_input(node.buffer, "t", self.build_arc(node.target))
net.add_output(node.buffer, "t", self.build_arc(node.tokens))
return net
def build_Spawn (self, node) :
self._raise(NotImplementedError, "tasks not (yet) supported")
def build_Wait (self, node) :
self._raise(NotImplementedError, "tasks not (yet) supported")
def build_Suspend (self, node) :
self._raise(NotImplementedError, "tasks not (yet) supported")
def build_Resume (self, node) :
self._raise(NotImplementedError, "tasks not (yet) supported")
# arc labels
def build_arc_Name (self, node) :
if node.id in self :
decl = self[node.id]
if decl.kind in (Decl.CONST, Decl.SYMBOL) :
return self.snk.Value(decl.value)
return self.snk.Variable(node.id)
def build_arc_Num (self, node) :
return self.snk.Value(node.n)
def build_arc_Str (self, node) :
return self.snk.Value(node.s)
def build_arc_Tuple (self, node) :
return self.snk.Tuple([self.build_arc(elt) for elt in node.elts])
def build_arc_expr (self, node) :
return self.snk.Expression(unparse(node))
# arcs
def build_Produce (self, node) :
def arc (net, place, trans, label) :
net.add_output(place, trans, label)
return arc
def build_Test (self, node) :
def arc (net, place, trans, label) :
net.add_input(place, trans, self.snk.Test(label))
return arc
def build_Consume (self, node) :
def arc (net, place, trans, label) :
net.add_input(place, trans, label)
return arc
def build_Fill (self, node) :
def arc (net, place, trans, label) :
net.add_output(place, trans, self.snk.Flush(str(label)))
return arc
# types
def build_UnionType (self, node) :
return reduce(operator.or_, (self.build(child)
for child in node.types))
def build_IntersectionType (self, node) :
return reduce(operator.and_, (self.build(child)
for child in node.types))
def build_CrossType (self, node) :
return self.snk.CrossProduct(*(self.build(child)
for child in node.types))
def build_ListType (self, node) :
return self.snk.List(self.build(node.items))
def build_TupleType (self, node) :
return self.snk.Collection(self.snk.Instance(tuple),
(self.build(node.items)))
def build_SetType (self, node) :
return self.snk.Set(self.build(node.items))
def build_DictType (self, node) :
return self.snk.Mapping(keys=self.build(node.keys),
items=self.build(node.items),
_dict=self.snk.Instance(self.snk.hdict))
def build_EnumType (self, node) :
return self.snk.OneOf(*(self._eval(child) for child in node.items))
def build_NamedType (self, node) :
name = node.name
if name in self and self[name].kind == Decl.TYPE :
return self[name].type
elif name in self.globals :
obj = self.globals[name]
if inspect.isclass(obj) :
return self.snk.Instance(obj)
elif inspect.isroutine(obj) :
return self.snk.TypeCheck(obj)
elif hasattr(sys.modules["__builtin__"], name) :
obj = getattr(sys.modules["__builtin__"], name)
if inspect.isclass(obj) :
return self.snk.Instance(obj)
elif inspect.isroutine(obj) :
return self.snk.TypeCheck(obj)
self._raise(CompilationError,
"invalid type %r" % name)
if __name__ == "__main__" :
import doctest
doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE
| doctest.REPORT_ONLY_FIRST_FAILURE
| doctest.ELLIPSIS)
from snakes.lang.abcd.parser import parse
node = parse(open(sys.argv[1]).read())
import snakes.plugins
snk = snakes.plugins.load(["ops", "gv", "labels"], "snakes.nets", "snk")
build = Builder(snk)
net = build.build(node)
net.draw(sys.argv[1] + ".png")
import heapq
from snakes.nets import StateGraph
import snakes.lang
import snkast as ast
class Checker (object) :
def __init__ (self, net) :
self.g = StateGraph(net)
self.f = [self.build(f) for f in net.label("asserts")]
def build (self, tree) :
src = """
def check (_) :
return %s
""" % tree.st.source()[7:]
ctx = dict(self.g.net.globals)
ctx["bounded"] = self.bounded
exec(src, ctx)
fun = ctx["check"]
fun.lineno = tree.lineno
return fun
def bounded (self, marking, max) :
return all(len(marking(p)) == 1 for p in marking)
def run (self) :
for state in self.g :
marking = self.g.net.get_marking()
for place in marking :
if max(marking(place).values()) > 1 :
return None, self.trace(state)
for check in self.f :
try :
if not check(marking) :
return check.lineno, self.trace(state)
except :
pass
return None, None
def path (self, tgt, src=0) :
q = [(0, src, ())]
visited = set()
while True :
(c, v1, path) = heapq.heappop(q)
if v1 not in visited :
path = path + (v1,)
if v1 == tgt :
return path
visited.add(v1)
for v2 in self.g.successors(v1) :
if v2 not in visited :
heapq.heappush(q, (c+1, v2, path))
def trace (self, state) :
path = self.path(state)
return tuple(self.g.successors(i)[j]
for i, j in zip(path[:-1], path[1:]))
from snakes.compat import io
from snakes.lang.abcd.parser import ast, parse
import os, tempfile, re, codecs, collections
try :
from cgi import escape
except :
from html import escape
template_css = u"""/* ABCD source code */
.abcd { border:solid 1px #DDD; border-radius:5px; padding:5px 10px; margin:5px; background-color:#F4F4F4; overflow:auto; }
.abcd .comment { color:#888; }
.abcd .ident { color:#808; }
.abcd .string { color:#088; }
.abcd .kw { color:#800; font-weight:bold; }
.abcd .flow { color:#800; font-weight:bold; }
.abcd .buffer .decl { color:#080; font-weight:bold; }
.abcd .net .decl { color:#008; font-weight:bold; }
.abcd .instance .name { color:#008; }
.abcd .action .delim { font-weight:bold; }
.abcd .action .name { color:#080; }
.abcd .highlight { background-color:yellow; }
/* Petri net picture */
.petrinet { border:solid 1px #DDD; border-radius:5px; padding:5px 10px; margin:5px; background-color:#FFF; overflow:auto; clear:both; }
/* Objects tree */
.tree { border:solid 1px #DDD; border-radius:5px; padding:5px 10px; margin:5px; background-color:#F4F4F4; overflow:auto; font-family:monospace; }
.tree .kw { color:#800; font-weight:bold; }
.tree .buffer { color:#080; font-weight:bold; }
.tree .ident { color:#808; }
.tree .instance .name { color:#008; }
.tree .action .delim { font-weight:bold; }
.tree .action .name { color:#080; }
.tree .string { color:#088; }
.tree .highlight { background-color:yellow; }
"""
template_html = u'''<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><title>%(filename)s</title>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<style type="text/css">%(css)s</style>
%(headers)s
%(jscode)s
</head><body>
<h1><tt>%(filename)s</tt></h1>
%(abcd)s
%(tree)s
%(svg)s
</body></html>
'''
template_tree = '''<div class="tree">%(tree)s</div>'''
template_headers = '''<script src="http://code.jquery.com/jquery-1.11.0.min.js" type="text/javascript"></script>'''
template_jscode = '''<script type="text/javascript">
var nodeColor;
function abcdon () {
obj = jQuery(this);
if (obj.attr("class") == "node") {
node = obj.children().children().first();
nodeColor = node.attr("fill");
node.attr("fill", "yellow");
} else {
obj.addClass("highlight");
}
jQuery(obj.attr("data-abcd")).addClass("highlight");
};
function abcdoff () {
obj = jQuery(this);
if (obj.attr("class") == "node") {
node = obj.children().children().first();
node.attr("fill", nodeColor);
} else {
obj.removeClass("highlight");
}
jQuery(obj.attr("data-abcd")).removeClass("highlight");
};
function treeon () {
obj = jQuery(this);
if (obj.attr("class") != "node") {
obj.addClass("highlight");
}
jQuery(obj.attr("data-tree")).addClass("highlight");
};
function treeoff () {
obj = jQuery(this);
if (obj.attr("class") != "node") {
obj.removeClass("highlight");
}
jQuery(obj.attr("data-tree")).removeClass("highlight");
};
function neton () {
obj = jQuery(this);
jQuery(obj.attr("data-net")).each(function () {
node = jQuery(this).children().children().first();
nodeColor = node.attr("fill");
node.attr("fill", "yellow");
});
obj.addClass("highlight");
};
function netoff () {
obj = jQuery(this);
jQuery(obj.attr("data-net")).each(function () {
node = jQuery(this).children().children().first();
node.attr("fill", nodeColor);
});
obj.removeClass("highlight");
};
function setwidth () {
abcd = jQuery(".abcd");
tree = jQuery(".tree");
width = (jQuery(".petrinet").outerWidth(true) / 2) - 35;
console.log(width);
height = Math.max(abcd.height(), tree.height());
abcd.css({float: "left", width: width, height: height});
tree.css({float: "right", width: width, height: height});
}
jQuery(document).ready(function() {
jQuery("[data-abcd]").hover(abcdon, abcdoff);
jQuery("[data-tree]").hover(treeon, treeoff);
jQuery("[data-net]").hover(neton, netoff);
jQuery(".tree .instance, .tree .action").each(function () {
obj = jQuery(this);
obj.html(jQuery(obj.attr("data-abcd")).html());
});
setwidth();
jQuery(window).resize(setwidth);
});
</script>'''
class Span (object) :
def __init__ (self, cls=[], id=None, net=[], abcd=[], tree=[]) :
self.cls = set([cls]) if isinstance(cls, str) else set(cls)
self.id = id
self.net = set([net]) if isinstance(net, str) else set(net)
self.abcd = set([abcd]) if isinstance(abcd, str) else set(abcd)
self.tree = set([tree]) if isinstance(tree, str) else set(tree)
def copy (self, **attr) :
span = self.__class__(cls=self.cls, id=self.id, net=self.net,
abcd=self.abcd, tree=self.tree)
for k, v in attr.items() :
if isinstance(getattr(span, k), set) :
v = set(v)
setattr(span, k, v)
return span
def __bool__ (self) :
return self.__nonzero__()
def __nonzero__ (self) :
return bool(self.cls or self.id or self.net or self.abcd
or self.tree)
def span (self) :
attr = []
for src, dst in (("cls", "class"), ("id", "id"),
("net", "data-net"), ("abcd", "data-abcd"),
("tree", "data-tree")) :
val = getattr(self, src)
if not val :
continue
elif isinstance(val, str) :
attr.append("%s=%r" % (dst, val))
elif dst.startswith("data-") :
attr.append("%s=%r" % (dst, ", ".join("#" + v for v in val)))
else :
attr.append("%s=%r" % (dst, ", ".join(val)))
return "<span %s>" % " ".join(attr)
def __str__ (self) :
return self.span()
keywords = {"False", "True", "and", "as", "assert", "buffer", "const",
"else", "enum", "for", "from", "if", "import", "in", "is",
"lambda", "net", "not", "or", "symbol", "task", "typedef"}
class ABCD2HTML (ast.NodeVisitor) :
def __init__ (self, tree) :
self.tree = tree
self.x = "%%0%uX" % len("%x" % (tree.st.erow + 1))
self.st = {}
self.path = []
self.nets = {}
self.visit(tree)
def __getitem__ (self, id) :
cls = id.__class__.__name__
if cls in ("Place", "Transition") :
x, y, _, _ = id.label("srcloc")
c = "B" if cls == "Place" else "A"
id = self.newid(c, x, y)
elif isinstance(id, tuple) :
id = self.newid(*id)
if isinstance(self.st[id], Span) :
return self.st[id]
else :
return self.st[id].span
def newid (self, c, x, y) :
return "".join([c[0].upper(), self.x % x, "%X" % y])
def setspan (self, cls, node, **args) :
if isinstance(node, ast.AST) :
node = node.st
if cls in ("action", "buffer", "instance", "net") :
x, y = node.srow, node.scol
ident = self.newid(cls, x, y)
ident = args.pop("id", ident)
span = node.span = Span(cls=cls.lower(), id=ident, **args)
self.st[ident] = node
elif node is None :
span = Span(cls=cls.lower(), **args)
if "id" in args :
self.st[args["id"]] = span
else :
span = node.span = Span(cls=cls.lower(), **args)
if "id" in args :
self.st[args["id"]] = node
return span
def visit_AbcdBuffer (self, node) :
t = node.st
while t.symbol != "abcd_buffer" :
t = t[0]
self.setspan("decl", t[1])
self.setspan("buffer", node)
self.generic_visit(node)
def visit_AbcdNet (self, node) :
t = node.st
while t.symbol != "abcd_net" :
t = t[0]
self.setspan("decl", t[1])
self.setspan("body", node.body)
span = self.setspan("net", node)
self.setspan("proto", None, id="P" + span.id)
self.nets[".".join(self.path + [node.name])] = span.id
self.path.append(node.name)
self.generic_visit(node)
self.path.pop(-1)
def visit_AbcdInstance (self, node) :
t = node.st
while t.symbol != "abcd_instance" :
t = t[0]
self.setspan("name", t[0])
pid = "P" + self.nets[".".join(self.path + [node.net])]
span = self.setspan("instance", node, abcd=[pid])
self[pid].abcd.add(span.id)
self.generic_visit(node)
def visit_AbcdAction (self, node) :
t = node.st
while t[0].text != '[' :
t = t[0]
span = self.setspan("action", node)
self.setspan("delim", t[0], id="L" + span.id)
self.setspan("delim", t[-1], id="R" + span.id)
self.generic_visit(node)
def visit_AbcdFlowOp (self, node) :
t = node.st
while len(t) == 1 :
t = t[0]
while t[0].text == '(' :
t = t[1]
for op in t[1::2] :
self.setspan("flow", op)
self.generic_visit(node)
def visit_SimpleAccess (self, node) :
self.setspan("name", node.st[0])
self.setspan("access", node)
self.generic_visit(node)
def visit_FlushAccess (self, node) :
self.visit_SimpleAccess(node)
def visit_SwapAccess (self, node) :
self.visit_SimpleAccess(node)
def escape (self, text) :
return escape(text)
def build (self, st) :
# collect text skipped during parsing (blanks and comments)
output = io.StringIO()
if st.srow > self.row :
for line in st.text.lexer.lines[self.row-1:st.srow-1] :
output.write(line[self.col:])
self.col = 0
output.write(st.text.lexer.lines[st.srow-1][:st.scol])
elif st.scol > self.col :
output.write(st.text.lexer.lines[self.row-1][self.col:st.scol])
# insert skipped text with comments rendering
for line in output.getvalue().splitlines(True) :
if "#" in line :
left, right = line.split("#", 1)
self.output.write("%s<span class=%r>%s</span>"
% (self.escape(left),
"comment",
self.escape("#" + right)))
else :
self.output.write(self.escape(line))
# adjust current position in source code
self.row, self.col = st.srow, st.scol
# close span for net declaration
span = getattr(st, "span", Span())
if "body" in (c.lower() for c in span.cls) :
self.output.write("</span>")
# generate <span ...> if necessary
if span :
self.output.write(str(span))
# generate span for net declaration
if (span.id or "").startswith("N") :
self.output.write(str(self["P" + span.id]))
# render tree or its children
if len(st) :
for child in st :
# add span tags on special elements
if not hasattr(child, "span") :
if child.symbol == "NAME" :
if child.text in keywords :
self.setspan("kw", child)
else :
self.setspan("ident", child)
elif child.symbol == "STRING" :
self.setspan("string", child)
elif child.symbol == "COLON" :
self.setspan("kw", child)
self.build(child)
else :
if st.symbol not in ("DEDENT", "ENDMARKER") :
self.output.write(self.escape(st.text))
self.row, self.col = st.erow, st.ecol
# generate </span> if necessary
if span :
self.output.write("</span>")
def html (self) :
self.output = io.StringIO()
self.indent, self.row, self.col = False, 1, 0
self.build(self.tree.st)
return "<pre class='abcd'>%s</pre>" % self.output.getvalue()
def Tree () :
return collections.defaultdict(Tree)
class TreeInfo (object) :
def __init__ (self, span, name) :
self.span, self.name = span, name
def __hash__ (self) :
return hash(self.name)
def __eq__ (self, other) :
try :
return self.name == other.name
except :
return False
def __ne__ (self, other) :
return not self.__eq__(other)
def __iter__ (self) :
yield self.span
yield self.name
_svgclean = [(re.compile(r, re.I), s) for r, s in
[(r"<[?!][^>]*>\n*", ""),
(r"<title>[^<>]*</title>\n*", ""),
(r"<g [^<>]*></g>\n*", ""),
]]
class Net2HTML (object) :
def __init__ (self, net, gv, abcd) :
self.gv = gv
self.abcd = abcd
self.tree = Tree()
self.n2a = collections.defaultdict(set)
self.n2t = {}
snk = net.label("snakes")
self.count = collections.defaultdict(int)
for place in net.place() :
nid = gv.nodemap[place.name]
if place.status in (snk.entry, snk.internal, snk.exit) :
for char, trans in ([("R", net.transition(t))
for t in place.pre]
+ [("L", net.transition(t))
for t in place.post]) :
span = abcd[trans]
self.n2a[nid].add(char + span.id)
else :
self.addtree(0, "buffer", place)
for trans in net.transition() :
self.addtree(10, "action", trans)
def addtree (self, weight, kind, node) :
nid = self.gv.nodemap[node.name]
aid = self.abcd[node]
tid = aid.copy(id=("T%X" % self.count[aid.id]) + aid.id,
tree=[], abcd=[aid.id], net=[nid])
self.count[aid.id] += 1
aid.tree.add(tid.id)
aid.net.add(nid)
self.n2a[nid].add(aid.id)
self.n2t[nid] = tid.id
pos = self.tree
path = node.label("path")
try :
inst = node.label("instances")
except :
inst = [None] * len(path)
for name, (_, srow, scol, _, _) in zip(path, inst) :
a = self.abcd["I", srow, scol]
t = a.copy(id=("T%X" % self.count[a.id]) + a.id,
tree=[], abcd=[a.id], net=[])
self.count[a.id] += 1
a.tree.add(t.id)
pos = pos[((20, srow, scol), "instance", TreeInfo(t, name))]
prefix = sum(len(p) for p in path) + len(path)
srow, scol, _, _ = node.label("srcloc")
pos[((weight, srow, scol), kind, (node, node.name[prefix:]))] = tid
def _tree (self, tree, indent="") :
yield indent + "<ul>"
for (_, kind, data), child in sorted(tree.items()) :
if kind == "instance" :
yield indent + "<li>%s%s</span>" % tuple(data)
for item in self._tree(child, indent + " ") :
yield item
yield indent + "</li>"
else :
node, name = data
if kind == "buffer" :
content = ("<span class='kw'>buffer</span> "
"<span class='name'>%s</span> = "
"<span class='content'>%s</span>"
% (name, node.tokens))
yield indent + "<li>%s%s</span></li>" % (child, content)
elif kind == "action" :
content = name
yield (indent + "<li>%s%s</span><ul class='modes'>"
+ "</ul></li>") % (child, content)
else :
raise ValueError("unexpected data %r" % kind)
yield indent + "</ul>"
def html (self) :
return template_tree % {"tree" : "\n".join(self._tree(self.tree))}
def svg (self) :
# load SVG file
with tempfile.NamedTemporaryFile(suffix=".svg") as tmp :
self.gv.render(tmp.name)
with codecs.open(tmp.name, "r", "utf-8") as infile :
svg = infile.read()
for r, s in _svgclean :
svg = r.sub(s, svg)
for node, abcd in self.n2a.items() :
abcd = ", ".join("#" + t for t in abcd)
if node in self.n2t :
svg = svg.replace(' id="%s" ' % node,
' id="%s" data-abcd="%s" data-tree="#%s" '
% (node, abcd, self.n2t[node]))
else :
svg = svg.replace(' id="%s" ' % node,
' id="%s" data-abcd="%s" ' % (node, abcd))
return u"<div class='petrinet'>%s</div>" % svg
def build (abcd, node, net, gv, outfile, tpl=template_html, **args) :
abcd = ABCD2HTML(node)
pnet = Net2HTML(net, gv, abcd)
d = {"filename" : node.st.filename,
"css" : template_css,
"jscode" : template_jscode,
"headers" : template_headers,
"abcd" : abcd.html(),
"tree" : pnet.html(),
"svg" : pnet.svg()}
d.update(args)
if tpl is not None and outfile :
with codecs.open(outfile, "w", "utf-8") as out :
out.write(tpl % d)
return d
import sys, optparse, os.path, webbrowser
import pdb, traceback
import snakes.plugins
from snakes.utils.abcd.build import Builder
from snakes.lang.abcd.parser import parse
from snakes.lang.pgen import ParseError
from snakes.utils.abcd import CompilationError, DeclarationError
from snakes.utils.abcd.simul import Simulator, ABCDSimulator
from snakes.utils.abcd.checker import Checker
from snakes.utils.abcd.html import build as html
from snakes.utils.andy import CompilationError, DeclarationError
from snakes.utils.andy.simul import Simulator, AndySimulator
from snakes.utils.andy.andy import andy2snakes
from snakes.utils.simul.html import json
##
......@@ -26,11 +22,11 @@ ERR_OUTPUT = 7
ERR_BUG = 255
def log (message) :
sys.stdout.write("abcd: %s\n" % message.strip())
sys.stdout.write("andy: %s\n" % message.strip())
sys.stdout.flush()
def err (message) :
sys.stderr.write("abcd: %s\n" % message.strip())
sys.stderr.write("andy: %s\n" % message.strip())
sys.stderr.flush()
def die (code, message=None) :
......@@ -66,7 +62,7 @@ def bug () :
gv_engines = ("dot", "neato", "twopi", "circo", "fdp")
opt = optparse.OptionParser(prog="abcd",
opt = optparse.OptionParser(prog="andy",
usage="%prog [OPTION]... FILE")
opt.add_option("-l", "--load",
dest="plugins", action="append", default=[],
......@@ -81,9 +77,6 @@ for engine in gv_engines :
dest="gv" + engine, action="store", default=None,
help="draw net using '%s' (from GraphViz)" % engine,
metavar="OUTFILE")
opt.add_option("-a", "--all-names",
dest="allnames", action="store_true", default=False,
help="draw control-flow places names (default: hide)")
opt.add_option("--debug",
dest="debug", action="store_true", default=False,
help="launch debugger on compiler error (default: no)")
......@@ -98,22 +91,13 @@ opt.add_option("--port",
dest="port", action="store", default=8000, type=int,
help="port on which the simulator server runs",
metavar="PORT")
opt.add_option("-H", "--html",
dest="html", action="store", default=None,
help="save net as HTML",
metavar="OUTFILE")
opt.add_option("--check",
dest="check", action="store_true", default=False,
help="check assertions")
def getopts (args) :
global options, abcd, tmp
global options, andy, tmp
(options, args) = opt.parse_args(args)
plugins = []
for p in options.plugins :
plugins.extend(t.strip() for t in p.split(","))
if "ops" not in options.plugins :
plugins.append("ops")
if "labels" not in plugins :
plugins.append("labels")
for engine in gv_engines :
......@@ -121,7 +105,7 @@ def getopts (args) :
if gvopt and "gv" not in plugins :
plugins.append("gv")
break
if (options.html or options.simul) and "gv" not in plugins :
if (options.simul) and "gv" not in plugins :
plugins.append("gv")
options.plugins = plugins
if len(args) < 1 :
......@@ -132,17 +116,13 @@ def getopts (args) :
err("more than one input file provided")
opt.print_help()
die(ERR_ARG)
abcd = args[0]
if options.pnml == abcd :
andy = args[0]
if options.pnml == andy :
err("input file also used as output (--pnml)")
opt.print_help()
die(ERR_ARG)
if options.html == abcd :
err("input file also used as output (--html)")
opt.print_help()
die(ERR_ARG)
for engine in gv_engines :
if getattr(options, "gv%s" % engine) == abcd :
if getattr(options, "gv%s" % engine) == andy :
err("input file also used as output (--%s)" % engine)
opt.print_help()
die(ERR_ARG)
......@@ -151,58 +131,9 @@ def getopts (args) :
## drawing nets
##
def place_attr (place, attr) :
# fix color
if place.status == snk.entry :
attr["fillcolor"] = "green"
elif place.status == snk.internal :
pass
elif place.status == snk.exit :
attr["fillcolor"] = "orange"
else :
attr["fillcolor"] = "lightblue"
# fix shape
if (not options.allnames
and place.status in (snk.entry, snk.internal, snk.exit)) :
attr["shape"] = "circle"
# render marking
if place._check == snk.tBlackToken :
count = len(place.tokens)
if count == 0 :
marking = " "
elif count == 1 :
marking = "&#8226;"
else :
marking = "%s&#8226;" % count
else :
marking = str(place.tokens)
# node label
if (options.allnames
or place.status not in (snk.entry, snk.internal, snk.exit)) :
attr["label"] = "%s\\n%s" % (place.name, marking)
else :
attr["label"] = "%s" % marking
def trans_attr (trans, attr) :
pass
def arc_attr (label, attr) :
if label == snk.Value(snk.dot) :
del attr["label"]
elif isinstance(label, snk.Test) :
attr["arrowhead"] = "none"
attr["label"] = " %s " % label._annotation
elif isinstance(label, snk.Flush) :
attr["arrowhead"] = "box"
attr["label"] = " %s " % label._annotation
def draw (net, target, engine="dot") :
try :
return net.draw(target, engine=engine,
place_attr=place_attr,
trans_attr=trans_attr,
arc_attr=arc_attr)
return net.draw(target, engine=engine)
except :
die(ERR_OUTPUT, str(sys.exc_info()[1]))
......@@ -222,49 +153,36 @@ def save_pnml (net, target) :
## simulator (not standalone)
##
def simulate (source, filename="<string>") :
def simulate (entities, potential, obligatory, filename='<string>') :
global options, snk
getopts(["--simul", filename])
node = parse(source, filename=filename)
snk = snakes.plugins.load(options.plugins, "snakes.nets", "snk")
build = Builder(snk)
net = build.build(node)
net = andy2snakes(snk, entities, potential, obligatory)
net.label(srcfile=filename, snakes=snk)
return ABCDSimulator(node, net, draw(net, None))
return AndySimulator(net)
##
## main
##
def main (args=sys.argv[1:], src=None) :
def main (args=sys.argv[1:]) :
global options, snk
# get options
try:
if src is None :
getopts(args)
else :
getopts(list(args) + ["<string>"])
getopts(args)
except SystemExit :
raise
except :
die(ERR_OPT, str(sys.exc_info()[1]))
# read source
if src is not None :
source = src
else :
try :
source = open(abcd).read()
except :
die(ERR_IO, "could not read input file %r" % abcd)
# parse
try :
node = parse(source, filename=abcd)
except ParseError :
die(ERR_PARSE, str(sys.exc_info()[1]))
except :
bug()
# compile
dirname = os.path.dirname(abcd)
# read andy spec
try:
env = {}
execfile(andy, env)
del env['__builtins__']
except:
die(ERR_IO, "could not read input file %r" % andy)
# compile to net
dirname = os.path.dirname(andy)
if dirname and dirname not in sys.path :
sys.path.append(dirname)
elif "." not in sys.path :
......@@ -273,14 +191,13 @@ def main (args=sys.argv[1:], src=None) :
snk = snakes.plugins.load(options.plugins, "snakes.nets", "snk")
except :
die(ERR_PLUGIN, str(sys.exc_info()[1]))
build = Builder(snk)
try :
net = build.build(node)
net.label(srcfile=abcd, snakes=snk)
except (CompilationError, DeclarationError) :
die(ERR_COMPILE, str(sys.exc_info()[1]))
except :
try:
net = andy2snakes(snk, env['entities'], env['potential'],
env['obligatory'])
except:
bug()
# output
if options.pnml :
save_pnml(net, options.pnml)
......@@ -288,17 +205,10 @@ def main (args=sys.argv[1:], src=None) :
target = getattr(options, "gv%s" % engine)
if target :
draw(net, target, engine)
if options.html :
try :
html(abcd, node, net, draw(net, None), options.html)
except :
bug()
trace, lineno = [], None
if options.check :
lineno, trace = Checker(net).run()
if options.simul :
try :
simul = Simulator(node, net, draw(net, None), options.port)
simul = Simulator(net, options.port)
except :
bug()
simul.start()
......
<!DOCTYPE html>
<html>
<head>
<link type="text/css" href="r/css/bootstrap-theme.css" rel="stylesheet"/>
<link type="text/css" href="r/css/bootstrap.css" rel="stylesheet"/>
<link type="text/css" href="r/css/docs.css" rel="stylesheet"/>
<link type="text/css" href="r/simulator.css" rel="stylesheet"/>
<link type="text/css" href="r/model.css" rel="stylesheet"/>
<script src="r/jquery.min.js"></script>
<script src="r/js/bootstrap.min.js"></script>
<script src="r/jquery.periodic.js"></script>
<script src="r/js/bootstrap.file-input.js"></script>
<script src="r/d3.min.js"></script>
<script src="r/js/petri.js"></script>
<script src="r/simulator.js"></script>
</head>
<head>
<link type="text/css" href="r/css/bootstrap-theme.css" rel="stylesheet"/>
<link type="text/css" href="r/css/bootstrap.css" rel="stylesheet"/>
<link type="text/css" href="r/css/docs.css" rel="stylesheet"/>
<link type="text/css" href="r/simulator.css" rel="stylesheet"/>
<link type="text/css" href="r/model.css" rel="stylesheet"/>
<script src="r/jquery.min.js"></script>
<script src="r/js/bootstrap.min.js"></script>
<script src="r/jquery.periodic.js"></script>
<script src="r/js/bootstrap.file-input.js"></script>
<script src="r/d3.min.js"></script>
<script src="r/js/petri.js"></script>
<script src="r/simulator.js"></script>
<style>
path {
stroke: steelblue;
stroke-width: 2;
fill: none;
}
line {
stroke: black;
}
text {
font-family: Arial;
font-size: 9pt;
}
path { stroke: steelblue;
stroke-width: 2;
fill: none; }
line { stroke: black; }
text { font-family: Arial;
font-size: 9pt; }
</style>
<body>
</head>
<body>
<header class="navbar navbar-static-top">
<div class="container">
<nav class="navbar navbar-default" role="navigation">
<div class="container-fluid">
<div class="navbar-header"><button type="button" class="navbar-toggle" data-toggle="collapse" data-target="#bs-example-navbar-collapse-6"> <span class="sr-only">Toggle navigation</span> <span class="icon-bar"></span> <span class="icon-bar"></span> <span class="icon-bar"></span> </button> <a class="navbar-brand" href="#">Simulation SNAKES</a></div>
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-6">
<ul id="nav_sim" class="nav navbar-nav">
<li><a href="#" id="ui-reset">Reset Simulation</a></li>
<li><a href="#" id="ui-quit">Stop Simulation</a></li>
<li><a href="#" id="ui-help">Help</a></li>
</ul>
<div class="navbar-header">
<button type="button" class="navbar-toggle"
data-toggle="collapse"
data-target="#bs-example-navbar-collapse-6">
<span class="sr-only">Toggle navigation</span>
</button>
<a class="navbar-brand" href="#">Simulation SNAKES</a>
</div>
<div class="collapse navbar-collapse"
id="bs-example-navbar-collapse-6">
<ul class="nav navbar-nav" id="nav_sim">
<li><a id="ui-reset">Reset Simulation</a></li>
<li><a id="ui-quit">Stop Simulation</a></li>
<li><a id="ui-help">Help</a></li>
</ul>
</div><!-- /.navbar-collapse -->
</div>
</nav>
</div>
</header>
<div class="container">
%(model)s
<!-- model to be simulated -->
%(model)s
<div class="modal fade" id="about" tabindex="-1" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header"><button type="button" class="close" data-dismiss="modal" aria-hidden="true">&times;</button>
<h4 class="modal-title" id="myModalLabel">Petri Net</h4>
</div>
<div class="modal-body">
<p><span class="title">ABCD simulator is part of the SNAKES toolkit</span><br /> (C) 2014 Franck Pommereau</p>
<p><a href="http://www.ibisc.univ-evry.fr/~fpommereau/SNAKES" target="_blank">SNAKES homepage</a></p>
</div>
<div class="modal-footer"><button type="button" class="btn btn-default" data-dismiss="modal">Close</button></div>
<!-- information about the simulator -->
<div aria-hidden="true" aria-labelledby="myModalLabel" class="modal fade"
id="about" role="dialog" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<button aria-hidden="true" class="close"
data-dismiss="modal" type="button">x</button>
<h4 class="modal-title" id="myModalLabel">Petri Net</h4>
</div>
<div class="modal-body">
<p>
<span class="title">Andy simulator is part of the
SNAKES toolkit</span><br />
(C) 2014 Franck Pommereau
</p>
<p>
<a href="http://www.ibisc.univ-evry.fr/~fpommereau/SNAKES"
target="_blank">SNAKES homepage</a>
</p>
</div>
<div class="modal-footer">
<button class="btn btn-default" data-dismiss="modal"
type="button">Close</button>
</div>
</div>
</div>
<script>
$(document).ready(function(){
$.simisc({
"nav":{
"about": {
"text": "About",
"attr": {
"id":"ui-about",
"data-toggle": "modal",
"data-target": "#about"
}
},
"net": {
"text": "Petri Net",
"attr": {
"id":"ui-net",
"data-toggle": "modal",
"data-target": "#net"
}
}
},
"graph": {
"activate": false
}
});
});
</script>
</div>
<script>
$(document).ready(function(){
$.simisc({ "nav":{ "about": { "text": "About",
"attr": { "id":"ui-about",
"data-toggle": "modal",
"data-target": "#about"
} } },
"graph": { "color": { "y":"red",
"x":"blue" } } });
});
</script>
</body>
</html>
......
<div class="container-fluid">
<div class="row">
<div class="page-header">
<h1><tt>%(filename)s</tt> <small> powered by Franck</small></h1>
<h1><tt>Andy Simulator</tt> <small> powered by Franck</small></h1>
</div>
</div>
<div class="row">
<div class="col-md-3">
<div class="row">
<div class="col-md-4">
<div class="row">
<h3>Player</h3>
<div id="player"></div>
<div id="player">&nbsp;</div>
</div>
</div>
</div>
<div id="model" class="row">
<div class="col-md-6">
<div class="row">
<h3>ABCD</h3>
<div class="abcd">%(abcd)s</div>
<h3>Modes</h3>
<div id="modes">&nbsp;</div>
</div>
</div>
<div class=col-md-6>
<div class="row">
<h3>Tree</h3>
<div class="tree">%(tree)s</div>
<h3>Command Panel</h3>
<div id="Command_center">&nbsp;</div>
</div>
</div>
<div class="col-md-12">
<div id="trace_zone"></div>
<div class="col-md-8">
<div class="row">
<h3>Net Graph</h3>
<div id="graph">&nbsp;</div>
</div>
</div>
</div>
<div class="modal fade" id="net" tabindex="-1" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true">
<div class="modal-dialog" style="width:auto;">
<div class="modal-content" style="overflow: scroll;">
%(net)s
</div>
<div class="row">
<div class="col-md-12">
<div id="trace_zone">&nbsp;</div>
</div>
</div>
\ No newline at end of file
</div>
</div>
......
from snakes.utils.simul import BaseSimulator, BaseHTTPSimulator
import snakes.utils.abcd.html as html
class ABCDSimulator (BaseSimulator) :
def __init__ (self, node, net, gv) :
BaseSimulator.__init__(self, net)
a2html = html.ABCD2HTML(node)
n2html = html.Net2HTML(net, gv, a2html)
self.info = {"filename" : node.st.filename,
"abcd" : a2html.html(),
"tree" : n2html.html(),
"net" : n2html.svg()}
self.tree = {}
for node in net.node() :
nid = gv.nodemap[node.name]
if nid in n2html.n2t :
self.tree[node.name] = "#" + n2html.n2t[nid]
self.places = [place.name for place in net.place()
if place.name in self.tree]
self.abcd = {}
self.transid = []
for trans in net.transition() :
nid = gv.nodemap[trans.name]
self.transid.append(self.tree[trans.name])
if nid in n2html.n2a :
self.abcd[trans.name] = ", ".join("#" + i for i in
n2html.n2a[nid])
def init (self, state=-1) :
res = BaseSimulator.init(self, state)
res.update(self.info)
res["help"] = self.init_help()
return res
def init_help (self) :
help = BaseSimulator.init_help(self)
help.update({
"#model .abcd" : {
"title" : "Source",
"content" : "ABCD source code"
},
"#model .tree" : {
"title" : "State",
"content" : "hierarchy of ABCD objects",
},
"#model .petrinet" : {
"title" : "Net",
"content" : "Petri nets semantics"
}})
return help
class AndySimulator (BaseSimulator) :
def getstate (self, state) :
ret = BaseSimulator.getstate(self, state)
marking = self.states[state]
modes = dict((t, []) for t in self.transid)
for i, (trans, mode) in enumerate(marking.modes) :
modes[self.tree[trans.name]].append({"state" : state,
"mode" : i,
"html" : str(mode)})
return {"id" : state,
"states" :
[{"do" : "dropclass",
"select" : "#model .active",
"class" : "active"}]
+ [{"do" : "addclass",
"select" : self.abcd[trans.name],
"class" : "active"} for trans, mode in marking.modes]
+ [{"do" : "addclass",
"select" : self.tree[trans.name],
"class" : "active"} for trans, mode in marking.modes]
+ [{"do" : "settext",
"select" : "%s .content" % self.tree[place],
"text" : "{}"} for place in self.places
if place not in marking]
+ [{"do" : "settext",
"select" : "%s .content" % self.tree[place],
"text" : str(marking[place])} for place in marking
if place in self.tree],
"modes" : [{"select" : "%s + .modes" % trans,
"items" : items}
for trans, items in modes.items()],
}
ret["variables"] = dict((place, tokens.items()[0])
for place, tokens in marking.items())
ret["groups"] = ["timed", "even", "odd"]
modes = []
for i, (trans, binding) in enumerate(marking.modes) :
if (state + i) % 5 == 0 :
groups = ["timed"]
else :
groups = []
modes.append(
{"state" : state,
"mode" : i,
"html" : "%s (%s)" % (trans.name[7:], binding),
"groups" : groups + ["odd" if (state % 2) else "even"]
})
ret["modes"] = [{"select": "#modes", "items": modes}]
return ret
#class Simulator (BaseHTTPSimulator) :
# def __init__ (self, **system) :
# simul = AndySimulator(**system)
# BaseHTTPSimulator.__init__(self, simulator=simul)
class Simulator (BaseHTTPSimulator) :
def __init__ (self, node, net, gv, port) :
simul = ABCDSimulator(node, net, gv)
def __init__ (self, net, port) :
simul = AndySimulator(net)
BaseHTTPSimulator.__init__(self, net, simulator=simul, port=port)
def init_model (self) :
return self.res["model.html"] % self.simul.info
def init_ui (self) :
return BaseHTTPSimulator.init_ui(self)[:-1] + [{
"label" : "Show net",
"id" : "ui-shownet",
"href" : "#",
"script" : "dialog($('#model .petrinet').html())"
}]
# def init_model (self) :
# return self.res["model.html"] % self.simul.info
# def init_ui (self) :
# return BaseHTTPSimulator.init_ui(self)[:-1] + [{
# "label" : "Show net",
# "id" : "ui-shownet",
# "href" : "#",
# "script" : "dialog($('#model .petrinet').html())"
# }]
......
from snakes.lang.abcd.parser import ast
class NodeCopier (ast.NodeTransformer) :
def copy (self, node, **replace) :
args = {}
for name in node._fields + node._attributes :
old = getattr(node, name, None)
if name in replace :
new = replace[name]
elif isinstance(old, list):
new = []
for val in old :
if isinstance(val, ast.AST) :
new.append(self.visit(val))
else :
new.append(val)
elif isinstance(old, ast.AST):
new = self.visit(old)
else :
new = old
args[name] = new
if hasattr(node, "st") :
args["st"] = node.st
return node.__class__(**args)
def generic_visit (self, node) :
return self.copy(node)
class ArgsBinder (NodeCopier) :
def __init__ (self, args, buffers, nets, tasks) :
NodeCopier.__init__(self)
self.args = args
self.buffers = buffers
self.nets = nets
self.tasks = tasks
def visit_Name (self, node) :
if node.id in self.args :
return self.copy(self.args[node.id])
else :
return self.copy(node)
def visit_Instance (self, node) :
if node.net in self.nets :
return self.copy(node, net=self.nets[node.net])
else :
return self.copy(node)
def _visit_access (self, node) :
if node.buffer in self.buffers :
return self.copy(node, buffer=self.buffers[node.buffer])
else :
return self.copy(node)
def visit_SimpleAccess (self, node) :
return self._visit_access(node)
def visit_FlushAccess (self, node) :
return self._visit_access(node)
def visit_SwapAccess (self, node) :
return self._visit_access(node)
def _visit_task (self, node) :
if node.net in self.tasks :
return self.copy(node, net=self.tasks[node.net])
else :
return self.copy(node)
def visit_Spawn (self, node) :
return self._visit_task(node)
def visit_Wait (self, node) :
return self._visit_task(node)
def visit_Suspend (self, node) :
return self._visit_task(node)
def visit_Resume (self, node) :
return self._visit_task(node)
def visit_AbcdNet (self, node) :
args = self.args.copy()
buffers = self.buffers.copy()
nets = self.nets.copy()
tasks = self.tasks.copy()
netargs = ([a.arg for a in node.args.args + node.args.kwonlyargs]
+ [node.args.vararg, node.args.kwarg])
copy = True
for a in netargs :
for d in (args, buffers, nets, tasks) :
if a in d :
del d[a]
copy = False
if copy :
return self.copy(node)
else :
return self.__class__(args, buffers, nets, tasks).visit(node)
if __name__ == "__main__" :
import doctest
doctest.testmod()