技术选型
filebeat简介
- processors配置: 可以对event做很多加工, 比如add_fields,add_labels, add_tags, rename, drop_event等等
- Inputs配置: 定义各种input配置。
- Moduels: 对各种开源组件的日志收集支持,这些modules针对各个组件的日志做了parse与结构化处理,同时集成了ES, Kibana, 为这些开源组件提供一整套日志收集展示方案。
- HTTP endpoint: 配置http rest api获取metrics。
filebeat收集meta持久化机制
{"op":"set","id":1}
{"k":"filebeat::logs::native::3162611-64768","v":{"id":"native::3162611-64768","prev_id":"","source":"/root/ysptest1.log","offset":0,"ttl":-1,"type":"log","timestamp":[2061828726851,1628151202],"FileStateOS":{"inode":3162611,"device":64768},"identifier_name":"native"}}
{"op":"set","id":2}
{"k":"filebeat::logs::native::3162611-64768","v":{"id":"native::3162611-64768","prev_id":"","timestamp":[2061975276931,1628151203],"ttl":-1,"FileStateOS":{"inode":3162611,"device":64768},"identifier_name":"native","source":"/root/ysptest1.log","offset":165,"type":"log"}}其中记录了input type, source端文件路径以及当前收集到的offset,时间戳, 还有用于唯一标识文件的inode, device信息。ack确认反馈会从output端一直反馈到input端,input端收到ack确认后会负责将meta信息持久化到这个文件中。
filebeat反压机制
日志收集方案
设计目标:
- 提供统一运维监控管理,降低运维成本。
- 收集的日志可以同时满足实时,离线需求。
- 日志收集pipeline支持反压,支持at least once语义,支持日志重放。
- 跨IDC容灾,支持动态修改agent配置,将日志收集定向到其他IDC。
设计方案:
- 提供一键agent部署脚本,提供agentManager管理metrcis上报与agent配置修改动态感知
- 将kafka作为统一日志收集目的地。
- logstash 作为日志汇聚层, 避免agent过多对kafka带来压力。
- 监控告警: 获取filebeat,logstash metrics并解析成prom格式上报给prometheus, 通过grafana展示,提供实例级别的监控,对收集延迟,失败及时告警, 对收集的日志count进行统计,方便对数。
- 产品化集成到ultron平台,基于项目粒度进行统一管理,运维,监控。
以下是日志收集的总体架构示意图:
对于日志收集处理可以分为如下四个层次:
#Q1 日志收集agent层:
#Q2 日志汇聚层:
这是个可选层,分以下两种情况:
- 对于agent不是很多的case, 直接采用filebeat + kafka的方案是很高效的
- 对于agent很多的case,我们提供logstash汇聚层来对收集数据汇总然后发送给kafka, 避免对kafka造成连接过多的问题
#Q3 DataBus层:
我们将日志采集统一到kafka, 这样离线实时需求都可以得到满足, 我们在每个IDC都有kafka集群,这样当某个IDC不可用时,动态修改filebeat配置即可完成重新收集,相当于具备了跨IDC容灾的能力。
#Q4 异构传输层:
这一层主要是对收集日志的处理使用, 我们当前通过自研的hamal2落地hdfs/hive为离线etl提供支持, 通过flink/spark/storm/druid/clickhouse等实时消费处理。hamal2是我们基于flink实现的一个异构数据同步传输框架,用于Kafka数据实时入仓入湖。
下面主要介绍两种方案:
- filebeat —> kafka 直连, 这是目前主要在用的方案。
- filebeat —> logstash —> kafka 这个方案加入了logstash汇聚层。
filebeat —> kafka
对于agent不是很多的场景, 直接使用filebeat kafka output写入kafka是高效简洁的方式, 根据我们上面阐述的filebeat收集meta持久化机制和反压机制,在kafka有问题写入不成功的情况下,会触发filebeat反压, 日志收集文件的offset也将停止持久化,这样是符合我们预期的。目前360商业化没有有很多agent的场景, 所以主要使用这种模式。
下面是filebeat–>kafka的简单架构示意图:
filebeat.yml 配置示例:
filebeat.inputs:
- type: log
enabled: true
paths:
- /root/test*.log
fields:
topic_name: test
- type: log
enabled: true
paths:
- /root/test2*.log
fields:
topic_name: test2
output.kafka:
version: 2.0.0
hosts: ["xxx:9092", "xxx:9092", "xxx:9092"]
# message topic selection + partitioning
topic: '%{[fields.topic_name]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: lz4
max_message_bytes: 10000000
sasl.mechanism: SCRAM-SHA-256
username: xxx
password: xxx
worker: 1 # producer实例数
refresh_frequency: 5m
codec.format:
string: '%{[message]}' # 定义输出的日志,默认是带有很多meta信息的json string
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
# 定义http endpoint, 用于获取metrics
http.enabled: true
http.host: your host
http.port: 5066
还有比较重要的配置是codec.format,可以重新定义输出日志的格式。
filebeat–>logstash–>kafka
- persistent queues(PQ): 默认是不开启的,日志会先写内存queue再output出去,但这种方式在异常情况下会丢数据, 为了保证日志完整性,我们必须开启PQ,开启后所有日志将先持久化到disk然后再output出去, 这样可以做到at least once语义,可以通过配置queue.type来开启PQ:
queue.type: persisted
queue.max_bytes: 8gb
queue.max_events: 3000000
path.queue: /path/to/queuefile
- 反压机制: 在PQ写满的情况下会反压到上游filebeat, filebeat再反压到input停止日志收集。这种反压传导机制和flink的有点像。
通过logstash的PQ机制以及反压机制就可以保证在极端情况下整个pipeline的at least once语义。
下面是filebeat–>logstash–>kafka的简单架构示意图:
filebeat.yml 示例:
filebeat.inputs:
- type: log
enabled: true
paths:
- /root/test*.log
fields:
topic_name: test
kafka_cluster: cluster1
- type: log
enabled: true
paths:
- /root/test2*.log
fields:
topic_name: test2
kafka_cluster: cluster2
output.logstash:
hosts: ["logstash.k8s.domain:5044"]
# 定义http endpoint, 用于获取metrics
http.enabled: true
http.host: your host
http.port: 5066
output.logstash中定义的logstash.k8s.domain:5044 其实是一个lvs域名端口,其后对应了多个logstash实例。这里我们没有使用filebeat的loadbalance设置,因为不是很灵活。
logstash.yml示例
queue.type: persisted
queue.max_bytes: 8gb
queue.max_events: 3000000
path.queue: /path/to/queuefile
logstash-pipeline.conf 示例
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
output {
stdout { codec => rubydebug {metadata => true}}
if [fields][kafka_cluster] == "xxx" { # 日志分拣
kafka {
codec => plain {
format => '{ message:"%{message}"}'
}
bootstrap_servers => "xxx:9092,xxx:9092,xxx:9092"
topic_id => "%{[fields][topic_name]}"
compression_type => "lz4"
}
}
if [fields][kafka_cluster] == "xxx" { # 日志分拣
kafka {
codec => plain {
format => '{ message:"%{message}"}'
}
bootstrap_servers => "xxx:9092,xxx:9092,xxx:9092"
topic_id => "%{[fields][topic_name]}"
jaas_path => "/root/logstash/kafka-jaas.conf" # 支持SASL SCRAM
sasl_mechanism => "SCRAM-SHA-256"
security_protocol => "SASL_PLAINTEXT"
compression_type => "lz4"
}
}
}
监控
以下是filebeat监控截图, 主要就是将http endpoint中的metrics都展示出来:
总结
参考资料
https://www.elastic.co/guide/en/beats/filebeat/current/index.html
https://www.elastic.co/guide/en/logstash/current/index.html
https://cloud.tencent.com/developer/article/1634020