CKafka Learning Series - Is a Cloud Message Queue Any Good? (Part 17)
I. First, buy an instance
1. Purchase link: https://buy.cloud.tencent.com/ckafka?rid=1
2. Create a CKafka instance
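II. Set up a Maven project in IntelliJ IDEA
Download the tooling yourself:
https://www.jetbrains.com/products.html
I won't cover installing the JDK, Maven, and Nexus here; see the earlier tutorials in this series.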
III. Here comes the producer: Producer.java
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
-
- import java.util.Properties;
-
- public class Producer {
-     public static void main(String[] args) {
-         Properties props = new Properties();
-         // Access point of the CKafka instance
-         props.put("bootstrap.servers", "ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002");
-         props.put("security.protocol", "SASL_PLAINTEXT");
-         props.put("sasl.mechanism", "PLAIN");
-         // session.timeout.ms is a consumer setting; the producer ignores it and logs a warning
-         props.put("session.timeout.ms", 30000);
-         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         // Wait for the partition leader's acknowledgement only
-         props.put("acks", "1");
-         props.put("retries", 3);
-         props.put("batch.size", 232840);
-         props.put("linger.ms", 10);
-         props.put("buffer.memory", 33554432);
-         props.put("max.block.ms", 3000);
-         // Inner quotes are escaped with a single backslash inside the Java string literal
-         props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"ckafka-in5xxxxf#Jensen\" password=\"xxxx\";");
-         Producer.asynSendRecord(props);
-     }
-
-     // Send messages asynchronously. A little romantic, isn't it?
-     public static void asynSendRecord(Properties props) {
-         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-         for (int i = 0; i < 100; i++) {
-             // Topic "jasen"; key and value are both the loop counter
-             ProducerRecord<String, String> record = new ProducerRecord<>("jasen", Integer.toString(i), Integer.toString(i));
-             System.out.println("record:" + record.value());
-             // send() returns immediately; the callback runs once the broker has responded
-             producer.send(record, new Callback() {
-                 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                     if (e == null) {
-                         // Prints "message sent:" followed by the record metadata, then "message sent successfully"
-                         System.out.println("消息发送:" + "offset:" + recordMetadata.offset() + " timestamp:" + recordMetadata.timestamp() + " topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());
-                         System.out.println("消息发送成功");
-                     } else {
-                         // Prints "message send failed:" followed by the error message
-                         System.out.println(String.format("消息发送失败: %s", e.getMessage()));
-                     }
-                 }
-             });
-         }
-         // close() blocks until buffered records have been sent and their callbacks have run
-         producer.close();
-     }
- }
-
Here is the output:
- "C:\Program Files\Java\jdk1.8.0_161\bin\java.exe" "-javaagent:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=51023:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\programming\kafka-demo2\target\test-classes;C:\Users\Jensen\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Jensen\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Jensen\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-simple\1.7.2\slf4j-simple-1.7.2.jar" Producer
- [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
- acks = 1
- batch.size = 232840
- bootstrap.servers = [ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002]
- buffer.memory = 33554432
- client.id =
- compression.type = none
- connections.max.idle.ms = 540000
- enable.idempotence = false
- interceptor.classes = null
- key.serializer = class org.apache.kafka.common.serialization.StringSerializer
- linger.ms = 10
- max.block.ms = 3000
- max.in.flight.requests.per.connection = 5
- max.request.size = 1048576
- metadata.max.age.ms = 300000
- metric.reporters = []
- metrics.num.samples = 2
- metrics.recording.level = INFO
- metrics.sample.window.ms = 30000
- partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
- receive.buffer.bytes = 32768
- reconnect.backoff.max.ms = 1000
- reconnect.backoff.ms = 50
- request.timeout.ms = 30000
- retries = 3
- retry.backoff.ms = 100
- sasl.jaas.config = [hidden]
- sasl.kerberos.kinit.cmd = /usr/bin/kinit
- sasl.kerberos.min.time.before.relogin = 60000
- sasl.kerberos.service.name = null
- sasl.kerberos.ticket.renew.jitter = 0.05
- sasl.kerberos.ticket.renew.window.factor = 0.8
- sasl.mechanism = PLAIN
- security.protocol = SASL_PLAINTEXT
- send.buffer.bytes = 131072
- ssl.cipher.suites = null
- ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
- ssl.endpoint.identification.algorithm = null
- ssl.key.password = null
- ssl.keymanager.algorithm = SunX509
- ssl.keystore.location = null
- ssl.keystore.password = null
- ssl.keystore.type = JKS
- ssl.protocol = TLS
- ssl.provider = null
- ssl.secure.random.implementation = null
- ssl.trustmanager.algorithm = PKIX
- ssl.truststore.location = null
- ssl.truststore.password = null
- ssl.truststore.type = JKS
- transaction.timeout.ms = 60000
- transactional.id = null
- value.serializer = class org.apache.kafka.common.serialization.StringSerializer
-
- [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
- [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'session.timeout.ms' was supplied but isn't a known config.
- [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
- [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
- record:0
- record:1
- record:2
- record:3
- record:4
- record:5
- record:6
- record:7
- record:8
- record:9
- record:10
- record:11
- ......
- record:90
- record:91
- record:92
- record:93
- record:94
- record:95
- record:96
- record:97
- record:98
- record:99
- [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
- 消息发送:offset:1382 timestamp:1592221545987 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1383 timestamp:1592221546002 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1384 timestamp:1592221546003 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1385 timestamp:1592221546003 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1386 timestamp:1592221546003 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1387 timestamp:1592221546003 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1388 timestamp:1592221546003 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1389 timestamp:1592221546003 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1390 timestamp:1592221546004 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1391 timestamp:1592221546004 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1392 timestamp:1592221546004 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1393 timestamp:1592221546004 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1394 timestamp:1592221546004 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1395 timestamp:1592221546004 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1396 timestamp:1592221546005 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1397 timestamp:1592221546005 topic:jasen partition:0
- 消息发送成功
- ......
- 消息发送:offset:1476 timestamp:1592221546014 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1477 timestamp:1592221546014 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1478 timestamp:1592221546014 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1479 timestamp:1592221546014 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1480 timestamp:1592221546014 topic:jasen partition:0
- 消息发送成功
- 消息发送:offset:1481 timestamp:1592221546014 topic:jasen partition:0
- 消息发送成功
-
- Process finished with exit code 0
-
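The `sasl.jaas.config` value in Producer.java is written by hand with escaped quotes, which is easy to get wrong. As a minimal sketch, the string could be assembled by a small helper instead. The class name `JaasConfigBuilder` and the method `plain` are hypothetical names introduced here for illustration, and the `instanceId#userName` form is an assumption inferred from the username in the listing above. The values in `main` are placeholders, not real credentials.

```java
public class JaasConfigBuilder {

    // Wraps a value in double quotes. To keep the sketch simple it rejects
    // values that would themselves need escaping inside the quoted string.
    static String quote(String value) {
        if (value.contains("\"") || value.contains("\\")) {
            throw new IllegalArgumentException("value must not contain quotes or backslashes");
        }
        return "\"" + value + "\"";
    }

    // Builds a PlainLoginModule entry. The "instanceId#userName" form is an
    // assumption taken from the username used in the listing above.
    public static String plain(String instanceId, String userName, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=" + quote(instanceId + "#" + userName)
                + " password=" + quote(password) + ";";
    }

    public static void main(String[] args) {
        // Placeholder values only.
        System.out.println(plain("ckafka-xxxx", "Jensen", "xxxx"));
    }
}
```

The result can be passed straight to `props.put("sasl.jaas.config", ...)`, so the producer and the consumer build the string the same way.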
IV. Consume to your heart's content: Consumer.java
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
-
- import java.util.Collections;
- import java.util.Properties;
-
- public class Consumer {
-     public static void main(String[] args) {
-         String topicName = "jasen";
-         String groupId = "test-group";
-
-         Properties props = new Properties();
-         // Access point of the CKafka instance
-         props.put("bootstrap.servers", "ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002");
-         props.put("group.id", groupId);
-         props.put("security.protocol", "SASL_PLAINTEXT");
-         props.put("sasl.mechanism", "PLAIN");
-         props.put("session.timeout.ms", 30000);
-         // Commit offsets automatically once per second
-         props.put("enable.auto.commit", "true");
-         props.put("auto.commit.interval.ms", "1000");
-         // Start from the earliest available offset when there is no valid committed offset
-         props.put("auto.offset.reset", "earliest");
-         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-         // Inner quotes are escaped with a single backslash inside the Java string literal
-         props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"ckafka-in5yxxxx#Jensen\" password=\"xxxx\";");
-         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-         // Subscribe to the topic
-         consumer.subscribe(Collections.singletonList(topicName));
-         try {
-             while (true) {
-                 // poll(long) takes a timeout in milliseconds in kafka-clients 1.0.2
-                 ConsumerRecords<String, String> records = consumer.poll(1000);
-                 for (ConsumerRecord<String, String> record : records) {
-                     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
-                 }
-             }
-         } finally {
-             consumer.close();
-         }
-     }
- }
-
See for yourself whether it worked:
- "C:\Program Files\Java\jdk1.8.0_161\bin\java.exe" "-javaagent:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=51176:D:\program files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\programming\kafka-demo2\target\test-classes;C:\Users\Jensen\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Jensen\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Jensen\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;C:\Users\Jensen\.m2\repository\org\slf4j\slf4j-simple\1.7.2\slf4j-simple-1.7.2.jar" Consumer
- [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
- auto.commit.interval.ms = 1000
- auto.offset.reset = earliest
- bootstrap.servers = [ckafka-in5yxxxx.ap-guangzhou.ckafka.tencentcloudmq.com:6002]
- check.crcs = true
- client.id =
- connections.max.idle.ms = 540000
- enable.auto.commit = true
- exclude.internal.topics = true
- fetch.max.bytes = 52428800
- fetch.max.wait.ms = 500
- fetch.min.bytes = 1
- group.id = test-group
- heartbeat.interval.ms = 3000
- interceptor.classes = null
- internal.leave.group.on.close = true
- isolation.level = read_uncommitted
- key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
- max.partition.fetch.bytes = 1048576
- max.poll.interval.ms = 300000
- max.poll.records = 500
- metadata.max.age.ms = 300000
- metric.reporters = []
- metrics.num.samples = 2
- metrics.recording.level = INFO
- metrics.sample.window.ms = 30000
- partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
- receive.buffer.bytes = 65536
- reconnect.backoff.max.ms = 1000
- reconnect.backoff.ms = 50
- request.timeout.ms = 305000
- retry.backoff.ms = 100
- sasl.jaas.config = [hidden]
- sasl.kerberos.kinit.cmd = /usr/bin/kinit
- sasl.kerberos.min.time.before.relogin = 60000
- sasl.kerberos.service.name = null
- sasl.kerberos.ticket.renew.jitter = 0.05
- sasl.kerberos.ticket.renew.window.factor = 0.8
- sasl.mechanism = PLAIN
- security.protocol = SASL_PLAINTEXT
- send.buffer.bytes = 131072
- session.timeout.ms = 30000
- ssl.cipher.suites = null
- ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
- ssl.endpoint.identification.algorithm = null
- ssl.key.password = null
- ssl.keymanager.algorithm = SunX509
- ssl.keystore.location = null
- ssl.keystore.password = null
- ssl.keystore.type = JKS
- ssl.protocol = TLS
- ssl.provider = null
- ssl.secure.random.implementation = null
- ssl.trustmanager.algorithm = PKIX
- ssl.truststore.location = null
- ssl.truststore.password = null
- ssl.truststore.type = JKS
- value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
-
- [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
- [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
- [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
- [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Discovered group coordinator 111.230.124.164:18006 (id: 2147472928 rack: null)
- [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Revoking previously assigned partitions []
- [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
- [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 9
- [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions [jasen-0]
- [main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=test-group] Fetch offset 942 is out of range for partition jasen-0, resetting offset
- offset = 1382, key = 0, value = 0
- offset = 1383, key = 1, value = 1
- offset = 1384, key = 2, value = 2
- offset = 1385, key = 3, value = 3
- offset = 1386, key = 4, value = 4
- offset = 1387, key = 5, value = 5
- offset = 1388, key = 6, value = 6
- offset = 1389, key = 7, value = 7
- offset = 1390, key = 8, value = 8
- offset = 1391, key = 9, value = 9
- offset = 1392, key = 10, value = 10
- offset = 1393, key = 11, value = 11
- offset = 1394, key = 12, value = 12
- offset = 1395, key = 13, value = 13
- offset = 1396, key = 14, value = 14
- offset = 1397, key = 15, value = 15
- offset = 1398, key = 16, value = 16
- ...... (all one hundred records are there, not one missing)
- offset = 1464, key = 82, value = 82
- offset = 1465, key = 83, value = 83
- offset = 1466, key = 84, value = 84
- offset = 1467, key = 85, value = 85
- offset = 1468, key = 86, value = 86
- offset = 1469, key = 87, value = 87
- offset = 1470, key = 88, value = 88
- offset = 1471, key = 89, value = 89
- offset = 1472, key = 90, value = 90
- offset = 1473, key = 91, value = 91
- offset = 1474, key = 92, value = 92
- offset = 1475, key = 93, value = 93
- offset = 1476, key = 94, value = 94
- offset = 1477, key = 95, value = 95
- offset = 1478, key = 96, value = 96
- offset = 1479, key = 97, value = 97
- offset = 1480, key = 98, value = 98
- offset = 1481, key = 99, value = 99
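The claim that nothing went missing can be checked from the offsets alone: the consumer output runs from offset 1382 to offset 1481 on the single partition jasen-0, and 1481 - 1382 + 1 = 100, matching keys 0 through 99. The sketch below does the same arithmetic and also checks that a list of output lines has no gaps. The class `OffsetCheck` and its methods are hypothetical names introduced for illustration, and the check assumes a single partition with plain, non-compacted writes, as in this demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OffsetCheck {

    // Matches lines in the format printed by Consumer.java.
    private static final Pattern OFFSET = Pattern.compile("^offset = (\\d+),");

    // Extracts the offsets from consumer output lines, skipping anything else.
    public static List<Long> parseOffsets(List<String> lines) {
        List<Long> offsets = new ArrayList<>();
        for (String line : lines) {
            Matcher m = OFFSET.matcher(line.trim());
            if (m.find()) {
                offsets.add(Long.parseLong(m.group(1)));
            }
        }
        return offsets;
    }

    // Returns true when every offset is exactly one greater than the previous one.
    public static boolean isContiguous(List<Long> offsets) {
        for (int i = 1; i < offsets.size(); i++) {
            if (offsets.get(i) != offsets.get(i - 1) + 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // First and last offsets printed in the consumer output above.
        long first = 1382L;
        long last = 1481L;
        System.out.println("records in range: " + (last - first + 1)); // prints 100

        // A few lines copied from that output; the full log can be fed in the same way.
        List<String> sample = Arrays.asList(
                "offset = 1382, key = 0, value = 0",
                "offset = 1383, key = 1, value = 1",
                "offset = 1384, key = 2, value = 2");
        System.out.println("contiguous: " + isContiguous(parseOffsets(sample))); // prints true
    }
}
```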
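V. Summary
You only find out whether it is any good by using it. If you are not convinced, try building a Kafka cluster yourself; you will need a ZooKeeper cluster as well. And what will you do when it has to scale out?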