Questions tagged [apache-kafka]

Apache Kafka is a distributed streaming platform designed to store and process high-throughput data streams.

0
votes
1 answer
10 views

Docker(-Compose) - Reserve CPU and Memory for Linux Host

I am running an Apache Kafka cluster including broker, connect and control center, each in separate Docker containers (using docker-compose). The problem is that the load of the containers for some reason ...
1
vote
1 answer
21 views

Writing to multiple Kafka partitions from Spark

I have Spark code that writes a batch to Kafka as specified here: https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html The code looks like the following: df.selectExpr("...
1
vote
1 answer
11 views

How to compute in code which partition Kafka *would have sent (and did send earlier)* a message to?

Using a topic and msg key, Kafka assigns a topic partition based on: ... Utils.Murmur2(bytes) % numPartitions .... Now it's simple enough to find murmur2 code. However, murmur2 needs a seed ...
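On the seed question: Kafka's Java client hard-codes the seed (0x9747b28c) inside Utils.murmur2, and the default partitioner for keyed records computes toPositive(murmur2(keyBytes)) % numPartitions. A minimal Python port as an illustration (verify against the client you actually use; non-Java clients such as librdkafka default to a different partitioner):

```python
def murmur2(data: bytes) -> int:
    """Python port of org.apache.kafka.common.utils.Utils.murmur2.

    Kafka hard-codes the seed (0x9747b28c); all arithmetic is 32-bit.
    """
    length = len(data)
    seed = 0x9747B28C
    m = 0x5BD1E995
    r = 24
    mask = 0xFFFFFFFF

    h = (seed ^ length) & mask
    # Process the input four bytes at a time (little-endian).
    for i in range(length // 4):
        k = int.from_bytes(data[i * 4:i * 4 + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Handle the 1-3 trailing bytes (mirrors the Java switch fall-through).
    tail = length & ~3
    rem = length % 4
    if rem == 3:
        h ^= (data[tail + 2] & 0xFF) << 16
    if rem >= 2:
        h ^= (data[tail + 1] & 0xFF) << 8
    if rem >= 1:
        h ^= data[tail] & 0xFF
        h = (h * m) & mask

    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def default_partition(key: bytes, num_partitions: int) -> int:
    """Mirrors DefaultPartitioner for keyed messages:
    toPositive(murmur2(key)) % numPartitions."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions
```

Because the result is deterministic for a given key and partition count, this reproduces the partition Kafka did (and would) choose, as long as the partition count has not changed since the record was produced.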
0
votes
0 answers
10 views

Spark Streaming: join RDDs from different input DStreams

We have two input DStreams from two Kafka topics, but we have to join the data of these two inputs together. The problem is that each InputDStream is processed independently, because of the foreachRDD, ...
0
votes
0 answers
12 views

KafkaSpout reads messages twice in Storm topology

I'm trying to simulate stream traffic using Kafka to Storm. I used KafkaSpout to read messages from one topic, sent by a producer that reads tweets and sends them to that topic. My problem is that ...
0
votes
0 answers
5 views

Kafka OAuth: Unrecognized SASL Login callback

What I am trying to do is: set up SASL/PLAIN for inter-broker communication, set up SASL/PLAIN for broker-Zookeeper communication, and set up SASL/OAUTHBEARER for client-broker communication. My ...
0
votes
1 answer
16 views

What is the difference between implementing Deserializer and Serde in Kafka Consumer API?

I am trying to simulate Gwen (Chen) Shapira's kafka-clickstream-enrich Kafka Streams project on GitHub (https://github.com/onurtokat/kafka-clickstream-enrich). When I consume a topic using a consumer class ...
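Conceptually, a Serde is just a container pairing a Serializer with a Deserializer: the Consumer API only ever needs the deserializing half, while Kafka Streams needs both because it writes back to topics and state stores. A toy Python model of that relationship (the class names mirror the Java interfaces, but this is not Kafka code):

```python
import json

class JsonSerializer:
    def serialize(self, topic: str, data) -> bytes:
        # The half a plain KafkaConsumer never calls.
        return json.dumps(data).encode("utf-8")

class JsonDeserializer:
    def deserialize(self, topic: str, payload: bytes):
        # The half a KafkaConsumer uses for keys and values.
        return json.loads(payload.decode("utf-8"))

class JsonSerde:
    """Mirrors org.apache.kafka.common.serialization.Serde:
    nothing more than a (serializer, deserializer) pair."""
    def serializer(self) -> JsonSerializer:
        return JsonSerializer()
    def deserializer(self) -> JsonDeserializer:
        return JsonDeserializer()

# Round trip through the Serde:
serde = JsonSerde()
raw = serde.serializer().serialize("clicks", {"page": "/home"})
print(serde.deserializer().deserialize("clicks", raw))  # {'page': '/home'}
```

So implementing a Deserializer is sufficient for consuming; implementing a Serde is only required where the framework must also produce the same type.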
0
votes
0 answers
30 views

Kafka Consumer integration in unity3d C# [duplicate]

I have this code snippet in C# windows application. It is connecting and working fine in winform environment. Desktop APP Snippet static void Main(string[] args) { string bootstrapServers = "...
3
votes
2 answers
29 views

Kafka cluster with single broker

I'm looking to start using Kafka for a system and I'm trying to cover all use cases. Normally it would be run as a cluster of brokers running on virtual servers (replication factor 3-5), but some ...
0
votes
0 answers
7 views

How can I use a Kafka KeyValueStore as a KTable?

Is it possible to use a KeyValueStore as a KTable? The KeyValueStore will be filled with a transform operation from another topic. This is my KeyValueStore: StoreBuilder<KeyValueStore<String, ...
0
votes
0 answers
8 views

How to configure multiple Kafka consumers in the application.yml file

I have a Spring Boot based microservice, and I have used Kafka to produce/consume data from different systems. Now my question is: I have two different topics, and based on the topic I have two ...
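For context, Spring Boot's auto-configuration builds only one shared set of `spring.kafka.consumer.*` properties; per-topic differences are usually expressed as custom keys that your own `@KafkaListener` beans read. A sketch under that assumption (the keys under `app:` are invented for illustration; only the `spring.kafka.*` keys are standard Spring Boot properties):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest

# Custom section (not a Spring convention) that two separate
# @KafkaListener methods can bind to via @Value or @ConfigurationProperties:
app:
  kafka:
    orders-topic: orders        # consumed by listener A, e.g. groupId "orders-group"
    payments-topic: payments    # consumed by listener B, e.g. groupId "payments-group"
```

Each `@KafkaListener` can then set its own `topics` and `groupId` attributes, so two listeners coexist in one application with one shared bootstrap configuration.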
0
votes
1 answer
18 views

RabbitMQ to Kafka Protocol Translation

I am using RabbitMQ on an Ubuntu server as a cluster. I want to connect Kafka to RabbitMQ. I have gone through the link below, but no luck: https://www.confluent.io/connector/kafka-connect-rabbitmq/
0
votes
0 answers
17 views

How much infrastructure and configuration is required for handling big data?

We are setting up infrastructure on our premises for working on big data projects. The amount of data is in terabytes. The source of data is both real-time and stored data. For AI computing, how ...
0
votes
1 answer
18 views

Stream processor doesn't process messages correctly at first, then duplicates them

I have two topics which are connected by a StreamProcessor: TopicA -> ProcessorA -> TopicB. I send a message for the first time to TopicA, where it gets recorded, and then through the ProcessorA ...
0
votes
0 answers
15 views

Consumer consumes the same message twice, but only at the start

On the very first consume, my consumer consumes the same message twice; this only happens on the first consume, after that it consumes each message only once. Attaching the consumer config code ...
0
votes
0 answers
13 views

Node-RED to Apache Kafka Connectivity

I have Kafka installed on an Ubuntu server, and Node-RED is on my personal laptop. I want to send data from Node-RED to a Kafka topic. I tried using the Kafka node in Node-RED to connect, but I am ...
-2
votes
1 answer
41 views

Comparing IBM MQ to Kafka

I found several comparisons between Kafka and IBM MQ. But I didn't get all my answers yet. I have some conclusions and I need to confirm them. I know that IBM supports peer to peer. But, I'm comparing ...
-1
votes
0 answers
15 views

Unable to query JSON structured data with Spark and Kafka

I am using Spark 2.4.0 with Kafka 2.2 (both with Scala 2.11) to handle JSON streaming data. I followed some links here: example1, example2. My data format (random data) in Kafka is JSON: {"yaw": 0,...
0
votes
0 answers
22 views

Kafka Consumer Hangs Indefinitely after Rebalancing

I am trying to utilize a kafka consumer library that is prewritten in my organization. It takes JSON data from a Kafka topic and stores it in a Mongo database. While I cannot post this code, it is a ...
0
votes
0 answers
13 views

Preventing KStream from emitting old unchanged aggregate value

I have a KStream pipeline which groups by key, then windows on some interval, and then applies a custom aggregation on that: KStream<String, Integer> input = /* define input stream */ /* ...
0
votes
0 answers
20 views

Suppress aggregation until a custom condition

I am using the Kafka Streams DSL. How would I proceed to suppress the output of an aggregation (similar behavior to this) with a custom condition? Let's say for every key I may have a START and a STOP event. I ...
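For context on this class of problem: the built-in suppress() operator only supports time and size bounds, so "hold the aggregate until a STOP event arrives" is typically implemented with a custom transformer over a state store. The buffering logic itself reduces to something like this sketch (plain Python standing in for the transformer; no Kafka API involved, and the event shape is hypothetical):

```python
class StartStopBuffer:
    """Buffers events per key and releases the aggregate only
    when a STOP event arrives (the custom suppress condition)."""
    def __init__(self):
        self._store = {}  # stands in for the Kafka Streams state store

    def process(self, key, event):
        """Returns the final aggregate on STOP, else None (suppressed)."""
        if event["type"] == "START":
            self._store[key] = []
            return None
        if event["type"] == "STOP":
            # Release and clear the buffered aggregate for this key.
            return self._store.pop(key, [])
        # Intermediate event: accumulate, emit nothing downstream.
        self._store.setdefault(key, []).append(event["value"])
        return None

buf = StartStopBuffer()
buf.process("k1", {"type": "START"})
buf.process("k1", {"type": "data", "value": 1})
buf.process("k1", {"type": "data", "value": 2})
print(buf.process("k1", {"type": "STOP"}))  # [1, 2]
```

In Kafka Streams the same shape would live in a Transformer backed by a KeyValueStore, forwarding downstream only when the STOP record is seen.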
0
votes
0 answers
21 views

Magic v1 does not support record headers

I am trying to produce a message to Kafka. I am using kafka-clients version 1.1.0 and Camel version 2.22.1, but every time I get an error saying Magic v1 does not support record headers ...
1
vote
0 answers
18 views

Quarkus Kafka consumer doesn't work in native mode

I have a Quarkus Kafka consumer. In JVM mode it works well. After I build the native runner with ./mvnw package -Pnative and run it in native mode, I get this exception: 2019-05-23 17:17:42,...
1
vote
1 answer
23 views

Kafka logs deleted even after setting log.retention.ms = -1

I'm running a Hyperledger Fabric 1.2.0 network with 5 Kafkas and 3 Zookeepers. The issue I'm facing is that even after setting log.retention.ms = -1, Kafka deleted a few initial logs. To my knowledge, after ...
1
vote
1 answer
34 views

How can I use “MQTT protocol with Kafka as a broker”?

I want to implement a chat application which uses "MQTT protocol" in order to send messages from the device(Android phone). I need a "Kafka broker" which would run on the server and listen to these ...
0
votes
1 answer
13 views

Mirror data in Kafka 0.8.2.1 cluster to Kafka 2.2.0 cluster

I wanted to use Apache Spark Structured Streaming along with Kafka. Spark Structured Streaming supports Kafka 0.10 and above, and my Kafka cluster uses Kafka version 0.8.2.1. I want to replicate some of ...
0
votes
1 answer
18 views

Real-time data integration: how do Kafka, Hadoop, Avro and HDFS fit together, and what kinds of architectures are there for data integration?

I am trying to understand what architectures there are for real-time data integration and how all the pieces fit together. I have tried to research on the internet but I could not find good resources. ...
0
votes
0 answers
17 views

Too many open files on a project using spring-cloud stream and Kafka after upgrade to 2.1

After upgrading to Spring Cloud Stream 2.1 on a project using a multi-binder setup with Kafka (2 brokers) and Rabbit (1 broker), we are facing a too-many-open-files problem. The number of open files keeps growing to ...
1
vote
0 answers
14 views

How to configure Fluentd configuration file for sending logs to Kafka?

I've been working to create a syslog application in which I have to collect different kinds of logs generated using Fluentd and send them to Cassandra for analysis later on. I have to use Kafka as a ...
0
votes
0 answers
17 views

Can we use multiple queues for all the topics, like one dedicated queue per topic, in Apache Kafka?

I want to make a dedicated queue for each topic in Kafka. Is that possible or feasible?
2
votes
0 answers
34 views

Spark Structured Streaming OutOfMemoryError caused by thousands of KafkaMbean instances

A Spark Structured Streaming executor fails with OutOfMemoryError. Checking the heap allocation with VisualVM indicates that JMX MBean server memory usage grows linearly with time. After further ...
0
votes
0 answers
15 views

Consume pushed Kafka data in a dataframe

I am consuming data from Kafka, which I am successfully able to do, but now I want to perform some operations on it by saving the data in a dataframe. I am new to Kafka and don't know how to go about ...
0
votes
0 answers
31 views

KafkaStreams KTable compaction

In my application almost every incoming record's value is nulled out (tombstoned) within a few seconds; however, I don't see compaction happening, as the RocksDB size is not decreasing and it still has ...
1
vote
1 answer
24 views

How to make GetKafka always read from latest offset in Apache NiFi?

I'm using GetKafka in Apache NiFi to read messages. When I restart the processor, I'd like to make it always read from the latest offset, instead of the offset this group committed. How can I achieve ...
1
vote
0 answers
29 views

How to join three DStreams in Spark streaming using Python

I have three kafka producers which are sending data streams on the same topic at random intervals between 5-10 seconds. There is a Spark consumer (python based ) which is consuming the data. The ...
2
votes
2 answers
62 views

Is it possible to have one Kafka consumer thread per topic?

We have Springboot application that uses Spring-Kafka (2.1.7). We have enabled concurrency, so we can have one consumer thread per partition. So currently, if we have 3 topics, each with 2 partitions, ...
0
votes
1 answer
44 views

Encryption, authentication and external access for Confluent Kafka on Kubernetes

I am trying to configure encryption, authentication and external access for Confluent Kafka on Kubernetes, using the Helm chart https://github.com/confluentinc/cp-helm-charts. Document followed: https://...
1
vote
1 answer
30 views

Reading a CSV file line by line and producing streams with some lines sent with a random delay

I am reading concurrently 3 CSV file line by line and sending them to a message queue (Apache Kafka). The data rows are ordered by increasing timestamp values. I am simulating the streams by looking ...
1
vote
0 answers
23 views

Confluent - Splitting Avro messages from one Kafka topic into multiple Kafka topics

We have an incoming Kafka topic with messages based on multiple Avro schemas serialized into it. We need to split the messages in Avro format into multiple other Kafka topics based on a certain value of a ...
0
votes
1 answer
31 views

I'm having a hard time setting up Kafka on GKE and would like to know the best way of setting it up

I was trying to use a StatefulSet to deploy the Zookeeper and Kafka servers in a cluster on GKE, but the communication between Kafka and Zookeeper fails with an error message in the logs. I'd like to know ...
1
vote
0 answers
20 views

Kafka aggregation: Issue when grouping key changes

We are using a Kafka KTable for aggregation, and below is the kind of data we receive as input. Input data - Transaction Detail (transaction id, status, category, amount, ...). We group the above based on ...
-1
votes
0 answers
19 views

KafkaAvroDeserializer is not an instance of Deserializer

The code below in Java throws exceptions as shown at the bottom. Environment: Tomcat, Java 8. KafkaConsumer<String, Map<ConfidentialityLevel, CommonCoreEvent>> consumer; Properties props ...
0
votes
1 answer
32 views

What is the impact of acknowledgement modes on asynchronous send in Kafka?

Do the acknowledgement modes ('0', '1', 'all') have any impact when we send messages using the asynchronous send() call? I have tried measuring the send call latency (that is, by recording the time before and ...
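For context on what such a measurement captures: the asynchronous send() only appends the record to the producer's in-memory buffer and returns a future, so acks affects when the future completes (the broker round trip), not how long send() itself takes. A toy model of that separation, with no real broker or Kafka client involved:

```python
import threading
from concurrent.futures import Future

class ToyProducer:
    """send() returns at once; the 'broker ack' completes the future later."""
    def __init__(self):
        self._ack = threading.Event()  # stands in for the broker round trip

    def send(self, record) -> Future:
        fut = Future()
        def deliver():
            self._ack.wait()        # the cost of acks=1/all lives here...
            fut.set_result(record)  # ...not in send() itself
        threading.Thread(target=deliver, daemon=True).start()
        return fut

    def broker_acks(self):
        self._ack.set()

producer = ToyProducer()
fut = producer.send({"key": "a", "value": 1})
print(fut.done())             # False: send() did not wait for the ack
producer.broker_acks()
print(fut.result(timeout=1))  # {'key': 'a', 'value': 1}
```

This is why timing around the send() call barely changes across acks settings; the difference shows up in end-to-end delivery latency, e.g. measured in the send callback.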
1
vote
0 answers
15 views

How to set up a proxy layer on a Kafka broker?

I am trying to set up a proxy server (preferably in Java) that can pass my connection stream to the Kafka broker: Client (Consumer/Producer) <--> [PROXY SERVER] <--> Kafka Broker. My use case ...
0
votes
0 answers
20 views

Apache Spark temp files size on disk

I have a setup where the incoming data from a Kafka cluster is processed by an Apache Spark streaming job. Version info: Kafka = 0.8.x, Spark = 2.3.1. Recently, when the capacity of the Kafka cluster ...
0
votes
1 answer
33 views

How to schedule a Kafka consumer properly in Apache NiFi?

I'm trying to use one consumer to continuously read data from Kafka. How should I set the scheduling options? I have read the User Guide, but I cannot figure out how to set the run schedule and run ...
0
votes
0 answers
15 views

Configure log4j.properties for Kafka appender, error when parsing property bootstrap.servers

I want to add a Kafka appender to the audit-hdfs log in a Cloudera cluster. I have successfully configured a log4j2.xml file with a Kafka appender, I need to convert this log4j2.xml into a log4j2....
0
votes
0 answers
18 views

Kafka Connect JDBC source setup is not reading data from DB, so no data in Kafka topic

We configured Kafka Connect JDBC to read data from DB2 and publish to a Kafka topic, and we are using one of the columns of type timestamp as timestamp.column.name, but I see that Kafka Connect is not ...
2
votes
3 answers
96 views

Stream join example with Apache Kafka?

I was looking for an example using Kafka Streams on how to do this sort of thing, i.e. join a customers table with an addresses table and sink the data to ES: Customers +------+------------+---------...
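The usual Kafka Streams answer here is a KTable-KTable join over co-partitioned changelog topics. The join semantics themselves can be sketched without the Streams API as two latest-value-materialized dicts (the field names are hypothetical, since the question's tables are truncated):

```python
# Each "table" is the latest-value view of its changelog topic,
# keyed by the same customer_id (co-partitioned by key).
customers = {}   # customer_id -> customer record
addresses = {}   # customer_id -> address record

def joined(customer_id):
    """Inner-join view for one key, like KTable.join(otherKTable, ...)."""
    c, a = customers.get(customer_id), addresses.get(customer_id)
    if c is None or a is None:
        return None  # inner join emits nothing until both sides exist
    return {**c, **a}

# Replay some changelog updates:
customers[1] = {"customer_id": 1, "name": "Alice"}
print(joined(1))   # None: address side missing
addresses[1] = {"city": "Oslo"}
print(joined(1))   # {'customer_id': 1, 'name': 'Alice', 'city': 'Oslo'}
```

In real Kafka Streams, each update on either side re-triggers the join for that key, and the joined result can then be streamed to a sink connector (e.g. Elasticsearch).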
2
votes
0 answers
28 views

How to include both “latest” and “JSON with specific Offset” in “startingOffsets” while importing data from Kafka into Spark Structured Streaming

I have a streaming query saving data into a file sink. I am using .option("startingOffsets", "latest") and a checkpoint location. If there is any downtime on Spark and when the streaming query starts ...
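For reference, Spark's Kafka source accepts either the strings "earliest"/"latest" or a JSON map of per-topic-partition offsets, where -2 means earliest and -1 means latest; the two forms cannot be mixed in one option value, and once a checkpoint exists the query resumes from it regardless of startingOffsets. A sketch of building the JSON form (the topic name and partition numbers are made up):

```python
import json

# Per-partition starting offsets for the Kafka source:
# a concrete offset for partition 0, "latest" (-1) for partition 1.
starting = {"clickstream": {"0": 23, "1": -1}}
option_value = json.dumps(starting)
print(option_value)  # {"clickstream": {"0": 23, "1": -1}}

# Hypothetical use in a query definition:
# spark.readStream.format("kafka")
#      .option("kafka.bootstrap.servers", "host:9092")
#      .option("subscribe", "clickstream")
#      .option("startingOffsets", option_value)
```

Because the checkpoint takes precedence on restart, the per-partition JSON only matters the first time the query runs with a fresh checkpoint location.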