Структурированные сообщения с Pydantic ✅#
В предыдущем разделе вы научились создавать сервисы, которые взаимодействуют через очереди RabbitMQ. Теперь пора сделать приложение мощнее! В этом разделе мы добавим Pydantic — фреймворк для структурирования и валидации данных. Это позволит обрабатывать JSON-сообщения с четкой структурой и автоматически проверять их корректность. Мы также настроим приложение для отправки данных в две очереди, чтобы взаимодействовать с другим сервисом. Готовы улучшить ваше приложение? Погнали! 🚀
Зачем нужен Pydantic? 🤔#
Когда вы работаете с брокерами сообщений, данные часто приходят в формате JSON. Без проверки легко получить ошибки, если, например, в сообщении отсутствует нужное поле или поле имеет неверный тип. Pydantic решает эту проблему:
- Структурирует данные 📋: Вы определяете, как должны выглядеть сообщения (например,
{ "username": str, "message": str }
). - Валидирует автоматически 🛡️: Если сообщение не соответствует структуре, FastStream выбросит ошибку.
- Упрощает код ✨: Вы работаете с типизированными объектами, а не с сырыми словарями.
В FastStream Pydantic интегрирован “из коробки”, что делает работу с JSON-сообщениями удобной и безопасной. 😎
Шаг 1: Обновление приложения ✍️#
Обновим приложение из app.py
, чтобы оно обрабатывало JSON-сообщения с полями username
и message
и отправляло результаты в output-queue
и final-queue
. Откройте app.py
и замените код:
Что изменилось? 🔍
- Модель
UserMessage
: Класс Pydantic с полямиusername
иmessage
. Метаданные (Field
) добавим позже в разделе про AsyncAPI. @broker.subscriber
: Декораторы, регистрирующие функции как подписчиков на очередиinput-queue
иoutput-queue
.handle_message
: Принимает объектUserMessage
, логирует данные и отправляет новыйUserMessage
сmessage
в верхнем регистре вoutput-queue
иfinal-queue
.broker.publish
: Отправляет структурированное сообщение в обе очереди, чтобы первый сервис (app.py
) мог обработать промежуточный результат, а второй (app2.py
) — финальный.check_result
: Валидирует структуру UserMessage на соответствие правильных полей и логирует результат изoutput-queue
, показывая, как первый сервис потребляет промежуточные данные.
Примечание 🌐: Сообщения из final-queue
обрабатываются вторым сервисом (app2.py
), как описано в разделе про взаимодействие сервисов.
Альтернативный способ отправки 📤
Вместо broker.publish
можно использовать @broker.publisher
:
@broker.subscriber("input-queue")
@broker.publisher("output-queue")
@broker.publisher("final-queue")
async def handle_message(data: UserMessage, logger: Logger) -> UserMessage:
logger.info(f"Получено: {data.username} сказал {data.message!r}")
return UserMessage(username=data.username, message=data.message.upper())
Порядок декораторов важен: @broker.subscriber
— внешний, @broker.publisher
— внутренний, так как первый регистрирует подписчика, а второй обрабатывает возвращаемое значение. Несколько @broker.publisher
позволяют отправлять данные в разные очереди.
Шаг 2: Обновление отправки сообщений 📬#
Чтобы протестировать приложение с Pydantic, обновите файл send_message.py
в папке faststream-tutorial
, чтобы он отправлял структурированное сообщение с использованием Pydantic:
Что здесь происходит? 🔍
UserMessage
: Pydantic-модель, соответствующая модели вapp.py
, для согласованности (в реальном проекте её можно вынести в общий модуль).broker.publish
: Отправляет объектUserMessage
, который FastStream автоматически сериализует в JSON{ "username": "Alice", "message": "Hello" }
.- Контекстный менеджер: Используется
async with
для одноразового подключения к RabbitMQ и отправки сообщения.
Шаг 3: Запуск и публикация сообщений ▶️#
-
Запустите RabbitMQ (если не запущен):
-
Запустите оба сервиса в отдельных терминалах:
-
Для первого сервиса (
Вывод:app.py
): -
Для второго сервиса (
Вывод:app2.py
): -
Отправьте сообщение:
Вывод в терминале app.py
:
INFO - input-queue | - Получено: Alice сказал 'Hello'
INFO - output-queue | - Промежуточный результат: Alice сказал 'HELLO'
Вывод в терминале app2.py
:
Что произошло? 🔄
FastStream получил JSON из input-queue
, проверил его через Pydantic, обработал сообщение в handle_message
и отправил результат в output-queue
и final-queue
. Подписчик check_result
обработал данные из output-queue
, а app2.py
— из final-queue
. Всё работает! 🎉
Шаг 4: Проверка валидации 🛡️#
Pydantic автоматически валидирует сообщения. Проверим, что происходит при отправке неверного JSON. Создайте send_message_invalid.py
:
Убедитесь, что оба сервиса (app.py
и app2.py
) запущены. Запустите:
Что произойдет? 🔍
Сообщение { "wrong_field": "Hello" }
не соответствует структуре UserMessage
, так как модель ожидает поля username
и message
, оба типа str
. Pydantic выбросит ошибку валидации, и handle_message
не обработает сообщение. В терминале app.py
вы увидите лог ошибки, например:
Сообщение не будет отправлено в output-queue
или final-queue
, поэтому check_result
и app2.py
ничего не получат. Pydantic защищает приложение от некорректных данных! 🛡️
Шаг 5: Практическое задание 📚#
Закрепите знания:
- Добавьте в
UserMessage
полеtimestamp: datetime
, обновитеhandle_message
, чтобы включать текущую дату (datetime.now()
), и проверьте результат в логахcheck_result
иapp2.py
. - Измените
send_message.py
, чтобы отправлять сообщение сtimestamp
, и проверьте валидацию. - (Дополнительно) Настройте
message
сmin_length=1
и проверьте, что пустая строка вызывает ошибку валидации.
Что дальше? 🗺️#
Вы научились использовать Pydantic для структурирования и валидации сообщений! 🎉 Это делает ваше приложение надежным. В следующем разделе мы протестируем приложение без реального брокера. Перейдите к Тестирование приложения, чтобы освоить инструменты тестирования FastStream.
Если у вас есть вопросы или нужна помощь, загляните в официальную документацию FastStream, пишите в Telegram или Discord. Продолжайте кодить! 🚀