How to develop a beat

Author: Lu Liang  Date: 2018-12-18

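ELK (Elasticsearch, Logstash, Kibana) has become close to the standard solution for collecting and displaying logs in the cloud. This post looks at how beats, the log-collection component of ELK, work, and at how to implement a custom beat.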

Where beats sit in ELK

As the figure shows, beats collect various kinds of data in ELK and feed it into the other components.

How a beat works and its interface

A beat is an executable written in Go. It consists mainly of two parts:

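  • The data collection component gathers the actual data.
  • The publisher ships the collected data to Elasticsearch as events. An event usually wraps the collected data in a JSON-like object (a map[string]interface{} in Go). The publisher is already implemented by libbeat, which also provides other common functionality such as configuration management and logging.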

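Every custom beat must implement the Beater interface, which is defined in libbeat. As shown below, once the beat starts, Run() keeps running until an exit signal is received, at which point Stop() is called to shut it down.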

type Beater interface {
	// The main event loop. This method should block until signalled to stop by an
	// invocation of the Stop() method.
	Run(b *Beat) error

	// Stop is invoked to signal that the Run method should finish its execution.
	// It will be invoked at most once.
	Stop()
}

The following code is the concrete implementation of custombeat used in this example.

package beater

// Import the relevant libbeat packages so that configuration, logging, etc.
// are handled for us.
import (
	"fmt"
	"time"

	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"

	"github.com/luliang/custombeat/config"
)

// Custombeat is the struct for the custom beat.
// It holds the imported config.Config and a beat.Client,
// and uses the done channel to receive the exit signal.
type Custombeat struct {
	done   chan struct{}
	config config.Config
	client beat.Client
}

// New creates an instance of custombeat.
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
	c := config.DefaultConfig
	if err := cfg.Unpack(&c); err != nil {
		return nil, fmt.Errorf("Error reading config file: %v", err)
	}

	bt := &Custombeat{
		done:   make(chan struct{}),
		config: c,
	}
	return bt, nil
}

// Run starts custombeat.
func (bt *Custombeat) Run(b *beat.Beat) error {
	logp.Info("custombeat is running! Hit CTRL-C to stop it.")

	var err error
	// Connect a beat.Client, which is used to publish events.
	bt.client, err = b.Publisher.Connect()
	if err != nil {
		return err
	}

	// Read the publishing interval from config.Config.
	ticker := time.NewTicker(bt.config.Period)
	counter := 1
	for {
		// Wait for either the exit signal or the next tick.
		select {
		case <-bt.done:
			return nil
		case <-ticker.C:
		}

		// Generate the event data.
		event := beat.Event{
			Timestamp: time.Now(),
			Fields: common.MapStr{
				"type":    b.Info.Name,
				"counter": counter,
			},
		}

		// Publish the event data.
		bt.client.Publish(event)
		logp.Info("Event sent")
		counter++
	}
}

// Stop stops custombeat.
func (bt *Custombeat) Stop() {
	bt.client.Close()
	close(bt.done)
}

Developing a custom beat

  1. Install Python 2.7 and Go, and configure GOPATH

    ➜  github.com python --version
    Python 2.7.11

    ➜ github.com go version
    go version go1.11 darwin/amd64

    ➜ github.com echo $GOPATH
    /Users/luliang/git/gobeat
  2. Clone beats project

    git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats
  3. Generate custom beat with generator generate.py

    python $GOPATH/src/github.com/elastic/beats/script/generate.py

    ➜ github.com python $GOPATH/src/github.com/elastic/beats/script/generate.py
    Beat Name [Examplebeat]: CustomBeat
    Your Github Name [your-github-name]: luliang
    Beat Path [github.com/luliang/custombeat]:
    Firstname Lastname: Liang Lu

    ➜ github.com ls
    elastic luliang

    ➜ github.com cd luliang
    ➜ luliang ls
    custombeat
  4. Run "make setup" and "make" to build custombeat

    ➜  luliang cd custombeat
    ➜ custombeat make setup

    ➜ beater pwd
    /Users/luliang/git/gobeat/src/github.com/luliang/custombeat/beater
    ➜ beater ls
    custombeat.go

    ➜ custombeat make

As the figure shows, custombeat.go implements the Beater interface; this is where we define and implement our own logic.

  5. Run and test custombeat

    $GOPATH/src/github.com/luliang/custombeat/custombeat -e -d "*"
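As an illustration of the kind of custom logic that could go into custombeat.go, the sketch below replaces the bare counter with a few values read from the running process. collectFields is a hypothetical function and the field names are invented for this example. In the real beat the returned map would be converted to common.MapStr and assigned to the Fields of the beat.Event.

```go
package main

import (
	"fmt"
	"os"
	"runtime"
)

// collectFields is a hypothetical data-collection step: it gathers a few
// facts about the current process and returns them as event fields.
func collectFields(counter int) map[string]interface{} {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown" // fall back rather than dropping the event
	}

	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	return map[string]interface{}{
		"counter":    counter,
		"host":       host,
		"goroutines": runtime.NumGoroutine(),
		"heap_alloc": m.HeapAlloc, // bytes of allocated heap objects
	}
}

func main() {
	for k, v := range collectFields(1) {
		fmt.Printf("%s=%v\n", k, v)
	}
}
```

Keeping collection in its own function, separate from the publish loop in Run, makes the custom part easy to test without a running Elasticsearch.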

Common beats are listed below:

Data type            Beat
Audit data           Auditbeat
Log files            Filebeat
Cloud data           Functionbeat
Availability         Heartbeat
Systemd journals     Journalbeat
Metrics              Metricbeat
Network traffic      Packetbeat
Windows event logs   Winlogbeat