|
|
@ -5,38 +5,55 @@ use strict; |
|
|
|
use JSON; |
|
|
|
use JSON; |
|
|
|
use LWP::UserAgent; |
|
|
|
use LWP::UserAgent; |
|
|
|
use Encode qw(encode); |
|
|
|
use Encode qw(encode); |
|
|
|
use Data::Dumper; |
|
|
|
|
|
|
|
use Compress::Zlib; |
|
|
|
use Compress::Zlib; |
|
|
|
use Getopt::Long; |
|
|
|
use Getopt::Long; |
|
|
|
use YAML::Tiny; |
|
|
|
use YAML::Tiny; |
|
|
|
|
|
|
|
|
|
|
|
my $config = '/etc/systemd/journal-gelf.yml'; |
|
|
|
|
|
|
|
my $conf = {}; |
|
|
|
my $conf = {}; |
|
|
|
|
|
|
|
my $cmd = { |
|
|
|
|
|
|
|
config => '/etc/systemd/journal-gelf.yml' |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
if (-e $config) { |
|
|
|
GetOptions ( |
|
|
|
print "Reading config file $config\n"; |
|
|
|
'c|config=s' => \$cmd->{config}, |
|
|
|
my $yaml = YAML::Tiny->read( $config ) or die "Config file $config is invalid\n"; |
|
|
|
'state=s' => \$cmd->{state}, |
|
|
|
if (not $yaml->[0]) { |
|
|
|
'compress!' => \$cmd->{compress}, |
|
|
|
die "Config file $config is invalid\n" |
|
|
|
'url=s' => \$cmd->{url}, |
|
|
|
|
|
|
|
'username=s' => \$cmd->{username}, |
|
|
|
|
|
|
|
'password=s' => \$cmd->{password} |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Open config file |
|
|
|
|
|
|
|
if (-e $cmd->{config}) { |
|
|
|
|
|
|
|
print "Reading config file " . $cmd->{config} . "\n"; |
|
|
|
|
|
|
|
my $yaml = YAML::Tiny->read( $cmd->{config} ) |
|
|
|
|
|
|
|
or die "Config file " . $cmd->{config} . " is invalid\n"; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if ( not $yaml->[0] ) { |
|
|
|
|
|
|
|
die "Config file " . $cmd->{config} . " is invalid\n"; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# File could be parsed, lets load |
|
|
|
|
|
|
|
# settings in $conf |
|
|
|
$conf = $yaml->[0]; |
|
|
|
$conf = $yaml->[0]; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
print "Config file " . $cmd->{config} . " does not exist, ignoring it\n"; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
GetOptions ( |
|
|
|
# Command line override config file |
|
|
|
'state=s' => \$conf->{state}, |
|
|
|
foreach ( keys %{ $cmd } ){ |
|
|
|
'compress!' => \$conf->{compress}, |
|
|
|
$conf->{$_} = $cmd->{$_} if ( $cmd->{$_} ); |
|
|
|
'url=s' => \$conf->{url}, |
|
|
|
} |
|
|
|
'username=s' => \$conf->{username}, |
|
|
|
|
|
|
|
'password=s' => \$conf->{password} |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Set some defaults is missing |
|
|
|
$conf->{state} //= '/var/lib/systemd-journal-gelf/state'; |
|
|
|
$conf->{state} //= '/var/lib/systemd-journal-gelf/state'; |
|
|
|
$conf->{compress} //= 1; |
|
|
|
$conf->{compress} //= 1; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Now check config makes sens |
|
|
|
if ( |
|
|
|
if ( |
|
|
|
not $conf->{url} or |
|
|
|
not $conf->{url} or |
|
|
|
($conf->{username} and not $conf->{password}) or |
|
|
|
( $conf->{username} and not $conf->{password} ) or |
|
|
|
(not $conf->{username} and $conf->{password}) |
|
|
|
( not $conf->{username} and $conf->{password} ) |
|
|
|
){ |
|
|
|
){ |
|
|
|
help(); |
|
|
|
help(); |
|
|
|
die; |
|
|
|
die; |
|
|
@ -45,9 +62,10 @@ if ( |
|
|
|
print "Starting the Systemd Journal GELF uploader daemon\n"; |
|
|
|
print "Starting the Systemd Journal GELF uploader daemon\n"; |
|
|
|
|
|
|
|
|
|
|
|
my $ua = LWP::UserAgent->new( |
|
|
|
my $ua = LWP::UserAgent->new( |
|
|
|
# env_proxy => 1, |
|
|
|
env_proxy => 1, |
|
|
|
keep_alive => 1 |
|
|
|
keep_alive => 1 |
|
|
|
); |
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
$ua->default_header( 'Content-Type' => 'application/json' ); |
|
|
|
$ua->default_header( 'Content-Type' => 'application/json' ); |
|
|
|
if ( $conf->{compress} ){ |
|
|
|
if ( $conf->{compress} ){ |
|
|
|
$ua->default_header( 'Accept-Encoding' => HTTP::Message::decodable ); |
|
|
|
$ua->default_header( 'Accept-Encoding' => HTTP::Message::decodable ); |
|
|
@ -57,10 +75,10 @@ if ( $conf->{compress} ){ |
|
|
|
# Check if the state file exists and contains a valid cursor |
|
|
|
# Check if the state file exists and contains a valid cursor |
|
|
|
my $cursor_arg = ''; |
|
|
|
my $cursor_arg = ''; |
|
|
|
open CURSOR, "+<", $conf->{state}; |
|
|
|
open CURSOR, "+<", $conf->{state}; |
|
|
|
if (-e $conf->{state}){ |
|
|
|
if ( -e $conf->{state} ){ |
|
|
|
my $cursor = <CURSOR>; |
|
|
|
my $cursor = <CURSOR>; |
|
|
|
close CURSOR; |
|
|
|
close CURSOR; |
|
|
|
if ($cursor and $cursor =~ m/^s=[a-z\d]+;i=[a-z\d]+;b=[a-z\d]+;m=[a-z\d]+;t=[a-z\d]+;x=[a-z\d]+$/){ |
|
|
|
if ( $cursor and $cursor =~ m/^s=[a-z\d]+;i=[a-z\d]+;b=[a-z\d]+;m=[a-z\d]+;t=[a-z\d]+;x=[a-z\d]+$/ ){ |
|
|
|
print "Valid cursor found in " . $conf->{state} . ", will start back from here\n"; |
|
|
|
print "Valid cursor found in " . $conf->{state} . ", will start back from here\n"; |
|
|
|
$cursor_arg = " --after-cursor='" . $cursor . "'"; |
|
|
|
$cursor_arg = " --after-cursor='" . $cursor . "'"; |
|
|
|
} else { |
|
|
|
} else { |
|
|
@ -70,28 +88,41 @@ if (-e $conf->{state}){ |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
open JOURNAL, "/usr/bin/journalctl -f -o json$cursor_arg |"; |
|
|
|
open JOURNAL, "/usr/bin/journalctl -f -o json$cursor_arg |"; |
|
|
|
while (my $entry = <JOURNAL>){ |
|
|
|
while ( my $entry = <JOURNAL> ){ |
|
|
|
my $msg = from_json($entry); |
|
|
|
my $msg = from_json( $entry ); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if ( not $msg ) { |
|
|
|
|
|
|
|
# Oups, something is obviously wrong here |
|
|
|
|
|
|
|
# journalctl didn't sent us valid JSON ? |
|
|
|
|
|
|
|
print "Error parsing message ($msg) \n"; |
|
|
|
|
|
|
|
next; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Build a basic GELF message |
|
|
|
my $gelf = { |
|
|
|
my $gelf = { |
|
|
|
version => 1.1, |
|
|
|
version => 1.1, |
|
|
|
short_message => $msg->{MESSAGE}, |
|
|
|
short_message => $msg->{MESSAGE}, |
|
|
|
host => $msg->{_HOSTNAME}, |
|
|
|
host => $msg->{_HOSTNAME}, |
|
|
|
timestamp => int ($msg->{__REALTIME_TIMESTAMP} / (1000 * 1000)), |
|
|
|
timestamp => int ( $msg->{__REALTIME_TIMESTAMP} / ( 1000 * 1000 ) ), |
|
|
|
level => $msg->{PRIORITY} |
|
|
|
level => $msg->{PRIORITY} |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
# Now lets look at the message. If it starts with gelf: we can split it and have further |
|
|
|
# Now lets look at the message. If it starts with gelf: we can split it and have further |
|
|
|
# fields to send. I use this to handle httpd or nginx logs for example |
|
|
|
# fields to send. I use this to handle httpd or nginx logs for example |
|
|
|
if ($msg->{MESSAGE} =~ m/^gelf:([a-zA-Z\d]+=([^\|])\|?)+/){ |
|
|
|
if ( $msg->{MESSAGE} =~ m/^gelf:([a-zA-Z\d]+=([^\|])\|?)+/ ){ |
|
|
|
$msg->{MESSAGE} =~ s/^gelf://; |
|
|
|
$msg->{MESSAGE} =~ s/^gelf://; |
|
|
|
foreach (split /\|/, $msg->{MESSAGE}){ |
|
|
|
foreach ( split /\|/, $msg->{MESSAGE} ){ |
|
|
|
my ($key,$val) = split /=/, $_; |
|
|
|
my ( $key, $val ) = split /=/, $_; |
|
|
|
$gelf->{'_' . lc $key} = $val; |
|
|
|
$gelf->{'_' . lc $key} = $val; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
foreach (grep !/^MESSAGE|_HOSTNAME|__REALTIME_TIMESTAMP|PRIORITY$/, keys %$msg){ |
|
|
|
|
|
|
|
my $key = lc (($_ =~ m/^_/) ? $_ : '_' . $_); |
|
|
|
# Add the other attributes to the gelf message, except thos already treated |
|
|
|
$gelf->{$key} = $msg->{$_}; |
|
|
|
foreach ( grep !/^MESSAGE|_HOSTNAME|__REALTIME_TIMESTAMP|PRIORITY$/, keys %$msg ){ |
|
|
|
|
|
|
|
$gelf->{$_} = $msg->{$_}; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Now, we'll try to POST this message |
|
|
|
my $retry = 0; |
|
|
|
my $retry = 0; |
|
|
|
my $resp; |
|
|
|
my $resp; |
|
|
|
do { |
|
|
|
do { |
|
|
@ -100,14 +131,28 @@ while (my $entry = <JOURNAL>){ |
|
|
|
$resp->code . " (" . $resp->message . "). Tring again in $retry seconds\n"; |
|
|
|
$resp->code . " (" . $resp->message . "). Tring again in $retry seconds\n"; |
|
|
|
sleep $retry; |
|
|
|
sleep $retry; |
|
|
|
} |
|
|
|
} |
|
|
|
$resp = $ua->post($conf->{url}, Content => Compress::Zlib::memGzip(encode('utf-8', to_json($gelf)))); |
|
|
|
$resp = $ua->post( |
|
|
|
|
|
|
|
$conf->{url}, |
|
|
|
|
|
|
|
Content => Compress::Zlib::memGzip( |
|
|
|
|
|
|
|
encode( |
|
|
|
|
|
|
|
'utf-8', |
|
|
|
|
|
|
|
to_json($gelf) |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
); |
|
|
|
$retry = ($retry > 0) ? $retry * 2 : 1; |
|
|
|
$retry = ($retry > 0) ? $retry * 2 : 1; |
|
|
|
} while ($resp->code != 202 and $retry < 600); |
|
|
|
} while (not $resp->is_success and $retry < 600); |
|
|
|
if ($resp->code == 202){ |
|
|
|
|
|
|
|
|
|
|
|
# The message has been accepted, we can save the current cursor and |
|
|
|
|
|
|
|
# continue |
|
|
|
|
|
|
|
if ($resp->is_success){ |
|
|
|
open CURSOR, ">", $conf->{state}; |
|
|
|
open CURSOR, ">", $conf->{state}; |
|
|
|
print CURSOR $msg->{__CURSOR}; |
|
|
|
print CURSOR $msg->{__CURSOR}; |
|
|
|
close CURSOR |
|
|
|
close CURSOR |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
|
|
|
|
# We can't upload our current message for |
|
|
|
|
|
|
|
# too much time, no much left we can do, lets die and hope |
|
|
|
|
|
|
|
# our service manager will restart us :-) |
|
|
|
die "Error sending data to GELF server\n"; |
|
|
|
die "Error sending data to GELF server\n"; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|