-
Notifications
You must be signed in to change notification settings - Fork 1
/
ttcn3_debuger
executable file
·285 lines (233 loc) · 7.14 KB
/
ttcn3_debuger
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
#!/usr/bin/python
#
# this is first crude atempt automating mctr debuger
# This scripts wraps around mctr and provides telnet-like server
#
# https://github.com/eclipse/titan.core/tree/master/mctr2/mctr
# (similar to ttcn3_start)
#
####################################################
import pty,sys,os,time,thread,select,threading,socket
from subprocess import Popen
####################################################
HOST = '' # Symbolic name, meaning all available interfaces
PORT = 6666 # Arbitrary non-privileged port
PROMPT = 'MC2> ' # mctr prompt
logFileName = 'ttcn3_debug.log'
####################################################
logFile = open(logFileName, "wb")
def log(msg,stdout):
d=time.time()
logFile.write(str(d)+": "+msg+"\n")
logFile.flush()
if stdout:
print msg
def log_input(msg): log("<<"+msg,False)
def log_output(msg): log(">>"+msg,False)
##################################################
class RequestStatus():
idle = 0
sending = 1
readResponse = 2
class Request():
def __init__(self):
self.REQUEST=None
self.RSP=None
self.onResponse=threading.Event()
self.ResponseTimeout=10
self.status=RequestStatus.idle
def send(self,request):
self.onResponse.clear()
self.RSP=""
self.REQUEST=request
self.status=RequestStatus.sending
succes=self.onResponse.wait(self.ResponseTimeout)#blocking
log("Request"+str(succes),False)
self.status=RequestStatus.idle
self.onResponse.clear()
self.REQUEST=None
return (succes,self.RSP)
def end(self):
self.onResponse.set()
self.status=RequestStatus.idle
def write(self,msg):
self.RSP=self.RSP+msg
request=Request()
########################################################################
ETS=""
CONF_FILE=""
MCTR_HOST='127.0.0.1'
MCTR_PORT=0
WELCOME_MSG=""
class MCTRstatus():
init=0
wait4ETS=1
cmtc=2
enableDebuger=3
run=10
exit=-1
#TODO:
# seperate mctr logic from this function
# "breakpoint reached at"
# $log off
# $doutput both
#
def stdout_read(fd):
global request,ETS,MCTR_PORT,WELCOME_MSG,PROMPT
status=MCTRstatus.init
log('stdout_read start',False)
i=0#idle time counter
while True:
r, w, e = select.select([ fd ], [], [], 0.1)
if fd in r:
data = os.read(fd, 4096)
i=0
if not data:
log("read null",False)
continue
if request.status==RequestStatus.readResponse:
log(request.REQUEST+">>"+data,False)
request.write(data)
if data.endswith(PROMPT):
log("request: responed '"+request.RSP+"'",False)
request.end()
else:
log_output(data)
if status==MCTRstatus.init:
WELCOME_MSG=WELCOME_MSG+data
l=data.find('Listening on TCP port ')
if l!=-1:
l=l+len('Listening on TCP port ')
MCTR_PORT=data[l:-1]
log("found port"+MCTR_PORT,False)
# spawn $ETS $hostname $port
pid = Popen([ETS, MCTR_HOST, MCTR_PORT]).pid
log("spawning ETS PIS:%s"%pid,False)
status=MCTRstatus.wait4ETS
elif status==MCTRstatus.wait4ETS:
if data.find('New HC connected from')!=-1:
log("ETS ready!"+MCTR_PORT,False)
log_input("cmtc")
os.write(fd,"cmtc"+"\n")
status=MCTRstatus.cmtc
elif status==MCTRstatus.cmtc:
if data.find('MTC is created.')!=-1:
log("MTC ready!"+MCTR_PORT,False)
log_input("debug on")
os.write(fd,"debug on"+"\n")
status=MCTRstatus.enableDebuger
elif status==MCTRstatus.enableDebuger:
if data.find('Debugger switched on.')!=-1:
log("debuger ready!"+MCTR_PORT,False)
status=MCTRstatus.run
elif status==MCTRstatus.run:
if data.find('Shutdown complete.')!=-1:
status=MCTRstatus.exit
break
else:
log("WTF?! unknow state",False)
else:
i=i+1
if i%300==0:
log("tick status"+str(status),False)
if request.status==RequestStatus.readResponse and i==10:
log("request: abort/timeout '"+request.RSP+"'",False)
request.end()
if status==MCTRstatus.exit:
break
if request.status==RequestStatus.sending:
request.status=RequestStatus.readResponse
log_input(request.REQUEST)
os.write(fd,request.REQUEST+"\n")
# if request.REQUEST=='exit'):
# break
i=0
log('stdout_read end',False)
sys.exit(0)
return
########################################################################
def processRequest(data,conn):
global request
log("new request '%s'"%data,False)
ret=request.send(data)
#log("request: end (data:'%s ret:%s,%s)'"%(data,ret[0],ret[1]),False)
if ret[0]==False:
conn.sendall("ERROR: timeout!\n")
else:
conn.sendall(ret[1])
return
def clientHandler(conn):
global PROMPT
conn.send('Welcome to the server. Type something and hit enter\n') #send only takes string a
# conn.send(WELCOME_MSG)
conn.send(PROMPT+' ')
while True:
#Receiving from client
data = conn.recv(1024)
if not data: break
data=str(data).strip()
if len(data)==0: break
for line in data.split("\n"):
processRequest(data.strip(),conn)
if data=="exit":
return False
return True#continue tcp server
def tcpServerThread_start():
global s
log('tcpServerThread start',False)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
log('Socket created',True)
#Bind socket to local host and port
try:
s.bind((HOST, PORT))
except socket.error as msg:
log('Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1],True)
return False
t=threading.Thread (name="tcpServerThread",target=tcpServerThread)
t.start()
return t
def tcpServerThread():
log('Socket bind PORT:%s complete'%PORT,True)
s.listen(1)
log('Socket now listening at %d'%PORT,False)
while True:
#wait to accept a connection - blocking call
conn, addr = s.accept()
who=""+addr[0] + ':' + str(addr[1])
log('tcpServerThread connected with ' + who,False)
try:
if not clientHandler(conn):
break
conn.close()
except Exception as e:
log("tcpServerThread Exception "+str(e)+" for "+who,False)
log("tcpServerThread disconected from "+who,False)
log('tcpServerThread end',False)
return
if __name__ == "__main__":
####################################################################
if not len(sys.argv)!=3 or \
not os.path.isfile(sys.argv[1]) or \
not os.path.isfile(sys.argv[2]) or not sys.argv[2].endswith('.cfg'):
log('%s must have ETS (Arg1): executable (Arg2): config. Optionally [Arg3]: deguger batch file'%str(sys.argv[0]),True)
sys.exit()
ETS=sys.argv[1]
CONFIG_FILE=sys.argv[2]
TTCN3_DIR=os.environ.get('TTCN3_DIR')
if TTCN3_DIR==None:
log('enviroment var "TTCN3_DIR" is missing',True)
sys.exit()
MCTR=TTCN3_DIR+"/bin/mctr";
if not os.path.isfile(MCTR):
log(MCTR,' is missing',True)
sys.exit()
###############################################################
log("start",True)
t=tcpServerThread_start()
if not t:
sys.exit()
log("starting mctr",True)
pty.spawn([MCTR,CONFIG_FILE], stdout_read)
log("mctr done!",True)
t.join()
s.close()