最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[Kafka] Hadoop—Kafka强化

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-21 11:01:21 | 只看该作者 |只看大图 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 168主编 于 2018-12-21 17:00 编辑

1、kafka的特点

分布式流处理平台。在系统之间构建实时数据流管道。以topic分类对记录进行存储,每个记录包含key-value+timestamp每秒钟百万消息吞吐量。

[AppleScript] 纯文本查看 复制代码
producer            //消息生产者[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]consumer            //消息消费者
consumer group      //消费者组
kafka server        //broker,kafka服务器
topic               //主题,副本数,分区.






2、安装kafaka

0.选择s202 ~ s204三台主机安装kafka

1.准备zk


2.jdk


3.tar文件

4.环境变量


5.配置kafka
[AppleScript] 纯文本查看 复制代码
[kafka/config/server.properties][/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]    ...
    broker.id=202
    ...
    listeners=PLAINTEXT://:9092
    ...
    log.dirs=/home/centos/kafka/logs
    ...
    zookeeper.connect=s201:2181,s202:2181,s203:2181 
6.分发server.properties,同时修改每个文件的broker.id为IP地址最后一组,其他的也可以,只要保证唯一性即可。


7.启动kafka服务器

a)先启动zk

b)启动kafka

[AppleScript] 纯文本查看 复制代码
[s202 ~ s204][/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]$>bin/kafka-server-start.sh -daemon config/server.properties
c)验证kafka服务器是否启动
$>netstat -anop | grep 9092
8.创建主题


[AppleScript] 纯文本查看 复制代码
bin$> ./kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test

9.查看主题列表
[AppleScript] 纯文本查看 复制代码
bin$> ./kafka-topics.sh --list --zookeeper s201:2181



10.启动控制台生产者
[AppleScript] 纯文本查看 复制代码
bin$> ./kafka-console-producer.sh --broker-list s202:9092 --topic test

11.启动控制台消费者
[AppleScript] 纯文本查看 复制代码
bin$> ./kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:2181

12.在生产者控制台输入hello world


3、kafka集群在zk的配置
[AppleScript] 纯文本查看 复制代码
-----------------------
    /controller         ===>    {"version":1,"brokerid":202,"timestamp":"1490926369148"

    /controller_epoch   ===>    1

    /brokers
    /brokers/ids
    /brokers/ids/202    ===>    {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
    /brokers/ids/203
    /brokers/ids/204    


    /brokers/topics/test/partitions/0/state ===>{"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
    /brokers/topics/test/partitions/1/state ===>...
    /brokers/topics/test/partitions/2/state ===>...

    /brokers/seqid      ===> null

    /admin
    /admin/delete_topics/test       ===>标记删除的主题

    /isr_change_notification

    /consumers/xxxx/
    /config 

创建主题


每个分区都有leader,follow,follow就是副本个数,它具有容错性,直到所有的副本都挂掉,否则它还能保持容错。


repliation_factor 2 partitions 5:表示2个副本,五个分区,即每个分区具有2个副本,它是以文件夹的形式存在的


$>kafka-topic.sh --zookeeper s202:2181 --replication_factor 2 --partitions 5 --create --topic test3


2 x 5 = 10 //10个文件夹
[AppleScript] 纯文本查看 复制代码
   [s202][/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]    test2-1         //
    test2-2         //
    test2-3         //

    [s203]
    test2-0
    test2-2
    test2-3
    test2-4

    [s204]
    test2-0
    test2-1
    test2-4 
4、重新布局

重新布局分区和副本,手动再平衡


[AppleScript] 纯文本查看 复制代码
$>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204



副本
broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。
支持到n-1故障。每个分区都有leader,follow.
leader挂掉时,消息分区写入到本地log或者,向生产者发送消息确认回执之前,生产者向新的leader发送消息。
新leader的选举是通过isr进行,第一个注册的follower成为leader。


kafka支持副本模式


同步复制
[AppleScript] 纯文本查看 复制代码
1.producer联系zk识别leader[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]2.向leader发送消息
3.leadr收到消息写入到本地log
4.follower从leader pull消息
5.follower向本地写入log
6.follower向leader发送ack消息
7.leader收到所有follower的ack消息
8.leader向producer回传ack 




异步副本
和同步复制的区别在与leader写入本地log之后,
直接向client回传ack消息,不需要等待所有follower复制完成。


5、通过java API实现消息生产者,发送消息
[AppleScript] 纯文本查看 复制代码
package cn.ctgu.kafkademo.test;[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.junit.Test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by Administrator on 2017/3/31.
 */
public class TestProducer {

    //创建生产者
    @Test
    public void testSend(){
        Properties props = new Properties();
        //broker列表
        props.put("metadata.broker.list", "s2:9092");
        //串行化
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //
        props.put("request.required.acks", "1");

        //创建生产者配置对象
        ProducerConfig config = new ProducerConfig(props);

        //创建生产者
        Producer<String, String> producer = new Producer<String, String>(config);

        KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test3","100" ,"hello world tomas100");
        producer.send(msg);
        System.out.println("send over!");
    }
    //创建消费者
    @Test
    public void testConsumer(){
        Properties props = new Properties();
        props.put("zookeeper.connect", "s2:2181");
        props.put("group.id", "g3");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //创建消费者配置对象
        ConsumerConfig config = new ConsumerConfig(props);
        //
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test", new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)).createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test");
        for(KafkaStream<byte[],byte[]> stream : msgList){
            ConsumerIterator<byte[],byte[]> it = stream.iterator();
            while(it.hasNext()){
                byte[] message = it.next().message();
                System.out.println(new String(message));
            }
        }
    }
} 
6、flume集成kafka



6.1KafkaSink [生产者]
[AppleScript] 纯文本查看 复制代码
a1.sources = r1[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]        a1.sinks = k1
        a1.channels = c1

        a1.sources.r1.type=netcat
        a1.sources.r1.bind=localhost
        a1.sources.r1.port=8888

        a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
        a1.sinks.k1.kafka.topic = test3
        a1.sinks.k1.kafka.bootstrap.servers = s202:9092
        a1.sinks.k1.kafka.flumeBatchSize = 20
        a1.sinks.k1.kafka.producer.acks = 1

        a1.channels.c1.type=memory

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1 

6.2 KafkaSource[消费者]


     
[AppleScript] 纯文本查看 复制代码
   a1.sources = r1[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]        a1.sinks = k1
        a1.channels = c1

        a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
        a1.sources.r1.batchSize = 5000
        a1.sources.r1.batchDurationMillis = 2000
        a1.sources.r1.kafka.bootstrap.servers = s202:9092
        a1.sources.r1.kafka.topics = test3
        a1.sources.r1.kafka.consumer.group.id = g4

        a1.sinks.k1.type = logger

        a1.channels.c1.type=memory

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1 


6.3 Channel[生产者 + 消费者]
[AppleScript] 纯文本查看 复制代码
    a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        a1.sources.r1.type = avro
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        a1.sinks.k1.type = logger

        a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
        a1.channels.c1.kafka.bootstrap.servers = s202:9092
        a1.channels.c1.kafka.topic = test3
        a1.channels.c1.kafka.consumer.group.id = g6

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1 

   
---------------------
作者:Jorocco
原文:https://blog.csdn.net/jorocco/article/details/80807163

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-29 16:41

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表