体育资讯网

您现在的位置是:首页 > 足球教学 > 正文

足球教学

kafka副本同步源码(kafka主从同步)

hacker2022-06-13 19:26:29足球教学37
本文目录一览:1、apachekafka源码怎么编译

本文目录一览:

apache kafka源码怎么编译

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.(Kafka是一个分布式的、可分区的(partitioned)、基于备份的(replicated)和commit-log存储的服务.。它提供了类似于messaging system的特性,但是在设计实现上完全不同)。kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:

(1)、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

(2)、高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。

(3)、支持通过kafka服务器和消费机集群来分区消息。

(4)、支持Hadoop并行数据加载。

一、用Kafka里面自带的脚本进行编译

下载好了Kafka源码,里面自带了一个gradlew的脚本,我们可以利用这个编译Kafka源码:

1 # wget

2 # tar -zxf kafka-0.8.1.1-src.tgz

3 # cd kafka-0.8.1.1-src

4 # ./gradlew releaseTarGz

运行上面的命令进行编译将会出现以下的异常信息:

01 :core:signArchives FAILED

02

03 FAILURE: Build failed with an exception.

04

05 * What went wrong:

06 Execution failed for task ':core:signArchives'.

07 Cannot perform signing task ':core:signArchives' because it

08 has no configured signatory

09

10 * Try:

11 Run with --stacktrace option to get the stack trace. Run with

12 --info or --debug option to get more log output.

13

14 BUILD FAILED

这是一个bug(),可以用下面的命令进行编译

1 ./gradlew releaseTarGzAll -x signArchives

这时候将会编译成功(在编译的过程中将会出现很多的)。在编译的过程中,我们也可以指定对应的Scala版本进行编译:

1 ./gradlew -PscalaVersion=2.10.3 releaseTarGz -x signArchives

编译完之后将会在core/build/distributions/里面生成kafka_2.10-0.8.1.1.tgz文件,这个和从网上下载的一样,可以直接用。

二、利用sbt进行编译

我们同样可以用sbt来编译Kafka,步骤如下:

01 # git clone

02 # cd kafka

03 # git checkout -b 0.8 remotes/origin/0.8

04 # ./sbt update

05 [info] [SUCCESSFUL ] org.eclipse.jdt#core;3.1.1!core.jar (2243ms)

06 [info] downloading ...

07 [info] [SUCCESSFUL ] ant#ant;1.6.5!ant.jar (1150ms)

08 [info] Done updating.

09 [info] Resolving org.apache.hadoop#hadoop-core;0.20.2 ...

10 [info] Done updating.

11 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...

12 [info] Done updating.

13 [info] Resolving com.yammer.metrics#metrics-annotation;2.2.0 ...

14 [info] Done updating.

15 [success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM

16

17 # ./sbt package

18 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)

19 Getting Scala 2.8.0 ...

20 :: retrieving :: org.scala-sbt#boot-scala

21 confs: [default]

22 3 artifacts copied, 0 already retrieved (14544kB/27ms)

23 [success] Total time: 1 s, completed Jun 18, 2014 6:52:37 PM

对于Kafka 0.8及以上版本还需要运行以下的命令:

01 # ./sbt assembly-package-dependency

02 [info] Loading project definition from /export1/spark/kafka/project

03 [warn] Multiple resolvers having different access mechanism configured with

04 same name 'sbt-plugin-releases'. To avoid conflict, Remove duplicate project

05 resolvers (`resolvers`) or rename publishing resolver (`publishTo`).

06 [info] Set current project to Kafka (in build file:/export1/spark/kafka/)

07 [warn] Credentials file /home/wyp/.m2/.credentials does not exist

08 [info] Including slf4j-api-1.7.2.jar

09 [info] Including metrics-annotation-2.2.0.jar

10 [info] Including scala-compiler.jar

11 [info] Including scala-library.jar

12 [info] Including slf4j-simple-1.6.4.jar

13 [info] Including metrics-core-2.2.0.jar

14 [info] Including snappy-java-1.0.4.1.jar

15 [info] Including zookeeper-3.3.4.jar

16 [info] Including log4j-1.2.15.jar

17 [info] Including zkclient-0.3.jar

18 [info] Including jopt-simple-3.2.jar

19 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'

20 [warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'

21 [warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE'

22 with strategy 'rename'

23 [warn] Merging 'LICENSE.txt' with strategy 'rename'

24 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'

25 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'

26 [warn] Strategy 'discard' was applied to a file

27 [warn] Strategy 'rename' was applied to 5 files

28 [success] Total time: 3 s, completed Jun 18, 2014 6:53:41 PM

当然,我们也可以在sbt里面指定scala的版本:

01 !--

02 User: 过往记忆

03 Date: 14-6-18

04 Time: 20:20

05 bolg:

06 本文地址:

07 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

08 过往记忆博客微信公共帐号:iteblog_hadoop

09 --

10 sbt "++2.10.3 update"

11 sbt "++2.10.3 package"

12 sbt "++2.10.3 assembly-package-dependency"

kafka 有java的源码吗

kafka副本同步源码我这里是使用kafka副本同步源码的是kafka副本同步源码,kafka自带kafka副本同步源码的zookeeper。

以及关于kafka的日志文件啊,都放在默认里即/tmp下,我没修改。保存默认的

1、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps

2625 Jps

2、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties

此刻,这时,会一直停在这,因为是前端运行。

另开一窗口,

3、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties

也是前端运行。

kafka获取数据的几种方式

一、基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

如何进行Kafka数据源连接

1、在maven添加依赖

dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming-kafka_2.10/artifactId version1.4.1/version/dependency

2、scala代码

val kafkaStream = {val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"val kafkaParams = Map("zookeeper.connect" - "zookeeper1:2181","group.id" - "spark-streaming-test","zookeeper.connection.timeout.ms" - "1000")val inputTopic = "input-topic"val numPartitionsOfInputTopic = 5val streams = (1 to numPartitionsOfInputTopic) map { _ =KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic - 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)}val unifiedStream = ssc.union(streams)val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.unifiedStream.repartition(sparkProcessingParallelism)}

需要注意的要点

1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。

2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。

3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。

二、基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

3、一次且仅一次的事务机制:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

scala连接代码

val topics = Set("teststreaming")val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092" val kafkaParams = Map[String, String]("metadata.broker.list" - brokers, "serializer.class" - "kafka.serializer.StringEncoder")// Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)val events = kafkaStream.flatMap(line = {Some(line.toString())})

三、总结:两种方式在生产中都有广泛的应用,新api的Direct应该是以后的首选方式。

如何确定Kafka的分区数,key和consumer线程数,以及不消费问题解决

在Kafak中国社区的qq群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。

怎么确定分区数?

“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。Kafka官网上标榜自己是"high-throughput distributed messaging system",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢? Kafka就是使用了分区(partition),通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理(不管是producer还是consumer)的高吞吐量。

Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。

但分区是否越多越好呢?显然也不是,因为每个分区都有自己的开销:

一、客户端/服务器端需要使用的内存就越多

先说说客户端的情况。Kafka 0.8.2之后推出了Java版的全新的producer,这个producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。

服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。

二、文件句柄的开销

每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。

三、降低高可用性

Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。

说了这么多“废话”,很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢?答案就是:视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试,但它的结果其实对你意义不大,因为不同的硬件、软件、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于,官网说每秒能到10MB,为什么我的producer每秒才1MB? —— 且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)

Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。

另外,Kafka并不能真正地做到线性扩展(其实任何系统都不能),所以你在规划你的分区数的时候最好多规划一下,这样未来扩展时候也更加方便。

消息-分区的分配

默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions,如下图所示:

def partition(key: Any, numPartitions: Int): Int = {

Utils.abs(key.hashCode) % numPartitions

}

这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?

if(key == null) { // 如果没有指定key

val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Id

id match {

case Some(partitionId) =

partitionId // 如果有的话直接使用这个分区Id就好了

case None = // 如果没有的话,

val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分区的leader所在的broker

if (availablePartitions.isEmpty)

throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)

val index = Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个

val partitionId = availablePartitions(index).partitionId

sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用

partitionId

}

}

可以看出,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)

如何设定consumer线程数

我个人的观点,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。让我们来看看具体Kafka是如何分配的。

topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。——其实ConsoleConsumer可以使用通配符的功能实现同时消费多个topic数据,但这和本文无关。

再讨论分配策略之前,先说说KafkaStream——它是consumer的关键类,提供了遍历方法用于consumer程序调用实现数据的消费。其底层维护了一个阻塞队列,所以在没有新消息到来时,consumer是处于阻塞状态的,表现出来的状态就是consumer程序一直在等待新消息的到来。——你当然可以配置成带超时的consumer,具体参看参数consumer.timeout.ms的用法。

下面说说 Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每个线程都分配哪些分区呢?

C0 消费分区 0, 1, 2, 3

C1 消费分区 4, 5, 6

C2 消费分区 7, 8, 9

具体算法就是:

val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每个consumer至少保证消费的分区数

val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 还剩下多少个分区需要单独分配给开头的线程们

...

for (consumerThreadId - consumerThreadIdSet) { // 对于每一个consumer线程

val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出该线程在所有线程中的位置,介于[0, n-1]

assert(myConsumerPosition = 0)

// startPart 就是这个线程要消费的起始分区数

val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)

// nParts 就是这个线程总共要消费多少个分区

val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 nConsumersWithExtraPart) 0 else 1)

...

}

针对于这个例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart为10%3=1,说明每个线程至少保证3个分区,还剩下1个分区需要单独分配给开头的若干个线程。这就是为什么C0消费4个分区,后面的2个线程每个消费3个分区,具体过程详见下面的Debug截图信息:

ctx.myTopicThreadIds

nPartsPerConsumer = 10 / 3 = 3

nConsumersWithExtraPart = 10 % 3 = 1

第一次:

myConsumerPosition = 1

startPart = 1 * 3 + min(1, 1) = 4 ---也就是从分区4开始读

nParts = 3 + (if (1 + 1 1) 0 else 1) = 3 读取3个分区, 即4,5,6

第二次:

myConsumerPosition = 0

startPart = 3 * 0 + min(1, 0) =0 --- 从分区0开始读

nParts = 3 + (if (0 + 1 1) 0 else 1) = 4 读取4个分区,即0,1,2,3

第三次:

myConsumerPosition = 2

startPart = 3 * 2 + min(2, 1) = 7 --- 从分区7开始读

nParts = 3 + if (2 + 1 1) 0 else 1) = 3 读取3个分区,即7, 8, 9

至此10个分区都已经分配完毕

说到这里,经常有个需求就是我想让某个consumer线程消费指定的分区而不消费其他的分区。坦率来说,目前Kafka并没有提供自定义分配策略。做到这点很难,但仔细想一想,也许我们期望Kafka做的事情太多了,毕竟它只是个消息引擎,在Kafka中加入消息消费的逻辑也许并不是Kafka该做的事情。

不消费问题

第一步:参看消费者的基本情况

查看mwbops系统,【Consumer监控】--【对应的consumerId】

如果offset数字一直在动,说明一直在消费,说明不存在问题,return;

如果offset数字一直不动,看Owner是不是有值存在

如果Owner是空,说明消费端的程序已经跟Kafka断开连接,应该排查消费端是否正常,return;

如果Owner不为空,就是有上图上面的类似于 bennu_index_benuprdapp02-1444748505181-f558155a-0 的文字,继续看下面内容

第二步:查看消费端的程序代码

一般的消费代码是这样的

看看自己的消费代码里面,存不存在处理消息的时候出异常的情况

如果有,需要try-catch一下,其实不论有没有异常,都用try-catch包一下最好,如下面代码

return;

原因:如果在处理消息的时候有异常出现,又没有进行处理,那么while循环就会跳出,线程会结束,所以不会再去取消息,就是消费停止了。

第三步:查看消费端的配置

消费代码中一般以以下方式创建Consumer

消费端有一个配置,叫 fetch.message.max.bytes,默认是1M,此时如果有消息大于1M,会发生停止消费的情况。

此时,在配置中增加 props.put("fetch.message.max.bytes", "10 * 1024 * 1024"); 即可

return;

原因:目前Kafka集群配置的运行最大的消息大小是10M,如果客户端配置的运行接收的消息是1M,跟Kafka服务端配置的不一致,

则消息大于1M的情况下,消费端就无法消费,导致一直卡在这一条消息,现象就是消费停止。

发表评论

评论列表

  • 依疚千鲤(2022-06-13 21:31:25)回复取消回复

    opic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Rec

  • 萌懂鱼芗(2022-06-14 01:14:22)回复取消回复

    arts = 3 + (if (0 + 1 1) 0 else 1) = 4 读取4个分区,即0,1,2,3第三次:myConsumerPosition = 2s

  • 野欢勒言(2022-06-13 20:48:57)回复取消回复

    ,如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。让我们来看看具体Kafka是如何分配的。topic下的一个分区只能被同

  • 辙弃娇痞(2022-06-14 06:35:51)回复取消回复

    -annotation;2.2.0 ... 14 [info] Done updating. 15 [success] Total time: 168 s, completed Jun 18, 2014 6:51:38 PM 16 17

  • 森槿酷腻(2022-06-13 22:21:15)回复取消回复

    am val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDeco