Questions tagged [confluent-kafka]

A family of Apache Kafka clients for different languages by Confluent, all based on libkafka. Add a language tag to specify the exact client.

0
votes
0answers
10 views

Kafka producer - multiple connections to broker vs single global connection

I am using confluent C# client for connecting with Kafka and trying to create an API that could be used by multiple existing applications to push messages in a topic. Now I am unsure if I should ...
0
votes
1answer
12 views

Setting Kafka producer properties on AvroProducer in python

I am wondering how to add properties to an AvroProducer in python. Here is code I have tried you can see something like i wish to do but adding in acks = all results in this error TypeError: ...
0
votes
0answers
7 views

How to push batch results to Kafka in .Net?

Net and Kafka. I am using latest version of Kafka in .Net. I am trying to write code for batchproducer. Previous beta version has method BeginProduce. Now latest Kafka version doesn't have this method....
0
votes
0answers
10 views

Confluent REST proxy API SSL handshake fails

I have a kafka cluster on docker using confluent images. I am using docker-compose to build the containers. When I tried to run the container it starts but can't communicate with any broker due to ...
0
votes
0answers
9 views

Delete messages and reset topic offset to 0 irrespective of consumer group

I need to create a Confluent Kafka setup where at the start of each day, all messages in a topic must get deleted and its offset reset to 0. I have tried to do this by setting log.retention to 1 day ...
2
votes
0answers
21 views

Kafka consumer gets stuck after exceeding max.poll.interval.ms

When the consumer does not receives a message for 5 mins (default value of max.poll.interval.ms 300000ms) the consumer comes to a halt without exiting the program. The consumer process hangs and does ...
0
votes
0answers
25 views

How to properly implement kafka consumer as a background service on .NET Core

I have implemented a kafka consumer as a console app by using BackgroundService on .NET Core 2.2. I am using confluent-kafka-dotnet v 1.0.1.1 as a client for Apache Kafka. I have doubts about the way ...
0
votes
0answers
11 views

How to execute my confluent kafka Python producer and consumer code from local windows machine

from confluent_kafka import Producer p = Producer({'bootstrap.servers': 'localhost:9092'})# localhost has IP address of the linux system p.produce('topic', "Hello World!") p.flush() I ran the ...
0
votes
1answer
9 views

No data is getting into a table built over a stream from ksql binary, it get's there if I do the same with ksql prompt?

I'm trying to create a table over a stream using ksql binary. The table gets created and starts running but when I query it there is no data inside the table. When I create the same using ksql prompt ...
0
votes
1answer
45 views

Confluent Kafka Golang Client Producer “Broker: Not enough in-sync replicas”

I am attempting to test out a producer writing messages to a topic on a kafka cluster using the Golang client. This works fine writing to a topic on a local cluster, I just copied and pasted the ...
0
votes
0answers
19 views

Regex for channel subscription in python

I want to match a pattern for kafka channel subscription which has changing pre-fix in python kafka consumer code. Eg : testsetup-test-topic prodsetup-test-topic I tried the following c.subscribe([...
2
votes
1answer
31 views

Docker alias incorrectly resolved by application when container started

I am creating two docker-compose files (mainly because I don't want to have to keep restarting my infrastructure while developing my application.) that need to reside on the same docker network so ...
0
votes
1answer
68 views

Kafka consumers unable to send heartbeat but receives messages

I'm using a Kafka Cluster from Bitnami on Azure (https://bitnami.com/stack/kafka/cloud/azure). It is a single node cluster with broker on one VM and Zookeeper on another. I have enabled the ports 9090 ...
0
votes
1answer
15 views

Use AvroConsumer to get messages in partitions and topics

I want to be able to read messages that come in specific partitions of a topic and also messages in another topic like a I would with a simple Consumer. self.consumer = AvroConsumer(conf) parts = [...
0
votes
1answer
78 views

Database connection error while celery worker remains idle for 24 hours

I have a django based web application where I am using Kafka to process some orders. Now I use Celery Workers to assign a Kafka Consumer to each topics. Each Kafka Consumer is assigned to a Kafka ...
0
votes
1answer
98 views

Confluent-Kafka Python : How to list all topics programmatically

I have a kafka broker running in my local system. To communicate with the broken using my Django based web application I am using confluent-kafka wrapper. However by browsing through the admin api, I ...
1
vote
0answers
47 views

Error when trying to push csv file to a kafka topic using confluent_kafka python

I am trying to push csv files to a Kafka topic using AvroProducer.s I am reading a csv file and looping row by row in the Kafka topic. The csv files has 10 columns which are same as defined in ...
0
votes
0answers
23 views

manual offset management in confluent-kafka

How to pass the specific offset value say from 25th offset(probably from database) into confluent-kafka consumer? What are steps to do I'm having 5 partition in the producer side, I need to consume ...
0
votes
1answer
88 views

How to read GenericRecord specific data in confluent kafka C#

This is my simple code snippet which try to to read the Avro generic record form consumer: using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = ...
1
vote
1answer
27 views

KSQL stream not fetching output from topic

I have connected kafka to mysql. The topic contains data in AVRO format. Now i want to convert that data into CSV format . For that i am using KSQL stream. When i am creating stream like CREATE ...
1
vote
1answer
45 views

Consumer not pulling Kafka message

We have some micro-services developed in c# and go languages. we use confluent-kafka for messaging and facing a problem: Go services are able to consume messages published from all other services (be ...
0
votes
0answers
19 views

kafka - how to set the timeout for a consumer that takes a long time to consume?

I have an kafka ambient where the consumer is killed after some time. Searching for information about this problem i've found this github issue: https://github.com/confluentinc/kafka-rest/issues/211 ...
0
votes
0answers
21 views

How to find the last committed message time stamp from kafka topic

I have created a Kafka topic consumer application using confluent.kafka library and c#. I need to check consumer application status using the Kafka topic. Is it possible to find the committed ...
1
vote
1answer
109 views

How to send and consume json messages using confluent-kafka in Python

I am fairly new to Python and getting started with Kafka. So I have setup a Kafka broker and I am trying to communicate with it using confluent-kafka. I have been able to produce and consume simple ...
2
votes
2answers
85 views

Poll several messages from Kafka

I'm using confluent_kafka package for working with Kafka. I create topic in this way: from confluent_kafka import avro from confluent_kafka.avro import AvroProducer def my_producer(): ...
1
vote
1answer
38 views

Is confluent kafka provides streaming, grouping and aggregation in python?

Is confluent kafka provides API for streaming, grouping and aggregation in python language?
0
votes
0answers
161 views

Cannot manage heartbeat frequency

I am using Confluent's C# Kafka client. When the consumer starts consuming, I observe using Wireshark that about 16 request are made to the server per second. I tried setting "HeartbeatIntervalMs = ...
0
votes
0answers
125 views

Using Confluent Kafka Consumer in Python

i`m having a problem when using confluent-kafka consumer with kerberos. I have Kafka running on Cloudera Cluster, secured with a kerberos to authenticate. And i`m developing python app on Linux using ...
0
votes
0answers
109 views

Random partitioner does not distribute messages between Kafka topic partitions

I've created a topic in Kafka with 9 partitions, naming it aptly 'test', and knocked together two simple applications in C# (.NET Core), using Confluent.Kafka client library: a producer and a consumer....
0
votes
0answers
32 views

Python Consumer Code not working on Google Cloud Platform + GKE

I have installed Confluent CP Charts on GKE from HERE Using these 2 commands, git clone https://github.com/confluentinc/cp-helm-charts.git helm install cp-helm-charts After successful installation,...
0
votes
1answer
53 views

Change schema name strategy in confluent Kafka

I am using confluent Kafka and schema registry and I cannot figure out a way to have custom schema name in python. From this blog I got how to do it in Java, but do not have any idea what the ...
1
vote
2answers
101 views

Kafka .net client does not receive any messages when using Assign()

I am trying to make my service re-read kafka topic from the beginning to the end on start to initialize internal data structures. I am using Confluent .NET client. From my understanding the following ...
8
votes
1answer
253 views

Python librdkafka producer perform against the native Apache Kafka Producer

I am testing Apache Kafka Producer with native java implementation against Python's confluent-kafka to see which has the maximum throughput. I am deploying a Kafka cluster with 3 Kafka brokers and 3 ...
0
votes
2answers
75 views

kafka produce to topic and write to state store in a single transaction

Is it possible to produce to a Kafka topic and write to state store in a single transaction? But not start the transaction as part of a topic consumption. EDIT: The reason I what to do this is to be ...
2
votes
1answer
192 views

Kafka integration in unity3d throwing Win32Exception error

I am trying to run a code sample of Kafka in unity environment and for this reason, I created a consumer client (Code given below). using System.Collections; using System.Collections.Generic; using ...
1
vote
0answers
276 views

BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING

Data from source system - [email protected] # kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic TTDF.TCDCPOC_DATA_TYPES --from-beginning --property print.key=true --property print....
0
votes
0answers
218 views

Broker Transport Failure while connection to kafka producer

From Node.js application I am trying to connect to Apache Kafka broker using node-rdkafka client .Since the kafka broker lists are SSL enabled hence configuring the node-rdkafka producer with ssl ...
2
votes
1answer
117 views

librdkafka C API Kafka Consumer doesn't read all messages correctly

I am using librdkafka C API consumer (specifically using rd_kafka_consumer_poll to read and I did call rd_kafka_poll_set_consumer before this) Problem I see is that in my google test I do following ...
0
votes
0answers
204 views

Kafka consumer does not start processing from offset#0

I have a topic with single partition. I have 2 different consumer groups. Each consumer group has a consumer. Consumer group#1 has been running ever since; I started producer. On the other hand, ...
1
vote
1answer
117 views

How can I ensure my consumers process messages in kafka topics in order, only once?

I've never used kafka before. I have two test Go programs accessing a local kafka instance: a reader and a writer. I'm trying to tweak my producer, consumer, and kafka server settings to get a ...
1
vote
1answer
87 views

Disable auto topic creation when calling GetMetadata

I am using confluent golang for my Kafka client. I use AdminClient to create/delete/get topics in kafka cluster. Here is my code to initialize AdminClient adminClient, err := kafka.NewAdminClient(&...
1
vote
1answer
324 views

Kafka Python producer integration with django web app

I have a question on how can we integrate kafka producer with a front end web app. get the data for every minute or second . Can the web app pass the JSON object to a running producer each time the it ...
0
votes
0answers
36 views

Confluent.Kafka consumer - Safely shutdown if there are no messages in the Topic

I have implemented a Kafka consumer using .net core 2.1 with Confluent.Kafka SDK(C#). The poll function is set to run for a time of 10 minutes and the code is deployed as a AWS Lambda function which ...
0
votes
1answer
44 views

Trying to produce message to a kafka topic for every iteration but looks like I end up sending no msg to consumer

Not able to write message into kafka topic (producer) when calling kakfa produce class with a loop. I'm very new to Python and Kafka. I'm trying to write a python program to write messages into a ...
0
votes
0answers
71 views

Kafka MQTT sink can't forward to different topics

I'm trying to send MQTT messages to different topics using Kafka. I'm using Confluent MqttSinkConnector and my connector config looks like this { "name" : "mqttSinkConnector", "config" : { "...
0
votes
1answer
407 views

Is there any way to check if all the brokers are up and running in Kafka?

I am using Confluent.kafka for my c# publisher and consumer application. Before I publish or consume any message, I would like to check if all the brokers (Endpoints) are up and running. I have found ...
7
votes
1answer
472 views

How to Consume from specific TopicPartitionOffset with Confluent.Kafka in .Net

I need my consumer to consume from an specific TopicPartitionOffset(here from offset 278). Suppose that Messages have been produced by some Producer in Specific topic like ="Test_1" before. Here is ...
0
votes
1answer
188 views

Clear kafka topic programmatically using C#

I need to clear or delete Kafka topics programmatically using C# language. Currently, I have used Confluent.Kafka library for publishing and consuming Kafka topics. I can delete Kafka topics using ...
0
votes
1answer
136 views

Not to able to get multiple records through poll api in confluent kafka python client

I am using confluent kafka python library to consume messages, I want to get multiple records in poll call. but somehow I am getting only a single record at a time. from confluent_kafka import ...
1
vote
0answers
71 views

Confluent Kafka: Lag in Kafka Consumer when streaming a webcam feed

I am working on Streaming my webcam feed to kakfa consumer to do some facial sentiment analysis and store it to DB. However, I am noticing a considerable lag in Consumer side (3 Sec). In the process ...