Log management with netcat-rsyslog-kafka-logstash-elastic
First we generate sample json log with script in 2 way:
: Generate log with bash script.
{"$date_time" "$random_char" "iran": {"city": "Tehran","population": "$population","men": "$(($population*$percentage/100))","women": "$(($population*(100-$percentage)/100))","hOffset": "$(($population*2))","vOffset": "100","weather": "$sun"}
: Get data from API.
{"results":[{"gender":"male","name":{"title":"Mr","first":"سام","last":"نجاتی"},"location":{"street":{"number":5093,"name":"شهید آرش مهر"},"city":"اراک","state":"سیستان و بلوچستان","country":"Iran","postcode":85524,"coordinates":{"latitude":"14.8221","longitude":"-66.8774"},"timezone":{"offset":"-5:00","description":"Eastern Time (US & Canada), Bogota, Lima"}},"email":"[email protected]","login":{"uuid":"6a38607d-4498-492f-93ca-369819d90283","username":"smallostrich271","password":"circus","salt":"prLILh0M","md5":"dfafa1f6203c7589964d8e39d1dc4beb","sha1":"21996887ab6481a45e9694e6bc9281bf93fc214f","sha256":"c26345cd8199200d0016a03de687a7428df7d6c348a0ce5ec1260b4b3b1ead8b"},"dob":{"date":"1967-09-05T20:28:25.316Z","age":56},"registered":{"date":"2013-11-30T11:00:16.765Z","age":9},"phone":"006-85015204","cell":"0902-142-6178","id":{"name":"","value":null},"picture":{"large":"https://randomuser.me/api/portraits/men/50.jpg","medium":"https://randomuser.me/api/portraits/med/men/50.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/50.jpg"},"nat":"IR"}],"info":{"seed":"834d64feb14ac0ec","results":1,"page":1,"version":"1.4"}}
Then we use systemd service for that.
We use netcat to send logs to rsyslog TCP or UDP port.
Then write template that uses Kafka output module of Rsyslog.
Use logstash to parse logs and send that to Elasticsearch.
Using Elasticsearch. Then create index pattern, index template, index lifecycle policies.
yum install nc
cd /etc/yum.repos.d/ wget http://rpms.adiscon.com/v8-stable-daily/rsyslog-daily.repo # for CentOS 7,8,9 wget http://rpms.adiscon.com/v8-stable-daily/rsyslog-daily-rhel.repo # for RHEL 7,8,9 yum install rsyslog rsyslog-kafka
You can run Rsyslog as a Docker Container or Kubernetes Deployment:[https://itnext.io/run-rsyslog-server-in-kubernetes-bb51a7a6e227]
Note There is GUI Dashboard for Rsyslog that you can use that.
First we should copy generator.sh to
Then define log-generator.service in/etc/systemd/system/
[Unit] Description=Log Generator [Service] Environment="expose_port=9080" ExecStart=/opt/log-generator/generator.sh random_person [Install] WantedBy=multi-user.target
Note You can change expose_port with each not binding port you want.
systemctl enable --now log-generator
You can test that with this command:
nc -k -lv $expose_port
Download and install the public signing key:
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
Add the following in your
directory in a file with a.repo
suffix, for examplelogstash.repo
:[logstash-8.x] name=Elastic repository for 8.x packages baseurl=https://artifacts.elastic.co/packages/8.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
And your repository is ready for use. You can install it with:
sudo yum install logstash systemctl enable --now logstash
Note Images are available for running Logstash as a Docker container. They are available from the Elastic Docker registry.
See Running Logstash on Docker for details on how to configure and run Logstash Docker containers.
Elasticsearch and Kibana
Config Rsyslog to send logs to kafka
First we should Provide TCP syslog reception with this options in
:$ModLoad imtcp $InputTCPServerRun 9080
Then load module which use for sending message to kafka:
$ModLoad omkafka
Write template for messages that sent to kafka:
template(name="json_lines" type="list" option.json="on") { constant(value="{") constant(value="\"timestamp\":\"") property(name="timereported" dateFormat="rfc3339") constant(value="\",\"message\":\"") property(name="msg") constant(value="\",\"host\":\"") property(name="hostname") constant(value="\",\"severity\":\"") property(name="syslogseverity-text") constant(value="\",\"facility\":\"") property(name="syslogfacility-text") constant(value="\",\"syslog-tag\":\"") property(name="syslogtag") constant(value="\"}") }
Send just TCP syslogs on
to kafka:if $inputname == "imtcp"then { action(type="omkafka" template="json_lines" broker=["************:****"] topic="logs" partitions.auto="on" confParam=[ "socket.keepalive.enable=true" ] ) }
Note Also there are some other option that aren't use in this project but may be useful for other scenarios:
# Send all logs on and @@ for TCP and @ for UDP *.* @@
# Save logs in file with this format of path $template RemoteLogs,"/var/log/hosts/%HOSTNAME%/%$YEAR%/%$MONTH%/%$DAY%/syslog.log" *.* ?RemoteLogs
Config Logstash to parse and send logs from kafka to elastic
Create config file in
:input { kafka{ bootstrap_servers => "************:****" topics => ["logs"] codec => json {} } } filter { json { source => "message" } } output { elasticsearch { hosts => ["https://***********:****"] ssl_verification_mode => "none" user => "*****" password => "********" index => "*****" } }
You can check config with this command:/etc/logstash/logstash.yml
bin/logstash -f configfile.conf --config.reload.automatic
You can check logs in
Logstash configs like
are in/etc/logstash/logstash.yml
that you can change them for better performance. -
Elasticsearch configuration You should have index with proper shard and replica number due to cluster and architecture. Then you should create Index Template and Index Lifecycle Policy.