Kafka patterns for microservices
Обеспечиваем консистентность между сервисами с помощью Kafka
Что имеем:
- Распределенная микросервисная система
- Базы данных
- Другие системы ходят к нам за данными [1]
Transactional outbox (и inbox)
Outbox
Следует начать с того, что существует шаблон проектирования “Исходящие” или шаблон “Исходящих сообщений” для того, чтобы обеспечивать гарантированную доставку. Он представляет собой таблицу Outbox, в которую приложение пишет данные, а другое приложение/процесс их вычитывает. Такая таблица “ожидания” нужна для того, чтобы, в случае сбоя операции/отправки, сообщение можно было снова найти в таблице и попробовать снова. После успешной обработки сообщение будет помечено “ОТПРАВЛЕНО” и позже может быть удалено.
Этот паттерн играет роль “передержки” сообщений (или “очереди на отправку”) на случай, если сторона-получатель будет недоступна.
Transactional Outbox
А есть Транзакционный аутбокс, который предназначен не сколько для гарантии доставки, сколько для сохранения консистентности данных между двумя сервисами, точнее, чтобы сообщение-событие (обычно это история обработки событий), которое пришло в сервис А сделало два действия: сохранилось у него и точно дошло до сервиса B.
Для этого мы записываем сообщение к себе в базу и в таблицу Outbox в транзакции, поэтому в случае перебоя с базой, сообщение полностью откатится: и у сервиса А не сохранится и не будет отправлено внешнему сервису (обработку нужно снова повторять, зато и себе не записали и ложно не оповестили другого). Поэтому паттерн и называется транзакционным: в нашу базу (сервиса А) и в базу “ожидания для отправки” сообщение должно сохраниться атомарно.
А как теряется эта ваша консистентность? Очень просто:
Вот недоступен брокер (или внешний сервис) / ошибка отправки. В базу A сохранили, а в сервис B сообщение не получилось доставить. Рассинхрон.
Вот недоступна база. Отправили сообщение, но сами о нем не знаем.
Ошибка в одной из этих операций может привести к неконсистентным данным.
В книге “кабанчика” этому паттерну дано следующее определение: Transactional Outbox - паттерн для обеспечения гарантированной доставки сообщений в распределенных системах при интеграции с использованием асинхронных коммуникаций, таких как очереди сообщений (напр. Rabbit) или лог изменений (напр. Kafka).
Да, аналогично с брокером сообщений:
Вместо того, чтобы отправить сообщение напрямую, наш продьюсер забирает данные из таблицы Outbox, которая синхронизирована с таблицей нашего сервиса, и отдает брокеру подготовленные сообщения на отправку. И состояние сохранилось и публикацию совершили.
Этими двумя паттернами обеспечивается “гарантированная доставка” сообщения принимающей стороне (гарантия похожа на то, что Kafka называется “at-least-once”, нестрогая доставка (“по крайней мере один раз будет отправлено, но может быть потеряно, но чтобы этого не случилось, может быть отправлено больше одного раза (ретрай)”). Ретраи обеспечиваются как раз таблицей Outbox, которая, в случае ошибки отправки сообщения получателю, оставляет его у себя в статусе “В ПРОЦЕССЕ”, пока отправщик в конце концов не добьется успешного OK или ACK (от брокера) и не обновит у себя статус “ОТПРАВЛЕНО”.
Вроде бы все четко: сообщение отправляем, не отправилось, пробуем снова, рано или поздно сообщение будет получено внешним сервисом.
Но вот мы получили заветный ACK, брокер или сервис ответили, что получили сообщение, и мы идем в Outbox обновлять статус уже отправленных сообщений или удалять их.. но неожиданно получаем ошибку, откатанную обратно транзакцию, но сообщения не помечены статусом “ОТПРАВЛЕНО”, и наш исполнитель снова зайдет забирать эти сообщения! И вот получателю улетает дубль..
В этом обновлении отправленных сообщений у Outbox и кроется уязвимое место шаблона.
+ proc:
Обычно паттерн Transactional Outbox используется в следующих случаях:
- Вы создаете приложение, в котором обновление базы данных инициирует уведомление о событии.
- Вы хотите обеспечить атомарность операций, которые затрагивают два сервиса.
- Вы хотите реализовать паттерн event sourcing. [2]
- cons:
-
проблема нагрузки на базу: запросы в таблицу outbox на деле очень нагружают базу: приходится соблюдать баланс между тем, чтобы не задерживать отправку, делая редкие запросы, и тем, чтобы опрашивать достаточно часто, но, чтобы частыми SELECT’ами (например, обычно такие интервалы бывают < 1 секунды) не нагружать базу, подобрать интервал. Решением может стать забор сообщений большими партиями, но тогда нужно учитывать некоторый риск, что если отправка пройдет неудачно, это количество сообщений не будет помечено “ОТПРАВЛЕНО”, а останется “В ПРОЦЕССЕ”, и таким образом задержит отправку, так как эту весомую пачку придется пытаться отправить снова; это проигрывает маленьким частым партиям. [3]
-
дубликаты сообщений: сервис может отправлять дубликаты сообщений или событий, поэтому рекомендуется сделать потребляющий сервис идемпотентным, отслеживая обработанные сообщения на его стороне.
-
порядок сообщений: отправляйте сообщения или события в том же порядке, в котором сервис обновляет базу данных. Это критично для паттерна event sourcing, где вы можете использовать хранилище событий для восстановления данных на определенный момент времени. Если порядок неверен, это может все испортить. При этом, eventual consistency и откат транзакций могут усугубить проблему, если порядок уведомлений не сохраняется.
-
откат транзакций: если транзакция откатывается, сообщение не должно отправляться.
-
транзакционность только в рамках одного сервиса: паттерн Outbox не решает проблему транзакционной целостности между несколькими сервисами. Для этого используйте паттерн saga orchestration. [4]
Transactional Outbox
- Микросервисный паттерн событийно-ориентированной архитектуры (event-driven architecture)
- В книге “кабанчика” называается шаблоном “публикации событий”
- для архитектур с брокером, где нужен at-least-once
inbox
…или “гарантированное получение”.
В каких то случаях нам нужно убедиться, что некоторый процесс, запущенный полученным сообщением успешно завершился. В случае, если мы получаем сообщение, но процесс завершился неуспешно (или в принципе достаточно долгий, что держит наше соединение с отправителем), мы теряем это сообщение-событие. На помощь нам приходит паттерн Inbox (“Входящие”).
В качестве первого шага мы создаем таблицу, которая работает как почтовый ящик для наших сообщений. Затем, после получения нового сообщения, мы не начинаем обработку сразу, а только вставляем сообщение в таблицу и подтверждаем. Наконец, фоновый процесс извлекает строки из таблицы Inbox в удобном темпе. После завершения работы соответствующая строка в таблице может быть обновлена, чтобы пометить задание как выполненное (или просто удалить из папки “Входящие”).
Здесь не будет transactional inbox, но нужно отметить, что в таблицу Inbox также открывается транзакция и в зависимости от успеха/неуспехва вставки записи, мы отвечаем “отправителю”, нужно ли переотправить сообщение.
Заметка из @MicroservicesThoughts:
У транзакционного инбокса можно выделить два вида:
Например, консьюмеру необходимо ровно один раз обработать сообщение
processMessage() {
databaseTx { // Может случиться ситуация, что databaseTx закоммитилась, но
…
}
message.commit() //message.commit() не отработал
}
//из-за этого мы снова обработаем то же сообщение
1) на основе ключа сообщения
processMessage() { // По-прежнему сначала обрабатываем сообщение
databaseTx {
if (!tryInsert(msgKey)) { //Но добавляем дедупликацию
message.commit()
return
}
…
}
message.commit() //потом коммитим
}
В таком случае даже если databaseTx закоммитилась, но message.commit() не отработал, то при повторном чтении мы увидим сохраненный ключ сообщения, и сразу его закоммитим
tryInsert ~ insert on conflict do nothing, который либо ничего не вставляет, либо вставляет и держит блокировку на ключ до окончания транзакции
В первом варианте подразумевается, что процессим сообщение прямо в рамках датабазной транзакции, поэтому если она упадёт, то и ключ в базу не вставится, и сообщение не подтвердится
Долгие транзакции действительно проблема, если нужно делать какие-то тяжелые вещи, делать кучу походов вовне. Но можно постараться вынести их из транзакции по возможности. Зачастую обработка сообщения — это локальная транзакция + коммит сообщения в transactional outbox, в таком случае всё будет ок
- В этом варианте же есть еще момент, что мы держим соединение к базе на все время обработки события. Еще как вариант без транзакций, если 100%-ая гарантия не требуется, то можно перед обработкой положить msgKey в redis set, если получилось - обрабатываем, иначе скип
2) используем таблицу
Сохраняем сообщение в таблицу, и фоновые воркеры достают сообщения из таблицы и обрабатывают
processMessage() {
databaseTx {
tryInsert(message)
}
message.commit()
}
pros: Несмотря на то, что такой подход решает ту же проблему, еще и при этом добавляет latency, у него есть весомый плюс — консюмер теперь может балансировать нагрузку на себя
Причем это работает в обе стороны:
1) Например, если сообщения в нас отправляют по http со слишком высоким рейтом, то мы просто сохраняем их в таблицу и процессим с доступной нам скоростью
2) И наоборот: если сообщения мы сами читаем из топика, но у топика слишком мало партиций, и существующие консюмеры не успевают обрабатывать приходящие сообщения, то можно также их просто сохранить в таблицу, и далее нужным количеством воркеров разгребать эту таблицу
cons: при использовании таблицы, если использвуется > 1 воркера порядок сообщений при вычитке из таблицы может потеряться. Тогда достаточно поддерживать не глобальную очередность, а очередность в рамках какого-то ключа. Например, чтобы события по одному агрегату шли гарантированно очередно. И тогда можно уже делать, чтобы несколько воркеров вычитывали.
Шаблон Retry
https://slurm.io/blog/tpost/yb83iy4db1-ispolzuete-kafka-s-mikroservisami-skoree
функционал отложенной обработки сообщений с помощью Kafka Retry topic: популярное ходовое решение
Одно из популярных решений, которое вы найдете на просторах интернета, включает в себя топик повтора (retry topic). Мелкие детали разнятся от реализации к реализации, но общая канва такова:
Консюмер делает попытку потребить сообщение из главного топика. Если первая попытка не удалась, консюмер публикует сообщение в топик повтора и затем фиксирует смещение сообщения, чтобы иметь возможность продолжить работу со следующим сообщением. На топик повтора подписан консюмер повтора, у которого та же самая логика, что и у основного консюмера. Этот консюмер вводит короткую отсрочку между попытками потребить сообщение. И если этот консюмер также не может удачно потребить сообщение, он публикует сообщение во второй топик повтора и фиксирует смещение сообщения. Так это продолжается с еще некоторым количеством повторных топиков и консюмеров, в каждом случае с увеличивающейся отсрочкой (что служит стратегией отката (backoff strategy)). В конце концов, когда сообщение не удается обработать последнему консюмеру повтора, оно отправляется в очередь отвергаемых сообщений (dead letter queue, DLQ), где его вручную сортируют инженеры.
Change Data Capture - CDC
Более сложный подход к получению данных из таблицы исходящих сообщений называется отслеживанием журналов базы данных. В реляционных базах данных каждая операция записывается в WAL (журнал упреждающей записи). Позже его можно запросить о новых записях, касающихся строк, вставленных в папку исходящих сообщений. Этот вид обработки называется CDC (фиксация изменения данных). Чтобы использовать этот метод, ваша база данных должна предлагать возможности CDC или вам нужно будет использовать какую-то структуру (например, Debezium).
Примеры: Debezium, AWS Database Migration Service, Oracle GoldenGate