Add option to send indexed fields along with events #25
Conversation
I am testing this out as I would really like this feature. Though I am having trouble getting a correct "fields" JSON object with the record transformer plugin. Similar to your example, I have:

```
fields '{"logfile": "${record["logfile"]}"}'
```

but I get an error:

```
2018-03-06 01:31:49 +0000 [warn]: #0 failed to parse {"logfile": "${record["logfile"]}"} as json. Assuming {"logfile": "${record["logfile"]}"} is a string error_class=JSON::ParserError error="765: unexpected token at '{\"logfile\": \"${record[\"logfile\"]}\"}'"
```

I am using fluentd-1.1.0 pid=7 ruby="2.4.3". This may be a general Fluentd question but I was hoping you can help so I can test this PR myself.
Hi Brian,

Unfortunately I have not been using the more recent versions of fluentd. I have been using "gem install fluentd -v 0.12.35" for my deployments in Kubernetes. Perhaps something has changed in the more recent version that I didn't anticipate. I know that isn't very helpful.

I had also done the same testing with td-agent version 2... but that is also apparently fluentd version 0.12.

Perhaps I can get td-agent version 3 installed somewhere and do a quick test to see what happens with it. I think that td-agent version 3 is very close, if not exactly, fluentd version 1.0.0.
> On Tue, Mar 6, 2018 at 10:40 AM, Brian Wong ***@***.*** wrote:
>
> I am testing this out as I would really like this feature. Though I am having trouble getting a correct "fields" JSON object with the record transformer plugin. Similar to your example, I have:
>
> ```
> fields '{"logfile": "${record["logfile"]}"}'
> ```
>
> but I get an error:
>
> ```
> 2018-03-06 01:31:49 +0000 [warn]: #0 failed to parse {"logfile": "${record["logfile"]}"} as json. Assuming {"logfile": "${record["logfile"]}"} is a string error_class=JSON::ParserError error="765: unexpected token at '{\"logfile\": \"${record[\"logfile\"]}\"}'"
> ```
>
> I am using fluentd-1.1.0 pid=7 ruby="2.4.3". This may be a general Fluentd question but I was hoping you can help so I can test this PR myself.
Thanks for the quick response. I was not able to get this working on v0.12.42 either.
Can you post a more comprehensive configuration so I can verify my settings?
Sure thing, here are the relevant parts of my fluentd config for this:

First, I am receiving what are essentially custom syslog messages on port 5140 udp:

```
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag syslog
  # the regex below was line-wrapped by the mail client; it is one line in the config file
  format /^(?<time>[^ ]*\s*[^ ]* [^ ]*) (?<host>\S+) (?<app-name>\S+) (?<procid>\S+) (?<msgid>\S+) \[(?<sd-id>\S+) index="(?<index>[^"]*)" cluster="(?<cluster>[^"]*)"\] (?<message>.*)$/
  message_format auto
  message_length_limit 10240
</source>
```

Note that in the syslog message, they are sending a key value pair (doesn't have to be, could be anything you capture as a named group, just happens to be a key value pair here) for "cluster" which I capture as "cluster". Now I believe it is in the record as "cluster": "blah".

Next, I use a filter with record transformer to inject the "fields" JSON object into the record:

```
<filter *.syslog.local0.**>
  @type record_transformer
  <record>
    fields '{"cluster": "${record["cluster"]}" }'
  </record>
</filter>
```

Then I send them out to the http event collector:

```
<match *.syslog.local0.**>
  @type splunk-http-eventcollector
  server servername.corp.theplatform.com:8088
  # verify false disables TLS certificate verification of the HEC endpoint
  verify false
  send_fields true
  token D17501D5-5DA1-4096-BE8E-B0CD05C318CB
  host '${record["host"]}'
  index '${record["index"]}'
  check_index false
  # source was line-wrapped by the mail client; it is one line in the config file
  source "#{Socket.gethostname}.${tag_parts[1]}.${tag_parts[2]}.${tag_parts[3]}"
  sourcetype '${record["app-name"]}-${record["msgid"]}'
  fields '${record["fields"]}'
  buffer_type memory
  buffer_queue_limit 16
  buffer_chunk_limit 8m
  flush_interval 5s
</match>
```

Hopefully that helps!
> On Tue, Mar 6, 2018 at 4:01 PM, Brian Wong ***@***.*** wrote:
>
> Can you post a more comprehensive configuration so I can verify my settings?
Thanks for this. It turns out that the error I am seeing does not affect the functionality. Your changes work perfectly. Thank you for this.

Can we get this change in soon?

+1 for this change
"host" => @placeholder_expander.expand(@host.to_s, placeholders), | ||
"index" => @placeholder_expander.expand(@index, placeholders) | ||
] | ||
if @send_fields |
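Review bot: at `if @send_fields`, the `fields` value is produced by expanding `@fields` as a string and handing the result to `JSON.parse` (see the merge form proposed in the inline review comment below), so placeholder substitution happens before JSON quoting rather than after it: a record value containing a double quote makes `JSON.parse` raise `JSON::ParserError` for that event (and, if the exception is not rescued, for the whole chunk being formatted), while a value of the shape `x", "extra": "y` is spliced in as an additional indexed field that the log producer, not the operator, chose. Expand each placeholder into a Ruby Hash and let the JSON encoder quote it, or at minimum rescue `JSON::ParserError` and drop the `fields` key for that event so one malformed record cannot stall the buffer.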
Wouldn't this be better as:

```ruby
splunk_object = Hash[
  "time" => time.to_i,
  "source" => if @source.nil? then tag.to_s else @placeholder_expander.expand(@source, placeholders) end,
  "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
  "host" => @placeholder_expander.expand(@host.to_s, placeholders),
  "index" => @placeholder_expander.expand(@index, placeholders)
]
if @send_fields
  splunk_object = splunk_object.merge(Hash[
    "fields" => JSON.parse(@placeholder_expander.expand(@fields.to_s, placeholders))
  ])
end
```
We collect events with fluentd via syslog input that match the following regex format:

```
^(?<time>[^ ]*\s*[^ ]* [^ ]*) (?<host>\S+) sourcetype=(?<sourcetype>\S+) cluster=(?<cluster>\S+)::(?<message>.*)$
```

We want to send in cluster=clustername as an indexed field with these events.

We modify the records to have a "fields" JSON object with the record transformer plugin:

```
fields '{"cluster": "${record["cluster"]}" }'
```

Then we configure the http event collector plugin to include the indexed field by setting the following options:

```
send_fields true
fields '${record["fields"]}'
```
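The following is an added example, not code from this PR or from the plugin, illustrating the procedure described in the PR description above and in the fluentd config posted earlier in the thread. It builds the HEC event envelope for a few constructed records and contrasts the approach used here (a `fields` JSON string template that is placeholder-expanded and then `JSON.parse`d) with a form that expands each value into a Hash and lets the JSON encoder do the quoting. The `expand` helper is a deliberately simplified stand-in for the plugin's placeholder expander and only understands `${record["key"]}` and `${tag_parts[N]}`; the sample records, tag and `source` template are assumptions chosen to resemble the thread's setup.

```ruby
require 'json'

# Constructed sample records, standing in for what the syslog source in the
# thread would produce after its regex parse. Not taken from the page.
RECORDS = [
  { "host" => "web01", "index" => "main", "app-name" => "payments", "msgid" => "txn",
    "cluster" => "prod-east", "message" => "order accepted" },
  # A log line whose captured value carries JSON syntax: it closes the string
  # and adds a key of the producer's choosing.
  { "host" => "web02", "index" => "main", "app-name" => "payments", "msgid" => "txn",
    "cluster" => 'prod", "evil": "x', "message" => "order rejected" },
  # A lone double quote: the expanded template is no longer valid JSON at all.
  { "host" => "web03", "index" => "main", "app-name" => "payments", "msgid" => "txn",
    "cluster" => 'prod"east', "message" => "order retried" },
]

# Simplified stand-in for the plugin's placeholder expander (an assumption,
# not the plugin's code): handles ${record["key"]} and ${tag_parts[N]} only.
def expand(template, record, tag)
  parts = tag.split(".")
  template
    .gsub(/\$\{record\["([^"]+)"\]\}/) { record.fetch(Regexp.last_match(1), "").to_s }
    .gsub(/\$\{tag_parts\[(\d+)\]\}/) { parts[Regexp.last_match(1).to_i].to_s }
end

# Approach used in the thread: `fields` is a JSON *string* template, expanded
# textually and then parsed. Record values are spliced in before quoting.
def fields_via_string_template(template, record, tag)
  JSON.parse(expand(template, record, tag))
end

# Hardened form: keep the template as key => placeholder, expand each value on
# its own, and let JSON.generate quote whatever the value contains.
def fields_via_hash_template(template, record, tag)
  template.each_with_object({}) { |(k, v), h| h[k] = expand(v, record, tag) }
end

# HEC event envelope shaped like the match block in the thread. `fields` must
# be a JSON object here; HEC does not accept a string for it.
def hec_event(record, tag, time, fields)
  {
    "time"       => time.to_i,
    "host"       => expand('${record["host"]}', record, tag),
    "index"      => expand('${record["index"]}', record, tag),
    "source"     => expand("fluentd.${tag_parts[1]}.${tag_parts[2]}", record, tag),
    "sourcetype" => expand('${record["app-name"]}-${record["msgid"]}', record, tag),
    "event"      => record["message"],
    "fields"     => fields,
  }
end

STRING_TEMPLATE = '{"cluster": "${record["cluster"]}" }'   # as in the thread
HASH_TEMPLATE   = { "cluster" => '${record["cluster"]}' }  # hardened equivalent
TAG  = "syslog.local0.info"                                 # assumed tag
TIME = Time.utc(2018, 3, 6, 1, 31, 49)                      # assumed event time

# Returns, per record, what each approach yields plus the encoded HEC event
# built from the hardened fields.
def demo
  RECORDS.map do |rec|
    safe  = fields_via_hash_template(HASH_TEMPLATE, rec, TAG)
    naive = begin
      fields_via_string_template(STRING_TEMPLATE, rec, TAG)
    rescue JSON::ParserError => e
      { "error" => e.class.name }
    end
    { "naive" => naive, "safe" => safe,
      "event" => JSON.generate(hec_event(rec, TAG, TIME, safe)) }
  end
end

if __FILE__ == $0
  demo.each do |r|
    puts r["event"]
    puts "  string-template fields: #{r["naive"].inspect}"
  end
end
```

Run stand-alone, the first record comes out identically either way; the second shows the string-template path admitting an `evil` field the log producer injected, and the third shows it failing with `JSON::ParserError`, which in the plugin would surface as a failed format for the chunk unless rescued. The Hash-template path carries the odd values through verbatim as a single `cluster` field, which is the behaviour an operator who wrote `fields '{"cluster": ...}'` actually intended.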