Patrick Kelley 04da5c1250 Inital
2025-05-28 14:31:31 -04:00

85 lines
2.5 KiB
Plaintext

#
# Copyright (c) 2016-2018 RockNSM.
#
# This file is part of RockNSM
# (see http://rocknsm.io).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# vim ft=bro
# Ansible managed
#
module Kafka;
redef Kafka::kafka_conf = table (
["metadata.broker.list"] = "127.0.0.1:9092",
["client.id"] = fmt("zeek-%s", split_string(gethostname(), /\./)[0])
);
redef Kafka::tag_json = F;
redef Kafka::topic_name = "";
redef Kafka::json_timestamps = JSON::TS_ISO8601;
redef Kafka::logs_to_send = {};
export {
const logs_to_ignore: set[Log::ID] &redef;
const ignore_services = set("dns", "krb", "krb_tcp") &redef;
}
#
# NOTE: Uncomment the following line to write syslog.log to the local filesystem
# redef Kafka::logs_to_ignore = { Syslog::LOG };
#
event zeek_init() &priority=-5
{
for (stream_id in Log::active_streams) {
if (stream_id in Kafka::logs_to_ignore) {
next;
}
if ( (|Kafka::logs_to_send| == 0) || stream_id in Kafka::logs_to_send ) {
local pathname = to_lower(sub(fmt("bro_%s", stream_id),/:.*$/, ""));
local filter: Log::Filter = [
$name = fmt("kafka-%s", stream_id),
$writer = Log::WRITER_KAFKAWRITER,
$config = table(["stream_id"] = fmt("%s", pathname)),
$path = pathname
];
if (stream_id == Conn::LOG) {
filter$pred = Conn_Filter::filter_conn_log_pred;
}
if (stream_id == DNS::LOG) {
filter$pred = DNS_Filter::filter_dns_log_pred;
}
if (stream_id == Files::LOG) {
filter$pred = FILES_FILTER::filter_files_log_pred;
}
if (stream_id == SSL::LOG) {
filter$pred = SSL_FILTER::filter_ssl_log_pred;
}
if (stream_id == DCE_RPC::LOG) {
filter$pred = DCE_RPC_FILTER::filter_dce_rpc_log_pred;
}
Log::add_filter(stream_id, filter);
}
}
}