Yee's 部落格

使用logstash, elasticsearch, kibana做日志分析系统

一般情况下,一个站点会由多台server构成, 每台server每天会产生多种不同的日志,为了方面分析线上日志,常用做法是把线上日志集中存储到一个地方,然后按照日期,日志类型,日志内关键字等条件查询。而logstash, elasticsearch, 和kibana就是用来做这件事的开源解决方案。

Logstash 运行于Jruby上的日志收集和分析系统,支持多种后端存储,包括elasticsearch, s3, riak, mongodb, redis等
Elasticsearch Java写的搜索引擎,简单好用,很适合日志的存储和查询, 简称ES
Kibana Elasticsearch的查询前端,最早用PHP写的,后来用Ruby重新实现了一遍,最新版本是纯Javascript的,直接在浏览器端运行。

架构上,在服务器不多的情况下,可以使用Logstash分析日志后,直接存储到ES里面,然后调用Kibana做日志查询。

如果服务器很多,或者是需要跨网络,那么可以使用前向代理,做成树状结构,树叶是Logstash,分析完成后传给树枝logstash-forwarder, 然后传给树干Logstash, 最终输出汇总给ES

树枝的角色一般叫做shipper,有很多方案可以代替logstash-forwarder, 比如redis, zeromq, rabbitmq等

Logstash, elasticsearch, kibana的安装都很简单,这里直接略过,下面说下logstash是如何完成日志收集工作的。

Logstash日志收集可以分成3大块,input, filter和output, 可以通过启动时指定配置文件来配置。

1
./bin/logstash -f config_file

Input是配置日志源的,这里使用文件做例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
input {
  file {
    #日志存储格式 /web/mywebsite/logs/201407/login_errr_10.log
    path => "/web/mywebsite/logs/*/*"  
    type => "web_log"
  }

  file {
    #日志存储格式 /web/mywebsite/logs/apache_access.log
    path => "/web/mywebsite/logs/*"
    type => "access_log"
  }
}

可以看到指定输入的时候可以使用通配符监视多个文件。

Filter用来对日志做过滤,比较常用的filter有

grok 通过正则表达式把每一行的日志做匹配,把非结构化的信息变成结构化的
date 可以用指定字段的值作为当前log的时间戳,而不是使用导入log时的系统时间。
multiline 可以把多行日志合并成一个logstash事件

解释起来比较拗口,看实际的配置例子更容易理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
filter {
  #可以使用条件语句的, 中括号里面的[type]代表引用type这个字段的值
  if [type] == "access_log" {
    grok {
      #COMBINEDAPACHELOG, 预定义的一组正则
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }

    #这里把[timestamp]这个字段的值按照"dd/MMM/yyyy:HH:mm:ss Z"的格式解析成日期,作为本条log的产生日期。
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else {

    #匹配这样的消息:  2014-07-14T22:10:51-07:00 INFO (6): log body
    grok {
      match => {"message"  => "%{TIMESTAMP_ISO8601:timestamp} %{DATA:data}"}
    }

    #这里把[timestamp]这个字段的值按照iso8601的格式解析
    date {
      match => [ "timestamp" , "ISO8601" ]
    }

    #增加[site], [log]字段,比如/web/mywebsite/logs/login_error_10.log
    #匹配到的[site] == 'mywebsite', [log] = 'login_error_'
    ruby {
        code => "
           site = 'null'
           log = 'null'
           m = /(.*)\/(.*)\/logs(.*)\/([a-zA-Z\-_]+)(.*)\.(.*)/.match(event['path'])
           site = m[2]; log=m[4] if m
           event['site'] = site
           event['log'] = log
        "
    }
  }
}

上面的COMBINEDAPACHELOG是一组预定义的正则,https://github.com/logstash/logstash/tree/v1.4.2/patterns这里能看到更多的。

使用上面配置解析出的log如下

1
2
3
4
5
6
7
8
9
10
11
{
  "message" => "2014-07-14T22:12:26-07:00 INFO (6): Log detail",
  "@version" => "1",
  "@timestamp" => "2014-07-15T05:12:26.000Z",
  "type" => "web_log",
  "host" => "myserver.local",
  "path" => "/web/mywebsite/logs/201407/login_error_14.txt",
  "timestamp" => "2014-07-14T22:12:26-07:00"
  "site" => "mywebsite",
  "log" => "login_error_"  
}

##在使用Filter转换好格式后,就可以配置output输出了

1
2
3
4
output {
  stdout { codec => rubydebug }
  elasticsearch_http { host => localhost port => 9200}
}

可以看到,我配置了2个输出,一个使用rubydebug的格式打印到console上,一个使用elasticsearch_http输出到ES。

以上就是logstash的配置过程,最重点的部分在filter上。数据到了ES之后,使用kibana查询可以参考这里的介绍


补充: 如果不想处理多行消息,可以使用\v\r,代替\r\n,可以在写入log前统一处理, php代码如下:

1
2
$message = str_replace(array("\r\n","\n","\r"), "\v", $message);
$message = str_replace("\v", "\v\r", $message);