From 7d8d88b8b9a47cfeb7dd824d1af8fa1c8ffa0e55 Mon Sep 17 00:00:00 2001 From: Yannick Huber Date: Tue, 29 Oct 2024 10:37:46 +0100 Subject: [PATCH 1/2] new segment mongodb Allows exporting to an external mongoDb. By using capped collections, the disk size used by flowpipeline can be limited to a fixed size --- go.mod | 19 +- go.sum | 28 +++ main.go | 1 + segments/output/mongodb/mongo.go | 310 ++++++++++++++++++++++++++ segments/output/mongodb/mongo_test.go | 122 ++++++++++ 5 files changed, 475 insertions(+), 5 deletions(-) create mode 100644 segments/output/mongodb/mongo.go create mode 100644 segments/output/mongodb/mongo_test.go diff --git a/go.mod b/go.mod index 1928f1a..9e77cea 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,14 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) +require ( + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect +) + require ( github.com/ClickHouse/ch-go v0.53.0 // indirect github.com/alecthomas/participle/v2 v2.0.0-beta.1 // indirect @@ -92,13 +100,14 @@ require ( github.com/subosito/gotenv v1.4.2 // indirect github.com/vishvananda/netlink v1.2.1-beta.2 // indirect github.com/vishvananda/netns v0.0.4 // indirect + go.mongodb.org/mongo-driver v1.17.1 go.opentelemetry.io/otel v1.13.0 // indirect go.opentelemetry.io/otel/trace v1.13.0 // indirect - golang.org/x/crypto v0.6.0 // indirect - golang.org/x/net v0.7.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/text v0.7.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect google.golang.org/genproto v0.0.0-20230222225845-10f96fb3dbec // indirect google.golang.org/grpc v1.53.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 5d47c55..54c55fa 100644 --- a/go.sum +++ b/go.sum @@ -376,6 +376,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/netsampler/goflow2 v1.1.1 h1:GpVlvPq4yRbyzoiz0Vp3XilNr5js/0UhHcQI7Ol/MDk= @@ -510,16 +512,26 @@ github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207 h1:nn7SOQy8xCu3i github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod 
h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -553,6 +565,8 @@ golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -587,6 +601,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -626,11 +641,14 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220513224357-95641704303c/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -653,10 +671,13 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ffipvEDX2pl7E1TdqLqIc= golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -711,6 +732,7 @@ golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220513210249-45d2b4557a2a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -718,6 +740,8 @@ golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -728,10 +752,13 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -787,6 +814,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go index 9da6133..2ba4d76 100644 --- a/main.go +++ b/main.go @@ -55,6 +55,7 @@ import ( _ "github.com/bwNetFlow/flowpipeline/segments/output/json" _ "github.com/bwNetFlow/flowpipeline/segments/output/kafkaproducer" _ "github.com/bwNetFlow/flowpipeline/segments/output/lumberjack" + _ "github.com/bwNetFlow/flowpipeline/segments/output/mongodb" _ "github.com/bwNetFlow/flowpipeline/segments/output/sqlite" _ "github.com/bwNetFlow/flowpipeline/segments/print/count" diff --git a/segments/output/mongodb/mongo.go b/segments/output/mongodb/mongo.go new file mode 100644 index 
0000000..802a161
--- /dev/null
+++ b/segments/output/mongodb/mongo.go
@@ -0,0 +1,310 @@
+//go:build cgo
+// +build cgo
+
+// Dumps all incoming flow messages to an external MongoDB database, using a capped collection to limit the used disk space
+package mongodb
+
+import (
+    "context"
+    "errors"
+    "fmt"
+    "log"
+    "net"
+    "reflect"
+    "strconv"
+    "strings"
+    "sync"
+
+    "github.com/bwNetFlow/flowpipeline/pb"
+    "github.com/bwNetFlow/flowpipeline/segments"
+    "go.mongodb.org/mongo-driver/bson"
+    "go.mongodb.org/mongo-driver/mongo"
+    "go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type Mongodb struct {
+    segments.BaseSegment
+    mongodbUri     string
+    dbCollection   *mongo.Collection
+    fieldTypes     []string
+    fieldNames     []string
+    ringbufferSize int64
+
+    databaseName   string // default flowdata
+    collectionName string // default ringbuffer
+    Fields         string // optional comma-separated list of fields to export, default is "", meaning all fields
+    BatchSize      int    // optional how many flows to hold in memory between INSERTs, default is 1000
+}
+
+// Every Segment must implement a New method, even if there isn't any config
+// it is interested in.
+func (segment Mongodb) New(configx map[string]string) segments.Segment {
+    newsegment := &Mongodb{}
+
+    newsegment, err := fillSegmentWithConfig(newsegment, configx)
+    if err != nil {
+        log.Printf("[error] '%s'", err.Error())
+        return nil
+    }
+
+    ctx := context.Background()
+
+    // Test if the db connection works
+    client, err := mongo.Connect(ctx, options.Client().ApplyURI(newsegment.mongodbUri))
+    if err == nil {
+        // test if the connection was actually successful
+        err = client.Ping(ctx, options.Client().ReadPreference)
+    }
+    if err != nil {
+        log.Printf("[error] mongoDB: Could not open DB connection due to '%s'", err.Error())
+        return nil
+    }
+    db := client.Database(newsegment.databaseName)
+
+    // the collection in MongoDB should be capped to limit the used disk space
+    convertToCappedCollection(db, newsegment)
+    return newsegment
+}
+
+func (segment *Mongodb) Run(wg *sync.WaitGroup) {
+    ctx := context.Background()
+    defer func() {
+        close(segment.Out)
+        wg.Done()
+    }()
+
+    client, err := mongo.Connect(ctx, options.Client().ApplyURI(segment.mongodbUri))
+    if err != nil {
+        log.Panic(err) // this has already been checked in New
+    }
+    db := client.Database(segment.databaseName)
+    segment.dbCollection = db.Collection(segment.collectionName)
+
+    defer client.Disconnect(ctx)
+
+    var unsaved []*pb.EnrichedFlow
+
+    for msg := range segment.In {
+        unsaved = append(unsaved, msg)
+        if len(unsaved) >= segment.BatchSize {
+            err := segment.bulkInsert(unsaved, ctx)
+            if err != nil {
+                log.Printf("[error] %s", err)
+            }
+            unsaved = []*pb.EnrichedFlow{}
+        }
+        segment.Out <- msg
+    }
+    segment.bulkInsert(unsaved, ctx)
+}
+
+func fillSegmentWithConfig(newsegment *Mongodb, config map[string]string) (*Mongodb, error) {
+    if config == nil {
+        return newsegment, errors.New("missing configuration for segment mongodb")
+    }
+
+    if config["mongodb_uri"] == "" {
+        return newsegment, errors.New("mongoDB: mongodb_uri not defined")
+    }
+    newsegment.mongodbUri = config["mongodb_uri"]
+
+    if config["database"] == "" {
+        log.Println("[INFO] mongoDB: no database defined - using default value (flowdata)")
+        config["database"] = "flowdata"
+    }
+    newsegment.databaseName = config["database"]
+
+    if config["collection"] == "" {
+        log.Println("[INFO] mongoDB: no collection defined - using default value (ringbuffer)")
+        config["collection"] = "ringbuffer"
+    }
+    newsegment.collectionName = config["collection"]
+
+    var ringbufferSize int64 = 10737418240
+    if config["max_disk_usage"] == "" {
+        log.Println("[INFO] mongoDB: no ring buffer size defined - using default value (10GB)")
+    } else {
+        size, err := sizeInBytes(config["max_disk_usage"])
+        if err == nil {
+            log.Println("[INFO] mongoDB: setting ring buffer size to " + config["max_disk_usage"])
+            ringbufferSize = size
+        } else {
+            log.Println("[Warning] mongoDB: failed setting ring buffer size to " + config["max_disk_usage"] + " - using default as fallback (10GB)")
+        }
+    }
+    newsegment.ringbufferSize = ringbufferSize
+
+    newsegment.BatchSize = 1000
+    if config["batchsize"] != "" {
+        if parsedBatchSize, err := strconv.ParseUint(config["batchsize"], 10, 32); err == nil {
+            if parsedBatchSize == 0 {
+                return newsegment, errors.New("MongoDB: Batch size 0 is not allowed. Set this in relation to the expected flows per second")
+            }
+            newsegment.BatchSize = int(parsedBatchSize)
+        } else {
+            log.Println("[error] MongoDB: Could not parse 'batchsize' parameter, using default 1000.")
+        }
+    } else {
+        log.Println("[info] MongoDB: 'batchsize' set to default '1000'.")
+    }
+
+    // determine field set
+    if config["fields"] != "" {
+        protofields := reflect.TypeOf(pb.EnrichedFlow{})
+        conffields := strings.Split(config["fields"], ",")
+        for _, field := range conffields {
+            protofield, found := protofields.FieldByName(field)
+            if !found {
+                return newsegment, errors.New("mongodb: Field specified in 'fields' does not exist")
+            }
+            newsegment.fieldNames = append(newsegment.fieldNames, field)
+            newsegment.fieldTypes = append(newsegment.fieldTypes, protofield.Type.String())
+        }
+    } else {
+        protofields := reflect.TypeOf(pb.EnrichedFlow{})
+        // +-3 skips over protobuf state, sizeCache and unknownFields
+        newsegment.fieldNames = make([]string, protofields.NumField()-3)
+        newsegment.fieldTypes = make([]string, protofields.NumField()-3)
+        for i := 3; i < protofields.NumField(); i++ {
+            field := protofields.Field(i)
+            newsegment.fieldNames[i-3] = field.Name
+            newsegment.fieldTypes[i-3] = field.Type.String()
+        }
+        newsegment.Fields = config["fields"]
+    }
+
+    return newsegment, nil
+}
+
+func (segment Mongodb) bulkInsert(unsavedFlows []*pb.EnrichedFlow, ctx context.Context) error {
+    // not using transactions due to limitations of capped collections
+    // ("You cannot write to capped collections in transactions."
+    // https://www.mongodb.com/docs/manual/core/capped-collections/)
+    if len(unsavedFlows) == 0 {
+        return nil
+    }
+    unsavedFlowData := bson.A{}
+    for _, msg := range unsavedFlows {
+        singleFlowData := bson.M{}
+        values := reflect.ValueOf(msg).Elem()
+        for i, fieldname := range segment.fieldNames {
+            protofield := values.FieldByName(fieldname)
+            switch segment.fieldTypes[i] {
+            case "[]uint8": // this is necessary for proper formatting
+                ipstring := net.IP(protofield.Interface().([]uint8)).String()
+                if ipstring == "<nil>" {
+                    ipstring = ""
+                }
+                singleFlowData[fieldname] = ipstring
+            case "string": // this is because doing nothing is also much faster than Sprint
+                singleFlowData[fieldname] = protofield.Interface().(string)
+            default:
+                singleFlowData[fieldname] = fmt.Sprint(protofield)
+            }
+        }
+        unsavedFlowData = append(unsavedFlowData, singleFlowData)
+    }
+    _, err := segment.dbCollection.InsertMany(ctx, unsavedFlowData)
+    if err != nil {
+        log.Println("[error] mongoDB: Failed to insert due to " + err.Error())
+        return err
+    }
+    return nil
+}
+
+func init() {
+    segment := &Mongodb{}
+    segments.RegisterSegment("mongodb", segment)
+}
+
+func sizeInBytes(sizeStr string) (int64, error) {
+    // Split into number and unit
+    parts := strings.Fields(sizeStr)
+    if len(parts) > 2 || len(parts) < 1 {
+        return 0, fmt.Errorf("invalid size format")
+    }
+
+    size, err := strconv.ParseInt(parts[0], 10, 64)
+    if err != nil {
+        return 0, err
+    }
+
+    if len(parts) == 1 {
+        return size, nil
+    }
+
+    // Calculate bytes if a size was provided
+    unit := strings.ToUpper(parts[1])
+    switch unit {
+    case "B":
+        return size, nil
+    case "KB":
+        return size * 1024, nil
+    case "MB":
+        return size * 1024 * 1024, nil
+    case "GB":
+        return size * 1024 * 1024 * 1024, nil
+    case "TB":
+        return size * 1024 * 1024 * 1024 * 1024, nil
+    default:
+        return 0, fmt.Errorf("unknown unit: %s", unit)
+    }
+}
+
+/************************************************************************************************
+** Checks if the collection segment.collectionName in the db is a capped collection
+** If not it converts it to a capped collection with the size segment.ringbufferSize
+*************************************************************************************************/
+func convertToCappedCollection(db *mongo.Database, segment *Mongodb) error {
+    ctx := context.Background()
+
+    collStats := db.RunCommand(ctx, bson.D{{Key: "collStats", Value: segment.collectionName}})
+
+    var collInfo struct {
+        Name    string `bson:"name"`
+        Capped  bool   `bson:"capped"`
+        MaxSize int32  `bson:"maxSize"`
+        Count   int64  `bson:"count"`
+        Size    int64  `bson:"size"`
+    }
+
+    if collStats.Err() != nil {
+        log.Printf("[Error] Failed to check Collection '%s' due to: '%s'\n", segment.collectionName, collStats.Err().Error())
+        return collStats.Err()
+    }
+
+    if err := collStats.Decode(&collInfo); err != nil {
+        return fmt.Errorf("failed to decode collection info: %v", err)
+    }
+
+    if collInfo.Count == 0 {
+        // Create a new capped collection
+        cappedOptions := options.CreateCollection().SetCapped(true).SetSizeInBytes(segment.ringbufferSize)
+        err := db.CreateCollection(ctx, segment.collectionName, cappedOptions)
+        if err != nil {
+            return fmt.Errorf("failed to create capped collection: %v", err)
+        }
+
+        log.Printf("[Debug] Capped collection '%s' created successfully.\n", segment.collectionName)
+        return nil
+    }
+
+    if !collInfo.Capped {
+        log.Printf("[Warning] Collection '%s' is not capped. Converting it to a capped collection...\n", segment.collectionName)
+        db.RunCommand(ctx, bson.D{
+            {Key: "convertToCapped", Value: segment.collectionName},
+            {Key: "size", Value: segment.ringbufferSize},
+        })
+        return nil
+    }
+
+    log.Printf("[INFO] Collection '%s' is already capped.\n", segment.collectionName)
+    if collInfo.MaxSize != int32(segment.ringbufferSize) {
+        log.Printf("[Warning] Changing max size of collection '%s' from '%d' to '%d'.\n", segment.collectionName, collInfo.MaxSize, segment.ringbufferSize)
+        db.RunCommand(ctx, bson.D{
+            {Key: "collMod", Value: segment.collectionName},
+            {Key: "cappedSize", Value: segment.ringbufferSize},
+        })
+    }
+    return nil
+}
diff --git a/segments/output/mongodb/mongo_test.go b/segments/output/mongodb/mongo_test.go
new file mode 100644
index 0000000..af7bfbf
--- /dev/null
+++ b/segments/output/mongodb/mongo_test.go
@@ -0,0 +1,122 @@
+//go:build cgo
+// +build cgo
+
+package mongodb
+
+import (
+    "io"
+    "log"
+    "os"
+    "sync"
+    "testing"
+
+    "github.com/bwNetFlow/flowpipeline/pb"
+    // "github.com/bwNetFlow/flowpipeline/segments"
+)
+
+// Mongodb Segment test, passthrough test only
+func TestSegment_Mongodb_passthrough(t *testing.T) {
+    // result := segments.TestSegment("Mongodb", map[string]string{"mongodb_uri": "mongodb://localhost:27017/" , "database":"testing"},
+    //     &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45})
+    // if result == nil {
+    //     t.Error("Segment Mongodb is not passing through flows.")
+    // }
+    segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing"})
+
+    in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
+    segment.Rewire(in, out)
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go segment.Run(wg)
+    in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 1}, DstAddr: []byte{192, 168, 88, 1}, Proto: 1}
+    <-out
+    in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 2}, DstAddr: []byte{192, 168, 88, 2}, Proto: 2}
+    <-out
+    close(in)
+    wg.Wait()
+}
+
+// Mongodb Segment benchmark with 1000 samples stored in memory
+func BenchmarkMongodb_1000(b *testing.B) {
+    log.SetOutput(io.Discard)
+    os.Stdout, _ = os.Open(os.DevNull)
+
+    segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing"})
+
+    in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
+    segment.Rewire(in, out)
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go segment.Run(wg)
+
+    for n := 0; n < b.N; n++ {
+        in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45}
+        _ = <-out
+    }
+    close(in)
+}
+
+// Mongodb Segment benchmark with 10000 samples stored in memory
+func BenchmarkMongodb_10000(b *testing.B) {
+    log.SetOutput(io.Discard)
+    os.Stdout, _ = os.Open(os.DevNull)
+
+    segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "10000"})
+
+    in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
+    segment.Rewire(in, out)
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go segment.Run(wg)
+
+    for n := 0; n < b.N; n++ {
+        in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45}
+        _ = <-out
+    }
+    close(in)
+}
+
+// Mongodb Segment benchmark with 100000 samples stored in memory
+func BenchmarkMongodb_100000(b *testing.B) {
+    log.SetOutput(io.Discard)
+    os.Stdout, _ = os.Open(os.DevNull)
+
+    segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "100000"})
+
+    in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
+    segment.Rewire(in, out)
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go segment.Run(wg)
+
+    for n := 0; n < b.N; n++ {
+        in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45}
+        _ = <-out
+    }
+    close(in)
+}
+
+// Mongodb Segment benchmark with 100000 samples stored in memory and a storage limit
+func BenchmarkMongodb_100000_with_storage_limit(b *testing.B) {
+    log.SetOutput(io.Discard)
+    os.Stdout, _ = os.Open(os.DevNull)
+
+    segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "100000", "max_disk_usage": "100 MB"})
+
+    in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
+    segment.Rewire(in, out)
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go segment.Run(wg)
+
+    for n := 0; n < b.N; n++ {
+        in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45}
+        _ = <-out
+    }
+    close(in)
+}

From 2f61b068c713dbb1f0c2ae9887342f4e050b7d3d Mon Sep 17 00:00:00 2001
From: Yannick Huber
Date: Thu, 21 Nov 2024 08:51:27 +0100
Subject: [PATCH 2/2] skip mongodb test if no db is available

---
 segments/output/mongodb/mongo_test.go | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/segments/output/mongodb/mongo_test.go b/segments/output/mongodb/mongo_test.go
index af7bfbf..8f8c80f 100644
--- a/segments/output/mongodb/mongo_test.go
+++ b/segments/output/mongodb/mongo_test.go
@@ -22,6 +22,9 @@ func TestSegment_Mongodb_passthrough(t *testing.T) {
     //     t.Error("Segment Mongodb is not passing through flows.")
     // }
     segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing"})
+    if segment == nil {
+        t.Skip()
+    }

     in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
     segment.Rewire(in, out)
@@ -43,6 +46,9 @@ func BenchmarkMongodb_1000(b *testing.B) {
     os.Stdout, _ = os.Open(os.DevNull)

     segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing"})
+    if segment == nil {
+        b.Skip()
+    }

     in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
     segment.Rewire(in, out)
@@ -64,6 +70,9 @@ func BenchmarkMongodb_10000(b *testing.B) {
     os.Stdout, _ = os.Open(os.DevNull)

     segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "10000"})
+    if segment == nil {
+        b.Skip()
+    }

     in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
     segment.Rewire(in, out)
@@ -85,6 +94,9 @@ func BenchmarkMongodb_100000(b *testing.B) {
     os.Stdout, _ = os.Open(os.DevNull)

     segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "100000"})
+    if segment == nil {
+        b.Skip()
+    }

     in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
     segment.Rewire(in, out)
@@ -106,6 +118,9 @@ func BenchmarkMongodb_100000_with_storage_limit(b *testing.B) {
     os.Stdout, _ = os.Open(os.DevNull)

     segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "100000", "max_disk_usage": "100 MB"})
+    if segment == nil {
+        b.Skip()
+    }

     in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow)
     segment.Rewire(in, out)