diff --git a/cmd/cnetflow/cnetflow.go b/cmd/cnetflow/cnetflow.go index f8780fc..1eae6de 100644 --- a/cmd/cnetflow/cnetflow.go +++ b/cmd/cnetflow/cnetflow.go @@ -27,6 +27,7 @@ var ( LogFmt = flag.String("logfmt", "normal", "Log formatter") EnableKafka = flag.Bool("kafka", true, "Enable Kafka") + FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") TemplatePath = flag.String("templates.path", "/templates", "NetFlow/IPFIX templates list") @@ -80,6 +81,7 @@ func main() { if err != nil { log.Fatal(err) } + kafkaState.FixedLengthProto = *FixedLength s.Transport = kafkaState } log.WithFields(log.Fields{ diff --git a/cmd/cnflegacy/cnflegacy.go b/cmd/cnflegacy/cnflegacy.go index 722d559..30c8078 100644 --- a/cmd/cnflegacy/cnflegacy.go +++ b/cmd/cnflegacy/cnflegacy.go @@ -27,6 +27,7 @@ var ( LogFmt = flag.String("logfmt", "normal", "Log formatter") EnableKafka = flag.Bool("kafka", true, "Enable Kafka") + FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") @@ -78,6 +79,7 @@ func main() { if err != nil { log.Fatal(err) } + kafkaState.FixedLengthProto = *FixedLength s.Transport = kafkaState } log.WithFields(log.Fields{ diff --git a/cmd/csflow/csflow.go b/cmd/csflow/csflow.go index 92e3e40..c326fe0 100644 --- a/cmd/csflow/csflow.go +++ b/cmd/csflow/csflow.go @@ -27,6 +27,7 @@ var ( LogFmt = flag.String("logfmt", "normal", "Log formatter") EnableKafka = flag.Bool("kafka", true, "Enable Kafka") + FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") @@ -78,6 +79,7 @@ func main() { if err != nil { log.Fatal(err) } + kafkaState.FixedLengthProto = *FixedLength s.Transport = kafkaState } log.WithFields(log.Fields{ diff --git a/cmd/goflow/goflow.go b/cmd/goflow/goflow.go index 5c8afad..4221d1f 100644 --- a/cmd/goflow/goflow.go +++ b/cmd/goflow/goflow.go @@ -39,6 +39,7 @@ var ( LogFmt = flag.String("logfmt", "normal", "Log formatter") EnableKafka = flag.Bool("kafka", true, "Enable Kafka") + FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") @@ -101,6 +102,8 @@ func main() { if err != nil { log.Fatal(err) } + kafkaState.FixedLengthProto = *FixedLength + sSFlow.Transport = kafkaState sNFL.Transport = kafkaState sNF.Transport = kafkaState diff --git a/transport/kafka.go b/transport/kafka.go index 085a937..ec3dae4 100644 --- a/transport/kafka.go +++ b/transport/kafka.go @@ -33,10 +33,11 @@ var ( ) type KafkaState struct { - producer sarama.AsyncProducer - topic string - hashing bool - keying []string + FixedLengthProto bool + producer sarama.AsyncProducer + topic string + hashing bool + keying []string } // SetKafkaVersion sets the KafkaVersion that is used to set the log message format version @@ -164,7 +165,14 @@ func (s KafkaState) SendKafkaFlowMessage(flowMessage *flowmessage.FlowMessage) { keyStr := HashProto(s.keying, flowMessage) key = sarama.StringEncoder(keyStr) } - b, _ := proto.Marshal(flowMessage) + var b []byte + if !s.FixedLengthProto { + b, _ = proto.Marshal(flowMessage) + } else { + buf := proto.NewBuffer([]byte{}) + buf.EncodeMessage(flowMessage) + b = buf.Bytes() + } s.producer.Input() <- &sarama.ProducerMessage{ Topic: s.topic, Key: key,