Kafka:主要参数详解(转)
原文地址:http://kafka.apache.org/documentation.html
############################# System #############################<br/> #唯一标识在集群中的ID,要求是正数。<br/> broker.id=0<br/> #服务端口,默认9092<br/> port=9092<br/> #监听地址,不设为所有地址<br/> host.name=debugo01 # 处理网络请求的最大线程数<br/> num.network.threads=2<br/> # 处理磁盘I/O的线程数<br/> num.io.threads=8<br/> # 一些后台线程数<br/> background.threads = 4<br/> # 等待IO线程处理的请求队列最大数<br/> queued.max.requests = 500 # socket的发送缓冲区(SO_SNDBUF)<br/> socket.send.buffer.bytes=1048576<br/> # socket的接收缓冲区 (SO_RCVBUF)<br/> socket.receive.buffer.bytes=1048576<br/> # socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于<br/> socket.request.max.bytes = 104857600 ############################# Topic #############################<br/> # 每个topic的分区个数,更多的partition会产生更多的segment file<br/> num.partitions=2<br/> # 是否允许自动创建topic ,若是false,就需要通过命令创建topic<br/> auto.create.topics.enable =true<br/> # 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数。<br/> default.replication.factor =1<br/> # 消息体的最大大小,单位是字节<br/> message.max.bytes = 1000000 ############################# ZooKeeper #############################<br/> # Zookeeper quorum设置。如果有多个使用逗号分割<br/> zookeeper.connect=debugo01:2181,debugo02,debugo03<br/> # 连接zk的超时时间<br/> zookeeper.connection.timeout.ms=1000000<br/> # ZooKeeper集群中leader和follower之间的同步实际<br/> zookeeper.sync.time.ms = 2000 ############################# Log #############################<br/> #日志存放目录,多个目录使用逗号分割<br/> log.dirs=/var/log/kafka # 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000<br/> #log.flush.interval.messages=10000<br/> # 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms<br/> #log.flush.interval.ms=1000<br/> # 检查是否需要将日志flush的时间间隔<br/> log.flush.scheduler.interval.ms = 3000 # 日志清理策略(delete|compact)<br/> log.cleanup.policy = delete<br/> # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。<br/> log.retention.hours=168<br/> # 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。<br/> #log.retention.bytes=1073741824 # 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)<br/> log.segment.bytes=536870912<br/> # 当达到下面时间,会强制新建一个segment<br/> log.roll.hours = 24*7<br/> # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)<br/> log.retention.check.interval.ms=60000 # 是否开启压缩<br/> log.cleaner.enable=false<br/> # 对于压缩的日志保留的最长时间<br/> log.cleaner.delete.retention.ms = 1 day # 对于segment日志的索引文件大小限制<br/> log.index.size.max.bytes = 10 * 1024 * 1024<br/> #y索引计算的一个缓冲区,一般不需要设置。<br/> log.index.interval.bytes = 4096 ############################# replica #############################<br/> # partition management controller 与replicas之间通讯的超时时间<br/> controller.socket.timeout.ms = 30000<br/> # controller-to-broker-channels消息队列的尺寸大小<br/> controller.message.queue.size=10<br/> # replicas响应leader的最长等待时间,若是超过这个时间,就将replicas排除在管理之外<br/> replica.lag.time.max.ms = 10000<br/> # 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker<br/> controlled.shutdown.enable = false<br/> # 控制器关闭的尝试次数<br/> controlled.shutdown.max.retries = 3<br/> # 每次关闭尝试的时间间隔<br/> controlled.shutdown.retry.backoff.ms = 5000 # 如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力有限。在broker数量较少,或者网络不足的环境中,建议提高此值.<br/> replica.lag.max.messages = 4000<br/> #leader与relicas的socket超时时间<br/> replica.socket.timeout.ms= 30 * 1000<br/> # leader复制的socket缓存大小<br/> replica.socket.receive.buffer.bytes=64 * 1024<br/> # replicas每次获取数据的最大字节数<br/> replica.fetch.max.bytes = 1024 * 1024<br/> # replicas同leader之间通信的最大等待时间,失败了会重试<br/> replica.fetch.wait.max.ms = 500<br/> # 每一个fetch操作的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会等待直到数据达到这个大小<br/> replica.fetch.min.bytes =1<br/> # leader中进行复制的线程数,增大这个数值会增加relipca的IO<br/> num.replica.fetchers = 1<br/> # 每个replica将最高水位进行flush的时间间隔<br/> replica.high.watermark.checkpoint.interval.ms = 5000 # 是否自动平衡broker之间的分配策略<br/> auto.leader.rebalance.enable = false<br/> # leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡<br/> leader.imbalance.per.broker.percentage = 10<br/> # 检查leader是否不平衡的时间间隔<br/> leader.imbalance.check.interval.seconds = 300<br/> # 客户端保留offset信息的最大空间大小<br/> offset.metadata.max.bytes = 1024 #############################Consumer #############################<br/> # Consumer端核心的配置是group.id、zookeeper.connect<br/> # 决定该Consumer归属的唯一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group.<br/> group.id<br/> # 消费者的ID,若是没有设置的话,会自增<br/> consumer.id<br/> # 一个用于跟踪调查的ID ,最好同group.id相同<br/> client.id = <group_id> # 对于zookeeper集群的指定,必须和broker使用同样的zk配置<br/> zookeeper.connect=debugo01:2182,debugo02:2182,debugo03:2182<br/> # zookeeper的心跳超时时间,查过这个时间就认为是无效的消费者<br/> zookeeper.session.timeout.ms = 6000<br/> # zookeeper的等待连接时间<br/> zookeeper.connection.timeout.ms = 6000<br/> # zookeeper的follower同leader的同步时间<br/> zookeeper.sync.time.ms = 2000<br/> # 当zookeeper中没有初始的offset时,或者超出offset上限时的处理方式 。<br/> # smallest :重置为最小值<br/> # largest:重置为最大值<br/> # anything else:抛出异常给consumer<br/> auto.offset.reset = largest # socket的超时时间,实际的超时时间为max.fetch.wait + socket.timeout.ms.<br/> socket.timeout.ms= 30 * 1000<br/> # socket的接收缓存空间大小<br/> socket.receive.buffer.bytes=64 * 1024<br/> #从每个分区fetch的消息大小限制<br/> fetch.message.max.bytes = 1024 * 1024 # true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset<br/> auto.commit.enable = true<br/> # 自动提交的时间间隔<br/> auto.commit.interval.ms = 60 * 1000 # 用于消费的最大数量的消息块缓冲大小,每个块可以等同于fetch.message.max.bytes中数值<br/> queued.max.message.chunks = 10 # 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数<br/> rebalance.max.retries = 4<br/> # 每次reblance的时间间隔<br/> rebalance.backoff.ms = 2000<br/> # 每次重新选举leader的时间<br/> refresh.leader.backoff.ms # server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。<br/> fetch.min.bytes = 1<br/> # 若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间<br/> fetch.wait.max.ms = 100<br/> # 如果指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限<br/> consumer.timeout.ms = -1 #############################Producer#############################<br/> # 核心的配置包括:<br/> # metadata.broker.list<br/> # request.required.acks<br/> # producer.type<br/> # serializer.class # 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip<br/> metadata.broker.list #消息的确认模式<br/> # 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP<br/> # 1:发送消息,并会等待leader 收到确认后,一定的可靠性<br/> # -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性<br/> request.required.acks = 0 # 消息发送的最长等待时间<br/> request.timeout.ms = 10000<br/> # socket的缓存大小<br/> send.buffer.bytes=100*1024<br/> # key的序列化方式,若是没有设置,同serializer.class<br/> key.serializer.class<br/> # 分区的策略,默认是取模<br/> partitioner.class=kafka.producer.DefaultPartitioner<br/> # 消息的压缩模式,默认是none,可以有gzip和snappy<br/> compression.codec = none<br/> # 可以针对默写特定的topic进行压缩<br/> compressed.topics=null<br/> # 消息发送失败后的重试次数<br/> message.send.max.retries = 3<br/> # 每次失败后的间隔时间<br/> retry.backoff.ms = 100<br/> # 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据<br/> topic.metadata.refresh.interval.ms = 600 * 1000<br/> # 用户随意指定,但是不能重复,主要用于跟踪记录消息<br/> client.id="" # 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时<br/> queue.buffering.max.ms = 5000<br/> # 异步模式下缓冲的最大消息数,同上<br/> queue.buffering.max.messages = 10000<br/> # 异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃<br/> queue.enqueue.timeout.ms = -1<br/> # 异步模式下,每次发送的消息数,当queue.buffering.max.messages或queue.buffering.max.ms满足条件之一时producer会触发发送。<br/> batch.num.messages=200
转发申明:
本文转自互联网,由小站整理并发布,在于分享相关技术和知识。版权归原作者所有,如有侵权,请联系本站 top8488@163.com,将在24小时内删除。谢谢