This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Event Management

Before it was SIEM

Back in the dawn of time, we called it ‘Central Logging’ and it looked kind of like this:

# The classical way you'd implement this is via a tiered system.

Log Shipper --\                   /--> Log Parser --\
Log Shipper ---+--> Log Broker --+---> Log Parser ---+--> Log Storage --> Log Visualizer 
Log Shipper --/                   \--> Log Parser --/

# The modern way is more distributed. The clients are more powerful so you spread the load out and they can connect to distributed storage directly.

Log Parser Shipper --\ /-- Log Storage <-\
Log Parser Shipper ---+--- Log Storage <--+-  Visualizer 
Log Parser Shipper --/ \-- Log Storage <-/

# ELK (Elasticsearch Logstash and Kibana) is a good example.

Logstash --\ /-- Elasticsearch <-\
Logstash ---+--- Elasticsearch <--+--> Kibana 
Logstash --/ \-- Elasticsearch <-/

More recently, there’s a move toward shippers like NXLog and Elasticsearch’s beats client. A native client saves you from deploying Java and is better suited for thin or micro instances.

# NXLog has an output module for Elasticsearch now. Beats is an Elasticsearch product.
nxlog --\   
nxlog ---+--> Elasticsearch <-- Kibana
beats --/ 

Windows has it’s own log forwarding technology. You can put it to work without installing anything on the clients. This makes Windows admins a lot happier.

# It's built-in and fine for windows events - just doesn't do text files. Beats can read the events and push to elasticsearch.
Windows Event Forwarding --\   
Windows Event Forwarding ---+--> Central Windows Event Manager -> Beats/Elasticsearch --> Kibana
Windows Event Forwarding --/ 

Unix has several ways to do it, but the most modern/least-overhead way is to use the native journald system.

# Built-in to systemd
journald send --> central journald receive --> Beats/Elasticsearch --> Kibana

But Why?

The original answer used to be ‘reporting’. It was easier to get all the data together and do an analysis in one place.

Now the answer is ‘correlation’. If someone is probing your systems, they’ll do it very slowly and from multiple IPs to evade thresholds if they can, trying to break up patterns of attack. These patterns can become clear however, when you have a complete picture in one place.

1 - Elastic Stack

This is also referred to ELK, and is an acronym that stands for Elasticsearch, Logstash and Kibana

This is a trio of tools that <www.elasticsearch.org> has packaged up into a simple and flexible way to handle, store and visualize data. Logstash collects the logs, parses them and stores them in Elasticsearch. Kibana is a web application that knows how to to talk to Elasticsearch and visualizes the data.

Quite simple and powerful

To make use of this tio, start by deploying in this order:

  • Elasticseach (first, you have have some place to put things)
  • Kibana (so you can see what’s going on in elasticsearch easily)
  • Logstash (to start collecting data)

More recently, you can use the Elasticsearch Beats client in place of Logstash. These are natively compiled clients that have less capability, but are easier on the infrastructure than Logstash, a Java application.

1.1 - Elasticsearch

1.1.1 - Installation (Linux)

This is circa 2014 - use with a grain of salt.

This is generally the first step, as you need a place to collect your logs. Elasticsearch itself is a NoSQL database and well suited for pure-web style integrations.

Java is required, and you may wish to deploy Oracle’s java per the Elasticsearch team’s recommendation. You may also want to dedicate a data partition. By default, data is stored in /var/lib/elasticsearch and that can fill up. We will also install the ‘kopf’ plugin that makes it easier to manage your data.

Install Java and Elasticsearch

# (add a java repo)
sudo yum install java

# (add the elasticsearch repo)
sudo yum install elasticsearch

# Change the storage location
sudo mkdir /opt/elasticsearch
sudo chown elasticsearch:elasticsearch /opt/elasticsearch

sudo vim /etc/elasticsearch/elasticsearch.yml

    ...
    path.data: /opt/elasticsearch/data
    ...

# Allow connections on ports 9200, 9300-9400 and set the cluster IP

# By design, Elasticsearch is open so control access with care
sudo iptables --insert INPUT --protocol tcp --source 10.18.0.0/16 --dport 9200 --jump ACCEPT

sudo iptables --insert INPUT --protocol tcp --source 10.18.0.0/16 --dport 9300:9300 --jump ACCEPT

sudo vim /etc/elasticsearch/elasticsearch.yml
    ...
    # Failing to set the 'publish_host can result in the cluster auto-detecting an interface clients or other
    # nodes can't reach. If you only have one interface you can leave commented out. 
    network.publish_host: 10.18.3.1
    ...


# Increase the heap size
sudo vim  /etc/sysconfig/elasticsearch

    # Heap size defaults to 256m min, 1g max
    # Set ES_HEAP_SIZE to 50% of available RAM, but no more than 31g
ES_HEAP_SIZE=2g

# Install the kopf plugin and access it via your browser

sudo /usr/share/elasticsearch/bin/plugin -install lmenezes/elasticsearch-kopf
sudo service elasticsearch restart

In your browser, navigate to

http://10.18.3.1:9200/_plugin/kopf/

If everything is working correctly you should see a web page with KOPF at the top.

1.1.2 - Installation (Windows)

You may need to install on windows to ensure the ‘maximum amount of service ability with existing support staff’. I’ve used it on both Windows and Linux and it’s fine either way. Windows just requires a few more steps.

Requirements and Versions

The current version of Elasticsearch at time of writing these notes is 7.6. It requires an OS and Java. The latest of those supported are:

  • Windows Server 2016
  • OpenJDK 13

Installation

The installation instructions are at https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-elastic-stack.html

Note: Elasicsearch has both an zip and a MSI. The former comes with a java distro but the MSI includes a service installer.

Java

The OpenJDK 13 GA Releases at https://jdk.java.net/13/ no longer include installers or the JRE. But you can install via a MSI from https://github.com/ojdkbuild/ojdkbuild

Download the latest java-13-openjdk-jre-13.X and execute. Use the advanced settings to include the configuration of the JAVA_HOME and other useful variables.

To test the install, open a command prompt and check the version

C:\Users\allen>java --version
openjdk 13.0.2 2020-01-14
OpenJDK Runtime Environment 19.9 (build 13.0.2+8)
OpenJDK 64-Bit Server VM 19.9 (build 13.0.2+8, mixed mode, sharing)

Elasticsearch

Download the MSI installer from https://www.elastic.co/downloads/elasticsearch. It may be tagged as beta, but it installs the GA product well. Importantly, it also installs a windows service for Elasticsearch.

Verify the installation by checking your services for ‘Elasticsearch’, which should be running.

Troubleshooting

Elasticsearch only listing on localhhost

By default, this is the case. You must edit the config file.

# In an elevated command prompt
notepad C:\ProgramDaata\Elastic\Elasticsearach\config\elasticsearch.yml

# add
discovery.type: single-node
network.host: 0.0.0.0

https://stackoverflow.com/questions/59350069/elasticsearch-start-up-error-the-default-discovery-settings-are-unsuitable-for

failure while checking if template exists: 405 Method Not Allowed

You can’t run newer versions of the filebeat with older versions of elasticsearch. Download the old deb and sudo apt install ./some.deb

https://discuss.elastic.co/t/filebeat-receives-http-405-from-elasticsearch-after-7-x-8-1-upgrade/303821 https://discuss.elastic.co/t/cant-start-filebeat/181050

1.1.3 - Common Tasks

This is circa 2014 - use with a grain of salt.

Configuration of elasticsearch itself is seldom needed. You will have to maintain the data in your indexes however. This is done by either using the kopf tool, or at the command line.

After you have some data in elasticsearch, you’ll see that your ‘documents’ are organized into ‘indexes’. This is a simply a container for your data that was specified when logstash originally sent it, and the naming is arbitrarily defined by the client.

Deleting Data

The first thing you’re likely to need is to delete some badly-parsed data from your testing.

Delete all indexes with the name test*

curl -XDELETE http://localhost:9200/test*

Delete from all indexes documents of type ‘WindowsEvent’

curl -XDELETE http://localhost:9200/_all/WindowsEvent

Delete from all indexes documents have the attribute ‘path’ equal to ‘/var/log/httpd/ssl_request.log’

curl -XDELETE 'http://localhost:9200/_all/_query?q=path:/var/log/https/ssl_request.log'

Delete from the index ’logstash-2014.10.29’ documents of type ‘shib-access’

curl -XDELETE http://localhost:9200/logstash-2014.10.29/shib-access

http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html

Curator

All the maintenance by hand has to stop at some point and Curator is a good tool to automate some of it. This is a script that will do some curls for you, so to speak.

Install

wget https://bootstrap.pypa.io/get-pip.py
sudo python get-pip.py
sudo pip install elasticsearch-curator
sudo pip install argparse

Use

curator --help
curator delete --help

And in your crontab

# Note: you must escape % characters with a \ in crontabs
20 0 * * * curator delete indices --time-unit days --older-than 14 --timestring '\%Y.\%m.\%d' --regex '^logstash-bb-.*'
20 0 * * * curator delete indices --time-unit days --older-than 14 --timestring '\%Y.\%m.\%d' --regex '^logstash-adfsv2-.*'
20 0 * * * curator delete indices --time-unit days --older-than 14 --timestring '\%Y.\%m.\%d' --regex '^logstash-20.*'

Sometimes you’ll need to do an inverse match.

0 20 * * * curator delete indices --regex '^((?!logstash).)*$'

A good way to test your regex is by using the show indices method

curator show indices --regex '^((?!logstash).)*$'

Here’s some OLD posts and links, but be aware the syntax had changed and it’s been several versions since these

http://www.ragingcomputer.com/2014/02/removing-old-records-for-logstash-elasticsearch-kibana http://www.elasticsearch.org/blog/curator-tending-your-time-series-indices/ http://stackoverflow.com/questions/406230/regular-expression-to-match-line-that-doesnt-contain-a-word

Replication and Yellow Cluster Status

By default, elasticsearch assumes you want to have two nodes and replicate your data and the default for new indexes is to have 1 replica. You may not want to do that to start with however, so you change the default and change the replica settings on your existing data in-bulk with:

http://stackoverflow.com/questions/24553718/updating-the-default-index-number-of-replicas-setting-for-new-indices

Set all existing replica requirements to just one copy

curl -XPUT 'localhost:9200/_settings' -d '
{ 
  "index" : { "number_of_replicas" : 0 } 
}'

Change the default settings for new indexes to have just one copy

curl -XPUT 'localhost:9200/_template/logstash_template' -d ' 
{ 
  "template" : "*", 
  "settings" : {"number_of_replicas" : 0 }
} '

http://stackoverflow.com/questions/24553718/updating-the-default-index-number-of-replicas-setting-for-new-indices

Unassigned Shards

You will occasionally have a hiccup where you run out of disk space or something similar and be left with indexes that have no data in them or have shards unassigned. Generally, you will have to delete them but you can also manually reassign them.

http://stackoverflow.com/questions/19967472/elasticsearch-unassigned-shards-how-to-fix

Listing Index Info

You can get a decent human readable list of your indexes using the cat api

curl localhost:9200/_cat/indices

If you wanted to list by size, they use the example

curl localhost:9200/_cat/indices?bytes=b | sort -rnk8 

1.2 - Kibana

1.2.1 - Installation (Windows)

Kibana is a Node.js app using the Express Web framework - meaning to us it looks like a web server running on port 5601. If you’re running elasticsearch on the same box, it will connect with the defaults.

https://www.elastic.co/guide/en/kibana/current/windows.html

Download and Extract

No MSI or installer is available for windows so you must download the .zip from https://www.elastic.co/downloads/kibana. Uncompress (this will take a while), rename it to ‘Kibana’ and move it to Program Files.

So that you may access it later, edit the config file at {location}/config/kibana.yaml with wordpad and set the server.host entry to:

server.host: "0.0.0.0"

Create a Service

Download the service manager NSSM from https://nssm.cc/download and extract. Start an admin powershell, navigate to the extracted location and run the installation command like so:

C:\Users\alleng\Downloads\nssm-2.24\nssm-2.24\win64> .\nssm.exe install Kibana

In the Pop-Up, set the application path to the below. The start up path will auto populate.

C:\Program Files\Kibana\kibana-7.6.2-windows-x86_64\bin\kibana.bat

Click ‘Install service’ and it should indicate success. Go to the service manager to find and start it. After a minute (Check process manager for the CPU to drop) You should be able to access it at:

http://localhost:5601/app/kibana#/home

1.2.2 - Troubleshooting

Rounding Errors

Kibana rounds to 16 significant digits

Turns out, if you have a value of type integer, that’s just the limit. While elasticsearch shows you this:

    curl http://localhost:9200/logstash-db-2016/isim-process/8163783564660983218?pretty
    {
      "_index" : "logstash-db-2016",
      "_type" : "isim-process",
      "_id" : "8163783564660983218",
      "_version" : 1,
      "found" : true,
      "_source":{"requester_name":"8163783564660983218","request_num":8163783618037078861,"started":"2016-04-07 15:16:16:139 GMT","completed":"2016-04-07 15:16:16:282 GMT","subject_service":"Service","request_type":"EP","result_summary":"AA","requestee_name":"Mr. Requester","subject":"mrRequest","@version":"1","@timestamp":"2016-04-07T15:16:16.282Z"}
    }

Kibana shows you this

View: Table / JSON / Raw
Field Action Value
request_num    8163783618037079000

Looking at the JSON will give you the clue - it’s being treated as an integer and not a string.

 "_source": {
    "requester_name": "8163783564660983218",
    "request_num": 8163783618037079000,
    "started": "2016-04-07 15:16:16:139 GMT",
    "completed": "2016-04-07 15:16:16:282 GMT",

Mutate it to string in logstash to get your precision back.

https://github.com/elastic/kibana/issues/4356

1.3 - Logstash

Logstash is a parser and shipper. It reads from (usually) a file, parses the data into JSON, then connects to something else and send the data. That something else can be Elasticsearch, a systlog server, and others.

Logstash v/s Beats

But for most things these days, Beats is a better choice. Give that a look fist.

1.3.1 - Installation

Note: Before you install logstash, take a look at Elasticsearch’s Beats. It’s lighter-weight for most tasks.

Quick Install

This is a summary of the current install page. Visit and adjust versions as needed.

# Install java
apt install default-jre-headless
apt-get install apt-transport-https
apt install gnupg2
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -

# Check for the current version - 7 is no longer the current version by now
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | tee -a /etc/apt/sources.list.d/elastic-7.x.list
apt update
apt-get install logstash

Logstash has a NetFlow module, but it has been deprecated2. One should instead use the Filebeat Netflow Module.3

The rest of this page is circa 2014 - use with a grain of salt.

Installation - Linux Clients

Install Java

If you don’t already have it, install it. You’ll need at least 1.7 and Oracle is recommended. However, with older systems do yourself a favor and use the OpenJDK as older versions of Sun and IBM do things with cryptography leading to strange bugs in recent releases of logstash.

# On RedHat flavors, install the OpenJDK and select it for use (in case there are others) with the system alternatives utility
sudo yum install java-1.7.0-openjdk

sudo /usr/sbin/alternatives --config java

Install Logstash

This is essentially:

( Look at https://www.elastic.co/downloads/logstash to get the lastest version or add the repo)
wget (some link from the above page)
sudo yum --nogpgcheck localinstall logstash*

# You may want to grab a plugin, like the syslog output, though elasticsearch installs by default
cd /opt/logstash/
sudo bin/plugin install logstash-output-syslog

# If you're ready to configure the service
sudo vim /etc/logstash/conf.d/logstash.conf

sudo service logstash start

https://www.elastic.co/guide/en/logstash/current/index.html

Operating

Input

The most common use of logstash is to tail and parse log files. You do this by specifying a file and filter like so

[gattis@someHost ~]$ vim /etc/logstash/conf.d/logstash.conf


input {
  file {
    path => "/var/log/httpd/request.log"
  }
}
filter {
  grok {
    match => [ "message", "%{COMBINEDAPACHELOG}"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Filter

There are many different types of filters, but the main one you’ll be using is grok. It’s all about parsing the message into fields. Without this, you just have a bunch of un-indexed text in your database. It ships with some handy macros such as %{COMBINEDAPACHELOG} that takes this:

10.138.120.138 - schmoej [01/Apr/2016:09:39:04 -0400] "GET /some/url.do?action=start HTTP/1.1" 200 10680 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36" 

And turns it into

agent        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36"
auth         schmoej
bytes                   10680
clientip   10.138.120.138
httpversion 1.1
path           /var/pdweb/www-default/log/request.log
referrer   "-"
request   /some/url.do?action=start
response   200
timestamp   01/Apr/2016:09:39:04 -0400
verb        GET 

See the grok’ing for more details

Output

We’re outputting to the console so we can see what’s going on with our config. If you get some output, but it’s not parsed fully because of an error in the parsing, you’ll see something like the below with a “_grokparsefailure” tag. That means you have to dig into a custom pattern as in described in grok’ing.

Note: by default, logstash is ’tailing’ your logs, so you’ll only see new entries. If you’ve got no traffic you’ll have to generate some

{
       "message" => "test message",
      "@version" => "1",
    "@timestamp" => "2014-10-31T17:39:28.925Z",
          "host" => "some.app.private",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}

If it looks good, you’ll want to send it on to your database. Change your output to look like so which will put your data in a default index that kibana (the visualizer) can show by default.

output {

  elasticsearch {
    hosts => ["10.17.153.1:9200"]
  }
}

Troubleshooting

If you don’t get any output at all, check that the logstash user can actually read the file in question. Check your log files and try running logstash as yourself with the output going to the console.

cat /var/log/logstash/*

/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf

1.3.2 - Operation

Basic Operation

Generally, you create a config with 3 sections;

  • input
  • filter
  • output

This example uses the grok filter to parse the message.

sudo vi /etc/logstash/conf.d/logstash.conf
input {
  file {
        path => "/var/pdweb/www-default/log/request.log"        
      }
}
filter {
  grok {
    match => [ "message", "%{COMBINEDAPACHELOG}"]
  }
}
output {
  stdout { }
}

Then you test it at the command line

# Test the config file itself
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf --configtest

# Test the parsing of data
/opt/logstash/bin/logstash -e -f /etc/logstash/conf.d/logstash.conf

You should get some nicely parsed lines. If that’s the case, you can edit your config to add a sincedb and an actual destination.

input {
  file {
        path => "/var/pdweb/www-default/log/request.log"
        sincedb_path => "/opt/logstash/sincedb"
  }
}
filter {
  grok {
    match => [ "message", "%{COMBINEDAPACHELOG}"]
  }
}
output {
  elasticsearch {
    host => "some.server.private"
    protocol => "http"
  }
}

If instead you see output with a _grokparsefailure like below, you need to change the filter. Take a look at the common gotchas, then the parse failure section below it.

{
       "message" => "test message",
      "@version" => "1",
    "@timestamp" => "2014-10-31T17:39:28.925Z",
          "host" => "some.app.private",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}

Common Gotchas

No New Data

Logstash reads new lines by default. If you don’t have anyone actually hitting your webserver, but you do have some log entries in the file itself, you can tell logstash to process the exiting entries and not save it’s place in the file.

file {
  path => "/var/log/httpd/request.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
}

Multiple Conf files

Logstash uses all the files in the conf.d directory - even if they don’t end in .conf. Make sure to remove any you don’t want as they can conflict.

Default Index

Logstash creates Elasticsearch indexes that look like:

logstash-%{+YYYY.MM.dd}

The logstash folks have some great material on how to get started. Really top notch.

http://logstash.net/docs/1.4.2/configuration#fieldreferences

Parse Failures

The Greedy Method

The best way to start is to change your match to a simple pattern and work out from there. Try the ‘GREEDYDATA’ pattern and assign it to a field named ‘Test’. This takes the form of:

%{GREEDYDATA:Test}

And it looks like:

filter {
  grok {
    match => [ "message" => "%{GREEDYDATA:Test}" ]
  }
}


       "message" => "test message",
      "@version" => "1",
    "@timestamp" => "2014-10-31T17:39:28.925Z",
          "host" => "some.app.private",
          "Test" => "The rest of your message

That should give you some output. You can then start cutting it up with the patterns (also called macros) found here;

You can also use the online grok debugger and the list of default patterns.

Combining Patterns

There may not be a standard pattern for what you want, but it’s easy to pull together several existing ones. Here’s an example that pulls in a custom timestamp.

Example:
Sun Oct 26 22:20:55 2014 File does not exist: /var/www/html/favicon.ico

Pattern:
match => { "message" => "(?<timestamp>%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR})"}

Notice the ‘?’ at the beginning of the parenthetical enclosure. That tells the pattern matching engine not to bother capturing that for later use. Like opting out of a ( ) and \1 in sed.

Optional Fields

Some log formats simply skip columns when they don’t have data. This will cause your parse to fail unless you make some fields optional with a ‘?’, like this:

match => [ "message", "%{HOSTNAME:VHost}? %{COMBINEDAPACHELOG} %{IP:XForwardedFor}?"]

Date Formats

http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html

Dropping Events

Oftentimes, you’ll have messages that you don’t care about and you’ll want to drop those. Best practice is to do coarse actions first, so you’ll want to compare and drop with a general conditional like:

filter {
  if [message] =~ /File does not exist/ {
    drop { }
  }
  grok {
    ...
    ...

You can also directly reference fields once you have grok’d the message

filter {
  grok {
    match => { "message" => "%{HOSTNAME:VHost}? %{COMBINEDAPACHELOG} %{IP:XForwardedFor}?"}
  }  
  if [request] == "/status" {
        drop { }
  }
}

http://logstash.net/docs/1.4.2/configuration#conditionals

Dating Messages

By default, logstash date stamps the message when it sees them. However, there can be a delay between when an action happens and when it gets logged to a file. To remedy this - and allow you to suck in old files without the date on every event being the same - you add a date filter.

Note - you actually have to grok out the date into it’s own variable, you can’t just attempt to match on the whole message. The combined apache macro below does this for us.

filter { grok { match => { “message” => “%{HOSTNAME:VHost}? %{COMBINEDAPACHELOG} %{IP:XForwardedFor}?”} } date { match => [ “timestamp” , “dd/MMM/yyyy:HH:mm:ss Z” ] } }

In the above case, ’timestamp’ is a parsed field and you’re using the date language to tell it what the component parts are

http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html

Sending to Multiple Servers

In addition to an elasticsearch server, you may want to send it to a syslog server at the same time.

    input {
      file {
        path => "/var/pdweb/www-default/log/request.log"
        sincedb_path => "/opt/logstash/sincedb"
      }
    }

    filter {
      grok {
        match => [ "message", "%{HOSTNAME:VHost}? %{COMBINEDAPACHELOG} %{IP:XForwardedFor}?"]
      }
      date {
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
      }

    }

    output {
      elasticsearch {
        host => "some.server.private"
        protocol => "http"
      }
      syslog {
        host => "some.syslog.server"
        port => "514"
        severity => "notice"
        facility => "daemon"
      }
    }

Deleting Sent Events

Sometimes you’ll accidentally send a bunch of event to the server and need to delete and resend corrected versions.

http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-delete-mapping.html

curl -XDELETE <http://localhost:9200/_all/SOMEINDEX>
curl -XDELETE <http://localhost:9200/_all/SOMEINDEX?q=path:"/var/log/httpd/ssl_request_log>"

1.3.3 - Index Routing

When using logstash as a broker, you will want to route events to different indexes according to their type. You have two basic ways to do this;

  • Using Mutates with a single output
  • Using multiple Outputs

The latter is significantly better for performance. The less you touch the event, the better it seems. When testing these two different configs in the lab, the multiple output method was about 40% faster when under CPU constraint. (i.e. you can always add more CPU if you want to mutate the events.)

Multiple Outputs

    input {
      ...
      ...
    }
    filter {
      ...
      ...
    }
    output {

      if [type] == "RADIUS" {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "logstash-radius-%{+YYYY.MM.dd}"
        }
      }

      else if [type] == "RADIUSAccounting" {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "logstash-radius-accounting-%{+YYYY.MM.dd}"
        }
      }

      else {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "logstash-test-%{+YYYY.MM.dd}"
        }
      }

    }

Mutates

If your source system includes a field to tell you want index to place it in, you might be able to skip mutating altogether, but often you must look at the contents to make that determination. Doing so does reduce performance.

input {
  ...
  ...
}
filter {
  ...
  ... 

  # Add a metadata field with the destination index based on the type of event this was
  if [type] == "RADIUS" {
    mutate { add_field => { "[@metadata][index-name]" => "logstash-radius" } } 
  }
  else  if [type] == "RADIUSAccounting" {
    mutate { add_field => { "[@metadata][index-name]" => "logstash-radius-accounting" } } 
  }
  else {
    mutate { add_field => { "[@metadata][index-name]" => "logstash-test" } } 
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index-name]}-%{+YYYY.MM.dd}"
  }
}

https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#metadata

1.3.4 - Database Connections

You can connect Logstash to a database to poll events almost as easily as tailing a log file.

Installation

The JDBC plug-in ships with logstash so no installation of that is needed. However, you do need the JDBC driver for the DB in question.

Here’s an example for DB2, for which you can get the jar from either the server itself or the DB2 fix-pack associated with the DB Version you’re running. The elasticsearch docs say to just put it in your path. I’ve put it in the logstash folder (based on some old examples) and we’ll see if it survives upgrades.

sudo mkdir /opt/logstash/vendor/jars
sudo cp /home/gattis/db2jcc4.jar /opt/logstash/vendor/jars
sudo chown -R logstash:logstash /opt/logstash/vendor/jars

Configuration

Configuring the input

Edit the config file like so

sudo vim /etc/logstash/conf.d/logstash.conf

    input {
      jdbc {
        jdbc_driver_library => "/opt/logstash/vendor/jars/db2jcc4.jar"
        jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
        jdbc_connection_string => "jdbc:db2://db1.tim.private:50000/itimdb"
        jdbc_user => "itimuser"
        jdbc_password => "somePassword"
        statement => "select * from someTable"
      }
    }

Filtering

You don’t need to do any pattern matching, as the input emits the event pre-parsed based on the DB columns. You may however, want to match a timestamp in the database.

    # A sample value in the 'completed' column is 2016-04-07 00:41:03:291 GMT

    filter {
      date {
        match => [ "completed" , "yyyy-MM-dd HH:mm:ss:SSS zzz" ]
      }
    }

Output

One recommended trick is to link the primary keys between the database and kibana. That way, if you run the query again you update the existing elasticsearch records rather than create duplicates ones. Simply tell the output plugin to use the existing primary key from the database for the document_id when it sends it to elasticsearch.

    # Database key is the column 'id'

    output {
      elasticsearch {
        hosts => ["10.17.153.1:9200"]
        index => "logstash-db-%{+YYYY}"

        document_id => "${id}"

        type => "isim-process"

      }
    }

Other Notes

If any of your columns are non-string type, logstash and elasticsearch will happily store them as such. But be warned that kibana will round them to 16 digits due to a limitation of javascript.

https://github.com/elastic/kibana/issues/4356

Sources

https://www.elastic.co/blog/logstash-jdbc-input-plugin https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

1.3.5 - Multiline Matching

Here’s an example that uses the multiline codec (preferred over the multiline filter, as it’s more appropriate when you might have more than one input)

input {
  file {
    path => "/opt/IBM/tivoli/common/CTGIM/logs/access.log"
    type => "itim-access"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "^<Message Id"
      negate => true
      what => previous
    }
  }
}

Getting a match can be difficult, as grok by default does not match against multiple lines. You can mutate to remove all the new lines, or use a seemingly secret preface, the ‘(?m)’ directive as shown below

filter {
  grok {
    match => { "message" => "(?m)(?<timestamp>%{YEAR}.%{MONTHNUM}.%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{ISO8601_TIMEZONE})%{DATA}com.ibm.itim.security.%{WORD:catagory}%{DATA}CDATA\[%{DATA:auth}\]%{DATA}CDATA\[%{DATA:clientip}\]"}
  }

https://logstash.jira.com/browse/LOGSTASH-509

1.4 - Beats

Beats are a family of lightweight shippers that you should consider as a first-solution for sending data to Elasticsearch. The two most common ones to use are:

  • Filebeat
  • Winlogbeat

Filebeat is used both for files, and for other general types, like syslog and NetFlow data.

Winlogbeat is used to load Windows events into Elasticsearch and works well with Windows Event Forwarding.

1.4.1 - Linux Installation

On Linux

A summary from the general docs. View and adjust versions as needed.

If you haven’t already added the repo:

apt-get install apt-transport-https
apt install gnupg2
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | tee -a /etc/apt/sources.list.d/elastic-7.x.list
apt update

apt install filebeat
systemctl enable filebeat

Filebeat uses a default config file at /etc/filebeat/filebeat.yml. If you don’t want to edit that, you can use the ‘modules’ to configure it for you. That command will also load dashboard elements into Kibana, so you must have that already installed Kibana to make use of it.

Here’s a simple test

mv /etc/filebeat/filebeat.yml /etc/filebeat/filebeat.yml.orig
vi /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
output.file:
  path: "/tmp/filebeat"
  filename: filebeat
  #rotate_every_kb: 10000
  #number_of_files: 7
  #permissions: 0600

1.4.2 - Windows Installation

Installation

Download the .zip version (the msi doesn’t include the server install script) from the URL below. Extract, rename to Filebeat and move it the to the c:\Program Files directory.

https://www.elastic.co/downloads/beats/filebeat

Start an admin powershell, change to that directory and run the service install command. (Keep the shell up for later when done)

PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1

Basic Configuration

Edit the filebeat config file.

write.exe filebeat.yml

You need to configure the input and output sections. The output is already set to elasticsearch localhost so you only have to change the input from the unix to the windows style.

  paths:
    #- /var/log/*.log
    - c:\programdata\elasticsearch\logs\*

Test as per normal

  ./filebeat test config -e

Filebeat specific dashboards must be added to Kibana. Do that with the setup argument:

  .\filebeat.exe setup --dashboards

To start Filebeat in the forrgound (to see any interesting messages)

  .\filebeat.exe -e

If you’re happy with the results, you can stop the application then start the service

  Ctrl-C
  Start-Service filebeat

Adapted from the guide at

https://www.elastic.co/guide/en/beats/filebeat/7.6/filebeat-getting-started.html

1.4.3 - NetFlow Forwarding

The NetFlow protocol is now implemented in Filebeat1. Assuming you’ve installed Filebeat and configured Elasticsearch and Kibana, you can use this input module to auto configure the inputs, indexes and dashboards.

./filebeat modules enable netflow
filebeat setup -e

If you are just testing and don’t want to add the full stack, you can set up the netflow input2 which the module is a wrapper for.

filebeat.inputs:
- type: netflow
  max_message_size: 10KiB
  host: "0.0.0.0:2055"
  protocols: [ v5, v9, ipfix ]
  expiration_timeout: 30m
  queue_size: 8192
output.file:
  path: "/tmp/filebeat"
  filename: filebeat
filebeat test config -e

Consider dropping all the fields you don’t care about as there are a lot of them. Use the include_fields processor to limit what you take in

  - include_fields:
      fields: ["destination.port", "destination.ip", "source.port", "source.mac", "source.ip"]

1.4.4 - Palo Example

# This filebeat config accepts TRAFFIC and SYSTEM syslog messages from a Palo Alto, 
# tags and parses them 

# This is an arbitrary port. The normal port for syslog is UDP 512
filebeat.inputs:
  - type: syslog
    protocol.udp:
      host: ":9000"

processors:
    # The message field will have "TRAFFIC" for  netflow logs and we can 
    # extract the details with a CSV decoder and array extractor
  - if:
      contains:
        message: ",TRAFFIC,"
    then:
      - add_tags:
          tags: "netflow"
      - decode_csv_fields:
          fields:
            message: csv
      - extract_array:
          field: csv
          overwrite_keys: true
          omit_empty: true
          fail_on_error: false
          mappings:
            source.ip: 7
            destination.ip: 8
            source.nat.ip: 9
            network.application: 14
            source.port: 24
            destination.port: 25
            source.nat.port: 26
      - drop_fields:
          fields: ["csv", "message"] 
    else:
        # The message field will have "SYSTEM,dhcp" for dhcp logs and we can 
        # do a similar process to above
      - if:
          contains:
            message: ",SYSTEM,dhcp"
        then:
        - add_tags:
            tags: "dhcp"
        - decode_csv_fields:
            fields:
              message: csv
        - extract_array:
            field: csv
            overwrite_keys: true
            omit_empty: true
            fail_on_error: false
            mappings:
              message: 14
        # The DHCP info can be further pulled apart using space as a delimiter
        - decode_csv_fields:
            fields:
              message: csv2
            separator: " "
        - extract_array:
            field: csv2
            overwrite_keys: true
            omit_empty: true
            fail_on_error: false
            mappings:
              source.ip: 4
              source.mac: 7
              hostname: 10
        - drop_fields:
            fields: ["csv","csv2"] # Can drop message too like above when we have watched a few        
  - drop_fields:
      fields: ["agent.ephemeral_id", "agent.hostname", "agent.id", "agent.type", "agent.version", "ecs.version","host.name","event.severity","input.type","hostname","log.source.address","syslog.facility", "syslog.facility_label", "syslog.priority", "syslog.priority_label","syslog.severity_label"]
      ignore_missing: true
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
output.elasticsearch:
  hosts: ["localhost:9200"]

1.4.5 - RADIUS Forwarding

Here’s an example of sending FreeRADIUS logs to Elasticsearch.

cat /etc/filebeat/filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/freeradius/radius.log
    include_lines: ['\) Login OK','incorrect']
    tags: ["radius"]
processors:
  - drop_event:
      when:
        contains:
          message: "previously"
  - if:
      contains:
        message: "Login OK"
    then: 
      - dissect:
          tokenizer: "%{key1} [%{source.user.id}/%{key3}cli %{source.mac})"
          target_prefix: ""
      - drop_fields:
          fields: ["key1","key3"]
      - script:
          lang: javascript
          source: >
            function process(event) {
                var mac = event.Get("source.mac");
                if(mac != null) {
                        mac = mac.toLowerCase();
                         mac = mac.replace(/-/g,":");
                         event.Put("source.mac", mac);
                }
              }            
    else:
      - dissect:
          tokenizer: "%{key1} [%{source.user.id}/<via %{key3}"
          target_prefix: ""
      - drop_fields: 
          fields: ["key1","key3"]
output.elasticsearch:
  hosts: ["http://logcollector.yourorg.local:9200"]        
  allow_older_versions: true
  setup.ilm.enabled: false

1.4.6 - Syslog Forwarding

You may have an older system or appliance that can transmit syslog data. You can use filebeat to accept that data and store it in Elasticsearch.

Add Syslog Input

Install filebeat and test the reception the /tmp.

vi  /etc/filebeat/filebeat.yml

filebeat.inputs:
- type: syslog
  protocol.udp:
    host: ":9000"
output.file:
  path: "/tmp"
  filename: filebeat


sudo systemctl filebeat restart

pfSense Example

The instructions are NetGate’s remote logging example.

Status -> System Logs -> Settings

Enable and configure. Internet rumor has it that it’s UDP only so the config above reflects that. Interpreting the output requires parsing the message section detailed in the filter log format docs.

'5,,,1000000103,bge1.1099,match,block,in,4,0x0,,64,0,0,DF,17,udp,338,10.99.147.15,255.255.255.255,2048,30003,318'

'5,,,1000000103,bge2,match,block,in,4,0x0,,84,1,0,DF,17,udp,77,157.240.18.15,205.133.125.165,443,61343,57'

'222,,,1000029965,bge2,match,pass,out,4,0x0,,128,27169,0,DF,6,tcp,52,205.133.125.142,205.133.125.106,5225,445,0,S,1248570004,,8192,,mss;nop;wscale;nop;nop;sackOK'

'222,,,1000029965,bge2,match,pass,out,4,0x0,,128,11613,0,DF,6,tcp,52,205.133.125.142,211.24.111.75,15305,445,0,S,2205942835,,8192,,mss;nop;wscale;nop;nop;sackOK'

2 - Loki

Loki is a system for handling logs (unstructured data) but is lighter-weight than Elasticsearch. It also has fewer add-ons. But if you’re already using Prometheus and Grafana and you want to do it yourself, it can be a better solution.

Installation

Install Loki and Promtail together. These are available in the debian stable repos at current version. No need to go to backports or testing

sudo apt install loki promtail
curl localhost:3100/metrics

Configuration

Default config files are create in /etc/loki and /etc/promtail. Promtail is tailing /var/log/*log file, pushing them to localhost loki on the default port (3100) and loki is saving data in the /tmp directory. This is fine for testing.

Promtail runs as the promtail user (not root) and can’t read anything useful, so add them to the adm group.

sudo usermod -a -G adm promtail
sudo systemctl restart promtail

Grafana Integration

In grafana, add a datasource.

Configuration –> Add new data source –> Loki

Set the URL to http://localhost:3100

Then view the logs

Explore –> Select label (filename) –> Select value (daemon)

Troubleshooting

error notifying frontend about finished query

Edit the timeout setting in your loki datasource. The default may be too short so set it to 30s or some such

Failed to load log volume for this query

If you added a logfmt parser like the gui suggested, you may find not all your entries can be parsed, leading to this error.:w

3 - Network Traffic

Recoding traffic on the network is critical for troubleshooting and compliance. For the latter, the most common strategy is to record the “flows”. These are the connections each host makes or accepts, and how much data is involved.

You can collect this information at the LAN on individual switches, but the WAN (at the router) is usually more important. And if the router is performing NAT, it’s the only place to record the mappings of internal to external IPs and ports.

Network Log System

A network flow log system usually has three main parts.

Exporter --> Collector --> Analyzer

The Exporter, which records the data, the Collector, which is where the data is stored, and the Analyzer which makes the data more human-readable.

Example

We’ll use a Palo Alto NG Firewall as our exporter, and an Elasticsearch back-end. The data we are collecting is essentially log data, and Elasticsearch is probably the best at handling unstructured information.

At small scale, you can combine all of the the collection and analysis parts on a single system. We’ll use windows servers in our example as well.

graph LR

A(Palo)
B(Beats)
C(ElasticSearch)
D(Kibana) 

subgraph Exporter
A
end

subgraph Collector and Analyzer
B --> C --> D
end

A --> B

Installation

Start with Elasticsearch and Kibana, then install Beats.

Configuration

Beats and Palo have a couple of protocols in common. NetFlow is the traditional protocol, but when you’re using NAT the best choice is the syslog protocol as the Palo will directly tell you the NAT info all in one record and you don’t have to correlate multiple interface flows to see who did what.

Beats

On the Beats server, start an admin powershell session, change to the Filebeat directory, edit the config file and restart the server.

There is a bunch of example text in the config so tread carefully and keep in mind that indentation matters. Stick this block right under the filebeat.inputs: line and you should be OK.

This config stanza has a processor block that decodes the CVS content sent over in the message field, extracts a few select fields, then discards the rest. There’s quite a bit left over though, so see tuning below if you’d like to reduce the data load even more.

cd "C:\Program Files\Filebeat"
write.exe filebeat.yml

filebeat.inputs:
- type: syslog
  protocol.udp:
    host: ":9000"
  processors:
  - decode_csv_fields:
      fields:
        message: csv
  - extract_array:
      field: csv
      overwrite_keys: true
      omit_empty: true
      fail_on_error: false
      mappings:
        source.ip: 7
        destination.ip: 8
        source.nat.ip: 9
        network.application: 14
        source.port: 24
        destination.port: 25
        source.nat.port: 26
  - drop_fields:
        fields: ["csv", "message"]

A larger is example is under the beats documentation.

Palo Alto Setup

Perform steps 1 and 2 of the Palo setup guide with the notes below.

https://docs.paloaltonetworks.com/pan-os/10-0/pan-os-admin/monitoring/use-syslog-for-monitoring/configure-syslog-monitoring

  • In step 1 - The panw module defaults to 9001
  • In step 2 - Make sure to choose Traffic as the type of log

Tuning

You can reduce the amount of data even more by adding a few more Beats directives.

# At the very top level of the file, you can add this processor to affect global fields
processors:
- drop_fields:
   fields: ["agent.ephemeral_id","agent.id","agent.hostname","agent.type","agent.version","ecs.version","host.name"]

# You can also drop syslog fields that aren't that useful (you may need to put this under the syslog input)
- drop_fields:
    fields: ["event.severity","input.type","hostname","syslog.facility", "syslog.facility_label", "syslog.priority", "syslog.priority_label","syslog.severity_label"]

You may want even more data. See the Full Palo Syslog data to see what’s available. An example

Conclusion

At this point you can navigate to the Kibana web console and explore the logs. There is no dashboard as this is just for log retention and covers the minimum required. If you’re interested in more, check out the SIEM and Netflow dashboards Elasticsearch offers.

Sources

Palo Shipping

https://docs.logz.io/shipping/security-sources/palo-alto-networks.html

4 - NXLog

This info on NXLog is circa 2014 - use with caution.

NXLog is best used when Windows Event Forwarding can’t be and filebeats isn’t sufficient.

Background

There are several solutions for capturing logs in Windows, but NXLog has some advantages;

  • Cross-platform and Open Source
  • Captures windows events pre-parsed
  • Native windows installer and service

You could just run logstash everywhere. But in practice, Logstash’s memory requirements are several times NXLog and not everyone likes to install java everywhere.

Deploy on Windows

Download from http://nxlog.org/download. This will take you to the sourceforge site and the MSI you can install from. This installation is clean and the service installs automatically.

Configure on Windows

NXLog uses a config file with blocks in the basic pattern of:

  • Input Block
  • Output Block
  • Routing Block

The latter being what ties together your inputs and outputs. You start out with one variable, called the $raw_event with everything in it. As you call modules, that variable gets parsed out to more useful individual variables.

Event Viewer Example

Here’s an example of invoking the module that pulls in data from the windows event log entries associated.

  • Navigate to C:\Program Files (x86)\nxlog\conf
  • Edit the security settings on the file nxlog.conf. Change the ‘Users’ to have modify rights. This allows you to actually edit the config file.
  • Open that file in notepad and simply change it to look like so
    # Set the ROOT to the folder your nxlog was installed into
    define ROOT C:\Program Files (x86)\nxlog

    ## Default required locations based on the above
    Moduledir %ROOT%\modules
    CacheDir %ROOT%\data
    Pidfile %ROOT%\data\nxlog.pid
    SpoolDir %ROOT%\data
    LogFile %ROOT%\data\nxlog.log

    # Increase to DEBUG if needed for diagnosis
    LogLevel INFO

    # Input the windows event logs
    <Input in>
      Module      im_msvistalog
    </Input>


    # Output the logs to a file for testing
    <Output out>
        Module      om_file
        File        "C:/Program Files (x86)/nxlog/data/log-test-output.txt"
    </Output>

    # Define the route by mapping the input to an output
    <Route 1>
        Path        in => out
    </Route>

With any luck, you’ve now got some lines in your output file.

File Input Example

    # Set the ROOT to the folder your nxlog was installed into
    define ROOT C:\Program Files (x86)\nxlog

    ## Default required locations based on the above
    Moduledir %ROOT%\modules
    CacheDir %ROOT%\data
    Pidfile %ROOT%\data\nxlog.pid
    SpoolDir %ROOT%\data
    LogFile %ROOT%\data\nxlog.log

    # Increase to DEBUG if needed for diagnosis
    LogLevel INFO

    # Input a test file 
    <Input in>
        Module      im_file
        File ""C:/Program Files (x86)/nxlog/data/test-in.txt"
        SavePos     FALSE   
        ReadFromLast FALSE
    </Input>

    # Output the logs to a file for testing
    <Output out>
        Module      om_file
        File        "C:/Program Files (x86)/nxlog/data/log-test-output.txt"
    </Output>

    # Define the route by mapping the input to an output
    <Route 1>
        Path        in => out
    </Route>

Sending Events to a Remote Logstash Receiver

To be useful, you need to send your logs somewhere. Here’s an example of sending them to a Logstash receiver.

    # Set the ROOT to the folder your nxlog was installed into
    define ROOT C:\Program Files (x86)\nxlog

    ## Default required locations based on the above
    Moduledir %ROOT%\modules
    CacheDir %ROOT%\data
    Pidfile %ROOT%\data\nxlog.pid
    SpoolDir %ROOT%\data
    LogFile %ROOT%\data\nxlog.log

    # Increase to DEBUG if needed for diagnosis
    LogLevel INFO

    # Load the JSON module needed by the output module
    <Extension json>
        Module      xm_json
    </Extension>

    # Input the windows event logs
    <Input in>
      Module      im_msvistalog
    </Input>


    # Output the logs out using the TCP module, convert to JSON format (important)
    <Output out>
        Module      om_tcp
        Host        some.server
        Port        6379
        Exec to_json();
    </Output>

    # Define the route by mapping the input to an output
    <Route 1>
        Path        in => out
    </Route>

    Restart the service in the windows services, and you are in business.

Note about JSON

You’re probably shipping logs to a logstash broker (or similar json based tcp receiver). In that case, make sure to specify JSON on the way out, as in the example above or you’ll spend hours trying to figure out why you’re getting a glob of plain txt and loose all the pre-parsed windows event messages which are nearly impossible to parse back from plain text.

Using that to_json() will replace the contents. The variable we mentioned earlier, $raw_event, with all of the already parsed fields. If you hand’t invoked a module to parse that data out, you’d just get a bunch of empty events as the data was replaced with a bunch of nothing.

4.1 - Drop Events

Exec

You can use the ‘Exec’ statement in any block and some pattern matching to drop events you don’t care about.

<Input in>
  Module im_file
  File "E:/Imports/get_accessplans/log-test.txt"
  Exec if $raw_event =~ /someThing/ drop();
</Input>

Or the inverse, with the operator !~

Dropping Events with pm_pattern

The alternative is the patternDB approach as it has some parallelization advantages you’ll read about in the docs should you dig into it further. This matters when you have lots of patterns to check against.

# Set the ROOT to the folder your nxlog was installed into
define ROOT C:\Program Files (x86)\nxlog

## Default required locations based on the above
Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log

# Increase to DEBUG if needed for diagnosis
LogLevel INFO

# Load the JSON module needed by the output module
<Extension json>
    Module      xm_json
</Extension>

# Input the windows event logs
<Input in>
  Module      im_msvistalog
</Input>

# Process log events 
<Processor pattern>
  Module  pm_pattern
  PatternFile %ROOT%/conf/patterndb.xml
</Processor>

# Output the logs out using the TCP module, convert to JSON format (important)
<Output out>
    Module      om_tcp
    Host        some.server
    Port        6379
    Exec to_json();
</Output>

# Define the route by mapping the input to an output
<Route 1>
    Path        in => pattern => out
</Route>

And create an XML file like so:

<?xml version="1.0" encoding="UTF-8"?>
<patterndb>

  <group>
    <name>eventlog</name>
    <id>1</id>

   <pattern>
      <id>2</id>
      <name>500s not needed</name> 
      <matchfield>
        <name>EventID</name>
        <type>exact</type>
        <value>500</value>
      </matchfield>
      <exec>drop();</exec>
    </pattern>

    
  </group>

</patterndb>

4.2 - Event Log

Limiting Log Messages

You may not want ALL the event logs. You can add a query to that module however, and limit logs to the security logs, like so

<Input in>
  Module im_msvistalog
  Query <QueryList><Query Id="0" Path="Security"><Select Path="Security">*</Select></Query></QueryList>
</Input>

You can break that into multiple lines for easier reading by escaping the returns. Here’s an example that ships the ADFS Admin logs.

<Input in>
  Module im_msvistalog
  Query <QueryList>\
            <Query Id="0">\
                <Select Path='AD FS 2.0/Admin'>*</Select>\
            </Query>\
        </QueryList>
</Input>

Pulling out Custom Logs

If you’re interested in very specific logs, you can create a custom view in the windows event viewer, and after selecting the criteria in with the graphical tool, click on the XML tab to see what the query is. For example, to ship all the ADFS 2 logs (assuming you’ve turned on auditing) Take the output of the XML tab (shown below) and modify to be compliant with the nxlog format.

<QueryList>
  <Query Id="0" Path="AD FS 2.0 Tracing/Debug">
    <Select Path="AD FS 2.0 Tracing/Debug">*[System[Provider[@Name='AD FS 2.0' or @Name='AD FS 2.0 Auditing' or @Name='AD FS 2.0 Tracing']]]</Select>
    <Select Path="AD FS 2.0/Admin">*[System[Provider[@Name='AD FS 2.0' or @Name='AD FS 2.0 Auditing' or @Name='AD FS 2.0 Tracing']]]</Select>
    <Select Path="Security">*[System[Provider[@Name='AD FS 2.0' or @Name='AD FS 2.0 Auditing' or @Name='AD FS 2.0 Tracing']]]</Select>
  </Query>
</QueryList

Here’s the query from a MS NPS

<QueryList>
  <Query Id="0" Path="System">
    <Select Path="System">*[System[Provider[@Name='NPS']]]</Select>
    <Select Path="System">*[System[Provider[@Name='HRA']]]</Select>
    <Select Path="System">*[System[Provider[@Name='Microsoft-Windows-HCAP']]]</Select>
    <Select Path="System">*[System[Provider[@Name='RemoteAccess']]]</Select>
    <Select Path="Security">*[System[Provider[@Name='Microsoft-Windows-Security-Auditing'] and Task = 12552]]</Select>
  </Query>
</QueryList>

4.3 - Input File Rotation

NXLog has decent ability to rotate it’s own output files, but it’s doesn’t come with a lot of methods to rotate input files - i.e. your reading in Accounting logs from a windows RADIUS and it would be nice to archive those with NXLog, because Windows won’t do it. You could bust out some perl (if you’re on unix) and use the xm_perl module, but there’s a simpler way.

On windows, the solution is to use an exec block with a scheduled command. The forfiles executable is already present in windows and does the trick. The only gotcha is that ALL the parameters must be delimited like below.

So the command

forfiles /P "E:\IAS_Logs" /D -1 /C "cmd /c move @file \\server\share"

Becomes

<Extension exec>
  Module xm_exec
  <Schedule>
    When @daily
 Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/D','-1','/C','"cmd','/c','move','@file','\\server\share"');
  </Schedule>
</Extension>

A slightly more complex example with added compression and removal of old files (there isn’t a great command line zip utility for windows in advance of powershell 5)

# Add log rotation for the windows input files
<Extension exec>
    Module xm_exec
    <Schedule>
     When @daily
         # Make a compressed copy of .log files older than 1 day
         Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/M','*.log','/D','-1','/C','"cmd','/c','makecab','@file"')
         # Delete original files after 2 days, leaving the compressed copies 
         Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/M','*.log','/D','-2','/C','"cmd','/c','del','@file"')
         # Move compressed files to the depot after 2 days
         Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/M','*.lo_','/D','-2','/C','"cmd','/c','move','@file','\\shared.ohio.edu\appshare\radius\logs\radius1.oit.ohio.edu"');
    </Schedule>
</Extension>

The @daily runs right at 0 0 0 0 0 (midnight every night). Check the manual for more precise cron controls

4.4 - Inverse Matching

You can use the ‘Exec’ statement to match inverse like so

<Input in>
  Module im_file
  File "E:/Imports/get_accessplans/log-test.txt"
  Exec if $raw_event !~ /someThing/ drop();
</Input>

However, when you’re using a pattern db this is harder as the REGEXP doesn’t seem to honor inverses like you’d expect. Instead, you must look for matches in your pattern db like normal;

<?xml version="1.0" encoding="UTF-8"?>
<patterndb>

  <group>
    <name>eventlog</name>
    <id>1</id>

    <pattern>
      <id>2</id>
      <name>Identify user login success usernames</name>

      <matchfield>
        <name>EventID</name>
        <type>exact</type>
        <value>501</value>
      </matchfield>

      <matchfield>
        <name>Message</name>
        <type>REGEXP</type>
        <value>windowsaccountname \r\n(\S+)</value>
        <capturedfield>
          <name>ADFSLoginSuccessID</name>
          <type>STRING</type>
        </capturedfield>
      </matchfield

   </pattern>
  </group>
</patterndb>

Then, add a section to your nxlog.conf to take action when the above capture field doesn’t existing (meaning there wasn’t a regexp match).

...

# Process log events 
<Processor pattern>
  Module  pm_pattern
  PatternFile %ROOT%/conf/patterndb.xml
</Processor>

# Using a null processor just to have a place to put the exec statement
<Processor filter>
 Module pm_null
 Exec if (($EventID == 501) and ($ADFSLoginSucccessID == undef)) drop();
</Processor>

# Output the logs out using the TCP module, convert to JSON format (important)
<Output out>
    Module      om_tcp
    Host        some.server
    Port        6379
    Exec to_json();
</Output>

# Define the route by mapping the input to an output
<Route 1>
    Path        in => pattern => filter => out
</Route>

4.5 - Logstash Broker

When using logstash as a Broker/Parser to receive events from nxlog, you’ll need to explicitly tell it that the message is in json format with a filter, like so:

input {
  tcp {
    port => 6379
    type => "WindowsEventLog"
  }
}
filter {
  json {
    source => message
  }
}
output {
  stdout { codec => rubydebug }
}

4.6 - Manipulating Data

Core Fields

NXLog makes and handful of attributes about the event available to you. Some of these are from the ‘core’ module

$raw_event
$EventReceivedTime
$SourceModuleName
$SourceModuleType

Additional Fields

These are always present and added to by the input module or processing module you use. For example, the mseventlog module adds all the attributes from the windows event logs as attributes to the nxlog event. So your event contains:

$raw_event
$EventReceivedTime
$SourceModuleName
$SourceModuleType
$Message
$EventTime
$Hostname
$SourceName
$EventID
...

You can also create new attributes by using a processing module, such as parsing an input file’s XML. This will translate all the tags (within limites) into attributes.

<Extension xml>
    Module xm_xml
</Extension>
<Input IAS_Accounting_Logs>
    Module im_file
    File "E:\IAS_Logs\IN*.log"
    Exec parse_xml();
</Input>

And you can also add an Exec at any point to create or replace new attribute as desired

<Input IAS_Accounting_Logs>
    Module im_file
    File "E:\IAS_Logs\IN*.log"
    Exec $type = "RADIUSAccounting";
</Input>

Rewriting Data

Rather than manipulate everything in the input and output modules, use the pm_null module to group a block together.

<Processor rewrite>
    Module pm_null
    Exec parse_syslog_bsd();\
 if $Message =~ /error/ \
                {\
                  $SeverityValue = syslog_severity_value("error");\
                  to_syslog_bsd(); \
                }
</Processor>


<Route 1>
    Path in => rewrite => fileout
</Route>

4.7 - NPS Example

define ROOT C:\Program Files (x86)\nxlog

Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile %ROOT%\data\nxlog.log


# Load the modules needed by the outputs
<Extension json>
    Module      xm_json
</Extension>

<Extension xml>
    Module xm_xml
</Extension>


# Inputs. Add the field '$type' so the receiver can easily tell what type they are.
<Input IAS_Event_Logs>
    Module      im_msvistalog
    Query \
 <QueryList>\
 <Query Id="0" Path="System">\
 <Select Path="System">*[System[Provider[@Name='NPS']]]</Select>\
 <Select Path="System">*[System[Provider[@Name='HRA']]]</Select>\
  <Select Path="System">*[System[Provider[@Name='Microsoft-Windows-HCAP']]]</Select>\
 <Select Path="System">*[System[Provider[@Name='RemoteAccess']]]</Select>\
 <Select Path="Security">*[System[Provider[@Name='Microsoft-Windows-Security-Auditing'] and Task = 12552]]</Select>\
 </Query>\
 </QueryList>
    Exec $type = "RADIUS";
</Input>

<Input IAS_Accounting_Logs>
    Module      im_file
    File  "E:\IAS_Logs\IN*.log"
    Exec parse_xml();
    Exec $type = "RADIUSAccounting";
</Input>


# Output the logs out using the TCP module, convert to JSON format (important)
<Output broker>
    Module      om_tcp
    Host        192.168.1.1
    Port        8899
    Exec to_json();
</Output>


# Routes
<Route 1>
    Path        IAS_Event_Logs,IAS_Accounting_Logs => broker
</Route>


# Rotate the input logs while we're at it, so we don't need a separate tool
<Extension exec>
    Module xm_exec
    <Schedule>
     When @daily
      #Note -  the Exec statement is one line but may appear wrapped
                Exec exec('C:\Windows\System32\forfiles.exe','/P','"E:\IAS_Logs"','/D','-1','/C','"cmd','/c','move','@file','\\some.windows.server\share\logs\radius1"');
    </Schedule>
</Extension>

4.8 - Parsing

You can also extract and set values with a pattern_db, like this; (Note, nxlog uses perl pattern matching syntax if you need to look things up)

<?xml version="1.0" encoding="UTF-8"?>
<patterndb>

  <group>
    <name>ADFS Logs</name>
    <id>1</id>

   <pattern>
      <id>2</id>
      <name>Identify user login fails</name>
      <matchfield>
        <name>EventID</name>
        <type>exact</type>
        <value>111</value>
      </matchfield>

      <matchfield>
        <name>Message</name>
        <type>REGEXP</type>
        <value>LogonUser failed for the '(\S+)'</value>
        <capturedfield>
          <name>ADFSLoginFailUsername</name>
          <type>STRING</type>
        </capturedfield>
      </matchfield>

      <set>
        <field>
          <name>ADFSLoginFail</name>
          <value>failure</value>
          <type>string</type>
        </field>
      </set>
    </pattern>

And a more complex example, where we’re matching against a sting like:

2015-03-03T19:45:03 get_records 58  DailyAddAcct completed (Success) with: 15 Records Processed 0 adds 0 removes 0 modified 15 unchanged 


<?xml version="1.0" encoding="UTF-8"?>
<patterndb>

  <group>
    <name>Bbts Logs</name>
    <id>1</id>
  
    <pattern>
      <id>2</id>
      <name>Get TS Records</name> 
 
      <matchfield>
        <name>raw_event</name>
        <type>REGEXP</type>
        <value>^(\S+) get_record (\S+)\s+(\S+) completed \((\S+)\) with: (\S+) Records Processed (\S+) adds (\S+) removes (\S+) modified (\S+) unchanged</value>
 
        <capturedfield>
          <name>timestamp</name>
          <type>STRING</type>
        </capturedfield>

        <capturedfield>
          <name>Transaction_ID</name>
          <type>STRING</type>
        </capturedfield>

         <capturedfield>
          <name>Job_Subtype</name>
          <type>STRING</type>
        </capturedfield>

        <capturedfield>
          <name>Job_Status</name>
          <type>STRING</type>
        </capturedfield>

        <capturedfield>
          <name>Record_Total</name>
          <type>STRING</type>
        </capturedfield>

        <capturedfield>
          <name>Record_Add</name>
          <type>STRING</type>
        </capturedfield>

        <capturedfield>
          <name>Record_Remove</name>
          <type>STRING</type>
        </capturedfield>

        <capturedfield>
          <name>Record_Mod</name>
          <type>STRING</type>
        </capturedfield>

        <capturedfield>
          <name>Record_NoChange</name>
          <type>STRING</type>
        </capturedfield>

      </matchfield>

      <set>
        <field>
          <name>Job_Type</name>
          <value>Get_Records</value>
          <type>string</type>
        </field>
      </set>
    </pattern>


  </group>
</patterndb>

4.9 - Reprocessing

Sometimes you have a parse error when you’re testing and you need to feed all your source files back in. Problem is you’re usually saving position and reading only new entries by default.

Defeat this by adding a line to the nxlog config so it starts reading files at the beginning and deleting the ConfigCache file (so there’s no last position to start from).

    <Input IAS_Accounting_Logs>
        Module      im_file
        ReadFromLast FALSE
        File  "E:\IAS_Logs\IN*.log"
        Exec parse_xml();
        Exec $type = "RADIUSAccounting";
    </Input>

del C:\Program Files (x86)\nxlog\data\configcache.dat

Restart and it will begin reprocessing all the data. When you’re done, remove the ReadFromLast line and restart.

Note: If you had just deleted the cache file, nxlog would have resumed at the tail of the file. You could have told it not to save position, but you actually do want that for when you’re ready to resume normal operation.

https://www.mail-archive.com/[email protected]/msg00158.html

4.10 - Syslog

There are two components; adding the syslog module and adding the export path.

    <Extension syslog>
        Module xm_syslog
    </Extension>

    <Input IAS_Accounting_Logs>
        Module      im_file
        File  "E:\IAS_Logs\IN*.log"
        Exec $type = "RADIUSAccounting";
    </Input>

    <Output siem>
    Module om_udp
    Host 192.168.1.1
    Port 514
    Exec to_syslog_ietf();

    </Output>

    <Route 1>
        Path        IAS_Accounting_Logs => siem
    </Route>

4.11 - Troubleshooting

If you see this error message from nxlog:

ERROR Couldn't read next event, corrupted eventlog?; The data is invalid.

Congrats - you’ve hit a bug.

https://nxlog.org/support-tickets/immsvistalog-maximum-event-log-count-support

The work-around is to limit your log event subscriptions on the input side by using a query. Example:

<Input in>
  Module im_msvistalog
  Query <QueryList><Query Id="0" Path="Microsoft-Windows-PrintService/Operational"><Select Path="Microsoft-Windows-PrintService/Operational">*</Select></Query></QueryList>
  Exec if $EventID != 307 drop();
  Exec $type = "IDWorks";
</Input>

Parse failure on windows to logstash

We found that nxlog made for the best windows log-shipper. But it didn’t seem to parse the events in the event log. Output to logstash seemed not to be in json format, and we confirmed this by writing directly to disk. This happens even though the event log input module explicitly emits the log attributes atomically.

Turns out you have to explicitly tell the output module to use json. This isn’t well documented.

4.12 - UNC Paths

When using Windows UNC paths, don’t forget that the backslash is also used for escaping characters, so the path

    \\server\radius 

looks like

    \\server;adius 

in your error log message. You’ll want to escape your back slashes like this;

\\\\server\\radius\\file.log

4.13 - Unicode Normalization

Files you’re reading may be any character set and this can cause strange things when you modify or pass the data on, as an example at stack exchange shows. This isn’t a problem with windows event logs, but windows applications use several different types of charsets.

Best practice is to convert everything to UTF-8. This is especially true when invoking modules such as json, that don’t handle other codes well.

NXLog has the ability to convert and can even to this automatically. However, there is some room for error. If you can, identity what the encoding is by looking at it in a hex editor and comparing to MS’s identification chart.

Here’s an snippet of a manual conversion of a powershell generated log. Having looked at the first part and identified it as UTF-16LE

...
<Extension charconv>
    Module xm_charconv
    AutodetectCharsets utf-8, utf-16, utf-32, iso8859-2, ucs-2le
</Extension>

<Input in1>
    Module      im_file
    File "E:/Imports/log.txt"
    Exec  $raw_event = convert($raw_event,"UTF-16LE","UTF-8");
</Input>
...

Notice however that the charconv module has an automatic directive. You can use that as long as what you have is included as marked in bold here.

<Extension charconv>
    Module xm_charconv
    AutodetectCharsets utf-8, utf-16, utf-16le, utf-32, iso8859-2
</Extension>


<Input sql-ERlogs>
    Module      im_file
    File 'C:\Program Files\Microsoft SQL Server\MSSQL11.SQL\MSSQL\Log\ER*'
    ReadFromLast TRUE
    Exec        convert_fields("AUTO", "utf-8");
</Input>

If you’re curious what charsets are supported, you can type this command in any unix system to see the names.

iconv -i

4.14 - Windows Files

Windows uses UTF-16 by default. Other services may use derivations thereof. In any event, it’s recommended to normalize things to UTF-8. Here’s a good example of what will happen if you don’t;

<http://stackoverflow.com/questions/27596676/nxlog-logs-are-in-unicode-charecters>

The answer to that question is to use the specific code field, as “AUTO” doesn’t seem to detect properly.

<Input in>
    Module      im_file
    File "E:/Imports/get_accessplans/log-test.txt"
    Exec if $raw_event == '' drop(); 
    Exec $Event = convert($raw_event,"UCS-2LE","UTF-8"); to_json();
    SavePos     FALSE   
    ReadFromLast FALSE
</Input>

From the manual on SQL Server

Microsoft SQL Server

Microsoft SQL Server stores its logs in UTF-16 encoding using a line-based format.
It is recommended to normalize the encoding to UTF-8. The following config snipped
will do that.

<Extension _charconv>
    Module xm_charconv
</Extension>

<Input in>
    Module im_file
    File "C:\\MSSQL\\ERRORLOG"
    Exec convert_fields('UCS-2LE','UTF-8'); if $raw_event == '' drop();
</Input>

As of this writing, the LineBased parser, the default InputType for im_file is not able to properly read the double-byte UTF-16 encoded files and will read an additional empty line (because of the double-byte CRLF). The above drop() call is intended to fix this.

convert_fields('UTF-16','UTF-8'); might also work instead of UCS-2LE.

5 - Windows Event Forwarding

If you’re in a Windows shop, this is the best way to keep the Windows admins happy. No installation of extra tools. ‘Keeps it in the MS family’ so to speak.

Configure your servers to push1 logs to a cental location and use a client there, to send it on. Beats works well for this.

The key seems to be

  • Create a domain service account or add the machine account
  • add that to the group on the client

check the runtime status on the collector

For printing, in Event Viewer navigate to Microsoft-Windows-PrintService/Operational and enable it as its not on by default.

Make sure to enable for latency or you’ll spend a long time wondering why there is no data.

Sources

https://hackernoon.com/the-windows-event-forwarding-survival-guide-2010db7a68c4 https://www.ibm.com/docs/en/netcoolomnibus/8?topic=acquisition-forwarded-event-log https://www.youtube.com/watch?v=oyPuRE51k3o&t=158s