Kafka's strong points should already be familiar to most of you: it is a messaging system built to sustain heavy traffic, and as a broker it is more dependable than redis, since its data is persisted to disk.
Right now all of our logs are collected by logstash and shipped into elasticsearch, and we can browse all kinds of log reports through kibana. The question is how to raise alerts on specific anomalies, for example when I want to filter on some ad-hoc condition: a certain ERROR starts showing up, the crawler service sees a flood of timeouts against xx.com, or a page gets redesigned and we can no longer scrape its details.
This post is not terribly rigorous, so feel free to pick it apart. It may also be updated later on; please check the original URL for the latest version.
Installing the Kafka server side is somewhat of a hassle, and it also needs zookeeper for coordination. I won't go through the Kafka installation steps here; grab any document and work through it yourself. Since all of our modules already run on docker hosts, I simply pulled a dockerized kafka and zookeeper as well.
docker pull spotify/kafka
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip` --env ADVERTISED_PORT=9092 spotify/kafka
export KAFKA=`boot2docker ip`:9092
kafka-console-producer.sh --broker-list $KAFKA --topic test
export ZOOKEEPER=`boot2docker ip`:2181
kafka-console-consumer.sh --zookeeper $ZOOKEEPER --topic test
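If you would rather sanity-check the dockerized broker from Python than from the console scripts, here is a minimal smoke test with kafka-python (the same client used further down in this post). The broker address and topic name are assumptions; substitute your own `boot2docker ip` and topic.

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer

client = KafkaClient("192.168.59.103:9092")  # assumed boot2docker address, replace with yours
producer = SimpleProducer(client)
producer.send_messages("test", "hello from python")  # produce one message to the "test" topic

consumer = SimpleConsumer(client, "smoke-group", "test")
for message in consumer.get_messages(count=1, block=True, timeout=5):
    print(message)  # should echo back the message produced above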
Below is the logstash side of the configuration. We used to have the agents write their output straight into es; as the volume grew, we switched to redis as the broker message queue. Now that we want to publish through kafka instead, all we need to do is add a kafka output.
Some of you may wonder: why not just parse the logs and monitor them directly, or go straight to Elasticsearch? The problem is that you never know when an anomaly matching some condition will appear. Are you going to query es in real time? Keep in mind that logstash already does the heavy lifting for you: log parsing, tailing, and offset tracking.
input {
  kafka {
    zk_connect => ... # string (optional), default: "localhost:2181"
    group_id => ... # string (optional), default: "logstash"
    topic_id => ... # string (optional), default: "test"
    reset_beginning => ... # boolean (optional), default: false
    consumer_threads => ... # number (optional), default: 1
    queue_size => ... # number (optional), default: 20
    rebalance_max_retries => ... # number (optional), default: 4
    rebalance_backoff_ms => ... # number (optional), default: 2000
    consumer_timeout_ms => ... # number (optional), default: -1
    consumer_restart_on_error => ... # boolean (optional), default: true
    consumer_restart_sleep_ms => ... # number (optional), default: 0
    decorate_events => ... # boolean (optional), default: false
    consumer_id => ... # string (optional), default: nil
    fetch_message_max_bytes => ... # number (optional), default: 1048576
  }
}
And this is the output configuration:
output {
  kafka {
    broker_list => ... # string (optional), default: "localhost:9092"
    topic_id => ... # string (optional), default: "test"
    compression_codec => ... # string (optional), one of ["none", "gzip", "snappy"], default: "none"
    compressed_topics => ... # string (optional), default: ""
    request_required_acks => ... # number (optional), one of [-1, 0, 1], default: 0
    serializer_class => ... # string (optional), default: "kafka.serializer.StringEncoder"
    partitioner_class => ... # string (optional), default: "kafka.producer.DefaultPartitioner"
    request_timeout_ms => ... # number (optional), default: 10000
    producer_type => ... # string (optional), one of ["sync", "async"], default: "sync"
    key_serializer_class => ... # string (optional), default: "kafka.serializer.StringEncoder"
    message_send_max_retries => ... # number (optional), default: 3
    retry_backoff_ms => ... # number (optional), default: 100
    topic_metadata_refresh_interval_ms => ... # number (optional), default: 600 * 1000
    queue_buffering_max_ms => ... # number (optional), default: 5000
    queue_buffering_max_messages => ... # number (optional), default: 10000
    queue_enqueue_timeout_ms => ... # number (optional), default: -1
    batch_num_messages => ... # number (optional), default: 200
    send_buffer_bytes => ... # number (optional), default: 100 * 1024
    client_id => ... # string (optional), default: ""
  }
}
The above covers the kafka input and output configuration inside logstash. Now let's look at how a python program that subscribes to the same kafka broker runs. I won't paste the actual production code; you can easily extend it from this demo.
#!/usr/bin/env python
#blog: xiaorui.cc
import threading, logging, time

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer


# This one feeds test data into kafka.
class Producer(threading.Thread):
    daemon = True

    def run(self):
        client = KafkaClient("localhost:9092")
        producer = SimpleProducer(client)
        while True:
            producer.send_messages('my-topic', "test")
            producer.send_messages('my-topic', "\xc2Hola, mundo!")
            time.sleep(1)


# This one is the consumer, i.e. the part that pulls data out of kafka.
class Consumer(threading.Thread):
    daemon = True

    def run(self):
        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "my-topic")
        for message in consumer:
            print(message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]
    for t in threads:
        t.start()
    time.sleep(5)


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.DEBUG
    )
    main()
In addition, kafka-python supports running multiple consumers; just keep an eye on the topic's partition count, since within a consumer group each partition is read by only one consumer.
from kafka import KafkaClient, MultiProcessConsumer

kafka = KafkaClient("localhost:9092")

# This splits the topic's partitions across two consumer processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

# This spawns as many processes as needed so that each handles at most 2 partitions
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                partitions_per_proc=2)

for message in consumer:
    print(message)

for message in consumer.get_messages(count=5, block=True, timeout=4):
    print(message)
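To tie this back to the alerting problem described at the top of the post, below is a rough sketch of a rule-matching consumer. Everything specific in it is an assumption: the "logstash" topic, the ERROR/timeout patterns and the send_alert() helper are placeholders to be replaced with your own topic, rules and notification channel (mail, IM, SMS and so on).

#!/usr/bin/env python
# Rough alerting sketch; topic name, patterns and send_alert() are assumed placeholders.
import re
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

ERROR_PATTERN = re.compile(r"ERROR|Traceback")            # assumed error signature
TIMEOUT_PATTERN = re.compile(r"xx\.com.*timeout", re.I)   # assumed crawler timeout signature

def send_alert(text):
    # hypothetical notification hook -- wire it to mail/IM/SMS in a real deployment
    print("ALERT: %s" % text)

def watch():
    client = KafkaClient("localhost:9092")
    consumer = SimpleConsumer(client, "alert-group", "logstash")  # assumed topic_id of the logstash output
    for message in consumer:
        # depending on the logstash output codec the payload may be plain text or JSON
        line = message.message.value
        if ERROR_PATTERN.search(line) or TIMEOUT_PATTERN.search(line):
            send_alert(line)

if __name__ == "__main__":
    watch()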
I originally meant to flesh out the monitoring and alerting part properly... but I'll stop here, as the post is already a bit scattered. The main takeaway: I recommend replacing redis with kafka as the broker. If the backend storage runs into trouble, redis has to absorb the backlog entirely in memory, whereas kafka is clearly better suited to that job.
