Перейти к содержанию

Взаимодействие между сервисами 🤝#

В предыдущем разделе вы создали приложение, которое принимает сообщения из очереди input-queue и логирует их обработку. Теперь пора узнать, как сервисы могут взаимодействовать через брокер сообщений! В этом разделе мы создадим два приложения: одно с двумя подписчиками для обработки сообщений внутри сервиса, другое — для обработки результатов в отдельном сервисе. Это наглядно покажет, как микросервисы обмениваются данными через RabbitMQ. Готовы соединить сервисы? Погнали! 🚀

Что мы сделаем? 📋#

Мы создадим два приложения:

  • Первое приложение (app.py):
    • Слушает очередь input-queue, принимает текстовое сообщение и преобразует его в верхний регистр 🔠.
    • Отправляет результат в очередь output-queue и final-queue 📤.
    • Слушает очередь output-queue и логирует промежуточный результат 📥.
  • Второе приложение (app2.py):
    • Слушает очередь final-queue и логирует финальный результат 📥.

Это пример взаимодействия микросервисов: один сервис обрабатывает входные данные и передает их дальше, другой потребляет результаты. Мы используем две очереди (output-queue и final-queue), чтобы показать цепочку обработки.

Шаг 1: Написание кода для первого сервиса ✍️#

Обновите файл app.py в папке faststream-tutorial, добавив два подписчика:

app.py
from faststream import FastStream, Logger
from faststream.rabbit import RabbitBroker

# Создаем брокер RabbitMQ
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# Создаем приложение FastStream
app = FastStream(broker)


# Первый подписчик: обрабатывает сообщения из input-queue
@broker.subscriber("input-queue")
async def handle_message(msg: str, logger: Logger) -> None:
    logger.info(f"Получено сообщение: {msg}")
    # Отправляем результат в output-queue и final-queue
    processed_msg = f"Обработано: {msg.upper()}"
    await broker.publish(processed_msg, queue="output-queue")
    await broker.publish(processed_msg, queue="final-queue")


# Второй подписчик: обрабатывает сообщения из output-queue
@broker.subscriber("output-queue")
async def check_result(msg: str, logger: Logger) -> None:
    logger.info(f"Промежуточный результат: {msg}")

Что здесь происходит? 🤔

  • RabbitBroker: Объект для подключения к RabbitMQ с адресом amqp://guest:guest@localhost:5672/.
  • FastStream: Основное приложение, управляющее брокером.
  • @broker.subscriber("input-queue"): Декоратор, реализованный как метод брокера, регистрирует функцию handle_message как подписчика на очередь input-queue. Она принимает сообщение, логирует его и отправляет обработанную версию в output-queue и final-queue.
  • broker.publish: Метод для отправки обработанного сообщения в две очереди.
  • @broker.subscriber("output-queue"): Декоратор, регистрирующий функцию check_result как подписчика на очередь output-queue. Она логирует промежуточный результат.
  • handle_message и check_result: Функции-обработчики, представляющие внутреннюю логику первого сервиса.

Tip

Альтернативный способ отправки 📤
Вместо broker.publish в handle_message можно использовать декоратор @broker.publisher:

@broker.subscriber("input-queue")
@broker.publisher("output-queue")
@broker.publisher("final-queue")
async def handle_message(msg: str, logger: Logger) -> str:
    logger.info(f"Получено сообщение: {msg}")
    return f"Обработано: {msg.upper()}"

Порядок декораторов 🔍
Если используете @broker.publisher, порядок важен:

  • @broker.subscriberвнешний (выше), регистрирует функцию как подписчика.
  • @broker.publisherвнутренний (ниже), обрабатывает возвращаемое значение для публикации.

Можно использовать несколько @broker.publisher для разных очередей, как в примере выше. В Python декораторы выполняются снизу вверх, поэтому @broker.publisher оборачивает функцию первой, а @broker.subscriber — последней.

Шаг 2: Написание кода для второго сервиса ✍️#

Создайте файл app2.py в папке faststream-tutorial для второго сервиса:

app2.py
from faststream import FastStream, Logger
from faststream.rabbit import RabbitBroker

# Создаем брокер RabbitMQ
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# Создаем приложение FastStream
app = FastStream(broker)


# Подписчик: обрабатывает сообщения из final-queue
@broker.subscriber("final-queue")
async def final_result(msg: str, logger: Logger) -> None:
    logger.info(f"Финальный результат: {msg}")

Что здесь происходит? 🤔

  • @broker.subscriber("final-queue"): Декоратор, регистрирующий функцию final_result как подписчика на очередь final-queue.
  • final_result: Функция-обработчик, которая логирует сообщения из final-queue, представляя второй сервис, потребляющий результаты первого.

Tip

Почему два сервиса? 🌐

  • app.py: Первый сервис обрабатывает входные данные (input-queue) и публикует результаты в output-queue (для внутреннего использования) и final-queue (для внешнего сервиса).
  • app2.py: Второй сервис потребляет данные из final-queue, представляя независимый микросервис (например, для отправки уведомлений).

Это демонстрирует реальную микросервисную архитектуру, где сервисы взаимодействуют через брокер.

Шаг 3: Создание скрипта для отправки сообщений 🔌#

Чтобы протестировать взаимодействие сервисов, создадим скрипт для отправки тестового сообщения в input-queue. Создайте файл send_message.py в папке faststream-tutorial:

send_message.py
import asyncio
from faststream.rabbit import RabbitBroker


async def send_message(broker: RabbitBroker) -> None:
    await broker.publish("Привет, FastStream!", queue="input-queue")


async def main() -> None:
    async with RabbitBroker("amqp://guest:guest@localhost:5672/") as broker:
        await send_message(broker)


if __name__ == "__main__":
    asyncio.run(main())

Почему контекстный менеджер? 🔍 Скрипт отправляет сообщение с broker.publish, не используя подписчиков, поэтому async with вызывает только broker.connect() и broker.close(). Это эффективно для одноразовой отправки.

Шаг 4: Запуск приложений ▶️#

Убедитесь, что RabbitMQ запущен (см. Установка и настройка). Если используете Docker, проверьте контейнер:

docker ps

Запустите оба приложения в отдельных терминалах:

  1. Для первого сервиса (app.py):

    faststream run app:app
    

    Вывод:

    INFO     - FastStream app starting...
    INFO     - input-queue  |            - `handle_message` waiting for messages
    INFO     - output-queue |            - `check_result` waiting for messages
    INFO     - FastStream app started successfully! To exit press CTRL+C
    
  2. Для второго сервиса (app2.py):

    faststream run app2:app
    
    Вывод:

    INFO     - FastStream app starting...
    INFO     - final-queue  |            - `final_result` waiting for messages
    INFO     - FastStream app started successfully! To exit press CTRL+C
    

Оба сервиса теперь слушают свои очереди и готовы к взаимодействию! 🎉

Шаг 5: Тестирование взаимодействия 📬#

Отправьте тестовое сообщение, запустив скрипт:

python send_message.py

Вывод в терминале app.py:

INFO     - input-queue  |            - Получено сообщение: Привет, FastStream!
INFO     - output-queue |            - Промежуточный результат: Обработано: ПРИВЕТ, FASTSTREAM!

Вывод в терминале app2.py:

INFO     - final-queue  |            - Финальный результат: Обработано: ПРИВЕТ, FASTSTREAM!

Что произошло? 🔄

  1. Скрипт send_message.py отправил сообщение в input-queue.
  2. Первый сервис (app.py):
  3. Подписчик handle_message обработал сообщение из input-queue и отправил результат в output-queue и final-queue.
  4. Подписчик check_result получил сообщение из output-queue и залогировал его как промежуточный результат.
  5. Второй сервис (app2.py):
  6. Подписчик final_result получил сообщение из final-queue и залогировал его как финальный результат.

Это показывает, как два независимых сервиса взаимодействуют через брокер, обмениваясь данными через очереди! 🌐

Шаг 6: Практическое задание: Закрепите знания:#

  1. Измените final_result в app2.py, чтобы она добавляла другой префикс, например, Получено в сервисе B: {msg}.
  2. Перепишите handle_message в app.py с @broker.publisher для обеих очередей (output-queue и final-queue), сохранив правильный порядок декораторов, и проверьте результат.
  3. (Дополнительно) Создайте третий сервис (app3.py), который слушает новую очередь (например, archive-queue), куда app2.py будет отправлять сообщения после обработки. Обновите app2.py, чтобы публиковать в archive-queue.

Что дальше? 🗺️#

Вы научились создавать микросервисы, которые взаимодействуют через брокер сообщений! 🎉 Это основа для построения масштабируемых систем. В следующем разделе мы добавим структурированные сообщения с помощью Pydantic, чтобы сделать данные более надежными. Перейдите к Структурированные сообщения с Pydantic, чтобы освоить валидацию JSON.

Если у вас есть идеи, вопросы или нужна помощь, загляните в официальную документацию FastStream, пишите в Telegram или Discord. Продолжайте соединять сервисы! 🚀