Блог CREATIVE
Интересное Разработка

Использование Symfony Messenger на практике




Symfony Messenger
— компонент, позволяющий приложениям обмениваться сообщениями через очередь.


Теория и принципы


PHP-приложения уже давно переросли простую парадигму «получил запрос — отправил ответ», обзавелись множеством возможностей реализации бизнес-логики и инструментов, позволяющих любую такого рода логику реализовать.

Предположим, мы делаем сервис, отдающий потребителям информацию, собранную из агрегированных источников. Для примера возьмём информацию о компаниях, находящихся рядом с определёнными координатами. Пользователь видит карту, кликает на любую точку этой карты и получает список компаний, находящихся рядом с этой точкой.

Есть два сторонних сервиса: один определяет почтовый индекс места по гео-координатам, а второй возвращает список коммерческих компаний, зарегистрированных по адресам с определённым почтовым индексом.


Традиционная парадигма подразумевает, что:


  • приложение получает запрос с гео-коодинатами;
  • отправляет запрос к одному из сервисов;
  • дожидается ответа;
  • разбирает этот ответ, проверяет валидность данных;
  • отправляет запрос к другому сервису;
  • дожидается ответа уже от него;
  • разбирает этот ответ, проверяет его валидность;
  • сериализует эти данные и отправляет их пользователю.

Как мы видим, это довольно длинная цепочка, причём всё время, нужное на отправку и обработку данных сторонних API, экземпляр нашего приложения занят и не может отвечать на другие запросы пользователей. Это нерационально (приложение по большей части просто ждёт ответа от стороннего сервера) и ненадёжно — если запрос от пользователя или запрос к стороннему API прервался на уровне http / tcp, сторонний API по какой-то причине не ответил, или произошло что-то подобное — конечный пользователь получит ошибку, а не данные.

Выход из положения предоставляет нам концепция очередей в целом и Symfony Messenger в частности.


Вот как выглядит схема работы нашего приложения с использованием Messenger:


  • Приложение получает запрос, присваивает ему уникальный идентификатор и тут же отправляет ответ с этим идентификатором;
  • При получении запроса создаётся объект сообщения, который отправляется в шину сообщений;
  • Работающая в фоне (независимо — возможно, даже на другом сервере или в другом экземпляре приложения) консольная команда получает сообщение из шины и запускает сервисы и процессы, необходимые для обработки этого сообщения.
  • Обратите внимание, что ответ от приложения пользователю уже отправлен и сбой на уровне сетевых процессов уже невозможен — то есть наш пользователь по-прежнему ждёт ответа, и при этом он уверен в том, что ответ будет.
  • Тем временем наши сервисы отправляют запросы, ждут ответов и обрабатывают их. Здесь стоит заметить, что сам Messenger имеет возможность повторять неудачные запросы столько раз, сколько это необходимо: то есть, если сторонний API не ответил один раз, можно его переспросить.
  • Когда все ответы получены и обработаны, наше приложение отправляет http-ответ с данными на callback-url, который предоставил пользователь. Вообразим, что со стороны интерфейса приложения есть некий сервис, который связан непосредственно с браузером пользователя и имеет точку входа для данных. Этот сервис получает их, сопоставляет идентификатор запроса, который у него есть, с полученным и отправляет эти данные уже конкретному пользователю, который их ждёт.

Таким образом, используя концепции очередей и шины сообщений, мы обошли возможные сбои в сетевой структуре, тем самым значительно улучшив вероятность доставки данных конечному пользователю. Кроме того, повысилась и отзывчивость приложения — как уже было сказано, консьюмер очереди может работать независимо, не занимая в этот момент приложение как таковое.

Конечно, такой подход немного усложняет архитектуру в целом — нам, как минимум, нужен сервис, который будет связан с браузером пользователя и без запроса со стороны этого браузера отправит в него данные. Но с современными технологиями (вебсокеты, server-sent live updates и подобные) это не такая уж и сложная задача.


Практические примеры реализации


Предположим, что у нас уже существуют два сервиса для получения данных их сторонних API с интерфейсами PoscodeByGeoInterface и CompayByPostcodeInterface. Они оба имеют публичный метод request, возвращающий, в случае PostcodeByGeoInterface коллекцию объектов PostCode, а в случае с CompayByPostcodeInterface коллекцию объектов Company.


Реализуем класс сообщения для передачи запроса к сервису PoscodeByGeoInterface:

<?php declare(strict_types=1);
namespace App\Message;
use Doctrine\Common\Collections\{Collection, ArrayCollection};
use App\DataClass\PostCode;
class PostcodeByGeoMessage
{
private string $requestId;
private Collection $postcodes;
private array $request;

public function __construct(string $requestId, array $request)
{
$this->requestId = $requestId;
\array_walk($request, fn($coord) => (float) $coord);
$this->request = $request;
$this->postcodes = new ArrayCollection();
}

public function getRequestId(): string
{
return $this->requestId;
}

public function getPostcodes(): Collection
{
return $this->postcodes;
}

public function addPostcode(PostCode $postCode): self
{
if (!$this->postcodes->contains($postCode)) {
$this->postcodes->add($postCode);
}
return $this;
}

public function removePostCode(PostCode $postCode): self
{
if ($this->postcodes->contains($postCode)) {
$this->postcodes->removeElement($postCode);
}
return $this;
}

public function getLat(): float
{
return $this->postcodes[0];
}

public function getLong(): float
{
return $this->postcodes[1];
}

}


Обратите внимание, что здесь не реализована проверка переданных данных (array $request). Хорошей практикой будет преобразовать запрос пользователя в отдельный объект (с помощью ParamConverterInterface) и проверить валидность данных заранее.

Как вы можете видеть, это простой класс для хранения данных, не имплементирующий никакой логики, кроме стандартных get / set методов.


Обработчик (handler) для сообщения


Ниже приведен упрощённый вариант хэндлера, который обрабатывает сообщение из очереди:

<?php declare(strict_types=1);
namespace App\MessageHandler;
use Doctrine\Common\Collections\Collection;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use App\Message\{PostcodeByGeoMessage, Exceptions\PostcodeSyGeoServiceException}
use Symfony\Component\Messenger\{Envelope, Exception\UnrecoverableMessageHandlingException, Handler\MessageHandlerInterface, MessageBusInterface, Stamp\DispatchAfterCurrentBusStamp
};

class PostcodeByGeoMessageHandler implements MessageHandlerInterface
{
private PostcodeByGeoInterface $postcodeByGeo;
public function __construct(PostcodeByGeoInterface $postcodeByGeo, MessageBusInterface $messageBus)
{
$this->postcodeByGeo = $postcodeByGeo;
}

public function __invoke(PostcodeByGeoMessage $message): void
{
try {
$postcodes = $this->postcodeByGeo->request($message->getLat(), $message->getLong());
$companyMessage = new CompanyMessage($postcodes);
$this->messageBus->dispatch((new Envelope($companyMessage))->with(new DispatchAfterCurrentBusStamp())); // Next message to queue — will do request to another service
} catch (PostcodeSyGeoServiceException $e) {
throw new \RuntimeException($e->getMessage());
}
}
}


Обратите внимание, что в примере мы можем видеть только самый общий принцип — в частности, я не привожу пример CompanyMessage, но это и не нужно — в этой части всё устроено одинаково.

Стоит обратить внимание на то, что в экземпляр PostcodeByGeoMessageHandler мы загружаем MessageBusInterface — это позволяет передавать в шину сообщения, сформированные на основании полученных данных. Так, CompanyMessage должно, очевидно, получать в конструкторе коллекцию посткодов, а затем отправлять их в качестве запроса в сторонний API.

Таким образом становится понятно, что в общем итоге у нас организуется цепочка handler-ов, последнее звено которой будет отправлять данные конечному пользователю.


Настройки и конфигурация очереди и компонента


Конфигурация компонента Messenger хранится в узле messenger конфигурации symfony framework. Конфигурация задаёт параметры для транспорта (хендлера) и роутинга.

Например, наша текущая конфигурация будет примерно такая:

framework:
messenger:
transports:
postcodes.by.geo:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%' # Define it in .env, see examples below

retry_strategy:
max_retries: 5
delay: 60000
multiplier: 1
max_delay: 0

options:
stream: messages
group: my_group
consumer: consumer
auto_setup: true

routing:
App\Message\PostcodeByGeoMessage: postcodes.by.geo

Переменная MESSENGER_TRANSPORT_DSN определяет, какой именно брокер очередей будет использоваться. По-умолчанию транспортом могут выступать redis streams, doctrine (база данных) или AMQP (RabbitMQ, например).

Команда для запуска консьюмера очереди в этом случае будет такой:

bin/console messenger:consume postcodes.by.geo


Выводы и перспективы


Мы привели упрощённый пример использования Messenger, который должен подтолкнуть вас к собственным идеям реализации такого использования. Реализовать асинхронные ответы такого рода непросто, но и возможности использования их поистине огромны.

Вообразите, например, насколько сократится время ответа приложения, если все запросы к его API будут получать лишь идентификатор и отправляться пользователю, а вся работа с ресурсоёмкими частями (базой данных, сериализацией / десериализацией, отправкой запросов в сторонние API и прочим) будет проходить в фоне, невидимо для пользователя? Ведь зачастую запрос в REST API для создания чего-либо не требует ответа вообще, а значит, и дожидаться этого ответа смысла нет!

API-Platform, к слову уже реализовали подобную схему у себя.