The advantages of Kafka should be familiar to most people: it is a messaging system that can handle heavy traffic, and as a broker it is more dependable than Redis, since its data can be persisted to disk.
All of our logs are now collected into Elasticsearch through Logstash, and we can browse all kinds of log reports in Kibana. The problem is how to raise alerts for specific anomalies, for example when I want to filter on some ad-hoc condition: a certain ERROR shows up, the crawler for xx.com starts timing out in large numbers, or a page has been redesigned and we can no longer extract its details.
This article is not written very rigorously, so feel free to criticize. It will also be updated later; please check the original post for the latest version.
Installing the Kafka server side is a bit of a hassle, and it needs ZooKeeper for coordination. I won't walk through the Kafka installation here; just find a document and work through it. Our services all run on Docker hosts, so we simply pulled a Dockerized Kafka and ZooKeeper as well.
docker pull spotify/kafka
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip` --env ADVERTISED_PORT=9092 spotify/kafka
export KAFKA=`boot2docker ip`:9092
kafka-console-producer.sh --broker-list $KAFKA --topic test
export ZOOKEEPER=`boot2docker ip`:2181
kafka-console-consumer.sh --zookeeper $ZOOKEEPER --topic test
Below is the Logstash side of the configuration. We used to have the agents output directly into Elasticsearch; as the volume grew, we put Redis in between as a broker (message queue). Now that we want Kafka to do the publishing, all we need to do is add Kafka to the pipeline.
Some of you may be wondering: why not just parse the logs and monitor them directly? Or go straight to Elasticsearch? The problem is that you never know when an anomaly matching some condition will show up, and you surely don't want to query ES in real time for it. Keep in mind that Logstash already takes good care of log splitting, tailing, and offset tracking for you.
input {
    kafka {
        zk_connect => ...                  # string (optional), default: "localhost:2181"
        group_id => ...                    # string (optional), default: "logstash"
        topic_id => ...                    # string (optional), default: "test"
        reset_beginning => ...             # boolean (optional), default: false
        consumer_threads => ...            # number (optional), default: 1
        queue_size => ...                  # number (optional), default: 20
        rebalance_max_retries => ...       # number (optional), default: 4
        rebalance_backoff_ms => ...        # number (optional), default: 2000
        consumer_timeout_ms => ...         # number (optional), default: -1
        consumer_restart_on_error => ...   # boolean (optional), default: true
        consumer_restart_sleep_ms => ...   # number (optional), default: 0
        decorate_events => ...             # boolean (optional), default: false
        consumer_id => ...                 # string (optional), default: nil
        fetch_message_max_bytes => ...     # number (optional), default: 1048576
    }
}
And this is the output side:
output {
    kafka {
        broker_list => ...                          # string (optional), default: "localhost:9092"
        topic_id => ...                             # string (optional), default: "test"
        compression_codec => ...                    # string (optional), one of ["none", "gzip", "snappy"], default: "none"
        compressed_topics => ...                    # string (optional), default: ""
        request_required_acks => ...                # number (optional), one of [-1, 0, 1], default: 0
        serializer_class => ...                     # string (optional), default: "kafka.serializer.StringEncoder"
        partitioner_class => ...                    # string (optional), default: "kafka.producer.DefaultPartitioner"
        request_timeout_ms => ...                   # number (optional), default: 10000
        producer_type => ...                        # string (optional), one of ["sync", "async"], default: "sync"
        key_serializer_class => ...                 # string (optional), default: "kafka.serializer.StringEncoder"
        message_send_max_retries => ...             # number (optional), default: 3
        retry_backoff_ms => ...                     # number (optional), default: 100
        topic_metadata_refresh_interval_ms => ...   # number (optional), default: 600 * 1000
        queue_buffering_max_ms => ...               # number (optional), default: 5000
        queue_buffering_max_messages => ...         # number (optional), default: 10000
        queue_enqueue_timeout_ms => ...             # number (optional), default: -1
        batch_num_messages => ...                   # number (optional), default: 200
        send_buffer_bytes => ...                    # number (optional), default: 100 * 1024
        client_id => ...                            # string (optional), default: ""
    }
}
The above covers the Kafka input and output configuration inside Logstash. Now let's look at how a Python program that also subscribes to the Kafka broker runs. I won't paste the production code here; you can easily extend it from the demo below.
#!/usr/bin/env python
# blog: xiaorui.cc
import threading, logging, time

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer


# The producer: feeds test data into Kafka.
class Producer(threading.Thread):
    daemon = True

    def run(self):
        client = KafkaClient("localhost:9092")
        producer = SimpleProducer(client)

        while True:
            producer.send_messages('my-topic', "test")
            producer.send_messages('my-topic', "\xc2Hola, mundo!")
            time.sleep(1)


# The consumer: this is where we pull data back out of Kafka.
class Consumer(threading.Thread):
    daemon = True

    def run(self):
        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "my-topic")

        for message in consumer:
            print(message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(5)


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.DEBUG
    )
    main()
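To tie this back to the alerting problem from the beginning (a particular ERROR showing up, a flood of timeouts for one site, and so on), one easy extension of the consumer above is to match every message against a few conditions and fire a notification when one hits. The following is only a rough sketch of that idea; the topic name logstash-topic, the pattern list and the send_alert() stub are illustrative placeholders, not part of our actual setup:

#!/usr/bin/env python
# Sketch only: a condition-matching consumer for alerting, not production code.
import json
import logging

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Hypothetical conditions we want to be paged about.
ALERT_PATTERNS = ["ERROR", "Timeout", "xx.com"]


def send_alert(line):
    # Placeholder: hook up mail / SMS / IM here.
    logging.warning("ALERT: %s", line)


def watch(broker="localhost:9092", topic="logstash-topic", group="alert-group"):
    client = KafkaClient(broker)
    consumer = SimpleConsumer(client, group, topic)

    for message in consumer:
        raw = message.message.value
        try:
            # Logstash's kafka output usually ships events as JSON;
            # fall back to the raw line if it is not.
            event = json.loads(raw)
            line = event.get("message", raw) if isinstance(event, dict) else raw
        except ValueError:
            line = raw
        if any(pattern in line for pattern in ALERT_PATTERNS):
            send_alert(line)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    watch()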
The Python Kafka client also supports multiple consumers; just pay attention to the number of partitions on the topic.
from kafka import KafkaClient, MultiProcessConsumer

kafka = KafkaClient("localhost:9092")

# This splits the topic's partitions between two worker processes.
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

# This spawns processes such that each handles at most 2 partitions.
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", partitions_per_proc=2)

for message in consumer:
    print(message)

for message in consumer.get_messages(count=5, block=True, timeout=4):
    print(message)
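As an aside, newer releases of kafka-python deprecate SimpleConsumer and MultiProcessConsumer in favor of the KafkaConsumer class plus consumer groups, where the broker balances partitions across however many processes you start. The rule about partition count still applies: a group can use at most as many active consumers as the topic has partitions. A minimal sketch, assuming a recent kafka-python and the same illustrative my-topic / my-group names:

from kafka import KafkaConsumer

# Run one of these per worker process; Kafka assigns the topic's partitions
# across every member of "my-group".
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers='localhost:9092')

# The partition count caps useful parallelism: consumers beyond it stay idle.
print(consumer.partitions_for_topic('my-topic'))

for message in consumer:
    print(message.value)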
I originally meant to get into the monitoring and alerting part as well, but I'll stop here; the article is a bit scattered. The main takeaway: I recommend replacing Redis with Kafka as the broker. If the backend storage runs into trouble, with Redis you have to hold all that data in memory, whereas Kafka is clearly the better fit for absorbing the backlog.
What does this have to do with alerting, though?
+1. I've built a similar pipeline as well, but with kafka-spark-hdfs, writing the reports into mongodb.
The approach looks quite good, though it does make the architecture more complex.