Oceanus实践-消费 CMQ 主题模型数据源
实时即未来,最近在腾讯云Oceanus进行Flink实时计算服务,以下为flink消费腾讯云CMQ数据实践。原文自Raigor,已获得授权,分享给大家~
Oceanus Flink CMQ connector 支持队列模型的数据源表和目的表,暂时不支持主题模型数据源表和目的表。CMQ 主题订阅可以实时同步主题模型数据到队列模型,借助这种机制,我们可以在 Oceanus 实现 CMQ 主题模型数据源表的读取。
1. 环境搭建
1.1 创建 Oceanus 集群
在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。
若之前未使用过VPC,日志,存储这些组件,需要先进行创建。
创建完后的集群如下:
CMQ 主题
1.3 新建 CMQ 队列
在 CMQ 控制台的【队列】-> 【新建】主题,输入队列名称、消息生命周期、堆积消息数量上限,其他保持默认值即可。我们这里新建两个队列,其中一个用来订阅 CMQ 主题模型数据,另一个用作 Oceanus 作业的目的表。新建的主题如下:
CMQ 读取 & 写入
CREATE TABLE `CMQSourceTable` (
`id` bigint,
`request_method` varchar(80),
`response` varchar(80),
PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
'connector' = 'cmq', --必须为 'cmq'
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', --cmq所在地域的nameServer
'queue' = 'cs2', --cmq的队列名
'secret-id' = 'Your SecretId', --账号secretId
'secret-key' = 'Your SecretKey', --账号secretKey
'sign-method' = 'HmacSHA1', --签名的方式
'format' = 'csv', --定义数据格式(JSON 格式)
'batch-size' = '16', --批量消费消息的个数/批量发送消息的个数
'request-timeout' = '5000ms', --请求的超时时间
'polling-wait-timeout'= '10s', --source参数; 获取不到数据情况下的等待时间
'key-alive-timeout'= '5min', --source参数;含primary key的消息,CMQ去重的有效时间
'retry-times' = '3', --sink参数;发送消息的重试次数
'max-block-timeout' = '0s' --sink参数;批量发送数据的最大等待时间
);
CREATE TABLE `CMQSinkTable` (
`id` bigint,
`request_method` varchar(80),
`response` varchar(80),
PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
'connector' = 'cmq', --必须为 'cmq'
'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com', --cmq所在地域的nameServer
'queue' = 'sink_queue', --cmq的队列名
'secret-id' = 'Your SecretId', --账号secretId
'secret-key' = 'Your SecretKey', --账号secretKey
'sign-method' = 'HmacSHA1', --签名的方式
'format' = 'csv', --定义数据格式(JSON 格式)
'batch-size' = '16', --批量消费消息的个数/批量发送消息的个数
'request-timeout' = '5000ms', --请求的超时时间
'polling-wait-timeout'= '10s', --source参数; 获取不到数据情况下的等待时间
'key-alive-timeout'= '5min', --source参数;含primary key的消息,CMQ去重的有效时间
'retry-times' = '3', --sink参数;发送消息的重试次数
'max-block-timeout' = '0s' --sink参数;批量发送数据的最大等待时间
);
insert into CMQSinkTable select * from CMQSourceTable;
2.3 算子操作
这里只做最简单的数据插入。
insert into CMQSinkTable select *from CMQSourceTable;
3. 验证总结
在 CMQ 控制台往名为test的主题中发送消息,可在sink_queue的队列中接收到消息。
原文链接:https://cloud.tencent.com/developer/article/1857665