Kafka - developer's notes
- How it works
- Notes from experience
3 different ways to write a consumer
https://spring.io/projects/spring-cloud-function
spring:
  cloud:
    function:
      definition: ruptureStatusConsumer  # must match the consumer bean name; the binding below is derived as <functionName>-in-0
    stream:
      kafka:
        binder:
          consumer-properties:
            max.poll.records: ${KAFKA_CONSUMER_MAX_POLL_RECORDS:100}
      binders:
        kafka-ruptures-update-consume:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: ${KAFKA_BROKERS:localhost:9092}
              configuration:
                security.protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
      bindings:
        ruptureStatusConsumer-in-0:
          destination: ${KAFKA_TOPIC_RUPTURE_NOTIFY:ruptures-canonical-int}
          binder: kafka-ruptures-update-consume
          group: ${KAFKA_GROUP_RUPTURE_NOTIFY:ruptures-group}
          consumer:
            batchMode: true
            concurrency: ${KAFKA_RUPTURE_NOTIFY_CONSUMER_THREADS:1}
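The functional-style consumer that spring.cloud.function.definition points to could look like the sketch below (my own minimal example; the bean name must match the definition, and with batchMode: true the binder delivers the whole batch as a list):

import java.util.List;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class RuptureConsumerConfig {

    // Bean name matches spring.cloud.function.definition;
    // batchMode: true means one invocation per polled batch.
    @Bean
    public Consumer<List<Message<String>>> ruptureStatusConsumer() {
        return batch -> batch.forEach(msg ->
                System.out.println("Received: " + msg.getPayload()));
    }
}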
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
// Spring Cloud Stream allows richer configuration for consumers (concurrency, deserialization) with less boilerplate code.
// Sample of a sink (note: the @EnableBinding/@StreamListener annotation model is deprecated in favor of the functional style above)
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    // invoked for every message arriving on the input channel
    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("Received: " + person);
    }

    public static class Person {

        private String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String toString() {
            return this.name;
        }
    }
}
Sample configuration:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination:
This allows messages to be deserialized faster.
Kafka Binder Properties
spring.cloud.stream.kafka.binder.brokers — a list of brokers to which the Kafka binder connects.
Kafka Consumer Properties
To avoid repetition, Spring Cloud Stream supports setting values for all channels in the format spring.cloud.stream.kafka.default.consumer.<property>=<value>.
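For example (a minimal sketch; autoCommitOffset is just one of the binder's consumer properties):

spring:
  cloud:
    stream:
      kafka:
        default:
          consumer:
            autoCommitOffset: false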
Under the hood, spring-kafka uses the same KafkaConsumer from the kafka-clients library, and all interaction with it happens on a separate thread.
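This is what a plain spring-kafka listener looks like (my own minimal sketch; topic and group names are reused from the binding above):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class RuptureListener {

    // the container thread polls KafkaConsumer and dispatches each record here
    @KafkaListener(topics = "ruptures-canonical-int", groupId = "ruptures-group")
    public void onMessage(String payload) {
        System.out.println("Received: " + payload);
    }
}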
Setting the consumer properties
When creating your Kafka consumer, you’ll need to configure several properties that influence batch processing:
spring:
  kafka:
    consumer:
      # required
      group-id: myGroup
      # optional
      max-poll-records:  # maximum number of records a consumer fetches in a single call to poll()
      # raw kafka-clients settings go under `properties`:
      properties:
        fetch.min.bytes:  # minimum amount of data the broker returns for a fetch request; with a higher value the broker waits until enough data accumulates before responding
        fetch.max.bytes:  # maximum amount of data the broker returns for a fetch request
        max.partition.fetch.bytes:  # maximum amount of data per partition the broker returns
      # batch size can also be tuned
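With these limits in place, a batch listener might look like this (a sketch; the batch = "true" attribute requires spring-kafka 2.8+):

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchRuptureListener {

    // the container hands over up to max-poll-records messages per invocation
    @KafkaListener(topics = "ruptures-canonical-int", groupId = "ruptures-group", batch = "true")
    public void onBatch(List<String> payloads) {
        System.out.println("Received batch of " + payloads.size() + " messages");
    }
}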
Kafka Consumer Container Factories
https://docs.spring.io/spring-kafka/reference/kafka/container-factory.html
Purpose: Kafka container factories are used to create and configure Kafka consumer containers for processing messages from Kafka topics.
Types: there are two main kinds of containers:
- KafkaMessageListenerContainer: receives all messages from all topics or partitions on a single thread.
- ConcurrentMessageListenerContainer: delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
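A custom factory producing concurrent batch containers could be declared like this (my sketch; it overrides the factory Spring Boot auto-configures):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaContainerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);       // three KafkaMessageListenerContainer delegates, one thread each
        factory.setBatchListener(true);  // deliver records to listeners in batches
        return factory;
    }
}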
Setting the producer properties
When creating your Kafka producer, you'll need to configure several properties:
spring:
  kafka:
    producer:
      # optional
      transaction-id-prefix:  # when defined, a KafkaTransactionManager is automatically configured; if a RecordMessageConverter bean is defined, it is associated with the auto-configured KafkaTemplate
      acks:  # number of acknowledgments the producer requires before considering a request complete
      batch-size:  # default batch size in bytes; records sent to the same partition are grouped into batches
      buffer-memory:  # total memory the producer may use to buffer records waiting to be sent
      retries:  # how many times a failed send is retried
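For sending, Spring Boot auto-configures a KafkaTemplate; a minimal sketch (class and method names are my own, the topic is reused from the binding above):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class RuptureProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public RuptureProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // records with the same key always land in the same partition
    public void send(String key, String payload) {
        kafkaTemplate.send("ruptures-canonical-int", key, payload);
    }
}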
Redelivery of messages in Spring Kafka
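A minimal sketch of configuring redelivery with DefaultErrorHandler (my example; assumes Spring Boot wires a CommonErrorHandler bean into the default container factory):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class RetryConfig {

    // retry a failed record twice with a 1 s pause, then log and skip it
    @Bean
    public DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
    }
}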
Race condition in Kafka when there is no key
If a message has no key, can it end up in two partitions at the same time? No: every record is appended to exactly one partition. Without a key, though, the default partitioner spreads records across partitions (round-robin, or sticky batches in newer clients), so ordering between such messages is not guaranteed.
Kafka batch processing allows consumers to receive multiple messages in a single poll operation, improving efficiency for certain use cases.