目的:实现自定义在开发、联调环境上面的日志报警,主要是通过企业微信通知
[TOC]
总体架构
总体思路借助成熟的日志采集系统ELK+ElasticAlert报警。 首先filebeat采集bserver、cserver服务所对应日志文件,汇总到logstash上面进行统一处理,最好推送到Elasticsearch,由ElasticAlert进行一个规则匹配报警
Filebeat
Filebeat 是一个用于转发和集中日志数据的轻量级传送器。作为代理安装在您的服务器上,Filebeat 监控您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash以进行索引。
Logstash 是一个具有实时流水线功能的开源数据收集引擎。Logstash 可以动态地统一来自不同来源的数据,并将数据规范化到您选择的目的地。为各种高级下游分析和可视化用例清理和普及所有数据
搭建环境
前置要求
前置要求:ELK环境已经搭建完成 + Filebeat 接入
ElastAlert搭建
https://elastalert2.readthedocs.io/en/latest/running_elastalert.html#as-a-docker-container
拉去镜像地址
docker pull jertel/elastalert2
运行elastalert容器
docker run –network host -d –name elastalert –restart=always \
-v /sensorsdata/soft/elastalert/config.yaml:/opt/elastalert/config.yaml \
-v /sensorsdata/soft/elastalert/rules:/opt/elastalert/rules \
jertel/elastalert2 –verbose
配置信息
报警配置根据各自的业务需求配置
# 监控 elasticsearch 的信息
es_host: 10.120.250.235
es_port: 9201
# rule name 必须是独一的
name: scms_log
# any 表示: 符合任何一个规则都会匹配,查询返回的每个命中将生成一个警报
type: any
# 监控 elasticsearch 的 index, 多个逗号分隔, 不能由空格, 可以使用 * 匹配
index: scms_log1-*
#用来拼配告警规则,elasticsearch 的query语句,支持 AND&OR等。
filter:
- query:
query_string:
query: "log-level: ERROR"
# command 表示: 报警被触发以后执行shell命令
alert:
- command
pipe_match_json: true
command: [
"/opt/elastalert/rules/send_weixin_message.py",
"--_id=%(_id)s",
"--level=%(log-level)s",
"--org_id=%(fields.env.organizationId)s",
"--pid=%(pid)s",
"--thread_name=%(thread_name)s",
"--class_name=%(class_name)s",
"--host_name=%(host.name)s",
"--index=%(_index)s",
"--message=%(message)s"
]
# 执行的shell命令, 可以使用 elasticsearch 中查询出的结果做作为参数
# # 这里调用 send_weixin_message.py 脚本, 并向程序传递参数
报警脚本
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import requests
import json
parser = argparse.ArgumentParser(description='企业微信报警')
parser.add_argument('--_id', help='es索引id')
parser.add_argument('--level', help='报警级别')
parser.add_argument('--org_id', help='组织id')
parser.add_argument('--pid', help='线程id')
parser.add_argument('--thread_name', help='线程名称')
parser.add_argument('--class_name', help='java类名')
parser.add_argument('--host_name', help='主机名')
parser.add_argument('--index', help='es索引')
parser.add_argument('--message', help='警告消息')
args = parser.parse_args()
_id=args._id
level = args.level
org_id = args.org_id
pid = args.pid
thread_name = args.thread_name
class_name = args.class_name
host_name = args.host_name
index = args.index
message = args.message
message = message.replace("\\n", "\n")
message = message.replace("\\t", "\t")
data = {
'msgtype': 'markdown',
'markdown': {
'content': """报警企微测试\n
>es索引id:<font color=\"comment\">{}</font>
>报警级别:<font color=\"comment\">{}</font>
>环境组织id:<font color=\"comment\">{}</font>
>线程id:<font color=\"comment\">{}</font>
>线程名称:<font color=\"comment\">{}</font>
>java类名:<font color=\"comment\">{}</font>
>主机名:<font color=\"comment\">{}</font>
>es索引:<font color=\"comment\">{}</font>
>警告消息:<font color=\"comment\">{}</font>
""".format(_id, level, org_id, pid, thread_name, class_name, host_name, index, message[:500])
}
}
data=json.dumps(data)
print(data)
#url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=d4b9985c-603a-4f70-9349-bc72fd6e6ecd'
url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=a5e49c21-66c3-401f-81e3-2e7ac9908518'
headers = {
'Content-Type': 'application/json'
}
#print(message[:500])
res = requests.post(url=url, data=data, timeout=2)
print(res)