Author: 独孤风
Source: https://www.cnblogs.com/tree1123/p/11150927.html
1. Download Kafka

As of July 8, 2019, the latest release is 2.3.0. In the package name, 2.12 is the Scala version it was compiled against and 2.3.0 is the Kafka version.

2. Start the services

ZooKeeper must be started first. Kafka ships with a built-in ZooKeeper, but you can use an external one instead.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3. Create a topic

Here replication-factor is 1 and partitions is 1:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

List the topics:

> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

You can also skip creating topics manually and enable auto-creation, so a topic is created on first publish.

4. Send messages

Test with the command-line client; each line is one message:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

5. Consumers

The command-line consumer receives the messages:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

6. Set up a multi-broker cluster

A single broker is not very interesting, so let's run three brokers. First copy the config file for each broker:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

Then edit them:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id is the unique name of each node in the cluster. Because we are running every broker on the same machine, we must change listeners and log.dirs to avoid conflicts.

Create a topic with one partition and a replication factor of three:

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Use describe to see its state:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: my-replicated-topic  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

A few concepts:
"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
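Before moving on, it helps to see why partition count matters: a producer routes each keyed message to one partition by hashing the key. The sketch below is only an illustration of that idea — the real Java client uses murmur2, while this hypothetical `partition_for` helper uses md5 so it stays deterministic and dependency-free:

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Illustrative stand-in for Kafka's default partitioner:
    hash the key, take it modulo the partition count.
    (The real Java client uses murmur2; md5 is used here only
    to keep the sketch deterministic and dependency-free.)"""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Messages with the same key always land in the same partition,
# which is how Kafka preserves per-key ordering.
assert partition_for(b"user-42", 3) == partition_for(b"user-42", 3)
```

With `--partitions 1`, as in the topic above, every message necessarily lands in partition 0.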
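The leader/replicas/ISR definitions above can be sketched as a tiny simulation: when a broker dies, it drops out of the ISR, and a new leader is chosen from the replicas that remain in sync. This is a deliberately simplified sketch of leader election — the real controller logic is far more involved:

```python
def elect_leader(replicas, isr, alive):
    """Pick a partition leader: the first broker in replica order
    that is both in the ISR and currently alive. Simplified sketch;
    Kafka's controller handles many more cases."""
    new_isr = [b for b in isr if b in alive]
    for broker in replicas:
        if broker in new_isr:
            return broker, new_isr
    return None, new_isr  # no in-sync replica left

replicas = [1, 2, 0]

# All three brokers up: broker 1 (first in the replica list) leads.
leader, isr = elect_leader(replicas, [1, 2, 0], alive={0, 1, 2})
assert leader == 1

# Broker 1 dies: leadership moves to broker 2, and the ISR shrinks.
leader, isr = elect_leader(replicas, isr, alive={0, 2})
assert leader == 2 and isr == [2, 0]
```

This mirrors the describe output: with all brokers up, Leader is 1 and Isr is 1,2,0; after broker 1 is killed, Leader becomes 2 and Isr shrinks to 2,0.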
For comparison, describe the topic we created earlier:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0

Produce and consume:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now test fault tolerance. Find and kill broker 1:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

Check what changed: the leader moved to another node, because node 1 was killed:

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: my-replicated-topic  Partition: 0  Leader: 2  Replicas: 1,2,0  Isr: 2,0

The messages are still received:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

7. Import/export data with Kafka Connect

So far all the data came from the console. What about other sources and other systems? Use Kafka Connect.

Create some data:

> echo -e "foo\nbar" > test.txt

Start Connect in standalone mode, pointing it at its config files:

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Verify the sink file:

> more test.sink.txt
foo
bar

And on the consumer side:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

You can keep appending to the file:

> echo Another line>> test.txt

8. Kafka Streams

http://kafka.apache.org/22/documentation/streams/quickstart
WordCountDemo: https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

Code snippet:

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the text words as message keys
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

Create the output topic for the word counts (compacted, since it holds a changelog):

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

Start the WordCount demo application:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Start a producer to write some input:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams

Start a consumer to read the output:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
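Each line the consumer prints in the Connect example is not the raw file line but a JSON envelope added by Connect's default JsonConverter: the schema plus the payload. A minimal sketch of building that envelope (`json_envelope` is my own illustrative helper, not part of Connect's API — Connect does this wrapping internally):

```python
import json

def json_envelope(line: str) -> str:
    """Build the {"schema": ..., "payload": ...} envelope that
    Kafka Connect's default JsonConverter wraps each string record
    in. Hypothetical helper for illustration only."""
    record = {
        "schema": {"type": "string", "optional": False},
        "payload": line,
    }
    return json.dumps(record, separators=(",", ":"))

print(json_envelope("foo"))
# {"schema":{"type":"string","optional":false},"payload":"foo"}
```

This is why the consumer on connect-test sees `{"schema":...,"payload":"foo"}` rather than plain `foo`.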
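The WordCount output above is a changelog: each input line causes updated counts to be re-emitted per word, which is why "kafka" appears once with 1 and again with 2. The core logic of the topology (lowercase, split on non-word characters, keep a running count) can be sketched without Kafka at all — note that the real Streams app may coalesce updates due to record caching, while this sketch emits one update per word occurrence:

```python
import re
from collections import defaultdict

def word_count_updates(lines):
    """Mimic the WordCount topology: split each line on non-word
    characters, lowercase, and emit (word, running_count) pairs,
    like the changelog written to streams-wordcount-output."""
    counts = defaultdict(int)
    updates = []
    for line in lines:
        for word in re.split(r"\W+", line.lower()):
            if word:  # skip empty tokens from leading/trailing splits
                counts[word] += 1
                updates.append((word, counts[word]))
    return updates

updates = word_count_updates(["all streams lead to kafka", "hello kafka streams"])
# [('all', 1), ('streams', 1), ('lead', 1), ('to', 1), ('kafka', 1),
#  ('hello', 1), ('kafka', 2), ('streams', 2)]
```

Running it on the two demo input lines reproduces the console consumer's output sequence.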