我有一个应用程序,有多个螺栓和单喷嘴。我的流媒体数据以非常快的速度从喷口进入第一个螺栓。在第一个螺栓中,处理单个元组大约需要50秒的时间,然后转到第二个螺栓。在第二个bolt中,也有复杂的处理过程,所以一个元组也需要20秒,但我的喷口获取数据的速度非常快。我的问题是,在两个螺栓之间,有任何队列像卡夫卡一样保存数据。或者必须在两个螺栓之间执行卡夫卡。或者任何其他方法,其中单个元组的处理时间为40-50秒,并且即将到来的数据流非常快
谢谢Storm使用了大量内部队列。有关概述,请参阅。应该不需要自己
我无法从应用程序向kafka发送文本消息,但当我尝试发送对象(事件)时,收到异常:
信息:ErrorMessage[payload=org.springframework.messaging.MessageHandlingException:消息处理程序[org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]中发生错误;嵌套异常为java.lang.NullPointerException:在or
卡夫卡博士说
压缩还允许从删除。带有密钥和空值的消息
有效负载将被视为从日志中删除。这是删除标记
将导致删除之前带有该密钥的任何消息(如
带有该键的新消息),但删除标记在它们中是特殊的
将在一段时间后从日志中清除它们自己。这个
上面给出了删除不再保留的时间点
我的场景是:我将消息流发送到内存引擎中,然后它们从另一端无序地出现。消息可以在引擎的状态机缓存(它是一个规则处理器引擎)中花费从零到数小时的时间
因此,我想知道是否可以使用日志压缩作为持久性的答案,以便始终知道引擎中仍有哪些消息用于容错。因
我有一个应用程序,目前正在运行,正在使用Rx流在不同来源之间移动数据。现在在这个应用程序中,我有几个流,我想把它们的消息写到一个卡夫卡主题。实际上,假设我有如下流1到流5:
Stream1-接受数据类型A
Stream2-接受数据类型B等
其中,这些流1至流5为Rx观察者,每个观察者观察不同类型的数据。我从流中获取的所有这些数据类型都转换为一个公共JSON结构。我希望将这个JSON结构推送到单个卡夫卡主题
现在的问题是:
我应该为这些流中的每一个创建一个KafkaProducer还是更确切地说为
我已使用此命令标记了3个要删除的主题
kafka主题——Zookeer localhost:2181——删除——主题名称
现在我无法使用它们,也无法重新创建它们如何解决此问题?
要么完全删除它们,然后重新创建它们,要么取消标记删除状态 您是否列出并查看这些主题是否存在?为了列出:
kafka-topics.sh——列表——zookeeper本地主机:2181
从0.8.2.x开始支持删除主题。您需要首先在所有代理上启用主题删除(将delete.topic.enable设置为true)
确保您已在
直到最近,我还认为制片人、领导人选举和元数据的过程是这样的:
制作人发布给一家领先的经纪公司,该经纪公司倒闭了,这将失败
生产者尝试了几次(或零次,取决于配置)
最终,制作人将“失败”地发布该消息
这将触发生产者联系代理获取新的元数据块,以便找到新的领导者并继续
然而,我观察到的是生产者在用尽重试次数后阻塞,并且在元数据“自动”刷新之前不做任何事情。此刷新将基于此属性中配置的时间(来自Apache的Kafka文档):
metadata.max.age.ms:在这段时间之后,我们强制刷新元数据,即
以前我一直在使用0.8API。当您将主题列表传递给它时,它将返回一个流映射(每个主题一个条目)。这允许我生成一个单独的线程,并将每个主题的流分配给它。由于每个主题中的数据太多,因此生成一个单独的线程有助于多任务处理
//0.8代码示例
映射用户映射=
consumer.createMessageStreams(topicCountMap);
我想升级到0.10。我检查了KafkaStreams和KafkaConsumer类KafkaConsumerobject获取配置属性并提供获取主题列表的s
目前我有一个卡夫卡主题
现在,我需要运行多个使用者,以便可以并行地读取和处理消息
这可能吗
我正在使用python和pykafka库
consumer = topic.get_simple_consumer(consumer_group=b"charlie",
auto_commit_enable=True)
在两个消费者身上都传达了相同的信息。我只需要处理一次消息。是的,您可以让多个使用者并行读取同一主题,前提是您使用相同的使
卡夫卡主题可以通过命令行工具或API创建
在有多个主题和多个环境(dev/qa/prod等)的大型系统中,有必要控制主题的创建及其配置(分区等)。例如,我们可能希望在源代码管理下维护一个shell脚本文件,该文件检查主题是否已经存在,如果不存在,则创建它。然后,可以将相同的脚本应用于每个后续环境(可能自动作为CI/CD的一部分),以创建或修改所需的主题
是否有任何标准文件格式或工具用于维护Kafka主题拓扑?Kafka没有提供现成的脚本,但您可以编写一个简单的SHELL脚本来实现此功能,如下所示
我在本地机器上玩open shift,当我尝试使用此工具部署Kafka节点时,我遇到了以下异常:
我不知道发生了什么,卡夫卡成功地和动物园管理员交谈了,我可以在这个日志中看到,但在那之后,这个例外,显然是不够的,我的意思是,卡夫卡想写什么?哪个用户正在尝试写入
[2017-05-24 13:06:28,835] INFO Opening socket connection to server zookeeper.myproject.svc.cluster.local/172.30.106.154
我在一个Kafka消费群体中尝试了多个消费者实例,但Kafka.common.notleaderorpartitionException总是失败
我的卡夫卡集群由3个代理和主题组成,主题有PartitionCount:2和ReplicationFactor:3
SparkConsumer.java
public class SparkConsumer {
private static Function2<Integer, Integer, Integer> MyReducerFun
卡夫卡版本:0.10.2.1(服务器)
动物园管理员:与卡夫卡捆绑
发行
如果删除主题,然后重新启动代理,则会失败。代理配置为delete.topic.enable=true
删除命令:
./kafka-topics.sh --zookeeper localhost:2182 --delete --topic MY.TOPIC.NAME
目前唯一的解决方法是手动转到日志目录并删除
主题目录使用rm rf。贴出来就行了
错误:
[2017-06-09 12:24:43,359] ERROR Th
我正试图为一种通过卡夫卡传输的消息数据定义一个通用的Avro模式。一些示例数据可能如下所示:
"REDHEAD BIGBODY SLIMLEGS 53"
"BLUEHEAD NOBODY NONE 7"
大致翻译成
{Name: Header, Range: [0:8], Type: String, Size: 9}
{Name: Body, Range: [9:17], Type: String, Size: 9}
{Name:
在Kafka中,分区的数量对生产者吞吐量有影响吗?
(我知道分区的数量是用户端并行度的上限,但它会影响生产者的性能吗?)
我使用Kafka中的producer performance工具在AWS上的Kafka集群设置上测试了这一点。我观察到,对于3、6和20个分区,集群中的聚合吞吐量大致相似(大约200 MB/s)。如果你能帮我澄清这个问题,我将不胜感激
谢谢。答案分为两部分:
从卡夫卡消费者的角度来看。是的,分区提高了Kafka消费者的吞吐量。但是,我发现如果您想要良好的可伸缩性,那么您确实希
我试图运行官方的“Kafka010Example.scala”,但它并没有像预期的那样从输入主题中读取并写入输出。我错过了什么或做错了什么?非常感谢任何帮助或暗示。下面就是我所做的:
在docker容器中启动kafka(spotify/kafka:最新版本)
$docker run-d-p 2181:2181-p 9092:9092 spotify/kafka:latest
已在容器内启动bash会话:
$docker exec-it 26d1cfced4cb/bin/bash
创建输入和输出主
如何通过卡夫卡主题中的信息实现这样的结果
即类似于changelog的功能—有多条消息进入主题,但我只关心最后一条消息
在主题被分区的情况下会发生什么
在卡夫卡中是否有可能实现此目的?要实现此目的,应将此主题的cleanup.policy设置为compact,如下所示:
创建主题:
bin/kafka-topics.sh-zookeer localhost:2181-create-topic my-topic-partitions 1-replication factor 1-config cl
我试图连接两个Ktable流,似乎作为连接操作的一个输出,我两次得到与输出相同的消息。似乎在此操作过程中调用了两次值Joiner
让我知道如何解决这个问题,以便只有一条消息作为连接操作的输出发出
KTable<ID, Message> joinedMsg = msg1.join(msg2, new MsgJoiner());
KTable joinedsg=msg1.join(msg2,新的MsgJoiner());
由于两个ktable(msg1和msg2)之间的连接,我收到两
团队,
Kafka streams应用程序部署应遵循什么策略
我们有一个应用服务器集群,根据部署策略,我们使用Jenkins CI/CD进行jar部署。
我们来平衡问题了。应用程序部署后,
在3-5分钟内,所有服务启动,组进入重新平衡状态
在部署之前,为分配的分区和使用者创建快照
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
对于Java/Kotlin Spring boot应用程序,如果我想向Kafka发送消息或使用来自Kafka的消息。您建议使用SpringKafka库还是只使用KafkaJavaAPI
不太确定Spring是否提供了更多的好处,或者只是一个包装器?对于Spring,它们提供了很多注释,当出现运行时错误时,这些注释看起来更神奇
想听听一些意见。完整披露:我是ApacheKafka的Spring项目负责人
这完全取决于你和你的同事
它在某种程度上可以与编写汇编代码和使用高级语言和编译器进行比较
对于
我为Jetty 9和Kafka使用ssl密钥库。我需要提供密钥库和密钥密码来访问密钥库和私钥。但是,我不想在配置文件中以明文形式提供这些密码。还有哪些其他选项可以安全地提供/加密密码?每种方法的优缺点是什么?自从Kafka 2.0.0以来,在启动代理之前,所有密码配置都可以在zookeeper中预加载。可以使用kafka configs.sh工具在Zookeeper中以加密格式存储密码,从而避免在属性文件中以明文形式指定密码
请参阅卡夫卡文档中的部分,特别是“启动代理之前在ZooKeeper中更
我有一个简单的Kafka代理,它运行的主题是原始事件
使用kafka控制台生产者-主题原始事件-代理列表kafka:29092
我有一个关于日志保留和Kafka Streams减少操作的问题
我有以下流配置
builder.
.stream("topic1", Comsumed.with(Serdes.string(), Serdes.string()))
.groupByKey(Serialized.with(Serdes.string(), Serdes.string()))
.reduce((val1, val2) -> val2, Materialzed.as("store1"))
我知道墓碑事件没有
主题中的卡夫卡数据可以流式传输、消费和接收到BigQuery/云存储中的可能选项有哪些
根据,,
GCP附带的数据流构建在Apache Beam编程模型之上。KafkaIO与Beam Pipeline一起使用是对传入数据执行实时转换的推荐方法吗
Kafka数据可以推送到cloud pub sub,然后推送到BigQuery表上。也可以使用GCP之外的卡夫卡流/火花作业
当数据完全由谷歌云平台(GCP)托管时,需要考虑的因素是什么? < P>在2016中,卡夫卡支持被添加到Apache束中,伴
我有一个Spring引导应用程序,它使用处理器API生成拓扑,还为同一拓扑生成一个addGlobalStateStore
我想创建另一个拓扑(从而创建另一个KafkaStreams)以从另一组主题中读取,并想在新拓扑中共享先前创建的存储。共享意味着两种拓扑的底层状态存储应该相同。从一个拓扑写入的任何数据都应该在另一个拓扑中可见
如果不编写包装器端点来访问状态存储(例如REST调用),这是否可能
或者我的用例是否需要外部状态存储,例如redis?不,您不能跨拓扑共享状态存储。如果可能的话,您可以将
我有两条溪流:
[topicA]->processingA->[topicB]->processingB->[topicC]
通过登录我的应用程序,我注意到从processingA向topicB发送输出和从topicB拾取消息以进行处理B之间,每次需要超过100毫秒(而不是150毫秒)。它可能不多,但它会累积,最终相当简单的处理级联几乎需要秒
我可以调整卡夫卡,使这些延迟尽可能接近于零吗?
哪些配置参数对这些延迟有影响
我有很多默认配置。是commit.interval.ms导致延迟吗?我已经将
我希望列出kafka代理上的所有活动配置。我可以在server.properties文件中看到配置,但这还不是全部,它没有显示所有配置。我希望能够看到所有配置,甚至是默认配置。这可能吗?
任何指向此方向的指针都将不胜感激。没有命令列出kafka代理的当前配置。但是,如果您想查看具有默认值和重要性的所有配置参数,请在此处列出
您可以通过Kafka AdminClient(我使用的是2.0 FWIW-界面仍在发展)以编程方式实现这一点:
final String brokerId=“1”;
最终Co
我正在尝试使用KafkaConsumer获取id为0的分区上的当前位置
consumer.position(new TopicPartition("my-topic-name", 0))
然而,这永远不会回来。它不断尝试发送以下两个请求:
已完成的请求:RequestHeader(apiKey=FIND\u COORDINATOR,apiVersion=1,clientId=consumer-1,correlationId=6)--{COORDINATOR\u key=,COORDINATO
我在本地机器Ubuntu VM上运行ApacheKafka和Couchbase。我有一个从MS SQL读取的SourceConnector和一个将写入Couchbase的sink连接器。我目前的水槽连接器有问题
当我设置接收器连接器并检查其状态时,它正在运行。但是,在我更新SQL DB中的记录a记录并检查连接器状态后,出现以下异常:
我已经证实,数据正在流入卡夫卡主题和流。唯一不起作用的是写信给Couchbase。(Couchbase未更新,并且我在连接器状态中得到异常)
以下是我的连接器JS
我将多线程kafka使用者用于单个60个分区的kafka主题,每个使用者线程映射到每个分区。消费完成后,使用KafkaConsumer.close()方法关闭每个消费线程
在使用相同的使用者groupid重新消费期间(每个使用者使用seek-to-start),1或2个使用者线程无法从其分配的分区消费。
如果提供了新的消费者组id,则不会显示此问题
因此,如果使用下面的脚本删除消费者组。
bin/kafka-consumer-groups.sh--引导服务器“3个服务器:端口”--删除--组组1
我有一个卡夫卡主题。
我有一个流,其中键是股票符号,值是高/低pojo。我还有一个KTable,它捕获流的当前状态
我想逐一处理流中的每条记录。对于每条记录,我想从KTable中查找该符号的当前值。然后根据Hi/Low是否更改,我想更新KTable,然后将消息写入流。JavaDoc展示了如何通过交互式查询功能查询KTable。我认为交互式查询对这个用例没有帮助。然而,总的来说,似乎需要使用处理器API而不是DSL来获得更大的灵活性。在自定义处理器中,可以使用KeyValueStore(实际上与K
我想为python java构建ssl配置
对于java
对于python
我将它们合并到一个外壳中
#/bin/bash
PASSWORD=“asdfasdf”
KEY_PASSWORD=${PASSWORD}
STORE_PASSWORD=${PASSWORD}
有效期=365000
#服务器
keytool-keystore kafka.server.keystore.jks-alias server_alias-validity${validity}-genkey-keypass${
我有卡夫卡输入和输出主题。
我所做的是写在主题A中,并希望在主题B中接收消息,但需要缓冲
我的代码如下所示:
@Bean
public Function<Flux<String>, Flux<String>> stringFlow() {
return flux ->
flux.window(4)
.flatMap(m -> m);
}
输出主题:
A
我正在尝试在EC2上运行模式注册表
我的卡夫卡正在AWS上运行
这是我的属性文件
listeners=http://0.0.0.0:8081
kafkastore.connection.url=z-3.***:2181,z-***:2181,z-**:2181
kafkastore.bootstrap.servers=PLAINTEXT://b-3.**:9092,PLAINTEXT://b-6.**:9092,PLAINTEXT://b-1.**:9092
kafkastore.topic=_
我是卡夫卡的新手。我一直在测试卡夫卡发送信息的能力
这就是我现在的处境
本地VM中的add.java定期向本地VM中的kafka发送消息
另一台服务器中的relay.java正在从本地VM中的kafka进行轮询,并生成另一台服务器中的kafka
当我从本地VM中的kafka向另一台服务器中的kafka发送消息时
我从笔记本电脑上拔出局域网电缆。几秒钟后,我再次将LAN电缆连接到它
然后我发现一些信息在局域网电缆断开时丢失了
但是,当网络重新连接时,我希望获得所有断开连接的消息,而不必重新连接
当我创建kafka producer时,如果同时使用checkpoint,则会导致以下问题:
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1155)
at org.
我正在使用kafka和elasticsearch设置flink流处理器。我想重播我的数据,但当我将并行度设置为大于1时,它不会完成程序,我认为这是因为kafka流只看到一条消息,将其标识为流的结尾
公共自定义架构(日期\u结束时间){
endTime=_endTime;
}
@凌驾
公共布尔值isEndOfStream(CustomTopicWrapper nextElement){
if(this.endTime!=null&&nextElement.messageTime.getTime(
标签: Apache Kafka
kafka-consumer-apiapache-kafka-streamsapache-kafka-connect
我正在编写KafkaStreams应用程序,并将max.num.threads设置为1。我有三个源主题,分别有6、8、8个分区。当前正在使用4个实例运行此streamtopology,因此有4个正在运行的streams线程
我在我的卡夫卡主题中得到了不完整的源主题元数据。我从github中找到以下代码
抛出此错误并尝试理解代码
final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashM
我开发了生产卡夫卡的环境,包括3个ZK服务器、3个卡夫卡代理和两个卡夫卡连接。我把我的tmp文件和我的卡夫卡主文件夹放在一起。我在远程ubuntu环境中运行它,但不在docker中运行
当我操作卡夫卡操作时,我遇到了一个错误,它告诉我我的磁盘被消耗了太多。我检查我的kafka tmp文件夹,其大小几乎是我磁盘大小的2/3,这将关闭我的kafka群集
我检查了每个kafka log_文件夹,发现:
25每台21MB的1号工人的连接偏移量
252号工人的连接偏移量221MB
25connect_st
我按照《快速入门指南》的说明在本地运行卡夫卡
然后我在config/consumer.properties中定义了我的消费者组配置,这样我的消费者就可以从定义的group.id
运行以下命令
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
结果,
test-consumer-group <-- group.id defined in conf/consumer.properties
consol
我想测试使用处理器API的Kafka流从源读取并写入主题列表时的场景,并且一个或两个主题无法访问(失败测试:尝试通过添加集群中不存在的1/2主题来模拟它)
我的理解是kafkaStreams应该为不存在的主题给出错误,并继续将记录转发到存在的主题1和主题2
但我看到的行为是,它会产生以下错误:
这是模拟无法访问的主题或主题不存在的问题的正确方法吗?还有,为什么即使在处理流和拓扑异常时,Kafka流也会因上述错误而关闭。
如果某个接收器主题因某种原因不可用或无法访问,kafka streams不应
我正在编写一个卡夫卡消费者,它应该反序列化来自avro的传入消息。我确实有消息的模式,并且想知道在香草卡夫卡中反序列化这些消息的最佳方式是什么
我搜索了一段时间,但我看到的所有示例都是这样的文件反序列化http://avro.apache.org/docs/1.9.0/gettingstartedjava.html 不要涵盖卡夫卡的信息部分
我确实使用了avro maven插件,并将我的模式转换为pojo类
任何建议都将不胜感激
谢谢您需要反转制作人的流程。如果生产者使用了模式注册表序列化程序,
我想使用filepulse连接器将xml文件加载到kafka
以下是我的环境:
plugin.path=/user/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
Win10WSL,已安装Ubuntu
下载了confluent platf
我有一个场景,我们有两个不同的流,我们在两个不同的时间获取它们的数据,我需要根据值中的时间戳将它们连接起来。我将尝试通过下面的例子来解释
inputStream1->
键111,值21:00 AAA
键111,值21:02 AAA
键111,值21:04 AAA
键111,值21:15 AAA
键111,值21:18 BBB
键111,值21:20 BBB
inputStream2->
键111,值21:01 10.0.0.1
键111,值21:04 10.0.0.2
键111,值21:1
Kafka mirrormaker 2能否将不同来源的多个相同主题的消息写入目标集群中的一个主题?主题元数据不需要复制,只需要复制消息,因为两个主题到一个主题将更改原始主题的元数据,我们不希望源主题与目标主题相同,只需要复制消息
例如,有两个主题为A的源代码,我们可以将这两个主题像目标中的A一样写入一个主题吗?Confluent replicator可以做到这一点,只需复制消息,而不是镜像所有内容
标签: Apache Kafka
kafka-consumer-apiapache-kafka-streamsconfluent-platform
我有以下几点:-
具有20个分区的单个主题
我有10个单独的实例为streams应用程序供电(这些实例使用上述主题),在配置中的每个Stream应用程序上,我添加了NUM_Stream_THREADS_CONFIG=5
以下理解正确吗
那么我最多可以有50个(10*5)逻辑任务,但由于分区的数量不是20个,所以只能增加20个任务
此外,由于流任务的分区分配是偶数的,每个分区是否会分配给20个任务中的每一个,即每个任务在单个分区上工作
到目前为止正确吗
感谢您的回复。是的,只要有20个分区
标签: Apache Kafka
avroconfluent-platformconfluent-schema-registry
合流模式注册表底层存储Kafka中_schematopic下的所有模式。是否可以将此后端存储主题拆分为多个,如_schema1和_schema2
为什么?因为在我的生产用例中,我希望存储1000多个模式,所以分离后端存储会减少主题的负载
此外,confluent schema registry是否支持卡夫卡主题以外的后端存储?无法拆分,否;在工作中,我们有远远超过1000个模式,并且注册表工作良好
“主题加载”不是您将遇到的问题,而是注册表服务器的堆使用情况,因为模式存储在多个哈希映射中
汇合注
是否有任何选项可以使用JMX获取卡夫卡经纪人的状态?
如果是,我应该查询什么指标?
或者我可以从其他指标推断?
谢谢
鲁迪你想要什么样的身份?如果代理进程关闭,那么JMX将不可用基本上,我需要知道代理是否已启动,这样我就可以知道服务是否已启动(对于我的仪表板),我正在VM上运行kafka群集。查询每个代理似乎很笨拙。我正在寻找一个指标(或任何其他方法),它将显示有多少代理处于运行状态,如果不查询前面提到的每个代理,如果代理JVM停止,JMX将不可用,因此任何指标都可以工作,但听起来您需要一个本地
我试图从卡夫卡向德鲁伊传送数据。
我为卡夫卡制作了数据,当我试图使用这些数据时,会出现以下错误。
我检查了2181端口和zookeeper,看起来像在运行。
谢谢
错误日志:
2021-05-05T20:29:14151警告[主发送线程(localhost:2181)]org.apache.zookeeper.ClientCnxn-服务器localhost/:2181的会话0x0,意外错误,正在关闭套接字连接并尝试重新连接
java.lang.IllegalArgumentException:无
我正在进行Kafka群集安装,但我看到有些人使用多个代理主机名连接到Kafka群集。为什么要为集群使用多个代理主机名?我们为什么不使用集群主机名呢?谢谢,群集没有主机名
客户端使用多个(或全部)代理地址作为容错引导连接谢谢,所以我们可以使用一个代理地址进行连接,对吗?我很确定我的回答是使用多个代理地址
我试图使用KSQL创建一个流,但返回了一个错误
我正在运行的声明是:
使用(卡夫卡主题='T3非结尾',值格式='avro')创建流s1
我得到一个NoClassDefFoundError-org.xerial.snappy.snappy
据我所知,这是因为/tmp被设置为noexec。从Confluent网站以及使用Snappy的其他应用程序来看,需要传递目录路径
有人知道我在使用KSQL时如何传递Snappy的目录路径吗?听起来像是在问如何设置JVM标志
export KSQL_OPTS=
1 2 3 4 5 6 ...
下一页 最后一页 共 128 页