Проблема

Давайте сначала подготовим сцену.

Предположим, у нас есть платформа, на которой пользователи могут загружать видео, а другие пользователи могут просматривать эти видео (вроде Youtube). Сейчас нас интересует только один поток, то есть загрузка видеопотока.

В нашей системе есть простой UploadService, который принимает загруженное видео. Чтобы отправлять уведомления подписчикам пользователя, он отправляет HTTP-запрос в SubscriberService. Он также отправляет HTTP-запрос к AnalyzeService, который запускает алгоритм для анализа видео. Этот алгоритм может анализировать метаданные о видео, возможно, страну пользователя, продолжительность, описание и т. д. для рекомендательной системы нашей платформы. Это платформа, которая у нас есть прямо сейчас.

Однако время идет, и команда разработчиков добавляет требование по созданию системы комментариев. Поэтому наша команда быстро создает сервис комментариев. Однако затем они упоминают, что эти CommentService также должны взаимодействовать с AnalyzerService, поскольку они также хотят анализировать комментарии. Хорошо, мы можем сделать и это, мы просто отправим еще один HTTP-запрос из нового CommentService в AnalyzerService, и это должно сработать.

Через пару месяцев появится новое требование: пользователи могут подписаться на список рассылки для каналов. Когда выходит новое видео, они хотят получить электронное письмо для этого видео (хотя я не совсем уверен, зачем пользователям нужно электронное письмо, когда у них уже есть функция подписки, но разработчики продукта вежливо попросили, так что мы приступаем к кодированию!)

Хорошо, на данный момент наш код в UploadService выглядит довольно беспорядочно.

const uploadAPI = () => {
  // Send a request to Analyzer service
  let res = sendToAnalyzeService()
  // Send a request to Subscriber service
  res = sendToSubscriberService()
  // Send a request to Mail service
  res = sendToMailService()
}

Мало того, что мы должны думать о многих пограничных случаях, когда мы кодируем UploadService, что, если служба подписчика выйдет из строя? Что, если почтовая служба будет слишком долго отвечать? Что, если служба анализатора отправит статус 400 код? (Представьте, что вы разработчик, пишущий модульные тесты для этой функции) И если есть какие-либо изменения в маршрутизации, например, может быть, нам больше не нужно анализировать загрузку видео, нам также нужно внести изменения на уровне кода в службу загрузки. Все это начинает казаться пустяком.

Теперь представьте себе большую систему с большим количеством компонентов, и вы сможете увидеть проблемы, связанные с такой архитектурой.

Отказывается одна служба…

И ошибка либо распространяется на те службы, которые ее вызвали, либо им требуется обработка ошибок, чтобы исправить это.

Все это потому, что наши услуги тесно связаны друг с другом.

Очевидно, у нас могут быть тайм-ауты, обработка ошибок (и мы все равно должны делать все это), но основная проблема все еще существует, почему конкретная служба, скажем, UploadService должна быть осторожна или обрабатывать проблемы, которые могут возникнуть в совершенно отдельной службе, например АнализаторСервис. Почему он вообще должен заботиться или знать об AnalyzerService?! Задача UploaderService — просто загружать видео и сохранять их, а не пинговать все остальные службы в системе.

Если UploadService должен был взаимодействовать с таким количеством сервисов, разработчикам UploadService было бы трудно писать код для такого количества пограничных случаев и писать модульные тесты, а затем отлаживать, когда модульный тест терпит неудачу, и т. д.

Такую архитектуру трудно масштабировать, и для ее надежности потребуется много тестов и кода.

Представляем архитектуру, управляемую событиями

Что, если бы мы могли избавить наш UploadService от необходимости постоянно обновлять всех?

Если вы думаете об этих службах как об объектах, работающих над своими собственными проблемами, имеющих свои собственные обязанности, на самом деле задача службы загрузки не должна заключаться в том, чтобы сообщать AnalyzerService, что есть некоторые данные, готовые для анализа, или SubscriberService, что они должны отправлять уведомления пользователям. Если бы все, что он мог сделать, это прокричать: «Загрузка видео пользователем XYZ завершена!!» и все службы могли бы услышать это, и любой заинтересованный мог бы затем действовать в соответствии с этой информацией, это значительно облегчило бы его работу. Это было бы похоже на отправку группового сообщения в Slack о проекте, который касается 10 человек, вместо того, чтобы отправлять сообщения каждому из них по отдельности.

Это также кажется более… разумным. Это означает, что UploadService больше не несет ответственности за обновление всех. Ему просто нужно сделать свою работу, а потом кричать. Любой заинтересованный может прослушать и посмотреть новое видео, а те, кому это не интересно, могут просто игнорировать UploadService. Это также означает, что, поскольку UploadService не заботится ни о каких других службах, ему также все равно, если они сломаются или перестанут работать. На самом деле, сервисы могут быть недоступны для всех, кого это волнует.

Кроме того, у нас может быть что-то вроде централизованной системы маршрутизации, в которой каждый сервис может решать, кого он слушает, а кого игнорирует. Любая служба может выбрать, что она хочет слушать, и эта централизованная система может организовывать отправку сообщений всем заинтересованным службам, когда что-то происходит.

Вот как это выглядит на нашей архитектурной схеме

Паб-подсистема

Подсистемы пабов, такие как Kafka, Redis или RabbitMQ, предоставляют нам возможность разрабатывать эти слабо связанные сервисы.

А пока давайте просто поговорим о Redis.

В Redis издатели могут публиковать сообщения на канале, а потребители могут прослушивать определенные каналы, которые им интересны. Это означает, что UploadService может публиковать сообщения о новых загрузках видео в канал UploadChannel и любую службу, которая может быть в этом заинтересована. может подписаться на этот канал. Когда загружается новое видео, потребитель, подписанный на этот канал, получает уведомление о загрузке нового видео.

И Redis очень прост в использовании. Потребитель просто делает

SUBSCRIBE UploadChannel

Когда новое видео загружается, UploadService просто запускается

PUBLISH UploadChannel '{"video": "https://s3../video_bucket/1.mp4"}'

И это работает… по большей части.

Давайте обсудим несколько вопросов, возникающих при таком подходе.

Асинхронная связь

Что, если MailService отключится на техническое обслуживание, а пользователь загрузит новое видео? Почтовый сервис пропускает видео, которое загрузил пользователь. И поскольку почтовая служба не делает что-то очень критичное по времени, это нормально, если она отключается на несколько секунд, пока она может читать старые загрузки после резервного копирования. Но с Redis Pub/Sub это просто невозможно. Это скорее огонь и забвение. Он отправляет сообщение подписчикам, которые слушают в данный момент, и любой подписчик, который не слушает в этот момент, никогда больше не услышит это же сообщение.

Различные схемы доступа

Также весьма вероятно, что разные потребители могут иметь разные модели доступа. Например, может быть, мы запускаем модель ИИ с интенсивными вычислениями в сервисе анализатора. Поскольку этот сервис не делает ничего критичного по времени, для экономии затрат на сервер мы запускаем этот алгоритм ночью или в нерабочее время. Это по-прежнему отвечает требованиям продукта, потому что, возможно, наша система рекомендаций допускает задержку до 24 часов. Но если мы будем использовать эту систему Pub/Sub, то мы просто не сможем этого сделать. Мы должны запустить наш AnalyzerService 24/7, чтобы не пропустить ни одной загрузки видео.

Хранилище данных

В зависимости от нашего варианта использования также может быть важно сохранить эти сообщения. Может быть, наша продуктовая команда заинтересована в анализе пользовательских паттернов, а может быть, нам нужно что-то гораздо более интересное, например, поиск событий.

Давайте поговорим о том, как Redis Streams может помочь нам решить эти проблемы.

Потоки

Поток Redis работает и как решение для хранения данных, и как система pub/sub. Идея состоит в том, что издатели публикуют сообщения в поток, а потребители слушают эти потоки (аналогично тому, что мы делали в каналах раньше). Ключевое отличие состоит в том, что эти сообщения сохраняются в системе, а система только добавляется.

Это кажется незначительным изменением, но оно может иметь огромные последствия для нашей архитектуры. Наша архитектура больше не работает по принципу «выстрелил и забыл», а содержит сообщения, которые сохраняются в ней. Это означает, что потребители могут решить, какое сообщение читать и когда.

Хорошей аналогией может быть телефонная конференция или групповой текст. Системы Pub/Sub больше похожи на конференцию друзей, в которой участвуют 5 человек. Некоторые из них могут быть доступны в это время и могут присоединиться, а некоторые могут быть недоступны и в конечном итоге пропустят звонок. Вместо этого потоки Redis больше похожи на отправку сообщения в группу WhatsApp. Каждый может решить, когда он хочет прочитать сообщение, а также есть история всех старых сообщений. Некоторые могут быть заняты на работе и читать все сообщения ночью (в отличие от нашего AnalyzerService, у которого тоже есть время только ночью), а некоторые могут быть в сети и читать каждое сообщение по мере его публикации. Вкратце: я никоим образом не утверждаю, что Redis Streams всегда лучше, чем Redis Pub/Sub, я лишь говорю, что иногда лучше звонить людям, а иногда лучше отправлять сообщения.

Возвращаясь к нашей архитектуре с Redis Streams, UploadService просто публикует новое сообщение о видео в этом хранилище данных только для добавления, и потребители могут выбирать, как использовать эти сообщения. Некоторые, например AnalyzerService, могут захотеть использовать старые сообщения, в то время как некоторые, такие как MailService и SubscriberService, могут использовать сообщения в режиме реального времени. Некоторые потребители могут захотеть читать сообщения блокирующим способом, а некоторые могут вместо этого предпочесть неблокирующий способ. Некоторые потребители могут читать сообщения сразу, а некоторые предпочитают сообщения по одному. И все эти сообщения также сохраняются в потоке!

Давайте посмотрим, как это решит все проблемы, с которыми мы сталкивались раньше.

Асинхронная связь

Поскольку потребители сами решают, когда читать сообщения, им не обязательно слушать 24/7. Они могут спуститься вниз, снова вернуться и прочитать все пропущенные сообщения. С Redis Streams, если наш MailService отключится на несколько секунд и пропустит пару загрузок видео, это не будет проблемой, поскольку, когда он снова заработает, он может просто прочитать пропущенные сообщения.

Различные схемы доступа

Потребители могут решить, какие сообщения читать и когда, хотят ли они читать сообщения блокирующим или неблокирующим способом или хотят ли они читать 1, 2, 10 или 100 сообщений одновременно. Теперь мы можем сэкономить деньги, запуская AnalyzerService ночью, а не круглосуточно и без выходных.

Хранилище данных

Все эти сообщения также сохраняются в нашем потоке. В зависимости от нашего варианта использования и требований, это может быть полезной функцией. Теперь наши специалисты по продуктам могут загружать эти данные и анализировать шаблоны доступа пользователей, если захотят. Без Redis Streams нам, возможно, придется настроить другое решение для хранения данных. Мы можем очистить наши потоки или установить максимальное количество сообщений в потоке в зависимости от наших требований.

Заключение

Redis Streams научил меня интересному архитектурному шаблону, с которым я раньше не сталкивался. И кажется, что это имеет большой смысл и предлагает некоторые преимущества для некоторых вариантов использования по сравнению с Pub/Sub. Создание асинхронных сообщений и предоставление потребителям свободы решать, как они хотят использовать сообщения, добавляет потребителям большей гибкости и кажется интуитивно понятным. Однако у всего, связанного с технологиями, есть и недостатки, один из которых я вижу, это управление этими дополнительными данными. Хранение данных в памяти стоит дорого (как и все остальное в Redis, данные в потоках также хранятся в памяти), и нам нужно очистить наши потоки или установить максимальный предел. Это также только добавление, я, конечно, могу видеть случаи использования, когда время вычислений и хранилище будут потрачены впустую на потребление сообщений в реальном времени, которые могут не потребоваться (предположим, что сообщение обновляет сообщение. Если сообщение часто обновляется, например, опрос , то имеет значение только конечное состояние, промежуточные состояния устарели, а их хранение и обработка — пустая трата ресурсов). Как и все остальное в технологиях, мы должны решить, что важно, исходя из наших требований, а чем можно пожертвовать.

Хватит читать, давайте кодить?

Хотя мне нравилось читать и узнавать о потоках Redis, я чувствую, что просто недостаточно хорошо знаю технологию, пока не испачкаю руки кодом. У меня есть проект, который был бы классным небольшим способом протестировать потоки, но если мой послужной список проектов является доказательством, я могу заинтересоваться чем-то еще на полпути проекта (не ведите себя так, как будто вы этого не сделали). это раньше 😜). Но что бы я ни исследовал дальше, я обязательно напишу об этом здесь! Спасибо, что прочитали все это и дайте мне знать, если вам понравилось!