Franck Pommereau

API doc extraction

Showing 102 changed files with 4479 additions and 0 deletions
syntax: glob
doc/api/*
dist/*
dput.sh
*.pyc
*~
.gone
,*
*.class
abcd crash on [foo>>(s), foo<<(s+s)]
[foo(x,y,z)] creates a multi-arc instead of a tuple
abcd substitues for loops variables:
net Foo (x) :
buffer b : int = [x for x in range(4)]
^ ^
substituted (should not)
errors during expressions evaluation are directly sent to user, should be wrapper to indicate this is a model error. And should be catched in abcd --simul/check
recursive-include doc *
include *
all:
@echo "Commands:"
@echo " release prepare source for release"
@echo " tgz build a source tarball"
@echo " doc build API documentation"
@echo " dput build and upload Ubuntu packages"
@echo " clean delete some garbage files"
@echo " test run tests through supported Python implementations"
@echo " next-deb increments debian/VERSION"
@echo " next-ppa increments debian/PPA"
@echo " lang build generated files in snakes/lang"
@echo " emacs compile Emacs files"
committed:
hg summary|grep -q '^commit: (clean)$$'
next-deb:
echo 1 > debian/PPA
echo $$((1+$$(cat debian/VERSION))) > debian/VERSION
emacs:
emacs -batch -f batch-byte-compile utils/abcd-mode.el
next-ppa:
echo $$((1+$$(cat debian/PPA))) > debian/PPA
release: committed test doc tgz
hg tag version-$$(cat VERSION)
echo 1 > debian/PPA
echo 1 > debian/VERSION
hg commit -m "version $$(cat VERSION)"
hg push
lang:
python mklang.py
tgz: committed
hg archive snakes-$$(cat VERSION)-$$(cat debian/VERSION)
cd snakes-$$(cat VERSION)-$$(cat debian/VERSION) && make doc
tar cf snakes-$$(cat VERSION)-$$(cat debian/VERSION).tar snakes-$$(cat VERSION)-$$(cat debian/VERSION)
rm -rf snakes-$$(cat VERSION)-$$(cat debian/VERSION)
gzip -9 snakes-$$(cat VERSION)-$$(cat debian/VERSION).tar
gpg --armor --sign --detach-sig snakes-$$(cat VERSION)-$$(cat debian/VERSION).tar.gz
doc: snakes/*.py snakes/plugins/*.py snakes/utils/*.py snakes/lang/*.py
make -C doc
dput.sh: VERSION debian/*
python mkdeb.py
dput: committed dput.sh
sh dput.sh
clean:
rm -f $$(find . -name ",*")
rm -f $$(find . -name "*.pyc")
rm -f $$(find . -name "*~")
rm -f $$(find . -name "*.class")
rm -rf $$(find . -type d -name __pycache__)
test:
python2.5 test.py
python2.6 test.py
python2.7 test.py
python3 test.py
unladen test.py
pypy test.py
spypy test.py
stackless test.py
jython test.py
+ emacs mode for ABCD
! fixex nodes merge in plugin labels
! fixed nets.MultiArc.flow (thanks to Jan Ciger's report)
version 0.9.13 (Fri Jul 16 17:01:22 CEST 2010):
+ added inhibitor arcs
+ added Ubuntu Lucid (10.04) package
! fixed data.WordSet.fresh when base is used
! fixed reduce(xor, []) is some __hash__ methods
+ added PetriNet.layout method in snakes.plugins.gv
version 0.9.12 (Thu Apr 1 19:42:33 CEST 2010):
+ removed PyGraphviz dependency (layout method supressed temporarily)
+ now compatible with PyPy (1.2), Unladen-Swallow and Jython (2.5.1)
! fixed snakes.plugins.clusters.rename_node
! fixed snakes.nets.Flush.flow
! fixed an uncaught exception in snakes.data.MultiSet.__eq__
and snakes.data.Symbol.__eq__
! hopefully fixed LALR built in ABCD compiler
! fixed hash-related issues
* moved PLY stuff to snakes.utils.abcd
* snakes.compyler has been completely replaced (PLY dependency removed)
version 0.9.11 (Thu Mar 25 19:32:31 CET 2010):
! fixed various doctests
! fixed issues with attributes locking
! fixed issues related to missing __hash__ methods
! fixed renaming a node to itself in ABCD
+ added snakes.nets.Evaluator.__contains__
+ added base argument to snakes.data.WordSet
+ added option --symbols to ABCD compiler
+ added snakes.data.Symbol
+ added net instances naming in ABCD
+ added logo
+ added let function to update bindings from expressions
+ added snakes.data.Subtitution.__setitem__
version 0.9.10 (Fri Jun 19 13:32:45 CEST 2009):
! fixed inconstent hashing on clusters
! fixed mutability of hashed multisets
! fixed snakes.nets.Tuple.mode
version 0.9.9 (Tue, 19 May 2009 15:00:00 +0100):
+ added mkdeb.py to build deb packages for multiple distributions
+ ported to Python 2.6
version 0.9.8 (lun, 23 mar 2009 17:30:34 (CET)):
+ added graph_attr option to snakes.plugins.gv.StateGraph
* plugin gv now draws partially constructed marking graphs
! fixed expression compilation in present of net parameters in ABCD
compiler
! fixed flush arcs binding
* plugin lashdata has been removed because its too experimental
* merged plugin decorators into a single one
version 0.9.7 (Tue, 20 Jan 2009 13:04:15 +0100):
! fixed sharing of globals between net components
! fixed loading of PNML when some plugins fail to load
+ added cpp option to abcd
+ added some docstrings and doctests
! fixed sharing of globals between net components
version 0.9.6 (Fri, 28 Nov 2008 15:22:27 +0100):
+ added doc for ABCD
! fixed False and True handling in ABCD compiler
+ added cross product types to ABCD
version 0.9.5 (Wed, 19 Nov 2008 21:42:05 +0100):
+ added distutils setup.py
* strip PNML data before decoding (work around invalid XML)
! fixed Multiset.__pnmlload__ on iterable values
version 0.9.4 (mar, 28 oct 2008 12:28:02 (UTC+0100)):
! fixed nets.Value.__eq__, __ne__ and __hash__
! fixed nets.Token.__repr__
+ added Flush arcs
! fixed nets.Tuple.bind
! fixed PNML dump/load of subclasses
! fixed nets.PetriNet.merge_* in the presence of Tuple arcs
! fixed plugins.clusters.Cluster.path
! fixed plugins.status.PetriNet.__pnmlload__
! fixed ABCD lexer
+ improved ABCD compiler with parametric nets, Python declarations
! fixed black token values in ABCD arcs and flush
+ ABCD compiler launches pdb.post_mortem when an error occurs in
+ added TCP mode to query plugin debug mode
* removed dependency to epydoc in snakes.typing
+ updated PNML doc
+ added a producer/consumer ABCD example
version 0.9.3 (lun, 29 sep 2008 08:04:55 (CEST)):
! fixed a bug in place pnml loading
! fixed wrong association of pnml tags to classes
+ improved query and associated programs, added documentation
* improved loading of PNML files with plugins
* PNML tag <snakes> can only occur once as a child of <pnml>
+ abcd compiler adds many structural information to PNML (including
+ test.py checks is snakes.version is correct AST if asked)
+ query plugins now has two verbosity levels
+ added snakes.compyler.Tree.__pnmldump__()
version 0.9.2
+ added query plugin and demon client/server programs
! various small bugs fixed
+ improved PNML serialisation of standard objects
+ snakes.plugins.load puts new module in caller's environment
! fixed snakes.pnml.Tree.update
* Substitution.image() returns a set
+ added apix.py
! PNML export empty arcs fixed
! fixed broken abcd.py
* updated abcd.py so it uses gv
* improved clustering efficiency
version 0.9.1
+ added plugin gv to replace graphviz
+ added plugin clusters to manage clusters of nodes
+ updated plugins ops so it builds clusters
* the plugin posops is deprecated since gv does the work much better
version 0.9
* dropped compatibilty with Python 2.4
+ finished pnml
* the compyler module has been completely replaced
Changes in 0.8.4 (06.09.2007 11:30:21):
+ updated the tutorial
Changes in 0.8.3 (29.06.2007 11:57:39):
+ snakes.plugins.graphivz: added options to control the layout
+ snakes.plugins.graphviz: improved the rendering
+ updated the tutorial
Changes in 0.8.2 (22.06.2007 13:09:02):
+ snakes.plugins.ops: added a name hiding operator (PetriNet.__div__)
! fixed several copy/clone problems
Changes in 0.8.1 (20.06.2007 10:18:17):
* updated tutorial
+ snakes.plugins.pos: the position of a node can be redefined when
! snakes.plugins.pos: accept float positions when loading from pnml
Changes in 0.8 (19.06.2007 17:19:19): First public release.
SNAKES is the Net Algebra Kit for Editors and Simulators
========================================================
////////////////////////////////////////////////////////////////
This file is formatted in order to be processed by AsciiDoc
(http://www.methods.co.nz/asciidoc). It will be more comfortable
to render it or to read the HTML version available at:
http://www.univ-paris12.fr/pommereau/soft/snakes/index.html
////////////////////////////////////////////////////////////////
SNAKES is a Python library that provides all then necessary to define
and execute many sorts of Petri nets, in particular those of the PBC
and M-nets family. Its main aim is to be a general Petri net library,
being able to cope with most Petri nets models, and providing the
researcher with a tool to quickly prototype its new ideas. SNAKES
should be suitable to provide the data model for editors or
simulators; actually, any editor that use SNAKES may also be a
simulator as SNAKES can execute any net.
A key feature of SNAKES is the ability to use arbitrary Python objects
as tokens and arbitrary Python expressions in many points, for
instance in transitions guards or arcs outgoing of transitions. This
is what makes SNAKES that general. This relies on the capability of
Python to run dynamically provided Python code (the $eval$ function).
This feature may not be efficient enough for model-checking: speed is
the price to pay for the wide generality. However, in the case of a
new model, SNAKES may happen to be the only available tool.
Another important feature of SNAKES is the plugin system that allows
to extend the features and work with specialised classes of Petri
nets. Currently, the following plugins are provided:
pos:: adds to nodes the capability of holding their position. Nodes
can be moved or shifted, Petri nets can be shifted globally and their
bounding box can be computed.
gv:: adds a method to draw a Petri net or a state graph using the tool
http://www.graphviz.org[GraphViz] (through the Python binding
http://networkx.lanl.gov/wiki/pygraphviz[PyGraphViz]). This module
replaces the previous plugin called _graphviz_ and provides more
flexibility and security, _graphviz_ is still provided but deprecated.
status:: extends the Petri net model by adding status to the nodes.
This is similar to what is used in the models of the PBC or Mnets
family. Nodes can then merged automatically according to their status.
ops:: this plugins defines control flow operations on Petri nets
usually found in the PBC and Mnets family. Nets can be composed in
parallel, sequence, choice and iteration. These operations rely on the
places status.
labels:: allows to add arbitrary labels to most objects (places,
transitions, nets, ...)
posops:: combines the features of pos and ops plugins: the control
flow operations are modified in order to rearrange the nodes position
in order to provide well shaped nets. This plugin is deprecated
because the new _gv_ does the work much better.
synchro:: it defines the label-based transition synchronisation
defined in the Mnets model.
// export:: allows to save Petri nets objects in the format of the tools
// http://pep.sourceforge.net[PEP], http://helena.cnam.fr[Helena] and
// http://maria[Maria]. http://pnml[PNML] is also supported as it is
// built-in SNAKES.
lashdata:: allows to define data that is not handled in the places of
the Petri net but stored instead in the special structures handled by
the http://www.montefiore.ulg.ac.be/~boigelot/research/lash/[library
Lash]. This allows in particular to aggregate possibly infinite states
into one meta-state.
clusters:: this is an auxiliary plugin that allows to group nodes in a
Petri net. This feature is used by _ops_ in order to record how a net
is constructed, which is exploited by _gv_ in order to build a nice
layout of composed nets.
Getting SNAKES and installing it
--------------------------------
Download http://www.univ-paris12.fr/lacl/pommereau/soft/snakes/snakes-{VERSION}.tar.gz[$snakes-{VERSION}.tar.gz$]
({sys:../stat snakes-{VERSION}.tar.gz})
To install SNAKES, uncompress the archive and copy the directory
snakes in a location where Python can find it (_i.e._, in a directory
listed in your $PYTHONPATH$ environment variable).
SNAKES should work with a Python version at least 2.5 but will _not_
work for an older version. Optionally, you may want to install
additional software required by some plugins:
gv:: depends on http://www.graphviz.org[GraphViz] and its Python
binding http://networkx.lanl.gov/wiki/pygraphviz[PyGraphViz]. The
plugin _graphviz_ depends on GraphViz only but is now deprecated.
lashdata:: requires
http://www.montefiore.ulg.ac.be/~boigelot/research/lash[Lash] and
the
http://www.univ-paris12.fr/lacl/pommereau/soft/index.html#PyLash[Python
Lash binding].
[NOTE]
=====================
(C) 2007 Franck Pommereau <pommereau@univ-paris12.fr>
This library is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation; either version 2.1 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
USA
=====================
Contact
-------
Please feel free to send me comments, questions, bug reports or
contributions by mailto:pommereau@univ-paris12.fr[email].
If you wish to be notified of the new releases of SNAKES, please
register at the http://freshmeat.net/projects/snakes[FreshMeat page].
You may contribute to SNAKES by either send patches by email, or by
using the https://launchpad.net/snakes[SNAKES Launchpad page].
Documentation
-------------
A good starting point may be the link:tutorial.html[tutorial]. Then,
you may find the link:api/index.html[API reference manual] useful, it
documents all the API and gives number of examples of how to use the
various classes and functions.
If you do not program Python, you can learn it in a few hours thanks
to the very good http://docs.python.org/tut/tut.html[Python tutorial].
In order to know more about the PBC and M-nets family or the Petri net
compositions defined in the plugins, you may read papers from
http://www.univ-paris12.fr/lacl/pommereau/publis[my publications page]
(in particular those with _calculus_ in the title).
* add name to transitions, eg, [buff-(x) as name]
make it consistent with instance names?
* zero test/inhibitor arc
! parameter and global buffer with same names => parameter ignored
! accept net instances with too much parameters
0.9.16
#!/usr/bin/env python
import snakes.utils.abcd.main as abcdmain
abcdmain.main()
#!/usr/bin/env python
import socket, sys, readline
from snakes.pnml import dumps, loads
from snakes.plugins.query import Query
env = {}
def public (fun) :
env[fun.__name__] = fun
return fun
@public
def set (*larg, **karg) :
"""set(name, value) -> None
assign value (object) to name (str) on the server"""
return Query("set", *larg, **karg)
@public
def get (*larg, **karg) :
"""get(name) -> object
return the last value assigned to name (str)"""
return Query("get", *larg, **karg)
@public
def delete (*larg, **karg) :
"""delete(name) -> None
discard name (str)"""
return Query("del", *larg, **karg)
@public
def call (*larg, **karg) :
"""call(obj, ...) -> object
call obj (str or result from another call) with the additional arguments
return whatever the called object returns"""
return Query("call", *larg, **karg)
@public
def help (command=None) :
"""help(command) -> None
print help about command, if no command is given, list available commands"""
if command is None:
print "commands:", ", ".join(repr(cmd) for cmd in env
if not cmd.startswith("_"))
print " type 'help(cmd)' to ge help about a command"
elif command in env :
print env[command].__doc__
elif command.__name__ in env :
print command.__doc__
else :
print "unknown command %r" % command
@public
def quit () :
"""quit() -> None
terminate the client"""
print "bye"
sys.exit(0)
@public
def load (path) :
"""net(path) -> object
load a PNML file from path (str) and return the object is represents"""
return loads(open(path).read())
@public
def show (query) :
"""show(obj) -> None
show the PNML representation of obj (object), for instance of a query"""
print dumps(query)
_verbose = False
@public
def verbose (state=None) :
"""verbose(state) -> None
turn on (state=True), off (state=False) or toggle (state not
given) the printing of queries before they are sent to the
server"""
global _verbose
if state is None :
_verbose = not _verbose
else :
_verbose = state
if _verbose :
print "dump of queries enabled"
else :
print "dump of queries disabled"
try :
if sys.argv[1] in ("-t", "--tcp") :
proto = "TCP"
del sys.argv[1]
else :
proto = "UDP"
host, port = sys.argv[1:]
port = int(port)
except :
print >>sys.stderr, "Usage: snkc [--tcp] HOST PORT"
sys.exit(1)
sock = None
def sendto (data, address) :
global sock
if proto == "UDP" :
if sock is None :
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(2)
sock.sendto(data, address)
else :
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect(address)
sock.send(data)
def recvfrom (size) :
global sock
if proto == "UDP" :
data, address = sock.recvfrom(size)
else :
parts = []
while True :
parts.append(sock.recv(size))
if len(parts[-1]) < size :
break
address = sock.getpeername()
sock.close()
data = "".join(parts)
return data, address
while True :
try :
data = raw_input("? ")
q = eval(data.strip(), env)
except (EOFError, KeyboardInterrupt) :
quit()
except SystemExit :
raise
except Exception, e :
print "query error:", e
continue
if q is not None :
q = dumps(q)
if _verbose :
print "# query to %s:%u" % (host, port)
print q
sendto(q, (host, port))
try :
data, address = recvfrom(2**20)
if _verbose :
print "# answer from %s:%u" % address
print data.strip()
except socket.timeout :
print "# no answer received (timeout)"
print
#!/usr/bin/env python
import sys
import snakes.plugins
snakes.plugins.load("query", "snakes.nets", "nets")
port = 1234
size = 2**20
verbose = 0
proto = "UDP"
def help () :
print "Usage: snkd [OPTION]"
print "Options:"
print " -p PORT, --port PORT listen on port number PORT"
print " -t, --tcp use TCP instead of UDP"
print " -s SIZE, --size SIZE set buffer size for inputs"
print " -v, --verbose display information about queries"
print " (use '-v' twice to dump queries/answers)"
print " -h, --help print this help and exit"
args = sys.argv[1:]
try :
while len(args) > 0 :
arg = args.pop(0)
if arg in ("-p", "--port") :
port = int(args.pop(0))
elif arg in ("-v", "--verbose") :
verbose += 1
elif arg in ("-t", "--tcp") :
proto = "TCP"
elif arg in ("-s", "--size") :
size = int(args.pop(0))
elif arg in ("-h", "--help") :
help()
sys.exit(0)
else :
print >>sys.stderr("snkd: invalid command %r" % arg)
sys.exit(1)
except SystemExit :
raise
except :
cls, val, tb = sys.exc_info()
print >>sys.stderr, "snkd: %s, %s" % (cls.__name__, val)
sys.exit(1)
if verbose :
print "# starting"
print "# listen on: %s:%u" % (proto, port)
print "# buffer size: %uMb" % (size/1024)
print "# verbosity:", verbose
try :
if proto == "UDP" :
nets.UDPServer(port, size=size, verbose=verbose).run()
else :
nets.TCPServer(port, size=size, verbose=verbose).run()
except KeyboardInterrupt :
print "# bye"
except :
cls, val, tb = sys.exc_info()
if verbose > 1 :
raise
elif verbose :
print "# fatal error"
print >>sys.stderr, "snkd: %s, %s" % (cls.__name__, val)
sys.exit(2)
lucid 10.04 LTS
hardy 8.04 LTS
oneiric 11.10
precise 12.04 LTS
python-snakes (0.9.16-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Tue, 07 Jun 2011 12:23:33 +0200
python-snakes (0.9.15-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Tue, 10 May 2011 18:22:34 +0200
python-snakes (0.9.14-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Mon, 11 Apr 2011 16:45:50 +0200
python-snakes (0.9.13-2) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Fri, 23 Jul 2010 20:06:53 +0200
python-snakes (0.9.13-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Fri, 16 Jul 2010 17:09:14 +0200
python-snakes (0.9.12-2) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Sat, 03 Apr 2010 21:06:50 +0200
python-snakes (0.9.12-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Thu, 01 Apr 2010 19:46:02 +0200
python-snakes (0.9.11-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Thu, 25 Mar 2010 19:39:47 +0100
python-snakes (0.9.10-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Fri, 19 Jun 2009 13:40:36 +0200
python-snakes (0.9.9-2) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Tue, 19 May 2009 09:29:46 +0200
python-snakes (0.9.8-2) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Tue, 14 Apr 2009 20:02:32 +0200
python-snakes (0.9.8-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Mon, 23 Mar 2009 17:34:06 +0100
python-snakes (0.9.7-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Tue, 20 Jan 2009 13:07:09 +0100
python-snakes (0.9.6-2) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Tue, 02 Dec 2008 17:47:43 +0100
python-snakes (0.9.6-1) UNRELEASED; urgency=low
* see NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Fri, 28 Nov 2008 15:24:05 +0100
python-snakes (0.9.5-2) UNRELEASED; urgency=low
* Fixed doc installation path
-- Franck Pommereau <pommereau@univ-paris12.fr> Fri, 21 Nov 2008 13:20:55 +0100
python-snakes (0.9.5-1) UNRELEASED; urgency=low
* Trying to fix build on Launchpad
-- Franck Pommereau <pommereau@univ-paris12.fr> Fri, 21 Nov 2008 08:19:04 +0100
python-snakes (0.9.5) UNRELEASED; urgency=low
* See NEWS
-- Franck Pommereau <pommereau@univ-paris12.fr> Wed, 19 Nov 2008 21:47:52 +0100
python-snakes (0.9.4) UNRELEASED; urgency=low
* Initial release. (Closes: #XXXXXX)
-- Franck Pommereau <pommereau@univ-paris12.fr> Tue, 18 Nov 2008 12:09:16 +0100
Source: python-snakes
Section: python
Priority: extra
Maintainer: Franck Pommereau <pommereau@univ-paris12.fr>
Homepage: http://lacl.univ-paris12.fr/pommereau/soft/snakes
Build-Depends: python (>=2.5), cdbs (>=0.4.49), debhelper (>= 5), python-central (>=0.5.6)
XS-Python-Version: >=2.5
Standards-Version: 3.7.2
Package: python-snakes
Architecture: all
XB-Python-Version: ${python:Versions}
Depends: python (>=2.5), python-central, graphviz, python-tk
Description: SNAKES is the Net Algebra Kit for Editors and Simulators
SNAKES is a general purpose Petri net Python library allowing to
define and execute most classes of Petri nets. It features a plugin
system to allow its extension. In particular, plugins are provided to
implement the operations usually found in the PBC and M-nets family.
This package was debianized by Franck Pommereau <pommereau@univ-paris12.fr> on
DATE
License:
This package is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License
as published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This package is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this package; if not, see
`http://www.gnu.org/licenses/lgpl-3.0.txt'.
On Debian systems, the complete text of the GNU General
Public License can be found in `/usr/share/common-licenses/LGPL'.
The Debian packaging is (C) 2008, Franck Pommereau
<pommereau@univ-paris12.fr> and is licensed under the LGPL, see above.
#!/usr/bin/make -f
DEB_PYTHON_SYSTEM=pycentral
include /usr/share/cdbs/1/rules/debhelper.mk
include /usr/share/cdbs/1/class/python-distutils.mk
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
all:
rm -rf api
epydoc --output api --no-frames --graph=all \
--name="SNAKES is the Net Algebra Kit for Editors and Simulators" \
--navlink='<img alt="SNAKES logo" src="snakes-logo.jpg" width="120" height="120"/>' \
--no-private ../snakes
convert ../logo/snakes-logo.png -background white -scale 120x120 ./api/snakes-logo.jpg
This diff is collapsed. Click to expand it.
prod-cons.abcd
a simple producer/consumer
railroad.abcd
a (not so) simple railroad crossing system
railroad.py
checks basic property of railroad.abcd
ns/ns.abcd
Needham-Schroeder public key authentication protocol
ns/ns.py
checks mutual authentication
This diff is collapsed. Click to expand it.
# communication network
buffer nw : object = ()
# implementation of nonces and Dolev-Yao attacker
from dolev_yao import *
net Alice (this, who: buffer) :
# protocol initiater
buffer peer : int = ()
buffer peer_nonce : Nonce = ()
[who?(B), peer+(B), nw+("crypt", ("pub", B), this, Nonce(this))]
; [nw-("crypt", ("pub", this), Na, Nb), peer_nonce+(Nb) if Na == Nonce(this)]
; [peer?(B), peer_nonce?(Nb), nw+("crypt", ("pub", B), Nb)]
net Bob (this) :
# protocol responder
buffer peer : int = ()
buffer peer_nonce : Nonce = ()
[nw-("crypt", ("pub", this), A, Na), peer+(A), peer_nonce+(Na)]
; [peer?(A), peer_nonce?(Na), nw+("crypt", ("pub", A), Na, Nonce(this))]
; [nw-("crypt", ("pub", this), Nb) if Nb == Nonce(this)]
net Mallory (this, init) :
# attacker
buffer knowledge : object = (this, Nonce(this), ("priv", this)) + init
# Dolev-Yao attacker, bound by protocol signature
buffer spy : object = Spy(("crypt", ("pub", int), int, Nonce),
("crypt", ("pub", int), Nonce, Nonce),
("crypt", ("pub", int), Nonce))
# capture on message and learn from it
([spy?(s), nw-(m), knowledge>>(k), knowledge<<(s.learn(m, k))]
# loose message or inject another one (may be the same)
; ([True] + [spy?(s), knowledge?(x), nw+(x) if s.message(x)]))
* [False]
# Alice will contact one of these agents
buffer agents : int = 2, 3
# main processes, with friendly names
alice::Alice(1, agents)
| bob::Bob(2)
| spy::Mallory(3, (1, ("pub", 1), 2, ("pub", 2)))
import snakes.plugins
snakes.plugins.load("status", "snakes.nets", "nets")
from nets import *
from dolev_yao import Nonce
ns = loads(",ns.pnml")
states = StateGraph(ns)
for s in states :
m = states.net.get_marking()
# skip non final markings
if "bob.x" not in m or "alice.x" not in m :
continue
# get Alice's and Bob's peers ids
bp = list(m["bob.peer"])[0]
ap = list(m["alice.peer"])[0]
# violation of mutual authentication
if bp == 1 and ap != 2 :
print(s, "A(1) <=> %s ; B(2) <=> %s" % (ap, bp))
print(m)
print(len(states), "states")
# get BlackToken
#from snakes.nets import *
buffer fork1 : BlackToken = dot
buffer fork2 : BlackToken = dot
buffer fork3 : BlackToken = dot
# buffer parameters have to be declared as such
net philo (left: buffer, right: buffer):
buffer eating : BlackToken = ()
([left-(dot), right-(dot), eating+(dot)]
; [left+(dot), right+(dot), eating-(dot)])
* [False]
philo(fork1, fork2)
| philo(fork2, fork3)
| philo(fork3, fork1)
# shared buffer between producers and consumers
buffer bag : int = ()
net prod () :
# produces 10 tokens: 1..9 in bag
buffer count : int = 0
[count-(x), count+(x+1), bag+(x) if x < 10] * [count-(x) if x == 10]
net odd () :
# consumes odd tokens in bag
[bag-(x) if (x % 2) == 1] * [False]
net even () :
# consumes even tokens un bag
[bag-(x) if (x % 2) == 0] * [False]
# main process with one instance of each net
odd() | even() | prod()
# symbols
symbol RED, GREEN, UP, DOWN, OPEN, MOVING, CLOSED
# states of the gate
typedef gatestate : enum(OPEN, MOVING, CLOSED)
# stores green light state
buffer light : enum(RED, GREEN) = GREEN
# commands send by the track to the gate
buffer command : enum(UP, DOWN) = ()
net gate () :
# a pair of gates
buffer state : gatestate = OPEN
([command-(DOWN), state-(OPEN), state+(MOVING)] ;
[state-(MOVING), state+(CLOSED), light-(RED), light+(GREEN)] ;
[command-(UP), state-(CLOSED), state+(MOVING)] ;
[state-(MOVING), state+(OPEN)])
* [False]
net track () :
# a track with trains passing on it
buffer crossing : bool = False
([command+(DOWN), light-(GREEN), light+(RED)] ;
[light?(GREEN), crossing-(False), crossing+(True)] ;
[crossing-(True), crossing+(False), command+(UP)])
* [False]
# main process
gate() | track()
import sys
import snakes.plugins
snakes.plugins.load("gv", "snakes.nets", "snk")
from snk import *
n = loads(sys.argv[1])
g = StateGraph(n)
for s in g :
m = g.net.get_marking()
# safety property: train present => gates closed
if ("train().crossing" in m
and True in m["train().crossing"]
and "closed" not in m["gate().state"]) :
print("%s %s" % (s, m))
print("checked %s states" % len(g))
g.draw(sys.argv[1].rsplit(".", 1)[0] + "-states.png")
This diff is collapsed. Click to expand it.
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<net id="mynet">
<place id="p2">
<type domain="universal"/>
<initialMarking>
<multiset/>
</initialMarking>
</place>
<place id="p1">
<type domain="universal"/>
<initialMarking>
<multiset>
<item>
<value>
<object type="int">
1
</object>
</value>
<multiplicity>
1
</multiplicity>
</item>
<item>
<value>
<object type="int">
2
</object>
</value>
<multiplicity>
1
</multiplicity>
</item>
</multiset>
</initialMarking>
</place>
<transition id="t"/>
<arc id="p1:t" source="p1" target="t">
<inscription>
<variable>
x
</variable>
</inscription>
</arc>
<arc id="t:p2" source="t" target="p2">
<inscription>
<expression>
x+1
</expression>
</inscription>
</arc>
</net>
</pnml>
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<net id="Simple P/T net">
<place id="p2">
<initialMarking>
<text>
0
</text>
</initialMarking>
</place>
<place id="p1">
<initialMarking>
<text>
1
</text>
</initialMarking>
</place>
<transition id="t2"/>
<transition id="t1"/>
<arc id="p2:t2" source="p2" target="t2">
<inscription>
<text>
1
</text>
</inscription>
</arc>
<arc id="t2:p1" source="t2" target="p1">
<inscription>
<text>
1
</text>
</inscription>
</arc>
<arc id="p1:t1" source="p1" target="t1">
<inscription>
<text>
1
</text>
</inscription>
</arc>
<arc id="t1:p2" source="t1" target="p2">
<inscription>
<text>
1
</text>
</inscription>
</arc>
</net>
</pnml>
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
import os, os.path
snakes_version = open("VERSION").readline().strip()
package_version = open("debian/VERSION").readline().strip()
ppa_version = open("debian/PPA").readline().strip()
changelog_version = open("debian/changelog").readline().split()[1].strip("()")
distribs = [l.strip().split()[0] for l in open("debian/DISTRIB")]
base_dir = os.getcwd()
base_dist_dir = "dist"
dput_sh = open("dput.sh", "w")
def system (command) :
print("*** %s" % command)
retcode = os.system(command)
if retcode != 0 :
print("*** error return status (%s)" % retcode)
sys.exit(retcode)
def chdir (path) :
print("*** cd %s" % path)
os.chdir(path)
def changelog (path, dist) :
full_version = "%s-%s~ppa%s~%s1" % (snakes_version, package_version,
ppa_version, dist)
chdir(path)
system("debchange -b --newversion %s --distribution %s 'see NEWS'"
% (full_version, dist))
chdir(base_dir)
def build_package (dist_dir, dist) :
full_version = "%s-%s~ppa%s~%s1" % (snakes_version, package_version,
ppa_version, dist)
deb_dir = os.path.join(dist_dir, "python-snakes_%s" % full_version)
if not os.path.isdir(dist_dir) :
print("*** make dir %r" % dist_dir)
os.makedirs(dist_dir)
if os.path.isdir(deb_dir) :
system("rm -rf %s" % deb_dir)
system("hg archive %s" % deb_dir)
changelog(deb_dir, dist)
system("sed -i -e 's/DATE/$(date -R)/' %s/debian/copyright" % deb_dir)
system("sed -i -e 's/UNRELEASED/%s/' %s/debian/changelog" % (dist, deb_dir))
chdir(deb_dir)
system("make doc")
system("dpkg-buildpackage")
system("dpkg-buildpackage -S -sa")
chdir(base_dir)
dput_sh.write("dput lp %s_source.changes\n" % deb_dir)
main_version = "%s-%s" % (snakes_version, package_version)
if main_version != changelog_version :
system("debchange --newversion %s --distribution UNRELEASED 'see NEWS'"
% main_version)
system("hg commit -m 'updated debian/changelog' debian/changelog")
for dist in distribs :
build_package(base_dist_dir, dist)
dput_sh.close()
import glob, os, os.path
for src in glob.glob("snakes/lang/*/*.pgen") :
tgt = os.path.join(os.path.dirname(src), "pgen.py")
if not os.path.isfile(tgt) or os.path.getmtime(src) > os.path.getmtime(tgt) :
print("python snakes/lang/pgen.py --output=%s %s" % (tgt, src))
os.system("python snakes/lang/pgen.py --output=%s %s" % (tgt, src))
for src in glob.glob("snakes/lang/*/*.asdl") :
tgt = os.path.join(os.path.dirname(src), "asdl.py")
if not os.path.isfile(tgt) or os.path.getmtime(src) > os.path.getmtime(tgt) :
print("python snakes/lang/asdl.py --output=%s %s" % (tgt, src))
os.system("python snakes/lang/asdl.py --output=%s %s" % (tgt, src))
#!/usr/bin/env python
import sys, os
from distutils.core import setup
def doc_files() :
import os, os.path
result = {}
for root, dirs, files in os.walk("doc") :
target_dir = os.path.join("share/doc/python-snakes",
*root.split(os.sep)[1:])
for name in files :
if target_dir not in result :
result[target_dir] = []
result[target_dir].append(os.path.join(root, name))
return list(result.items())
if __name__ == "__main__" :
print("Compiling Emacs files...")
os.system("emacs -batch -f batch-byte-compile utils/abcd-mode.el")
#
setup(name="SNAKES",
version=open("VERSION").read().strip(),
description="SNAKES is the Net Algebra Kit for Editors and Simulators",
long_description="""SNAKES is a general purpose Petri net Python
library allowing to define and execute most classes of Petri
nets. It features a plugin system to allow its extension. In
particular, plugins are provided to implement the operations
usually found in the PBC and M-nets family.""",
author="Franck Pommereau",
author_email="pommereau@univ-paris12.fr",
maintainer="Franck Pommereau",
maintainer_email="pommereau@univ-paris12.fr",
url="http://lacl.univ-paris12.fr/pommereau/soft/snakes",
scripts=["bin/abcd",
"bin/snkc",
"bin/snkd",
],
packages=["snakes",
"snakes.lang",
"snakes.lang.pylib",
"snakes.lang.python",
"snakes.lang.abcd",
"snakes.lang.ctlstar",
"snakes.plugins",
"snakes.utils",
"snakes.utils.abcd",
"snakes.utils.ctlstar",
],
data_files=(doc_files()
+ [("share/emacs/site-lisp", ["utils/abcd-mode.el",
"utils/abcd-mode.elc"])]),
)
"""SNAKES is the Net Algebra Kit for Editors and Simulators
@author: Franck Pommereau
@organization: University of Paris 12
@copyright: (C) 2005 Franck Pommereau
@license: GNU Lesser General Public Licence (aka. GNU LGPL),
see the file C{doc/COPYING} in the distribution or visit U{the GNU
web site<http://www.gnu.org/licenses/licenses.html#LGPL>}
@contact: pommereau@univ-paris12.fr
SNAKES is a Python library allowing to model all sorts of Petri nets
and to execute them. It is very general as most Petri nets annotations
can be arbitrary Python expressions while most values can be arbitrary
Python objects.
SNAKES can be further extended with plugins, several ones being
already provided, in particular two plugins implement the Petri nets
compositions defined for the Petri Box Calculus and its successors.
"""
version = "0.9.16"
defaultencoding = "utf-8"
class SnakesError (Exception) :
"An error in SNAKES"
pass
class ConstraintError (SnakesError) :
"Violation of a constraint"
pass
class NodeError (SnakesError) :
"Error related to a place or a transition"
pass
class DomainError (SnakesError) :
"Function applied out of its domain"
pass
class ModeError (SnakesError) :
"The modes of a transition cannot be found"
pass
class PluginError (SnakesError) :
"Error when adding a plugin"
pass
class UnificationError (SnakesError) :
"Error while unifying parameters"
pass
"""Python 2 and 3 compatibility layer
"""
import sys
try :
xrange
except NameError :
xrange = range
try :
reduce
except NameError :
from functools import reduce
try :
import StringIO as io
except ImportError :
import io
try :
next
except NameError :
def next (obj) :
return obj.next()
PY3 = sys.version > "3"
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
import sys
if sys.version_info[:2] in ((2, 6), (2, 7)) :
import ast
elif sys.version_info[0] == 3 :
import ast
elif hasattr(sys, "pypy_version_info") :
import astpypy as ast
elif hasattr(sys, "JYTHON_JAR") :
import astjy25 as ast
elif sys.version_info[:2] == (2, 5) :
import astpy25 as ast
else :
raise NotImplementedError("unsupported Python version")
sys.modules["snkast"] = ast
from . import unparse as _unparse
from snakes.compat import *
class Names (ast.NodeVisitor) :
def __init__ (self) :
ast.NodeVisitor.__init__(self)
self.names = set()
def visit_Name (self, node) :
self.names.add(node.id)
def getvars (expr) :
"""
>>> list(sorted(getvars('x+y<z')))
['x', 'y', 'z']
>>> list(sorted(getvars('x+y<z+f(3,t)')))
['f', 't', 'x', 'y', 'z']
"""
names = Names()
names.visit(ast.parse(expr))
return names.names - set(['None', 'True', 'False'])
class Unparser(_unparse.Unparser) :
boolops = {"And": 'and', "Or": 'or'}
def _Interactive (self, tree) :
for stmt in tree.body :
self.dispatch(stmt)
def _Expression (self, tree) :
self.dispatch(tree.body)
def _ClassDef(self, tree):
self.write("\n")
for deco in tree.decorator_list:
self.fill("@")
self.dispatch(deco)
self.fill("class "+tree.name)
if tree.bases:
self.write("(")
for a in tree.bases:
self.dispatch(a)
self.write(", ")
self.write(")")
self.enter()
self.dispatch(tree.body)
self.leave()
def unparse (st) :
output = io.StringIO()
Unparser(st, output)
return output.getvalue().strip()
class Renamer (ast.NodeTransformer) :
def __init__ (self, map_names) :
ast.NodeTransformer.__init__(self)
self.map = [map_names]
def visit_ListComp (self, node) :
bind = self.map[-1].copy()
for comp in node.generators :
for name in getvars(comp.target) :
if name in bind :
del bind[name]
self.map.append(bind)
node.elt = self.visit(node.elt)
self.map.pop(-1)
return node
def visit_SetComp (self, node) :
return self.visit_ListComp(node)
def visit_DictComp (self, node) :
bind = self.map[-1].copy()
for comp in node.generators :
for name in getvars(comp.target) :
if name in bind :
del bind[name]
self.map.append(bind)
node.key = self.visit(node.key)
node.value = self.visit(node.value)
self.map.pop(-1)
return node
def visit_Name (self, node) :
return ast.copy_location(ast.Name(id=self.map[-1].get(node.id,
node.id),
ctx=ast.Load()), node)
def rename (expr, map={}, **ren) :
"""
>>> rename('x+y<z', x='t')
'((t + y) < z)'
>>> rename('x+y<z+f(3,t)', f='g', t='z', z='t')
'((x + y) < (t + g(3, z)))'
>>> rename('[x+y for x in range(3)]', x='z')
'[(x + y) for x in range(3)]'
>>> rename('[x+y for x in range(3)]', y='z')
'[(x + z) for x in range(3)]'
"""
map_names = dict(map)
map_names.update(ren)
transf = Renamer(map_names)
return unparse(transf.visit(ast.parse(expr)))
class Binder (Renamer) :
def visit_Name (self, node) :
if node.id in self.map[-1] :
return self.map[-1][node.id]
else :
return node
def bind (expr, map={}, **ren) :
"""
>>> bind('x+y<z', x=ast.Num(n=2))
'((2 + y) < z)'
>>> bind('x+y<z', y=ast.Num(n=2))
'((x + 2) < z)'
>>> bind('[x+y for x in range(3)]', x=ast.Num(n=2))
'[(x + y) for x in range(3)]'
>>> bind('[x+y for x in range(3)]', y=ast.Num(n=2))
'[(x + 2) for x in range(3)]'
"""
map_names = dict(map)
map_names.update(ren)
transf = Binder(map_names)
return unparse(transf.visit(ast.parse(expr)))
if __name__ == "__main__" :
import doctest
doctest.testmod()
module ABCD version "$Revision: 1 $"
{
abcd = AbcdSpec(decl* context, process body, expr* asserts)
decl = AbcdTypedef(identifier name, abcdtype type)
| AbcdBuffer(identifier name, abcdtype type,
Slice? capacity, expr content)
| AbcdSymbol(identifier* symbols)
| AbcdConst(identifier name, expr value)
| AbcdNet(identifier name, arguments args, AbcdSpec body)
| AbcdTask(identifier name, AbcdSpec body,
abcdtype* input, ancdtype* output)
| stmt -- this too much, but does not harm
attributes (int lineno, int col_offset)
process = AbcdAction(access* accesses, object guard)
| AbcdFlowOp(process left, flowop op, process right)
| AbcdInstance(identifier net, identifier? asname, expr* args,
keyword* keywords, expr? starargs, expr? kwargs)
attributes (int lineno, int col_offset)
flowop = Sequence | Choice | Parallel | Loop
access = SimpleAccess(identifier buffer, arc arc, expr tokens)
| FlushAccess(identifier buffer, identifier target)
| SwapAccess(identifier buffer, expr target, expr tokens)
| Spawn(identifier net, expr pid, expr args)
| Wait(identifier net, expr pid, expr args)
| Suspend(identifier net, expr pid)
| Resume(identifier net, expr pid)
attributes (int lineno, int col_offset)
arc = Produce | Test | Consume | Fill
abcdtype = UnionType(abcdtype* types)
| IntersectionType(abcdtype* types)
| CrossType(abcdtype* types)
| ListType(abcdtype items)
| TupleType(abcdtype items)
| SetType(abcdtype items)
| DictType(abcdtype keys, abcdtype values)
| EnumType(expr* items)
| NamedType(identifier name)
attributes (int lineno, int col_offset)
--------------------------------------------------------------
-- the rest is copied from "snakes/lang/python/python.asdl" --
--------------------------------------------------------------
stmt = FunctionDef(identifier name, arguments args,
stmt* body, expr* decorator_list, expr? returns)
| ClassDef(identifier name,
expr* bases,
keyword* keywords,
expr? starargs,
expr? kwargs,
stmt* body,
expr *decorator_list)
| Return(expr? value)
| Delete(expr* targets)
| Assign(expr* targets, expr value)
| AugAssign(expr target, operator op, expr value)
| For(expr target, expr iter, stmt* body, stmt* orelse)
| While(expr test, stmt* body, stmt* orelse)
| If(expr test, stmt* body, stmt* orelse)
| With(expr context_expr, expr? optional_vars, stmt* body)
| Raise(expr? exc, expr? cause)
| TryExcept(stmt* body, excepthandler* handlers, stmt* orelse)
| TryFinally(stmt* body, stmt* finalbody)
| Assert(expr test, expr? msg)
| Import(alias* names)
| ImportFrom(identifier module, alias* names, int? level)
| Exec(expr body, expr? globals, expr? locals)
| Global(identifier* names)
| Nonlocal(identifier* names)
| Expr(expr value)
| Pass | Break | Continue
attributes (int lineno, int col_offset)
expr = BoolOp(boolop op, expr* values)
| BinOp(expr left, operator op, expr right)
| UnaryOp(unaryop op, expr operand)
| Lambda(arguments args, expr body)
| IfExp(expr test, expr body, expr orelse)
| Dict(expr* keys, expr* values)
| Set(expr* elts)
| ListComp(expr elt, comprehension* generators)
| SetComp(expr elt, comprehension* generators)
| DictComp(expr key, expr value, comprehension* generators)
| GeneratorExp(expr elt, comprehension* generators)
| Yield(expr? value)
| Compare(expr left, cmpop* ops, expr* comparators)
| Call(expr func, expr* args, keyword* keywords,
expr? starargs, expr? kwargs)
| Num(object n)
| Str(string s)
| Ellipsis
| Attribute(expr value, identifier attr, expr_context ctx)
| Subscript(expr value, slice slice, expr_context ctx)
| Starred(expr value, expr_context ctx)
| Name(identifier id, expr_context ctx)
| List(expr* elts, expr_context ctx)
| Tuple(expr* elts, expr_context ctx)
attributes (int lineno, int col_offset)
expr_context = Load | Store | Del | AugLoad | AugStore | Param
slice = Slice(expr? lower, expr? upper, expr? step)
| ExtSlice(slice* dims)
| Index(expr value)
boolop = And | Or
operator = Add | Sub | Mult | Div | Mod | Pow | LShift
| RShift | BitOr | BitXor | BitAnd | FloorDiv
unaryop = Invert | Not | UAdd | USub
cmpop = Eq | NotEq | Lt | LtE | Gt | GtE | Is | IsNot | In | NotIn
comprehension = (expr target, expr iter, expr* ifs)
excepthandler = ExceptHandler(expr? type, identifier? name, stmt* body)
attributes (int lineno, int col_offset)
arguments = (arg* args, identifier? vararg, expr? varargannotation,
arg* kwonlyargs, identifier? kwarg,
expr? kwargannotation, expr* defaults,
expr* kw_defaults)
arg = (identifier arg, expr? annotation)
keyword = (identifier arg, expr value)
alias = (identifier name, identifier? asname)
}
# Grammar for ABCD
# new tokens
$QUESTION '?'
$ELLIPSIS '...'
file_input: abcd_main ENDMARKER
abcd_main: (NEWLINE | abcd_global)* abcd_expr abcd_prop*
abcd_global: import_stmt | abcd_symbol | abcd_typedef | abcd_const | abcd_decl
abcd_spec: (NEWLINE | abcd_decl)* abcd_expr
abcd_decl: abcd_net | abcd_task | abcd_buffer
abcd_const: 'const' NAME '=' testlist
abcd_symbol: 'symbol' abcd_namelist
abcd_typedef: 'typedef' NAME ':' abcd_type
abcd_net: 'net' NAME parameters ':' abcd_suite
abcd_task: 'task' NAME typelist '-' '>' typelist ':' abcd_suite
abcd_suite: abcd_expr | NEWLINE INDENT abcd_spec DEDENT
abcd_buffer: [ decorators ] 'buffer' NAME ['[' test ']'] ':' abcd_type '=' testlist
abcd_namelist: NAME (',' NAME)*
typelist: '(' [abcd_type (',' abcd_type)*] ')'
abcd_type: abcd_and_type ('|' abcd_and_type)*
abcd_and_type: abcd_cross_type ('&' abcd_cross_type)*
abcd_cross_type: abcd_base_type ('*' abcd_base_type)*
abcd_base_type: (NAME ['(' abcd_type (',' abcd_type)* ')']
| 'enum' '(' test (',' test)* ')' | '(' abcd_type ')')
abcd_expr: abcd_choice_expr ('|' abcd_choice_expr)*
abcd_choice_expr: abcd_iter_expr ('+' abcd_iter_expr)*
abcd_iter_expr: abcd_seq_expr ('*' abcd_seq_expr)*
abcd_seq_expr: abcd_base_expr (';' abcd_base_expr)*
abcd_base_expr: (abcd_action | '(' abcd_expr ')') (NEWLINE)*
abcd_action: ('[' 'True' ']' |
'[' 'False' ']' |
'[' abcd_access_list ['if' test] ']' |
abcd_instance)
abcd_access_list: abcd_access (',' abcd_access)*
abcd_access: (NAME '+' '(' testlist ')' |
NAME '?' '(' testlist ')' |
NAME '-' '(' testlist ')' |
NAME '<>' '(' testlist '=' testlist ')' |
NAME '>>' '(' NAME ')' |
NAME '<<' '(' testlist_comp ')' |
NAME '.' NAME '(' test (',' test)* ')')
abcd_instance: [NAME ':' ':'] NAME '(' [arglist] ')'
tfpdef: NAME [':' ('net' | 'buffer' | 'task')]
abcd_prop: 'assert' test (NEWLINE)*
#
# the rest is from SNAKES/Python grammar
#
decorator: '@' dotted_name [ '(' [arglist] ')' ] NEWLINE
decorators: decorator+
decorated: decorators (classdef | funcdef)
funcdef: 'def' NAME parameters ['-' '>' test] ':' suite
parameters: '(' [typedargslist] ')'
typedargslist: ((tfpdef ['=' test] ',')*
('*' [tfpdef] (',' tfpdef ['=' test])* [',' '**' tfpdef]
| '**' tfpdef)
| tfpdef ['=' test] (',' tfpdef ['=' test])* [','])
varargslist: ((vfpdef ['=' test] ',')*
('*' [vfpdef] (',' vfpdef ['=' test])* [',' '**' vfpdef]
| '**' vfpdef)
| vfpdef ['=' test] (',' vfpdef ['=' test])* [','])
vfpdef: NAME
stmt: simple_stmt | compound_stmt
simple_stmt: small_stmt (';' small_stmt)* [';'] NEWLINE
small_stmt: (expr_stmt | del_stmt | pass_stmt | flow_stmt |
import_stmt | global_stmt | nonlocal_stmt | assert_stmt)
expr_stmt: testlist (augassign (yield_expr|testlist) |
('=' (yield_expr|testlist))*)
augassign: ('+=' | '-=' | '*=' | '/=' | '%=' | '&=' | '|=' | '^=' |
'<<=' | '>>=' | '**=' | '//=')
del_stmt: 'del' exprlist
pass_stmt: 'pass'
flow_stmt: break_stmt | continue_stmt | return_stmt | raise_stmt | yield_stmt
break_stmt: 'break'
continue_stmt: 'continue'
return_stmt: 'return' [testlist]
yield_stmt: yield_expr
raise_stmt: 'raise' [test ['from' test]]
import_stmt: import_name | import_from
import_name: 'import' dotted_as_names
import_from: ('from' (('.' | '...')* dotted_name | ('.' | '...')+)
'import' ('*' | '(' import_as_names ')' | import_as_names))
import_as_name: NAME ['as' NAME]
dotted_as_name: dotted_name ['as' NAME]
import_as_names: import_as_name (',' import_as_name)* [',']
dotted_as_names: dotted_as_name (',' dotted_as_name)*
dotted_name: NAME ('.' NAME)*
global_stmt: 'global' NAME (',' NAME)*
nonlocal_stmt: 'nonlocal' NAME (',' NAME)*
assert_stmt: 'assert' test [',' test]
compound_stmt: (if_stmt | while_stmt | for_stmt | try_stmt | with_stmt
| funcdef | classdef | decorated)
if_stmt: 'if' test ':' suite ('elif' test ':' suite)* ['else' ':' suite]
while_stmt: 'while' test ':' suite ['else' ':' suite]
for_stmt: 'for' exprlist 'in' testlist ':' suite ['else' ':' suite]
try_stmt: ('try' ':' suite
((except_clause ':' suite)+
['else' ':' suite]
['finally' ':' suite] |
'finally' ':' suite))
with_stmt: 'with' test [ with_var ] ':' suite
with_var: 'as' expr
except_clause: 'except' [test ['as' NAME]]
suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT
test: or_test ['if' or_test 'else' test] | lambdef
test_nocond: or_test | lambdef_nocond
lambdef: 'lambda' [varargslist] ':' test
lambdef_nocond: 'lambda' [varargslist] ':' test_nocond
or_test: and_test ('or' and_test)*
and_test: not_test ('and' not_test)*
not_test: 'not' not_test | comparison
comparison: star_expr (comp_op star_expr)*
comp_op: '<'|'>'|'=='|'>='|'<='|'!='|'<>'|'in'|'not' 'in'|'is'|'is' 'not'
star_expr: ['*'] expr
expr: xor_expr ('|' xor_expr)*
xor_expr: and_expr ('^' and_expr)*
and_expr: shift_expr ('&' shift_expr)*
shift_expr: arith_expr (('<<'|'>>') arith_expr)*
arith_expr: term (('+'|'-') term)*
term: factor (('*'|'/'|'%'|'//') factor)*
factor: ('+'|'-'|'~') factor | power
power: atom trailer* ['**' factor]
atom: ('(' [yield_expr|testlist_comp] ')' |
'[' [testlist_comp] ']' |
'{' [dictorsetmaker] '}' |
NAME | NUMBER | STRING+ | '...' | 'None' | 'True' | 'False')
testlist_comp: test ( comp_for | (',' test)* [','] )
trailer: '(' [arglist] ')' | '[' subscriptlist ']' | '.' NAME
subscriptlist: subscript (',' subscript)* [',']
subscript: test | [test] ':' [test] [sliceop]
sliceop: ':' [test]
exprlist: star_expr (',' star_expr)* [',']
testlist: test (',' test)* [',']
dictorsetmaker: ( (test ':' test (comp_for | (',' test ':' test)* [','])) |
(test (comp_for | (',' test)* [','])) )
classdef: 'class' NAME ['(' [arglist] ')'] ':' suite
arglist: (argument ',')* (argument [',']
|'*' test (',' argument)* [',' '**' test]
|'**' test)
argument: test [comp_for] | test '=' test
comp_iter: comp_for | comp_if
comp_for: 'for' exprlist 'in' or_test [comp_iter]
comp_if: 'if' test_nocond [comp_iter]
yield_expr: 'yield' [testlist]
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff could not be displayed because it is too large.
import sys, datetime
from snakes.lang.pylib import asdl
from collections import defaultdict
from functools import partial
class memoize(object):
def __init__(self, function):
self.function = function
self.memoized = {}
def __call__(self, *args):
try:
return self.memoized[args]
except KeyError:
call = self.function(*args)
self.memoized[args] = call
return call
def __get__(self, obj, objtype):
"""Support instance methods."""
return partial(self.__call__, obj)
def component_has_cycle(node, graph, proceeding, visited):
if node in visited:
return False
if node in proceeding:
proceeding.append(node) # populate trace
return True
proceeding.append(node)
if node in graph:
for successor in graph[node]:
if component_has_cycle(successor, graph, proceeding, visited):
return True
proceeding.remove(node)
visited.add(node)
return False
def has_cycle(graph):
visited = set()
proceeding = list()
todo = set(graph.keys())
while todo:
node = todo.pop()
if component_has_cycle(node, graph, proceeding, visited):
i = proceeding.index(proceeding[-1])
return proceeding[i:]
todo.difference_update(visited)
return []
class CyclicDependencies(Exception):
def __init__(self, seq):
self.seq = seq
def __str__(self):
return "cyclic dependencies: {}".format(" -> ".join(self.seq))
def remove_duplicates(l):
d = {}
nl = []
for e in l:
if not e in d:
d[e] = 1
nl.append(e)
return nl
class CodeGen (asdl.VisitorBase) :
def __init__(self, node):
asdl.VisitorBase.__init__(self)
self.starting_node = None
self.current_node = None
self.hierarchy = defaultdict(list)
self.hierarchy['_AST'] = []
self.fields = defaultdict(list)
self.attributes = defaultdict(list)
self.code = defaultdict(list)
self.visit(node)
ret = has_cycle(self.hierarchy)
if ret:
raise CyclicDependencies(ret)
self._gen_code(node)
def visitModule(self, node):
for name, child in node.types.items():
if not self.starting_node:
self.starting_node = str(name)
self.current_node = str(name)
self.hierarchy[name]
self.visit(child)
def visitSum(self, node):
if hasattr(node, "fields"):
self.fields[self.current_node] = node.fields
else:
self.fields[self.current_node] = []
if hasattr(node, "attributes"):
self.attributes[self.current_node] = node.attributes
else:
self.attributes[self.current_node] = []
for child in node.types:
self.visit(child)
def visitConstructor (self, node):
if str(node.name) in self.fields:
#print >> sys.stderr, "constructor '{!s}' appears twice !".format(node.name)
#exit(0)
return
self.fields[str(node.name)].extend(node.fields)
self.hierarchy[str(node.name)].append(self.current_node)
def visitProduct(self, node):
self.fields[self.current_node].extend(node.fields)
@memoize
def _get_fields(self, name):
if self.fields.has_key(name):
fields = map(lambda f : f.name, self.fields[name])
for parent in self.hierarchy[name]:
fields.extend(self._get_fields(parent))
return fields
else:
return []
@memoize
def _get_attributes(self, name):
if name == '_AST':
return []
attributes = map(lambda a : a.name, self.attributes[name])
for parent in self.hierarchy[name]:
attributes.extend(self._get_attributes(parent))
return attributes
def _gen_code(self, node):
is_methods = []
for name in sorted(self.hierarchy):
if name != '_AST':
is_methods.extend(["",
"def is{!s}(self):".format(name),
["return False"]
])
cls = ["class _AST (ast.AST):",
["_fields = ()",
"_attributes = ()",
"",
"def __init__ (self, **ARGS):",
["ast.AST.__init__(self)",
"for k, v in ARGS.items():",
["setattr(self, k, v)"]
]
] + is_methods
]
self.code['_AST'] = cls
for name, parents in self.hierarchy.iteritems():
if name == '_AST':
continue
if not parents:
parents = ['_AST']
fields = self.fields[name]
args = []
assign = []
body = []
_fields = remove_duplicates(self._get_fields(name))
_attributes = remove_duplicates(self._get_attributes(name))
body = []
cls = ["class {!s} ({!s}):".format(name, ", ".join(parents)), body]
non_default_args = []
default_args = []
for f in fields:
if f.name.value == 'ctx':
f.opt = True
if f.opt:
default_args.append("{!s}=None".format(f.name))
assign.append("self.{0!s} = {0!s}".format(f.name))
elif f.seq:
default_args.append("{!s}=[]".format(f.name))
assign.append("self.{0!s} = list({0!s})".format(f.name))
else:
non_default_args.append("{!s}".format(f.name))
assign.append("self.{0!s} = {0!s}".format(f.name))
args = non_default_args + default_args
body.append("_fields = {!r}".format( tuple(map(repr, _fields))))
body.append("_attributes = {!r}".format( tuple(map(repr, _attributes))))
body.append("")
# ctor
args_str = ", ".join(args)
if args_str != "":
args_str += ", "
body.append("def __init__ (self, {!s} **ARGS):".format(args_str))
ctor_body = []
body.append(ctor_body)
ctor_body.extend(map(lambda base : "{!s}.__init__(self, **ARGS)".format(base), parents))
ctor_body.extend(assign)
body.extend(["", "def is{}(self):".format(name), ["return True"]])
self.code[name] = cls
@memoize
def _cost(self, name):
# print "call cost {}".format(name)
if name == '_AST':
return 0
parents = self.hierarchy[name]
return reduce(lambda acc, x: acc + self._cost(x), parents, 1)
@property
def python(self):
classes = self.hierarchy.keys()
classes.sort(lambda a, b: self._cost(a) - self._cost(b))
code = ["from snakes.lang import ast",
"from ast import *",
""]
for cls in classes:
code.extend(self.code[cls])
code.append("")
def python (code, indent) :
for line in code :
if isinstance(line, str) :
yield (4*indent) * " " + line
else :
for sub in python(line, indent+1) :
yield sub
return "\n".join(python(code, 0))
def compile_asdl(infilename, outfilename):
""" Helper function to compile asdl files. """
infile = open(infilename, 'r')
outfile = open(outfilename, 'w')
scanner = asdl.ASDLScanner()
parser = asdl.ASDLParser()
tokens = scanner.tokenize(infile.read())
node = parser.parse(tokens)
outfile.write(("# this file has been automatically generated running:\n"
"# %s\n# timestamp: %s\n\n") % (" ".join(sys.argv),
datetime.datetime.now()))
outfile.write(CodeGen(node).python)
outfile.close()
infile.close()
if __name__ == "__main__":
# a simple CLI
import getopt
outfile = sys.stdout
try :
opts, args = getopt.getopt(sys.argv[1:], "ho:",
["help", "output="])
if ("-h", "") in opts or ("--help", "") in opts :
opts = [("-h", "")]
args = [None]
elif not args :
raise getopt.GetoptError("no input file provided"
" (try -h to get help)")
elif len(args) > 1 :
raise getopt.GetoptError("more than one input file provided")
except getopt.GetoptError :
sys.stderr.write("%s: %s\n" % (__file__, sys.exc_info()[1]))
sys.exit(1)
for (flag, arg) in opts :
if flag in ("-h", "--help") :
print("""usage: %s [OPTIONS] INFILE
Options:
-h, --help print this help and exit
--output=OUTPUT set output file""" % __file__)
sys.exit(0)
elif flag in ("-o", "--output") :
outfile = open(arg, "w")
scanner = asdl.ASDLScanner()
parser = asdl.ASDLParser()
tokens = scanner.tokenize(open(args[0]).read())
node = parser.parse(tokens)
outfile.write(("# this file has been automatically generated running:\n"
"# %s\n# timestamp: %s\n\n") % (" ".join(sys.argv),
datetime.datetime.now()))
try:
outfile.write(CodeGen(node).python)
except CyclicDependencies as cycle:
msg = "[E] {!s}".format(cycle)
outfile.write(msg)
if outfile != sys.stdout:
print >> sys.stderr, msg
outfile.close()
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
module CTLstar version "$Revision: 1 $"
{
ctlstar = Spec(ctldecl* atoms, ctldecl* properties, form? main)
attributes (int lineno, int col_offset)
ctldecl = Atom(identifier name, ctlarg* args, ctlparams* params,
stmt* body)
| Property(identifier name, ctlargs* args,
ctlparams* params, form body)
attributes (int lineno, int col_offset)
ctlarg = Place(identifier name, string place)
| Token(identifier name, string place)
| Argument(identifier name, expr value, identifier type)
attributes (int lineno, int col_offset)
ctlparam = Parameter(identifier name, identifier type)
attributes (int lineno, int col_offset)
form = atom
| CtlUnary(ctlunary op, form child)
| CtlBinary(ctlbinary op, form left, form right)
attributes (int lineno, int col_offset)
ctlunary = notop | All | Exists | Next | Future | Globally
notop = Not
ctlbinary = boolop | Imply | Iff | Until | WeakUntil | Release
atom = InPlace(expr* data, ctlarg place)
| NotInPlace(expr* data, ctlarg place)
| EmptyPlace(ctlarg place)
| MarkedPlace(ctlarg place)
| Deadlock
| Boolean(bool val)
| Instance(identifier name, arg* args)
| Quantifier(ctlunary op,
identifier* vars,
ctlarg place,
form child,
bool distinct)
attributes (int lineno, int col_offset)
--------------------------------------------------------------
-- the rest is copied from "snakes/lang/python/python.asdl" --
--------------------------------------------------------------
stmt = FunctionDef(identifier name, arguments args,
stmt* body, expr* decorator_list, expr? returns)
| ClassDef(identifier name,
expr* bases,
keyword* keywords,
expr? starargs,
expr? kwargs,
stmt* body,
expr *decorator_list)
| Return(expr? value)
| Delete(expr* targets)
| Assign(expr* targets, expr value)
| AugAssign(expr target, operator op, expr value)
| For(expr target, expr iter, stmt* body, stmt* orelse)
| While(expr test, stmt* body, stmt* orelse)
| If(expr test, stmt* body, stmt* orelse)
| With(expr context_expr, expr? optional_vars, stmt* body)
| Raise(expr? exc, expr? cause)
| TryExcept(stmt* body, excepthandler* handlers, stmt* orelse)
| TryFinally(stmt* body, stmt* finalbody)
| Assert(expr test, expr? msg)
| Import(alias* names)
| ImportFrom(identifier module, alias* names, int? level)
| Exec(expr body, expr? globals, expr? locals)
| Global(identifier* names)
| Nonlocal(identifier* names)
| Expr(expr value)
| Pass | Break | Continue
attributes (int lineno, int col_offset)
expr = BoolOp(boolop op, expr* values)
| BinOp(expr left, operator op, expr right)
| UnaryOp(unaryop op, expr operand)
| Lambda(arguments args, expr body)
| IfExp(expr test, expr body, expr orelse)
| Dict(expr* keys, expr* values)
| Set(expr* elts)
| ListComp(expr elt, comprehension* generators)
| SetComp(expr elt, comprehension* generators)
| DictComp(expr key, expr value, comprehension* generators)
| GeneratorExp(expr elt, comprehension* generators)
| Yield(expr? value)
| Compare(expr left, cmpop* ops, expr* comparators)
| Call(expr func, expr* args, keyword* keywords,
expr? starargs, expr? kwargs)
| Num(object n)
| Str(string s)
| Ellipsis
| Attribute(expr value, identifier attr, expr_context ctx)
| Subscript(expr value, slice slice, expr_context ctx)
| Starred(expr value, expr_context ctx)
| Name(identifier id, expr_context ctx)
| List(expr* elts, expr_context ctx)
| Tuple(expr* elts, expr_context ctx)
attributes (int lineno, int col_offset)
expr_context = Load | Store | Del | AugLoad | AugStore | Param
slice = Slice(expr? lower, expr? upper, expr? step)
| ExtSlice(slice* dims)
| Index(expr value)
boolop = And | Or
operator = Add | Sub | Mult | Div | Mod | Pow | LShift
| RShift | BitOr | BitXor | BitAnd | FloorDiv
unaryop = Invert | notop | UAdd | USub
cmpop = Eq | NotEq | Lt | LtE | Gt | GtE | Is | IsNot | In | NotIn
comprehension = (expr target, expr iter, expr* ifs)
excepthandler = ExceptHandler(expr? type, identifier? name, stmt* body)
attributes (int lineno, int col_offset)
arguments = (arg* args, identifier? vararg, expr? varargannotation,
arg* kwonlyargs, identifier? kwarg,
expr? kwargannotation, expr* defaults,
expr* kw_defaults)
arg = (identifier arg, expr? annotation)
keyword = (identifier arg, expr value)
alias = (identifier name, identifier? asname)
}
# Grammar for (permissive) CTL*
# new tokens
$ELLIPSIS '...'
file_input: (NEWLINE | ctl_atomdef | ctl_propdef)* [ ctl_formula ] NEWLINE* ENDMARKER
ctl_atomdef: 'atom' NAME '(' [ctl_parameters] ')' ':' suite
ctl_propdef: 'prop' NAME '(' [ctl_parameters] ')' ':' ctl_suite
ctl_suite: ( ctl_formula NEWLINE
| NEWLINE INDENT ctl_formula NEWLINE+ DEDENT )
ctl_parameters: (ctl_param ',')* ctl_param
ctl_param: NAME ( '=' '@' STRING+ | ':' NAME )
ctl_formula: ctl_or_formula [ ctl_connector ctl_or_formula ]
ctl_connector: ( '=' '>' | '<=' '>' )
ctl_or_formula: ctl_and_formula ('or' ctl_and_formula)*
ctl_and_formula: ctl_not_formula ('and' ctl_not_formula)*
ctl_not_formula: ('not' ctl_not_formula | ctl_binary_formula)
ctl_binary_formula: ctl_unary_formula [ ctl_binary_op ctl_unary_formula ]
ctl_unary_formula: [ ctl_unary_op ] (ctl_atom_formula | '(' ctl_formula ')')
ctl_unary_op: ('A' | 'G' | 'F' | 'E' | 'X')
ctl_binary_op: ('R' | 'U' | 'W')
ctl_atom_formula: ( 'empty' '(' ctl_place ')'
| 'marked' '(' ctl_place ')'
| 'has' ['not'] '(' ctl_place ',' test (',' test)* ')'
| 'deadlock' | 'True' | 'False'
| NAME '(' ctl_arguments ')'
| 'forall' [ 'distinct' ] NAME (',' NAME)*
'in' ctl_place '(' ctl_atom_formula ')'
| 'exists' [ 'distinct' ] NAME (',' NAME)*
'in' ctl_place '(' ctl_atom_formula ')' )
ctl_arguments: (NAME '=' ctl_place_or_test ',')* NAME '=' ctl_place_or_test
ctl_place: '@' STRING+ | NAME
ctl_place_or_test: test | '@' STRING+
#
# the rest is from SNAKES/Python grammar
#
decorator: '@' dotted_name [ '(' [arglist] ')' ] NEWLINE
decorators: decorator+
decorated: decorators (classdef | funcdef)
funcdef: 'def' NAME parameters ['-' '>' test] ':' suite
parameters: '(' [typedargslist] ')'
typedargslist: ((tfpdef ['=' test] ',')*
('*' [tfpdef] (',' tfpdef ['=' test])* [',' '**' tfpdef]
| '**' tfpdef)
| tfpdef ['=' test] (',' tfpdef ['=' test])* [','])
tfpdef: NAME [':' test]
varargslist: ((vfpdef ['=' test] ',')*
('*' [vfpdef] (',' vfpdef ['=' test])* [',' '**' vfpdef]
| '**' vfpdef)
| vfpdef ['=' test] (',' vfpdef ['=' test])* [','])
vfpdef: NAME
stmt: simple_stmt | compound_stmt
simple_stmt: small_stmt (';' small_stmt)* [';'] NEWLINE
small_stmt: (expr_stmt | del_stmt | pass_stmt | flow_stmt |
import_stmt | global_stmt | nonlocal_stmt | assert_stmt)
expr_stmt: testlist (augassign (yield_expr|testlist) |
('=' (yield_expr|testlist))*)
augassign: ('+=' | '-=' | '*=' | '/=' | '%=' | '&=' | '|=' | '^=' |
'<<=' | '>>=' | '**=' | '//=')
del_stmt: 'del' exprlist
pass_stmt: 'pass'
flow_stmt: break_stmt | continue_stmt | return_stmt | raise_stmt | yield_stmt
break_stmt: 'break'
continue_stmt: 'continue'
return_stmt: 'return' [testlist]
yield_stmt: yield_expr
raise_stmt: 'raise' [test ['from' test]]
import_stmt: import_name | import_from
import_name: 'import' dotted_as_names
import_from: ('from' (('.' | '...')* dotted_name | ('.' | '...')+)
'import' ('*' | '(' import_as_names ')' | import_as_names))
import_as_name: NAME ['as' NAME]
dotted_as_name: dotted_name ['as' NAME]
import_as_names: import_as_name (',' import_as_name)* [',']
dotted_as_names: dotted_as_name (',' dotted_as_name)*
dotted_name: NAME ('.' NAME)*
global_stmt: 'global' NAME (',' NAME)*
nonlocal_stmt: 'nonlocal' NAME (',' NAME)*
assert_stmt: 'assert' test [',' test]
compound_stmt: (if_stmt | while_stmt | for_stmt | try_stmt | with_stmt
| funcdef | classdef | decorated)
if_stmt: 'if' test ':' suite ('elif' test ':' suite)* ['else' ':' suite]
while_stmt: 'while' test ':' suite ['else' ':' suite]
for_stmt: 'for' exprlist 'in' testlist ':' suite ['else' ':' suite]
try_stmt: ('try' ':' suite
((except_clause ':' suite)+
['else' ':' suite]
['finally' ':' suite] |
'finally' ':' suite))
with_stmt: 'with' test [ with_var ] ':' suite
with_var: 'as' expr
except_clause: 'except' [test ['as' NAME]]
suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT
test: or_test ['if' or_test 'else' test] | lambdef
test_nocond: or_test | lambdef_nocond
lambdef: 'lambda' [varargslist] ':' test
lambdef_nocond: 'lambda' [varargslist] ':' test_nocond
or_test: and_test ('or' and_test)*
and_test: not_test ('and' not_test)*
not_test: 'not' not_test | comparison
comparison: star_expr (comp_op star_expr)*
comp_op: '<'|'>'|'=='|'>='|'<='|'!='|'<>'|'in'|'not' 'in'|'is'|'is' 'not'
star_expr: ['*'] expr
expr: xor_expr ('|' xor_expr)*
xor_expr: and_expr ('^' and_expr)*
and_expr: shift_expr ('&' shift_expr)*
shift_expr: arith_expr (('<<'|'>>') arith_expr)*
arith_expr: term (('+'|'-') term)*
term: factor (('*'|'/'|'%'|'//') factor)*
factor: ('+'|'-'|'~') factor | power
power: atom trailer* ['**' factor]
atom: ('(' [yield_expr|testlist_comp] ')' |
'[' [testlist_comp] ']' |
'{' [dictorsetmaker] '}' |
NAME | NUMBER | STRING+ | '...' | 'None' | 'True' | 'False')
testlist_comp: test ( comp_for | (',' test)* [','] )
trailer: '(' [arglist] ')' | '[' subscriptlist ']' | '.' NAME
subscriptlist: subscript (',' subscript)* [',']
subscript: test | [test] ':' [test] [sliceop]
sliceop: ':' [test]
exprlist: star_expr (',' star_expr)* [',']
testlist: test (',' test)* [',']
dictorsetmaker: ( (test ':' test (comp_for | (',' test ':' test)* [','])) |
(test (comp_for | (',' test)* [','])) )
classdef: 'class' NAME ['(' [arglist] ')'] ':' suite
arglist: (argument ',')* (argument [',']
|'*' test (',' argument)* [',' '**' test]
|'**' test)
argument: test [comp_for] | test '=' test
comp_iter: comp_for | comp_if
comp_for: 'for' exprlist 'in' or_test [comp_iter]
comp_if: 'if' test_nocond [comp_iter]
yield_expr: 'yield' [testlist]
This diff is collapsed. Click to expand it.
This diff could not be displayed because it is too large.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff could not be displayed because it is too large.
module Python version "$Revision: SNAKES $"
{
mod = Module(stmt* body)
| Interactive(stmt* body)
| Expression(expr body)
| Suite(stmt* body)
stmt = FunctionDef(identifier name, arguments args,
stmt* body, expr* decorator_list, expr? returns)
| ClassDef(identifier name,
expr* bases,
keyword* keywords,
expr? starargs,
expr? kwargs,
stmt* body,
expr *decorator_list)
| Return(expr? value)
| Delete(expr* targets)
| Assign(expr* targets, expr value)
| AugAssign(expr target, operator op, expr value)
| For(expr target, expr iter, stmt* body, stmt* orelse)
| While(expr test, stmt* body, stmt* orelse)
| If(expr test, stmt* body, stmt* orelse)
| With(expr context_expr, expr? optional_vars, stmt* body)
| Raise(expr? exc, expr? cause)
| TryExcept(stmt* body, excepthandler* handlers, stmt* orelse)
| TryFinally(stmt* body, stmt* finalbody)
| Assert(expr test, expr? msg)
| Import(alias* names)
| ImportFrom(identifier module, alias* names, int? level)
| Exec(expr body, expr? globals, expr? locals)
| Global(identifier* names)
| Nonlocal(identifier* names)
| Expr(expr value)
| Pass | Break | Continue
attributes (int lineno, int col_offset)
expr = BoolOp(boolop op, expr* values)
| BinOp(expr left, operator op, expr right)
| UnaryOp(unaryop op, expr operand)
| Lambda(arguments args, expr body)
| IfExp(expr test, expr body, expr orelse)
| Dict(expr* keys, expr* values)
| Set(expr* elts)
| ListComp(expr elt, comprehension* generators)
| SetComp(expr elt, comprehension* generators)
| DictComp(expr key, expr value, comprehension* generators)
| GeneratorExp(expr elt, comprehension* generators)
| Yield(expr? value)
| Compare(expr left, cmpop* ops, expr* comparators)
| Call(expr func, expr* args, keyword* keywords,
expr? starargs, expr? kwargs)
| Num(object n)
| Str(string s)
| Ellipsis
| Attribute(expr value, identifier attr, expr_context ctx)
| Subscript(expr value, slice slice, expr_context ctx)
| Starred(expr value, expr_context ctx)
| Name(identifier id, expr_context ctx)
| List(expr* elts, expr_context ctx)
| Tuple(expr* elts, expr_context ctx)
attributes (int lineno, int col_offset)
expr_context = Load | Store | Del | AugLoad | AugStore | Param
slice = Slice(expr? lower, expr? upper, expr? step)
| ExtSlice(slice* dims)
| Index(expr value)
boolop = And | Or
operator = Add | Sub | Mult | Div | Mod | Pow | LShift
| RShift | BitOr | BitXor | BitAnd | FloorDiv
unaryop = Invert | Not | UAdd | USub
cmpop = Eq | NotEq | Lt | LtE | Gt | GtE | Is | IsNot | In | NotIn
comprehension = (expr target, expr iter, expr* ifs)
excepthandler = ExceptHandler(expr? type, identifier? name, stmt* body)
attributes (int lineno, int col_offset)
arguments = (arg* args, identifier? vararg, expr? varargannotation,
arg* kwonlyargs, identifier? kwarg,
expr? kwargannotation, expr* defaults,
expr* kw_defaults)
arg = (identifier arg, expr? annotation)
keyword = (identifier arg, expr value)
alias = (identifier name, identifier? asname)
}
# Grammar for Python in SNAKES
# This is a mixture of the grammars from various Python versions
$ELLIPSIS '...'
file_input: (NEWLINE | stmt)* ENDMARKER
decorator: '@' dotted_name [ '(' [arglist] ')' ] NEWLINE
decorators: decorator+
decorated: decorators (classdef | funcdef)
funcdef: 'def' NAME parameters ['-' '>' test] ':' suite
parameters: '(' [typedargslist] ')'
typedargslist: ((tfpdef ['=' test] ',')*
('*' [tfpdef] (',' tfpdef ['=' test])* [',' '**' tfpdef]
| '**' tfpdef)
| tfpdef ['=' test] (',' tfpdef ['=' test])* [','])
tfpdef: NAME [':' test]
varargslist: ((vfpdef ['=' test] ',')*
('*' [vfpdef] (',' vfpdef ['=' test])* [',' '**' vfpdef]
| '**' vfpdef)
| vfpdef ['=' test] (',' vfpdef ['=' test])* [','])
vfpdef: NAME
stmt: simple_stmt | compound_stmt
simple_stmt: small_stmt (';' small_stmt)* [';'] NEWLINE
small_stmt: (expr_stmt | del_stmt | pass_stmt | flow_stmt |
import_stmt | global_stmt | nonlocal_stmt | assert_stmt)
expr_stmt: testlist (augassign (yield_expr|testlist) |
('=' (yield_expr|testlist))*)
augassign: ('+=' | '-=' | '*=' | '/=' | '%=' | '&=' | '|=' | '^=' |
'<<=' | '>>=' | '**=' | '//=')
del_stmt: 'del' exprlist
pass_stmt: 'pass'
flow_stmt: break_stmt | continue_stmt | return_stmt | raise_stmt | yield_stmt
break_stmt: 'break'
continue_stmt: 'continue'
return_stmt: 'return' [testlist]
yield_stmt: yield_expr
raise_stmt: 'raise' [test ['from' test]]
import_stmt: import_name | import_from
import_name: 'import' dotted_as_names
import_from: ('from' (('.' | '...')* dotted_name | ('.' | '...')+)
'import' ('*' | '(' import_as_names ')' | import_as_names))
import_as_name: NAME ['as' NAME]
dotted_as_name: dotted_name ['as' NAME]
import_as_names: import_as_name (',' import_as_name)* [',']
dotted_as_names: dotted_as_name (',' dotted_as_name)*
dotted_name: NAME ('.' NAME)*
global_stmt: 'global' NAME (',' NAME)*
nonlocal_stmt: 'nonlocal' NAME (',' NAME)*
assert_stmt: 'assert' test [',' test]
compound_stmt: (if_stmt | while_stmt | for_stmt | try_stmt | with_stmt
| funcdef | classdef | decorated)
if_stmt: 'if' test ':' suite ('elif' test ':' suite)* ['else' ':' suite]
while_stmt: 'while' test ':' suite ['else' ':' suite]
for_stmt: 'for' exprlist 'in' testlist ':' suite ['else' ':' suite]
try_stmt: ('try' ':' suite
((except_clause ':' suite)+
['else' ':' suite]
['finally' ':' suite] |
'finally' ':' suite))
with_stmt: 'with' test [ with_var ] ':' suite
with_var: 'as' expr
except_clause: 'except' [test ['as' NAME]]
suite: simple_stmt | NEWLINE INDENT stmt+ DEDENT
test: or_test ['if' or_test 'else' test] | lambdef
test_nocond: or_test | lambdef_nocond
lambdef: 'lambda' [varargslist] ':' test
lambdef_nocond: 'lambda' [varargslist] ':' test_nocond
or_test: and_test ('or' and_test)*
and_test: not_test ('and' not_test)*
not_test: 'not' not_test | comparison
comparison: star_expr (comp_op star_expr)*
comp_op: '<'|'>'|'=='|'>='|'<='|'!='|'<>'|'in'|'not' 'in'|'is'|'is' 'not'
star_expr: ['*'] expr
expr: xor_expr ('|' xor_expr)*
xor_expr: and_expr ('^' and_expr)*
and_expr: shift_expr ('&' shift_expr)*
shift_expr: arith_expr (('<<'|'>>') arith_expr)*
arith_expr: term (('+'|'-') term)*
term: factor (('*'|'/'|'%'|'//') factor)*
factor: ('+'|'-'|'~') factor | power
power: atom trailer* ['**' factor]
atom: ('(' [yield_expr|testlist_comp] ')' |
'[' [testlist_comp] ']' |
'{' [dictorsetmaker] '}' |
NAME | NUMBER | STRING+ | '...' | 'None' | 'True' | 'False')
testlist_comp: test ( comp_for | (',' test)* [','] )
trailer: '(' [arglist] ')' | '[' subscriptlist ']' | '.' NAME
subscriptlist: subscript (',' subscript)* [',']
subscript: test | [test] ':' [test] [sliceop]
sliceop: ':' [test]
exprlist: star_expr (',' star_expr)* [',']
testlist: test (',' test)* [',']
dictorsetmaker: ( (test ':' test (comp_for | (',' test ':' test)* [','])) |
(test (comp_for | (',' test)* [','])) )
classdef: 'class' NAME ['(' [arglist] ')'] ':' suite
arglist: (argument ',')* (argument [',']
|'*' test (',' argument)* [',' '**' test]
|'**' test)
argument: test [comp_for] | test '=' test
comp_iter: comp_for | comp_if
comp_for: 'for' exprlist 'in' or_test [comp_iter]
comp_if: 'if' test_nocond [comp_iter]
yield_expr: 'yield' [testlist]
This diff is collapsed. Click to expand it.
This diff could not be displayed because it is too large.
"""A plugins system.
The first example shows how to load a plugin: we load
C{snakes.plugins.hello} and plug it into C{snakes.nets}, which results
in a new module that actually C{snakes.nets} extended by
C{snakes.plugins.hello}.
>>> import snakes.plugins as plugins
>>> hello_nets = plugins.load('hello', 'snakes.nets')
>>> n = hello_nets.PetriNet('N')
>>> n.hello()
Hello from N
>>> n = hello_nets.PetriNet('N', hello='Hi, this is %s!')
>>> n.hello()
Hi, this is N!
The next example shows how to simulate the effect of C{import module}:
we give to C{load} a thrid argument that is the name of the created
module, from which it becomes possible to import names or C{*}.
B{Warning:} this feature will not work C{load} is not called from the
module where we then do the C{from ... import ...}. This is exactly
the same when, from a module C{foo} that you load a module C{bar}: if
C{bar} loads other modules they will not be imported in C{foo}.
>>> plugins.load('hello', 'snakes.nets', 'another_version')
<module ...>
>>> from another_version import PetriNet
>>> n = PetriNet('another net')
>>> n.hello()
Hello from another net
>>> n = PetriNet('yet another net', hello='Hi, this is %s!')
>>> n.hello()
Hi, this is yet another net!
How to define a plugin is explained in the example C{hello}.
"""
import imp, sys, inspect
from functools import wraps
def update (module, objects) :
"""Update a module content
"""
for obj in objects :
if isinstance(obj, tuple) :
try :
n, o = obj
except :
raise ValueError("expected (name, object) and got '%r'" % obj)
setattr(module, n, o)
elif inspect.isclass(obj) or inspect.isfunction(obj) :
setattr(module, obj.__name__, obj)
else :
raise ValueError("cannot plug '%r'" % obj)
def build (name, module, *objects) :
"""Builds an extended module.
The parameter C{module} is exactly that taken by the function
C{extend} of a plugin. This list argument C{objects} holds all the
objects, constructed in C{extend}, that are extensions of objects
from C{module}. The resulting value should be returned by
C{extend}.
@param name: the name of the constructed module
@type name: C{str}
@param module: the extended module
@type module: C{module}
@param objects: the sub-objects
@type objects: each is a class object
@return: the new module
@rtype: C{module}
"""
result = imp.new_module(name)
result.__dict__.update(module.__dict__)
update(result, objects)
result.__plugins__ = (module.__dict__.get("__plugins__",
(module.__name__,))
+ (name,))
for obj in objects :
if inspect.isclass(obj) :
obj.__plugins__ = result.__plugins__
return result
def load (plugins, base, name=None) :
"""Load plugins.
C{plugins} can be a single plugin name or module or a list of such
values. If C{name} is not C{None}, the extended module is loaded
ad C{name} in C{sys.modules} as well as in the global environment
from which C{load} was called.
@param plugins: the module that implements the plugin, or its name,
or a collection of such values
@type plugins: C{str} or C{module}, or a C{list}/C{tuple}/... of
such values
@param base: the module being extended or its name
@type base: C{str} or C{module}
@param name: the name of the created module
@type name: C{str}
@return: the extended module
@rtype: C{module}
"""
if type(base) is str :
result = __import__(base, fromlist=["__name__"])
else :
result = base
if isinstance(plugins, str) :
plugins = [plugins]
else :
try :
plugins = list(plugins)
except TypeError :
plugins = [plugins]
for i, p in enumerate(plugins) :
if isinstance(p, str) and not p.startswith("snakes.plugins.") :
plugins[i] = "snakes.plugins." + p
for plug in plugins :
if type(plug) is str :
plug = __import__(plug, fromlist=["__name__"])
result = plug.extend(result)
if name is not None :
result.__name__ = name
sys.modules[name] = result
inspect.stack()[1][0].f_globals[name] = result
return result
def plugin (base, depends=[], conflicts=[]) :
"""Decorator for extension functions
@param base: name of base module (usually 'snakes.nets')
@type base: str
@param depends: list of plugins on which this one depends
@type depends: list of str
@param conflicts: list of plugins with which this one conflicts
@type conflicts: list of str
@return: the appropriate decorator
@rtype: function
"""
def wrapper (fun) :
@wraps(fun)
def extend (module) :
try :
loaded = set(module.__plugins__)
except AttributeError :
loaded = set()
for name in depends :
if name not in loaded :
module = load(name, module)
loaded.update(module.__plugins__)
conf = set(conflicts) & loaded
if len(conf) > 0 :
raise ValueError("plugin conflict (%s)" % ", ".join(conf))
objects = fun(module)
if type(objects) is not tuple :
objects = (objects,)
return build(fun.__module__, module, *objects)
module = sys.modules[fun.__module__]
module.__test__ = {"extend" : extend}
objects = fun(__import__(base, fromlist=["__name__"]))
if type(objects) is not tuple :
objects = (objects,)
update(module, objects)
return extend
return wrapper
def new_instance (cls, obj) :
"""Create a copy of C{obj} which is an instance of C{cls}
"""
result = object.__new__(cls)
result.__dict__.update(obj.__dict__)
return result
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
"""An example plugin that allows instances class C{PetriNet} to say hello.
A new method C{hello} is added. The constructor is added a keyword
argument C{hello} that must be the C{str} to print when calling
C{hello}, with one C{%s} that will be replaced by the name of the net
when C{hello} is called.
Defining a plugins need writing a module with a single function called
C{extend} that takes a single argument that is the module to be
extended.
Inside the function, extensions of the classes in the module are
defined as normal sub-classes.
The function C{extend} should return the extended module created by
C{snakes.plugins.build} that takes as arguments: the name of the
extended module, the module taken as argument and the sub-classes
defined (expected as a list argument C{*args} in no special order).
If the plugin depends on other plugins, for instance C{foo} and
C{bar}, the function C{extend} should be decorated by
C{@depends('foo', 'bar')}.
Read the source code of this module to have an example
"""
import snakes.plugins
@snakes.plugins.plugin("snakes.nets")
def extend (module) :
"""Extends C{module}
"""
class PetriNet (module.PetriNet) :
"""Extension of the class C{PetriNet} in C{module}
"""
def __init__ (self, name, **args) :
"""When extending an existing method, take care that you
may be working on an already extended class, so you so not
know how its arguments have been changed. So, always use
those from the unextended class plus C{**args}, remove
from it what your plugin needs and pass it to the method
of the extended class if you need to call it.
>>> PetriNet('N').hello()
Hello from N
>>> PetriNet('N', hello='Hi! This is %s...').hello()
Hi! This is N...
@param args: plugin options
@keyword hello: the message to print, with C{%s} where the
net name should appear.
@type hello: C{str}
"""
self._hello = args.pop("hello", "Hello from %s")
module.PetriNet.__init__(self, name, **args)
def hello (self) :
"""A new method C{hello}
>>> n = PetriNet('N')
>>> n.hello()
Hello from N
"""
print(self._hello % self.name)
return PetriNet
"""A plugin to add labels to nodes and nets.
"""
from snakes.plugins import plugin, new_instance
from snakes.pnml import Tree
@plugin("snakes.nets")
def extend (module) :
class Transition (module.Transition) :
def label (self, *get, **set) :
if not hasattr(self, "_labels") :
self._labels = {}
result = tuple(self._labels[g] for g in get)
self._labels.update(set)
if len(get) == 1 :
return result[0]
elif len(get) > 1 :
return result
elif len(set) == 0 :
return self._labels.copy()
def has_label (self, name, *names) :
if len(names) == 0 :
return name in self._labels
else :
return tuple(n in self._labels for n in (name,) + names)
def copy (self, name=None, **options) :
if not hasattr(self, "_labels") :
self._labels = {}
result = module.Transition.copy(self, name, **options)
result._labels = self._labels.copy()
return result
def __pnmldump__ (self) :
"""
>>> t = Transition('t')
>>> t.label(foo='bar', spam=42)
>>> t.__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<transition id="t">
<label name="foo">
<object type="str">
bar
</object>
</label>
<label name="spam">
<object type="int">
42
</object>
</label>
</transition>
</pnml>
"""
t = module.Transition.__pnmldump__(self)
if hasattr(self, "_labels") :
for key, val in self._labels.items() :
t.add_child(Tree("label", None,
Tree.from_obj(val),
name=key))
return t
@classmethod
def __pnmlload__ (cls, tree) :
"""
>>> old = Transition('t')
>>> old.label(foo='bar', spam=42)
>>> p = old.__pnmldump__()
>>> new = Transition.__pnmlload__(p)
>>> new
Transition('t', Expression('True'))
>>> new.__class__
<class 'snakes.plugins.labels.Transition'>
>>> new.label('foo', 'spam')
('bar', 42)
"""
t = new_instance(cls, module.Transition.__pnmlload__(tree))
t._labels = dict((lbl["name"], lbl.child().to_obj())
for lbl in tree.get_children("label"))
return t
class Place (module.Place) :
def label (self, *get, **set) :
if not hasattr(self, "_labels") :
self._labels = {}
result = tuple(self._labels[g] for g in get)
self._labels.update(set)
if len(get) == 1 :
return result[0]
elif len(get) > 1 :
return result
elif len(set) == 0 :
return self._labels.copy()
def has_label (self, name, *names) :
if len(names) == 0 :
return name in self._labels
else :
return tuple(n in self._labels for n in (name,) + names)
def copy (self, name=None, **options) :
if not hasattr(self, "_labels") :
self._labels = {}
result = module.Place.copy(self, name, **options)
result._labels = self._labels.copy()
return result
def __pnmldump__ (self) :
"""
>>> p = Place('p')
>>> p.label(foo='bar', spam=42)
>>> p.__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<place id="p">
<type domain="universal"/>
<initialMarking>
<multiset/>
</initialMarking>
<label name="foo">
<object type="str">
bar
</object>
</label>
<label name="spam">
<object type="int">
42
</object>
</label>
</place>
</pnml>
"""
t = module.Place.__pnmldump__(self)
if hasattr(self, "_labels") :
for key, val in self._labels.items() :
t.add_child(Tree("label", None,
Tree.from_obj(val),
name=key))
return t
@classmethod
def __pnmlload__ (cls, tree) :
"""
>>> old = Place('p')
>>> old.label(foo='bar', spam=42)
>>> p = old.__pnmldump__()
>>> new = Place.__pnmlload__(p)
>>> new
Place('p', MultiSet([]), tAll)
>>> new.__class__
<class 'snakes.plugins.labels.Place'>
>>> new.label('foo', 'spam')
('bar', 42)
"""
p = new_instance(cls, module.Place.__pnmlload__(tree))
p._labels = dict((lbl["name"], lbl.child().to_obj())
for lbl in tree.get_children("label"))
return p
class PetriNet (module.PetriNet) :
def label (self, *get, **set) :
if not hasattr(self, "_labels") :
self._labels = {}
result = tuple(self._labels[g] for g in get)
self._labels.update(set)
if len(get) == 1 :
return result[0]
elif len(get) > 1 :
return result
elif len(set) == 0 :
return self._labels.copy()
def has_label (self, name, *names) :
if len(names) == 0 :
return name in self._labels
else :
return tuple(n in self._labels for n in (name,) + names)
def copy (self, name=None, **options) :
if not hasattr(self, "_labels") :
self._labels = {}
result = module.PetriNet.copy(self, name, **options)
result._labels = self._labels.copy()
return result
def __pnmldump__ (self) :
"""
>>> n = PetriNet('n')
>>> n.label(foo='bar', spam=42)
>>> n.__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<net id="n">
<label name="foo">
<object type="str">
bar
</object>
</label>
<label name="spam">
<object type="int">
42
</object>
</label>
</net>
</pnml>
"""
t = module.PetriNet.__pnmldump__(self)
if hasattr(self, "_labels") :
for key, val in self._labels.items() :
t.add_child(Tree("label", None,
Tree.from_obj(val),
name=key))
return t
@classmethod
def __pnmlload__ (cls, tree) :
"""
>>> old = PetriNet('n')
>>> old.label(foo='bar', spam=42)
>>> p = old.__pnmldump__()
>>> new = PetriNet.__pnmlload__(p)
>>> new
PetriNet('n')
>>> new.__class__
<class 'snakes.plugins.labels.PetriNet'>
>>> new.label('foo', 'spam')
('bar', 42)
"""
n = new_instance(cls, module.PetriNet.__pnmlload__(tree))
n._labels = dict((lbl["name"], lbl.child().to_obj())
for lbl in tree.get_children("label"))
return n
def merge_places (self, target, sources, **options) :
module.PetriNet.merge_places(self, target, sources, **options)
new = self.place(target)
for place in sources :
new.label(**dict(self.place(place).label()))
def merge_transitions (self, target, sources, **options) :
module.PetriNet.merge_transitions(self, target, sources, **options)
new = self.transition(target)
for trans in sources :
new.label(**dict(self.transition(trans).label()))
return Transition, Place, PetriNet
"""A plugin to compose nets.
The compositions are based on place status and automatically merge
some nodes (buffers and variables, tick transitions).
>>> import snakes.plugins
>>> snakes.plugins.load('ops', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> from snakes.plugins.status import entry, internal, exit, buffer
>>> basic = PetriNet('basic')
>>> basic.add_place(Place('e', status=entry))
>>> basic.add_place(Place('x', status=exit))
>>> basic.add_transition(Transition('t'))
>>> basic.add_input('e', 't', Value(1))
>>> basic.add_output('x', 't', Value(2))
>>> basic.add_place(Place('b', [1], status=buffer('buf')))
>>> n = basic.copy()
>>> n.hide(entry)
>>> n.node('e').status
Status(None)
>>> n.hide(buffer('buf'), buffer(None))
>>> n.node('b').status
Buffer('buffer')
>>> n = basic / 'buf'
>>> n.node('[b/buf]').status
Buffer('buffer')
>>> n = basic & basic
>>> n.status(internal)
('[x&e]',)
>>> n.place('[x&e]').pre
{'[t&]': Value(2)}
>>> n.place('[x&e]').post
{'[&t]': Value(1)}
>>> n.status(buffer('buf'))
('[b&b]',)
>>> n = basic + basic
>>> n.status(entry)
('[e+e]',)
>>> list(sorted(n.place('[e+e]').post.items()))
[('[+t]', Value(1)), ('[t+]', Value(1))]
>>> n.status(exit)
('[x+x]',)
>>> list(sorted(n.place('[x+x]').pre.items()))
[('[+t]', Value(2)), ('[t+]', Value(2))]
>>> n = basic * basic
>>> n.status(entry)
('[e,x*e]',)
>>> n.place('[e,x*e]').post
{'[t*]': Value(1), '[*t]': Value(1)}
>>> n.place('[e,x*e]').pre
{'[t*]': Value(2)}
>>> n1 = basic.copy()
>>> n1.declare('global x; x=1')
>>> n2 = basic.copy()
>>> n2.globals['y'] = 2
>>> n = n1 + n2
>>> n.globals['x'], n.globals['y']
(1, 2)
>>> n._declare
['global x; x=1']
"""
import snakes.plugins
from snakes.plugins.status import Status, entry, exit, internal
from snakes.data import cross
from snakes.plugins.clusters import Cluster
def _glue (op, one, two) :
result = one.__class__("(%s%s%s)" % (one.name, op, two.name))
def new (name) :
return "[%s%s]" % (name, op)
for net in (one, two) :
result.clusters.add_child(Cluster())
result._declare = list(set(result._declare) | set(net._declare))
result.globals.update(net.globals)
for place in net.place() :
result.add_place(place.copy(new(place.name)),
cluster=[-1]+net.clusters.get_path(place.name))
for trans in net.transition() :
result.add_transition(trans.copy(new(trans.name)),
cluster=[-1]+net.clusters.get_path(trans.name))
for place, label in trans.input() :
result.add_input(new(place.name),
new(trans.name),
label.copy())
for place, label in trans.output() :
result.add_output(new(place.name),
new(trans.name),
label.copy())
def new (name) :
return "[%s%s]" % (op, name)
for status in result.status :
result.status.merge(status)
new = result.status(status)
if len(new) == 1 :
name = "[%s%s%s]" % (",".join(sorted(one.status(status))),
op,
",".join(sorted(two.status(status))))
if name != new[0] :
result.rename_node(new[0], name)
return result
@snakes.plugins.plugin("snakes.nets",
depends=["snakes.plugins.clusters",
"snakes.plugins.status"])
def extend (module) :
"Build the extended module"
class PetriNet (module.PetriNet) :
def __or__ (self, other) :
"Parallel"
return _glue("|", self, other)
def __and__ (self, other) :
"Sequence"
result = _glue("&", self, other)
remove = set()
for x, e in cross((self.status(exit), other.status(entry))) :
new = "[%s&%s]" % (x, e)
new_x, new_e = "[%s&]" % x, "[&%s]" % e
result.merge_places(new, (new_x, new_e), status=internal)
remove.update((new_x, new_e))
for p in remove :
result.remove_place(p)
return result
def __add__ (self, other) :
"Choice"
result = _glue("+", self, other)
for status in (entry, exit) :
remove = set()
for l, r in cross((self.status(status),
other.status(status))) :
new = "[%s+%s]" % (l, r)
new_l, new_r = "[%s+]" % l, "[+%s]" % r
result.merge_places(new, (new_l, new_r), status=status)
remove.update((new_l, new_r))
for p in remove :
result.remove_place(p)
return result
def __mul__ (self, other) :
"Iteration"
result = _glue("*", self, other)
remove = set()
for e1, x1, e2 in cross((self.status(entry),
self.status(exit),
other.status(entry))) :
new = "[%s,%s*%s]" % (e1, x1, e2)
new_e1, new_x1 = "[%s*]" % e1, "[%s*]" % x1
new_e2 = "[*%s]" % e2
result.merge_places(new, (new_e1, new_x1, new_e2),
status=entry)
remove.update((new_e1, new_x1, new_e2))
for p in remove :
result.remove_place(p)
return result
def hide (self, old, new=None) :
if new is None :
new = Status(None)
for node in self.status(old) :
self.set_status(node, new)
def __div__ (self, name) :
result = self.copy()
for node in result.node() :
result.rename_node(node.name, "[%s/%s]" % (node, name))
for status in result.status :
if status._value == name :
result.hide(status, status.__class__(status._name, None))
return result
def __truediv__ (self, other) :
return self.__div__(other)
return PetriNet
"""A plugin to add positions to the nodes.
- C{Place} and C{Transition} constructors are added an optional
argument C{pos=(x,y)} to set their position
- C{Place} and C{Transition} are added an attribute C{pos} that is
pair of numbers with attributes C{x} and C{y} and methods
C{shift(dx, dy)} and C{moveto(x, y)}
- Petri nets are added methods C{bbox()} that returns a pair of
extrema C{((xmin, ymin), (xmax, ymax))}, a method C{shift(dx, dy)}
that shift all the nodes, and a method C{transpose()} that rotates
the net in such a way that the top-down direction becomes
left-right
>>> import snakes.plugins
>>> snakes.plugins.load('pos', 'snakes.nets', 'nets')
<module ...>
>>> from nets import PetriNet, Place, Transition
>>> n = PetriNet('N')
>>> n.add_place(Place('p00'))
>>> n.add_transition(Transition('t10', pos=(1, 0)))
>>> n.add_place(Place('p11', pos=(1, 1)))
>>> n.add_transition(Transition('t01', pos=(0, 1)))
>>> n.node('t10').pos
Position(1, 0)
>>> n.node('t10').pos.x
1
>>> n.node('t10').pos.y
0
>>> n.node('t10').pos.y = 1
Traceback (most recent call last):
...
AttributeError: readonly attribute
>>> n.node('t10').pos()
(1, 0)
>>> n.bbox()
((0, 0), (1, 1))
>>> n.shift(1, 2)
>>> n.bbox()
((1, 2), (2, 3))
>>> n.node('t01').copy().pos
Position(1, 3)
>>> n.transpose()
>>> n.node('t01').pos
Position(-3, 1)
"""
from snakes import SnakesError
from snakes.compat import *
from snakes.plugins import plugin, new_instance
from snakes.pnml import Tree
class Position (object) :
"The position of a node"
def __init__ (self, x, y) :
self.__dict__["x"] = x
self.__dict__["y"] = y
def __str__ (self) :
return "(%s, %s)" % (str(self.x), str(self.y))
def __repr__ (self) :
return "Position(%s, %s)" % (str(self.x), str(self.y))
def __setattr__ (self, name, value) :
if name in ("x", "y") :
raise AttributeError("readonly attribute")
else :
self.__dict__[name] = value
def moveto (self, x, y) :
self.__init__(x, y)
def shift (self, dx, dy) :
self.__init__(self.x + dx, self.y + dy)
def __getitem__ (self, rank) :
if rank == 0 :
return self.x
elif rank == 1 :
return self.y
else :
raise IndexError("Position index out of range")
def __iter__ (self) :
yield self.x
yield self.y
def __call__ (self) :
return (self.x, self.y)
@plugin("snakes.nets")
def extend (module) :
class Place (module.Place) :
def __init__ (self, name, tokens=[], check=None, **args) :
x, y = args.pop("pos", (0, 0))
self.pos = Position(x, y)
module.Place.__init__(self, name, tokens, check, **args)
def copy (self, name=None, **args) :
x, y = args.pop("pos", self.pos())
result = module.Place.copy(self, name, **args)
result.pos.moveto(x, y)
return result
def __pnmldump__ (self) :
"""
>>> p = Place('p', pos=(1, 2))
>>> p.__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<place id="p">
<type domain="universal"/>
<initialMarking>
<multiset/>
</initialMarking>
<graphics>
<position x="1" y="2"/>
</graphics>
</place>
</pnml>
"""
t = module.Place.__pnmldump__(self)
try :
gfx = t.child("graphics")
except SnakesError :
gfx = Tree("graphics", None)
t.add_child(gfx)
gfx.add_child(Tree("position", None,
x=str(self.pos.x),
y=str(self.pos.y)))
return t
@classmethod
def __pnmlload__ (cls, tree) :
"""
>>> old = Place('p', pos=(1, 2))
>>> p = old.__pnmldump__()
>>> new = Place.__pnmlload__(p)
>>> new.pos
Position(1, 2)
>>> new
Place('p', MultiSet([]), tAll)
>>> new.__class__
<class 'snakes.plugins.pos.Place'>
"""
result = new_instance(cls, module.Place.__pnmlload__(tree))
try :
p = tree.child("graphics").child("position")
x, y = eval(p["x"]), eval(p["y"])
result.pos = Position(x, y)
except SnakesError :
result.pos = Position(0, 0)
return result
class Transition (module.Transition) :
def __init__ (self, name, guard=None, **args) :
x, y = args.pop("pos", (0, 0))
self.pos = Position(x, y)
module.Transition.__init__(self, name, guard, **args)
def copy (self, name=None, **args) :
x, y = args.pop("pos", self.pos())
result = module.Transition.copy(self, name, **args)
result.pos.moveto(x, y)
return result
def __pnmldump__ (self) :
"""
>>> t = Transition('t', pos=(2, 1))
>>> t.__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<transition id="t">
<graphics>
<position x="2" y="1"/>
</graphics>
</transition>
</pnml>
"""
t = module.Transition.__pnmldump__(self)
t.add_child(Tree("graphics", None,
Tree("position", None,
x=str(self.pos.x),
y=str(self.pos.y))))
return t
@classmethod
def __pnmlload__ (cls, tree) :
"""
>>> old = Transition('t', pos=(2, 1))
>>> p = old.__pnmldump__()
>>> new = Transition.__pnmlload__(p)
>>> new.pos
Position(2, 1)
>>> new
Transition('t', Expression('True'))
>>> new.__class__
<class 'snakes.plugins.pos.Transition'>
"""
result = new_instance(cls, module.Transition.__pnmlload__(tree))
try :
p = tree.child("graphics").child("position")
x, y = eval(p["x"]), eval(p["y"])
result.pos = Position(x, y)
except SnakesError :
result.pos = Position(0, 0)
return result
class PetriNet (module.PetriNet) :
def add_place (self, place, **args) :
if "pos" in args :
x, y = args.pop("pos")
place.pos.moveto(x, y)
module.PetriNet.add_place(self, place, **args)
def add_transition (self, trans, **args) :
if "pos" in args :
x, y = args.pop("pos")
trans.pos.moveto(x, y)
module.PetriNet.add_transition(self, trans, **args)
def merge_places (self, target, sources, **args) :
pos = args.pop("pos", None)
module.PetriNet.merge_places(self, target, sources, **args)
if pos is None :
pos = reduce(complex.__add__,
(complex(*self._place[name].pos())
for name in sources)) / len(sources)
x, y = pos.real, pos.imag
else :
x, y = pos
self._place[target].pos.moveto(x, y)
def merge_transitions (self, target, sources, **args) :
pos = args.pop("pos", None)
module.PetriNet.merge_transitions(self, target, sources, **args)
if pos is None :
pos = reduce(complex.__add__,
(complex(*self._trans[name].pos())
for name in sources)) / len(sources)
x, y = pos.real, pos.imag
else :
x, y = pos
self._trans[target].pos.moveto(x, y)
def bbox (self) :
if len(self._node) == 0 :
return (0, 0), (0, 0)
else :
nodes = iter(self._node.values())
xmin, ymin = next(nodes).pos()
xmax, ymax = xmin, ymin
for n in nodes :
x, y = n.pos()
xmin = min(xmin, x)
xmax = max(xmax, x)
ymin = min(ymin, y)
ymax = max(ymax, y)
return (xmin, ymin), (xmax, ymax)
def shift (self, dx, dy) :
for node in self.node() :
node.pos.shift(dx, dy)
def transpose (self) :
for node in self.node() :
x, y = node.pos()
node.pos.moveto(-y, x)
return Place, Transition, PetriNet, Position
from snakes.plugins import plugin
from snakes.pnml import Tree, loads, dumps
import imp, sys, socket, traceback, operator
class QueryError (Exception) :
pass
class Query (object) :
def __init__ (self, name, *larg, **karg) :
self._name = name
self._larg = tuple(larg)
self._karg = dict(karg)
__pnmltag__ = "query"
def __pnmldump__ (self) :
"""
>>> Query('set', 'x', 42).__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<query name="set">
<argument>
<object type="str">
x
</object>
</argument>
<argument>
<object type="int">
42
</object>
</argument>
</query>
</pnml>
>>> Query('test', x=1).__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<query name="test">
<keyword name="x">
<object type="int">
1
</object>
</keyword>
</query>
</pnml>
>>> Query('test', 'x', 42, y=1).__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<query name="test">
<argument>
<object type="str">
x
</object>
</argument>
<argument>
<object type="int">
42
</object>
</argument>
<keyword name="y">
<object type="int">
1
</object>
</keyword>
</query>
</pnml>
>>> Query('set', 'x', Query('call', 'x.upper')).__pnmldump__()
<?xml version="1.0" encoding="utf-8"?>
<pnml>
<query name="set">
<argument>
<object type="str">
x
</object>
</argument>
<argument>
<query name="call">
<argument>
<object type="str">
x.upper
</object>
</argument>
</query>
</argument>
</query>
</pnml>
"""
children = []
for arg in self._larg :
children.append(Tree("argument", None,
Tree.from_obj(arg)))
for name, value in self._karg.items() :
children.append(Tree("keyword", None,
Tree.from_obj(value),
name=name))
return Tree(self.__pnmltag__, None, *children, **{"name": self._name})
@classmethod
def __pnmlload__ (cls, tree) :
"""
>>> Tree._tag2obj = {'query': Query}
>>> t = Query('test', 'x', 42, y=1).__pnmldump__()
>>> q = Query.__pnmlload__(t)
>>> q._name
'test'
>>> q._larg
('x', 42)
>>> q._karg
{'y': 1}
"""
larg = (child.child().to_obj()
for child in tree.get_children("argument"))
karg = dict((child["name"], child.child().to_obj())
for child in tree.get_children("keyword"))
return cls(tree["name"], *larg, **karg)
def run (self, envt) :
"""
>>> import imp
>>> env = imp.new_module('environment')
>>> Query('set', 'x', 'hello').run(env)
>>> env.x
'hello'
>>> Query('set', 'x', Query('call', 'x.upper')).run(env)
>>> env.x
'HELLO'
>>> Query('test', 1, 2, 3).run(env)
Traceback (most recent call last):
...
QueryError: unknown query 'test'
"""
try :
handler = getattr(self, "_run_%s" % self._name)
except AttributeError :
raise QueryError("unknown query %r" % self._name)
self._envt = envt
larg = tuple(a.run(envt) if isinstance(a, self.__class__) else a
for a in self._larg)
karg = dict((n, v.run(envt) if isinstance(v, self.__class__) else v)
for n, v in self._karg.items())
try :
return handler(*larg, **karg)
except TypeError :
cls, val, tb = sys.exc_info()
try :
fun, msg = str(val).strip().split("()", 1)
except :
raise val
if fun.startswith("_run_") and hasattr(self, fun) :
raise TypeError(fun[5:] + "()" + msg)
raise val
def _get_object (self, path) :
obj = self._envt
for n in path :
obj = getattr(obj, n)
return obj
def _run_set (self, name, value) :
"""
>>> import imp
>>> env = imp.new_module('environment')
>>> Query('set', 'x', 1).run(env)
>>> env.x
1
"""
path = name.split(".")
setattr(self._get_object(path[:-1]), path[-1], value)
def _run_get (self, name) :
"""
>>> import imp
>>> env = imp.new_module('environment')
>>> env.x = 2
>>> Query('get', 'x').run(env)
2
"""
path = name.split(".")
return self._get_object(path)
def _run_del (self, name) :
"""
>>> import imp
>>> env = imp.new_module('environment')
>>> env.x = 2
>>> Query('del', 'x').run(env)
>>> env.x
Traceback (most recent call last):
...
AttributeError: 'module' object has no attribute 'x'
"""
path = name.split(".")
delattr(self._get_object(path[:-1]), path[-1])
def _run_call (self, fun, *larg, **karg) :
"""
>>> import imp
>>> env = imp.new_module('environment')
>>> env.x = 'hello'
>>> Query('call', 'x.center', 7).run(env)
' hello '
>>> env.__dict__.update(__builtins__)
>>> Query('call', Query('call', 'getattr',
... Query('call', 'x.center', 7),
... 'upper')).run(env)
' HELLO '
"""
if isinstance(fun, str) :
fun = self._get_object(fun.split("."))
return fun(*larg, **karg)
@plugin("snakes.nets")
def extend (module) :
class UDPServer (object) :
def __init__ (self, port, size=2**20, verbose=0) :
self._size = size
self._verbose = verbose
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.bind(("", port))
self._env = imp.new_module("snk")
self._env.__dict__.update(__builtins__)
self._env.__dict__.update(operator.__dict__)
self._env.__dict__.update(module.__dict__)
def recvfrom (self) :
return self._sock.recvfrom(self._size)
def sendto (self, data, address) :
self._sock.sendto(data.strip() + "\n", address)
def run (self) :
while True :
data, address = self.recvfrom()
data = data.strip()
if self._verbose :
print("# query from %s:%u" % address)
try :
if self._verbose > 1 :
print(data)
res = loads(data).run(self._env)
if res is None :
res = Tree("answer", None, status="ok")
else :
res = Tree("answer", None, Tree.from_obj(res),
status="ok")
except :
cls, val, tb = sys.exc_info()
res = Tree("answer", str(val).strip(),
error=cls.__name__, status="error")
if self._verbose > 1 :
print("# error")
for entry in traceback.format_exception(cls, val, tb) :
for line in entry.splitlines() :
print("## %s" % line)
if self._verbose :
if self._verbose > 1 :
print("# answer")
print(res.to_pnml())
elif res["status"] == "error" :
print("# answer: %s: %s" % (res["error"], res.data))
else :
print("# answer: %s" % res["status"])
self.sendto(res.to_pnml(), address)
class TCPServer (UDPServer) :
def __init__ (self, port, size=2**20, verbose=0) :
self._size = size
self._verbose = verbose
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.bind(("", port))
self._sock.listen(1)
self._env = imp.new_module("snk")
self._env.__dict__.update(__builtins__)
self._env.__dict__.update(operator.__dict__)
self._env.__dict__.update(module.__dict__)
self._connection = {}
def recvfrom (self) :
connection, address = self._sock.accept()
self._connection[address] = connection
parts = []
while True :
parts.append(connection.recv(self._size))
if len(parts[-1]) < self._size :
break
return "".join(parts), address
def sendto (self, data, address) :
self._connection[address].send(data.rstrip() + "\n")
self._connection[address].close()
del self._connection[address]
return Query, UDPServer, TCPServer
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.
File mode changed
from snakes import SnakesError
class CompilationError (SnakesError) :
pass
class DeclarationError (SnakesError) :
pass
This diff is collapsed. Click to expand it.
import heapq
from snakes.nets import StateGraph
import snakes.lang
import snkast as ast
class Checker (object) :
def __init__ (self, net) :
self.g = StateGraph(net)
self.f = [self.build(f) for f in net.label("asserts")]
def build (self, tree) :
src = """
def check (_) :
return %s
""" % tree.st.source()[7:]
ctx = dict(self.g.net.globals)
ctx["bounded"] = self.bounded
exec(src, ctx)
fun = ctx["check"]
fun.lineno = tree.lineno
return fun
def bounded (self, marking, max) :
return all(len(marking(p)) == 1 for p in marking)
def run (self) :
for state in self.g :
marking = self.g.net.get_marking()
for place in marking :
if max(marking(place).values()) > 1 :
return None, self.trace(state)
for check in self.f :
try :
if not check(marking) :
return check.lineno, self.trace(state)
except :
pass
return None, None
def path (self, tgt, src=0) :
q = [(0, src, ())]
visited = set()
while True :
(c, v1, path) = heapq.heappop(q)
if v1 not in visited :
path = path + (v1,)
if v1 == tgt :
return path
visited.add(v1)
for v2 in self.g.successors(v1) :
if v2 not in visited :
heapq.heappush(q, (c+1, v2, path))
def trace (self, state) :
path = self.path(state)
return tuple(self.g.successors(i)[j]
for i, j in zip(path[:-1], path[1:]))
import sys, optparse, os.path
import pdb, traceback
import snakes.plugins
from snakes.utils.abcd.build import Builder
from snakes.lang.abcd.parser import parse
from snakes.lang.pgen import ParseError
from snakes.utils.abcd import CompilationError, DeclarationError
from snakes.utils.abcd.simul import Simulator
from snakes.utils.abcd.checker import Checker
##
## error messages
##
ERR_ARG = 1
ERR_OPT = 2
ERR_IO = 3
ERR_PARSE = 4
ERR_PLUGIN = 5
ERR_COMPILE = 6
ERR_OUTPUT = 7
ERR_BUG = 255
def err (message) :
sys.stderr.write("abcd: %s\n" % message.strip())
def die (code, message=None) :
if message :
err(message)
if options.debug :
pdb.post_mortem(sys.exc_info()[2])
else :
sys.exit(code)
def bug () :
sys.stderr.write("""
********************************************************************
*** An unexpected error ocurred. Please report this bug to ***
*** <franck.pommereau@gmail.com>, together with the execution ***
*** trace below and, if possible, a stripped-down version of the ***
*** ABCD source code that caused this bug. Thank you for your ***
*** help in improving SNAKES! ***
********************************************************************
""")
traceback.print_exc()
if options.debug :
pdb.post_mortem(sys.exc_info()[2])
else :
sys.exit(ERR_BUG)
##
## options parsing
##
gv_engines = ("dot", "neato", "twopi", "circo", "fdp")
opt = optparse.OptionParser(prog="abcd",
usage="%prog [OPTION]... FILE")
opt.add_option("-l", "--load",
dest="plugins", action="append", default=[],
help="load plugin (this option can be repeated)",
metavar="PLUGIN")
opt.add_option("-p", "--pnml",
dest="pnml", action="store", default=None,
help="save net as PNML",
metavar="OUTFILE")
for engine in gv_engines :
opt.add_option("-" + engine[0], "--" + engine,
dest="gv" + engine, action="store", default=None,
help="draw net using '%s' (from GraphViz)" % engine,
metavar="OUTFILE")
opt.add_option("-a", "--all-names",
dest="allnames", action="store_true", default=False,
help="draw control-flow places names (default: hide)")
opt.add_option("--debug",
dest="debug", action="store_true", default=False,
help="launch debugger on compiler error (default: no)")
opt.add_option("-s", "--simul",
dest="simul", action="store_true", default=False,
help="launch interactive code simulator")
opt.add_option("--check",
dest="check", action="store_true", default=False,
help="check assertions")
def getopts (args) :
global options, abcd
(options, args) = opt.parse_args(args)
plugins = []
for p in options.plugins :
plugins.extend(t.strip() for t in p.split(","))
if "ops" not in options.plugins :
plugins.append("ops")
if "labels" not in plugins :
plugins.append("labels")
for engine in gv_engines :
gvopt = getattr(options, "gv%s" % engine)
if gvopt and "gv" not in plugins :
plugins.append("gv")
break
options.plugins = plugins
if len(args) < 1 :
err("no input file provided")
opt.print_help()
die(ERR_ARG)
elif len(args) > 1 :
err("more than one input file provided")
opt.print_help()
die(ERR_ARG)
abcd = args[0]
if options.pnml == abcd :
err("input file also used as output (--pnml)")
opt.print_help()
die(ERR_ARG)
for engine in gv_engines :
if getattr(options, "gv%s" % engine) == abcd :
err("input file also used as output (--%s)" % engine)
opt.print_help()
die(ERR_ARG)
##
## drawing nets
##
def place_attr (place, attr) :
# fix color
if place.status == snk.entry :
attr["fillcolor"] = "green"
elif place.status == snk.internal :
pass
elif place.status == snk.exit :
attr["fillcolor"] = "yellow"
else :
attr["fillcolor"] = "lightblue"
# fix shape
if (not options.allnames
and place.status in (snk.entry, snk.internal, snk.exit)) :
attr["shape"] = "circle"
# render marking
if place._check == snk.tBlackToken :
count = len(place.tokens)
if count == 0 :
marking = " "
elif count == 1 :
marking = "@"
else :
marking = "%s@" % count
else :
marking = str(place.tokens)
# node label
if (options.allnames
or place.status not in (snk.entry, snk.internal, snk.exit)) :
attr["label"] = "%s\\n%s" % (place.name, marking)
else :
attr["label"] = "%s" % marking
def trans_attr (trans, attr) :
pass
def arc_attr (label, attr) :
if label == snk.Value(snk.dot) :
del attr["label"]
elif isinstance(label, snk.Test) :
attr["arrowhead"] = "none"
attr["label"] = " %s " % label._annotation
elif isinstance(label, snk.Flush) :
attr["arrowhead"] = "box"
attr["label"] = " %s " % label._annotation
def draw (net, engine, target) :
try :
net.draw(target, engine=engine,
place_attr=place_attr,
trans_attr=trans_attr,
arc_attr=arc_attr)
except :
die(ERR_OUTPUT, str(sys.exc_info()[1]))
##
## save pnml
##
def save_pnml (net, target) :
try :
out = open(target, "w")
out.write(snk.dumps(net))
out.close()
except :
die(ERR_OUTPUT, str(sys.exc_info()[1]))
##
## main
##
def main (args=sys.argv[1:], src=None) :
global snk
# get options
try:
if src is None :
getopts(args)
else :
getopts(list(args) + ["<string>"])
except SystemExit :
raise
except :
die(ERR_OPT, str(sys.exc_info()[1]))
# read source
if src is not None :
source = src
else :
try :
source = open(abcd).read()
except :
die(ERR_IO, "could not read input file %r" % abcd)
# parse
try :
node = parse(source, filename=abcd)
except ParseError :
die(ERR_PARSE, str(sys.exc_info()[1]))
except :
bug()
# compile
dirname = os.path.dirname(abcd)
if dirname and dirname not in sys.path :
sys.path.append(dirname)
elif "." not in sys.path :
sys.path.append(".")
try :
snk = snakes.plugins.load(options.plugins, "snakes.nets", "snk")
except :
die(ERR_PLUGIN, str(sys.exc_info()[1]))
build = Builder(snk)
try :
net = build.build(node)
net.label(srcfile=abcd)
except (CompilationError, DeclarationError) :
die(ERR_COMPILE, str(sys.exc_info()[1]))
except :
bug()
# output
if options.pnml :
save_pnml(net, options.pnml)
for engine in gv_engines :
target = getattr(options, "gv%s" % engine)
if target :
draw(net, engine, target)
trace, lineno = [], None
if options.check :
lineno, trace = Checker(net).run()
if options.simul :
Simulator(snk, source, net, trace, lineno).run()
elif trace :
if lineno is None :
print("unsafe execution:")
else :
asserts = dict((a.lineno, a) for a in net.label("asserts"))
print("line %s, %r failed:"
% (lineno, asserts[lineno].st.source()))
for trans, mode in trace :
print(" %s %s" % (trans, mode))
return net
if __name__ == "__main__" :
main()
This diff is collapsed. Click to expand it.
from snakes.lang.abcd.parser import ast
class NodeCopier (ast.NodeTransformer) :
def copy (self, node, **replace) :
args = {}
for name in node._fields + node._attributes :
old = getattr(node, name, None)
if name in replace :
new = replace[name]
elif isinstance(old, list):
new = []
for val in old :
if isinstance(val, ast.AST) :
new.append(self.visit(val))
else :
new.append(val)
elif isinstance(old, ast.AST):
new = self.visit(old)
else :
new = old
args[name] = new
if hasattr(node, "st") :
args["st"] = node.st
return node.__class__(**args)
def generic_visit (self, node) :
return self.copy(node)
class ArgsBinder (NodeCopier) :
def __init__ (self, args, buffers, nets, tasks) :
NodeCopier.__init__(self)
self.args = args
self.buffers = buffers
self.nets = nets
self.tasks = tasks
def visit_Name (self, node) :
if node.id in self.args :
return self.copy(self.args[node.id])
else :
return self.copy(node)
def visit_Instance (self, node) :
if node.net in self.nets :
return self.copy(node, net=self.nets[node.net])
else :
return self.copy(node)
def _visit_access (self, node) :
if node.buffer in self.buffers :
return self.copy(node, buffer=self.buffers[node.buffer])
else :
return self.copy(node)
def visit_SimpleAccess (self, node) :
return self._visit_access(node)
def visit_FlushAccess (self, node) :
return self._visit_access(node)
def visit_SwapAccess (self, node) :
return self._visit_access(node)
def _visit_task (self, node) :
if node.net in self.tasks :
return self.copy(node, net=self.tasks[node.net])
else :
return self.copy(node)
def visit_Spawn (self, node) :
return self._visit_task(node)
def visit_Wait (self, node) :
return self._visit_task(node)
def visit_Suspend (self, node) :
return self._visit_task(node)
def visit_Resume (self, node) :
return self._visit_task(node)
def visit_AbcdNet (self, node) :
args = self.args.copy()
buffers = self.buffers.copy()
nets = self.nets.copy()
tasks = self.tasks.copy()
netargs = ([a.arg for a in node.args.args + node.args.kwonlyargs]
+ [node.args.vararg, node.args.kwarg])
copy = True
for a in netargs :
for d in (args, buffers, nets, tasks) :
if a in d :
del d[a]
copy = False
if copy :
return self.copy(node)
else :
return self.__class__(args, buffers, nets, tasks).visit(node)
if __name__ == "__main__" :
import doctest
doctest.testmod()
This diff is collapsed. Click to expand it.
from snakes.lang.ctlstar.parser import parse
from snakes.lang.ctlstar import asdl as ast
from snakes.lang import getvars, bind
import _ast
class SpecError (Exception) :
def __init__ (self, node, reason) :
Exception.__init__(self, "[line %s] %s" % (node.lineno, reason))
def astcopy (node) :
if not isinstance(node, _ast.AST) :
return node
attr = {}
for name in node._fields + node._attributes :
value = getattr(node, name)
if isinstance(value, list) :
attr[name] = [astcopy(child) for child in value]
else :
attr[name] = astcopy(value)
return node.__class__(**attr)
class Builder (object) :
def __init__ (self, spec) :
self.spec = spec
self.decl = {}
for node in spec.atoms + spec.properties :
if node.name in self.decl :
raise SpecError(node, "%r already declare line %s"
% (name.name, self.decl[name.name].lineno))
self.decl[node.name] = node
self.main = spec.main
def build (self, node) :
node = astcopy(node)
return self._build(node, {})
def _build (self, node, ctx) :
if isinstance(node, ast.atom) :
try :
builder = getattr(self, "_build_%s" % node.__class__.__name__)
except AttributeError :
node.atomic = True
else :
node = builder(node, ctx)
node.atomic = True
elif isinstance(node, ast.CtlBinary) :
node.left = self._build(node.left, ctx)
node.right = self._build(node.right, ctx)
node.atomic = (isinstance(node.op, (ast.boolop, ast.Imply,
ast.Iff))
and node.left.atomic
and node.right.atomic)
elif isinstance(node, ast.CtlUnary) :
node.child = self._build(node.child, ctx)
node.atomic = (isinstance(node.op, ast.Not)
and node.child.atomic)
else :
assert False, "how did we get there?"
return node
def _build_place (self, param, ctx) :
if isinstance(param, ast.Parameter) :
if param.name not in ctx :
raise SpecError(param, "place %r should be instantiated"
% param.name)
return ctx[param.name]
else :
return param
def _build_InPlace (self, node, ctx) :
node.data = [bind(child, ctx) for child in node.data]
node.place = self._build_place(node.place, ctx)
return node
def _build_NotInPlace (self, node, ctx) :
return self._build_InPlace(node, ctx)
def _build_EmptyPlace (self, node, ctx) :
node.place = self._build_place(node.place, ctx)
return node
def _build_MarkedPlace (self, node, ctx) :
return self._build_EmptyPlace(node, ctx)
# skip Deadlock and Boolean: nothing to do
def _build_Quantifier (self, node, ctx) :
node.place = self._build_place(node.place, ctx)
ctx = ctx.copy()
for name in node.vars :
ctx[name] = ast.Token(name, node.place.place)
node.child = self._build(node.child, ctx)
return node
def _build_Instance (self, node, ctx) :
if node.name not in self.decl :
raise SpecError(node, "undeclared object %r" % node.name)
ctx = ctx.copy()
decl = self.decl[node.name]
for arg in decl.args :
ctx[arg.name] = arg
if isinstance(decl, ast.Property) :
return self._build_Instance_Property(node, decl, ctx)
else :
return self._build_Instance_Atom(node, decl, ctx)
def _build_Instance_Property (self, node, prop, ctx) :
bound = set(a.name for a in prop.args)
args = dict((a.arg, a.annotation) for a in node.args)
for param in prop.params :
if param.name in bound :
raise SpecError(node, "argument %r already bound"
% param.name)
elif param.name in args :
arg = args.pop(param.name)
bound.add(param.name)
else :
raise SpecError(node, "missing argument %r" % param.name)
if param.type == "place" :
if not isinstance(arg, ast.Place) :
raise SpecError(node, "expected place for %r"
% param.name)
arg.name = param.name
ctx[param.name] = arg
if args :
raise SpecError(node, "too many arguments (%s)"
% ", ".join(repr(a) for a in args))
return self._build(astcopy(prop.body), ctx)
def _build_Instance_Atom (self, node, atom, ctx) :
bound = set(a.name for a in atom.args)
args = dict((a.arg, a.annotation) for a in node.args)
new = astcopy(atom)
for param in atom.params :
if param.name in bound :
raise SpecError(node, "argument %r already bound"
% param.name)
elif param.name in args :
arg = args.pop(param.name)
bound.add(param.name)
else :
raise SpecError(node, "missing argument %r" % param.name)
if param.type == "place" :
if not isinstance(arg, ast.Place) :
raise SpecError(node, "expected place for %r"
% param.name)
arg.name = param.name
else :
arg = ast.Argument(name=param.name,
value=arg,
type=param.type)
new.args.append(arg)
if args :
raise SpecError(node, "too many arguments (%s)"
% ", ".join(repr(a) for a in args))
del new.params[:]
return new
def build (spec, main=None) :
b = Builder(spec)
if main is None :
return b.build(spec.main)
else :
return b.build(main)
#!/bin/sh
exec python bin/abcd --dot ,railroad.png \
--pnml ,railroad.pnml \
doc/examples/abcd/railroad.abcd
\ No newline at end of file
import doctest, sys, os, glob
retcode = 0
import snakes
version = open("VERSION").read().strip()
if snakes.version != version :
print("Mismatched versions:")
print(" snakes.version = %r" % snakes.version)
print(" VERSION = %r" % version)
sys.exit(1)
def test (module) :
print(" Testing '%s'" % module.__name__)
f, t = doctest.testmod(module, #verbose=True,
optionflags=doctest.NORMALIZE_WHITESPACE
| doctest.REPORT_ONLY_FIRST_FAILURE
| doctest.ELLIPSIS)
return f
modules = ["snakes",
"snakes.hashables",
"snakes.lang",
"snakes.lang.python.parser",
"snakes.lang.abcd.parser",
"snakes.lang.ctlstar.parser",
"snakes.data",
"snakes.typing",
"snakes.nets",
"snakes.pnml",
"snakes.plugins",
"snakes.plugins.pos",
"snakes.plugins.status",
"snakes.plugins.ops",
"snakes.plugins.synchro",
"snakes.plugins.hello",
"snakes.plugins.gv",
"snakes.plugins.clusters",
"snakes.plugins.labels",
"snakes.utils.abcd.build",
]
stop = False
if len(sys.argv) > 1 :
if sys.argv[1] == "--stop" :
stop = True
del sys.argv[1]
doscripts = True
if len(sys.argv) > 1 :
modules = sys.argv[1:]
doscripts = False
for modname in modules :
try :
__import__(modname)
retcode = max(retcode, test(sys.modules[modname]))
if retcode and stop :
break
except :
print(" Could not test %r:" % modname)
c, e, t = sys.exc_info()
print(" %s: %s" % (c.__name__, e))
if doscripts :
for script in (glob.glob("test-scripts/test*.sh")
+ glob.glob("test-scripts/test*.py")) :
print(" Running '%s'" % script)
retcode = max(retcode, os.system(script))
if retcode and stop :
break
sys.exit(retcode)
(provide 'abcd-mode)
(define-derived-mode abcd-mode python-mode "ABCD"
(font-lock-add-keywords
nil
`((,(concat "\\<\\(buffer\\|typedef\\|net\\|enum\\|task\\|const\\|symbol\\)\\>")
1 font-lock-keyword-face t))))
This diff is collapsed. Click to expand it.
This diff is collapsed. Click to expand it.