logstash联合python kafka进行异常数据监控

kafka的优点,我想大家应该知道…. 一个可以支撑大流量的消息系统, 要比redis靠谱点,毕竟人家数据可以落地硬盘…


现在所有的日志都通过logstash收集到了elasticsearch里面。 我们可以通过kibana来进行查看各种日志报表,但是问题是我们如何针对某些异常进行告警…   比如我临时要过滤些条件,比如出现某种ERROR,又或者爬虫服务的xx.com出现了大量的超时,某些页面因为改版,导致我们抓不到他的具体信息了。


文章写的不是很严谨,欢迎来喷,另外该文后续有更新的,请到原文地址查看更新. 

http://xiaorui.cc/2015/04/05/logstash%E8%81%94%E5%90%88python-kafka%E8%BF%9B%E8%A1%8C%E5%BC%82%E5%B8%B8%E6%95%B0%E6%8D%AE%E7%9B%91%E6%8E%A7%E6%8A%A5%E8%AD%A6/


对于kafka服务端的安装是有些麻烦的,另外它是需要zookeeper协同调度的。   我这边就不说kafka安装流程了,大家自己找个文档搞搞就行了。 我们的模块都是在docker服务器上,就顺手也pull了一个docker化的kafka和zookeeper。 

docker pull spotify/kafka
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip` --env ADVERTISED_PORT=9092 spotify/kafka
export KAFKA=`boot2docker ip`:9092
kafka-console-producer.sh --broker-list KAFKA --topic test
export ZOOKEEPER=`boot2docker ip`:2181
kafka-console-consumer.sh --zookeeperZOOKEEPER --topic test

下面的是logstash相关的配置,我们以前是直接在agent端把output扔到es里面,随着量级的增大,就改放到了redis里做broker消息队列。  现在要利用kafka做发布,那么我们只需要加入kafka就可以了。 


这里有人可能疑惑了,为毛不直接解析进行监控?  直接Elasticsearch? 你怎么知道什么时候会出现某个条件的异常?  难道要实时的去query es?   大家要知道logstash很好的帮助大家做好了日志的切分,tail,offset机制。

# input {
    kafka {
        zk_connect => ... # string (optional), default: "localhost:2181"
        group_id => ... # string (optional), default: "logstash"
        topic_id => ... # string (optional), default: "test"
        reset_beginning => ... # boolean (optional), default: false
        consumer_threads => ... # number (optional), default: 1
        queue_size => ... # number (optional), default: 20
        rebalance_max_retries => ... # number (optional), default: 4
        rebalance_backoff_ms => ... # number (optional), default:  2000
        consumer_timeout_ms => ... # number (optional), default: -1
        consumer_restart_on_error => ... # boolean (optional), default: true
        consumer_restart_sleep_ms => ... # number (optional), default: 0
        decorate_events => ... # boolean (optional), default: false
        consumer_id => ... # string (optional) default: nil
        fetch_message_max_bytes => ... # number (optional) default: 1048576
    }
}

这是output输出

# output {
    kafka {
        broker_list => ... # string (optional), default: "localhost:9092"
        topic_id => ... # string (optional), default: "test"
        compression_codec => ... # string (optional), one of ["none", "gzip", "snappy"], default: "none"
        compressed_topics => ... # string (optional), default: ""
        request_required_acks => ... # number (optional), one of [-1, 0, 1], default: 0
        serializer_class => ... # string, (optional) default: "kafka.serializer.StringEncoder"
        partitioner_class => ... # string (optional) default: "kafka.producer.DefaultPartitioner"
        request_timeout_ms => ... # number (optional) default: 10000
        producer_type => ... # string (optional), one of ["sync", "async"] default => 'sync'
        key_serializer_class => ... # string (optional) default: "kafka.serializer.StringEncoder"
        message_send_max_retries => ... # number (optional) default: 3
        retry_backoff_ms => ... # number (optional) default: 100
        topic_metadata_refresh_interval_ms => ... # number (optional) default: 600 * 1000
        queue_buffering_max_ms => ... # number (optional) default: 5000
        queue_buffering_max_messages => ... # number (optional) default: 10000
        queue_enqueue_timeout_ms => ... # number (optional) default: -1
        batch_num_messages => ... # number (optional) default: 200
        send_buffer_bytes => ... # number (optional) default: 100 * 1024
        client_id => ... # string (optional) default: ""
    }
}

上线是关于logstash里面关于kafka的input output的配置,我们再来看看同时订阅kafka broker的python程序是如何跑的。   具体代码我就不贴了,你可以通过一个demo很容易进行扩展. 

#!/usr/bin/env python
#blog: xiaorui.cc
import threading, logging, time

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer

#这个是喂数据的
class Producer(threading.Thread):
    daemon = True

    def run(self):
        client = KafkaClient("localhost:9092")
        producer = SimpleProducer(client)

        while True:
            producer.send_messages('my-topic', "test")
            producer.send_messages('my-topic', "\xc2Hola, mundo!")

            time.sleep(1)

#这个是消费者,也就是咱们从kafka取数据的逻辑。 
class Consumer(threading.Thread):
    daemon = True

    def run(self):
        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "my-topic")

        for message in consumer:
            print(message)

def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(5)

if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.DEBUG
        )
    main()

另外python kafka里面是支持多个consumer的,  要注意kafka的分区数量.

from kafka import KafkaClient, MultiProcessConsumer

kafka = KafkaClient("localhost:9092")


consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                partitions_per_proc=2)

for message in consumer:
    print(message)

for message in consumer.get_messages(count=5, block=True, timeout=4):
    print(message)

本来是想写监控跟报警的…. 就先这样了,文章写得有些乱.   主题是,推荐大家使用kafka替换redis,如果出现后端的存储出现问题,那么就需要使用redis内存抗住数据…. kafka明显更合适.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

3 Responses

  1. 多宝鱼 2016年3月24日 / 下午5:37

    这跟报警有毛关系

  2. Addbook 2015年6月5日 / 上午11:56

    顶一个….我也做了了一套,不过使用kafka-spark-hdfs,报表写到mongodb中

  3. 运维 2015年4月7日 / 上午8:52

    感觉思路相当的不错,感觉在架构上又复杂了

发表评论

邮箱地址不会被公开。 必填项已用*标注