Skip to content
Snippets Groups Projects
Commit a427fdc2 authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Initial commit

parents
No related branches found
No related tags found
No related merge requests found
*~
__pycache__
#!/usr/bin/python3
import asyncio
class FSM:
def __init__(self, initial, states):
self.current = initial
self.states = states
self.terminated = False
@asyncio.coroutine
def run():
self.current.enter()
while True:
if self.terminated:
return False
else:
yield True
next = self.current.trigger()
# print("### %s -> %s" % (self.current, next))
if next != None:
self.current.leave()
if self.terminated:
continue
self.current = next
self.current.enter()
if self.terminated:
continue
self.run = run()
def trig(self):
return self.run.send(None)
def terminate(self):
self.terminated = True
#!/usr/bin/python3
from .parser import parser
fsm = parser.fsm
initial_state = parser.initial_state
state = parser.state
enter = parser.enter
leave = parser.leave
transition = parser.transition
#!/usr/bin/python3
#
# Requires Python >= 3.3 due to use of __qualname__
#
import threading
from . import FSM
class State:
def __init__(self, guard):
self.f = guard.f
self.name = self.f.__name__
self._enter = None
self._leave = None
self._transitions = []
def init(self, fsm):
self.f(fsm)
def add_transition(self, t):
self._transitions.append(t)
def set_enter(self, f):
assert self._enter == None, "at most one @enter per state"
self._enter = f
def enter(self):
if self._enter:
self._enter()
def set_leave(self, f):
assert self._leave == None, "at most one @leave per state"
self._leave = f
def leave(self):
if self._leave:
self._leave()
def trigger(self):
for transition in self._transitions:
trigged, next = transition()
assert(isinstance(trigged, bool))
assert(isinstance(next, State))
if trigged:
return next
return None
def __call__(self, *args, **kwars):
raise Exception("States should only be accessed with \n"
" .activate\n"
" .connect\n"
" .disconnect\n")
def __repr__(self):
return 'State(%s)' % (self.name)
class FSMParser:
class Guard:
def __init__(self, f):
self.f = f
self.name = f.__qualname__
def __hash__(self):
return self.name.__hash__()
def __eq__(self, other):
return self.name == other.name
def __repr__(self):
return 'Guard(%s [File "%s", line %d])' % (self.name,
self.f.__code__.co_filename,
self.f.__code__.co_firstlineno)
def __init__(self):
self._initial = list()
self._guard = object()
self._states = set()
self._mutex = threading.Lock()
def initial_state(self, f):
""" Collect the single initial state for this FSM """
g = self.state(f)
with self._mutex:
# Stack needed in case of nested FSM's
self._initial.append(g)
return g
def state(self, f):
""" Transform state functions to a Guard object """
g = self.Guard(f)
with self._mutex:
if g in self._states:
raise Exception('%s already defined %s' %
(self._states[g.name], g))
self._states.add(g)
return g
def transition(self, t):
""" Add transition to state """
if self._state:
self._state.add_transition(t)
return None
def enter(self, e):
""" Add enter function to state """
if self._state:
self._state.set_enter(e)
return None
def leave(self, l):
""" Add leave function to state """
if self._state:
self._state.set_leave(l)
return None
def fsm(self, klass):
""" Transform the decorated class into a FSM """
with self._mutex:
g_states = set([ s for s in self._states if
s.name.startswith(klass.__qualname__ + '.') ])
self._states -= g_states
# Declaration time sanity check of FSM
assert(self._initial)
assert(self._initial[-1] in g_states)
g_initial = self._initial.pop()
g_states -= set([g_initial])
self._state = None
# Declaration sanity check
for s in g_states:
try:
s.f(None)
except:
raise Exception('Static sanity check failed for %s' % s)
# Run time FSM init function
def wrap(*args, **kwargs):
initial = State(g_initial)
states = set(map(State, g_states))
states.add(initial)
instance = klass(*args, **kwargs)
with self._mutex:
for s in states:
self._state = s
s.init(instance)
setattr(instance, s.name, s)
self._state = None
f = FSM.FSM(initial, states)
assert not hasattr(instance, 'fsm')
setattr(instance, 'fsm', f)
return instance
# Prepare for more FSM declarations
return wrap
parser = FSMParser()
test.py 0 → 100755
#!/usr/bin/python3
import fsm
import asyncio
@fsm.fsm
class Y:
def __init__(self, label, M, N):
self.label = label
self.M = M
self.N = N
self.m = 0
self.n = 0
def trig(self):
return self.fsm.trig()
@fsm.initial_state
def a(self):
@fsm.enter
def _():
print("Enter %s.a %d %d" % (self.label, self.m, self.n))
self.m += 1
@fsm.transition
def _():
return self.m < self.M, self.a
@fsm.transition
def _(): return True, self.b
@fsm.leave
def _():
print("Leave %s.a %d %d" % (self.label, self.m, self.n))
@fsm.state
def b(self):
@fsm.enter
def _():
self.m = 0
self.n += 1
@fsm.transition
def _():
return False,self.b
@fsm.transition
def _(): return True,self.a
@fsm.leave
def _():
if self.n > self.N:
print("Done:", self.label)
self.fsm.terminate()
@asyncio.coroutine
def tick(fsm, h):
while True:
fsm.trig()
yield from asyncio.sleep(h)
if __name__ == '__main__':
el = loop = asyncio.get_event_loop()
y1 = Y('y1', 5, 2)
y2 = Y('y2', 2, 5)
el.create_task(tick(y1, 0.1))
el.create_task(tick(y2, 0.5))
tasks = [
asyncio.async(tick(y1, 0.1)),
asyncio.async(tick(y2, 0.5))
]
loop.run_until_complete(asyncio.wait(tasks))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment