Взаимодействие между сервисами 🤝#
В предыдущем разделе вы создали приложение, которое принимает сообщения из очереди input-queue и логирует их обработку. Теперь пора узнать, как сервисы могут взаимодействовать через брокер сообщений! В этом разделе мы создадим два приложения: одно с двумя подписчиками для обработки сообщений внутри сервиса, другое — для обработки результатов в отдельном сервисе. Это наглядно покажет, как микросервисы обмениваются данными через RabbitMQ. Готовы соединить сервисы? Погнали! 🚀
Что мы сделаем? 📋#
Мы создадим два приложения:
- Первое приложение (
app.py):- Слушает очередь
input-queue, принимает текстовое сообщение и преобразует его в верхний регистр 🔠. - Отправляет результат в очередь
output-queueиfinal-queue📤. - Слушает очередь
output-queueи логирует промежуточный результат 📥.
- Слушает очередь
- Второе приложение (
app2.py):- Слушает очередь
final-queueи логирует финальный результат 📥.
- Слушает очередь
Это пример взаимодействия микросервисов: один сервис обрабатывает входные данные и передает их дальше, другой потребляет результаты. Мы используем две очереди (output-queue и final-queue), чтобы показать цепочку обработки.
Шаг 1: Написание кода для первого сервиса ✍️#
Обновите файл app.py в папке faststream-tutorial, добавив два подписчика:
Что здесь происходит? 🤔
RabbitBroker: Объект для подключения к RabbitMQ с адресомamqp://guest:guest@localhost:5672/.FastStream: Основное приложение, управляющее брокером.@broker.subscriber("input-queue"): Декоратор, реализованный как метод брокера, регистрирует функциюhandle_messageкак подписчика на очередьinput-queue. Она принимает сообщение, логирует его и отправляет обработанную версию вoutput-queueиfinal-queue.broker.publish: Метод для отправки обработанного сообщения в две очереди.@broker.subscriber("output-queue"): Декоратор, регистрирующий функциюcheck_resultкак подписчика на очередьoutput-queue. Она логирует промежуточный результат.handle_messageиcheck_result: Функции-обработчики, представляющие внутреннюю логику первого сервиса.
Tip
Альтернативный способ отправки 📤
Вместо broker.publish в handle_message можно использовать декоратор @broker.publisher:
@broker.subscriber("input-queue")
@broker.publisher("output-queue")
@broker.publisher("final-queue")
async def handle_message(msg: str, logger: Logger) -> str:
logger.info(f"Получено сообщение: {msg}")
return f"Обработано: {msg.upper()}"
Порядок декораторов 🔍
Если используете @broker.publisher, порядок важен:
@broker.subscriber— внешний (выше), регистрирует функцию как подписчика.@broker.publisher— внутренний (ниже), обрабатывает возвращаемое значение для публикации.
Можно использовать несколько @broker.publisher для разных очередей, как в примере выше. В Python декораторы выполняются снизу вверх, поэтому @broker.publisher оборачивает функцию первой, а @broker.subscriber — последней.
Шаг 2: Написание кода для второго сервиса ✍️#
Создайте файл app2.py в папке faststream-tutorial для второго сервиса:
Что здесь происходит? 🤔
@broker.subscriber("final-queue"): Декоратор, регистрирующий функциюfinal_resultкак подписчика на очередьfinal-queue.final_result: Функция-обработчик, которая логирует сообщения изfinal-queue, представляя второй сервис, потребляющий результаты первого.
Tip
Почему два сервиса? 🌐
app.py: Первый сервис обрабатывает входные данные (input-queue) и публикует результаты вoutput-queue(для внутреннего использования) иfinal-queue(для внешнего сервиса).app2.py: Второй сервис потребляет данные изfinal-queue, представляя независимый микросервис (например, для отправки уведомлений).
Это демонстрирует реальную микросервисную архитектуру, где сервисы взаимодействуют через брокер.
Шаг 3: Создание скрипта для отправки сообщений 🔌#
Чтобы протестировать взаимодействие сервисов, создадим скрипт для отправки тестового сообщения в input-queue. Создайте файл send_message.py в папке faststream-tutorial:
Почему контекстный менеджер? 🔍 Скрипт отправляет сообщение с broker.publish, не используя подписчиков, поэтому async with вызывает только broker.connect() и broker.close(). Это эффективно для одноразовой отправки.
Шаг 4: Запуск приложений ▶️#
Убедитесь, что RabbitMQ запущен (см. Установка и настройка). Если используете Docker, проверьте контейнер:
Запустите оба приложения в отдельных терминалах:
-
Для первого сервиса (
app.py):Вывод:
-
Для второго сервиса (
Вывод:app2.py):
Оба сервиса теперь слушают свои очереди и готовы к взаимодействию! 🎉
Шаг 5: Тестирование взаимодействия 📬#
Отправьте тестовое сообщение, запустив скрипт:
Вывод в терминале app.py:
INFO - input-queue | - Получено сообщение: Привет, FastStream!
INFO - output-queue | - Промежуточный результат: Обработано: ПРИВЕТ, FASTSTREAM!
Вывод в терминале app2.py:
Что произошло? 🔄
- Скрипт
send_message.pyотправил сообщение вinput-queue. - Первый сервис (
app.py): - Подписчик
handle_messageобработал сообщение изinput-queueи отправил результат вoutput-queueиfinal-queue. - Подписчик
check_resultполучил сообщение изoutput-queueи залогировал его как промежуточный результат. - Второй сервис (
app2.py): - Подписчик
final_resultполучил сообщение изfinal-queueи залогировал его как финальный результат.
Это показывает, как два независимых сервиса взаимодействуют через брокер, обмениваясь данными через очереди! 🌐
Шаг 6: Практическое задание: Закрепите знания:#
- Измените
final_resultвapp2.py, чтобы она добавляла другой префикс, например,Получено в сервисе B: {msg}. - Перепишите
handle_messageвapp.pyс@broker.publisherдля обеих очередей (output-queueиfinal-queue), сохранив правильный порядок декораторов, и проверьте результат. - (Дополнительно) Создайте третий сервис (
app3.py), который слушает новую очередь (например,archive-queue), кудаapp2.pyбудет отправлять сообщения после обработки. Обновитеapp2.py, чтобы публиковать вarchive-queue.
Что дальше? 🗺️#
Вы научились создавать микросервисы, которые взаимодействуют через брокер сообщений! 🎉 Это основа для построения масштабируемых систем. В следующем разделе мы добавим структурированные сообщения с помощью Pydantic, чтобы сделать данные более надежными. Перейдите к Структурированные сообщения с Pydantic, чтобы освоить валидацию JSON.
Если у вас есть идеи, вопросы или нужна помощь, загляните в официальную документацию FastStream, пишите в Telegram или Discord. Продолжайте соединять сервисы! 🚀