
Symfony Messenger
— компонент, позволяющий приложениям обмениваться сообщениями через очередь.
Теория и принципы
PHP-приложения уже давно переросли простую парадигму «получил запрос — отправил ответ», обзавелись множеством возможностей реализации бизнес-логики и инструментов, позволяющих любую такого рода логику реализовать.
Предположим, мы делаем сервис, отдающий потребителям информацию, собранную из агрегированных источников. Для примера возьмём информацию о компаниях, находящихся рядом с определёнными координатами. Пользователь видит карту, кликает на любую точку этой карты и получает список компаний, находящихся рядом с этой точкой.
Есть два сторонних сервиса: один определяет почтовый индекс места по гео-координатам, а второй возвращает список коммерческих компаний, зарегистрированных по адресам с определённым почтовым индексом.
Традиционная парадигма подразумевает, что:
- приложение получает запрос с гео-коодинатами;
- отправляет запрос к одному из сервисов;
- дожидается ответа;
- разбирает этот ответ, проверяет валидность данных;
- отправляет запрос к другому сервису;
- дожидается ответа уже от него;
- разбирает этот ответ, проверяет его валидность;
- сериализует эти данные и отправляет их пользователю.
Как мы видим, это довольно длинная цепочка, причём всё время, нужное на отправку и обработку данных сторонних API, экземпляр нашего приложения занят и не может отвечать на другие запросы пользователей. Это нерационально (приложение по большей части просто ждёт ответа от стороннего сервера) и ненадёжно — если запрос от пользователя или запрос к стороннему API прервался на уровне http / tcp, сторонний API по какой-то причине не ответил, или произошло что-то подобное — конечный пользователь получит ошибку, а не данные.
Выход из положения предоставляет нам концепция очередей в целом и Symfony Messenger в частности.
Вот как выглядит схема работы нашего приложения с использованием Messenger:
- Приложение получает запрос, присваивает ему уникальный идентификатор и тут же отправляет ответ с этим идентификатором;
- При получении запроса создаётся объект сообщения, который отправляется в шину сообщений;
- Работающая в фоне (независимо — возможно, даже на другом сервере или в другом экземпляре приложения) консольная команда получает сообщение из шины и запускает сервисы и процессы, необходимые для обработки этого сообщения.
- Обратите внимание, что ответ от приложения пользователю уже отправлен и сбой на уровне сетевых процессов уже невозможен — то есть наш пользователь по-прежнему ждёт ответа, и при этом он уверен в том, что ответ будет.
- Тем временем наши сервисы отправляют запросы, ждут ответов и обрабатывают их. Здесь стоит заметить, что сам Messenger имеет возможность повторять неудачные запросы столько раз, сколько это необходимо: то есть, если сторонний API не ответил один раз, можно его переспросить.
- Когда все ответы получены и обработаны, наше приложение отправляет http-ответ с данными на callback-url, который предоставил пользователь. Вообразим, что со стороны интерфейса приложения есть некий сервис, который связан непосредственно с браузером пользователя и имеет точку входа для данных. Этот сервис получает их, сопоставляет идентификатор запроса, который у него есть, с полученным и отправляет эти данные уже конкретному пользователю, который их ждёт.
Таким образом, используя концепции очередей и шины сообщений, мы обошли возможные сбои в сетевой структуре, тем самым значительно улучшив вероятность доставки данных конечному пользователю. Кроме того, повысилась и отзывчивость приложения — как уже было сказано, консьюмер очереди может работать независимо, не занимая в этот момент приложение как таковое.
Конечно, такой подход немного усложняет архитектуру в целом — нам, как минимум, нужен сервис, который будет связан с браузером пользователя и без запроса со стороны этого браузера отправит в него данные. Но с современными технологиями (вебсокеты, server-sent live updates и подобные) это не такая уж и сложная задача.
Практические примеры реализации
Предположим, что у нас уже существуют два сервиса для получения данных их сторонних API с интерфейсами PoscodeByGeoInterface и CompayByPostcodeInterface. Они оба имеют публичный метод request, возвращающий, в случае PostcodeByGeoInterface коллекцию объектов PostCode, а в случае с CompayByPostcodeInterface коллекцию объектов Company.
Реализуем класс сообщения для передачи запроса к сервису PoscodeByGeoInterface:
<?php declare(strict_types=1);
namespace App\Message;
use Doctrine\Common\Collections\{Collection, ArrayCollection};
use App\DataClass\PostCode;
class PostcodeByGeoMessage
{
private string $requestId;
private Collection $postcodes;
private array $request;
public function __construct(string $requestId, array $request)
{
$this->requestId = $requestId;
\array_walk($request, fn($coord) => (float) $coord);
$this->request = $request;
$this->postcodes = new ArrayCollection();
}
public function getRequestId(): string
{
return $this->requestId;
}
public function getPostcodes(): Collection
{
return $this->postcodes;
}
public function addPostcode(PostCode $postCode): self
{
if (!$this->postcodes->contains($postCode)) {
$this->postcodes->add($postCode);
}
return $this;
}
public function removePostCode(PostCode $postCode): self
{
if ($this->postcodes->contains($postCode)) {
$this->postcodes->removeElement($postCode);
}
return $this;
}
public function getLat(): float
{
return $this->postcodes[0];
}
public function getLong(): float
{
return $this->postcodes[1];
}
}
Обратите внимание, что здесь не реализована проверка переданных данных (array $request). Хорошей практикой будет преобразовать запрос пользователя в отдельный объект (с помощью ParamConverterInterface) и проверить валидность данных заранее.
Как вы можете видеть, это простой класс для хранения данных, не имплементирующий никакой логики, кроме стандартных get / set методов.
Обработчик (handler) для сообщения
Ниже приведен упрощённый вариант хэндлера, который обрабатывает сообщение из очереди:
<?php declare(strict_types=1);
namespace App\MessageHandler;
use Doctrine\Common\Collections\Collection;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use App\Message\{PostcodeByGeoMessage, Exceptions\PostcodeSyGeoServiceException}
use Symfony\Component\Messenger\{Envelope, Exception\UnrecoverableMessageHandlingException, Handler\MessageHandlerInterface, MessageBusInterface, Stamp\DispatchAfterCurrentBusStamp
};
class PostcodeByGeoMessageHandler implements MessageHandlerInterface
{
private PostcodeByGeoInterface $postcodeByGeo;
public function __construct(PostcodeByGeoInterface $postcodeByGeo, MessageBusInterface $messageBus)
{
$this->postcodeByGeo = $postcodeByGeo;
}
public function __invoke(PostcodeByGeoMessage $message): void
{
try {
$postcodes = $this->postcodeByGeo->request($message->getLat(), $message->getLong());
$companyMessage = new CompanyMessage($postcodes);
$this->messageBus->dispatch((new Envelope($companyMessage))->with(new DispatchAfterCurrentBusStamp())); // Next message to queue — will do request to another service
} catch (PostcodeSyGeoServiceException $e) {
throw new \RuntimeException($e->getMessage());
}
}
}
Обратите внимание, что в примере мы можем видеть только самый общий принцип — в частности, я не привожу пример CompanyMessage, но это и не нужно — в этой части всё устроено одинаково.
Стоит обратить внимание на то, что в экземпляр PostcodeByGeoMessageHandler мы загружаем MessageBusInterface — это позволяет передавать в шину сообщения, сформированные на основании полученных данных. Так, CompanyMessage должно, очевидно, получать в конструкторе коллекцию посткодов, а затем отправлять их в качестве запроса в сторонний API.
Таким образом становится понятно, что в общем итоге у нас организуется цепочка handler-ов, последнее звено которой будет отправлять данные конечному пользователю.
Настройки и конфигурация очереди и компонента
Конфигурация компонента Messenger хранится в узле messenger конфигурации symfony framework. Конфигурация задаёт параметры для транспорта (хендлера) и роутинга.
Например, наша текущая конфигурация будет примерно такая:
framework:
messenger:
transports:
postcodes.by.geo:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%' # Define it in .env, see examples below
retry_strategy:
max_retries: 5
delay: 60000
multiplier: 1
max_delay: 0
options:
stream: messages
group: my_group
consumer: consumer
auto_setup: true
routing:
App\Message\PostcodeByGeoMessage: postcodes.by.geo
Переменная MESSENGER_TRANSPORT_DSN определяет, какой именно брокер очередей будет использоваться. По-умолчанию транспортом могут выступать redis streams, doctrine (база данных) или AMQP (RabbitMQ, например).
Команда для запуска консьюмера очереди в этом случае будет такой:
bin/console messenger:consume postcodes.by.geo
Выводы и перспективы
Мы привели упрощённый пример использования Messenger, который должен подтолкнуть вас к собственным идеям реализации такого использования. Реализовать асинхронные ответы такого рода непросто, но и возможности использования их поистине огромны.
Вообразите, например, насколько сократится время ответа приложения, если все запросы к его API будут получать лишь идентификатор и отправляться пользователю, а вся работа с ресурсоёмкими частями (базой данных, сериализацией / десериализацией, отправкой запросов в сторонние API и прочим) будет проходить в фоне, невидимо для пользователя? Ведь зачастую запрос в REST API для создания чего-либо не требует ответа вообще, а значит, и дожидаться этого ответа смысла нет!
API-Platform, к слову уже реализовали подобную схему у себя.