Toggle navigation
Toggle navigation
This project
Loading...
Sign in
Franck Pommereau
/
snakes
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
Franck Pommereau
2013-03-18 11:12:17 +0100
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
baa7af9f24ae2f0fceefc168d89b3c1681c3f2cc
baa7af9f
1 parent
1157b819
doc update + synchro fixes
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
1019 additions
and
267 deletions
snakes/data.py
snakes/plugins/clusters.py
snakes/plugins/gv.py
snakes/plugins/labels.py
snakes/plugins/ops.py
snakes/plugins/pos.py
snakes/plugins/status.py
snakes/plugins/synchro.py
snakes/utils/apidoc.py
snakes/data.py
View file @
baa7af9
...
...
@@ -895,6 +895,25 @@ class Substitution (object) :
for
var
in
other
:
res
.
_dict
[
var
]
=
self
(
other
(
var
))
return
res
def
restrict
(
self
,
domain
)
:
"""Restrict the substitution to `domain`, ie remove all
elements that are not in `domain`. Note that `domain` may
include names that are not in the substitution, they are
simply ignored.
>>> s = Substitution(a=1, b=2, c=3, d=4).restrict(['a', 'b', 'z'])
>>> list(sorted(s.domain()))
['a', 'b']
@param domain: the new domain as a set/list/... of names
@type domain: `iterable`
@return: the restricted substitution
@rtype: `Substitution`
"""
result
=
self
.
copy
()
for
name
in
result
.
domain
()
-
set
(
domain
)
:
result
.
_dict
.
pop
(
name
,
None
)
return
result
class
Symbol
(
object
)
:
"""A symbol that may be used as a constant
...
...
snakes/plugins/clusters.py
View file @
baa7af9
"""
@todo: revise (actually make) documentation
"""This plugin defines a data structure `Cluster` that allows to group
nodes hierarchically. This is used by plugin `gv` to improve the
graphical layout of Petri nets obtained using plugin `ops`.
In general, this plugin is probably not needed by anyone,
consequently, documentation will be very terse.
"""
import
snakes.plugins
...
...
@@ -8,8 +12,13 @@ from snakes.pnml import Tree
from
snakes.data
import
iterate
class
Cluster
(
object
)
:
"""A hierarchical data structure to organise strings (intended to
be node names).
"""
def
__init__
(
self
,
nodes
=
[],
children
=
[])
:
"""
"""Create a cluster whose to-level nodes are `nodes` and with
sub-clusters given in `children`.
>>> Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -23,6 +32,7 @@ class Cluster (object) :
for
child
in
children
:
self
.
add_child
(
child
)
__pnmltag__
=
"clusters"
# apidoc skip
def
__pnmldump__
(
self
)
:
"""
>>> Cluster(['a', 'b'],
...
...
@@ -51,6 +61,7 @@ class Cluster (object) :
for
child
in
self
.
_children
:
result
.
add_child
(
Tree
.
from_obj
(
child
))
return
result
# apidoc skip
@classmethod
def
__pnmlload__
(
cls
,
tree
)
:
"""
...
...
@@ -74,8 +85,10 @@ class Cluster (object) :
else
:
result
.
add_child
(
child
.
to_obj
())
return
result
# apidoc skip
def
__str__
(
self
)
:
return
"cluster_
%
s"
%
str
(
id
(
self
))
.
replace
(
"-"
,
"m"
)
# apidoc skip
def
__repr__
(
self
)
:
"""
>>> Cluster(['a', 'b'],
...
...
@@ -92,6 +105,7 @@ class Cluster (object) :
return
"
%
s([
%
s], [
%
s])"
%
(
self
.
__class__
.
__name__
,
", "
.
join
(
repr
(
n
)
for
n
in
self
.
nodes
()),
", "
.
join
(
repr
(
c
)
for
c
in
self
.
children
()))
# apidoc skip
def
copy
(
self
)
:
"""
>>> Cluster(['a', 'b'],
...
...
@@ -108,13 +122,20 @@ class Cluster (object) :
return
self
.
__class__
(
self
.
_nodes
,
(
child
.
copy
()
for
child
in
self
.
_children
))
def
get_path
(
self
,
name
)
:
"""
"""Get the path of a name inside the cluster. This path is a
list of indexes for each child-cluster within its parent.
>>> Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
... Cluster(['3', '4', '5'],
... [Cluster(['C', 'D'])])]).get_path('C')
[1, 0]
@param name: the searched name
@type name: `str`
@return: the list of indexes as `int` values
@rtype: `list`
"""
if
name
in
self
.
_nodes
:
return
[]
...
...
@@ -123,7 +144,9 @@ class Cluster (object) :
if
name
in
child
:
return
[
num
]
+
child
.
get_path
(
name
)
def
add_node
(
self
,
name
,
path
=
None
)
:
"""
"""Add `name` to the cluster, optionally at a given position
`path`.
>>> c = Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -139,6 +162,12 @@ class Cluster (object) :
[Cluster(['A'], [])]),
Cluster(['...', '...', '...'],
[Cluster([...'E'...], [])])])
@param name: name to add
@type name: `str`
@param path: position where `name`should be added, given as a
list of indexes
@type path: `list`
"""
if
path
in
(
None
,
[],
())
:
self
.
_nodes
.
add
(
name
)
...
...
@@ -150,7 +179,8 @@ class Cluster (object) :
self
.
_cluster
[
name
]
=
target
return
target
def
remove_node
(
self
,
name
)
:
"""
"""Remove a name from the cluster.
>>> c = Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -163,13 +193,17 @@ class Cluster (object) :
[Cluster(['A'], [])]),
Cluster(['...', '...'],
[Cluster(['...', '...'], [])])])
@param name: name to remove
@type name: `str`
"""
if
name
in
self
.
_cluster
:
self
.
_cluster
[
name
]
.
remove_node
(
name
)
else
:
self
.
_nodes
.
remove
(
name
)
def
rename_node
(
self
,
old
,
new
)
:
"""
"""Change a name in the cluster.
>>> c = Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -182,6 +216,11 @@ class Cluster (object) :
[Cluster(['A'], [])]),
Cluster([...'42'...],
[Cluster(['...', '...'], [])])])
@param old: name to change
@type old: `str`
@param new: new name to replace `old`
@type new: `str`
"""
if
old
in
self
.
_cluster
:
self
.
_cluster
[
old
]
.
rename_node
(
old
,
new
)
...
...
@@ -194,7 +233,8 @@ class Cluster (object) :
for
child
in
self
.
children
()
:
child
.
rename_node
(
old
,
new
)
def
add_child
(
self
,
cluster
=
None
)
:
"""
"""Add a child cluster
>>> c = Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -212,6 +252,10 @@ class Cluster (object) :
[Cluster(['A'], [])]),
Cluster(['...', '...', '...'],
[Cluster(['...', '...'], [])])])])
@param cluster: the new child, if `None` is given, an empty
child is added
@type cluster: `Cluster`
"""
if
cluster
is
None
:
cluster
=
Cluster
()
...
...
@@ -220,7 +264,9 @@ class Cluster (object) :
self
.
_cluster
[
node
]
=
cluster
self
.
_children
.
append
(
cluster
)
def
nodes
(
self
,
all
=
False
)
:
"""
"""Returns the nodes in the cluster: only the top-level ones
is `all` is `False`, or all the nodes otherwise.
>>> list(sorted(Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -233,6 +279,12 @@ class Cluster (object) :
... Cluster(['3', '4', '5'],
... [Cluster(['C', 'D'])])]).nodes(True)))
['1', '2', '3', '4', '5', 'A', 'C', 'D', 'a', 'b']
@param all: whether all the nodes should be returned or only
the top-level ones
@type all: `bool`
@return: list of nodes
@rtype: `list`
"""
if
all
:
result
=
set
()
...
...
@@ -242,7 +294,8 @@ class Cluster (object) :
else
:
return
set
(
self
.
_nodes
)
def
children
(
self
)
:
"""
"""Return the children of the cluster.
>>> Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -252,10 +305,14 @@ class Cluster (object) :
[Cluster(['A'], [])]),
Cluster(['...', '...', '...'],
[Cluster(['...', '...'], [])]))
@return: the children of `self`
@rtype: `tuple`
"""
return
tuple
(
self
.
_children
)
def
__contains__
(
self
,
name
)
:
"""
"""Test if a name is in the cluster.
>>> c = Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -267,6 +324,11 @@ class Cluster (object) :
False
>>> '4' in c
True
@param name: the node to test
@type name: `str`
@return: whether `name` is in the cluster
@rtype: `bool`
"""
if
name
in
self
.
_nodes
:
return
True
...
...
@@ -275,7 +337,9 @@ class Cluster (object) :
return
True
return
False
def
__iter__
(
self
)
:
"""
"""Iterate over the clusters and its children, yielding lists
of nodes at each level.
>>> c = Cluster(['a', 'b'],
... [Cluster(['1', '2'],
... [Cluster(['A'])]),
...
...
@@ -297,6 +361,31 @@ class Cluster (object) :
@snakes.plugins.plugin
(
"snakes.nets"
)
def
extend
(
module
)
:
class
PetriNet
(
module
.
PetriNet
)
:
"""Class `PetriNet`is extended so that instances have an
attribute `clusters` to which the nodes are added.
"""
def
add_place
(
self
,
place
,
**
options
)
:
"""
@param place: the place to add
@type place: `Place`
@param options: additional options for plugins
@keyword cluster: position of the new place in the cluster
"""
path
=
options
.
pop
(
"cluster"
,
None
)
module
.
PetriNet
.
add_place
(
self
,
place
,
**
options
)
self
.
clusters
.
add_node
(
place
.
name
,
path
)
def
add_transition
(
self
,
trans
,
**
options
)
:
"""
@param trans: the transition to add
@type trans: `Transition`
@param options: additional options for plugins
@keyword cluster: position of the new transition in the
cluster
"""
path
=
options
.
pop
(
"cluster"
,
None
)
module
.
PetriNet
.
add_transition
(
self
,
trans
,
**
options
)
self
.
clusters
.
add_node
(
trans
.
name
,
path
)
# apidoc stop
def
__init__
(
self
,
name
,
**
options
)
:
module
.
PetriNet
.
__init__
(
self
,
name
,
**
options
)
self
.
clusters
=
Cluster
()
...
...
@@ -313,17 +402,9 @@ def extend (module) :
result
=
new_instance
(
cls
,
module
.
PetriNet
.
__pnmlload__
(
tree
))
result
.
clusters
=
tree
.
child
(
Cluster
.
__pnmltag__
)
.
to_obj
()
return
result
def
add_place
(
self
,
place
,
**
options
)
:
path
=
options
.
pop
(
"cluster"
,
None
)
module
.
PetriNet
.
add_place
(
self
,
place
,
**
options
)
self
.
clusters
.
add_node
(
place
.
name
,
path
)
def
remove_place
(
self
,
name
,
**
options
)
:
module
.
PetriNet
.
remove_place
(
self
,
name
,
**
options
)
self
.
clusters
.
remove_node
(
name
)
def
add_transition
(
self
,
trans
,
**
options
)
:
path
=
options
.
pop
(
"cluster"
,
None
)
module
.
PetriNet
.
add_transition
(
self
,
trans
,
**
options
)
self
.
clusters
.
add_node
(
trans
.
name
,
path
)
def
remove_transition
(
self
,
name
,
**
options
)
:
module
.
PetriNet
.
remove_transition
(
self
,
name
,
**
options
)
self
.
clusters
.
remove_node
(
name
)
...
...
snakes/plugins/gv.py
View file @
baa7af9
"""Draw Petri nets using PyGraphViz
"""Adds methods to draw `PetriNet` and `StateGraph` instances using
GraphViz.
* adds a method `draw` to `PetriNet` and `StateGraph` that creates
a drawing of the object in a file.
For example, let's first define a Petri net:
>>> import snakes.plugins
>>> snakes.plugins.load('gv', 'snakes.nets', 'nets')
...
...
@@ -16,18 +16,35 @@
>>> n.add_output('p11', 't10', Expression('x+1'))
>>> n.add_input('p11', 't01', Variable('y'))
>>> n.add_output('p00', 't01', Expression('y-1'))
Thanks to plugin `gv`, we can draw it using the various engines of
GraphViz; we can also draw the state graph:
>>> for engine in ('neato', 'dot', 'circo', 'twopi', 'fdp') :
... n.draw(',test-gv-
%
s.png'
%
engine, engine=engine)
>>> s = StateGraph(n)
>>> s.build()
>>> s.draw(',test-gv-graph.png')
>>> for node in sorted(n.node(), key=str) :
The plugin also allows to layout the nodes without drawing the net
(this is only available for `PetriNet`, not for `StateGraph`). We
first move every node to position `(-100, -100)`; then, we layout the
net; finally, we check that every node has indeed been moved away from
where we had put it:
>>> for node in n.node() :
... node.pos.moveto(-100, -100)
>>> all(node.pos.x == node.pos.y == -100 for node in n.node())
True
>>> n.layout()
>>> any(node.pos
== (-100, -100) for node in sorted(n.node(), key=str
))
>>> any(node.pos
.x == node.pos.y == -100 for node in n.node(
))
False
@todo: revise documentation
@note: setting nodes position has no influence on how a net is drawn:
GraphViz will redo the layout in any case. Method `layout` is here
just in the case you would need a layout of your nets.
@note: this plugin depens on plugins `pos` and `clusters` (that are
automatically loaded)
"""
import
os
,
os.path
,
subprocess
,
collections
...
...
@@ -35,6 +52,7 @@ import snakes.plugins
from
snakes.plugins.clusters
import
Cluster
from
snakes.compat
import
*
# apidoc skip
class
Graph
(
Cluster
)
:
def
__init__
(
self
,
attr
)
:
Cluster
.
__init__
(
self
)
...
...
@@ -150,37 +168,56 @@ class Graph (Cluster) :
"snakes.plugins.pos"
])
def
extend
(
module
)
:
class
PetriNet
(
module
.
PetriNet
)
:
"An extension with
a method `draw
`"
def
draw
(
self
,
filename
=
None
,
engine
=
"dot"
,
debug
=
False
,
"An extension with
methods `draw` and `layout
`"
def
draw
(
self
,
filename
,
engine
=
"dot"
,
debug
=
False
,
graph_attr
=
None
,
cluster_attr
=
None
,
place_attr
=
None
,
trans_attr
=
None
,
arc_attr
=
None
)
:
"""
@param filename: the name of the image file to create or
`None` if only the computed graph is needed
@type filename: `None` or `str`
@param engine: the layout engine to use: 'dot' (default),
'neato', 'circo', 'twopi' or 'fdp'
"""Draw the Petri net to a picture file. How the net is
rendered can be controlled using the arguments `..._attr`.
For instance, to draw place in red with names in
uppercase, and hide True guards, we can proceed as
follows:
>>> import snakes.plugins
>>> snakes.plugins.load('gv', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> n = PetriNet('N')
>>> n.add_place(Place('p'))
>>> n.add_transition(Transition('t'))
>>> n.add_input('p', 't', Value(dot))
>>> def draw_place (place, attr) :
... attr['label'] = place.name.upper()
... attr['color'] = '#FF0000'
>>> def draw_transition (trans, attr) :
... if str(trans.guard) == 'True' :
... attr['label'] = trans.name
... else :
... attr['label'] = '
%
s
\\
n
%
s'
%
(trans.name, trans.guard)
>>> n.draw(',net-with-colors.png',
... place_attr=draw_place, trans_attr=draw_transition)
@param filename: the name of the image file to create
@type filename: `str`
@param engine: the layout engine to use: `'dot'` (default),
`'neato'`, `'circo'`, `'twopi'` or `'fdp'`
@type engine: `str`
@param place_attr: a function to format places, it will be
called with the place and its attributes dict as
parameters
@type place_attr: `
function(Place,dict)->Non
e`
@type place_attr: `
callabl
e`
@param trans_attr: a function to format transitions, it
will be called with the transition and its attributes
dict as parameters
@type trans_attr: `
function(Transition,dict)->Non
e`
@type trans_attr: `
callabl
e`
@param arc_attr: a function to format arcs, it will be
called with the label and its attributes dict as
parameters
@type arc_attr: `
function(ArcAnnotation,dict)->Non
e`
@type arc_attr: `
callabl
e`
@param cluster_attr: a function to format clusters of
nodes, it will be called with the cluster and its
attributes dict as parameters
@type cluster_attr:
`function(snakes.plugins.clusters.Cluster,dict)->None`
@return: `None` if `filename` is not `None`, the computed
graph otherwise
@rtype: `None` or `pygraphviz.AGraph`
@type cluster_attr: `callable`
"""
nodemap
=
dict
((
node
.
name
,
"node_
%
s"
%
num
)
for
num
,
node
in
enumerate
(
self
.
node
()))
...
...
@@ -236,17 +273,51 @@ def extend (module) :
def
layout
(
self
,
xscale
=
1.0
,
yscale
=
1.0
,
engine
=
"dot"
,
debug
=
False
,
graph_attr
=
None
,
cluster_attr
=
None
,
place_attr
=
None
,
trans_attr
=
None
,
arc_attr
=
None
)
:
"""Layout the nodes of the Petri net by calling GraphViz
and reading back the picture it creates. The effect is to
change attributes `pos` (see plugin `pos`) for every node
according to the positions calculated by GraphViz.
@param xscale: how much the image is scaled in the
horizontal axis after GraphViz has done the layout
@type xscale: `float`
@param yscale: how much the image is scaled in the
vertical axis after GraphViz has done the layout
@type yscale: `float`
@param filename: the name of the image file to create
@type filename: `str`
@param engine: the layout engine to use: `'dot'` (default),
`'neato'`, `'circo'`, `'twopi'` or `'fdp'`
@type engine: `str`
@param place_attr: a function to format places, it will be
called with the place and its attributes dict as
parameters
@type place_attr: `callable`
@param trans_attr: a function to format transitions, it
will be called with the transition and its attributes
dict as parameters
@type trans_attr: `callable`
@param arc_attr: a function to format arcs, it will be
called with the label and its attributes dict as
parameters
@type arc_attr: `callable`
@param cluster_attr: a function to format clusters of
nodes, it will be called with the cluster and its
attributes dict as parameters
@type cluster_attr: `callable`
"""
g
=
self
.
draw
(
None
,
engine
,
debug
,
graph_attr
,
cluster_attr
,
place_attr
,
trans_attr
,
arc_attr
)
node
=
dict
((
v
,
k
)
for
k
,
v
in
g
.
nodemap
.
items
())
for
n
,
x
,
y
in
g
.
layout
(
engine
,
debug
)
:
self
.
node
(
node
[
n
])
.
pos
.
moveto
(
x
*
xscale
,
y
*
yscale
)
class
StateGraph
(
module
.
StateGraph
)
:
"An extension with a method `draw`"
def
draw
(
self
,
filename
=
None
,
engine
=
"dot"
,
debug
=
False
,
def
draw
(
self
,
filename
,
engine
=
"dot"
,
debug
=
False
,
node_attr
=
None
,
edge_attr
=
None
,
graph_attr
=
None
)
:
"""@param filename: the name of the image file to create or
"""Draw the state graph to a picture file.
@param filename: the name of the image file to create or
`None` if only the computed graph is needed
@type filename: `None` or `str`
@param engine: the layout engine to use: 'dot' (default),
...
...
@@ -255,19 +326,16 @@ def extend (module) :
@param node_attr: a function to format nodes, it will be
called with the state number, the `StateGraph` object
and attributes dict as parameters
@type node_attr: `
function(int,StateGraph,dict)->Non
e`
@type node_attr: `
callabl
e`
@param edge_attr: a function to format edges, it will be
called with the transition, its mode and attributes
dict as parameters
@type trans_attr:
`
function(Transition,Substitution,dict)->Non
e`
`
callabl
e`
@param graph_attr: a function to format grapg, it will be
called with the state graphe and attributes dict as
parameters
@type graph_attr: `function(StateGraph,dict)->None`
@return: `None` if `filename` is not `None`, the computed
graph otherwise
@rtype: `None` or `pygraphviz.AGraph`
@type graph_attr: `callable`
"""
attr
=
dict
(
style
=
"invis"
,
splines
=
"true"
)
...
...
snakes/plugins/labels.py
View file @
baa7af9
"""A plugin to add labels to nodes and nets.
"""A plugin to add labels to nodes and nets. Labels are names (valid
Python identifiers) associated to arbitrary objects.
@todo: revise (actually make) documentation
>>> import snakes.plugins
>>> snakes.plugins.load('labels', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> t = Transition('t')
>>> t.label(foo='bar', spam=42)
>>> t.label('foo')
'bar'
>>> t.label('spam')
42
Note that when nodes in a Petri net are merged, their labels are
merged too, but in an arbitrary order. So, for example:
>>> n = PetriNet('N')
>>> n.add_place(Place('p1'))
>>> n.place('p1').label(foo='bar', spam='ham')
>>> n.add_place(Place('p2'))
>>> n.place('p2').label(hello='world', spam='egg')
>>> n.merge_places('p', ['p1', 'p2'])
>>> n.place('p').label('hello')
'world'
>>> n.place('p').label('foo')
'bar'
>>> n.place('p').label('spam') in ['ham', 'egg']
True
In the latter statement, we cannot know whether the label will be one
or the other value because merging has been done in an arbitrary
order.
"""
from
snakes.plugins
import
plugin
,
new_instance
...
...
@@ -10,6 +40,22 @@ from snakes.pnml import Tree
def
extend
(
module
)
:
class
Transition
(
module
.
Transition
)
:
def
label
(
self
,
*
get
,
**
set
)
:
"""Get and set labels for the transition. The labels given
in `get` will be returned as a `tuple` and the labels
assigned in `set` will be changed accordingly. If a label
is given both in `get`and `set`, the returned value is
that it had at the beginning of the call, ie, before it is
set by the call.
@param get: labels which values have to be returned
@type get: `str`
@param set: labels which values have to be changed
@type set: `object`
@return: the tuples of values corresponding to `get`
@rtype: `tuple`
@raise KeyError: when a label given in `get` has not been
assigned previouly
"""
if
not
hasattr
(
self
,
"_labels"
)
:
self
.
_labels
=
{}
result
=
tuple
(
self
.
_labels
[
g
]
for
g
in
get
)
...
...
@@ -21,10 +67,22 @@ def extend (module) :
elif
len
(
set
)
==
0
:
return
self
.
_labels
.
copy
()
def
has_label
(
self
,
name
,
*
names
)
:
"""Check is a label has been assigned to the transition.
@param name: the label to check
@type name: `str`
@param names: additional labels to check, if used, the
return value is a `tuple` of `bool` instead of a
single `bool`
@return: a Boolean indicating of the checked labels are
present or not in the transitions
@rtype: `bool`
"""
if
len
(
names
)
==
0
:
return
name
in
self
.
_labels
else
:
return
tuple
(
n
in
self
.
_labels
for
n
in
(
name
,)
+
names
)
# apidoc stop
def
copy
(
self
,
name
=
None
,
**
options
)
:
if
not
hasattr
(
self
,
"_labels"
)
:
self
.
_labels
=
{}
...
...
@@ -79,6 +137,7 @@ def extend (module) :
return
t
class
Place
(
module
.
Place
)
:
def
label
(
self
,
*
get
,
**
set
)
:
"See documentation for `Transition.label` above"
if
not
hasattr
(
self
,
"_labels"
)
:
self
.
_labels
=
{}
result
=
tuple
(
self
.
_labels
[
g
]
for
g
in
get
)
...
...
@@ -90,10 +149,12 @@ def extend (module) :
elif
len
(
set
)
==
0
:
return
self
.
_labels
.
copy
()
def
has_label
(
self
,
name
,
*
names
)
:
"See documentation for `Transition.has_label` above"
if
len
(
names
)
==
0
:
return
name
in
self
.
_labels
else
:
return
tuple
(
n
in
self
.
_labels
for
n
in
(
name
,)
+
names
)
# apidoc stop
def
copy
(
self
,
name
=
None
,
**
options
)
:
if
not
hasattr
(
self
,
"_labels"
)
:
self
.
_labels
=
{}
...
...
@@ -152,6 +213,7 @@ def extend (module) :
return
p
class
PetriNet
(
module
.
PetriNet
)
:
def
label
(
self
,
*
get
,
**
set
)
:
"See documentation for `Transition.label` above"
if
not
hasattr
(
self
,
"_labels"
)
:
self
.
_labels
=
{}
result
=
tuple
(
self
.
_labels
[
g
]
for
g
in
get
)
...
...
@@ -163,10 +225,12 @@ def extend (module) :
elif
len
(
set
)
==
0
:
return
self
.
_labels
.
copy
()
def
has_label
(
self
,
name
,
*
names
)
:
"See documentation for `Transition.has_label` above"
if
len
(
names
)
==
0
:
return
name
in
self
.
_labels
else
:
return
tuple
(
n
in
self
.
_labels
for
n
in
(
name
,)
+
names
)
# apidoc stop
def
copy
(
self
,
name
=
None
,
**
options
)
:
if
not
hasattr
(
self
,
"_labels"
)
:
self
.
_labels
=
{}
...
...
snakes/plugins/ops.py
View file @
baa7af9
"""A plugin to compose nets.
# -*- encoding: latin-1
"""A plugin to compose nets _à la_ Petri Box Calculus.
The compositions are based on place status and automatically merge
some nodes (buffers and variables, tick transitions).
@note: this plugin depends on plugins `clusters` and `status` that are
automatically loaded
"""
"""
## Control-flow and buffers places ##
When this module is used, places are equipped with statuses (see also
plugin `status` that provides this service). We distinguish in
particular:
* _entry_ places marked at the initial state of the net
* _exit_ places marked at a final state of the net
* _internal_ places marked during the execution
* all together, these places form the _control-flow places_ and can
be marked only by black tokens (ie, they are typed `tBlackToken`
and thus can hold only `dot` values)
* _buffer_ places that may hold data of any type, each buffer place
is given a name that is the name of the buffer modelled by the
place
Plugin `ops` exports these statuses as Python objects:[^1] `entry`,
`internal` and `exit` are instances of class `Status` while `buffer`
is a function that returns an instance of `Buffer` (a subclass of
`Status`) when called with a name as its parameter.
[^1]: All these objects are actually defined in plugin `status`
Let's define a net with one entry place, one exit place and two buffer
places. Note how we use keyword argument `status` to the place
constructor to define the status of the created place:
>>> import snakes.plugins
>>> snakes.plugins.load('ops', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> from snakes.plugins.status import entry, internal, exit, buffer
>>> basic = PetriNet('basic')
>>> basic.add_place(Place('e', status=entry))
>>> basic.add_place(Place('x', status=exit))
>>> basic.add_transition(Transition('t'))
>>> basic.add_input('e', 't', Value(1))
>>> basic.add_output('x', 't', Value(2))
>>> basic.add_place(Place('b', [1], status=buffer('buf')))
>>> basic.add_input('e', 't', Value(dot))
>>> basic.add_output('x', 't', Value(dot))
>>> basic.add_place(Place('b1', [1], status=buffer('egg')))
>>> basic.add_place(Place('b2', [2], status=buffer('spam')))
>>> basic.add_input('b1', 't', Variable('x'))
>>> basic.add_output('b2', 't', Expression('x+1'))
The simplest operation is to change buffer names, let's do it on a
copy of our net. This operation is called `hide` because it is
basically used to hide a buffer:
>>> n = basic.copy()
>>> n.hide(entry)
>>> n.node('e').status
>>> n.node('b1').status
Buffer('buffer','egg')
>>> n.hide(buffer('egg'))
>>> n.node('b1').status
Status(None)
>>> n.hide(buffer('buf'), buffer(None))
>>> n.node('b').status
Buffer('buffer')
>>> n = basic / 'buf'
>>> n.node('[b/buf]').status
As we can see, place `b1` now has a dummy status. But `hide` can
accept a second argument and allows to rename a buffer:
>>> n.node('b2').status
Buffer('buffer','spam')
>>> n.hide(buffer('spam'), buffer('ham'))
>>> n.node('b2').status
Buffer('buffer','ham')
A slighlty different way for hiding buffers is to use operator `/`,
which actually constructs a new net, changing the names of every node
in the original net to `'[.../egg]'`:
>>> n = basic / 'egg'
>>> n.node('[b1/egg]').status
Buffer('buffer')
As we can see, a buffer hidden using `/` still has a buffer status but
with no name associated. Such an anonymous buffer is treated in a
special way as we'll see later on.
The systematic renaming of nodes is something we should get used to
before to continue. When one or two nets are constructed through an
operation, the nodes of the operand nets are combined in one way or
another. For an arbitrary operation `A
%
B`, the resulting net will be
called `'[A.name
%
B.name]'` (if `A` and `B` are nets). Whenever a node
`a` from `A` is combined with a node `b` from `B`, the resulting node
will be called `'[a
%
b]'`. If only one node is copied in the resulting
net, it will be called `'[a
%
]'` or `'[
%
b]'` depending on wher it comes
from. This systematic renaming allows to ensure that even if `A` and
`B` have nodes with the same names, there will be no name clash in the
result, while, at the same time allowing users to predict the new name
of a node.
## Control-flow compositions ##
Using control-flow places, it becomes possible to build nets by
composing smaller nets through control flow operations. Let's start
with the _sequential composition_ `A & B`, basically, its combines the
exit places of net `A` with the entry places of net `B`, ensuring thus
that the resulting net behaves like if we execute `A` followed by `B`.
In the example below, we use method `status` of a net to get the
places with a given status:
>>> n = basic & basic
>>> n.status(internal)
('[x&e]',)
>>> n.place('[x&e]').pre
{'[t&]': Value(
2
)}
{'[t&]': Value(
dot
)}
>>> n.place('[x&e]').post
{'[&t]': Value(1)}
>>> n.status(buffer('buf'))
('[b&b]',)
{'[&t]': Value(dot)}
>>> n.status(buffer('egg'))
('[b1&b1]',)
>>> n.status(buffer('spam'))
('[b2&b2]',)
We can see that `n` now has one internal place that is the combination
of the exit place of the left copy of `basic` with the entry place of
the right copy of `basic`. (Hence its name: `'[x&e]'`.) We can see
also how it is connected to the left/right copy of `t` as its
input/output. Last, we can see that buffer places from the two copies
of `basic` have been merged when they have had the same name. This
merging won't occur for anonymous buffers. For example, hiding
`'spam'` in the right net of the sequential composition will result in
two buffer places, one still named `'spam'` that is the copy of `'b2'`
from the left operand of `&`, another that is anonymous and is the
copy of `'[b2/spam]'` (ie, `'b2'`after its status was hidden) from the
right operand of `&`.
>>> n = basic & (basic / 'spam')
>>> n.status(buffer('spam'))
('[b2&]',)
>>> n.status(buffer(None))
('[&[b2/spam]]',)
The next operation is the _choice `A + B` that behave either as `A` or
as `B`. This is obtained by comining the entry places of both nets one
the one hand, and by combining their exit places on the other hand.
>>> n = basic + basic
>>> n.status(entry)
('[e+e]',)
>>> list(sorted(n.place('[e+e]').post.items()))
[('[+t]', Value(
1)), ('[t+]', Value(1
))]
[('[+t]', Value(
dot)), ('[t+]', Value(dot
))]
>>> n.status(exit)
('[x+x]',)
>>> list(sorted(n.place('[x+x]').pre.items()))
[('[+t]', Value(2)), ('[t+]', Value(2))]
[('[+t]', Value(dot)), ('[t+]', Value(dot))]
Another operation is the _iteration_ `A * B` that behaves by executing
`A` repeatedly (including no repetition) followed by one execution of
`B`. This is obtained by combining the entry and exit places of `A`
with the entry place of `B`.
>>> n = basic * basic
>>> n.status(entry)
('[e,x*e]',)
>>> n.place('[e,x*e]').post
{'[t*]': Value(
1), '[*t]': Value(1
)}
{'[t*]': Value(
dot), '[*t]': Value(dot
)}
>>> n.place('[e,x*e]').pre
{'[t*]': Value(
2
)}
{'[t*]': Value(
dot
)}
Finally, there is the _parallel composition_ `A | B` that just
executes both nets in parallel. But because of the merging of buffer
places, they are able to communicate.
>>> n = basic | basic
>>> n.status(buffer('egg'))
('[b1|b1]',)
>>> n.status(buffer('spam'))
('[b2|b2]',)
>>> pass
>>> n1 = basic.copy()
>>> n1.declare('global x; x=1')
>>> n2 = basic.copy()
...
...
@@ -65,8 +181,6 @@ Buffer('buffer')
(1, 2)
>>> n._declare
['global x; x=1']
@todo: revise documentation
"""
import
snakes.plugins
...
...
@@ -113,13 +227,14 @@ def _glue (op, one, two) :
depends
=
[
"snakes.plugins.clusters"
,
"snakes.plugins.status"
])
def
extend
(
module
)
:
"Build the extended module"
"""Essentially, class `PetriNet` is extended to support the binary
operations discussed above."""
class
PetriNet
(
module
.
PetriNet
)
:
def
__or__
(
self
,
other
)
:
"Parallel"
"Parallel
composition
"
return
_glue
(
"|"
,
self
,
other
)
def
__and__
(
self
,
other
)
:
"Sequen
ce
"
"Sequen
tial composition
"
result
=
_glue
(
"&"
,
self
,
other
)
remove
=
set
()
for
x
,
e
in
cross
((
self
.
status
(
exit
),
other
.
status
(
entry
)))
:
...
...
@@ -161,11 +276,13 @@ def extend (module) :
result
.
remove_place
(
p
)
return
result
def
hide
(
self
,
old
,
new
=
None
)
:
"Status hiding and renaming"
if
new
is
None
:
new
=
Status
(
None
)
for
node
in
self
.
status
(
old
)
:
self
.
set_status
(
node
,
new
)
def
__div__
(
self
,
name
)
:
"Buffer hiding"
result
=
self
.
copy
()
for
node
in
result
.
node
()
:
result
.
rename_node
(
node
.
name
,
"[
%
s/
%
s]"
%
(
node
,
name
))
...
...
@@ -173,6 +290,7 @@ def extend (module) :
if
status
.
_value
==
name
:
result
.
hide
(
status
,
status
.
__class__
(
status
.
_name
,
None
))
return
result
# apidoc skip
def
__truediv__
(
self
,
other
)
:
return
self
.
__div__
(
other
)
return
PetriNet
...
...
snakes/plugins/pos.py
View file @
baa7af9
#-*- encoding: latin-1
"""A plugin to add positions to the nodes.
* `Place` and `Transition` constructors are added an optional argument
`pos=(x,y)` to set their position
* `Place` and `Transition` are added an attribute `pos` that is pair
of numbers with attributes `x` and `y` and methods `shift(dx, dy)`
and `moveto(x, y)`
* Petri nets are added methods `bbox()` that returns a pair of
extrema `((xmin, ymin), (xmax, ymax))`, a method `shift(dx, dy)`
that shift all the nodes, and a method `transpose()` that rotates
the net in such a way that the top-down direction becomes
left-right
`Place` and `Transition` are added an optional argument for the
constructor `pos=(x,y)` to set their position. Moreover, these classes
are added an attribute `pos` that holds a pair of numbers with
attributes `x` and `y` and methods `shift(dx, dy)` and `moveto(x, y)`.
So, when the plugin is loaded, we can specify and retreive nodes
positions:
>>> import snakes.plugins
>>> snakes.plugins.load('pos', 'snakes.nets', 'nets')
...
...
@@ -17,21 +14,35 @@
>>> from nets import PetriNet, Place, Transition
>>> n = PetriNet('N')
>>> n.add_place(Place('p00'))
>>> n.add_transition(Transition('t10', pos=(1, 0)))
>>> t10 = Transition('t10', pos=(1, 0))
>>> n.add_transition(t10)
>>> n.add_place(Place('p11', pos=(1, 1)))
>>> n.add_transition(Transition('t01', pos=(0, 1)))
>>>
n.node('t10')
.pos
>>>
t10
.pos
Position(1, 0)
>>>
n.node('t10')
.pos.x
>>>
t10
.pos.x
1
>>>
n.node('t10')
.pos.y
>>>
t10
.pos.y
0
>>> n.node('t10').pos.y = 1
>>> t10.pos()
(1, 0)
Nodes positions is immutable, we must use method `moveto` to change
positions:
>>> t10.pos.y = 1
Traceback (most recent call last):
...
AttributeError: readonly attribute
>>> n.node('t10').pos()
(1, 0)
>>> t10.pos.moveto(t10.pos.x, 1)
>>> t10.pos
Position(1, 1)
Petri nets are added methods `bbox()` that returns a pair of extrema
`((xmin, ymin), (xmax, ymax))`, a method `shift(dx, dy)` that shift
all the nodes, and a method `transpose()` that rotates the net in such
a way that the top-down direction becomes left-right:
>>> n.bbox()
((0, 0), (1, 1))
>>> n.shift(1, 2)
...
...
@@ -42,8 +53,6 @@ Position(1, 3)
>>> n.transpose()
>>> n.node('t01').pos
Position(-3, 1)
@todo: revise documentation
"""
from
snakes
import
SnakesError
...
...
@@ -54,46 +63,111 @@ from snakes.pnml import Tree
class
Position
(
object
)
:
"The position of a node"
def
__init__
(
self
,
x
,
y
)
:
"""Constructor expects the Cartesian coordinates of the node,
they can be provided as `float` or `int`.
@param x: horizontal position
@type x: `float`
@param y: vertical position
@type y: `float`
"""
self
.
__dict__
[
"x"
]
=
x
self
.
__dict__
[
"y"
]
=
y
# apidoc skip
def
__str__
(
self
)
:
return
"(
%
s,
%
s)"
%
(
str
(
self
.
x
),
str
(
self
.
y
))
# apidoc skip
def
__repr__
(
self
)
:
return
"Position(
%
s,
%
s)"
%
(
str
(
self
.
x
),
str
(
self
.
y
))
# apidoc skip
def
__setattr__
(
self
,
name
,
value
)
:
if
name
in
(
"x"
,
"y"
)
:
raise
AttributeError
(
"readonly attribute"
)
else
:
self
.
__dict__
[
name
]
=
value
def
moveto
(
self
,
x
,
y
)
:
"""Change current coordinates to the specified position
@param x: horizontal position
@type x: `float`
@param y: vertical position
@type y: `float`
"""
self
.
__init__
(
x
,
y
)
def
shift
(
self
,
dx
,
dy
)
:
"""Shift current coordinates by the specified amount.
@param dx: horizontal shift
@type dx: `float`
@param dy: vertical shift
@type dy: `float`
"""
self
.
__init__
(
self
.
x
+
dx
,
self
.
y
+
dy
)
def
__getitem__
(
self
,
rank
)
:
if
rank
==
0
:
def
__getitem__
(
self
,
index
)
:
"""Access coordinates by index
>>> Position(1, 2)[0]
1
>>> Position(1, 2)[1]
2
>>> Position(1, 2)[42]
Traceback (most recent call last):
...
IndexError: Position index out of range
@param index: 0 for `x` coordinate, 1 for `y`
@type index: `int`
@raise IndexError: when `index not in {0, 1}`
"""
if
index
==
0
:
return
self
.
x
elif
rank
==
1
:
elif
index
==
1
:
return
self
.
y
else
:
raise
IndexError
(
"Position index out of range"
)
def
__iter__
(
self
)
:
"""Successively yield `x` and `y` coordinates
>>> list(Position(1, 2))
[1, 2]
"""
yield
self
.
x
yield
self
.
y
def
__call__
(
self
)
:
"""Return the position as a pair of values
>>> Position(1, 2.0)()
(1, 2.0)
@return: the pair of coordinates `(x, y)`
@rtype: `tuple`
"""
return
(
self
.
x
,
self
.
y
)
@plugin
(
"snakes.nets"
)
def
extend
(
module
)
:
class
Place
(
module
.
Place
)
:
def
__init__
(
self
,
name
,
tokens
=
[],
check
=
None
,
**
args
)
:
"""If no position is given `(0, 0)` is chosen
>>> Place('p').pos
Position(0, 0)
>>> Place('p', pos=(1,2)).pos
Position(1, 2)
@keyword pos: the position of the new place
@type pos: `tuple`
"""
x
,
y
=
args
.
pop
(
"pos"
,
(
0
,
0
))
self
.
pos
=
Position
(
x
,
y
)
module
.
Place
.
__init__
(
self
,
name
,
tokens
,
check
,
**
args
)
# apidoc skip
def
copy
(
self
,
name
=
None
,
**
args
)
:
x
,
y
=
args
.
pop
(
"pos"
,
self
.
pos
())
result
=
module
.
Place
.
copy
(
self
,
name
,
**
args
)
result
.
pos
.
moveto
(
x
,
y
)
return
result
# apidoc skip
def
__pnmldump__
(
self
)
:
"""
>>> p = Place('p', pos=(1, 2))
...
...
@@ -121,6 +195,7 @@ def extend (module) :
x
=
str
(
self
.
pos
.
x
),
y
=
str
(
self
.
pos
.
y
)))
return
t
# apidoc skip
@classmethod
def
__pnmlload__
(
cls
,
tree
)
:
"""
...
...
@@ -144,14 +219,26 @@ def extend (module) :
return
result
class
Transition
(
module
.
Transition
)
:
def
__init__
(
self
,
name
,
guard
=
None
,
**
args
)
:
"""If no position is given `(0, 0)` is chosen
>>> Transition('t').pos
Position(0, 0)
>>> Transition('t', pos=(1,2)).pos
Position(1, 2)
@keyword pos: the position of the new transition
@type pos: `tuple`
"""
x
,
y
=
args
.
pop
(
"pos"
,
(
0
,
0
))
self
.
pos
=
Position
(
x
,
y
)
module
.
Transition
.
__init__
(
self
,
name
,
guard
,
**
args
)
# apidoc skip
def
copy
(
self
,
name
=
None
,
**
args
)
:
x
,
y
=
args
.
pop
(
"pos"
,
self
.
pos
())
result
=
module
.
Transition
.
copy
(
self
,
name
,
**
args
)
result
.
pos
.
moveto
(
x
,
y
)
return
result
# apidoc skip
def
__pnmldump__
(
self
)
:
"""
>>> t = Transition('t', pos=(2, 1))
...
...
@@ -171,6 +258,7 @@ def extend (module) :
x
=
str
(
self
.
pos
.
x
),
y
=
str
(
self
.
pos
.
y
))))
return
t
# apidoc skip
@classmethod
def
__pnmlload__
(
cls
,
tree
)
:
"""
...
...
@@ -194,16 +282,57 @@ def extend (module) :
return
result
class
PetriNet
(
module
.
PetriNet
)
:
def
add_place
(
self
,
place
,
**
args
)
:
"""Position can be set also when a place is added to the
net.
>>> n = PetriNet('n')
>>> n.add_place(Place('a', pos=(1, 2)))
>>> n.node('a').pos
Position(1, 2)
>>> n.add_place(Place('b'), pos=(3,4))
>>> n.node('b').pos
Position(3, 4)
>>> n.add_place(Place('c', pos=(42, 42)), pos=(5, 6))
>>> n.node('c').pos
Position(5, 6)
@keyword pos: the position of the added place
@type pos: `tuple`
"""
if
"pos"
in
args
:
x
,
y
=
args
.
pop
(
"pos"
)
place
.
pos
.
moveto
(
x
,
y
)
module
.
PetriNet
.
add_place
(
self
,
place
,
**
args
)
def
add_transition
(
self
,
trans
,
**
args
)
:
"""Position can be set also when a transitions is added to
the net. See method `add_place` above.
@keyword pos: the position of the added transition
@type pos: `tuple`
"""
if
"pos"
in
args
:
x
,
y
=
args
.
pop
(
"pos"
)
trans
.
pos
.
moveto
(
x
,
y
)
module
.
PetriNet
.
add_transition
(
self
,
trans
,
**
args
)
def
merge_places
(
self
,
target
,
sources
,
**
args
)
:
"""When places are merged, the position of the new place
is the barycentre of the positions of the merged nodes.
Optionally, position can be specified in the call the
`merge_places`.
>>> n = PetriNet('n')
>>> n.add_place(Place('a', pos=(1,2)))
>>> n.add_place(Place('b', pos=(3,4)))
>>> n.merge_places('c', ['a', 'b'])
>>> n.node('c').pos
Position(2.0, 3.0)
>>> n.merge_places('d', ['a', 'b'], pos=(0, 0))
>>> n.node('d').pos
Position(0, 0)
@keyword pos: the position of the added transition
@type pos: `tuple`
"""
pos
=
args
.
pop
(
"pos"
,
None
)
module
.
PetriNet
.
merge_places
(
self
,
target
,
sources
,
**
args
)
if
pos
is
None
:
...
...
@@ -215,6 +344,8 @@ def extend (module) :
x
,
y
=
pos
self
.
_place
[
target
]
.
pos
.
moveto
(
x
,
y
)
def
merge_transitions
(
self
,
target
,
sources
,
**
args
)
:
"""See method `merge_places` above.
"""
pos
=
args
.
pop
(
"pos"
,
None
)
module
.
PetriNet
.
merge_transitions
(
self
,
target
,
sources
,
**
args
)
if
pos
is
None
:
...
...
@@ -226,6 +357,13 @@ def extend (module) :
x
,
y
=
pos
self
.
_trans
[
target
]
.
pos
.
moveto
(
x
,
y
)
def
bbox
(
self
)
:
"""The bounding box of the net, that is, the smallest
rectangle that contains all nodes coordinates.
@return: rectangle coordinates as `((xmin, ymin), (xmax,
ymax))`
@rtype: tuple
"""
if
len
(
self
.
_node
)
==
0
:
return
(
0
,
0
),
(
0
,
0
)
else
:
...
...
@@ -240,9 +378,19 @@ def extend (module) :
ymax
=
max
(
ymax
,
y
)
return
(
xmin
,
ymin
),
(
xmax
,
ymax
)
def
shift
(
self
,
dx
,
dy
)
:
"""Shift every node by `(dx, dy)`
@param dx: horizontal shift
@type dx: `float`
@param dy: vertical shift
@type dy: `float`
"""
for
node
in
self
.
node
()
:
node
.
pos
.
shift
(
dx
,
dy
)
def
transpose
(
self
)
:
"""Perform a clockwise 90° rotation of node coordinates,
ie, change every position `(x, y)` to `(-y, x)`
"""
for
node
in
self
.
node
()
:
x
,
y
=
node
.
pos
()
node
.
pos
.
moveto
(
-
y
,
x
)
...
...
snakes/plugins/status.py
View file @
baa7af9
"""A plugin to add nodes status.
#-*- coding: latin-1
"""Add statuses to nodes: a status is a special kind of label that is
used to define Petri nets compositions _à la_ Petri Box Calculus. See
plugin `ops` to read more about how statuses are used in practice.
Several status are defined by default: `entry`, `internal`, `exit`,
`buffer`, `safebuffer` for places and `tick` for transitions.
Internally, a status is an instance of class `Status` or one of its
subclasses that is identified by a name and an optional value. For
instance the plugin defines:
:::python
entry = Status('entry')
exit = Status('exit')
internal = Status('internal')
The second parameter is omitted which means that there is no value for
these status. Buffer places have both a name and a value for their
status: the name is `'buffer'` and the value is used as the name of
the buffer.
Status can be added to nodes either when they are created or when they
are added to the net:
>>> import snakes.plugins
>>> snakes.plugins.load('status', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> import snakes.plugins.status as status
>>> n = PetriNet('N')
>>> n.add_place(Place('p1'), status=
status.
entry)
>>> n.add_place(Place('p1'), status=entry)
>>> n.place('p1')
Place('p1', MultiSet([]), tAll, status=Status('entry'))
@todo: revise documentation
>>> n.add_place(Place('p2', status=exit))
>>> n.place('p2')
Place('p2', MultiSet([]), tAll, status=Status('exit'))
"""
import
operator
,
weakref
...
...
@@ -32,12 +51,13 @@ class Status (object) :
@param name: the name of the status
@type name: `str`
@param value: an optional additional value to make a
difference between status with te same name
@type value:
hashable
difference between status with te same name
@type value:
`hashable`
"""
self
.
_name
=
name
self
.
_value
=
value
__pnmltag__
=
"status"
# apidoc skip
def
__pnmldump__
(
self
)
:
"""Dump a `Status` as a PNML tree
...
...
@@ -62,6 +82,7 @@ class Status (object) :
return
Tree
(
self
.
__pnmltag__
,
None
,
Tree
(
"name"
,
self
.
_name
),
Tree
(
"value"
,
None
,
Tree
.
from_obj
(
self
.
_value
)))
# apidoc skip
@classmethod
def
__pnmlload__
(
cls
,
tree
)
:
"""Create a `Status` from a PNML tree
...
...
@@ -77,6 +98,7 @@ class Status (object) :
"""
return
cls
(
tree
.
child
(
"name"
)
.
data
,
tree
.
child
(
"value"
)
.
child
()
.
to_obj
())
# apidoc skip
def
copy
(
self
)
:
"""Return a copy of the status
...
...
@@ -87,6 +109,7 @@ class Status (object) :
@rtype: `Status`
"""
return
self
.
__class__
(
self
.
_name
,
self
.
_value
)
# apidoc skip
def
__str__
(
self
)
:
"""Short textual representation
...
...
@@ -102,6 +125,7 @@ class Status (object) :
return
str
(
self
.
_name
)
else
:
return
"
%
s(
%
s)"
%
(
self
.
_name
,
self
.
_value
)
# apidoc skip
def
__repr__
(
self
)
:
"""Detailed textual representation
...
...
@@ -118,6 +142,7 @@ class Status (object) :
else
:
return
"
%
s(
%
s,
%
s)"
%
(
self
.
__class__
.
__name__
,
repr
(
self
.
_name
),
repr
(
self
.
_value
))
# apidoc skip
def
__hash__
(
self
)
:
"""Hash a status
...
...
@@ -125,6 +150,7 @@ class Status (object) :
@rtype: `int`
"""
return
hash
((
self
.
_name
,
self
.
_value
))
# apidoc skip
def
__eq__
(
self
,
other
)
:
"""Compares two status for equality
...
...
@@ -146,17 +172,22 @@ class Status (object) :
return
(
self
.
_name
,
self
.
_value
)
==
(
other
.
_name
,
other
.
_value
)
except
:
return
False
# apidoc skip
def
__ne__
(
self
,
other
)
:
return
not
(
self
==
other
)
# apidoc skip
def
__add__
(
self
,
other
)
:
if
self
==
other
:
return
self
.
copy
()
else
:
raise
ConstraintError
(
"incompatible status"
)
# apidoc skip
def
name
(
self
)
:
return
self
.
_name
# apidoc skip
def
value
(
self
)
:
return
self
.
_value
# apidoc skip
def
merge
(
self
,
net
,
nodes
,
name
=
None
)
:
"""Merge `nodes` in `net` into a new node called `name`
...
...
@@ -179,7 +210,24 @@ exit = Status('exit')
internal
=
Status
(
'internal'
)
class
Buffer
(
Status
)
:
"A status for buffer places"
"""Status for buffer places, it can be used to merge all the nodes
with the same buffer name. For example:
>>> import snakes.plugins
>>> snakes.plugins.load('status', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> n = PetriNet('N')
>>> n.add_place(Place('p3', range(2), status=buffer('buf')))
>>> n.add_place(Place('p4', range(3), status=buffer('buf')))
>>> n.status.merge(buffer('buf'), 'b')
>>> p = n.place('b')
>>> p
Place('b', MultiSet([...]), tAll, status=Buffer('buffer','buf'))
>>> p.tokens == MultiSet([0, 0, 1, 1, 2])
True
"""
# apidoc skip
def
merge
(
self
,
net
,
nodes
,
name
=
None
)
:
"""Merge `nodes` in `net`
...
...
@@ -190,22 +238,6 @@ class Buffer (Status) :
If `name` is `None` the name generated is a concatenation of
the nodes names separated by '+', with parenthesis outside.
>>> import snakes.plugins
>>> snakes.plugins.load('status', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> n = PetriNet('N')
>>> import snakes.plugins.status as status
>>> buf = status.buffer('buf')
>>> n.add_place(Place('p3', range(2), status=buf))
>>> n.add_place(Place('p4', range(3), status=buf))
>>> n.status.merge(buf, 'b')
>>> p = n.place('b')
>>> p
Place('b', MultiSet([...]), tAll, status=Buffer('buffer','buf'))
>>> p.tokens == MultiSet([0, 0, 1, 1, 2])
True
@param net: the Petri net where places should be merged
@type net: `PetriNet`
@param nodes: a collection of place names to be merged
...
...
@@ -225,6 +257,9 @@ class Buffer (Status) :
def
buffer
(
name
)
:
"""Generate a buffer status called `name`
>>> buffer('foo')
Buffer('buffer','foo')
@param name: the name of the buffer
@type name: `str`
@return: `Buffer('buffer', name)`
...
...
@@ -233,7 +268,31 @@ def buffer (name) :
return
Buffer
(
'buffer'
,
name
)
class
Safebuffer
(
Buffer
)
:
"A status for safe buffers (ie, variables) places"
"""A status for safe buffers (ie, variables) places. The only
difference with `Buffer` status is that when buffer places with
`SafeBuffer` status are merged, they must have all the same
marking which also becomes the marking of the resulting place
(instead of adding the markings of the merged places).
>>> import snakes.plugins
>>> snakes.plugins.load('status', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> n = PetriNet('N')
>>> var = safebuffer('var')
>>> n.add_place(Place('p5', [1], status=var))
>>> n.add_place(Place('p6', [1], status=var))
>>> n.add_place(Place('p7', [1], status=var))
>>> n.status.merge(var, 'v')
>>> n.place('v')
Place('v', MultiSet([1]), tAll, status=Safebuffer('safebuffer','var'))
>>> n.add_place(Place('p8', [3], status=var))
>>> n.status.merge(var, 'vv')
Traceback (most recent call last):
...
ConstraintError: incompatible markings
"""
# apidoc skip
def
merge
(
self
,
net
,
nodes
,
name
=
None
)
:
"""Merge `nodes` in `net`
...
...
@@ -245,24 +304,6 @@ class Safebuffer (Buffer) :
If `name` is `None` the name generated is a concatenation of
the nodes names separated by '+', with parenthesis outside.
>>> import snakes.plugins
>>> snakes.plugins.load('status', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> import snakes.plugins.status as status
>>> n = PetriNet('N')
>>> var = status.safebuffer('var')
>>> n.add_place(Place('p5', [1], status=var))
>>> n.add_place(Place('p6', [1], status=var))
>>> n.add_place(Place('p7', [1], status=var))
>>> n.status.merge(var, 'v')
>>> n.place('v')
Place('v', MultiSet([1]), tAll, status=Safebuffer('safebuffer','var'))
>>> n.add_place(Place('p8', [3], status=var))
>>> try : n.status.merge(var, 'vv')
... except ConstraintError : print(sys.exc_info()[1])
incompatible markings
@param net: the Petri net where places should be merged
@type net: `PetriNet`
@param nodes: a collection of place names to be merged
...
...
@@ -288,6 +329,9 @@ class Safebuffer (Buffer) :
def
safebuffer
(
name
)
:
"""Generate a safebuffer status called `name`
>>> safebuffer('foo')
Safebuffer('safebuffer','foo')
@param name: the name of the safebuffer
@type name: `str`
@return: `Safebuffer('safebuffer', name)`
...
...
@@ -296,7 +340,13 @@ def safebuffer (name) :
return
Safebuffer
(
'safebuffer'
,
name
)
class
Tick
(
Status
)
:
"A status for tick transition"
"""A status for tick transition. Ticks are to transitions what
buffers are to places: they allow automatic merging of transitions
with the same tick status when nets are composed. This is used to
implement variants of the Petri Box Calculus with causal time.
When transitions are merged, their guards are `and`-ed.
"""
# apidoc skip
def
merge
(
self
,
net
,
nodes
,
name
=
None
)
:
"""Merge `nodes` in `net`
...
...
@@ -339,6 +389,9 @@ class Tick (Status) :
def
tick
(
name
)
:
"""Generate a tick status called `name`
>>> tick('spam')
Tick('tick','spam')
@param name: the name of the tick
@type name: `str`
@return: `Tick('tick', name)`
...
...
@@ -346,6 +399,7 @@ def tick (name) :
"""
return
Tick
(
'tick'
,
name
)
# apidoc skip
class
StatusDict
(
object
)
:
"A container to access the nodes of a net by their status"
def
__init__
(
self
,
net
)
:
...
...
@@ -413,8 +467,12 @@ class StatusDict (object) :
@snakes.plugins.plugin
(
"snakes.nets"
)
def
extend
(
module
)
:
"Build the extended module"
class
Place
(
module
.
Place
)
:
"""`Place` is extended to allow `status` keyword argument in
its constructor, which is later available as `status`
attribute.
"""
# apidoc stop
def
__init__
(
self
,
name
,
tokens
=
[],
check
=
None
,
**
args
)
:
self
.
status
=
args
.
pop
(
"status"
,
Status
(
None
))
module
.
Place
.
__init__
(
self
,
name
,
tokens
,
check
,
**
args
)
...
...
@@ -469,6 +527,11 @@ def extend (module) :
result
.
status
=
Status
(
None
)
return
result
class
Transition
(
module
.
Transition
)
:
"""`Transition` is extended to allow `status` keyword argument
in its constructor, which is later available as `status`
attribute.
"""
# apidoc stop
def
__init__
(
self
,
name
,
guard
=
None
,
**
args
)
:
self
.
status
=
args
.
pop
(
"status"
,
Status
(
None
))
.
copy
()
module
.
Transition
.
__init__
(
self
,
name
,
guard
,
**
args
)
...
...
@@ -519,55 +582,121 @@ def extend (module) :
return
"
%
s, status=
%
s)"
%
(
module
.
Transition
.
__repr__
(
self
)[:
-
1
],
repr
(
self
.
status
))
class
PetriNet
(
module
.
PetriNet
)
:
"""`PetriNet` is extended to allow `status` keyword argument
in several of its methods. An attributes `status` is also
available to allow retreiving nodes (actually their names) by
status or merge all the nodes with a given status.
The exact way how merging is performed depends on the exact
status: for exemple, as seen above, using `Buffer` or
`Safebuffer` does not lead to merge place the same way.
>>> import snakes.plugins
>>> snakes.plugins.load('status', 'snakes.nets', 'nets')
<module ...>
>>> from nets import *
>>> n = PetriNet('N')
>>> n.add_place(Place('a', range(2), status=buffer('buf')))
>>> n.add_place(Place('b', range(3), status=buffer('buf')))
>>> n.add_place(Place('c', range(3), status=buffer('spam')))
>>> list(sorted(n.status(buffer('buf'))))
['a', 'b']
>>> n.status.merge(buffer('buf'), 'd')
>>> list(sorted(n.status(buffer('buf'))))
['d']
Note in this example how nodes merged by status are removed
after being merge. This differs from the standard methods
`PetriNet.merge_places` and `PetriNet.merge_transitions` that
preserve the merged nodes and only add the new one.
"""
# apidoc skip
def
__init__
(
self
,
name
,
**
args
)
:
module
.
PetriNet
.
__init__
(
self
,
name
,
**
args
)
self
.
status
=
StatusDict
(
self
)
# apidoc skip
@classmethod
def
__pnmlload__
(
cls
,
tree
)
:
t
=
new_instance
(
cls
,
module
.
PetriNet
.
__pnmlload__
(
tree
))
t
.
status
=
StatusDict
(
t
)
return
t
# apidoc skip
def
copy
(
self
,
name
=
None
,
**
args
)
:
result
=
module
.
PetriNet
.
copy
(
self
,
name
,
**
args
)
result
.
status
=
self
.
status
.
copy
(
result
)
return
result
def
add_place
(
self
,
place
,
**
args
)
:
"""Extended with `status` keyword argument.
@keyword status: a status that is given to the node
@type status: `Status`
"""
place
.
status
=
args
.
pop
(
"status"
,
place
.
status
)
module
.
PetriNet
.
add_place
(
self
,
place
,
**
args
)
self
.
status
.
record
(
place
)
# apidoc skip
def
remove_place
(
self
,
name
,
**
args
)
:
place
=
self
.
place
(
name
)
self
.
status
.
remove
(
place
)
module
.
PetriNet
.
remove_place
(
self
,
name
,
**
args
)
def
add_transition
(
self
,
trans
,
**
args
)
:
"""Extended with `status` keyword argument.
@keyword status: a status that is given to the node
@type status: `Status`
"""
trans
.
status
=
args
.
pop
(
"status"
,
trans
.
status
)
module
.
PetriNet
.
add_transition
(
self
,
trans
,
**
args
)
self
.
status
.
record
(
trans
)
# apidoc skip
def
remove_transition
(
self
,
name
,
**
args
)
:
trans
=
self
.
transition
(
name
)
self
.
status
.
remove
(
trans
)
module
.
PetriNet
.
remove_transition
(
self
,
name
,
**
args
)
def
set_status
(
self
,
node
,
status
)
:
"""Assign a new status to a node.
@param node: the name of the node
@type node: `str`
@param status: a status that is given to the node
@type status: `Status`
"""
node
=
self
.
node
(
node
)
self
.
status
.
remove
(
node
)
node
.
status
=
status
self
.
status
.
record
(
node
)
# apidoc skip
def
rename_node
(
self
,
old
,
new
,
**
args
)
:
old_node
=
self
.
node
(
old
)
.
copy
()
module
.
PetriNet
.
rename_node
(
self
,
old
,
new
,
**
args
)
self
.
status
.
remove
(
old_node
)
self
.
status
.
record
(
self
.
node
(
new
))
def
copy_place
(
self
,
source
,
targets
,
**
args
)
:
"""Extended with `status` keyword argument.
@keyword status: a status that is given to the new node
@type status: `Status`
"""
status
=
args
.
pop
(
"status"
,
self
.
place
(
source
)
.
status
)
module
.
PetriNet
.
copy_place
(
self
,
source
,
targets
,
**
args
)
for
new
in
iterate
(
targets
)
:
self
.
set_status
(
new
,
status
)
def
copy_transition
(
self
,
source
,
targets
,
**
args
)
:
"""Extended with `status` keyword argument.
@keyword status: a status that is given to the new node
@type status: `Status`
"""
status
=
args
.
pop
(
"status"
,
self
.
transition
(
source
)
.
status
)
module
.
PetriNet
.
copy_transition
(
self
,
source
,
targets
,
**
args
)
for
new
in
iterate
(
targets
)
:
self
.
set_status
(
new
,
status
)
def
merge_places
(
self
,
target
,
sources
,
**
args
)
:
"""Extended with `status` keyword argument.
@keyword status: a status that is given to the new node
@type status: `Status`
"""
if
"status"
in
args
:
status
=
args
.
pop
(
"status"
)
else
:
...
...
@@ -576,6 +705,11 @@ def extend (module) :
module
.
PetriNet
.
merge_places
(
self
,
target
,
sources
,
**
args
)
self
.
set_status
(
target
,
status
)
def
merge_transitions
(
self
,
target
,
sources
,
**
args
)
:
"""Extended with `status` keyword argument.
@keyword status: a status that is given to the new node
@type status: `Status`
"""
if
"status"
in
args
:
status
=
args
.
pop
(
"status"
)
else
:
...
...
snakes/plugins/synchro.py
View file @
baa7af9
"""An implementation of the M-nets synchronisation.
This plugins extends the basic Petri net model in order to provide an
action-based synchronisation scheme that implements that of M-nets.
The plugin proposes a generalisation of the M-nets synchronisation in
that it does not impose a fixed correspondence between action names
and action arities.
"""This plugin provides an implementation of the action-based
synchronisation from algebras of Petri nets. With respect to the usual
definition of synchronisation, the plugin is slightly more general in
that it does not impose a fixed arity for action. The extensions are
as follows:
* class `Action` corresponds to a synchronisable action, it has a
name, a send/receive flag and a list of parameters. Actions have
...
...
@@ -12,60 +10,109 @@ and action arities.
arity will be able to synchronise
* class `MultiAction` corresponds to a multiset of actions. It is
forbidden to build a multiaction that holds a pair of conjugated
actions (this leads to infinite nets when synchronising)
* Transition.__init__ accepts a parameter `actions` that is a
collection of instances of `Action`, this multiaction is added in
the attribute `actions` of the transition
* PetriNet is given new methods: `synchronise(action_name)` to
perform the M-net synchronisation, `restrict(action_name)` to
perform the restriction and `scope(action_name)` for the scoping
**Remark:** the instances of `Substitution` used in this plugins must
map variable names to instances of `Variable` or `Value`, but not to
other variable names.
actions because this leads to infinite nets upon synchronisation
* the constructor of `Transition` accepts a parameter `actions` that
is a collection of instances of `Action`, this multiaction is
added in the attribute `actions` of the transition
* PetriNet is given new methods: `synchronise` to perform the
synchronisation, `restrict` to perform the restriction (ie, remove
transitions with a given action) and `scope` for the scoping (ie,
synchronisation followed by restriction)
### Example ###
Let's start with an example: we build a Petri net with two transitions
in parallel that will be synchronised later on.
>>> import snakes.plugins
>>> snakes.plugins.load('synchro', 'snakes.nets', 'nets')
<module ...>
>>> from nets import
PetriNet, Place, Transition, Expression
>>> from nets import
*
>>> n = PetriNet('N')
>>> n.add_place(Place('e1'))
>>> n.add_place(Place('x1'))
>>> n.add_transition(Transition('t1', guard=Expression('x!=y'),
... actions=[Action('a', True, [Variable('x'), Value(2)]),
... Action('a', True, [Value(3), Variable('y')]),
... Action('b', False, [Variable('x'), Variable('y')])]))
>>> m1 = [Action('a', True, [Variable('x'), Value(2)]),
... Action('b', True, [Value(3), Variable('y')]),
... Action('c', False, [Variable('x'), Variable('y')])]
>>> print(', '.join(str(action) for action in m1))
a!(x,2), b!(3,y), c?(x,y)
>>> n.add_transition(Transition('t1', guard=Expression('x!=y'), actions=m1))
>>> n.add_input('e1', 't1', Variable('x'))
>>> n.add_output('x1', 't1', Variable('z'))
>>> n.add_place(Place('e2'))
>>> n.add_place(Place('x2'))
>>> n.add_transition(Transition('t2', guard=Expression('z>0'),
... actions=[Action('a', False, [Variable('w'), Variable('y')]),
... Action('c', False, [Variable('z')])]))
>>> m2 = [Action('a', False, [Variable('w'), Variable('y')]),
... Action('d', False, [Variable('z')])]
>>> print(', '.join(str(action) for action in m2))
a?(w,y), d?(z)
>>> n.add_transition(Transition('t2', guard=Expression('z>0'), actions=m2))
>>> n.add_input('e2', 't2', Variable('w'))
>>> n.add_output('x2', 't2', Variable('z'))
>>> n.transition('t1').vars() == set(['x', 'y', 'z'])
True
>>> n.transition('t2').copy().vars() == set(['w', 'y', 'z'])
True
On transition `t1`, we have put a multiaction that can be abbreviated
as `a!(x,2), a!(3,y), b?(x,y)` and can be interpreted as three
synchronous communication performed atomically and simultaneously when
`t1` fires:
* `a!(x,2)` emits `(x,2)` on channel `a`
* `b!(2,y)` emits `(2,y)` on channel `b`
* `c?(x,y)` receives `(x,y)` on channel `c`
And similarly for transition `t2` that has two actions:
* `a?(w,y)` receives `(w,y)` on channel `a`
* `d?(z)` receives `z` on channel `d`
Thus, `t1` and `t2` hold _conjugated actions_, which are matching
emitting and receiving actions `a!(x,2)` and `a?(x,y)`. So we can
synchronise the net on `a` which builds a new transition whose firing
is exactly equivalent to the simultaneous firing of `t1` and `t2`
performing the communications over channel `a`.
>>> n.synchronise('a')
>>> for t in sorted(n.transition(), key=str) :
... print('
%
s
%
s'
%
(t, t.guard))
... for place, label in sorted(t.input(), key=str) :
... print('
%
s >>
%
s'
%
(place, label))
... for place, label in sorted(t.output(), key=str) :
... print('
%
s <<
%
s'
%
(place, label))
((t1{...}+t2{...})[a(...)]{...}+t2{...})[a(...)] (...)
...
t2 z>0
e2 >> w
x2 << z
>>> n.restrict('a')
>>> [t.name for t in sorted(n.transition(), key=str)]
["((t1{...}+t2{...})[a(...)]{...}+t2{...})[a(...)]",
"((t1{...}+t2{...})[a(...)]{...}+t2{...})[a(...)]"]
>>> t = [t for t in n.transition() if t.name not in ('t1', 't2')][0]
>>> print(t.name)
a(w,2)@(t1[x=w,y=e,z=f]+t2[y=2])
>>> print(t.guard)
((w != e)) and ((z > 0))
>>> print(', '.join(sorted(str(action) for action in t.actions)))
b!(3,e), c?(w,e), d?(z)
The second statement `t = ...` retrieves the new transition then we
print its name and guard. The names can be read as: this is the result
of execution action `a(w,2)` on `t1` substituted by `{x->w, y->e,
z->f}` and `t2` substituted by `{y->2}`. Indeed, both transitions have
variables `y` and `z`in common, so they are replaced in `t1` to avoid
names clashes, then, actions a can be `a!(x,2)` and `a?(w,y)` can be
matched by considering `x=w` and `y=2` which yields the rest of the
substitutions for the transitions. The resulting transition results
from the merging of the unified transitions, its guard is the `and` of
the guards of the merged transitions and its multiaction is the union
of the multiactions of the merged transitions minus the actions that
did synchronise.
The net now has three transitions: `t1`, `t2` and the new one
resulting from the synchronisation. This allows both synchronous and
asynchronous behaviour:
>>> for t in sorted(t.name for t in n.transition()) :
... print(t)
a(w,2)@(t1[x=w,y=e,z=f]+t2[y=2])
t1
t2
If we want to force the synchronous behaviour, we have to restrict
over `'a'` which removes any transition that hold an action `a?(...)`
or `a!(...)`. In practice, this is what we want and so we may have
used method `n.scope('a')` to apply directly the synchronisation
followed by the restriction.
@todo: revise documentation
>>> n.restrict('a')
>>> for t in sorted(t.name for t in n.transition()) :
... print(t)
a(w,2)@(t1[x=w,y=e,z=f]+t2[y=2])
"""
from
snakes
import
ConstraintError
...
...
@@ -77,15 +124,21 @@ from snakes.plugins import new_instance
from
snakes.compat
import
*
class
Action
(
object
)
:
"""Models one action with a name, a direction (send or receive)
and parameters.
"""
def
__init__
(
self
,
name
,
send
,
params
)
:
"""
"""Constructor. The direction is passed as a Boolean: `True`
for a send action, `False` for a receive.
@param name: the name of the action
@type name: `str`
@param send: a flag indicating whether this is a send or
receive action
@type send: `bool`
@param params: the list of parameters
@type params: `list` of `Variable` or `Value`
@param params: the list of parameters that must me instances
of `Variable` or `Value`
@type params: `list`
"""
self
.
name
=
name
self
.
send
=
send
...
...
@@ -99,13 +152,9 @@ class Action (object) :
<pnml>...
<action name="a" send="True">
<value>
<object type="int">
1
</object>
<object type="int">1</object>
</value>
<variable>
x
</variable>
<variable>x</variable>
</action>
</pnml>
"""
...
...
@@ -115,7 +164,7 @@ class Action (object) :
for
param
in
self
.
params
:
result
.
add_child
(
Tree
.
from_obj
(
param
))
return
result
# apidoc s
ki
p
# apidoc s
to
p
@classmethod
def
__pnmlload__
(
cls
,
tree
)
:
"""
...
...
@@ -138,7 +187,6 @@ class Action (object) :
return
"
%
s!(
%
s)"
%
(
self
.
name
,
","
.
join
([
str
(
p
)
for
p
in
self
]))
else
:
return
"
%
s?(
%
s)"
%
(
self
.
name
,
","
.
join
([
str
(
p
)
for
p
in
self
]))
# apidoc stop
def
__repr__
(
self
)
:
"""
>>> a = Action('a', True, [Value(1), Variable('x')])
...
...
@@ -213,9 +261,9 @@ class Action (object) :
Action('a', True, [Value(3), Value(2)])
@param subst: if not `None`, a substitution to apply to the
parameters of the copy
@type subst: `None` or `Substitution` mapping variables names
to `Value` or `Variable
`
parameters of the copy
mapping variables names to `Value`
or `Variable`
@type subst: `Substitution
`
@return: a copy of the action, substituted by `subst` if not
`None`
@rtype: `Action`
...
...
@@ -233,9 +281,9 @@ class Action (object) :
>>> a
Action('a', True, [Value(3), Value(2)])
@param subst: a substitution to apply to the parameters
@type subst: `Substitution` mapping variables names to `Valu
e`
or `Variable
`
@param subst: a substitution to apply to the parameters
,
mapping variables names to `Value` or `Variabl
e`
@type subst: `Substitution
`
"""
for
i
,
p
in
enumerate
(
self
.
params
)
:
if
isinstance
(
p
,
Variable
)
and
p
.
name
in
subst
:
...
...
@@ -320,16 +368,29 @@ class Action (object) :
return
result
class
MultiAction
(
object
)
:
"""Models a multiset of actions.
"""
def
__init__
(
self
,
actions
)
:
"""
"""The only restriction when building a multiaction is to
avoid putting two conjugated actions in it. Indeed, this may
lead to infinite Petri nets upon synchronisation. For example,
consider two transitions `t1` and `t2` with both `a?()` and
`a!()` in their multiactions. We can synchronise say `a?()`
from `t1` with `a!()` from `t2` yielding a transition whose
multiaction has `a!()` from `t1` and `a?() from `t2` and thus
can be synchronised with `t1` or `t2`, yielding a new
transition with `a?()` and `a!()`, etc. Fortunately, it makes
little sense to let a transition synchronise with itself, so
this situation is simply forbidden.
>>> try : MultiAction([Action('a', True, [Variable('x')]),
... Action('a', False, [Value(2)])])
... except ConstraintError : print(sys.exc_info()[1])
conjugated actions in the same multiaction
@param actions: a collection of
actions with no conjugated
actions in it
@type actions: `
list` of `Action
`
@param actions: a collection of
`Action` instances with no
conjugated
actions in it
@type actions: `
iterable
`
"""
self
.
_actions
=
[]
self
.
_sndrcv
=
{}
...
...
@@ -337,6 +398,7 @@ class MultiAction (object) :
for
act
in
actions
:
self
.
add
(
act
)
__pnmltag__
=
"multiaction"
# apidoc stop
def
__pnmldump__
(
self
)
:
"""
>>> MultiAction([Action('a', True, [Variable('x')]),
...
...
@@ -346,18 +408,12 @@ class MultiAction (object) :
<pnml>...
<multiaction>
<action name="a" send="True">
<variable>
x
</variable>
<variable>x</variable>
</action>
<action name="b" send="False">
<variable>
y
</variable>
<variable>y</variable>
<value>
<object type="int">
2
</object>
<object type="int">2</object>
</value>
</action>
</multiaction>
...
...
@@ -472,14 +528,11 @@ class MultiAction (object) :
@param subst: if not `None`, the substitution to apply to the
copy.
@type subst: `
None` or `
Substitution`
@type subst: `Substitution`
@return: a copy of the multiaction, optionally substituted
@rtype: `MultiAction`
"""
result
=
self
.
__class__
(
act
.
copy
()
for
act
in
self
.
_actions
)
if
subst
is
not
None
:
result
.
substitute
(
subst
)
return
result
return
self
.
__class__
(
act
.
copy
(
subst
)
for
act
in
self
.
_actions
)
def
__contains__
(
self
,
action
)
:
"""Search an action in the multiaction.
...
...
@@ -504,7 +557,7 @@ class MultiAction (object) :
@param action: an complete action, or its name or its name and
send flag
@type action: `Action`
or `str` or `tuple(str, bool)`
@type action: `Action`
@return: `True` if the specified action was found, `False`
otherwise
@rtype: `bool`
...
...
@@ -590,7 +643,19 @@ class MultiAction (object) :
for
action
in
self
.
_actions
:
result
.
update
(
action
.
vars
())
return
result
def
synchronise
(
self
,
other
,
name
)
:
def
names
(
self
)
:
"""Return the set of action names used in the multiaction.
>>> MultiAction([Action('a', True, [Variable('x'), Value(2)]),
... Action('a', True, [Value(3), Variable('y')]),
... Action('b', False, [Variable('x'), Variable('z')])]).names() == set(['a', 'b'])
True
@return: the set of variable names
@rtype: `set` of `str`
"""
return
set
([
action
.
name
for
action
in
self
.
_actions
])
def
synchronise
(
self
,
other
,
name
,
common
,
allnames
)
:
"""Search all the possible synchronisation on an action name with
another multiaction.
...
...
@@ -598,7 +663,6 @@ class MultiAction (object) :
synchronisation a 4-tuple whose components are:
* the sending action that did synchronise, it is already
unified, so the corresponding receiving action is just
the same with the reversed send flag
...
...
@@ -614,43 +678,58 @@ class MultiAction (object) :
... Action('b', False, [Variable('x'), Variable('y')])])
>>> n = MultiAction([Action('a', False, [Variable('w'), Variable('y')]),
... Action('c', False, [Variable('y')])])
>>> for a, x, u, v in m.synchronise(n, 'a') :
... print('
%
s
%
s
%
s
%
s'
%
(str(a), str(x), list(sorted(u.items())), list(sorted(v.items()))))
a!(w,2) [a!(3,y), b?(w,y), c?(a)] [('a', Value(2)), ('x', Variable('w'))] [('a', Value(2)), ('x', Variable('w')), ('y', Variable('a'))]
a!(3,a) [a!(x,2), b?(x,a), c?(a)] [('w', Value(3)), ('y', Variable('a'))] [('w', Value(3)), ('y', Variable('a'))]
>>> _m, _n = m.vars(), n.vars()
>>> for a, x, u, v in m.synchronise(n, 'a', _m & _n, _m | _n) :
... print('
%
s
%
s'
%
(str(a), str(x)))
... print(list(sorted(u.items())))
... print(list(sorted(v.items())))
a!(w,2) [a!(3,d), b?(w,d), c?(2)]
[('x', Variable('w')), ('y', Variable('d'))]
[('x', Variable('w')), ('y', Value(2))]
a!(3,y) [a!(x,2), b?(x,y), c?(y)]
[('d', Variable('y')), ('w', Value(3)), ('y', Variable('d'))]
[('d', Variable('y')), ('w', Value(3))]
@param other: the other multiaction to synchronise with
@type other: `MultiAction`
@param name: the name of the action to synchronise on
@type name: `str`
@param common: the set of names in common on both transitions
@type common: `set`
@param allnames: the set of all names involved in the transitions
@type allnames: `set`
@return: an iterator over the possible synchronisations
@rtype: iterator of `tuple(Action, MultiAction, Substitution,
Substitution)`
"""
renamer
=
Substitution
()
common
=
self
.
vars
()
&
other
.
vars
()
if
len
(
common
)
>
0
:
names
=
WordSet
(
common
)
if
common
:
names
=
WordSet
(
set
(
allnames
)
|
self
.
names
()
|
other
.
names
())
for
var
in
common
:
renamer
+=
Substitution
({
var
:
Variable
(
names
.
fresh
(
add
=
True
))})
for
left
in
(
act
for
act
in
self
.
_actions
if
act
.
name
==
name
)
:
for
right
in
(
act
for
act
in
other
.
_actions
if
act
.
name
==
name
if
act
.
send
!=
left
.
send
)
:
_
right
=
righ
t
.
copy
(
renamer
)
_
left
=
lef
t
.
copy
(
renamer
)
try
:
unifier
=
left
&
_
right
unifier
=
_left
&
right
except
:
continue
_unifier
=
unifier
*
renamer
_self
=
self
-
left
_self
=
self
.
copy
(
renamer
)
-
_left
_self
.
substitute
(
unifier
)
_other
=
other
-
right
_other
.
substitute
(
_unifier
)
yield
left
.
copy
(
unifier
),
_self
+
_other
,
unifier
,
_unifier
_other
.
substitute
(
unifier
)
yield
(
_left
.
copy
(
unifier
),
_self
+
_other
,
unifier
*
renamer
,
unifier
)
@snakes.plugins.plugin
(
"snakes.nets"
)
def
extend
(
module
)
:
class
Transition
(
module
.
Transition
)
:
"""Class `Transition` is extended to allow a keyword argument
`actions` in several of its methods `__init__` and `copy` (to
replace a multiaction upon copy).
"""
# apidoc stop
def
__init__
(
self
,
name
,
guard
=
None
,
**
args
)
:
self
.
actions
=
MultiAction
(
args
.
pop
(
"actions"
,
[]))
module
.
Transition
.
__init__
(
self
,
name
,
guard
,
**
args
)
...
...
@@ -677,18 +756,12 @@ def extend (module) :
<transition id="t">
<multiaction>
<action name="a" send="True">
<variable>
x
</variable>
<variable>x</variable>
</action>
<action name="b" send="False">
<variable>
y
</variable>
<variable>y</variable>
<value>
<object type="int">
2
</object>
<object type="int">2</object>
</value>
</action>
</multiaction>
...
...
@@ -713,6 +786,13 @@ def extend (module) :
return
result
class
PetriNet
(
module
.
PetriNet
)
:
def
synchronise
(
self
,
name
)
:
"""Synchronise the net wrt `name`.
@param name: the action name to be synchronised
@type name: `str`
@return: the synchronised Petri net
@rtype: `PetriNet`
"""
snd
=
[]
rcv
=
[]
for
trans
in
self
.
transition
()
:
...
...
@@ -729,11 +809,17 @@ def extend (module) :
if
(
_snd
.
name
,
_rcv
.
name
)
in
done
:
continue
try
:
new
=
_snd
.
actions
.
synchronise
(
_rcv
.
actions
,
name
)
_s
,
_r
=
_snd
.
vars
(),
_rcv
.
vars
()
new
=
_snd
.
actions
.
synchronise
(
_rcv
.
actions
,
name
,
_s
&
_r
,
_s
|
_r
)
except
ConstraintError
:
continue
for
a
,
m
,
s
,
r
in
new
:
t
=
self
.
_synchronise
(
_snd
,
s
,
_rcv
,
r
,
m
,
a
)
t
=
self
.
_synchronise
(
_snd
,
s
.
restrict
(
_snd
.
vars
()),
_rcv
,
r
.
restrict
(
_rcv
.
vars
()),
m
,
a
)
if
(
name
,
True
)
in
t
.
actions
:
snd
.
append
(
t
)
loop
=
True
...
...
@@ -742,17 +828,14 @@ def extend (module) :
loop
=
True
done
.
add
((
_snd
.
name
,
_rcv
.
name
))
def
_synchronise
(
self
,
snd
,
s
,
rcv
,
r
,
actions
,
sync
)
:
def
_str
(
binding
)
:
return
","
.
join
(
"
%
s=
%
s"
%
i
for
i
in
sorted
(
binding
.
items
()))
collect
=
[]
varset
=
WordSet
()
for
trans
,
subst
in
((
snd
,
s
),
(
rcv
,
r
))
:
new
=
"
%
s
%
s"
%
(
trans
.
name
,
str
(
subst
))
new
=
"
%
s
[
%
s]"
%
(
trans
.
name
,
_
str
(
subst
))
self
.
copy_transition
(
trans
.
name
,
new
)
collect
.
append
(
new
)
new
=
self
.
transition
(
new
)
nv
=
new
.
vars
()
for
v
in
varset
&
nv
:
new
.
substitute
(
Substitution
({
v
:
varset
.
fresh
(
add
=
True
)}))
varset
.
update
(
nv
)
for
var
,
val
in
subst
.
items
()
:
if
isinstance
(
val
,
Variable
)
:
new
.
substitute
(
Substitution
({
var
:
val
.
name
}))
...
...
@@ -766,21 +849,49 @@ def extend (module) :
self
.
remove_output
(
place
.
name
,
new
.
name
)
self
.
add_output
(
place
.
name
,
new
.
name
,
label
.
replace
(
Variable
(
var
),
val
))
merged
=
"(
%
s
%
s+
%
s
%
s)[
%
s]"
%
(
snd
.
name
,
str
(
s
),
rcv
.
name
,
str
(
s
),
str
(
sync
)
.
replace
(
"?"
,
""
)
.
replace
(
"!"
,
""
))
new
.
substitute
(
subst
)
merged
=
(
"
%
s@(
%
s)"
%
(
str
(
sync
)
.
replace
(
"?"
,
""
)
.
replace
(
"!"
,
""
),
"+"
.
join
(
collect
)))
self
.
merge_transitions
(
merged
,
collect
,
actions
=
actions
)
for
name
in
collect
:
self
.
remove_transition
(
name
)
return
self
.
transition
(
merged
)
def
restrict
(
self
,
action
)
:
def
restrict
(
self
,
name
)
:
"""Restrict the net wrt `name`.
@param name: the action name to be synchronised
@type name: `str`
@return: the synchronised Petri net
@rtype: `PetriNet`
"""
removed
=
[
trans
.
name
for
trans
in
self
.
transition
()
if
action
in
trans
.
actions
]
if
name
in
trans
.
actions
]
for
trans
in
removed
:
self
.
remove_transition
(
trans
)
def
scope
(
self
,
action
)
:
self
.
synchronise
(
action
)
self
.
restrict
(
action
)
def
scope
(
self
,
name
)
:
"""Scope the net wrt `name`, this is equivalent to apply
synchronisation followed by restriction on the same
`name`.
@param name: the action name to be synchronised
@type name: `str`
@return: the synchronised Petri net
@rtype: `PetriNet`
"""
self
.
synchronise
(
name
)
self
.
restrict
(
name
)
def
merge_transitions
(
self
,
target
,
sources
,
**
args
)
:
"""Accepts a keyword parameter `actions` to change the
multiaction of the resulting transition. If `actions` is
not given, the multiaction of the new transition is the
sum of the multiactions of the merged transition.
@keyword actions: the multiaction of the transition
resulting from the merge
@type actions: `MultiAction`
"""
actions
=
args
.
pop
(
"actions"
,
None
)
module
.
PetriNet
.
merge_transitions
(
self
,
target
,
sources
,
**
args
)
if
actions
is
None
:
...
...
@@ -791,6 +902,15 @@ def extend (module) :
else
:
self
.
transition
(
target
)
.
actions
=
MultiAction
(
actions
)
def
copy_transition
(
self
,
source
,
targets
,
**
args
)
:
"""Accepts a keyword parameter `actions` to change the
multiaction of the resulting transition. If `actions` is
not given, the multiaction of the new transition is the
the same multiaction as the copied transition.
@keyword actions: the multiaction of the transition
resulting from the copy
@type actions: `MultiAction`
"""
actions
=
args
.
pop
(
"actions"
,
None
)
module
.
PetriNet
.
copy_transition
(
self
,
source
,
targets
,
**
args
)
if
actions
is
None
:
...
...
snakes/utils/apidoc.py
View file @
baa7af9
...
...
@@ -382,7 +382,7 @@ class DocExtract (object) :
%
(
info
[
"type"
]
.
get
(
arg
,
"object"
)
.
strip
(
"`"
),
arg
))
for
kw
,
text
in
sorted
(
info
[
"keyword"
]
.
items
())
:
self
.
writelist
(
"`
%
s`:
%
s"
%
(
kw
,
text
))
self
.
writelist
(
"
keyword
`
%
s`:
%
s"
%
(
kw
,
text
))
if
any
(
k
in
info
for
k
in
(
"return"
,
"rtype"
))
:
if
"return"
in
info
:
self
.
writelist
(
"`return
%
s`:
%
s"
...
...
Please
register
or
login
to post a comment