
The following Logstash configuration is used to accept Windows event logs as JSON over a TCP connection and, after some filtering, forward the result to Elasticsearch (source: https://gist.github.com/robinsmidsrod/4215337):

input {
    tcp {
        type => "syslog"
        host => "127.0.0.1"
        port => 3514
    }
    tcp {
        type   => "eventlog"
        host   => "10.1.1.2"
        port   => 3515
        format => 'json'
    }
}

# Details at http://cookbook.logstash.net/recipes/syslog-pri/
filter {

# Incoming data from rsyslog
    grok {
        type      => "syslog"
        pattern   => [ "<%{POSINT:syslog_pri}>(?:%{SYSLOGTIMESTAMP:syslog_timestamp}|%{TIMESTAMP_ISO8601:syslog_timestamp8601}) %{SYSLOGHOST:syslog_hostname} %{PROG:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" ]
        add_field => [ "received_at", "%{@timestamp}" ]
        add_field => [ "received_from", "%{@source_host}" ]
    }
    syslog_pri {
        type => "syslog"
    }
    date {
        type                 => "syslog"
        syslog_timestamp8601 => "ISO8601" # RSYSLOG_ForwardFormat
        syslog_timestamp     => [ "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
    mutate {
        type         => "syslog"
        exclude_tags => "_grokparsefailure"
        replace      => [ "@source_host", "%{syslog_hostname}" ]
        replace      => [ "@message", "%{syslog_message}" ]
    }
    mutate {
        type   => "syslog"
        remove => [ "syslog_hostname", "syslog_message", "syslog_timestamp", "syslog_timestamp8601" ]
    }

# Incoming Windows Event logs from nxlog
    # The EventReceivedTime field must contain only digits, or it is an invalid message
    grep {
        type              => "eventlog"
        EventReceivedTime => "\d+"
    }
    mutate {
        # Lowercase some values that are always in uppercase
        type      => "eventlog"
        lowercase => [ "EventType", "FileName", "Hostname", "Severity" ]
    }
    mutate {
        # Set source to what the message says
        type   => "eventlog"
        rename => [ "Hostname", "@source_host" ]
    }
    date {
        # Convert timestamp from integer in UTC
        type              => "eventlog"
        EventReceivedTime => "UNIX"
    }
    mutate {
        # Rename some fields into something more useful
        type   => "eventlog"
        rename => [ "Message", "@message" ]
        rename => [ "Severity", "eventlog_severity" ]
        rename => [ "SeverityValue", "eventlog_severity_code" ]
        rename => [ "Channel", "eventlog_channel" ]
        rename => [ "SourceName", "eventlog_program" ]
        rename => [ "SourceModuleName", "nxlog_input" ]
        rename => [ "Category", "eventlog_category" ]
        rename => [ "EventID", "eventlog_id" ]
        rename => [ "RecordNumber", "eventlog_record_number" ]
        rename => [ "ProcessID", "eventlog_pid" ]
    }
    mutate {
        # Remove redundant fields
        type   => "eventlog"
        remove => [ "SourceModuleType", "EventTimeWritten", "EventTime", "EventReceivedTime", "EventType" ]
    }
}

output {
    elasticsearch {
        embedded => true
    }
    graphite {
        # Ping the graphite server every time a syslog message is received
        type => "syslog"
        port => 2023     # carbon-aggregator
        metrics => [ "syslog.received.%{@source_host}.count", "1" ]
    }
    graphite {
        # Ping the graphite server every time an eventlog message is received
        type => "eventlog"
        port => 2023     # carbon-aggregator
        metrics => [ "eventlog.received.%{@source_host}.count", "1" ]
    }
}

What is the significance of the @ prefix on some of the field names? I.e. @source_host and @message in these mutate filters:

mutate {
    # Set source to what the message says
    type   => "eventlog"
    rename => [ "Hostname", "@source_host" ]
}

and

mutate {
    # Rename some fields into something more useful
    type   => "eventlog"
    rename => [ "Message", "@message" ]
    rename => [ "Severity", "eventlog_severity" ]
    rename => [ "SeverityValue", "eventlog_severity_code" ]
    rename => [ "Channel", "eventlog_channel" ]
    rename => [ "SourceName", "eventlog_program" ]
    rename => [ "SourceModuleName", "nxlog_input" ]
    rename => [ "Category", "eventlog_category" ]
    rename => [ "EventID", "eventlog_id" ]
    rename => [ "RecordNumber", "eventlog_record_number" ]
    rename => [ "ProcessID", "eventlog_pid" ]
}
Kev
1 Answer


I believe it was simply a namespace decision to avoid collisions.

The @ prefix has been mostly purged from newer versions of Logstash; only @timestamp and @version remain. You should look at upgrading Logstash and your shippers.
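For example, a minimal sketch (not from the original gist; it assumes a Logstash 1.2+ event schema and uses a conditional instead of the deprecated per-filter type option, and the plain field names are illustrative) of how the eventlog renames above would look without the @ prefix:

filter {
    if [type] == "eventlog" {
        mutate {
            # Plain top-level names replace the old reserved @-fields;
            # only @timestamp and @version keep the @ prefix in newer releases.
            rename => {
                "Hostname" => "source_host"
                "Message"  => "message"
            }
        }
    }
}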

Dan Garthwaite
  • See here: http://tobrunet.ch/2013/09/logstash-1-2-0-upgrade-notes-included/ – Dan Garthwaite Jan 11 '14 at 17:11
  • Thanks for the answer. I'm running the latest, but some config examples still use the `@` prefix and I couldn't find any mention of it in the docs/elsewhere. – Kev Jan 11 '14 at 23:09
  • Logstash 1.5 adds a `@metadata` field: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#metadata – Miles Oct 02 '15 at 22:10
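Regarding the @metadata field mentioned in the last comment, here is a minimal sketch of how it can be used (the index_prefix name is purely illustrative, not something Logstash defines). Anything placed under [@metadata] is available to filters and outputs at runtime but is not included in the event that gets output or indexed:

filter {
    mutate {
        # Stash a routing hint that should not end up in the stored document
        add_field => { "[@metadata][index_prefix]" => "eventlog" }
    }
}

output {
    elasticsearch {
        # The @metadata value can be interpolated here, but is not part of the indexed event
        index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}"
    }
}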