Структурированные сообщения с Pydantic ✅#
В предыдущем разделе вы научились создавать сервисы, которые взаимодействуют через очереди RabbitMQ. Теперь пора сделать приложение мощнее! В этом разделе мы добавим Pydantic — фреймворк для структурирования и валидации данных. Это позволит обрабатывать JSON-сообщения с четкой структурой и автоматически проверять их корректность. Мы также настроим приложение для отправки данных в две очереди, чтобы взаимодействовать с другим сервисом. Готовы улучшить ваше приложение? Погнали! 🚀
Зачем нужен Pydantic? 🤔#
Когда вы работаете с брокерами сообщений, данные часто приходят в формате JSON. Без проверки легко получить ошибки, если, например, в сообщении отсутствует нужное поле или поле имеет неверный тип. Pydantic решает эту проблему:
- Структурирует данные 📋: Вы определяете, как должны выглядеть сообщения (например,
{ "username": str, "message": str }). - Валидирует автоматически 🛡️: Если сообщение не соответствует структуре, FastStream выбросит ошибку.
- Упрощает код ✨: Вы работаете с типизированными объектами, а не с сырыми словарями.
В FastStream Pydantic интегрирован “из коробки”, что делает работу с JSON-сообщениями удобной и безопасной. 😎
Шаг 1: Обновление приложения ✍️#
Обновим приложение из app.py, чтобы оно обрабатывало JSON-сообщения с полями username и message и отправляло результаты в output-queue и final-queue. Откройте app.py и замените код:
Что изменилось? 🔍
- Модель
UserMessage: Класс Pydantic с полямиusernameиmessage. Метаданные (Field) добавим позже в разделе про AsyncAPI. @broker.subscriber: Декораторы, регистрирующие функции как подписчиков на очередиinput-queueиoutput-queue.handle_message: Принимает объектUserMessage, логирует данные и отправляет новыйUserMessageсmessageв верхнем регистре вoutput-queueиfinal-queue.broker.publish: Отправляет структурированное сообщение в обе очереди, чтобы первый сервис (app.py) мог обработать промежуточный результат, а второй (app2.py) — финальный.check_result: Валидирует структуру UserMessage на соответствие правильных полей и логирует результат изoutput-queue, показывая, как первый сервис потребляет промежуточные данные.
Примечание 🌐: Сообщения из final-queue обрабатываются вторым сервисом (app2.py), как описано в разделе про взаимодействие сервисов.
Альтернативный способ отправки 📤
Вместо broker.publish можно использовать @broker.publisher:
@broker.subscriber("input-queue")
@broker.publisher("output-queue")
@broker.publisher("final-queue")
async def handle_message(data: UserMessage, logger: Logger) -> UserMessage:
logger.info(f"Получено: {data.username} сказал {data.message!r}")
return UserMessage(username=data.username, message=data.message.upper())
Порядок декораторов важен: @broker.subscriber — внешний, @broker.publisher — внутренний, так как первый регистрирует подписчика, а второй обрабатывает возвращаемое значение. Несколько @broker.publisher позволяют отправлять данные в разные очереди.
Шаг 2: Обновление отправки сообщений 📬#
Чтобы протестировать приложение с Pydantic, обновите файл send_message.py в папке faststream-tutorial, чтобы он отправлял структурированное сообщение с использованием Pydantic:
Что здесь происходит? 🔍
UserMessage: Pydantic-модель, соответствующая модели вapp.py, для согласованности (в реальном проекте её можно вынести в общий модуль).broker.publish: Отправляет объектUserMessage, который FastStream автоматически сериализует в JSON{ "username": "Alice", "message": "Hello" }.- Контекстный менеджер: Используется
async withдля одноразового подключения к RabbitMQ и отправки сообщения.
Шаг 3: Запуск и публикация сообщений ▶️#
-
Запустите RabbitMQ (если не запущен):
-
Запустите оба сервиса в отдельных терминалах:
-
Для первого сервиса (
Вывод:app.py): -
Для второго сервиса (
Вывод:app2.py): -
Отправьте сообщение:
Вывод в терминале app.py:
INFO - input-queue | - Получено: Alice сказал 'Hello'
INFO - output-queue | - Промежуточный результат: Alice сказал 'HELLO'
Вывод в терминале app2.py:
Что произошло? 🔄
FastStream получил JSON из input-queue, проверил его через Pydantic, обработал сообщение в handle_message и отправил результат в output-queue и final-queue. Подписчик check_result обработал данные из output-queue, а app2.py — из final-queue. Всё работает! 🎉
Шаг 4: Проверка валидации 🛡️#
Pydantic автоматически валидирует сообщения. Проверим, что происходит при отправке неверного JSON. Создайте send_message_invalid.py:
Убедитесь, что оба сервиса (app.py и app2.py) запущены. Запустите:
Что произойдет? 🔍
Сообщение { "wrong_field": "Hello" } не соответствует структуре UserMessage, так как модель ожидает поля username и message, оба типа str. Pydantic выбросит ошибку валидации, и handle_message не обработает сообщение. В терминале app.py вы увидите лог ошибки, например:
Сообщение не будет отправлено в output-queue или final-queue, поэтому check_result и app2.py ничего не получат. Pydantic защищает приложение от некорректных данных! 🛡️
Шаг 5: Практическое задание 📚#
Закрепите знания:
- Добавьте в
UserMessageполеtimestamp: datetime, обновитеhandle_message, чтобы включать текущую дату (datetime.now()), и проверьте результат в логахcheck_resultиapp2.py. - Измените
send_message.py, чтобы отправлять сообщение сtimestamp, и проверьте валидацию. - (Дополнительно) Настройте
messageсmin_length=1и проверьте, что пустая строка вызывает ошибку валидации.
Что дальше? 🗺️#
Вы научились использовать Pydantic для структурирования и валидации сообщений! 🎉 Это делает ваше приложение надежным. В следующем разделе мы протестируем приложение без реального брокера. Перейдите к Тестирование приложения, чтобы освоить инструменты тестирования FastStream.
Если у вас есть вопросы или нужна помощь, загляните в официальную документацию FastStream, пишите в Telegram или Discord. Продолжайте кодить! 🚀