test_encoder_decoder.py 8.01 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/python
# -*- coding: utf-8 -*-

import argparse
import imp
import labcomm
import math
import os
import re
import struct
import subprocess
import sys
import threading


def labcomm_compile(lc, name, args):
    """Run the labcomm compiler on *lc*, emitting code for every target.

    lc   -- path of the labcomm source file handed to the compiler
    name -- base name used for the ``gen/<name>/...`` output tree and
            for the generated file names
    args -- object whose ``labcomm`` attribute holds the compiler command
            line as one whitespace-separated string

    Raises subprocess.CalledProcessError if the compiler exits non-zero.
    """
    # Make sure each per-language output directory exists beforehand.
    for lang in ('c', 'csharp', 'java', 'python'):
        target = 'gen/%s/%s' % (name, lang)
        if os.path.exists(target):
            continue
        os.makedirs(target)

    twice = (name, name)
    options = [
        "--c=gen/%s/c/%s.c" % twice,
        "--h=gen/%s/c/%s.h" % twice,
        "--cs=gen/%s/csharp/%s.cs" % twice,
        "--python=gen/%s/python/%s.py" % twice,
        "--java=gen/%s/java/" % name,
        "--typeinfo=gen/%s/%s.typeinfo" % twice,
    ]
    command = args.labcomm.split()
    command.extend(options)
    command.append(lc)
    subprocess.check_call(command)

def get_signatures(path):
    """Load the Python module at *path* and return its sample signatures.

    The module is located by its file name (extension removed) inside the
    directory of *path* and is loaded under the module name 'signatures'.

    Returns a list holding the ``signature`` attribute of every entry in
    the loaded module's ``sample`` sequence.

    Raises ImportError if the module cannot be found.
    """
    module_name = os.path.splitext(os.path.basename(path))[0]
    fp, pathname, description = imp.find_module(module_name,
                                                [os.path.dirname(path)])
    try:
        m = imp.load_module('signatures', fp, pathname, description)
    finally:
        # imp leaves closing the file to the caller; it is None when the
        # located module is not backed by a single open file.
        if fp is not None:
            fp.close()
    # Build a real list: callers iterate over the signatures several times.
    return [s.signature for s in m.sample]
41
42
43
44
45
46
47
48

class Test:
    """Round-trip test of one external encoder/decoder program.

    The program is started as a subprocess.  Generated values for every
    signature are encoded onto its stdin, whatever it writes to stdout is
    decoded again, and each decoded value is compared with the one sent.
    """

    def __init__(self, program, signatures, timeout=2):
        """Remember what to run and what to send.

        program    -- argument passed unchanged to subprocess.Popen
        signatures -- sequence of labcomm sample declarations to exercise
        timeout    -- seconds to wait for each value to come back
        """
        self.program = program
        self.signatures = signatures
        self.timeout = timeout
        # State shared between run() and the decoder thread; always
        # accessed while holding self.next.
        self.next = threading.Condition()
        self.received_value = None
        self.received_decl = None
        self.expected = None
        self.failed = False

    def generate(self, decl):
        """Return the list of test values for the declaration *decl*.

        For a labcomm.sample the result is a list of (decl, value) pairs;
        for every other declaration kind it is a list of plain values.

        Raises Exception for a declaration class that is not handled.
        """
        kind = decl.__class__

        if kind == labcomm.sample:
            return [(decl, values) for values in self.generate(decl.decl)]

        if kind == labcomm.struct:
            if len(decl.field) == 0:
                return [{}]
            # Combine every value of the first field with every value
            # combination of the remaining fields.
            first_name, first_decl = decl.field[0][0], decl.field[0][1]
            values1 = self.generate(first_decl)
            values2 = self.generate(labcomm.struct(decl.field[1:]))
            result = []
            for v1 in values1:
                for v2 in values2:
                    v = dict(v2)
                    v[first_name] = v1
                    result.append(v)
            return result

        if kind == labcomm.array:
            # NOTE(review): an index of 0 appears to denote a
            # variable-length dimension -- confirm against labcomm.
            if len(decl.indices) == 1:
                values = self.generate(decl.decl)
                variable_lengths = [0, 1, 2]
            else:
                # Outer dimension: elements are arrays over the
                # remaining dimensions.
                values = self.generate(labcomm.array(decl.indices[1:],
                                                     decl.decl))
                variable_lengths = [1, 2]
            if decl.indices[0] == 0:
                lengths = variable_lengths
            else:
                lengths = [decl.indices[0]]
            # Each array repeats one element value `length` times.
            return [[v] * length for v in values for length in lengths]

        if kind == labcomm.BOOLEAN:
            return [False, True]

        if kind == labcomm.BYTE:
            return [0, 127, 128, 255]

        if kind == labcomm.SHORT:
            return [-32768, 0, 32767]

        if kind == labcomm.INTEGER:
            return [-2147483648, 0, 2147483647]

        if kind == labcomm.LONG:
            return [-9223372036854775808, 0, 9223372036854775807]

        if kind == labcomm.FLOAT:
            def tofloat(v):
                # Round-trip through a 32-bit float, presumably so the
                # expected value compares equal to what comes back.
                return struct.unpack('f', struct.pack('f', v))[0]
            return [tofloat(-math.pi), 0.0, tofloat(math.pi)]

        if kind == labcomm.DOUBLE:
            return [-math.pi, 0.0, math.pi]

        if kind == labcomm.STRING:
            return ['string', u'sträng']

        if kind == labcomm.SAMPLE:
            # The values of a sample reference are the signatures.
            return self.signatures

        print>>sys.stderr, decl
        raise Exception("unhandled decl %s" % decl.__class__)

    def uses_refs(self, decls):
        """Return True if any declaration in *decls* contains a
        labcomm.SAMPLE, looking inside samples, structs and arrays."""
        for decl in decls:
            kind = decl.__class__
            if kind == labcomm.sample or kind == labcomm.array:
                children = [decl.decl]
            elif kind == labcomm.struct:
                children = [d for _, d in decl.field]
            elif kind == labcomm.SAMPLE:
                return True
            else:
                continue
            if self.uses_refs(children):
                return True
        return False

    def run(self):
        """Run the program and push every generated value through it.

        Progress and diagnostics go to stderr.  The interpreter is exited
        with status 1 (SystemExit) as soon as the program dies, a value
        does not come back unchanged within the timeout, or the program
        finally exits with a non-zero status.
        """
        print>>sys.stderr, 'Testing', self.program
        p = subprocess.Popen(self.program,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=sys.stderr)
        self.expected = None
        self.failed = False
        self.next = threading.Condition()
        decoder = threading.Thread(target=self.decode, args=(p.stdout,))
        decoder.start()
        encoder = labcomm.Encoder(labcomm.StreamWriter(p.stdin))
        for signature in self.signatures:
            encoder.add_decl(signature)
        if self.uses_refs(self.signatures):
            for signature in self.signatures:
                encoder.add_ref(signature)
        for signature in self.signatures:
            print>>sys.stderr, "Checking", signature.name,
            for decl, value in self.generate(signature):
                sys.stderr.write('.')
                # The lock is taken before encoding, so the decoder thread
                # cannot notify before wait() is entered.  Using `with`
                # releases the lock even if encode() raises; otherwise the
                # decoder thread would block on it forever.
                with self.next:
                    self.received_value = None
                    self.received_decl = None
                    encoder.encode(value, decl)
                    self.next.wait(self.timeout)
                    received_value = self.received_value
                    received_decl = self.received_decl
                exit_status = p.poll()
                if exit_status is not None:
                    print>>sys.stderr, "Failed with:", exit_status
                    self.failed = True
                elif value != received_value:
                    print>>sys.stderr, "Coding error"
                    print>>sys.stderr, value == received_value
                    print>>sys.stderr, "Got:     ", received_value
                    print>>sys.stderr, "         ", received_decl
                    print>>sys.stderr, "Expected:", value
                    print>>sys.stderr, "         ", decl
                    self.failed = True

                if self.failed:
                    if p.poll() is None:
                        p.terminate()
                    sys.exit(1)
            print>>sys.stderr
        p.stdin.close()
        if p.wait() != 0:
            sys.exit(1)

    def decode(self, f):
        """Decoder thread body: read from *f* until EOFError is raised.

        Every decoded value that is not None is stored in
        received_value / received_decl and all waiters on self.next are
        notified.
        """
        decoder = labcomm.Decoder(labcomm.StreamReader(f))
        try:
            while True:
                value, decl = decoder.decode()
                # NOTE(review): a None value presumably belongs to
                # something other than sample data -- confirm.
                if value is not None:
                    with self.next:
                        self.received_value = value
                        self.received_decl = decl
                        self.expected = None
                        self.next.notify_all()
        except EOFError:
            # End of the program's output: the thread simply finishes.
            pass
    
def run(test, signatures):
    """Build a Test for the program *test* with *signatures* and run it."""
    Test(test, signatures).run()

if __name__ == "__main__":
    # Command line entry point: load the signatures module, then run every
    # program given with --test against those signatures.
    parser = argparse.ArgumentParser(description='Run encoding test.')

    class test_action(argparse.Action):
        """Collect the arguments of each --test occurrence as one list."""

        def __call__(self, parser, namespace, values, option_string=None):
            def strip_slash(s):
                # Drop one leading backslash -- presumably an escape that
                # lets program arguments start with '-'.
                if s.startswith('\\'):
                    return s[1:]
                return s

            # Copy first so the default list object is never mutated.
            old = list(getattr(namespace, self.dest))
            old.append([strip_slash(v) for v in values])
            setattr(namespace, self.dest, old)

    # Without a signatures module there is nothing to load, so fail with
    # a usage message instead of a crash inside get_signatures().
    parser.add_argument('--signatures', required=True)
    parser.add_argument('--test', nargs='*', action=test_action, default=[])

    args = parser.parse_args()
    signatures = get_signatures(args.signatures)
    for test in args.test:
        run(test, signatures)
    sys.exit(0)