Взаимодействие между сервисами 🤝#
В предыдущем разделе вы создали приложение, которое принимает сообщения из очереди input-queue
и логирует их обработку. Теперь пора узнать, как сервисы могут взаимодействовать через брокер сообщений! В этом разделе мы создадим два приложения: одно с двумя подписчиками для обработки сообщений внутри сервиса, другое — для обработки результатов в отдельном сервисе. Это наглядно покажет, как микросервисы обмениваются данными через RabbitMQ. Готовы соединить сервисы? Погнали! 🚀
Что мы сделаем? 📋#
Мы создадим два приложения:
- Первое приложение (
app.py
):- Слушает очередь
input-queue
, принимает текстовое сообщение и преобразует его в верхний регистр 🔠. - Отправляет результат в очередь
output-queue
иfinal-queue
📤. - Слушает очередь
output-queue
и логирует промежуточный результат 📥.
- Слушает очередь
- Второе приложение (
app2.py
):- Слушает очередь
final-queue
и логирует финальный результат 📥.
- Слушает очередь
Это пример взаимодействия микросервисов: один сервис обрабатывает входные данные и передает их дальше, другой потребляет результаты. Мы используем две очереди (output-queue
и final-queue
), чтобы показать цепочку обработки.
Шаг 1: Написание кода для первого сервиса ✍️#
Обновите файл app.py
в папке faststream-tutorial
, добавив два подписчика:
Что здесь происходит? 🤔
RabbitBroker
: Объект для подключения к RabbitMQ с адресомamqp://guest:guest@localhost:5672/
.FastStream
: Основное приложение, управляющее брокером.@broker.subscriber("input-queue")
: Декоратор, реализованный как метод брокера, регистрирует функциюhandle_message
как подписчика на очередьinput-queue
. Она принимает сообщение, логирует его и отправляет обработанную версию вoutput-queue
иfinal-queue
.broker.publish
: Метод для отправки обработанного сообщения в две очереди.@broker.subscriber("output-queue")
: Декоратор, регистрирующий функциюcheck_result
как подписчика на очередьoutput-queue
. Она логирует промежуточный результат.handle_message
иcheck_result
: Функции-обработчики, представляющие внутреннюю логику первого сервиса.
Tip
Альтернативный способ отправки 📤
Вместо broker.publish
в handle_message
можно использовать декоратор @broker.publisher
:
@broker.subscriber("input-queue")
@broker.publisher("output-queue")
@broker.publisher("final-queue")
async def handle_message(msg: str, logger: Logger) -> str:
logger.info(f"Получено сообщение: {msg}")
return f"Обработано: {msg.upper()}"
Порядок декораторов 🔍
Если используете @broker.publisher
, порядок важен:
@broker.subscriber
— внешний (выше), регистрирует функцию как подписчика.@broker.publisher
— внутренний (ниже), обрабатывает возвращаемое значение для публикации.
Можно использовать несколько @broker.publisher
для разных очередей, как в примере выше. В Python декораторы выполняются снизу вверх, поэтому @broker.publisher
оборачивает функцию первой, а @broker.subscriber
— последней.
Шаг 2: Написание кода для второго сервиса ✍️#
Создайте файл app2.py
в папке faststream-tutorial
для второго сервиса:
Что здесь происходит? 🤔
@broker.subscriber("final-queue")
: Декоратор, регистрирующий функциюfinal_result
как подписчика на очередьfinal-queue
.final_result
: Функция-обработчик, которая логирует сообщения изfinal-queue
, представляя второй сервис, потребляющий результаты первого.
Tip
Почему два сервиса? 🌐
app.py
: Первый сервис обрабатывает входные данные (input-queue
) и публикует результаты вoutput-queue
(для внутреннего использования) иfinal-queue
(для внешнего сервиса).app2.py
: Второй сервис потребляет данные изfinal-queue
, представляя независимый микросервис (например, для отправки уведомлений).
Это демонстрирует реальную микросервисную архитектуру, где сервисы взаимодействуют через брокер.
Шаг 3: Создание скрипта для отправки сообщений 🔌#
Чтобы протестировать взаимодействие сервисов, создадим скрипт для отправки тестового сообщения в input-queue
. Создайте файл send_message.py
в папке faststream-tutorial
:
Почему контекстный менеджер? 🔍 Скрипт отправляет сообщение с broker.publish
, не используя подписчиков, поэтому async with
вызывает только broker.connect()
и broker.close()
. Это эффективно для одноразовой отправки.
Шаг 4: Запуск приложений ▶️#
Убедитесь, что RabbitMQ запущен (см. Установка и настройка). Если используете Docker, проверьте контейнер:
Запустите оба приложения в отдельных терминалах:
-
Для первого сервиса (
app.py
):Вывод:
-
Для второго сервиса (
Вывод:app2.py
):
Оба сервиса теперь слушают свои очереди и готовы к взаимодействию! 🎉
Шаг 5: Тестирование взаимодействия 📬#
Отправьте тестовое сообщение, запустив скрипт:
Вывод в терминале app.py
:
INFO - input-queue | - Получено сообщение: Привет, FastStream!
INFO - output-queue | - Промежуточный результат: Обработано: ПРИВЕТ, FASTSTREAM!
Вывод в терминале app2.py
:
Что произошло? 🔄
- Скрипт
send_message.py
отправил сообщение вinput-queue
. - Первый сервис (
app.py
): - Подписчик
handle_message
обработал сообщение изinput-queue
и отправил результат вoutput-queue
иfinal-queue
. - Подписчик
check_result
получил сообщение изoutput-queue
и залогировал его как промежуточный результат. - Второй сервис (
app2.py
): - Подписчик
final_result
получил сообщение изfinal-queue
и залогировал его как финальный результат.
Это показывает, как два независимых сервиса взаимодействуют через брокер, обмениваясь данными через очереди! 🌐
Шаг 6: Практическое задание: Закрепите знания:#
- Измените
final_result
вapp2.py
, чтобы она добавляла другой префикс, например,Получено в сервисе B: {msg}
. - Перепишите
handle_message
вapp.py
с@broker.publisher
для обеих очередей (output-queue
иfinal-queue
), сохранив правильный порядок декораторов, и проверьте результат. - (Дополнительно) Создайте третий сервис (
app3.py
), который слушает новую очередь (например,archive-queue
), кудаapp2.py
будет отправлять сообщения после обработки. Обновитеapp2.py
, чтобы публиковать вarchive-queue
.
Что дальше? 🗺️#
Вы научились создавать микросервисы, которые взаимодействуют через брокер сообщений! 🎉 Это основа для построения масштабируемых систем. В следующем разделе мы добавим структурированные сообщения с помощью Pydantic, чтобы сделать данные более надежными. Перейдите к Структурированные сообщения с Pydantic, чтобы освоить валидацию JSON.
Если у вас есть идеи, вопросы или нужна помощь, загляните в официальную документацию FastStream, пишите в Telegram или Discord. Продолжайте соединять сервисы! 🚀