​云函数实践(含代码):将日志服务的日志投递到自建 Kafka 的 3 个步骤

上文提到 将K8S日志采集到日志服务,这次介绍将采集的日志投递到自建 Kafka 中,用于 Spark 计算。

核心流程

容器日志 -> 日志服务 -> 使用函数处理,将日志投递至自建 Kafka

本文介绍如何创建云函数,将日志投递至 Kafka 中。

1. 创建云函数 SCF

打开 函数服务列表,基于模板 CLS 数据转存到 Ckafka 创建函数。

虽然模板是投递 Ckafka,不过 Ckafka和 Kafka 兼容性好,所以投递 Kafka 也没问题。

基于模板创建SCF

1.1 填写基础配置

启用私有网络,函数服务使用的 VPC 和 Kafka 所在 VPC 相同。

如果不同,可以使用 对等连接 解决。

启用 VPC

1.2 函数代码

默认模板会把日志原始数据当成字符串,把每个字符当成一行 message 进行输出(for record in records:),需要调整代码。

不知道是因为我的日志服务原始数据设置的是 JSON 格式,还是当前 CLS 数据转存到 Ckafka 模板过时了

SCF 函数代码

有 3 处代码修改,详见注释,完整代码如下:

  • #!/usr/bin/env python
  • # -*- coding: utf-8 -*-
  • import time
  • import logging
  • import os
  • import base64
  • import json
  • import gzip
  • import urllib
  • from kafka import KafkaProducer
  • from kafka.errors import KafkaError
  • from StringIO import StringIO
  • logger = logging.getLogger('kafka')
  • logger.setLevel(logging.INFO)
  • class ClsToKafka(object):
  • """
  • CLS 消息投递 kafka
  • """
  • def __init__(self, host, **kwargs):
  • self.host = host
  • self.producer = KafkaProducer(bootstrap_servers = [self.host],
  • retries = 10,
  • max_in_flight_requests_per_connection = 1,
  • request_timeout_ms = 30000,
  • max_block_ms = 60000,
  • **kwargs
  • )def send(self, topic, records):
  • """
  • 异步生产 kafka 消息
  • """
  • global count
  • count = 0
  • def on_send_success(record_metadata):
  • global count
  • count = count +1
  • def on_send_error(excp):
  • logger.error('failed to send message', exc_info = excp)
  • s_time = time.time()
  • try:
  • ## 修改 1: 原始消息是 JSON 格式(日志服务采集容器服务输出的日志格式是 JSON),每条消息位于 .records(type: List)
  • ## for record in records:
  • for record in records['records']:
  • key = ""# 当 key 为""或者为"None" 时,要传入 key=None,这样 python kafka 库会随机选取一个 partition 写入消息
  • if key == "" or key =="None":
  • key = None
  • ## 修改 2:record 是 dict,因为原始数据就是 JSON,需要转成 str,否则调用 self.producer.send 会报错 "assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))"
  • value = json.dumps(record)
  • # 也可以对消息进行处理后再转存
  • #value = deal_message(data)
  • self.producer.send(topic, key = key, value = value).add_callback(on_send_success).add_errback(on_send_error)
  • # block until all async messages are sent
  • self.producer.flush()
  • except KafkaError as e:
  • return e
  • finally:
  • if self.producer is not None:
  • self.producer.close()e_time = time.time()
  • return "{} messages delivered in {}s".format(count, e_time - s_time)
  • # 这里可以对消息进行处理后返回
  • def deal_message(message):
  • return message
  • def main_handler(event, context):
  • kafka_address = os.getenv("kafka_address")
  • kafka_topic_name = os.getenv("kafka_topic_name")
  • kafka_to_kafka = ClsToKafka(
  • kafka_address
  • #security_protocol = "SASL_PLAINTEXT",
  • #sasl_mechanism = "PLAIN",
  • #sasl_plain_username = "ckafka-80o10xxx#lkoxx",
  • #sasl_plain_password = "ccllxxxx",
  • #api_version=(0, 10, 2)
  • )event = json.loads(gzip.GzipFile(fileobj=StringIO(event['clslogs']['data'].decode('base64'))).read())# print("type of event: %s" % type(event))
  • ## 修改 3:直接使用 event 这个字典,便于从字典中获取每条消息的内容
  • ## data = json.dumps(event, indent=4, sort_keys=True)
  • ## ret = kafka_to_kafka.send(kafka_topic_name,data)
  • ret = kafka_to_kafka.send(kafka_topic_name,event)
  • logger.info(ret)
  • return ret
展开

1.4 高级设置:环境设置

云函数需要使用 kafka_addresskafka_topic_name 这 2 个变量,在 环境配置 中配好。

设置环境变量

点击创建后,部署成功。

创建SCF成功

2. 为日志服务的日志主题设置函数处理

在日志服务的 日志主题 页面找到需要投递消息的主题,在 函数处理 TAB 中 选择刚创建的函数即可。

为日志主题设置函数处理

函数处理创建成功。

函数处理设置成功

3. 查看投递到自建 Kafka 的效果

等待 1 分钟后,查看函数每次调用的日志,可以看到调用已成功。

查看SCF的调用日志

同时可以了解整体调用监控数据。

查看SCF的调用监控

自建的 Kakfa 是使用 Cloudera Management 创建的,在 CM 中看到 Topic 已有数据写入。

查看SCF的调用监控

使用命令行也可以看到数据持续写入。

  • # ./kafka-console-consumer.sh --bootstrap-server 10.0.0.29:9092 --topic scf_topic --offset latest --partition 0
  • {"content": "{\\"Accept\\":\\"*/*\\",\\"Body\\":\\"\\",\\"Host\\":\\"header.dev.xxx.cn\\",\\"Method\\":\\"GET\\",\\"Protocol\\":\\"HTTP/1.1\\",\\"Referer\\":\\"\\",\\"RemoteAddr\\":\\"172.16.7.71:37468\\",\\"RequestURI\\":\\"/\\",\\"Type\\":\\"web\\",\\"UserAgent\\":\\"Qcloud-boce\\",\\"X-Forwarded-For\\":\\"58.87.66.69\\"}", "timestamp": 1618716491203428}
  • {"content": "{\\"Accept\\":\\"*/*\\",\\"Body\\":\\"\\",\\"Host\\":\\"header.dev.xxx.cn\\",\\"Method\\":\\"GET\\",\\"Protocol\\":\\"HTTP/1.1\\",\\"Referer\\":\\"\\",\\"RemoteAddr\\":\\"172.16.7.71:36864\\",\\"RequestURI\\":\\"/\\",\\"Type\\":\\"curl\\",\\"UserAgent\\":\\"Qcloud-curl\\",\\"X-Forwarded-For\\":\\"180.163.9.66\\"}", "timestamp": 1618716494178403}
  • ...
  • Processed a total of 9 messages

FAQ

自建 Kafka 对外提供服务

如果函数调用有日志有如下报错,则证明 Kafka broker 未设置对外可访问的地址,参照 Won’t Connect to My Apache Kafka Cluster 修改 advertised.listeners 配置即可。

  • DNS lookup failed for hadoop-29.com:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
  • DNS lookup failed for hadoop-29.com:9092 (0)

reference

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
<<上一篇
下一篇>>