-
Notifications
You must be signed in to change notification settings - Fork 4
/
rtmfp.py
2581 lines (2244 loc) · 128 KB
/
rtmfp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# (c) 2011, Cumulus Python <[email protected]>. No rights reserved.
# Experimental rendezvous server for RTMFP in pure Python.
#
# This is a re-write of OpenRTMFP's Cumulus project from C++ to Python to fit with rtmplite project architecture.
# The original Cumulus project is in C++ and allows rendezvous and man-in-middle mode at the server.
# You can download the original project from https://github.com/OpenRTMFP/Cumulus, and compile and start on Mac OS X as follows:
# $ cd CumulusLib; make -f Makefile-darwin clean; make -f Makefile-darwin
# $ cd ../CumulusService; make -f Makefile-darwin
# $ ./CumulusService -l8 -d all
#
# To get started with this rtmfp.py experiments, start it first in debug mode.
# $ export PYTHONPATH=.:/path/to/p2p-sip/src:/path/to/PyCrypto
# $ ./rtmfp.py -d --no-rtmp
# Then compile the test Flash application by editing testP2P/Makefile to supply the path for your mxmlc.
# $ cd testP2P; make; cd ..
# Alternatively use the supplied testP2P.swf.
# Launch your web browser to open the file testP2P/bin-debug/testP2P.swf
#
# For p2p-mode: first click on publisher connect and then on player connect to see the video stream going.
#
# For man-in-middle mode: start rtmfp.py with --middle argument.
# $ ./rtmfp.py -d --no-rtmp --middle
# Then click on publisher connect, copy the replacement peer id from the console of rtmfp.py and paste to the
# nearID/farID box in the browser, and finally click on the player connect button to see the video stream
# flowing via your server.
#
# For server connection mode instead of the direct (p2p) mode, start rtmfp.py without --middle argument, and
# then before clicking on publisher connect, uncheck the "direct" checkbox to enable FMS_CONNECTION mode in NetStream
# instead of DIRECT_CONNECTION.
#
# TODO: the server connection mode is not implemented yet.
# TODO: the interoperability with SIP is not implemented yet.
# TODO: the NAT traversal is not tested yet.
'''
This is a simple RTMFP rendezvous server to enable end-to-end and client-server UDP based media transport between Flash Player instances
and with this server.
Protocol Description
--------------------
(The description is based on the original OpenRTMFP's Cumulus project as well as http://www.ietf.org/proceedings/10mar/slides/tsvarea-1.pdf)
Session
An RTMFP session is an end-to-end bi-directional pipe between two UDP transport addresses. A transport address contains an IP address and port number, e.g.,
"192.1.2.3:1935". A session can have one or more flows where a flow is a logical path from one entity to another via zero or more intermediate entities. UDP
packets containing encrypted RTMFP data are exchanged in a session. A packet contains one or more messages. A packet is always encrypted using AES with 128-bit
keys.
In the protocol description below, all numbers are in network byte order (big-endian). The | operator indicates concatenation of data. The numbers are assumed
to be unsigned unless mentioned explicitly.
Scrambled Session ID
The packet format is as follows. Each packet has the first 32 bits of scrambled session-id followed by encrypted part. The scrambled (instead of raw) session-id
makes it difficult if not impossible to mangle packets by middle boxes such as NATs and layer-4 packet inspectors. The bit-wise XOR operator is used to scramble
the first 32-bit number with subsequent two 32-bit numbers. The XOR operator makes it possible to easily unscramble.
packet := scrambled-session-id | encrypted-part
To scramble a session-id,
scrambled-session-id = a^b^c
where ^ is the bit-wise XOR operator, a is session-id, and b and c are two 32-bit numbers from the first 8 bytes of the encrypted-part.
To unscramble,
session-id = x^y^z
where z is the scrambled-session-id, and b and c are two 32-bit numbers from the first 8 bytes of the encrypted-part.
The session-id determines which session keys are used for encryption and decryption of the encrypted part. There is one exception for the fourth message in the
handshake which contains the non-zero session-id but the handshake (symmetric) session keys are used for encryption/decryption. For the handshake messages, a
symmetric AES (advanced encryption standard) with 128-bit (16 bytes) key of "Adobe Systems 02" (without quotes) is used. For subsequent in-session messages the
established asymmetric session keys are used as described later.
Encryption
Assuming that the AES keys are known, the encryption and decryption of the encrypted-part is done as follows. For decryption, an initialization vector of all
zeros (0's) is used for every decryption operation. For encryption, the raw-part is assumed to be padded as described later, and an initialization vector of all
zeros (0's) is used for every encryption operation. The decryption operation does not add additional padding, and the byte-size of the encrypted-part and the
raw-part must be same.
The decrypted raw-part format is as follows. It starts with a 16-bit checksum, followed by variable bytes of network-layer data, followed by padding. The
network-layer data ignores the padding for convenience.
raw-part := checksum | network-layer-data | padding
The padding is a sequence of zero or more bytes where each byte is \xff. Since it uses 128-bit (16 bytes) key, padding ensures that the size in bytes of the
decrypted part is a multiple of 16. Thus, the size of padding is always less than 16 bytes and is calculated as follows:
len(padding) = 16*N - len(network-layer-data) - 1
where N is any positive number to make 0 <= padding-size < 16
For example, if network-layer-data is 84 bytes, then padding is 16*6-84-1=11 bytes. Adding a padding of 11 bytes makes the decrypted raw-part of size 96 which
is a multiple of 16 (bytes) hence works with AES with 128-bit key.
Checksum
The checksum is calculated over the concatenation of network-layer-data and padding. Thus for the encoding direction you should apply the padding followed by
checksum calculation and then AES encrypt, and for the decoding direction you should AES decrypt, verify checksum and then remove the (optional) padding if
needed. Usually padding removal is not needed because network-layer data decoders will ignore the remaining data anyway.
The 16-bit checksum number is calculated as follows. The concatenation of network-layer-data and padding is treated as a sequence of 16-bit numbers. If the size
in bytes is not an even number, i.e., not divisible by 2, then the last 16-bit number used in the checksum calculation has that last byte in the least-significant
position (weird!). All the 16-bit numbers are added in to a 32-bit number. The first 16-bit and last 16-bit numbers are again added, and the resulting number's
first 16 bits are added to itself. Only the least-significant 16 bit part of the resulting sum is used as the checksum.
Network Layer Data
The network-layer data contains flags, optional timestamp, optional timestamp echo and one or more chunks.
network-layer-data = flags | timestamp | timestamp-echo | chunks ...
The flags value is a single byte containing these information: time-critical forward notification, time-critical reverse notification, whether timestamp is
present? whether timestamp echo is present and initiator/responder marker. The initiator/responder marker is useful if the symmetric (handshake) session keys
are used for AES, so that it protects against packet loopback to sender.
The bit format of the flags is not clear, but the following applies. For the handshake messages, the flags is \x0b. When the flags' least-significant 4-bits
are 1101b then the timestamp-echo is present. The timestamp seems to be always present. For in-session messages, the last 4-bits are either 1101b or 1001b.
--------------------------------------------------------------------
flags meaning
--------------------------------------------------------------------
0000 1011 setup/handshake
0100 1010 in-session no timestamp-echo (server to Flash Player)
0100 1110 in-session with timestamp-echo (server to Flash Player)
xxxx 1001 in-session no timestamp-echo (Flash Player to server)
xxxx 1101 in-session with timestamp-echo (Flash Player to server)
--------------------------------------------------------------------
TODO: looks like bit \x04 indicates whether timestamp-echo is present. Probably \x80 indicates whether timestamp is present. last two bits of 11b indicates
handshake, 10b indicates server to client and 01b indicates client to server.
The timestamp is a 16-bit number that represents the time with 4 millisecond clock. The wall clock time can be used for generation of this timestamp value.
For example if the current time in seconds is tm = 1319571285.9947701 then timestamp is calculated as follows:
int(time * 1000/4) & 0xffff = 46586
, i.e., assuming 4-millisecond clock, calculate the clock units and use the least significant 16-bits.
The timestamp-echo is just the timestamp value that was received in the incoming request and is being echo'ed back. The timestamp and its echo allows the
system to calculate the round-trip-time (RTT) and keep it up-to-date.
Each chunk starts with an 8-bit type, followed by the 16-bit size of payload, followed by the payload of size bytes. Note that \xff is reserved and not used for
chunk-type. This is useful in detecting when the network-layer-data has finished and padding has started because padding uses \xff. Alternatively, \x00 can also
be used for padding as that is reserved type too!
chunk = type | size | payload
Message Flow
There are three types of session messages: session setup, control and flows. The session setup is part of the four-way handshake whereas control and flows are
in-session messages. The session setup contains initiator hello, responder hello, initiator initial keying, responder initial keying, responder hello cookie
change and responder redirect. The control messages are ping, ping reply, re-keying initiate, re-keying response, close, close acknowledge, forwarded initiator
hello. The flow messages are user data, next user data, buffer probe, user data ack (bitmap), user data ack (ranges) and flow exception report.
A new session starts with an handshake of the session setup. Under normal client-server case, the message flow is as follows:
initiator (client) target (server)
|-------initiator hello---------->|
|<------responder hello-----------|
Under peer-to-peer session setup case for NAT traversal, the server acts as a forwarder and forwards the hello to another connected client as follows:
initiator (client) forwarder (server) target (client)
|-------initiator hello---------->| |
| |---------- forwarded initiator hello-->|
| |<--------- ack ----------------------->|
|<------------responder hello---------------------------------------------|
Alternatively, the server could redirect to another target by supplying an alternative list of target addresses as follows:
initiator (client) redirector (server) target (client)
|-------initiator hello---------->|
|<------responder redirect--------|
|-------------initiator hello-------------------------------------------->|
|<------------responder hello---------------------------------------------|
Note that the initiator, target, forwarder and redirector are just roles for session setup whereas client and server are specific implementations such as
Flash Player and Flash Media Server, respectively. Even a server may initiate an initiator hello to a client in which case the server becomes the initiator and
client becomes the target for that session. This mechanism is used for the man-in-middle mode in the Cumulus project.
The initiator hello may be forwarded to another target but the responder hello is sent directly. After that the initiator initial keying and the responder
initial keying are exchanged (between the initiator and the responded target directly) to establish the session keys for the session between the initiator
and the target. The four-way handshake prevents denial-of-service (DoS) via SYN-flooding and port scanning.
As mentioned before the handshake messages for session-setup use the symmetric AES key "Adobe Systems 02" (without the quotes), whereas in-session messages
use the established asymmetric AES keys. Intuitively, the session setup is sent over pre-established AES cryptosystem, and it creates new asymmetric AES
cryptosystem for the new session. Note that a session-id is established for the new session during the initial keying process, hence the first three messages
(initiator-hello, responder-hello and initiator-initial-keying) use session-id of 0, and the last responder-initial-keying uses the session-id sent by the
initiator in the previous message. This is further explained later.
Message Types
The 8-bit type values and their meaning are shown below.
---------------------------------
type meaning
---------------------------------
\x30 initiator hello
\x70 responder hello
\x38 initiator initial keying
\x78 responder initial keying
\x0f forwarded initiator hello
\x71 forwarded hello response
\x10 normal user data
\x11 next user data
\x0c session failed on client side
\x4c session died
\x01 causes response with \x41, reset keep alive
\x41 reset times keep alive
\x5e negative ack
\x51 some ack
---------------------------------
TODO: most likely the bit \x01 indicates whether the transport-address is present or not.
The contents of the various message payloads are described below.
Variable Length Data
The protocol uses variable length data and variable length number. Any variable length data is usually prefixed by its size-in-bytes encoded as a variable
length number. A variable length number is an unsigned 28-bit number that is encoded in 1 to 4 bytes depending on its value. To get the bit-representation,
first assume the number to be composed of four 7-bit numbers as follows
number = 0000dddd dddccccc ccbbbbbb baaaaaaa (in binary)
where A=aaaaaaa, B=bbbbbbb, C=ccccccc, D=ddddddd are the four 7-bit numbers
The variable length number representation is as follows:
0aaaaaaa (1 byte) if B = C = D = 0
0bbbbbbb 0aaaaaaa (2 bytes) if C = D = 0 and B != 0
0ccccccc 0bbbbbbb 0aaaaaaa (3 bytes) if D = 0 and C != 0
0ddddddd 0ccccccc 0bbbbbbb 0aaaaaaa (4 bytes) if D != 0
Thus a 28-bit number is represented as 1 to 4 bytes of variable length number. This mechanism saves bandwidth since most numbers are small and can fit in 1 or 2
bytes, but still allows values up to 2^28-1 in some cases.
Handshake
The initiator-hello payload contains an endpoint discriminator (EPD) and a tag. The payload format is as follows:
initiator-hello payload = first | epd | tag
The first (8-bit) is unknown. The next epd is a variable length data that contains an epd-type (8-bit) and epd-value (remaining). Note that any variable length
data is prefixed by its length as a variable length number. The epd is typically less than 127 bytes, so only 8-bit length is enough. The tag is a fixed 128-bit
(16 bytes) randomly generated data. The fixed sized tag does not encode its length.
epd = epd-type | epd-value
The epd-type is \x0a for client-server and \x0f for peer-to-peer session. If epd-type is peer-to-peer, then the epd-value is peer-id whereas if epd-type is
client-server the epd-value is the RTMFP URL that the client uses to connect to. The initiator sets the epd-value such that the responder can tell whether the
initiator-hello is for them but an eavesdropper cannot deduce the identity from that epd. This is done, for example, using an one-way hash function of the
identity.
The tag is chosen randomly by the initiator, so that it can match the response against the pending session setup. Once the setup is complete the tag can be
forgotten.
When the target receives the initiator-hello, it checks whether the epd is for this endpoint. If it is for "another" endpoint, the initiator-hello is silently
discarded to avoid port scanning. If the target is an introducer (server) then it can respond with an responder, or redirect/proxy the message with
forwarded-initiator-hello to the actual target. In the general case, the target responds with responder-hello.
The responder-hello payload contains the tag echo, a new cookie and the responder certificate. The payload format is as follows:
responder-hello payload = tag-echo | cookie | responder-certificate
The tag echo is same as the original tag from the initiator-hello but encoded as variable length data with variable length size. Since the tag is 16 bytes, size
can fit in 8-bits.
The cookie is a randomly and statelessly generated variable length data that can be used by the responder to only accept the next message if this message was
actually received by the initiator. This eliminates the "SYN flood" attacks, e.g., if a server had to store the initial state then an attacker can overload the
state memory slots by flooding with bogus initiator-hello and prevent further legitimate initiator-hello messages. The SYN flooding attack is common in TCP
servers. The length of the cookie is 64 bytes, but stored as a variable length data.
The responder certificate is also a variable length data containing some opaque data that is understood by the higher level crypto system of the application. In
this application, it uses the diffie-hellman (DH) secure key exchange as the crypto system.
Note that multiple EPD might map to the single endpoint, and the endpoint has single certificate. A server that does not care about the man-in-middle attack or
does not create secure EPD can generate random certificate to be returned as the responder certificate.
certificate = \x01\x0A\x41\x0E | dh-public-num | \x02\x15\x02\x02\x15\x05\x02\x15\x0E
Here the dh-public-num is a 64-byte random number used for DH secure key exchange.
The initiator does not open another session to the same target identified by the responder certificate. If it detects that it already has an open session with
the target it moves the new flow requests to the existing open session and stops opening the new session. The responder has not stored any state so does not
need to care. (In our implementation we do store the initial state for simplicity, which may change later). This is one of the reason why the API is flow-based
rather than session-based, and session is implicitly handled at the lower layer.
If the initiator wants to continue opening the session, it sends the initiator-initial-keying message. The payload is as follows:
initiator-initial-keying payload = initiator-session-id | cookie-echo | initiator-certificate | initiator-component | 'X'
Note that the payload is terminated by \x58 (or 'X' character).
The initiator picks a new session-id (32-bit number) to identify this new session, and uses it to demultiplex subsequent received packet. The responder uses this
initiator-session-id as the session-id to format the scrambled session-id in the packet sent in this session.
The cookie-echo is the same variable length data that was received in the responder-hello message. This allows the responder to relate this message with the
previous responder-hello message. The responder will process this message only if it thinks that the cookie-echo is valid. If the responder thinks that the
cookie-echo is valid except that the source address has changed since the cookie was generated it sends a cookie change message to the initiator.
In this DH crypto system, p and g are publicly known. In particular, g is 2, and p is a 1024-bit number. The initiator picks a new random 1024-bit DH private
number (x1) and generates 1024-bit DH public number (y1) as follows.
y1 = g ^ x1 % p
The initiator-certificate is understood by the crypto system and contains the initiator's DH public number (y1) in the last 128 bytes.
The initiator-component is understood by the crypto system and contains an initiator-nonce to be used in DH algorithm as described later.
When the target receives this message, it generates a new random 1024-bit DH private number (x2) and generates 1024-bit DH public number (y2) as follows.
y2 = g ^ x2 % p
Now that the target knows the initiator's DH public number (y1) and it generates the 1024-bit DH shared secret as follows.
shared-secret = y1 ^ x2 % p
The target generates a responder-nonce to be sent back to the initiator. The responder-nonce is as follows.
responder-nonce = \x03\x1A\x00\x00\x02\x1E\x00\x81\x02\x0D\x02 | responder's DH public number
The peer-id is the 256-bit SHA256 (hash) of the certificate. At this time the responder knows the peer-id of the initiator from the initiator-certificate.
The target picks a new 32-bit responder's session-id number to demultiplex subsequent packet for this session. At this time the server creates a new session
context to identify the new session. It also generates asymmetric AES keys to be used for this session using the shared-secret and the initiator and responder
nonces as follows.
decode key = HMAC-SHA256(shared-secret, HMAC-SHA256(responder nonce, initiator nonce))[:16]
encode key = HMAC-SHA256(shared-secret, HMAC-SHA256(initiator nonce, responder nonce))[:16]
The decode key is used by the target to AES decode incoming packet containing this responder's session-id. The encode key is used by the target to AES encode
outgoing packet to the initiator's session-id. Only the first 16 bytes (128-bits) are used as the actual AES encode and decode keys.
The target sends the responder-initial-keying message back to the initiator. The payload is as follows.
responder-initial-keying payload = responder session-id | responder's nonce | 'X'
Note that the payload is terminated by \x58 (or 'X' character). Note also that this handshake response is encrypted using the symmetric (handshake) AES key
instead of the newly generated asymmetric keys.
When the initiator receives this message it also calculates the AES keys for this session.
encode key = HMAC-SHA256(shared-secret, HMAC-SHA256(responder nonce, initiator nonce))[:16]
decode key = HMAC-SHA256(shared-secret, HMAC-SHA256(initiator nonce, responder nonce))[:16]
As before, only the first 16 bytes (128-bits) are used as the AES keys. The encode key of initiator is same as the decode key of the responder and the decode
key of the initiator is same as the encode key of the responder.
When a server acts as a forwarder, it receives an incoming initiator-hello and sends a forwarded-initiator-hello in an existing session to the target. The
payload is follows.
forwarded initiator hello payload := first | epd | transport-address | tag
The first 8-bit value is \x22. The epd value is same as that in the initiator-hello -- a variable length data containing epd-type and epd-value. The epd-type
is \x0f for a peer-to-peer session. The epd-value is the target peer-id that was received as epd-value in the initiator-hello.
The tag is echoed from the incoming initiator-hello and is a fixed 16 bytes value.
The transport address contains a flag for indicating whether the address is private or public, the binary bits of IP address and optional port number. The
transport address is that of the initiator as known to the forwarder.
transport-address := flag | ip-address | port-number
The flag is an 8-bit number with the first most significant bit as 1 if the port-number is present, otherwise 0. The least significant two bits are 10b for
public IP address and 01b for private IP address.
The ip-address is either 4-bytes (IPv4) or 16-bytes (IPv6) binary representation of the IP address.
The optional port-number is 16-bit number and is present when the flag indicates so.
The server then sends a forwarded-hello-response message back to the initiator with the transport-address of the target.
forwarded-hello-response = transport-address | transport-address | ...
The payload is basically one or more transport addresses of the intended target, with the public address first.
After this the initiator client directly sends subsequent messages to the responder, and vice-versa.
A normal-user-data message type is used to deal with any user data in the flows. The payload is shown below.
normal-user-data payload := flags | flow-id | seq | forward-seq-offset | options | data
The flags, an 8-bits number, indicate fragmentation, options-present, abandon and/or final. Following table indicates the meaning of the bits from most
significant to least significant.
bit meaning
0x80 options are present if set, otherwise absent
0x40
0x20 with beforepart
0x10 with afterpart
0x08
0x04
0x02 abandon
0x01 final
The flow-id, seq and forward-seq-offset are all variable length numbers. The flow-id is the flow identifier. The seq is the sequence number. The
forward-seq-offset is used for partially reliable in-order delivery.
The options are present only when the flags indicate so using the most significant bit as 1. The options are as follows.
TODO: define options
The subsequent data in the fragment may be sent using next-user-data message with the payload as follows:
next-user-data := flags | data
This is just a compact form of the user data when multiple user data messages are sent in the same packet. The flow-id, seq and forward-seq-offset are implicit,
i.e., flow-id is same and subsequent next-user-data have incrementing seq and forward-seq-offset. Options are not present. A single packet never contains data
from more than one flow to avoid head-of-line blocking and to enable priority inversion in case of problems.
TODO
Fill in description of the remaining message flows beyond handshake.
Describe the man-in-middle mode that enables audio/video flowing through the server.
'''
import os, sys, traceback, urlparse, re, socket, struct, time, random, hmac, hashlib
import multitask, amf, rtmp
try:
from Crypto.Cipher import AES
class AESEncrypt(object):
def __init__(self, key):
self.key = key[:16]
self.iv = ''.join(['\x00' for i in xrange(16)]) # create null-IV
def encode(self, data):
self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
result = self.cipher.encrypt(data)
return result
class AESDecrypt(object):
def __init__(self, key):
self.key = key[:16]
self.iv = ''.join(['\x00' for i in xrange(16)]) # create null-IV
def decode(self, data):
self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
result = self.cipher.decrypt(data)
return result
except ImportError:
print 'WARNING: Please install PyCrypto in your PYTHONPATH for faster performance. Falling back to Python aes.py which is *very* slow'
import aes
class AESEncrypt(object):
def __init__(self, key):
self.key = key[:16]
def encode(self, data):
return aes.encrypt(self.key, data, iv=aes.iv_null())
class AESDecrypt(object):
def __init__(self, key):
self.key = key[:16]
def decode(self, data):
return aes.decrypt(self.key, data, iv=aes.iv_null()) # clip to original data length
_debug = False
#--------------------------------------
# GENERAL UTILITY
#--------------------------------------
def _isLocal(value):
'''Check whether the supplied address tuple ("dotted-ip", port) is local loopback or not?'''
return value[0] == '127.0.0.1'
def _str2address(value, hasPort=True):
'''Parse the given IPv4 or IPv6 address in to ("dotted-ip", port). If port is False, do not parse port, and return it as 0.
@param value the binary string representing ip (if hasPort is False) or ip plus port (if hasPort is True).
@param hasPort if False, then value must not have port. It returns port as 0.
>>> print _str2address('\x7f\x00\x00\x01\x00\x80')
('127.0.0.1', 128)
'''
if hasPort:
if len(value)>=2:
ip, port = (value[:-2], struct.unpack('>H', value[-2:])[0]) if hasPort else (value, 0)
else:
raise ValueError('invalid length of IP value ' + len(value))
else:
ip, port = value, 0
if len(ip) == 4:
return ('.'.join([ord(x) for x in ip]), port)
elif len(ip) == 16:
return (':'.join(['%X'%((ord(x) << 8 | ord(y)),) for x, y in zip(ip[::2], ip[1::2])]), port)
else:
raise ValueError('invalid length of IP value ' + len(value))
def _address2str(value, hasPort=True):
'''Represent the given ("dotted-ip", port) in to binary string.
@param value the tuple of length 2 of the form ('dotted-ip', port)
@param hasPort if False, do not represent port in the returned string. The value must always have port.
>>> print _address2str(('127.0.0.1', 128))
\x7f\x00\x00\x01\x00\x80
'''
host, port = value
if host.find(':') < 0: # IPv4
parts = host.split('.')
if len(parts) != 4:
raise ValueError('invalid dotted-ip')
ip = ''.join([chr(int(x, 10)) for x in parts])
else: # IPv6
parts = (host[1:-1] if host[0] == '[' and host[-1] == ']' else host).split(':')
if len(parts) != 8:
raise ValueError('invalid dotted-ipv6')
ip = ''.join([struct.pack('>H', int(x, 16)) for x in parts])
return (ip + struct.pack('>H', port)) if hasPort else ip
def _ipport2address(value):
'''Parse a string of the form "dotted-ip:port" to ("dotted-ip", port).'''
ip, ignore, port = value.rpartition(':')
return (ip, int(port) if port else 0)
def _address2ipport(address, default_port=1935):
return address[0] if address[1] in (0, default_port) else '%s:%d'%(address[0], address[1])
def _sizeLength7(value):
'''Return the length of storing value using 7 bits variable integer.'''
return 4 if value >= 0x200000 else 3 if value >= 0x4000 else 2 if value >= 0x80 else 1
def _packLength7(value):
d, c, b, a = (value & 0x7F), ((value & 0x03f80) >> 7), ((value & 0x1fc000) >> 14), ((value & 0x0fe00000) >> 21)
return (chr(0x80 | a) if a else '') + (chr(0x80 | b) if a or b else '') + (chr(0x80 | c) if a or b or c else '') + chr(d)
def _unpackLength7(data): # return (value, remaining)
value = index = 0
while index < 4:
byte = ord(data[index])
value = (value << 7) | (byte & 0x7f)
index += 1
if byte & 0x80 == 0: break
# if _debug: print 'unpackLength7 %r %r'%(data[:4], value)
return (value, data[index:])
def _packString(value, sizeLength=None):
return (struct.pack('>H', len(value)) if sizeLength == 16 else struct.pack('>B', len(value)) if sizeLength == 8 else _packLength7(len(value))) + value
def _unpackString(data, sizeLength=None): # returns (value, remaining)
length, data = (struct.unpack('>H', data[:2])[0], data[2:]) if sizeLength == 16 else (struct.unpack('>B', data[:1])[0], data[1:]) if sizeLength == 8 else _unpackLength7(data)
return (data[:length], data[length:])
def _packAddress(value, publicFlag):
return chr((0x02 if publicFlag else 0x01) | (0x80 if value[0].find(':') >= 0 else 0)) + _address2str(value)
def _url2pathquery(value):
'''Unpack an rtmfp URL to (path, dict) where dict is query parameters indexed by name, with value as list.'''
url1 = urlparse.urlparse(value)
url2 = urlparse.urlparse(re.sub('^' + url1.scheme, 'http', value))
return (url2.path, urlparse.parse_qs(url2.query))
def truncate(data, size=16, pre=8, post=5):
length, data = len(data), (data if len(data) <= size else data[:pre] + '...' + data[-post:])
return '[%d] %r'%(length, data)
#--------------------------------------
# SECURITY
#--------------------------------------
_key = 'Adobe Systems 02'
_int2bin = lambda x, size: (''.join(chr(a) for a in [((x>>c)&0x0ff) for c in xrange((size-1)*8,-8,-8)])) if x is not None else '\x00'*size
_bin2int = lambda x: long(''.join('%02x'%(ord(a)) for a in x), 16)
_dh1024p = _bin2int('\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xC9\x0F\xDA\xA2\x21\x68\xC2\x34\xC4\xC6\x62\x8B\x80\xDC\x1C\xD1\x29\x02\x4E\x08\x8A\x67\xCC\x74\x02\x0B\xBE\xA6\x3B\x13\x9B\x22\x51\x4A\x08\x79\x8E\x34\x04\xDD\xEF\x95\x19\xB3\xCD\x3A\x43\x1B\x30\x2B\x0A\x6D\xF2\x5F\x14\x37\x4F\xE1\x35\x6D\x6D\x51\xC2\x45\xE4\x85\xB5\x76\x62\x5E\x7E\xC6\xF4\x4C\x42\xE9\xA6\x37\xED\x6B\x0B\xFF\x5C\xB6\xF4\x06\xB7\xED\xEE\x38\x6B\xFB\x5A\x89\x9F\xA5\xAE\x9F\x24\x11\x7C\x4B\x1F\xE6\x49\x28\x66\x51\xEC\xE6\x53\x81\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
_random = lambda size: ''.join([chr(random.randint(0, 255)) for i in xrange(size)])
_bin2hex = lambda data: ''.join(['%02x'%(ord(x),) for x in data])
def _checkSum(data):
data, last = (data[:-1], ord(data[-1])) if len(data) % 2 != 0 else (data, 0)
sum = reduce(lambda x,y: x+y, [(ord(x) << 8 | ord(y)) for x, y in zip(data[::2], data[1::2])], 0) + last
sum = (sum >> 16) + (sum & 0xffff)
sum += (sum >> 16)
return (~sum) & 0xffff
def _decode(decoder, data):
raw = data[:4] + decoder.decode(data[4:])
if struct.unpack('>H', raw[4:6])[0] != _checkSum(raw[6:]):
if _debug: print 'ERROR: decode() invalid checksum %x != %x data=%r'%(struct.unpack('>H', raw[4:6])[0], _checkSum(raw[6:]), raw[6:])
raise ValueError('invalid checksum')
return raw
def _encode(encoder, data):
plen = (0xffffffff - len(data) + 5) & 0x0f # 4-bytes header, plen = 16*N - len for some int N and 0 <= plen < 16 (128 bits)
data += ''.join(['\xff' for i in xrange(plen)])
data = data[:4] + struct.pack('>H', _checkSum(data[6:])) + data[6:]
return data[:4] + encoder.encode(data[4:])
def _unpackId(data):
a, b, c = struct.unpack('>III', data[:12])
return a^b^c
def _packId(data, farId):
b, c = struct.unpack('>II', data[4:12])
a = b^c^farId
return struct.pack('>I', a) + data[4:]
def _beginDH():
'''Using known p (1024bit prime) and g=2, return (x, y) where x=random private value, y=g^x mod p public value.'''
g, x = 2, _bin2int(_random(128))
return (x, pow(g, x, _dh1024p))
def _endDH(x, y):
'''Using known p (1024bit prime), return secret=y^x mod p where x=random private value, y=other sides' public value.'''
return pow(y, x, _dh1024p)
def _asymetricKeys(secret, initNonce, respNonce): # returns (dkey, ekey)
return (hmac.new(secret, hmac.new(respNonce, initNonce, hashlib.sha256).digest(), hashlib.sha256).digest(), hmac.new(secret, hmac.new(initNonce, respNonce, hashlib.sha256).digest(), hashlib.sha256).digest())
#--------------------------------------
# DATA: Peer, Peers, Group, Client, Target, Cookie
#--------------------------------------
class Entity(object):
def __init__(self):
self.id = '\x00'*32 # 32 byte value
def __cmp__(self, other):
return cmp(self.id, other.id) if isinstance(other, Entity) else cmp(self.id, other)
def __hash__(self):
return hash(self.id)
def __repr__(self):
return '<%s id=%s/>'%(self.__class__.__name__, self.id and truncate(self.id))
class Client(Entity):
def __init__(self):
Entity.__init__(self)
self.swfUrl = self.pageUrl = self.path = self.data = self.params = None
def __repr__(self):
return '<%s id=%s swfUrl=%r pageUrl=%r path=%r/>'%(self.__class__.__name__, self.id and truncate(self.id), self.swfUrl, self.pageUrl, self.path)
class Peer(Client):
'''A single peer representation.
@ivar address (tuple) the address of the form ('ip', port) where ip is dotted-string and port is int.
@ivar privateAddress (list) the list of ('ip', port).
@ivar state (str) one of 'none', 'accepted', 'rejected'.
@ivar ping (float) ping round-trip-time in seconds to this Peer.
@ivar groups (list) list of Group objects this Peer is part of.
'''
NONE, ACCEPTED, REJECTED = 'none', 'accepted', 'rejected' # peer state
def __init__(self): # initialize the peer
Client.__init__(self)
self.address, self._privateAddress, self._ping, self.state, self.groups = None, [], 0, Peer.NONE, []
def __repr__(self):
return '<%s id=%s swfUrl=%r pageUrl=%r path=%r address=%r privateAddress=%r state=%r/>'%(self.__class__.__name__, self.id and truncate(self.id), self.swfUrl, self.pageUrl, self.path, self.address, self._privateAddress, self.state)
def close(self): # unsubscribe groups when closing this peer
for group in self.groups:
group.peers.remove(self)
self.groups[:] = []
def dup(self):
peer = Peer()
peer.id, peer.swfUrl, peer.pageUrl, peer.path, peer.data = self.id, self.swfUrl, self.pageUrl, self.path, self.data # Entity, Client
peer.address, peer._privateAddress, peer._ping, peer.state = self.address, self._privateAddress[:], self._ping, self.state # Peer
return peer
def _getPing(self):
return self._ping
def _setPing(self, value):
oldValue, self._ping = self._ping, value
for group in self.groups:
group.peers.remove(self) # re-add so that sorted by ping time
group.peers.add(self)
ping = property(fget=_getPing, fset=_setPing)
def _getPrivateAddress(self):
return self._privateAddress
def _setPrivateAddress(self, value):
self._privateAddress[:] = [(x[0], x[1] or self.address[1]) for x in value]
privateAddress = property(fget=_getPrivateAddress, fset=_setPrivateAddress)
class Peers(list):
'''List of Peer objects kept sorted by ping property. Must use add() method to add to the list.'''
def __init__(self):
list.__init__()
def close(self):
self[:] = []
def add(self, peer):
if peer not in self:
for index, p in enumerate(self):
if peer.ping >= p.ping:
break
self.insert(index, peer)
def best(self, asker, max_count=6):
return ([x for x in self if not _isLocal(x.address) and x != asker] + [x for x in self if _isLocal(x.address) and x != asker])[:max_count]
class Group(object):
'''A Group has a unique id and a list of peers.'''
def __init__(self, id):
self.id, self.peers = id, Peers()
def __cmp__(self, other):
return cmp(self.id, other.id if isinstance(other, group) else other)
def add(self, peer):
if peer not in self.peers:
peer.groups.append(self)
self.peers.add(peer)
def remove(self, peer):
if peer in self.peers:
peer.groups.remove(self)
self.peers.remove(peer)
class Target(Entity):
def __init__(self, address, cookie=None):
Entity.__init__(self)
self.address, self.isPeer = address, bool(cookie is not None)
self.peerId = self.Kp = self.DH = None
if not address[1]:
address = (address[0], 1935)
if cookie is not None:
self.DH, cookie.DH, self.Kp = cookie.DH, None, cookie.nonce[11:11+128]
cookie.nonce = cookie.nonce[:9] + '\x1d' + cookie.nonce[10:]
self.id = hashlib.sha256(cookie.nonce[7:]).digest()
cookie.nonce = cookie.nonce[:9] + '\x0d' + cookie.nonce[10:]
def close(self):
pass
#if self.DH: # why do we need to call _endDH() ?
# self.DH = None
class Cookie(object):
def __init__(self, value):
self.queryUrl, self.id, self.createdTs = '', 0, time.time()
self.target = self.nonce = self.DH = None
if isinstance(value, Target): # target
self.target, self.DH = value, value.DH
if _debug: print ' create cookie with target %r'%(value,)
self.nonce = '\x03\x1A\x00\x00\x02\x1E\x00\x41\x0E' + _random(64) # len is 9+64=73
else: # queryUrl
self.queryUrl = value
self.DH = _beginDH()
if _debug: print ' create cookie with queryUrl %r'%(value,)
self.nonce = '\x03\x1A\x00\x00\x02\x1E\x00\x81\x02\x0D\x02' + _int2bin(self.DH[1], 128) # len is 11+key
def close(self):
#if not self.target and self.DH:
# _endDH(self.DH)
pass
@property
def obsolete(self):
return (time.time() - self.createdTs) >= 120 # two minutes elapsed
def computeKeys(self, initKey, initNonce): # returns (dkey, ekey)
assert len(initKey) == 128
sharedSecret = _int2bin(_endDH(self.DH[0], _bin2int(initKey)), len(initKey))
assert len(sharedSecret) == 128
# return _asymetricKeys(sharedSecret, initNonce, self.nonce)
dkey, ekey = _asymetricKeys(sharedSecret, initNonce, self.nonce)
if _debug: print ' Cookie.computeKeys()\n dkey=%s\n ekey=%s'%(truncate(dkey), truncate(ekey))
return (dkey, ekey)
def __repr__(self):
return '<Cookie id=%r queryUrl=%r nonce=%s />'%(self.id, self.queryUrl, truncate(self.nonce))
def __str__(self):
return struct.pack('>I', self.id) + _packString(self.nonce) + struct.pack('>B', 0x58)
#--------------------------------------
# DATA: Stream, Flow, Packet, QoS
#--------------------------------------
class QoS(object):
'''Quality of service statistics for each stream and media type. Each sample is tuple (time, received, lost) of three int values.'''
def __init__(self):
self.droppedFrames = self.lossRate = self.jitter = self.prevTime = self.reception = 0
self.samples = []
def add(self, tm, received, lost):
now = time.time()
if self.prevTime > 0 and tm >= self.prevTime:
result = self.jitter + (now - self.reception) - (tm - self.prevTime)
self.jitter = int(result if result > 0 else 0)
self.reception, self.prevTime = time.time(), tm
self.samples[:] = [s for s in self.samples if (s[0] >= now - 10)] + [(tm, received, lost)] # keep only last 10 seconds of samples
total, lost = sum([s[1] for s in self.samples]), sum([s[2] for s in self.sampels])
if total != 0:
self.lossRate = float(lost)/(total+lost)
def close(self):
self.droppedFrames = self.lossRate = self.jitter = self.prevTime = self.reception = 0
self.samples[:] = []
class Publication(object):
def __init__(self, name):
self.publisherId, self.name, self._time, self._firstKeyFrame, self._listeners, self.videoQoS, self.audioQoS = 0, name, 0, False, {}, QoS(), QoS()
def close(self):
for item in self._listeners.values():
item.close()
self._listeners.clear()
def addListener(self, client, id, writer, unbuffered):
if id in self._listeners:
if _debug: print 'Publication.addListener() listener %r is already subscribed for publication %r'%(id, self.publisherId)
else:
self._listeners[id] = Listener(id, self, writer, unbuffered)
class Streams(object):
'''Collection of streams by numeric id.'''
def __init__(self):
self._nextId, self.publications, self._streams = 0, {}, [] # publications is str=>Publication, and _streams is set of numeric id
def create(self):
while self._nextId == 0 or self._nextId in self._streams:
self._nextId += 1
self._streams.append(self._nextId)
if _debug: print 'new stream %d'%(self._nextId,)
return self._nextId
def destroy(self, id):
if _debug: print 'delete stream %d'%(id,)
if id in self._streams:
self._streams.remove(id)
def publish(self, client, id, name):
pub = self.publications[name] = Publication(name)
return pub.start(client, id)
def unpublish(self, client, id, name):
if name not in self.publications:
if _debug: print 'the stream %s with a %u id does not exist, unpublish useless'%(name, id)
return
pub = self.publications[name]
pub.stop(client, id)
if pub.publisherId == 0 and len(pub._listeners) == 0:
del self.publications[name]
pub.close()
def subscribe(self, client, id, name, writer, start=-2000):
pub = self.publications[name] = Publication(name)
pub.addListener(client, id, writer, start == -3000)
def unsubscribe(self, client, id, name):
if name not in self.publications:
if _debug: print 'the stream %s does not exist, unsubscribe useless'%(name,)
return
pub = self.publications[name]
pub.removeListener(client, id)
if pub.publisherId == 0 and len(pub._listeners) == 0:
del self.publications[name]
pub.close()
class Packet(object):
'''Has one or more data fragments. Use data to access combined data, and count for number of fragments.'''
def __init__(self, data):
self.data, self.count = data or '', 1
def add(self, data):
self.data += data; self.count += 1
class Fragment(object):
'''Has data and flags.'''
def __init__(self, data, flags):
self.data, self.flags = data, flags
class Message(object):
def __init__(self, repeatable, data=None, memAck=None):
self.stream = amf.BytesIO(data) if data else amf.BytesIO()
self._reader, self.fragments, self.startStage, self.repeatable = self.stream, [], 0, repeatable
if memAck:
self._bufferAck, self._memAck = memAck, amf.BytesIO(memAck)
else:
self.amfWriter = amf.AMF0(data=self.stream)
def init(self, position):
self.stream.seek(position)
return self.stream.remaining() if not self.repeatable else self.stream.len
raise NotImplementedError('must use the derived class instance')
def memAck(self):
return self.reader() if self.repeatable else (self._memAck.remaining, self._readerAck)
def reader(self):
return (self.init(self.fragments and self.fragments[0] or 0), self._reader)
class Flow(object):
'''Flow serves as base class for individual flow type.
@ivar id (int) flow identifier
@ivar signature (str) security signature
'''
EMPTY, AUDIO, VIDEO, AMF_WITH_HANDLER, AMF = 0x00, 0x08, 0x09, 0x14, 0x0F
HEADER, WITH_AFTERPART, WITH_BEFOREPART, ABANDONMENT, END = 0x80, 0x10, 0x20, 0x02, 0x01 # message
def __init__(self, id, signature, peer, server, session):
self.id, self.peer, self.server, self.session = id, peer, server, session
self.error, self.packet, self.stage, self.completed, self.fragments, self.writer = None, None, 0, False, {}, FlowWriter(signature, session)
if self.writer.flowId == 0:
self.writer.flowId = id
def dup(self):
f = Flow(self.id, self.signature, Peer(), self.server, self.session)
f.stage, f.critical = self.stage, self.critical
self.close()
@property
def count(self):
return len(self._messages)
def close(self):
if not self.completed and self.writer.signature:
if _debug: print 'Flow.close() flow consumed: %r'%(self.id,)
self.completed = True
self.fragments.clear() # TODO: do we need to call close on values?
self.packet = None
self.writer.close()
def fail(self, error):
if _debug: print 'Flow.fail() flow failed: %r, %s'%(self,id, error)
if not self.completed:
self.session.writeMessage(0x5e, _packLength7(self.id) + '\x00')
def unpack(self, data): # given data, return (type, remaining)
if not data:
return self.EMPTY
type = ord(data[0])
if type == 0x11:
return (self.AMF_WITH_HANDLER, data[6:])
elif type == self.AMF_WITH_HANDLER:
return (self.AMF_WITH_HANDLER, data[5:])
elif type == self.AMF:
return (self.AMF, data[6:])
elif type in (self.AUDIO, self.VIDEO, 0x01):
return (type, data[1:])
elif type == 0x04:
return (type, data[5:])
else:
if _debug: print 'Flow.unpack() error in unpacking type 0x%02x'%(type,)
return (type, data[1:])
def commit(self):
self.session.writeMessage(0x51, _packLength7(self.id) + chr(0x7f if self.writer.signature else 0x00) + _packLength7(self.stage))
self.commitHandler()
self.writer.flush()
def fragmentHandler(self, stage, deltaNack, fragment, flags):
if self.completed:
return
nextStage = self.stage + 1
if stage < nextStage:
if _debug: print 'Flow.fragmentHandler() stage %r on flow %r has already been received'%(stage, self.id)
return
if deltaNack > stage or deltaNack == 0:
deltaNack = stage
if flags & self.ABANDONMENT or self.stage < (stage - deltaNack):
if _debug: print 'Flow.fragmentHandler() abandonment signal flag: %02x'%(flags,)
toRemove = []
for index, frag in self.fragments.iteritems():
if index > stage: # abandon all stages <= stage
break
if index <= (stage - 1):
self.fragmentSortedHandler(index, frag.data, frag.flags)
toRemove.append(index)
for index in toRemove: del self.fragments[index]
nextStage = stage
if stage > nextStage: # not following stage!
if stage not in self.fragments:
self.fragments[stage] = Fragment(fragment, flags)
if len(self.fragments) > 100:
if _debug: print 'Flow.fragmentHandler() fragments %d'%(len(self.fragments),)
else:
if _debug: print 'Flow.fragmentHandler() stage %u on flow %u already received'%(stage, self.id)
else:
self.fragmentSortedHandler(nextStage, fragment, flags)
nextStage += 1
toRemove = []
for index, frag in self.fragments.iteritems():
if index > nextStage:
break
self.fragmentSortedHandler(nextStage, frag.data, frag.flags)
nextStage += 1
toRemove.append(index)
for index in toRemove: del self.fragments[index]
def fragmentSortedHandler(self, stage, fragment, flags):
if stage <= self.stage:
if _debug: print 'Flow.fragmentSortedHandler() stage %u not sorted on flow %u'%(stage, self.id)
return
if stage > (self.stage + 1): # not following stage
self.lostFragmentsHandler(stage - self.stage - 1)
self.stage, self.packet = stage, None
if flags & self.WITH_BEFOREPART:
return
else:
self.stage = stage
msg = fragment
if flags & self.WITH_BEFOREPART:
if self.packet:
if _debug: print 'Flow.fragmentSortedHandler() a beforepart message received with previous buffer empty. possible some packets lost'
self.packet = None
return
self.packet.add(fragment)
if flags & self.WITH_AFTERPART:
return
msg = self.packet.data
elif flags & self.WITH_AFTERPART:
if self.packet: