forked from andrewf/pcap2har
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpcap.py
65 lines (59 loc) · 2.18 KB
/
pcap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import dpkt
from pcaputil import *
from socket import inet_ntoa
import logging as log
import os
import shutil
import tcp
from packetdispatcher import PacketDispatcher
def ParsePcap(dispatcher, filename=None, reader=None):
'''
Parses the passed pcap file or pcap reader.
Adds the packets to the PacketDispatcher. Keeps a list
Args:
dispatcher = PacketDispatcher
reader = pcaputil.ModifiedReader or None
filename = filename of pcap file or None
check for filename first; if there is one, load the reader from that. if
not, look for reader.
'''
if filename:
f = open(filename, 'rb')
pcap = ModifiedReader(f)
elif reader:
pcap = reader
else:
raise 'function ParsePcap needs either a filename or pcap reader'
#now we have the reader; read from it
packet_count = 1 # start from 1 like Wireshark
errors = [] # store errors for later inspection
try:
for packet in pcap:
ts = packet[0] # timestamp
buf = packet[1] # frame data
hdr = packet[2] # libpcap header
# discard incomplete packets
if hdr.caplen != hdr.len:
# log packet number so user can diagnose issue in wireshark
log.warning('ParsePcap: discarding incomplete packet, # %d' % packet_count)
continue
# parse packet
try:
# handle SLL packets, thanks Libo
dltoff = dpkt.pcap.dltoff
if pcap.dloff == dltoff[dpkt.pcap.DLT_LINUX_SLL]:
eth = dpkt.sll.SLL(pkt[1])
# otherwise, for now, assume Ethernet
else:
eth = dpkt.ethernet.Ethernet(buf)
dispatcher.add(ts, buf, eth)
# catch errors from this packet
except dpkt.Error as e:
errors.append((record, e, packet_count))
log.warning(e)
packet_count += 1
except dpkt.dpkt.NeedData as error:
log.warning(error)
log.warning('A packet in the pcap file was too short, '
'debug_pkt_count=%d' % debug_pkt_count)
self.errors.append((None, error))