最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[Kafka] Kafka集群搭建

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-21 12:18:04 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
1、什么是Kafka
1、kafka是一个分布式的消息缓存系统
2、kafka集群中的服务器都叫做broker
3、kafka有两类客户端,一类叫producer(消息生产者),一类叫做consumer(消息消费者),客户端和broker服务器之间采用tcp协议连接
4、kafka中不同业务系统的消息可以通过topic进行区分,而且每一个消息topic都会被分区,以分担消息读写的负载
5、每一个分区都可以有多个副本,以防止数据的丢失
6、某一个分区中的数据如果需要更新,都必须通过该分区所有副本中的leader来更新
7、消费者可以分组,比如有两个消费者组A和B,共同消费一个topic:order_info,A和B所消费的消息不会重复
比如 order_info 中有100个消息,每个消息有一个id,编号从0-99,那么,如果A组消费0-49号,B组就消费50-99号
8、消费者在具体消费某个topic中的消息时,可以指定起始偏移量





2、Kafka集群搭建
[AppleScript] 纯文本查看 复制代码
前提是在Slave5 Slave6 Slave7上安装了zookeeper集群。[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]1、解压
2、在Slave5机器上进入到config目录中修改server.properties
zookeeper.connect=Slave5:2181,Slave6:2181,Slave7:2181
broker.id=0
3、复制到其他两台机器上
scp -r Kafka Slave6:/hadoop/app/
scp -r Kafka Slave6:/hadoop/app/

分别修改Slave6 Slave7机器上的server.properties文件中的
broker.id=1(Slave6上)
broker.id=2(Slave7上)

4、将zookeeper集群启动

5、在每一台节点上启动broker
bin/kafka-server-start.sh config/server.properties


6、在kafka集群中创建一个topic
bin/kafka-topics.sh --create --zookeeper Slave5:2181 --replication-factor 3 --partitions 1 --topic order

7、用一个producer向某一个topic中写入消息
bin/kafka-console-producer.sh --broker-list Slave5:9092 --topic order

8、用一个comsumer从某一个topic中读取信息
bin/kafka-console-consumer.sh --zookeeper Slave5:2181 --from-beginning --topic order

9、查看一个topic的分区及副本状态信息
bin/kafka-topics.sh --describe --zookeeper Slave5:2181 --topic order 
3、整合Kafka和storm


4、Kafka以及整合的API代码示例


4.1 Kafka API
[AppleScript] 纯文本查看 复制代码
package cn.itcast.kafka;[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        //设置zookeeper机器
        props.put("zk.connect", "Slave5:2181,Slave6:2181,Slave7:2181");
        //设置broker所在的机器
        props.put("metadata.broker.list","Slave5:9092,Slave6:9092,Slave7:9092");
        //序列化,根据Producer<String, String>  如果Producer<String, Int>则为kafka.serializer.IntEncoder
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // 发送业务消息
        // 一般是读取内存  读取内存数据库  读socket端口中的数据
        for (int i = 1; i <= 100; i++) {
            Thread.sleep(500);
            producer.send(new KeyedMessage<String, String>("wordcount",
                    "i said i love you baby for" + i + "times,will you have a nice day with me tomorrow"));
        }

    }
} 

ConsumerDemo.java
[AppleScript] 纯文本查看 复制代码
package cn.itcast.kafka;[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
    private static final String topic = "mysons";
    private static final Integer threads = 1;

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("zookeeper.connect", "Slave5:2181,Slave6:2181,Slave7:2181");
        //设置组id
        props.put("group.id", "1111");
        //偏移量重新设置
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig config = new ConsumerConfig(props);
        //创建java连接
        ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        topicCountMap.put("mygirls", 1);
        topicCountMap.put("myboys", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("mygirls");

        for(final KafkaStream<byte[], byte[]> kafkaStream : streams){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
                        String msg = new String(mm.message());
                        System.out.println(msg);
                    }
                }

            }).start();

        }
    }
}
4.2 Kafka——storm整合API


整合之前除了导入Kafka和storm lib目录下的jar包还需要先导入整合包storm-kafka-0.9.2-incubating.jar。


WordSpliter.java
[AppleScript] 纯文本查看 复制代码
package cn.itcast.storm.bolt;[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]
import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordSpliter extends BaseBasicBolt {

    private static final long serialVersionUID = -5653803832498574866L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            word = word.trim();
            if (StringUtils.isNotBlank(word)) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
} 


WriterBolt.java
[AppleScript] 纯文本查看 复制代码
package cn.itcast.storm.bolt;[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/**
 * 将数据写入文件
 * @author [email]duanhaitao@itcast.cn[/email]
 *
 */
public class WriterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = -6586283337287975719L;

    private FileWriter writer = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            writer = new FileWriter("c:\\storm-kafka\\" + "wordcount"+UUID.randomUUID().toString());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }


    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getString(0);
        try {
            writer.write(s);
            writer.write("\n");
            writer.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
} 


MessageScheme.java
[AppleScript] 纯文本查看 复制代码
package cn.itcast.storm.spout;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {

    private static final long serialVersionUID = 8423372426211017613L;

    @Override
    public List<Object> deserialize(byte[] bytes) {
            try {
                String msg = new String(bytes, "UTF-8");
                return new Values(msg); 
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }

} 



KafkaTopo.java
[AppleScript] 纯文本查看 复制代码
package cn.itcast.storm.topology;[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import cn.itcast.storm.bolt.WordSpliter;
import cn.itcast.storm.bolt.WriterBolt;
import cn.itcast.storm.spout.MessageScheme;

public class KafkaTopo {

    public static void main(String[] args) throws Exception {

        String topic = "wordcount";
        String zkRoot = "/kafka-storm";
        String spoutId = "KafkaSpout";
        //指定broker主机
        BrokerHosts brokerHosts = new ZkHosts("Slave5:2181,Slave6:2181,Slave7:2181"); 

        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "wordcount", zkRoot, spoutId);
        //消息是否从头读起
        spoutConfig.forceFromStart = true;
        //制定消息格式
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        //设置一个spout用来从kaflka消息队列中读取数据并发送给下一级的bolt组件,此处用的spout组件并非自定义的,而是storm中已经开发好的KafkaSpout
        builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping(spoutId);
        builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("word-spilter", new Fields("word"));
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setNumAckers(0);
        conf.setDebug(false);

        //LocalCluster用来将topology提交到本地模拟器运行,方便开发调试
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("WordCount", conf, builder.createTopology());

        //提交topology到storm集群中运行
//      StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());
    }

} 


读取配置文件的一个工具类


config.properties
[AppleScript] 纯文本查看 复制代码
zkConnect=master:2181[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]zkSessionTimeoutMs=30000
zkConnectionTimeoutMs=30000
zkSyncTimeMs=5000

scheme=date,id,content
separator=,
target=date


PropertyUtil.java
[AppleScript] 纯文本查看 复制代码
package cn.itcast.storm.utils;[/color][/size][/font]
[font=微软雅黑][size=3][color=#000000]
import java.io.InputStream;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * 属性配置读取工具
 */
public class PropertyUtil {

    private static final Log log = LogFactory.getLog(PropertyUtil.class);
    private static Properties pros = new Properties();

    // 加载属性文件
    static {
        try {
            InputStream in = PropertyUtil.class.getClassLoader().getResourceAsStream("config.properties");
            pros.load(in);
        } catch (Exception e) {
            log.error("load configuration error", e);
        }
    }

    /**
     * 读取配置文中的属性值
     * @param key
     * @return
     */
    public static String getProperty(String key) {
        return pros.getProperty(key);
    }

} 


作者:Jorocco
原文:https://blog.csdn.net/Jorocco/article/details/80179632


楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-16 23:35

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表