-
Notifications
You must be signed in to change notification settings - Fork 4
/
pep.py
executable file
·1308 lines (1073 loc) · 64.6 KB
/
pep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import sys
import time
import math
import errno
import random
import select
import struct
import socket
import logging
import argparse
from pickle import dumps
from collections import deque
from ctypes import sizeof, c_int, c_ubyte, string_at, byref, POINTER, cast
from pystreamc import DEC_ALLOC, parameters, streamc
from channel import CH_READ, CH_WRITE, OpenTcpListenChannel, OpenUdpChannel, OpenInConnChannel, OpenOutConnChannel, PollChannels, CloseChannel
from protocol import *
import cProfile
class InfoQueue :
""" The information queue for sent packets,
divided into two queues: source queue and repair queue.
"""
def __init__(self) :
self.sourceQueue = []
self.repairQueue = []
def Add(self, pktType, pktId, sendTime, anotherPktNum, delivered, firstSentTime, deliveredTime) :
pktInfo = PacketInfo(pktId, pktType, sendTime, anotherPktNum, delivered, firstSentTime, deliveredTime)
if pktInfo.pktType == PacketInfoType['SOURCE_PACKET'] :
self.sourceQueue.append(pktInfo)
else :
self.repairQueue.append(pktInfo)
def Find(self, pktType, pktId) :
""" Binary search
To obtain the information of the specified id of packets when they were sent out,
"""
pos = -1
resultInfo = None
if pktType == PacketInfoType['SOURCE_PACKET'] :
left = 0
right = len(self.sourceQueue) - 1
while left <= right :
mid = int(left + (right - left) / 2)
if self.sourceQueue[mid].pktId < pktId :
left = mid + 1
elif self.sourceQueue[mid].pktId > pktId :
right = mid - 1
else :
pos = mid
resultInfo = self.sourceQueue[mid]
break
if pos >= 0 :
self.sourceQueue = self.sourceQueue[pos + 1 : ]
else :
left = 0
right = len(self.repairQueue) - 1
while left <= right :
mid = int(left + (right - left) / 2)
if self.repairQueue[mid].pktId < pktId :
left = mid + 1
elif self.repairQueue[mid].pktId > pktId :
right = mid - 1
else :
pos = mid
resultInfo = self.repairQueue[mid]
break
if pos >= 0 :
self.repairQueue = self.repairQueue[pos + 1 : ]
return resultInfo
def GetSourceQueueSize(self) :
return len(self.sourceQueue)
def GetRepairQueueSize(self) :
return len(self.repairQueue)
class BwQueue :
def __init__(self, maxSize) :
self.m_size = maxSize
self.m_BwQueue = deque()
self.m_BwMaxQueue = deque()
def GetMaxBw(self) :
return self.m_BwMaxQueue[0]
def Insert(self, newBw) :
self.m_BwQueue.append(newBw)
if len(self.m_BwQueue) > self.m_size :
if self.m_BwQueue[0] == self.m_BwMaxQueue[0] :
self.m_BwMaxQueue.popleft()
self.m_BwQueue.popleft()
while len(self.m_BwMaxQueue) != 0 and newBw > self.m_BwMaxQueue[-1] :
self.m_BwMaxQueue.pop()
self.m_BwMaxQueue.append(newBw)
class MaxBwFilter :
def __init__(self, roundPeriod) :
self.m_roundPeriod = roundPeriod
self.m_estTime = deque()
self.m_BwQueue = deque()
self.m_BwMaxQueue = deque()
def GetMaxBw(self) :
return self.m_BwMaxQueue[0]
def IsEmpty(self) :
return (not len(self.m_BwQueue))
def Insert(self, newEstTime, newBw) :
self.m_estTime.append(newEstTime)
self.m_BwQueue.append(newBw)
while self.m_estTime[0] <= newEstTime - self.m_roundPeriod :
if self.m_BwQueue[0] == self.m_BwMaxQueue[0] :
self.m_BwMaxQueue.popleft()
self.m_estTime.popleft()
self.m_BwQueue.popleft()
while len(self.m_BwMaxQueue) != 0 and newBw > self.m_BwMaxQueue[-1] :
self.m_BwMaxQueue.pop()
self.m_BwMaxQueue.append(newBw)
class pepApp :
def __init__(self) :
# Sockets
self.m_tcpListener = None
self.m_udpSocket = None
# Self address and peer PEPesc's address
self.m_selfAddress = None
self.m_peerAddress = None
# Statistics the size of the transmitted data
self.m_totalDataSentSize = 0 # byte
self.m_totalDataRecvSize = 0 # byte
self.m_rejectConnectionNum = 0
# Connection with peer PEPesc
self.m_peerOnline = False
self.m_handShakeTimes = 0
self.m_lastHandShakeTime = 0.0
self.m_selfClose = False
self.m_selfPreClose = False
# HeartBeat
self.m_lastResponseTime = 0.0
self.m_heartBeatTimes = 0
self.m_lastHeartBeatTime = 0.0
# Flags
self.m_detailFlag = False
self.m_maxAllowedBw = None
self.m_constBw = None
# Channels for non-blocking IO
self.m_channels = {} # Every TCP channel only serves one TCP connection,format:{channel id : channel}
self.m_tcpListenChid = -1 # Channel's id used by tcpListener
self.m_udpChid = -1 # Channel's id used by udpSocket
self.m_findChidForPEPescToSend = {} # 存储TCP连接使用的channel的id,格式:{(from ip, from port ,to ip, to port) : channel id}
self.m_tcpSentBufLen = {} # 存储每个neighbor已enqueue TCP数据长度,当TCP Client断开连接时,通知对端PEPesc此时的长度,对端接收并发送完整长度TCP数据至TCP Server后,关闭连接。格式:{(from ip, from port ,to ip, to port) : int}
self.m_tcpRecvBufLen = {} # 存储每个neighbor已接收TCP数据长度,对端PEPesc通知该连接即将关闭后,在发送完整TCP数据后,关闭与TCP Server的连接。格式:{(from ip, from port ,to ip, to port) : int}
self.m_linkToBeClosed = {} # 存储将要被关闭的连接中对端PEPesc通知的TCP数据长度,当发送完整TCP数据后,即刻关闭。格式:{(from ip, from port ,to ip, to port) : int}
# Waiting for a connection to be established
self.m_tcpReceiverWaiting = {}
self.m_tcpSenderWaiting = {}
# Parameters used for streaming coding
self.m_cp = parameters()
self.m_cp.gfpower = 8
self.m_cp.pktsize = 0
self.m_cp.repfreq = 0.0 # This Sc parameter is not used in PEP, where the frequency of sending repair packet is determine by TimeToSendRepairPacket
self.m_cp.seed = 0
# initialize encoder and decoder
self.m_enc = streamc.initialize_encoder(byref(self.m_cp), None, 0)
self.m_dec = streamc.initialize_decoder(byref(self.m_cp))
# Parameters used for repair packets selective sending
self.m_lastSentSourceTime = 0.0
self.m_lastSentRepairTime = 0.0
self.m_newDataIdleState = False
self.m_idleStateChangeTime = -1.0
self.m_numSentRepairExcludeIdle = 0
self.m_numSentRepairAfterIdle = 0
self.m_idleCanSendRepairCount = 0
self.m_duplicatedInorder = False
self.m_lastStuckInorder = -1
self.m_numSentRepairAfterStuck = 0
self.m_numSentRepairAfterRtt = 0
# Detect consecutive packet loss and avoid again
self.m_burstAvoidPeriod = 3.0
self.m_lastBurstTime = 0.0
self.m_lastRecvSourceId = -1
self.m_lastRecvRepairId = -1
# number of current encoded source packets
self.m_currentMaxSourceId = -1
self.m_lastStreamcQueueSize = 0
# for sending ack
self.m_latestRecvPktType = -1
self.m_latestRecvSourceNum = 0 # decoder目前收到的源分组数目
self.m_latestRecvRepairNum = 0 # decoder目前收到的修复分组数目
self.m_numLastAcked = 0 # 上一次反馈ACK时收到的分组数
self.m_inorderNext = 0
self.m_inorderAckId = 0
self.m_lastDataAckSendTime = 0
self.m_numRecvSinceLastSourceAck = 0
self.m_inorderAckPacketSize = InorderACK().getPackedSize()+PepHeaderLength
# for receiving ack
self.m_inorderAck = InorderACK()
self.m_lastAckedSourceId = -1
self.m_lastAckedRepairId = -1
self.m_lastAckedInorderId = -1
self.m_lastAckedSourceNum = 0
self.m_lastAckedRepairNum = 0
self.m_lastSentSourceId = -1
self.m_lastSentRepairId = -1
self.m_lastAckedPacketSentTime = 0
# CWND Adaption
self.m_initCWnd = 10
self.m_cWnd = 10
self.m_packetsInFlight = 0
# rtt estimation
self.m_rtt = 0.0
self.m_rttMin = 1e6
self.m_lastDecSuccTime = 0.0 # Record the time when the client notified the last successful decoding, which is used to ignore the RTT estimation of some packets
# bandwidth estimation
self.m_useJersyFlag = True
self.m_estBw = 0.0 # estimated end-to-end bandwidth (in pkt/sec)
self.m_estBwMax = 0.0
self.m_maxBwFilter = MaxBwFilter(BwWindowPeriod)
self.m_lastAckTime = -1.0 # time of receiving last ACK (in sec.)
self.m_lastFirstSentTime = -1.0 # sent time of last acked packet (in sec.)
#self.m_bwWindowLength = 10
#self.m_maxBwFilter = BwQueue(self.m_bwWindowLength)
#self.m_acksSinceLastEst = 0
#self.m_nRecvSinceLastEst = 0
#self.m_lastBwEstTime = 0.0
#self.m_lastEstBw = 0
# loss rate estimation
self.m_lossRate = 0.0 # estimated transmission loss rate based on ACKs
# pacing
self.m_pacing = True#False
self.m_pacingRate = 5 * 1024 * 1024
self.m_pacingTimer = 0.0 # !< Pacing Event
self.m_lastPacketSentTime = 0
# bookkeeping information for sent not-yet acked packets
self.m_pktInfoQueue = InfoQueue()
# active bandwidth probe
self.m_activeProbeBw = True
self.m_lastProbedTime = -1
self.m_probeValidity = False
self.m_probeBw = 0
self.m_probePacketSentTimes = []
self.m_firstProbeArriveTime = 0
self.m_lastProbeArrivedId = -1
def SetAttribute(self, args, packetSize) :
self.m_selfAddress = (args.selfIp, args.selfPort)
self.m_peerAddress = (args.peerIp, args.peerPort)
self.m_cp.pktsize = packetSize
self.m_detailFlag = args.detail
self.m_activeProbeBw = not args.deactivateProbeBw#False if args.maxBw else not args.deactivateProbeBw
self.m_maxAllowedBw = float(args.maxBw[0:-4]) / PacingGain * 1024 * 1024 / (ScPacketSize * 8) if args.maxBw else None
self.m_constBw = float(args.ConstBw[0:-4]) * 1024 * 1024 / (ScPacketSize * 8) if args.ConstBw else None
self.m_useJersyFlag = False if args.bwEstMethod == "BBR" else True
self.m_tcpListener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.m_tcpListener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.m_tcpListener.setsockopt(socket.SOL_IP, socket.IP_TRANSPARENT, 1)
self.m_tcpListener.bind(("0.0.0.0", self.m_selfAddress[1]))
self.m_tcpListener.listen(128)
self.m_udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.m_udpSocket.bind((self.m_selfAddress[0], self.m_selfAddress[1]))
self.m_tcpListenChid = OpenTcpListenChannel(self.m_channels, self.m_tcpListener)
self.m_udpChid = OpenUdpChannel(self.m_channels, self.m_udpSocket)
return
def HandleLog(self, logLevel, logType, log, detailFlag=False) :
if logLevel == 'DEBUG' :
logging.debug(logType + log)
elif logLevel == 'INFO' :
logging.info(logType + log)
elif logLevel == 'WARNING' :
logging.warning(logType + log)
elif logLevel == 'ERROR' :
logging.error(logType + log)
if detailFlag :
print("[%s][%s:%d] %s." % (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1], log))
def EnqueuePackets(self, msg, tcpSourceAddr=None, tcpDestinationAddr=None, contents=b"") :
if msg != ScProtectedMsg['TCP_RAW_DATA'] :
scPayLoad = SCPayload(msg, tcpSourceAddr, tcpDestinationAddr, contents)
buf = (c_ubyte * self.m_cp.pktsize).from_buffer_copy(scPayLoad.packed()) # class bytes to ctypes.c_ubyte_Array
streamc.enqueue_packet(self.m_enc, self.m_currentMaxSourceId+1, buf)
self.m_currentMaxSourceId += 1
logging.debug("[EncoderStatus] headsid: %d tailsid: %d nextsid: %d" \
% (self.m_enc.contents.headsid, self.m_enc.contents.tailsid, self.m_enc.contents.nextsid))
else :
num = math.ceil(len(contents) / MsgDataMaxLength)
for i in range(num) :
tcpRawData = contents[i * MsgDataMaxLength : (i+1) * MsgDataMaxLength] # class 'bytes'
scPayLoad = SCPayload(msg, tcpSourceAddr, tcpDestinationAddr, tcpRawData)
buf = (c_ubyte * self.m_cp.pktsize).from_buffer_copy(scPayLoad.packed()) # class bytes to ctypes.c_ubyte_Array
streamc.enqueue_packet(self.m_enc, self.m_currentMaxSourceId+1, buf)
self.m_currentMaxSourceId += 1
logging.debug("[EncoderStatus] headsid: %d tailsid: %d nextsid: %d" \
% (self.m_enc.contents.headsid, self.m_enc.contents.tailsid, self.m_enc.contents.nextsid))
def RttEstimation(self, receiveTime, sendTime) :
if sendTime == -1 :
return
alpha = 0.9
historyRtt = self.m_rtt
newRtt = receiveTime - sendTime
if historyRtt == 0 :
self.m_rtt = newRtt
self.m_rttMin = newRtt
else :
self.m_rtt = alpha * historyRtt + (1 - alpha) * newRtt # smoothed RTT estimation (follow standard TCP, Karn's algorithm is not needed since no retransmission is incurred)
self.m_rttMin = min(self.m_rttMin, newRtt) # update minimum rtt observed (which should be close to the true propagation delay)
def BwEstimationJersy(self, numAcked) :
# TCP Jersy's TSW algorithm
currentTime = time.time()
ackInterval = currentTime - self.m_lastAckTime
if self.m_lastAckTime == -1.0 :
# The first data ack, the interval cannot be calculated, no estimation is made
self.m_lastAckTime = currentTime
return
self.m_estBw = (self.m_rtt * self.m_estBw + numAcked) / (ackInterval + self.m_rtt)
self.m_maxBwFilter.Insert(currentTime , self.m_estBw)
log = "[Jersy-ABE] ackInterval: %f" % ackInterval
log += " numAcked: %d" % numAcked
log += " Estimated-BW: %f" % self.m_estBw
log += " Current-Max-Bw: %f" % self.m_maxBwFilter.GetMaxBw()
logging.debug(log)
def BwEstimationBBR(self, delivered, ackElapsed, sendElapsed) :
# BBR's delivery rate estimation algorithm
# https://datatracker.ietf.org/doc/html/draft-cheng-iccrg-delivery-rate-estimation
deliveryElapsed = max(ackElapsed, sendElapsed)
self.m_estBw = delivered / deliveryElapsed # pkts/sec.
self.m_maxBwFilter.Insert(time.time() , self.m_estBw)
log = "[BBR-ABE] delivered: %d ackElapsed: %f sendElapsed: %f Estimated-BW: %f Current-Max-Bw: %f" \
% (delivered, ackElapsed, sendElapsed, self.m_estBw, self.m_maxBwFilter.GetMaxBw())
logging.debug(log)
def PeEstimation(self, nTotalLoss, nTotalSent) :
alpha = 0.9
new_lossRate = nTotalLoss / nTotalSent
if self.m_lossRate == 0 :
self.m_lossRate = new_lossRate
else :
self.m_lossRate = self.m_lossRate * alpha + new_lossRate * (1 - alpha)
def UpdateCwnd(self) :
cWndGain = CwndGain #if self.m_useJersyFlag else 1.2
# Update estimated maximum bandwidth
if self.m_maxBwFilter.IsEmpty() :
self.m_estBwMax = self.m_probeBw * 0.8
else :
self.m_estBwMax = self.m_maxBwFilter.GetMaxBw()
# Update CWND
if self.m_constBw :
cWndPreset = self.m_constBw * self.m_rttMin * cWndGain
elif self.m_maxAllowedBw :
cWndPreset = min(self.m_maxAllowedBw, self.m_estBwMax) * self.m_rttMin * cWndGain
else :
cWndPreset = self.m_estBwMax * self.m_rttMin * cWndGain
self.m_cWnd = math.floor(max(10.0, cWndPreset))
self.m_pacing = True if self.m_cWnd > 10 else False
#self.m_pacing = False if self.m_constBw else self.m_pacing
# Update pacing rate
if self.m_pacing == True and self.m_cWnd > self.m_packetsInFlight :
self.m_pacingRate = self.m_constBw * ScPacketSize if self.m_constBw \
else self.m_estBwMax * ScPacketSize * PacingGain
def SendDataAck(self) :
inorderAck = InorderACK(self.m_inorderAckId, self.m_dec.contents.inorder, self.m_latestRecvSourceNum, self.m_latestRecvRepairNum,
self.m_latestRecvPktType, self.m_lastRecvSourceId, self.m_lastRecvRepairId)
rpkt = PepPacket(PepHeader(PepPacketType['SC_DATA_ACK']), inorderAck.packed())
self.m_udpSocket.sendto(rpkt.packed(), self.m_peerAddress)
self.m_inorderAckId += 1
self.m_lastDataAckSendTime = time.time()
self.m_numLastAcked = self.m_latestRecvSourceNum + self.m_latestRecvRepairNum
logging.debug("[SendDataAck] Send data ACK %s" % inorderAck)
def RecvDataAck(self, pkt) :
self.m_inorderAck.parse(pkt.body)
inorder, nsource, nrepair = self.m_inorderAck.inorder, self.m_inorderAck.nsource, self.m_inorderAck.nrepair
latestRecvPktType, latestRecvSourceId, latestRecvRepairId = self.m_inorderAck.latestRecvPktType, self.m_inorderAck.latestRecvSourceId, self.m_inorderAck.latestRecvRepairId
latestRecvPktId = latestRecvSourceId if latestRecvPktType == PacketInfoType['SOURCE_PACKET'] else latestRecvRepairId
log = "[RecvDataAck] current inorder: %d [nsource, nrepair] = [ %d , %d ] ACKed inorder: %d [nsource, nrepair] = [ %d , %d ]" % (self.m_lastAckedInorderId,
self.m_lastAckedSourceNum,
self.m_lastAckedRepairNum,
inorder, nsource, nrepair)
# If the following three ACK messages are the same as the last time,
# it means that the state of the receiving end has not changed,
# so there is no need to estimate the information, and do not print.
if self.m_lastAckedInorderId == inorder and self.m_lastAckedSourceNum == nsource and self.m_lastAckedRepairNum == nrepair :
return
# Discard the ACK if the reported numbers are out-dated. This can happen if there are re-ordering of ACK packets over the network
if inorder < self.m_lastAckedInorderId or nsource < self.m_lastAckedSourceNum or nrepair < self.m_lastAckedRepairNum :
return
# A source packet is lost, and the inorder of the peer PEPesc is stuck.
# Prepare to send quantitative repair packets for minor compensation.
if self.m_lastAckedInorderId == inorder \
and self.m_lastStuckInorder != inorder \
and latestRecvPktType == PacketInfoType['SOURCE_PACKET'] :
self.m_duplicatedInorder = True
self.m_lastStuckInorder = inorder
resultInfo = self.m_pktInfoQueue.Find(latestRecvPktType, latestRecvPktId)
sendTime, otherTypePacketNum = resultInfo.sendTime, resultInfo.anotherPktNum
deliveredAsOfSend, firstSentTime, deliveredTime = resultInfo.delivered, resultInfo.firstSentTime, resultInfo.deliveredTime
recvAckTime = time.time()
# Available bandwidth estimation
#if sendTime > self.m_idleStateChangeTime :
#if sendTime - self.m_lastAckedPacketSentTime <= 2 * self.CalculateBytesTxTime() :
if self.m_useJersyFlag :
numAcked = nsource + nrepair - self.m_lastAckedSourceNum - self.m_lastAckedRepairNum
self.BwEstimationJersy(numAcked)
else :
delivered = nsource+nrepair-deliveredAsOfSend
ackElapsed = recvAckTime-deliveredTime
sendElapsed = sendTime-firstSentTime
self.BwEstimationBBR(delivered, ackElapsed, sendElapsed)
self.m_lastAckTime = recvAckTime
self.m_lastFirstSentTime = sendTime
self.m_lastAckedPacketSentTime = sendTime
self.m_lastAckedInorderId = inorder
self.m_lastAckedSourceNum = nsource
self.m_lastAckedRepairNum = nrepair
# Only perform RTT estimation for packets whose sending time is later than the successful decoding time
if sendTime >= self.m_lastDecSuccTime :
self.RttEstimation(recvAckTime, sendTime)
# Pe estimation
# TODO:收到out-of-order分组的ACK时不应进行丢包率估计 (totalLoss和in-flight无法判断)
sourceSentCount, repairSentCount = 0, 0 # The number of two types of packets sent by the sender at the moment of the latest packet received by the sending client
nTotalLoss = 0
if latestRecvPktType == PacketInfoType['SOURCE_PACKET'] :
pktTypeStr = "SOURCE"
sourceSentCount = latestRecvPktId + 1
repairSentCount = otherTypePacketNum
else :
pktTypeStr = "REPAIR"
repairSentCount = latestRecvPktId + 1
sourceSentCount = otherTypePacketNum
nTotalLoss = sourceSentCount + repairSentCount - nsource - nrepair
self.PeEstimation(nTotalLoss, sourceSentCount + repairSentCount)
# Update packets-in-flight and cWnd
self.m_lastAckedSourceId = latestRecvSourceId
self.m_lastAckedRepairId = latestRecvRepairId
oldPacketsInFlight = self.m_packetsInFlight
self.m_packetsInFlight = (self.m_lastSentSourceId - self.m_lastAckedSourceId + self.m_lastSentRepairId - self.m_lastAckedRepairId) * (1 - self.m_lossRate)
self.UpdateCwnd()
if inorder >= 0 and inorder < self.m_currentMaxSourceId :
streamc.flush_acked_packets(self.m_enc, inorder)
log += " latest ACKed %s packet of ID: %d" % (pktTypeStr, latestRecvPktId)
logging.debug(log)
log = "[Estimation] BandWidth: %f" % self.m_estBw
log += " (pkts./sec.) RTT: %f" % (self.m_rtt * 1000)
log += " (ms) LossRate: %f" % self.m_lossRate
log += " Current cWnd: %d (pkts.)" % self.m_cWnd
log += " packetsInFlight: %d" % self.m_packetsInFlight
log += " totalLoss: %d" % nTotalLoss
log += " maxBw: %f." % self.m_estBwMax
log += " totalSourceSent: %d" % sourceSentCount
log += " totalRepairSent: %d" % repairSentCount
log += " minRTT: %f" % self.m_rttMin
log += " oldPacketsInFlight: %d" % oldPacketsInFlight
logging.debug(log)
# Record current encoder status
logging.debug("[UpdatedEncoderStatusOnAck] headsid: %d tailsid: %d nextsid: %d" \
% (self.m_enc.contents.headsid, self.m_enc.contents.tailsid, self.m_enc.contents.nextsid))
"""
# Record the arrival time when the packet acknowledges the receipt of the ACK
if pktInfo.pktType == PacketInfoType['SOURCE_PACKET'] :
log = "[ %f ]" % recvAckTime + "[RecvAck] pepSender receives SOURCE packet ACK %d" % pktInfo.pktId
else :
log = "[ %f ]" % recvAckTime + "[RecvAck] pepSender receives REPAIR packet ACK %d" % pktInfo.pktId
logging.info(log)
"""
# Calculates the transmission time at this data rate.
def CalculateBytesTxTime(self) :
return ScPacketSize / self.m_pacingRate
def SendDataPackets(self) :
# Continue to send data packets if there is data and cWnd allows
while self.m_cWnd > self.m_packetsInFlight :
currentTime = time.time()
# if pacing, sending is controlled by pacingTimer
if self.m_pacing == True and self.m_cWnd != self.m_initCWnd :
remainTime = currentTime - self.m_lastPacketSentTime - self.m_pacingTimer
if remainTime < 0 :
break
else :
self.m_pacingTimer = 0
# Output packet
cpkt = None
# Decide whether to send repair packet
if self.TimeToSendRepairPacket() == True :
if random.uniform(0, 1) < 0.95 :
cpkt = streamc.output_repair_packet_short(self.m_enc, 128)
else :
cpkt = streamc.output_repair_packet(self.m_enc)
# Decide whether to send source packet
elif self.m_lastSentSourceId < self.m_currentMaxSourceId and cpkt == None :
cpkt = streamc.output_source_packet(self.m_enc)
if cpkt == None :
break
# Serialize packets and do SC-UDP encapsulation
pktstr = streamc.serialize_packet(self.m_enc, cpkt) # class ctypes.LP_c_ubyte
pp = string_at(pktstr, self.m_cp.pktsize + 4 * sizeof(c_int)) # class 'bytes'
pkt = PepPacket(PepHeader(PepPacketType['SC_PROTECTED_PKT']), pp)
#pkt = PepPacket(PepHeader(PepPacketType['SC_PROTECTED_PKT']), cpkt.contents.serialize(self.m_cp.pktsize))
# Send the scpkt and record the sending time
self.m_udpSocket.sendto(pkt.packed(), self.m_peerAddress)
sendTime = currentTime
self.m_lastPacketSentTime = sendTime
# Record the packet sending time and other corresponding status values
log = ""
if cpkt.contents.sourceid != -1 :
log = "[SendDataPacket] Send SOURCE packet %d" % cpkt.contents.sourceid
self.m_pktInfoQueue.Add(PacketInfoType['SOURCE_PACKET'], cpkt.contents.sourceid, sendTime, self.m_enc.contents.rcount, self.m_lastAckedSourceNum+self.m_lastAckedRepairNum, self.m_lastFirstSentTime, self.m_lastAckTime)
self.m_lastSentSourceId = cpkt.contents.sourceid
self.m_lastSentSourceTime = currentTime
else :
log = "[SendDataPacket] Send REPAIR packet %d" % cpkt.contents.repairid
self.m_pktInfoQueue.Add(PacketInfoType['REPAIR_PACKET'], cpkt.contents.repairid, sendTime, self.m_enc.contents.nextsid, self.m_lastAckedSourceNum+self.m_lastAckedRepairNum, self.m_lastFirstSentTime, self.m_lastAckTime)
self.m_lastSentRepairId = cpkt.contents.repairid
self.m_lastSentRepairTime = currentTime
log += " Idle State: True" if self.m_newDataIdleState else " Idle State: False"
logging.debug(log)
# Record current encoder status
logging.debug("[EncoderStatus] headsid: %d tailsid: %d nextsid: %d" \
% (self.m_enc.contents.headsid, self.m_enc.contents.tailsid, self.m_enc.contents.nextsid))
# Free the packet and pktstr, then count in-flight
streamc.free_packet(cpkt)
streamc.free_serialized_packet(pktstr)
self.m_packetsInFlight += 1
# Set the next sending time according to the pacing rate
if self.m_pacing == True :
if self.m_pacingTimer == 0 :
# print("Current Pacing Rate %f." % self.m_pacingRate)
# print("Timer is in expired state, activate it %f." % self.CalculateBytesTxTime())
self.m_pacingTimer = self.CalculateBytesTxTime()
break
currentStreamcQueueSize = self.m_currentMaxSourceId - self.m_lastAckedSourceId
if currentStreamcQueueSize != self.m_lastStreamcQueueSize :
logging.debug("[StreamcQueueSize] %d" % currentStreamcQueueSize)
self.m_lastStreamcQueueSize = currentStreamcQueueSize
def TimeToSendRepairPacket(self) :
# If all currently existing source packets are sent, PEPesc will be marked as idle
# and use 'self.m_numSentRepairAfterIdle' to count repair packets sent.
# Until new data arrives, pepesc will get out of the idle state
# and use 'self.m_numSentRepairExcludeIdle' to count repair packets sent.
currentTime = time.time()
if self.m_lastSentSourceId == self.m_currentMaxSourceId :
if self.m_newDataIdleState == False :
self.m_idleStateChangeTime = currentTime
self.m_newDataIdleState = True
else :
if self.m_newDataIdleState == True :
self.m_idleStateChangeTime = currentTime
self.m_numSentRepairAfterIdle = 0
self.m_idleCanSendRepairCount = 0
self.m_newDataIdleState = False
# The inoder of the peer PEPesc is stuck, send repair packets earlier when the source packets are under-saturated.
if self.m_duplicatedInorder == True :
if self.m_numSentRepairAfterStuck < 2 :
self.m_numSentRepairAfterStuck += 1
return True
else :
self.m_duplicatedInorder = False
self.m_numSentRepairAfterStuck = 0
if self.m_newDataIdleState :
# heuristic
if self.m_numSentRepairAfterIdle < 1 :
self.m_idleCanSendRepairCount += 1
if self.m_idleCanSendRepairCount == round (1 / (self.m_lossRate + ExtraRepairRate)) :
self.m_numSentRepairAfterIdle += 1
return True
# If the current last source packet has been sent,
# and the Ack has not been received within the past min rtt, send repair packets
if currentTime - max(self.m_lastSentSourceTime, self.m_lastSentRepairTime) >= self.m_rttMin :
return True
else :
# Repair the target insertion frequency of packets so that the expected mean decoding delay is 1/repairExcess packets
targetRepairFreq = self.m_lossRate + ExtraRepairRate
# Calculate the current required repair packet insertion frequency
currentRepairFreq = self.m_numSentRepairExcludeIdle / (self.m_lastSentSourceId+1 + self.m_numSentRepairExcludeIdle) if self.m_lastSentSourceId >= 0 else 1
if currentRepairFreq < targetRepairFreq and self.m_enc.contents.headsid < self.m_enc.contents.nextsid - 1 :
self.m_numSentRepairExcludeIdle += 1
return True
else :
return False
return False
def RecvDataPackets(self, pkt) :
buf = cast(pkt.body, POINTER(c_ubyte))
rpkt = streamc.deserialize_packet(self.m_dec, buf)
#rpkt.deserialize(buf, self.m_cp.pktsize)
receiveTime = time.time()
outOrderRecv = False
if rpkt.contents.sourceid != -1 :
self.m_latestRecvSourceNum += 1
self.m_numRecvSinceLastSourceAck += 1
self.m_latestRecvPktType = PacketInfoType['SOURCE_PACKET']
# reject the received source packet
if rpkt.contents.sourceid <= self.m_dec.contents.inorder :
logging.warning("[RecvDataPacket] Received out-dated source packet: %d current inorder: %d" % (rpkt.contents.sourceid, self.m_dec.contents.inorder))
outOrderRecv = True
return
if rpkt.contents.sourceid < self.m_dec.contents.win_e :
logging.warning("[RecvDataPacket] Out-of-order source packet %d received, inorder: %d, win_e: %d" % (rpkt.contents.sourceid,
self.m_dec.contents.inorder,
self.m_dec.contents.win_e))
outOrderRecv = True
else :
self.m_latestRecvRepairNum += 1
self.m_latestRecvPktType = PacketInfoType['REPAIR_PACKET']
if rpkt.contents.repairid < self.m_lastRecvRepairId :
outOrderRecv = True
oldState = self.m_dec.contents.active
oldInorder = self.m_dec.contents.inorder
streamc.receive_packet(self.m_dec, rpkt)
# streamc.free_packet(rpkt)
newState = self.m_dec.contents.active
newInorder = self.m_dec.contents.inorder
log = "[DecoderStatus] inorder: %d" % self.m_dec.contents.inorder
log += " SOURCE packet %d" % rpkt.contents.sourceid if rpkt.contents.sourceid != -1 else " REPAIR packet %d" % rpkt.contents.repairid
if rpkt.contents.repairid != -1 :
log += " encoding window: [ %d , %d ]" % (rpkt.contents.win_s, rpkt.contents.win_e)
if self.m_dec.contents.active :
log += " current decoder state: active with window: [ %d , %d ]" % (self.m_dec.contents.win_s, self.m_dec.contents.win_e)
else :
log += " current decoder state: inactive"
logging.debug(log)
# Record the time of decoding source packets
if newInorder > oldInorder :
for i in range(oldInorder+1, newInorder+1) :
logging.debug("[RecvDataPacket] Receive SOURCE packet %d" % i)
# 解码器成功解码恢复出丢失分组,通知发送端解码成功
currentTime = time.time()
if oldState == 1 and newState == 0 :
self.m_udpSocket.sendto(PepPacket(PepHeader(PepPacketType['DECODE_SUCCESS']), str(currentTime).encode()).packed(), self.m_peerAddress)
log = "[DecodingSuccess] Decoder is inactivated and delivered in-order source packets between: [%d , %d]" % (oldInorder + 1 , newInorder)
log += " with repair packet %d" % rpkt.contents.repairid
log += " arrive time: %f" % receiveTime
log += " decoding cost time: %f" % (currentTime - receiveTime)
logging.debug(log)
# 检测是否发生连续分组丢失现象
if rpkt.contents.sourceid != -1 :
if rpkt.contents.sourceid - self.m_lastRecvSourceId > 9 :
message = str(str(currentTime)) + ' SOURCE ' + str(rpkt.contents.sourceid - self.m_lastRecvSourceId)
self.m_udpSocket.sendto(PepPacket(PepHeader(PepPacketType['ADVERTISE_BURST']), message.encode()).packed(), self.m_peerAddress)
self.m_lastRecvSourceId = rpkt.contents.sourceid
else :
if rpkt.contents.repairid - self.m_lastRecvRepairId > 9 :
message = str(str(currentTime) + ' REPAIR ' + str(rpkt.contents.repairid - self.m_lastRecvRepairId))
self.m_udpSocket.sendto(PepPacket(PepHeader(PepPacketType['ADVERTISE_BURST']), message.encode()).packed(), self.m_peerAddress)
self.m_lastRecvRepairId = rpkt.contents.repairid
if not outOrderRecv and self.m_latestRecvSourceNum + self.m_latestRecvRepairNum != self.m_numLastAcked :
threshold = self.m_initCWnd if self.m_activeProbeBw else 1000
if rpkt.contents.repairid != -1 :
self.SendDataAck()
elif self.m_numRecvSinceLastSourceAck >= SourceAckInterval or rpkt.contents.sourceid < threshold :
self.m_numRecvSinceLastSourceAck = 0
self.SendDataAck()
return
def SendProbePackets(self) :
self.m_probeBw = 0
self.m_probePacketSentTimes = []
probePacketId = 0
while probePacketId < ProbeTrainLength :
filler = ' ' * (ProbePacketSize - PepHeaderLength - len(str(probePacketId).encode()))
self.m_udpSocket.sendto(PepPacket(PepHeader(PepPacketType['PROBE']), str(probePacketId).encode() + filler.encode()).packed(), self.m_peerAddress)
probePacketId += 1
self.m_probePacketSentTimes.append(time.time())
self.m_lastProbedTime = time.time()
return
def RecvProbePacketAndSendProbeAck(self, pkt) :
probePacketId = int(pkt.body)
if probePacketId == 0 :
self.m_lastProbeArrivedId = 0
self.m_probeValidity = True
self.m_firstProbeArriveTime = time.time()
else :
# Regardless of whether the last packet arrived, or whether the variable was reset in the last bandwidth probe,
# if any of the packets in this bandwidth probe are lost (including the first one), the condition will not be valid,
# the variables will be reset, and the bandwidth probe will be invalidated.
if probePacketId != self.m_lastProbeArrivedId+1 :
self.m_lastProbeArrivedId = -1
self.m_probeValidity = False
else :
self.m_lastProbeArrivedId += 1
if self.m_probeValidity :
if probePacketId == ProbeTrainLength-1 :
trainDispersion = time.time() - self.m_firstProbeArriveTime
message = str(probePacketId) + ' ' + str(trainDispersion)
else :
message = str(probePacketId)
self.m_udpSocket.sendto(PepPacket(PepHeader(PepPacketType['PROBE_ACK']), message.encode()).packed(), self.m_peerAddress)
def RecvProbeAcks(self, pkt) :
message = pkt.body.decode().split(' ')
probeAckId = int(message[0])
sendTime = self.m_probePacketSentTimes[probeAckId]
recvTime = time.time()
self.RttEstimation(recvTime, sendTime)
if probeAckId == ProbeTrainLength-1 :
alpha = 0.9
trainDispersion = float(message[1])
instantaneousEstBw = (ProbeTrainLength-1) / trainDispersion * ProbePacketSize / ScPacketSize # pkts/sec.
self.m_probeBw = alpha * self.m_probeBw + (1-alpha) * instantaneousEstBw if self.m_probeBw != 0 else instantaneousEstBw # smoothed probe bandwidth
self.m_estBw = self.m_probeBw * 0.8
self.m_maxBwFilter.Insert(recvTime, self.m_estBw)
self.UpdateCwnd()
log = "EstBw: %f Mbps CWND: %d pkts RTT: %f ms." % (self.m_estBw*ScPacketSize*8/1024/1024, self.m_cWnd, self.m_rtt*1000)
logging.info("[BwProbe] %s" % log)
if self.m_detailFlag :
print("[%s][%s:%d] %s" % (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1], log))
return
def ReceiveAndHandlePepPacket(self) :
# Receive pep packet
data, addr = self.m_udpSocket.recvfrom(UdpBufSize)
if len(data) == 0 :
return
pkt = PepPacket()
pkt.parse(data)
# Handle pep packet
if pkt.header.mtype == PepPacketType['HANDSHAKE'] :
self.m_udpSocket.sendto(PepPacket(PepHeader(PepPacketType['HANDSHAKE_ACK'])).packed(), self.m_peerAddress)
elif pkt.header.mtype == PepPacketType['HANDSHAKE_ACK'] :
self.EstablishPEPConnection(pkt)
elif pkt.header.mtype == PepPacketType['WAVEHAND'] :
self.ClosePEPConnection(pkt)
elif pkt.header.mtype == PepPacketType['HEARTBEAT'] :
self.m_udpSocket.sendto(PepPacket(PepHeader(PepPacketType['HEARTBEAT_ACK'])).packed(), self.m_peerAddress)
elif pkt.header.mtype == PepPacketType['SC_PROTECTED_PKT'] :
self.RecvDataPackets(pkt)
elif pkt.header.mtype == PepPacketType['SC_DATA_ACK'] :
self.RecvDataAck(pkt)
elif pkt.header.mtype == PepPacketType['PROBE'] :
self.RecvProbePacketAndSendProbeAck(pkt)
elif pkt.header.mtype == PepPacketType['PROBE_ACK'] :
self.RecvProbeAcks(pkt)
elif pkt.header.mtype == PepPacketType['ADVERTISE_BURST'] :
# The receiver feedback that a continuous packet loss occurred before one RTT
message = pkt.body.decode().split(' ')
peerBurstTime = float(message[0])
burstPacketType = message[1]
burstPacketsNumber = int(message[2])
self.m_lastBurstTime = time.time()
logging.warning("[Burst] Peer PEPesc Receiver advertised burst %s %d packets." % (burstPacketType, burstPacketsNumber))
elif pkt.header.mtype == PepPacketType['DECODE_SUCCESS'] :
self.m_lastDecSuccTime = float(pkt.body.decode())
elif pkt.header.mtype == PepPacketType['HEARTBEAT_ACK'] :
logging.debug("[HEARTBEAT_ACK] heartbeat ACK received.") # nothing needs to be done here, since response update has been done when reading packets from channel
else :
logging.debug("Unknown PepPacket type")
return
def HandleScPayloads(self) :
readScPayloadNumber = 0
maxAllowReadOnce = 10
while self.m_dec.contents.inorder >= self.m_inorderNext and readScPayloadNumber < maxAllowReadOnce :
buf = self.m_dec.contents.recovered[self.m_inorderNext % DEC_ALLOC]
readScPayloadNumber += 1
self.m_inorderNext += 1
if buf is None :
break
payload = bytes(cast(buf, POINTER(c_ubyte * self.m_cp.pktsize))[0]) # class 'bytes'
scPayLoad = SCPayload()
scPayLoad.parse(payload)
neighbor = scPayLoad.tcpDestinationAddr
remote = scPayLoad.tcpSourceAddr
recvAddr = (remote[0], remote[1], neighbor[0], neighbor[1])
sendAddr = (neighbor[0], neighbor[1], remote[0], remote[1])
if scPayLoad.msg == ScProtectedMsg['TCP_RAW_DATA'] :
# Use the corresponding channel for application sending
if recvAddr not in self.m_findChidForPEPescToSend :
return
channel = self.m_channels[self.m_findChidForPEPescToSend[recvAddr]]
channel.send(scPayLoad.msgData)
self.m_tcpRecvBufLen[recvAddr] += len(scPayLoad.msgData)
if recvAddr in self.m_linkToBeClosed and self.m_tcpRecvBufLen[recvAddr] == self.m_linkToBeClosed[recvAddr] :
neighborRecvTcpDataLength = self.m_tcpRecvBufLen[recvAddr]
neighborSentTcpDataLength = self.m_tcpSentBufLen[sendAddr]
chid = self.m_findChidForPEPescToSend[recvAddr]
CloseChannel(self.m_channels, chid)
del self.m_findChidForPEPescToSend[recvAddr]
del self.m_linkToBeClosed[recvAddr]
del self.m_tcpRecvBufLen[recvAddr]
del self.m_tcpSentBufLen[sendAddr]
if self.m_detailFlag :
print("[%s][%s:%d] Close channel with %s:%d."\
% (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1], neighbor[0], neighbor[1]))
print("[%s][%s:%d] TCP connection {%s:%d -> %s:%d} Total sent %.3fMBytes Total recv %.3fMBytes."\
% (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1],\
neighbor[0], neighbor[1], remote[0], remote[1], neighborSentTcpDataLength/1024/1024, neighborRecvTcpDataLength/1024/1024))
elif scPayLoad.msg == ScProtectedMsg['REMOTE_REQUEST'] :
chid = OpenOutConnChannel(self.m_channels, neighbor, remote, maxWaitTime=0.02)
if chid == -1 :
print("Open channel error,Exit!")
sys.exit()
self.m_findChidForPEPescToSend[recvAddr] = chid
self.m_tcpRecvBufLen[recvAddr] = 0
self.m_tcpSentBufLen[sendAddr] = 0
#print("[%s][%s:%d] Try to connect to %s:%d."\
# % (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1], neighbor[0], neighbor[1]))
elif scPayLoad.msg == ScProtectedMsg['REMOTE_EXIST'] :
tcpReceiver = self.m_tcpReceiverWaiting[sendAddr]
del self.m_tcpReceiverWaiting[sendAddr]
chid = OpenInConnChannel(self.m_channels, tcpReceiver, remote, maxWaitTime=0.02)
self.m_findChidForPEPescToSend[recvAddr] = chid
self.m_tcpRecvBufLen[recvAddr] = 0
self.m_tcpSentBufLen[sendAddr] = 0
if self.m_detailFlag :
print("[%s][%s:%d] Peer PEPesc reports that connecting to %s:%d successfully."\
% (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1], remote[0], remote[1]))
logging.info("[TCP] Connect success {%s:%d -> %s:%d}" % (neighbor[0], neighbor[1], remote[0], remote[1]))
elif scPayLoad.msg == ScProtectedMsg['REMOTE_NOT_EXIST'] :
tcpReceiver = self.m_tcpReceiverWaiting[sendAddr]
tcpReceiver.close()
del self.m_tcpReceiverWaiting[sendAddr]
if self.m_detailFlag :
print("[%s][%s:%d] Peer PEPesc reports that failed to connect to %s:%d."\
% (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1], remote[0], remote[1]))
elif scPayLoad.msg == ScProtectedMsg['REMOTE_EXIT'] :
remoteTotalSentTcpDataLength = int(scPayLoad.msgData)
# If I have not created a channel for the connection or has closed the channel, ignore this notification
if recvAddr not in self.m_findChidForPEPescToSend :
return
if self.m_detailFlag :
print("[%s][%s:%d] Peer PEPesc reports that %s:%d has exited."\
% (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1], remote[0], remote[1]))
# If I have sent all the data, immediately closing the connection with my neighbor, or waiting to receive and send full TCP data
if self.m_tcpRecvBufLen[recvAddr] == remoteTotalSentTcpDataLength :
chid = self.m_findChidForPEPescToSend[recvAddr]
CloseChannel(self.m_channels, chid)
neighborRecvTcpDataLength = self.m_tcpRecvBufLen[recvAddr]
neighborSentTcpDataLength = self.m_tcpSentBufLen[sendAddr]
self.m_totalDataSentSize += neighborSentTcpDataLength
self.m_totalDataRecvSize += neighborRecvTcpDataLength
del self.m_findChidForPEPescToSend[recvAddr]
del self.m_tcpRecvBufLen[recvAddr]
del self.m_tcpSentBufLen[sendAddr]
if self.m_detailFlag :
print("[%s][%s:%d] Close channel with %s:%d."\
% (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1], neighbor[0], neighbor[1]))
print("[%s][%s:%d] TCP connection {%s:%d -> %s:%d} Total sent %.3fMBytes Total recv %.3fMBytes."\
% (time.strftime('%Y-%m-%d %X',time.localtime()), self.m_selfAddress[0], self.m_selfAddress[1],\
neighbor[0], neighbor[1], remote[0], remote[1], neighborSentTcpDataLength/1024/1024, neighborRecvTcpDataLength/1024/1024))
else :
self.m_linkToBeClosed[recvAddr] = remoteTotalSentTcpDataLength
return
def ReadChannels(self, readableChannelIds) :
for chid in readableChannelIds :#list(self.m_channels) :
ch = self.m_channels[chid]
sendAddr = (ch.neighbor[0], ch.neighbor[1], ch.remote[0], ch.remote[1])