filebeat改造支持rocketmq(filebeat monitor)
ztj100 2025-07-23 19:26 2 浏览 0 评论
继续分享下以前在gitchat上发布的文章:filebeat改造支持rocketmq
1.概述
1.1问题概述
现在越来越多的日志采集使用 FileBeat,FileBeat 是个轻量型日志采集器,采用 Go 语言实现,性能稳健,占用资源少。FileBeat 现在支持采集的日志内容发送到 Redis、Elasticsearch、Kafka、Logstash。
那么我们如果想通过 FileBeat 采集日志到 RocketMQ 怎么办呢?不好意思,官方现在并不支持, 搜索下,也没有现成的实现。
让我给大家介绍下如何用 FileBeat 源码实现自己的 output.rocketmq。
在本场 Chat 中,会讲到如下内容:
- 如何基于 Beat 源码,实现自己的 output
- 实现输出到 RocketMQ的 output
适合人群: 对 ELK、FileBeat、RocketMQ 日志采集感感兴趣的技术人员
1.2 名词解析
名词 | 解释 | 备注 |
ELK | ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。 | |
Rocketmq | 高性能消息队列,采用java实现 | |
Beats | Beats是ELK Stack技术栈中负责单一用途数据采集并推送给Logstash或Elasticsearch的轻量级产品。 | |
Filebeat | Beats中轻量型日志采集器 |
2. 设计
2.1 filebeat介绍
Filebeat 简化了常见格式的日志的收集,支持采集的日志内容发送到 Redis、Elasticsearch、Kafka、Logstash中。
2.2 filebeat+rocketmq实际需求
实际项目有这样的需求,想利用现有的定制日志,采集内容到rocketmq,由rocketmq慢慢消费存到业务表中。
这样的好处,能利用现有的rocketmq集群,业务系统的请求性能提高,日志异步filebeat到rocketmq处理。
3.实现
让我们开始实现filebeat和rocketmq的集成。
3.1 准备工作
当前代码实现是基于beats源码 7.10.0实现的。
大家可以点击就行下载beats7.10.0
别的版本未经验证,有兴趣的读者需要自行测试验证。
3.2 output实现
a. 按beats规范,创建代码目录
## 解压下载的源码
cd beats-7.10.0/libbeat/outputs/
mkdir rocketmq ##目录名还是直接用rocketmq命名了
b. 按beats代码规范在rocketmq目录下增加config.go
package rocketmq
import "github.com/elastic/beats/v7/libbeat/outputs/codec"
type Config struct {
Codec codec.Config `config:"codec"`
Host string `config:"host"`
Topic string `config:"topic"`
}
var defaultConfig = Config{
Host: "", //rocketmq的host和port,比如192.168.10.100:9876
Topic: "", //rocketmq的topic
}
host和topic的定义是为了获取配置文件里配置的rocketmq地址和topic
c. 代码实现
package rocketmq
import (
"context"
"fmt"
"os"
"runtime"
"strings"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/publisher"
)
type rocketmqOutput struct {
log *logp.Logger
out *os.File
observer outputs.Observer
index string
codec codec.Codec
host string
topic string
mq *RocketMq
}
func init() {//初始化,把我们定义的rocketmq struct注册进来
outputs.RegisterType("rocketmq", makerocketmq)
}
func makerocketmq(
_ outputs.IndexManager,
beat beat.Info,
observer outputs.Observer,
cfg *common.Config,
) (outputs.Group, error) {
config := defaultConfig
err := cfg.Unpack(&config)
if err != nil {
return outputs.Fail(err)
}
index := beat.Beat
enc, err := codec.CreateEncoder(beat, config.Codec)
//创建rocketmq struct, 传入配置文件的rocketmq host和topic进行保存。
out := &rocketmqOutput{log: logp.NewLogger("rocketmq"), out: os.Stdout, observer: observer, index: index, codec: enc, host: config.Host, topic: config.Topic}
arr:=strings.Split(config.Host,",")//针对rocketmq可能集群配置xxx:9876,xxxx:9876
//rocketmq的生产者开始注册,其中group写死了= logByFilebeat,重新
out.mq = RegisterRocketProducerMust(arr, "logByFilebeat", 1)
// check stdout actually being available
if runtime.GOOS != "windows" {
if _, err = out.out.Stat(); err != nil {
err = fmt.Errorf("rocketmq output initialization failed with: %v", err)
return outputs.Fail(err)
}
}
//没有大小限制=-1,不尝试重试=0
return outputs.Success(-1, 0, out)
}
//关闭触发函数
func (c *rocketmqOutput) Close() error {
if c.mq != nil {
c.mq.Shutdown()
}
return nil
}
//有新的日志信息产生,会触发该函数
func (c *rocketmqOutput) Publish(_ context.Context, batch publisher.Batch) error {
st := c.observer
events := batch.Events()
st.NewBatch(len(events))
dropped := 0
for i := range events {
ok := c.publishEvent(&events[i])
if !ok {
dropped++
}
}
batch.ACK()
st.Dropped(dropped)
st.Acked(len(events) - dropped)
return nil
}
func (c *rocketmqOutput) publishEvent(event *publisher.Event) bool {
serializedEvent, err := c.codec.Encode(c.index, &event.Content)
if err != nil {
if !event.Guaranteed() {
return false
}
c.log.Errorf("Unable to encode event: %+v", err)
c.log.Debugf("Failed event: %v", event)
return false
}
c.observer.WriteBytes(len(serializedEvent) + 1)
//判断生产者是否为空,如果为空重新初始化注册到rocketmq中
if c.mq.isShutdown() == true {
arr:=strings.Split(c.host,",")
c.mq = RegisterRocketProducerMust(arr, "logByFilebeat", 1)
}
//新增日志内容
str := string(serializedEvent)
//发送内容到rocketmq
msg, err := c.mq.SendMsg(c.topic, str)
c.log.Debug("msg:%v", msg)
if err != nil {
c.log.Errorf("send to rocketmq is error %+v", err)
return false
}
return true
}
//接口规范
func (c *rocketmqOutput) String() string {
return "rocketmq"
}
- 具体信息都在代码里增加了注释。
- 代码发送rocketmq底层采用了 rocketmq-client,并采用了orange框架里的rocketmq封装,该框架包转的很好用,但是有几点和实际需求不满足的地方。
- a. 当前不需要mq的消费者实现。
- b. 需要判断生产者是否为空,如果为空需要重新注册生产者
- 对此从新改造了下,改造的代码存放rocketmq目录下的queue.go中,具体见代码:
package rocketmq
import (
"context"
"errors"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
const (
_ = iota
SendMsg
)
type MqProducer interface {
SendMsg(topic string, body string) (mqMsg MqMsg, err error)
SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
isShutdown() (b bool, err error)
Shutdown() (err error)
}
type MqMsg struct {
RunType int `json:"run_type"`
Topic string `json:"topic"`
MsgId string `json:"msg_id"`
Offset int64 `json:"offset"`
Partition int32 `json:"partition"`
Timestamp time.Time `json:"timestamp"`
Body []byte `json:"body"`
}
type RocketMq struct {
endPoints []string
producerIns rocketmq.Producer
consumerIns rocketmq.PushConsumer
}
// RegisterRocketProducerMust 注册并启动生产者接口实现
func RegisterRocketProducerMust(endPoints []string, groupName string, retry int) (client *RocketMq) {
var err error
client, err = RegisterRocketMqProducer(endPoints, groupName, retry)
if err != nil {
panic(err)
}
return client
}
// 是否生产者为空
func (r *RocketMq) isShutdown() (b bool) {
if r.producerIns == nil {
return true
} else {
return false
}
}
// 关闭生产者
func (r *RocketMq) Shutdown() {
if r.producerIns != nil {
r.producerIns.Shutdown()
}
}
// SendMsg 按字符串类型生产数据
func (r *RocketMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error) {
return r.SendByteMsg(topic, []byte(body))
}
// SendByteMsg 生产数据
func (r *RocketMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error) {
if r.producerIns == nil {
return mqMsg, errors.New("RocketMq producer not register")
}
result, err := r.producerIns.SendSync(context.Background(), &primitive.Message{
Topic: topic,
Body: body,
})
if err != nil {
return
}
if result.Status != primitive.SendOK {
return mqMsg, errors.New(fmt.Sprintf("RocketMq producer send msg error status:%v", result.Status))
}
mqMsg = MqMsg{
RunType: SendMsg,
Topic: topic,
MsgId: result.MsgID,
Body: body,
}
return mqMsg, nil
}
// RegisterRocketMqProducer 注册rocketmq生产者
func RegisterRocketMqProducer(endPoints []string, groupName string, retry int) (mqIns *RocketMq, err error) {
addr, err := primitive.NewNamesrvAddr(endPoints...)
if err != nil {
return nil, err
}
mqIns = &RocketMq{
endPoints: endPoints,
}
if retry <= 0 {
retry = 0
}
mqIns.producerIns, err = rocketmq.NewProducer(
producer.WithNameServer(addr),
producer.WithRetry(retry),
producer.WithGroupName(groupName),
)
if err != nil {
return nil, err
}
err = mqIns.producerIns.Start()
if err != nil {
return nil, err
}
return mqIns, nil
}
3.3 注册output
开发好的output,需要在includes.go里注册才能使用。
- 编辑文件 beats-7.10.0/libbeat/publisher/includes/includes.go
- 在代码 github.com/elastic/beats/v7/libbeat/outputs/redis 下增加 github.com/elastic/beats/v7/libbeat/outputs/rocketmq
4.使用
4.1 命令行编译
- beats编译
- 由于https://proxy.golang.org代理非常容易超时。
- make编译之前输入命令
- go env -w GOPROXY=https://goproxy.cn
- 编译
- cd beats-7.10.0 make
- 此时会出现“Installing mage v1.10.0.” 这里需要花费一点时间.
- filebeat编译
- cd beates-7.10.0/filebeat make
- 正常编译通过的话,会在filebeat目录下生成filebeat。
- 不实用动态库
- 以上编译后默认采用动态库,比如glibc,实际linux服务器上会可能出现glibc不一致,导致无法运行。接近方案采用静态库。
go build编译时,CGO_ENABLED=1的,自动添加了一些动态库链接,所以编译时吧CGO_ENABLED=0就OK了;
CGO_ENABLED=0 go build -a -ldflags '-extldflags "-static"' .
make
4.2 配置
开始自己的output配置
output.rocketmq:
host: ip:9876
topic: topic名称
如果是rocketmq集群,可以通过逗号区分
output.rocketmq:
host: ip1:9876,ip2:9876
topic: topic名称
4.3 启动
./filebeat -c 配置文件 -e
4.4 快速测试
4.4.1 写个java的消费者测试类
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("default");
//设置NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置实例名称
consumer.setInstanceName("consumer");
//订阅topic
consumer.subscribe("tbs_log_mq_topic_dev","*");//topic和配置文件对应
//监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//获取消息
for (MessageExt messageExt:list){
//RocketMQ由于是集群环境,所有产生的消息ID可能会重复
System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody()));
}
//接受消息状态 1.消费成功 2.消费失败 队列还有
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.println("consumer Started!");
}
}
4.4.2 filebeat测试配置
filebeat.inputs:
- type: log
enabled: true
paths:
- /opt/dir/logs/*_mq.log
fields:
log_topic: trans_log
level: debug
multiline.pattern: ^\[
multiline.negate: true
multiline.match: after
filebeat.registry.path: /opt/dir/logs/filebeatrocketmq
output.rocketmq:
host: 127.0.0.1:9876
topic: tbs_log_mq_topic_dev
5.注意事项
前面提到的几个点,这边注意事项再强调下
- 代理下载的问题 :go env -w GOPROXY=https://goproxy.cn。
- 不实用系统的glibc: CGO_ENABLED=0 go build -a -ldflags ‘-extldflags “-static”’ 。
- 测试配置文件参数filebeat.registry.path 对应目录创建,filebeat保存数据的目录。
- filebeat测试配置,设置multiline.pattern: ^[ ,匹配是否多行。
- 支持mq重启后,继续自动发送。
相关推荐
- 10条军规:电商API从数据泄露到高可用的全链路防护
-
电商API接口避坑指南:数据安全、版本兼容与成本控制的10个教训在电商行业数字化转型中,API接口已成为连接平台、商家、用户与第三方服务的核心枢纽。然而,从数据泄露到版本冲突,从成本超支到系统崩溃,A...
- Python 文件处理在实际项目中的困难与应对策略
-
在Python项目开发,文件处理是一项基础且关键的任务。然而,在实际项目中,Python文件处理往往会面临各种各样的困难和挑战,从文件格式兼容性、编码问题,到性能瓶颈、并发访问冲突等。本文将深入...
- The Future of Manufacturing with Custom CNC Parts
-
ThefutureofmanufacturingisincreasinglybeingshapedbytheintegrationofcustomCNC(ComputerNumericalContro...
- Innovative Solutions in Custom CNC Machining
-
Inrecentyears,thelandscapeofcustomCNCmachininghasevolvedrapidly,drivenbyincreasingdemandsforprecisio...
- C#.NET serilog 详解(c# repository)
-
简介Serilog是...
- Custom CNC Machining for Small Batch Production
-
Inmodernmanufacturing,producingsmallbatchesofcustomizedpartshasbecomeanincreasinglycommondemandacros...
- Custom CNC Machining for Customized Solutions
-
Thedemandforcustomizedsolutionsinmanufacturinghasgrownsignificantly,drivenbydiverseindustryneedsandt...
- Revolutionizing Manufacturing with Custom CNC Parts
-
Understandinghowmanufacturingisevolving,especiallythroughtheuseofcustomCNCparts,canseemcomplex.Thisa...
- Breaking Boundaries with Custom CNC Parts
-
BreakingboundarieswithcustomCNCpartsinvolvesexploringhowadvancedmanufacturingtechniquesaretransformi...
- Custom CNC Parts for Aerospace Industry
-
Intherealmofaerospacemanufacturing,precisionandreliabilityareparamount.Thecomponentsthatmakeupaircra...
- Cnc machining for custom parts and components
-
UnderstandingCNCmachiningforcustompartsandcomponentsinvolvesexploringitsprocesses,advantages,andcomm...
- 洞察宇宙(十八):深入理解C语言内存管理
-
分享乐趣,传播快乐,增长见识,留下美好。亲爱的您,这里是LearingYard学苑!今天小编为大家带来“深入理解C语言内存管理”...
- The Art of Crafting Custom CNC Parts
-
UnderstandingtheprocessofcreatingcustomCNCpartscanoftenbeconfusingforbeginnersandevensomeexperienced...
- Tailored Custom CNC Solutions for Automotive
-
Intheautomotiveindustry,precisionandefficiencyarecrucialforproducinghigh-qualityvehiclecomponents.Ta...
- 关于WEB服务器(.NET)一些经验累积(一)
-
以前做过技术支持,把一些遇到的问题累积保存起来,现在发出了。1.问题:未能加载文件或程序集“System.EnterpriseServices.Wrapper.dll”或它的某一个依赖项。拒绝访问。解...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- 10条军规:电商API从数据泄露到高可用的全链路防护
- Python 文件处理在实际项目中的困难与应对策略
- The Future of Manufacturing with Custom CNC Parts
- Innovative Solutions in Custom CNC Machining
- C#.NET serilog 详解(c# repository)
- Custom CNC Machining for Small Batch Production
- Custom CNC Machining for Customized Solutions
- Revolutionizing Manufacturing with Custom CNC Parts
- Breaking Boundaries with Custom CNC Parts
- Custom CNC Parts for Aerospace Industry
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)