From 278ee12b8a11ed9a14cef4e49ba86cbff780f080 Mon Sep 17 00:00:00 2001 From: Paul Prechtel Date: Sun, 10 Nov 2024 13:05:18 +0100 Subject: [PATCH] Use the packet trace timestamps in netflow records Instead of the current time. Equally, copy the incoming stream (on the in channel) timestamps. The aggregate segment will take the first timestamp as flow start and the last timestamp as flow end from all the connected sub-flows for the final flow (at least when the packet trace/flow data is sorted by time). --- segments/filter/aggregate/flowcache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/segments/filter/aggregate/flowcache.go b/segments/filter/aggregate/flowcache.go index 2b88aa5..4ca2720 100644 --- a/segments/filter/aggregate/flowcache.go +++ b/segments/filter/aggregate/flowcache.go @@ -265,10 +265,10 @@ func (f *FlowExporter) Insert(pkt gopacket.Packet) { f.mutex.Lock() if record, exists = f.cache[key]; !exists { f.cache[key] = new(FlowRecord) - f.cache[key].TimeReceived = time.Now() + f.cache[key].TimeReceived = pkt.Metadata().Timestamp record = f.cache[key] } - record.LastUpdated = time.Now() + record.LastUpdated = pkt.Metadata().Timestamp record.SamplerAddress = f.samplerAddress record.Packets = append(record.Packets, pkt) @@ -291,10 +291,10 @@ func (f *FlowExporter) InsertFlow(flow *pb.EnrichedFlow) { f.mutex.Lock() if record, exists = f.cache[key]; !exists { f.cache[key] = new(FlowRecord) - f.cache[key].TimeReceived = time.Now() + f.cache[key].TimeReceived = time.Unix(int64(flow.TimeFlowStart), 0) record = f.cache[key] } - record.LastUpdated = time.Now() + record.LastUpdated = time.Unix(int64(flow.TimeFlowEnd), 0) record.SamplerAddress = f.samplerAddress record.Flows = append(record.Flows, flow) }