• Description of how it works
• Takeaways from experience

Three different ways to write a consumer

https://spring.io/projects/spring-cloud-function

spring:
  cloud:
    function:
      definition: ruptureStatusConsumer  # must match the <name>-in-0 binding below
    stream:
      kafka:
        binder:
          consumer-properties:
            max.poll.records: ${KAFKA_CONSUMER_MAX_POLL_RECORDS:100}
      binders:
        kafka-ruptures-update-consume:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: ${KAFKA_BROKERS:localhost:9092}
              configuration:
                security.protocol: ${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}
      bindings:
        ruptureStatusConsumer-in-0:
          destination: ${KAFKA_TOPIC_RUPTURE_NOTIFY:ruptures-canonical-int}
          binder: kafka-ruptures-update-consume
          group: ${KAFKA_GROUP_RUPTURE_NOTIFY:ruptures-group}
          consumer:
            batchMode: true
            concurrency: ${KAFKA_RUPTURE_NOTIFY_CONSUMER_THREADS:1}
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
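
With the functional model, the consumer itself is just a java.util.function.Consumer bean whose name matches spring.cloud.function.definition. A minimal sketch, assuming a String payload as a stand-in for the real DTO; with batchMode: true the binder hands the listener the whole poll as one List:

import java.util.List;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RuptureConsumerConfig {

    // The bean name must match spring.cloud.function.definition,
    // which in turn names the ruptureStatusConsumer-in-0 binding.
    @Bean
    public Consumer<List<String>> ruptureStatusConsumer() {
        return batch -> batch.forEach(record ->
                System.out.println("Received: " + record));
    }
}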

// Spring Cloud Stream gives richer consumer configuration (concurrency, deserialization) with less boilerplate code.


// Sample of a Sink (the legacy annotation-based model; @EnableBinding and @StreamListener are deprecated in favor of the functional style above)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    // Invoked for every message arriving on the input channel
    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("Received: " + person);
    }

    public static class Person {
        private String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return this.name;
        }
    }
}

Sample configuration:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination:
          group:
          consumer:
            headerMode: raw
            partitioned: true
            concurrency: 10
      kafka:
        binder:
          brokers:

More information is available here: https://cloud.spring.io/spring-cloud-stream/

It allows messages to be deserialized faster.

Kafka Binder Properties

spring.cloud.stream.kafka.binder.brokers: a list of brokers to which the Kafka binder connects.

Kafka Consumer Properties

To avoid repetition, Spring Cloud Stream supports setting values for all channels in the format spring.cloud.stream.kafka.default.consumer.<property>=<value> (for example, spring.cloud.stream.kafka.default.consumer.autoCommitOffset=false).

Under the hood, spring-kafka uses the same KafkaConsumer from the kafka-clients library, and all interaction with it happens on a separate thread.

Setting the consumer properties

When creating your Kafka consumer, you’ll need to configure several properties that influence batch processing:

spring:
  kafka:
    consumer:
      # required
      group-id: myGroup

      # optional
      fetch-min-size: # minimum amount of data the server should return for a fetch request (maps to fetch.min.bytes); if set higher, the server waits until it has enough data before responding
      max-poll-records: # maximum number of records the consumer will fetch in a single call to poll()
      properties:
        # raw kafka-clients keys that have no dedicated Spring Boot alias
        fetch.max.bytes: # maximum amount of data the server should return for a fetch request
        max.partition.fetch.bytes: # maximum amount of data per partition the server will return
# the batch size can also be tuned
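
For comparison, the same knobs on the raw kafka-clients KafkaConsumer that spring-kafka wraps; the broker address, topic name and numeric values below are illustrative only:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BatchConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);              // wait for at least 1 KB
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);          // at most 50 MB per fetch
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1 MB per partition
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);              // at most 100 records per poll()

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("Fetched a batch of " + records.count() + " records");
        }
    }
}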

Kafka Consumer Container Factories

https://docs.spring.io/spring-kafka/reference/kafka/container-factory.html

Purpose: Kafka container factories create and configure Kafka consumer containers for processing messages from Kafka topics.

Types: there are two main kinds of containers:
• KafkaMessageListenerContainer: receives all messages from all topics or partitions on a single thread.
• ConcurrentMessageListenerContainer: delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
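
A sketch of such a factory (the bean name batchFactory is arbitrary): setConcurrency creates that many KafkaMessageListenerContainer instances under the hood, one KafkaConsumer per thread, and setBatchListener(true) delivers the whole poll to the listener at once:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class ContainerFactoryConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);      // 3 single-threaded containers (illustrative)
        factory.setBatchListener(true); // listeners receive List<...> batches
        return factory;
    }
}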

Setting the producer properties

When creating your Kafka producer, you’ll need to configure several properties that influence batching:

spring:
  kafka:
    producer:
      # optional
      transaction-id-prefix: # if defined, a KafkaTransactionManager is automatically configured; also, if a RecordMessageConverter bean is defined, it is automatically associated with the auto-configured KafkaTemplate
      batch-size: # the producer groups records sent to the same partition into batches of up to this many bytes (maps to batch.size)
# group-id and the fetch-* settings belong to the consumer, not the producer
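
A minimal producer sketch using the KafkaTemplate that Spring Boot auto-configures from the properties above; the topic name my-topic is a placeholder:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class SampleProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public SampleProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Records sent to the same partition are grouped into batches of up to batch-size bytes
    public void send(String key, String payload) {
        kafkaTemplate.send("my-topic", key, payload);
    }
}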

Message redelivery in Spring Kafka
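
In recent spring-kafka versions redelivery is driven by a CommonErrorHandler. A sketch assuming Spring Boot's auto-configured container factory, which picks up a DefaultErrorHandler bean; here a failed record is retried twice with a one-second pause, then logged and skipped by default:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class RetryConfig {

    // On a listener exception the container re-seeks the failed record
    // and redelivers it: 2 retries, 1 second apart.
    @Bean
    public DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
    }
}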

A race condition in Kafka when there is no key

If a message has no key, can it be added to two partitions at the same time? No: every record is appended to exactly one partition; without a key the producer simply picks a partition itself (round-robin, or the sticky partitioner since Kafka 2.4).

Kafka batch processing allows consumers to receive multiple messages in a single poll operation, improving efficiency for certain use cases.
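
Tying it together, a batch listener sketch that assumes the batchFactory bean from the container-factory example above and a placeholder topic:

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchMessageListener {

    // Receives every record returned by one poll() as a single List
    @KafkaListener(topics = "my-topic", groupId = "myGroup",
                   containerFactory = "batchFactory")
    public void onBatch(List<String> messages) {
        System.out.println("Got a batch of " + messages.size() + " messages");
    }
}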