Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • anders_blomdell/labcomm
  • klaren/labcomm
  • tommyo/labcomm
  • erikj/labcomm
  • sven/labcomm
5 results
Show changes
#!/usr/bin/python
# -*- coding: iso8859-15 -*-
import math
import os
import sys
import re
import traceback
def run_labcomm(base):
if not os.path.exists("gen/java/%s" % base):
os.makedirs("gen/java/%s" % base)
cmd = " ".join([
"java -jar ../labComm.jar",
"--c=gen/%s.c" % base,
"--h=gen/%s.h" % base,
"--python=gen/%s.py" % base,
"--java=gen/java/%s" % base,
"%s.lc" % base
])
print cmd
os.system(cmd)
class hexwriter(object):
def __init__(self):
self.pos = 0
self.ascii = ''
def write(self, data):
for c in data:
if ' ' <= c and c <= '}':
self.ascii += c
else:
self.ascii += '.'
sys.stdout.write("%2.2x " % ord(c))
self.pos += 1
if self.pos >= 15:
sys.stdout.write("%s\n" % self.ascii)
self.pos = 0
self.ascii = ""
def mark(self):
pass
def flush(self):
for i in range(self.pos, 15):
sys.stdout.write(" ")
sys.stdout.write("%s\n" % self.ascii)
self.pos = 0
self.ascii = ""
def generate(decl):
if decl.__class__ == labcomm.sample:
result = []
for values in generate(decl.decl):
result.append((decl, values))
return result
elif decl.__class__ == labcomm.struct:
result = []
if len(decl.field) == 0:
result.append({})
else:
values1 = generate(decl.field[0][1])
values2 = generate(labcomm.struct(decl.field[1:]))
for v1 in values1:
for v2 in values2:
v = dict(v2)
v[decl.field[0][0]] = v1
result.append(v)
return result
elif decl.__class__ == labcomm.array:
if len(decl.indices) == 1:
values = generate(decl.decl)
if decl.indices[0] == 0:
lengths = [0, 1, 2]
else:
lengths = [ decl.indices[0] ]
else:
values = generate(labcomm.array(decl.indices[1:],decl.decl))
if decl.indices[0] == 0:
lengths = [1, 2]
else:
lengths = [ decl.indices[0] ]
result = []
for v in values:
for i in lengths:
element = []
for j in range(i):
element.append(v)
result.append(element)
return result
elif decl.__class__ == labcomm.BOOLEAN:
return [False, True]
elif decl.__class__ == labcomm.BYTE:
return [-128, 0, 127]
elif decl.__class__ == labcomm.SHORT:
return [-32768, 0, 32767]
elif decl.__class__ == labcomm.INTEGER:
return [-2147483648, 0, 2147483647]
elif decl.__class__ == labcomm.LONG:
return [-9223372036854775808, 0, 9223372036854775807]
elif decl.__class__ == labcomm.FLOAT:
return [-math.pi, 0.0, math.pi]
elif decl.__class__ == labcomm.DOUBLE:
return [-math.pi, 0.0, math.pi]
elif decl.__class__ == labcomm.STRING:
return ['string',
'teckenstrng'.decode("iso8859-15")]
print decl
raise Exception("unhandled decl %s" % decl.__class__)
if __name__ == "__main__":
sys.path.insert(0, "../lib/python/")
import labcomm
print os.getcwd(), sys.argv
if not os.path.exists("gen"):
os.makedirs("gen")
files = sys.argv[1:]
print sys.argv, files
if len(files) == 0:
files = os.listdir(".")
for f in files:
m = re.match("(.*).lc", f)
if m:
run_labcomm(m.group(1))
sys.path.insert(0, "gen")
for f in files:
m = re.match("(.*).lc", f)
if m:
print f
h = hexwriter()
encoder = labcomm.Encoder(h)
signatures = []
i = __import__(m.group(1))
for k in dir(i):
v = getattr(i, k)
try:
s = v.signature
except:
s = None
if s:
signatures.append(s)
encoder.add_decl(s)
for s in signatures:
for e in generate(s):
encoder.encode(e[1], s)
h.flush()
......@@ -9,4 +9,4 @@ sample struct {
int a;
int b;
} an_int_struct;
sample void a_void;
\ No newline at end of file
sample void a_void;
#!/usr/bin/python
# -*- coding: utf-8 -*-
import argparse
import imp
import labcomm2014
import math
import os
import re
import struct
import subprocess
import sys
import threading
def labcomm_compile(lc, name, args):
for lang in [ 'c', 'csharp', 'java', 'python']:
destdir = 'gen/%s/%s' % (name, lang)
if not os.path.exists(destdir):
os.makedirs(destdir)
pass
pass
cmd = args.labcomm.split() + [
"--c=gen/%s/c/%s.c" % (name, name),
"--h=gen/%s/c/%s.h" % (name, name),
"--cs=gen/%s/csharp/%s.cs" % (name, name),
"--python=gen/%s/python/%s.py" % (name, name),
"--java=gen/%s/java/" % name,
"--typeinfo=gen/%s/%s.typeinfo" % (name, name),
lc]
subprocess.check_call(cmd)
pass
def get_signatures(path):
fp, pathname, description = imp.find_module(os.path.basename(path)[0:-3],
[ os.path.dirname(path) ])
with fp as fp:
m = imp.load_module('signatures', fp, pathname, description)
pass
return map(lambda s: s.signature, m.sample)
class Test:
def __init__(self, program, signatures):
self.program = program
self.signatures = signatures
pass
def generate(self, decl):
if decl.__class__ == labcomm2014.sample:
result = []
for values in self.generate(decl.decl):
result.append((decl, values))
return result
elif decl.__class__ == labcomm2014.typedef:
result = []
for values in self.generate(decl.decl):
result.append(values)
return result
elif decl.__class__ == labcomm2014.struct:
result = []
if len(decl.field) == 0:
result.append({})
else:
values1 = self.generate(decl.field[0][1])
values2 = self.generate(labcomm2014.struct(decl.field[1:]))
for v1 in values1:
for v2 in values2:
v = dict(v2)
v[decl.field[0][0]] = v1
result.append(v)
return result
elif decl.__class__ == labcomm2014.array:
if len(decl.indices) == 1:
values = self.generate(decl.decl)
if decl.indices[0] == 0:
lengths = [0, 1, 2]
else:
lengths = [ decl.indices[0] ]
else:
values = self.generate(labcomm2014.array(decl.indices[1:],
decl.decl))
if decl.indices[0] == 0:
lengths = [1, 2]
else:
lengths = [ decl.indices[0] ]
result = []
for v in values:
for i in lengths:
element = []
for j in range(i):
element.append(v)
result.append(element)
return result
elif decl.__class__ == labcomm2014.BOOLEAN:
return [False, True]
elif decl.__class__ == labcomm2014.BYTE:
return [0, 127, 128, 255]
elif decl.__class__ == labcomm2014.SHORT:
return [-32768, 0, 32767]
elif decl.__class__ == labcomm2014.INTEGER:
return [-2147483648, 0, 2147483647]
elif decl.__class__ == labcomm2014.LONG:
return [-9223372036854775808, 0, 9223372036854775807]
elif decl.__class__ == labcomm2014.FLOAT:
def tofloat(v):
return struct.unpack('f', struct.pack('f', v))[0]
return [tofloat(-math.pi), 0.0, tofloat(math.pi)]
elif decl.__class__ == labcomm2014.DOUBLE:
return [-math.pi, 0.0, math.pi]
elif decl.__class__ == labcomm2014.STRING:
return ['string', u'sträng' ]
elif decl.__class__ == labcomm2014.SAMPLE:
return self.signatures
print>>sys.stderr, decl
raise Exception("unhandled decl %s" % decl.__class__)
def uses_refs(self, decls):
for decl in decls:
if decl.__class__ == labcomm2014.sample:
if self.uses_refs([ decl.decl ]):
return True
elif decl.__class__ == labcomm2014.struct:
if self.uses_refs([ d for n,d in decl.field ]):
return True
elif decl.__class__ == labcomm2014.array:
if self.uses_refs([ decl.decl ]):
return True
elif decl.__class__ == labcomm2014.SAMPLE:
return True
return False
def run(self):
print>>sys.stderr, 'Testing', self.program
p = subprocess.Popen(self.program,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=sys.stderr)
self.expected = None
self.failed = False
self.next = threading.Condition()
decoder = threading.Thread(target=self.decode, args=(p.stdout,))
decoder.start()
encoder = labcomm2014.Encoder(labcomm2014.StreamWriter(p.stdin))
for signature in self.signatures:
encoder.add_decl(signature)
pass
if self.uses_refs(self.signatures):
for signature in self.signatures:
encoder.add_ref(signature)
for signature in self.signatures:
print>>sys.stderr, "Checking", signature.name,
for decl,value in self.generate(signature):
sys.stderr.write('.')
self.next.acquire()
self.received_value = None
self.received_decl = None
encoder.encode(value, decl)
self.next.wait(2)
self.next.release()
if p.poll() != None:
print>>sys.stderr, "Failed with:", p.poll()
self.failed = True
elif value != self.received_value or decl != self.received_decl:
print>>sys.stderr, "Coding error"
print>>sys.stderr, value == self.received_value
print>>sys.stderr, "Got: ", self.received_value
print>>sys.stderr, " ", self.received_decl
print>>sys.stderr, "Expected:", value
print>>sys.stderr, " ", decl
self.failed = True
if self.failed:
if p.poll() == None:
p.terminate()
exit(1)
pass
print>>sys.stderr
pass
p.stdin.close()
if p.wait() != 0:
exit(1)
pass
pass
def decode(self, f):
decoder = labcomm2014.Decoder(labcomm2014.StreamReader(f))
try:
while True:
value,decl = decoder.decode()
if value != None:
self.next.acquire()
self.received_value = value
self.received_decl = decl
self.expected = None
self.next.notify_all()
self.next.release()
pass
pass
except EOFError:
pass
pass
pass
def run(test, signatures):
t = Test(test, signatures)
t.run()
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Run encoding test.')
class test_action(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
old = list(getattr(namespace, self.dest))
def strip_slash(s):
if s.startswith('\\'):
return s[1:]
return s
old.append(map(strip_slash, values))
setattr(namespace, self.dest, old)
parser.add_argument('--signatures')
parser.add_argument('--test', nargs='*', action=test_action, default=[])
args = parser.parse_args()
signatures = get_signatures(args.signatures)
for test in args.test:
run(test, signatures)
exit(0)
for lc in args.lc:
run(lc, args)
pass
pass
#!/usr/bin/python
import sys
import argparse
import subprocess
# returns true if test fails
def test_labcomm_compile_OK(lc, args):
cmd = args.labcomm.split() + [lc]
try:
res = subprocess.check_call(cmd)
print "sucess!"
return False
except subprocess.CalledProcessError as ex:
print ex.output
print ex.returncode
return True
def test_labcomm_compile_fail(lc, args):
cmd = args.labcomm.split() + [lc]
try:
res = subprocess.check_call(cmd)
print "failed! (should have produced an error)"
return True
except subprocess.CalledProcessError as ex:
print "sucess!"
print ex.output
print ex.returncode
return False;
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Run test of error messages.')
parser.add_argument('--labcomm');
parser.add_argument('--testOK', nargs='*', default=[])
parser.add_argument('--testNOK', nargs='*', default=[])
args = parser.parse_args()
fail = False;
for test in args.testOK:
fail = fail or test_labcomm_compile_OK(test, args)
pass
for test in args.testNOK:
fail = fail or test_labcomm_compile_fail(test, args)
pass
if fail:
print "*** fail ***"
else:
print "*** success ***"
#!/usr/bin/python
# -*- coding: utf-8 -*-
import argparse
import imp
import labcomm2014
import math
import os
import re
import struct
import subprocess
import sys
import threading
def labcomm_compile(lc, name, args):
for lang in [ 'c', 'csharp', 'java', 'python']:
destdir = 'gen/%s/%s' % (name, lang)
if not os.path.exists(destdir):
os.makedirs(destdir)
pass
pass
cmd = args.labcomm.split() + [
"--c=gen/%s/c/%s.c" % (name, name),
"--h=gen/%s/c/%s.h" % (name, name),
"--cs=gen/%s/csharp/%s.cs" % (name, name),
"--python=gen/%s/python/%s.py" % (name, name),
"--java=gen/%s/java/" % name,
"--typeinfo=gen/%s/%s.typeinfo" % (name, name),
lc]
subprocess.check_call(cmd)
pass
def get_signatures(path):
fp, pathname, description = imp.find_module(os.path.basename(path)[0:-3],
[ os.path.dirname(path) ])
with fp as fp:
m = imp.load_module('signatures', fp, pathname, description)
pass
return map(lambda s: s.signature, m.sample)
class Test:
def __init__(self, program, signatures):
self.program = program
self.signatures = map(lambda s: s.rename('prefix:%s:suffix' % s.name),
signatures)
pass
def generate(self, decl):
if decl.__class__ == labcomm2014.sample:
result = []
for values in self.generate(decl.decl):
result.append((decl, values))
return result
elif decl.__class__ == labcomm2014.typedef:
result = []
for values in self.generate(decl.decl):
result.append(values)
return result
elif decl.__class__ == labcomm2014.struct:
result = []
if len(decl.field) == 0:
result.append({})
else:
values1 = self.generate(decl.field[0][1])
values2 = self.generate(labcomm2014.struct(decl.field[1:]))
for v1 in values1:
for v2 in values2:
v = dict(v2)
v[decl.field[0][0]] = v1
result.append(v)
return result
elif decl.__class__ == labcomm2014.array:
if len(decl.indices) == 1:
values = self.generate(decl.decl)
if decl.indices[0] == 0:
lengths = [0, 1, 2]
else:
lengths = [ decl.indices[0] ]
else:
values = self.generate(labcomm2014.array(decl.indices[1:],
decl.decl))
if decl.indices[0] == 0:
lengths = [1, 2]
else:
lengths = [ decl.indices[0] ]
result = []
for v in values:
for i in lengths:
element = []
for j in range(i):
element.append(v)
result.append(element)
return result
elif decl.__class__ == labcomm2014.BOOLEAN:
return [False, True]
elif decl.__class__ == labcomm2014.BYTE:
return [0, 127, 128, 255]
elif decl.__class__ == labcomm2014.SHORT:
return [-32768, 0, 32767]
elif decl.__class__ == labcomm2014.INTEGER:
return [-2147483648, 0, 2147483647]
elif decl.__class__ == labcomm2014.LONG:
return [-9223372036854775808, 0, 9223372036854775807]
elif decl.__class__ == labcomm2014.FLOAT:
def tofloat(v):
return struct.unpack('f', struct.pack('f', v))[0]
return [tofloat(-math.pi), 0.0, tofloat(math.pi)]
elif decl.__class__ == labcomm2014.DOUBLE:
return [-math.pi, 0.0, math.pi]
elif decl.__class__ == labcomm2014.STRING:
return ['string', u'sträng' ]
elif decl.__class__ == labcomm2014.SAMPLE:
return self.signatures
print>>sys.stderr, decl
raise Exception("unhandled decl %s" % decl.__class__)
def uses_refs(self, decls):
for decl in decls:
if decl.__class__ == labcomm2014.sample:
if self.uses_refs([ decl.decl ]):
return True
elif decl.__class__ == labcomm2014.struct:
if self.uses_refs([ d for n,d in decl.field ]):
return True
elif decl.__class__ == labcomm2014.array:
if self.uses_refs([ decl.decl ]):
return True
elif decl.__class__ == labcomm2014.SAMPLE:
return True
return False
def run(self):
print>>sys.stderr, 'Testing', self.program
p = subprocess.Popen(self.program,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=sys.stderr)
self.expected = None
self.failed = False
self.next = threading.Condition()
decoder = threading.Thread(target=self.decode, args=(p.stdout,))
decoder.start()
encoder = labcomm2014.Encoder(labcomm2014.StreamWriter(p.stdin))
for signature in self.signatures:
encoder.add_decl(signature)
pass
if self.uses_refs(self.signatures):
for signature in self.signatures:
encoder.add_ref(signature)
for signature in self.signatures:
print>>sys.stderr, "Checking", signature.name,
for decl,value in self.generate(signature):
sys.stderr.write('.')
self.next.acquire()
self.received_value = None
self.received_decl = None
encoder.encode(value, decl)
self.next.wait(2)
self.next.release()
if p.poll() != None:
print>>sys.stderr, "Failed with:", p.poll()
self.failed = True
elif value != self.received_value or decl != self.received_decl:
print>>sys.stderr, "Coding error"
print>>sys.stderr, value == self.received_value
print>>sys.stderr, "Got: ", self.received_value
print>>sys.stderr, " ", self.received_decl
print>>sys.stderr, "Expected:", value
print>>sys.stderr, " ", decl
self.failed = True
if self.failed:
if p.poll() == None:
p.terminate()
exit(1)
pass
print>>sys.stderr
pass
p.stdin.close()
if p.wait() != 0:
exit(1)
pass
pass
def decode(self, f):
decoder = labcomm2014.Decoder(labcomm2014.StreamReader(f))
try:
while True:
value,decl = decoder.decode()
if value != None:
self.next.acquire()
self.received_value = value
self.received_decl = decl
self.expected = None
self.next.notify_all()
self.next.release()
pass
pass
except EOFError:
pass
pass
pass
def run(test, signatures):
t = Test(test, signatures)
t.run()
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Run encoding test.')
class test_action(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
old = list(getattr(namespace, self.dest))
def strip_slash(s):
if s.startswith('\\'):
return s[1:]
return s
old.append(map(strip_slash, values))
setattr(namespace, self.dest, old)
parser.add_argument('--signatures')
parser.add_argument('--test', nargs='*', action=test_action, default=[])
args = parser.parse_args()
signatures = get_signatures(args.signatures)
for test in args.test:
run(test, signatures)
exit(0)
for lc in args.lc:
run(lc, args)
pass
pass
#include <labcomm.h>
#include <labcomm_fd_reader_writer.h>
#include "gen/simple.h"
void simple_an_int_handler(simple_an_int *v,
void *context)
{
struct labcomm_encoder *e = context;
labcomm_encode_simple_an_int(e, v);
}
int main(int argc, char *argv[]) {
struct labcomm_decoder *d;
struct labcomm_encoder *e;
labcomm_decoder_register_simple_an_int(d, simple_an_int_handler, e);
return 0;
}
File deleted
File deleted
File deleted
#!/usr/bin/env python
import argparse
import labcomm2014
import sys
import time
class Reader(object):
def __init__(self, file_):
self._file = file_
def read(self, count):
data = self._file.read(count)
if len(data) == 0:
raise EOFError()
return data
def mark(self, value, decl):
pass
class FollowingReader(Reader):
def __init__(self, file_, interval, timeout):
super(FollowingReader, self).__init__(file_)
self._interval = interval
self._timeout = timeout
def read(self, count):
data = ''
t_start = time.time()
while len(data) < count:
tmp = self._file.read(count - len(data))
if tmp:
data += tmp
else:
time.sleep(self._interval)
if self._timeout and time.time() - t_start > self._timeout:
raise EOFError()
return data
def flatten(sample, _type):
if isinstance(_type, labcomm2014.sample):
flatten(sample, _type.decl)
elif isinstance(_type, labcomm2014.array):
for e in sample:
flatten(e, _type.decl)
elif isinstance(_type, labcomm2014.struct):
for name, decl in _type.field:
flatten(sample[name], decl)
elif isinstance(_type, labcomm2014.BOOLEAN):
print "%d," % sample,
elif isinstance(_type, labcomm2014.STRING):
print "\"%s\"," % sample,
elif isinstance(_type, labcomm2014.primitive):
print "%s," % sample,
else:
raise Exception("Unhandled type. " + str(type(type_)) + " " + str(type_))
def flatten_labels(_type, prefix=""):
if isinstance(_type, labcomm2014.sample):
flatten_labels(_type.decl, _type.name)
elif isinstance(_type, labcomm2014.array):
if len(_type.indices) != 1:
raise Exception("Fix multidimensional arrays")
if len(_type.indices) == 0:
raise Exception("We dont't handle dynamical sizes yet %s" % _type)
for i in range(0, _type.indices[0]):
flatten_labels(_type.decl, prefix + "[%d]" % i)
elif isinstance(_type, labcomm2014.struct):
for name, decl in _type.field:
flatten_labels(decl,
prefix + "." + name)
elif isinstance(_type, labcomm2014.primitive):
print '"%s",' % prefix,
else:
raise Exception("Unhandled type. " + str(type(type_)) + " " + str(type_))
def default(type_):
if isinstance(type_, labcomm2014.sample):
return default(type_.decl)
elif isinstance(type_, labcomm2014.array):
if len(type_.indices) != 1:
raise Exception("Fix multidimensional arrays")
if len(type_.indices) == 0:
raise Exception("We dont't handle dynamical sizes yet %s" % type_)
for i in range(0, type_.indices[0]):
return [default(type_.decl) for _ in range(type_.indices[0])]
elif isinstance(type_, labcomm2014.struct):
return {name: default(decl) for name, decl in type_.field}
elif isinstance(type_, labcomm2014.STRING):
return ''
elif isinstance(type_, labcomm2014.BOOLEAN):
return False
elif (isinstance(type_, labcomm2014.FLOAT) or
isinstance(type_, labcomm2014.DOUBLE)):
return float('NaN')
elif (isinstance(type_, labcomm2014.BYTE) or
isinstance(type_, labcomm2014.SHORT) or
isinstance(type_, labcomm2014.INTEGER) or
isinstance(type_, labcomm2014.LONG)):
return 0
else:
raise Exception("Unhandled type. " + str(type(type_)) + " " + str(type_))
def dump(sample, _type):
for k in sorted(_type.keys()):
flatten(sample[k], _type[k])
print
def dump_labels(type_):
for k in sorted(type_.keys()):
flatten_labels(type_[k])
print
def defaults(current, type_):
for k in sorted(type_.keys()):
if k not in current:
current[k] = default(type_[k])
def main(main_args):
parser = argparse.ArgumentParser()
parser.add_argument('elc', type=str, help="The log file.")
parser.add_argument('-f', '--follow', action='store_true',
help="find all registrations that already "
"exist, then watch the file for changes. All "
"future registrations are ignored (because "
"the header has already been written).")
parser.add_argument('-s', '--interval', action="store", type=float,
default=0.040,
help="time to sleep between failed reads. Requires -f.")
parser.add_argument('-t', '--timeout', action="store", type=float,
help="timeout to terminate when no changes are detected. "
"Requires -f.")
parser.add_argument('-w', '--no-default-columns', action="store_true",
help="Do not fill columns for which there is no "
"data with default values. Wait instead until at least "
"one sample has arrived for each registration.")
parser.add_argument('-a', '--trigger-all', action="store_true",
help="Output one line for each sample instead of for "
"each sample of the registration that has arrived with "
"the highest frequency.")
args = parser.parse_args(main_args)
n_samples = {} # The number of received samples for each sample reg.
current = {} # The most recent sample for each sample reg.
type_ = {} # The type (declaration) of each sample reg.
file_ = open(args.elc)
if args.follow:
reader = FollowingReader(file_, args.interval, args.timeout)
else:
reader = Reader(file_)
d = labcomm2014.Decoder(reader)
while True:
try:
o, t = d.decode()
if o is None:
n_samples[t.name] = 0
type_[t.name] = t
else:
n_samples[t.name] += 1
current[t.name] = o
break
except EOFError:
break
dump_labels(type_)
if not args.no_default_columns:
defaults(current, type_)
n_rows = 0
while True:
try:
o, t = d.decode()
if o is None:
continue
current[t.name] = o
n_samples[t.name] += 1
if len(current) < len(type_):
continue
if args.trigger_all:
dump(current, type_)
else:
if n_samples[t.name] > n_rows:
n_rows = n_samples[t.name]
dump(current, type_)
except EOFError:
break
if __name__ == "__main__":
main(sys.argv[1:])
#!/usr/bin/python
import sys
import imp
import subprocess
import os
import labcomm2014
TRANSLATE={
labcomm2014.SHORT() : 'short',
labcomm2014.DOUBLE() : 'double'
}
def compile_lc(lc):
p = subprocess.Popen([ 'labcomm2014', '--python=/dev/stdout', lc],
stdout=subprocess.PIPE)
src = p.stdout.read()
code = compile(src, 'lc_import', 'exec')
mod = sys.modules.setdefault('lc_import', imp.new_module('lc_import'))
exec code in mod.__dict__
import lc_import
return (lc_import.typedef, lc_import.sample)
def gen_binding(decl, lc_prefix, prefix, suffix):
if isinstance(decl, labcomm2014.sample):
if isinstance(decl.decl, labcomm2014.typedef):
print "%(n1)s = coder.cstructname(%(n1)s, '%(lc)s_%(n2)s')" % dict(
n1=decl.name, n2=decl.decl.name, lc=lc_prefix)
else:
print "%(n1)s = coder.cstructname(%(n1)s, '%(lc)s_%(n2)s')" % dict(
n1=decl.name, n2=decl.name, lc=lc_prefix)
gen_binding(decl.decl, lc_prefix, '%s.' % decl.name, suffix)
elif isinstance(decl, labcomm2014.typedef):
print "%(p)s%(s)s = coder.cstructname(%(p)s%(s)s, '%(lc)s_%(n)s')" % dict(
n=decl.name, lc=lc_prefix, p=prefix, s=suffix)
gen_binding(decl.decl, lc_prefix, prefix, suffix)
elif isinstance(decl, labcomm2014.array):
raise Exception("Array unhandled")
elif isinstance(decl, labcomm2014.struct):
for n, d in decl.field:
gen_binding(d, lc_prefix, '%sFields.%s' % (prefix, n), suffix)
elif isinstance(decl, labcomm2014.primitive):
pass
else:
raise Exception("Unhandled type. %s", decl)
def gen_sample(lc_prefix, decl):
if isinstance(decl, labcomm2014.sample):
print "%s = " % decl.name,
gen_sample(lc_prefix, decl.decl)
print
gen_binding(decl, lc_prefix, '', '')
elif isinstance(decl, labcomm2014.typedef):
# Expand in place
gen_sample(lc_prefix, decl.decl)
elif isinstance(decl, labcomm2014.array):
raise Exception("Array unhandled")
elif isinstance(decl, labcomm2014.struct):
print "struct(..."
for n, d in decl.field:
print "'%s, " % n,
gen_sample(lc_prefix, d)
print ")..."
elif isinstance(decl, labcomm2014.primitive):
print "%s(0), ..." % TRANSLATE[decl]
else:
raise Exception("Unhandled type. %s", decl)
"""
robtarget = struct(...
'orientation', ...
struct(...
'q1', double(0), ...
'q2', double(0), ...
'q3', double(0), ...
'q4', double(0) ...
), ...
'translation', ...
struct(...
'x', double(0), ...
'y', double(0), ...
'z', double(0) ...
), ...
'configuration', ...
struct(...
'cf1', int16(0), ...
'cf4', int16(0), ...
'cf6', int16(0), ...
'cfx', int16(0) ...
) ...
);
robtarget_types = coder.typeof(robtarget);
act = coder.cstructname(robtarget_types, 'egm_pb2lc_robtarget');
act.Fields.translation = coder.cstructname(act.Fields.translation, 'egm_pb2lc_cartesian');
act.Fields.orientation = coder.cstructname(act.Fields.orientation, 'egm_pb2lc_quaternion');
act.Fields.configuration = coder.cstructname(act.Fields.configuration, 'egm_pb2lc_confdata');
"""
def gen_matlab(lc):
lc_prefix = os.path.basename(lc).split('.')[0]
typedef, sample = compile_lc(lc)
for s in sample:
gen_sample(lc_prefix, s.signature)
if __name__ == '__main__':
for lc in sys.argv[1:]:
gen_matlab(lc)