Fabrice Bacchella edited this page Apr 21, 2023 · 17 revisions

Processing syslog

input {
     loghub.receivers.Udp { port: 514, decoder: loghub.decoders.StringCodec}
} | $syslog

pipeline[syslog] {
      [type] = "syslog"
    | loghub.processors.Grok {
        pattern: "%{SYSLOG_LINE}",
        field: "message",
    }
    | loghub.processors.SyslogPriority {
        field: "syslog_pri",
    }
    | loghub.processors.DateParser {
        field: "timestamp",
        destination: "@timestamp",
        timezone: "CET",
        success: [timestamp]-
    }
    | loghub.processors.DateParser {
        fields: ["nexustimestamp", "utctimestamp"],
        patterns: ["MMM dd HH:mm:ss", "MMM dd yyyy HH:mm:ss"],
        destination: "@timestamp",
        timezone: "UTC",
        success: ( [nexustimestamp]- | [utctimestamp]- )
    }
    | loghub.processors.DateParser {
        field: "timestamp8601",
        patterns: ["ISO_INSTANT", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX"],
        destination: "@timestamp",
        success: [timestamp8601]-
    }
    | (java.lang.Integer) [syslog5424_ver]
    # a set of subpipelines that can further process the message according to the program
    | [program] == "centreon-engine" ? $nagios
    | [program] == "proftpd" ? $proftpd
    | [program] == "sendmail" ? $sendmail
    | [program] == "imap" || [program] == "imaps" ? $imap
} | $main
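The SyslogPriority processor above decodes the numeric syslog_pri field into a facility and a severity. A minimal Python sketch of that decoding, using the usual RFC 3164 tables (the exact names LogHub emits may differ slightly):

```python
# Decode a syslog priority value (facility * 8 + severity), per RFC 3164.
SEVERITIES = ["emergency", "alert", "critical", "error",
              "warning", "notice", "informational", "debug"]
FACILITIES = ["kernel", "user", "mail", "daemon", "auth", "syslog",
              "lpr", "news", "uucp", "cron", "authpriv", "ftp",
              "ntp", "audit", "alert", "clock",
              "local0", "local1", "local2", "local3",
              "local4", "local5", "local6", "local7"]

def decode_priority(pri: int) -> dict:
    # High bits are the facility, low 3 bits the severity.
    return {"facility": FACILITIES[pri >> 3], "severity": SEVERITIES[pri & 7]}

print(decode_priority(30))  # daemon.informational
```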

Processing Apache logs

This pattern uses a binary logger for Apache, mod_log_net, which encodes the request information in a msgpack'ed UDP message.

It can be configured with

<IfModule !log_net_module>
  LoadModule log_net_module   /usr/lib64/httpd/modules/mod_log_net.so
</IfModule>
#SetEnvIf needed, setenv is evaluated too late
SetEnvIf Request_URI . instance=webfront

LognetHost loghost
LognetPort 1516
LognetEntries virtual_host remote_address request_duration_microseconds bytes_sent request_query request_method
LognetEntry header_in Host name=host_header
LognetEntry header_in User-agent name=user-agent
LognetEntry header_in Referer name=referrer
LognetEntry hostname name=servername
LognetEntry env instance name=instance
LognetEntry status request=final
LognetEntry request_uri request=original

To process those log messages, the LogHub configuration is:

geoip2data: "/var/lib/loghub/GeoIP2-Country.mmdb"

input {
    loghub.receivers.Udp {
        port: 1516, 
        decoder: loghub.decoders.Msgpack,
    }
} | $http
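mod_log_net sends one msgpack-encoded map per request, which the Msgpack decoder turns back into event fields. A small sketch with the Python msgpack library (the field names mirror the LognetEntries above; this only illustrates the wire format, not LogHub's actual decoder):

```python
import msgpack  # third-party: pip install msgpack

# What mod_log_net puts on the wire: one msgpack map per request.
entry = {
    "virtual_host": "www.loghub.com",
    "remote_address": "8.8.8.8",
    "request_method": "GET",
    "status": 200,
}
datagram = msgpack.packb(entry)      # bytes sent in the UDP datagram
decoded = msgpack.unpackb(datagram)  # what the receiver's decoder recovers
print(decoded["status"])
```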

pipeline[http] {
    [type] = "http"
    | loghub.processors.Geoip2 {
        types: ["country","city", "location"],
        field: "remote_address",
        destination: "location",
    }
    | loghub.processors.VarExtractor { 
        parser: "(?<name>\\p{Alnum}+)=(?<value>[^&]+)&?",
        path: "query",
        field: ".request_query",
    }
    | loghub.processors.DecodeUrl { 
        path: "query",
        fields: [ "*" ],
    }
    | loghub.processors.UserAgent { 
        field: "user-agent",
        destination: "user-agent-content",
    }
} | $main
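The VarExtractor above walks request_query with a named-group regex, one match per argument. A Python equivalent of that extraction (Python's re has no \p{Alnum}, so the sketch substitutes [A-Za-z0-9]):

```python
import re

# VarExtractor-style parsing: each regex match becomes one key/value pair.
PARSER = re.compile(r"(?P<name>[A-Za-z0-9]+)=(?P<value>[^&]+)&?")

def extract_query(request_query: str) -> dict:
    return {m["name"]: m["value"] for m in PARSER.finditer(request_query)}

print(extract_query("q1=someargs&q2=another"))  # {'q1': 'someargs', 'q2': 'another'}
```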

It will output events like:

{
    "date": "2016-08-12 21:16:09",
    "instance": "webfront",
    "query": {
      "q1": "someargs",
      "q2": "another"
    },
    "remote_address": "8.8.8.8",
    "request_method": "GET",
    "request_duration_microseconds": 2068691,
    "bytes_sent": 11884,
    "request_uri": "/process/",
    "host_header": "www.loghub.com",
    "referrer": "https://github.com/fbacchella/LogHub",
    "@timestamp": "2016-08-12T21:16:09.677+0200",
    "host": "sourcehost",
    "servername": "sourcehost",
    "location": {
      "continent": "Europe",
      "country": {
        "name": "Greece",
        "code": "GR"
      }
    },
    "virtual_host": "www.loghub.com",
    "user-agent": "Mozilla/5.0 (Windows NT 6.3; WOW64; rv:39.0) Gecko/20100101 Firefox/39.0",
    "user-agent-content": {
      "os": {
        "family": "Windows 8.1"
      },
      "device": "Other",
      "userAgent": {
        "family": "Firefox",
        "major": "39",
        "minor": "0"
      }
    },
    "status": 200
}

From the request:

[2016-08-12 21:16:09] 8.8.8.8 "www.loghub.com" "GET /process/?q1=someargs&q2=another HTTP/1.1" 2068691 200 - 11884 "https://github.com/fbacchella/LogHub" "Mozilla/5.0 (Windows NT 6.3; WOW64; rv:39.0) Gecko/20100101 Firefox/39.0"

Watching isc-dhcp

pipeline[syslog] {
    ...
    | [program] == "dhcpd" ? $dhcpd
}

pipeline [dhcpd] {
    loghub.processors.Grok {
        pattern: "(?<verb>DHCPACK|DHCPREQUEST|DHCPOFFER) (?:for|on) %{IP:ip} (?:from|to) %{MAC:src_mac}(?: \\(%{HOSTNAME:dhcp_client_name}\\))?(?:(?: via %{IP:dhcp_proxy})|(?: via %{WORD:dhcp_interface}))",
        success: [message]-,
    }
    | loghub.processors.Grok {
        pattern: "DHCPDISCOVER from %{MAC:src_mac}(?:(?: via %{IP:dhcp_proxy})|(?: via %{WORD:dhcp_interface}))",
        success: ([message]- | [verb]="DHCPDISCOVER"),
    }
    | loghub.processors.Grok {
        pattern: "PXE boot: 1:%{PXEMAC:src_mac}",
        success: ([message]- | [verb]="PXE boot"),
        customPatterns: {
            "PXEMAC": "(?:[a-f0-9]{1,2}:){5}(?:[a-f0-9]{1,2})",
        },
    }
}

For example, given the following syslog message:

DHCPREQUEST for 169.254.1.1 from 92:18:02:71:49:f8 (hostname) via bond0

It will add to the event the following information:

{
    "dhcp_client_name":"hostname",
    "dhcp_interface":"bond0",
    "ip":"169.254.1.1",
    "src_mac":"92:18:02:71:49:f8",
    "verb":"DHCPREQUEST"
}
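The extraction performed by the first grok pattern can be sketched with a plain Python regex. The IP, MAC and HOSTNAME grok macros are replaced here with loose stand-ins, and the proxy/interface alternation is reduced to the interface case, so this is only an approximation of the real pattern:

```python
import re

# Simplified regex-only version of the DHCPACK/DHCPREQUEST/DHCPOFFER grok pattern.
PATTERN = re.compile(
    r"(?P<verb>DHCPACK|DHCPREQUEST|DHCPOFFER) (?:for|on) (?P<ip>[\d.]+)"
    r" (?:from|to) (?P<src_mac>[0-9a-f:]+)"
    r"(?: \((?P<dhcp_client_name>[\w.-]+)\))?"
    r" via (?P<dhcp_interface>\w+)"
)

msg = "DHCPREQUEST for 169.254.1.1 from 92:18:02:71:49:f8 (hostname) via bond0"
m = PATTERN.match(msg)
print({k: v for k, v in m.groupdict().items() if v is not None})
```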

Watching sudo

pipeline[syslog] {
    ...
    | [program] == "sudo" ? $sudo
}

pipeline[sudo] {
    loghub.processors.Grok {
        pattern: "%{WORD:user} : %{GREEDYDATA:message}",
        if: [message] ==~ /.*TTY=.*/,
    }
    | loghub.processors.VarExtractor { 
        parser: "(?<name>\\p{Alnum}+)=(?<value>.+?)(?:(?: ; )|$)",
        path: "sudo",
        field: ".message",
        if: [user] != null
    }
}

For example, given the following syslog message:

nrpe : TTY=unknown ; PWD=/ ; USER=root ; COMMAND=/sbin/reboot

It will add to the event the following information:

{
    "sudo":{
        "COMMAND":"/sbin/reboot",
        "PWD":"/",
        "TTY":"unknown",
        "USER":"root"
    },
    "user":"nrpe"
}
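The two steps above, the grok split on "user : message" followed by the VarExtractor pass over the " ; "-separated pairs, can be sketched in Python (again substituting [A-Za-z0-9] for Java's \p{Alnum}):

```python
import re

# Step 1: split "user : rest"; step 2: turn "K=V ; K=V" pairs into a map.
LINE = re.compile(r"(?P<user>\w+) : (?P<message>.*)")
PAIR = re.compile(r"(?P<name>[A-Za-z0-9]+)=(?P<value>.+?)(?:(?: ; )|$)")

def parse_sudo(message: str) -> dict:
    m = LINE.match(message)
    return {
        "user": m["user"],
        "sudo": {p["name"]: p["value"] for p in PAIR.finditer(m["message"])},
    }

print(parse_sudo("nrpe : TTY=unknown ; PWD=/ ; USER=root ; COMMAND=/sbin/reboot"))
```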

Listening for Netflow

LogHub can decode Netflow/IPFIX packets. Only versions 5, 9 and 10 (IPFIX) are currently supported.

To use it with a UDP receiver, one can write:

input {
    loghub.receivers.Udp { port: 2055, decoder: loghub.decoders.netflow.NetflowDecoder }
} | $main

pipeline[main] {
    loghub.netflow.Processor
    | [type]="NetFlow"
}

The processor will split a Netflow/IPFIX message into a set of events: one for the global header information, and one for each record. They are linked by a unique UUID generated for each message received.

The receiver will generate events like:

{
  "sequenceNumber": 1250953,
  "records": [
    {
      "destinationIPv4Address": "127.0.0.1",
      "icmpTypeCodeIPv4": 771,
      "_type": "Records",
      "sourceIPv4Address": "127.0.0.1",
      "ingressInterface": 1,
      "ipClassOfService": 192,
      "packetDeltaCount": 9,
      "flowEndReason": 1,
      "protocolIdentifier": 1,
      "egressInterface": 65535,
      "octetDeltaCount": 5184,
      "ipNextHopIPv4Address": "0.0.0.0",
      "flowEndMilliseconds": "2017-10-26T09:10:56.094+0000",
      "flowStartMilliseconds": "2017-10-26T09:09:57.284+0000"
    },
    {
      "destinationIPv4Address": "127.0.0.1",
      "icmpTypeCodeIPv4": 771,
      "_type": "Records",
      "sourceIPv4Address": "127.0.0.1",
      "ingressInterface": 65535,
      "ipClassOfService": 192,
      "packetDeltaCount": 9,
      "flowEndReason": 1,
      "protocolIdentifier": 1,
      "egressInterface": 1,
      "octetDeltaCount": 5184,
      "ipNextHopIPv4Address": "0.0.0.0",
      "flowEndMilliseconds": "2017-10-26T09:10:56.094+0000",
      "flowStartMilliseconds": "2017-10-26T09:09:57.284+0000"
    }
  ],
  "host": "127.0.0.1",
  "version": 10
}

The processor will flatten this content into three events:

{
  "sequenceNumber": 1250953,
  "host": "127.0.0.1",
  "UUID": "c3f2812b-5224-48b5-b1a8-bb4ec241d01a",
  "version": 10
}
{
  "record": {
    "destinationIPv4Address": "127.0.0.1",
    "icmpTypeCodeIPv4": 771,
    "sourceIPv4Address": "127.0.0.1",
    "ingressInterface": 1,
    "ipClassOfService": 192,
    "packetDeltaCount": 9,
    "flowEndReason": 1,
    "protocolIdentifier": 1,
    "egressInterface": 65535,
    "octetDeltaCount": 5184,
    "ipNextHopIPv4Address": "0.0.0.0",
    "flowEndMilliseconds": "2017-10-26T09:10:56.094+0000",
    "flowStartMilliseconds": "2017-10-26T09:09:57.284+0000"
  },
  "host": "127.0.0.1",
  "msgUUID": "c3f2812b-5224-48b5-b1a8-bb4ec241d01a"
}
{
  "record": {
    "destinationIPv4Address": "127.0.0.1",
    "icmpTypeCodeIPv4": 771,
    "sourceIPv4Address": "127.0.0.1",
    "ingressInterface": 65535,
    "ipClassOfService": 192,
    "packetDeltaCount": 9,
    "flowEndReason": 1,
    "protocolIdentifier": 1,
    "egressInterface": 1,
    "octetDeltaCount": 5184,
    "ipNextHopIPv4Address": "0.0.0.0",
    "flowEndMilliseconds": "2017-10-26T09:10:56.094+0000",
    "flowStartMilliseconds": "2017-10-26T09:09:57.284+0000"
  },
  "host": "127.0.0.1",
  "msgUUID": "c3f2812b-5224-48b5-b1a8-bb4ec241d01a"
}
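The flattening step above can be sketched as follows: the header event keeps the scalar fields plus a fresh UUID, and each record becomes its own event carrying the same value as msgUUID (a sketch of the behavior, not LogHub's implementation):

```python
import uuid

def flatten(message: dict) -> list:
    # One UUID ties the header event to its record events.
    msg_uuid = str(uuid.uuid4())
    header = {k: v for k, v in message.items() if k != "records"}
    header["UUID"] = msg_uuid
    events = [header]
    for rec in message.get("records", []):
        rec = {k: v for k, v in rec.items() if k != "_type"}
        events.append({"record": rec, "host": message["host"], "msgUUID": msg_uuid})
    return events

msg = {"sequenceNumber": 1250953, "host": "127.0.0.1", "version": 10,
       "records": [{"_type": "Records", "octetDeltaCount": 5184},
                   {"_type": "Records", "octetDeltaCount": 5184}]}
print(len(flatten(msg)))  # 3: one header event plus two record events
```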

Processing HAProxy syslogs

Configure HAProxy 1.5 with the following settings:

global
    log loghost.prod.exalead.com:514 daemon
    log-send-hostname
    log-tag haproxy
...
defaults
    log     global
    option log-separate-errors
    mode    http
    option  httplog
    option  dontlognull
...

It will send syslog events that are more compliant with the syslog format and easier to parse.

LogHub comes with some default rules for parsing these events. To use them, add the following rules:

pipeline[syslog] {
    | [program] == "haproxy" ? $haproxy
}
...
pipeline[haproxy] {
    path [haproxy] (
        loghub.processors.Grok {
            pattern: "%{HAPROXYHTTPBASE}",
            field: ".message",
            success: (
                [. message]- |
                [haproxy_year]- | [haproxy_month]- | [haproxy_monthday]- | 
                [haproxy_time]- | [haproxy_hour]- | [haproxy_minute]- | [haproxy_second]- | [haproxy_milliseconds]- | 
                (java.lang.Integer) [time_backend_response] |
                (java.lang.Integer) [time_queue] |
                (java.lang.Integer) [client_port] |
                (java.lang.Integer) [srvconn] |
                (java.lang.Integer) [beconn] |
                (java.lang.Integer) [feconn] |
                (java.lang.Integer) [actconn] |
                (java.lang.Integer) [http_status_code] |
                (java.lang.Integer) [srv_queue] |
                (java.lang.Integer) [bytes_read] |
                (java.lang.Integer) [retries] |
                (java.lang.Integer) [backend_queue] |
                (java.lang.Integer) [time_request] |
                (java.lang.Integer) [time_duration] |
                (java.lang.Integer) [time_backend_connect] |
                loghub.processors.DateParser {
                    field: "accept_date",
                    patterns: ["dd/MMM/yyyy:HH:mm:ss.SSS"],
                    destination: "@timestamp",
                    timezone : "CET",
                    success: [accept_date]-,
                } |
                [client ip] < [client_ip] |
                path[client] (
                    loghub.processors.NettyNameResolver {
                        field: "ip",
                        destination: "name",
                        resolver: "10.83.31.246",
                        timeout: 10,
                    }
                ) |
                loghub.processors.OnigurumaRegex {
                    pattern: "^(?<file>[^?]*)(?:\\?(?<query>.*))?",
                    field: "http_request",
                    success: [http_request]-
                } |
                [query] != null ? (
                    path[query_args] (
                        [query] < [. haproxy query] |
                        loghub.processors.VarExtractor {
                            parser: "(?<name>[a-zA-Z_0-9\\.]+)(?:=(?<value>[^&]*))?&?",
                            field: [query],
                            success: [query]-
                        }
                    )
                ) : [query]-
            )
        }
    )
}
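The OnigurumaRegex step splits http_request into a file part and a query part at the first "?", and the nested VarExtractor then explodes the query into query_args. A Python equivalent of those two steps, using the same regexes (with Python's (?P<…>) group syntax):

```python
import re

# Split "file?query", then turn "name=value&name=value" into a map.
SPLIT = re.compile(r"^(?P<file>[^?]*)(?:\?(?P<query>.*))?")
ARGS = re.compile(r"(?P<name>[a-zA-Z_0-9.]+)(?:=(?P<value>[^&]*))?&?")

def parse_request(http_request: str) -> dict:
    m = SPLIT.match(http_request)
    out = {"file": m["file"]}
    if m["query"] is not None:
        out["query_args"] = {a["name"]: a["value"] for a in ARGS.finditer(m["query"])}
    return out

print(parse_request("/status?cgiarg1=1"))  # {'file': '/status', 'query_args': {'cgiarg1': '1'}}
```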

You should also define some mappings in syslog's indices. They are written here in YAML, which is more readable than JSON.

      haproxy:
        type: object
        properties:
            client:
              type: object
              properties:
                ip:
                  type: ip
                name:
                  type: keyword
                  fields:
                    parts:
                      analyzer: "domain_name_analyzer"
                      type: text
            query_args:
              type: object
              enabled: false

This will generate LogHub events like:

{
    "pid": 11324,
    "program": "haproxy",
    "logsource": "haserver",
    "@timestamp": "2019-09-01T15:27:46.627+0200",
    "host": {
      "ip": "10.1.1.1",
      "name": "haserver.local.net"
    },
    "haproxy": {
      "server_name": "server.local.net",
      "srvconn": 15,
      "time_backend_response": 1551,
      "actconn": 375,
      "query_args": {
        "cgiarg1": "1"
      },
      "time_queue": 0,
      "http_verb": "GET",
      "client_port": 55150,
      "file": "/status",
      "backend_name": "backend_servers",
      "beconn": 369,
      "client": {
        "ip": "192.168.1.1",
        "name": "client.local.net"
      },
      "captured_response_cookie": "-",
      "http_status_code": 200,
      "captured_request_cookie": "-",
      "termination_state": "----",
      "feconn": 375,
      "srv_queue": 0,
      "http_version": "1.1",
      "bytes_read": 63794,
      "retries": 0,
      "backend_queue": 0,
      "time_request": 60,
      "frontend_name": "localnodes",
      "time_duration": 1611,
      "time_backend_connect": 0
    },
    "syslog_pri": {
      "severity": "informational",
      "facility": "daemon"
    }
}