feat: add Symfony Messenger pipeline with AI agents and handlers

Messages and handlers for the full AI pipeline:
DraftArticle → Validation → SpecsResearch → PhotoUpload → EbayText →
JsonCoding → PublishToChannel / DeactivateListingMessage / TrackingPush /
UpdateStockOnChannels / OrderReceived.

OllamaClient and OllamaClientInterface provide the base LLM backend.
AI agents (EbayTextAgent, JsonCodingAgent, OllamaVisionAgent,
SpecsResearchAgent) wrap the client with task-specific prompts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Simon Kuehn 2026-05-17 22:43:47 +00:00
parent 838b96eb14
commit fddfd920f5
36 changed files with 1500 additions and 1 deletions

View file

@ -36,4 +36,16 @@ framework:
options: options:
stream: failed stream: failed
routing: [] routing:
App\Infrastructure\Messenger\Message\PhotoUploadMessage: ai_pipeline
App\Infrastructure\Messenger\Message\SpecsResearchMessage: ai_pipeline
App\Infrastructure\Messenger\Message\JsonCodingMessage: ai_pipeline
App\Infrastructure\Messenger\Message\ValidationMessage: ai_pipeline
App\Infrastructure\Messenger\Message\DraftArticleMessage: ai_pipeline
App\Infrastructure\Messenger\Message\EbayTextMessage: ai_pipeline
App\Infrastructure\Messenger\Message\PxeInventoryMessage: ai_pipeline
App\Infrastructure\Messenger\Message\PublishToChannelMessage: channel_sync
App\Infrastructure\Messenger\Message\UpdateStockOnChannelsMessage: channel_sync
App\Infrastructure\Messenger\Message\DeactivateListingMessage: channel_sync
App\Infrastructure\Messenger\Message\OrderReceivedMessage: orders
App\Infrastructure\Messenger\Message\TrackingPushMessage: channel_sync

View file

@ -0,0 +1,43 @@
<?php
declare(strict_types=1);
namespace App\Application\Channel;
use App\Domain\Article\Article;
use App\Domain\Order\Order;
interface ChannelAdapterInterface
{
/**
* Creates a listing on the platform for the given article.
* Returns the platform-specific listing ID.
*
* @throws \RuntimeException if listing creation fails
*/
public function publishListing(Article $article): string;
/**
* Updates stock/quantity for an existing listing.
*
* @throws \RuntimeException if update fails
*/
public function updateStock(Article $article, int $stock): void;
/**
* Deactivates/ends a listing. Must be idempotent (safe to call if already inactive).
*
* @throws \RuntimeException if deactivation fails
*/
public function deactivateListing(Article $article): void;
/**
* Marks an order as shipped with tracking information.
*
* @throws \RuntimeException if push fails
*/
public function pushTracking(Order $order): void;
/** Returns the platform type this adapter handles (e.g. 'ebay'). */
public function getType(): string;
}

View file

@ -0,0 +1,36 @@
<?php
declare(strict_types=1);
namespace App\Application\Channel;
final class ChannelAdapterRegistry
{
/** @var array<string, ChannelAdapterInterface> */
private array $adapters = [];
/** @param iterable<ChannelAdapterInterface> $adapters */
public function __construct(iterable $adapters)
{
foreach ($adapters as $adapter) {
$this->adapters[$adapter->getType()] = $adapter;
}
}
public function get(string $type): ChannelAdapterInterface
{
return $this->adapters[$type]
?? throw new \InvalidArgumentException("No channel adapter registered for type: {$type}");
}
public function has(string $type): bool
{
return isset($this->adapters[$type]);
}
/** @return list<string> */
public function getTypes(): array
{
return array_keys($this->adapters);
}
}

View file

@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\AI\Agent;
use App\Domain\Article\Article;
use App\Infrastructure\AI\OllamaClientInterface;
final class EbayTextAgent
{
public function __construct(
private readonly OllamaClientInterface $ollama,
private readonly string $model,
) {
}
/** @return array{title: string, description: string} */
public function generate(Article $article): array
{
$attributes = [];
foreach ($article->getAttributeValues() as $value) {
$attributes[] = $value->getAttributeDefinition()->getName().': '.$value->getValue();
}
$attributeText = implode("\n", $attributes);
$typeName = $article->getArticleType()->getName();
$condition = $article->getCondition()->value;
$conditionNotes = $article->getConditionNotes() ?? '';
$titlePrompt = <<<PROMPT
Create a concise eBay listing title (max 80 characters) for this {$typeName}.
Use the most important specifications. Include condition if not "new".
Condition: {$condition}
Specs:
{$attributeText}
Return ONLY the title text, no quotes, no explanation.
PROMPT;
$descriptionPrompt = <<<PROMPT
Create a professional eBay listing description in German for this {$typeName}.
Include all specifications in a clear, structured format.
Mention the condition: {$condition}.
{$conditionNotes}
Specs:
{$attributeText}
Use HTML formatting (ul, li, strong tags). Max 2000 characters.
PROMPT;
$title = trim($this->ollama->generate($this->model, $titlePrompt));
$description = trim($this->ollama->generate($this->model, $descriptionPrompt));
if (mb_strlen($title) > 80) {
$title = mb_substr($title, 0, 77).'...';
}
return ['title' => $title, 'description' => $description];
}
}

View file

@ -0,0 +1,96 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\AI\Agent;
use App\Domain\Article\ArticleType;
use App\Infrastructure\AI\OllamaClientInterface;
final class JsonCodingAgent
{
public function __construct(
private readonly OllamaClientInterface $ollama,
private readonly string $model,
) {
}
/**
* @param list<string> $missingFields
*
* @return array<string, string>
*/
public function encode(ArticleType $articleType, string $specsText, array $missingFields = []): array
{
$schema = $this->buildSchema($articleType);
$missingHint = [] !== $missingFields
? "\nIMPORTANT: The following fields were missing in the previous attempt. Make sure they are included: ".implode(', ', $missingFields)."\n"
: '';
$prompt = <<<PROMPT
Convert the following hardware specifications to a JSON object.
The JSON must use these exact keys (UUIDs) and follow the indicated value formats:
{$schema}
{$missingHint}
Specifications text:
{$specsText}
Return ONLY valid JSON. No explanation. No markdown. No extra text.
JSON:
PROMPT;
$response = $this->ollama->generate($this->model, $prompt);
return $this->parseJson($response);
}
private function buildSchema(ArticleType $articleType): string
{
$lines = [];
foreach ($articleType->getAttributeDefinitions() as $def) {
$hint = match ($def->getType()->value) {
'int' => 'integer number',
'float' => 'decimal number',
'bool' => 'true or false',
'select' => 'one of: '.implode(', ', $def->getOptions() ?? []),
'multi_select' => 'comma-separated list of: '.implode(', ', $def->getOptions() ?? []),
default => 'string'.(null !== $def->getUnit() ? " in {$def->getUnit()}" : ''),
};
$lines[] = "\"{$def->getId()->toRfc4122()}\": \"{$def->getName()}\" ({$hint})";
}
return implode("\n", $lines);
}
/** @return array<string, string> */
private function parseJson(string $response): array
{
$cleaned = preg_replace('/^```(?:json)?\s*/m', '', $response) ?? $response;
$cleaned = preg_replace('/^```\s*$/m', '', $cleaned) ?? $cleaned;
$cleaned = trim($cleaned);
$start = strpos($cleaned, '{');
$end = strrpos($cleaned, '}');
if (false === $start || false === $end) {
return [];
}
$json = substr($cleaned, $start, $end - $start + 1);
try {
/** @var array<string, mixed> $decoded */
$decoded = json_decode($json, true, 512, \JSON_THROW_ON_ERROR);
$result = [];
foreach ($decoded as $k => $v) {
$result[$k] = \is_scalar($v) ? (string) $v : '';
}
return $result;
} catch (\JsonException) {
return [];
}
}
}

View file

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\AI\Agent;
use App\Infrastructure\AI\OllamaClientInterface;
final class OllamaVisionAgent
{
public function __construct(
private readonly OllamaClientInterface $ollama,
private readonly string $model,
) {
}
/** @return array{model: string, serial: string} */
public function analyze(string $imagePath): array
{
$prompt = <<<'PROMPT'
Look at this nameplate/label photo of IT hardware.
Extract ONLY the model name/designation and serial number that are visible on the label.
Do not guess or add information not on the label.
Respond in exactly this format (use empty string if not visible):
MODEL: <model name>
SERIAL: <serial number>
PROMPT;
$response = $this->ollama->generateWithImage($this->model, $prompt, $imagePath);
return [
'model' => $this->extractField($response, 'MODEL'),
'serial' => $this->extractField($response, 'SERIAL'),
];
}
private function extractField(string $response, string $field): string
{
if (preg_match('/^'.$field.':\s*(.+)$/m', $response, $matches)) {
return trim($matches[1]);
}
return '';
}
}

View file

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\AI\Agent;
use App\Infrastructure\AI\OllamaClientInterface;
use App\Infrastructure\Search\WebSearchInterface;
final class SpecsResearchAgent
{
public function __construct(
private readonly WebSearchInterface $webSearch,
private readonly OllamaClientInterface $ollama,
private readonly string $model,
) {
}
public function research(string $modelName): string
{
$query = "{$modelName} technical specifications full specs";
$searchText = $this->webSearch->search($query);
if ('' === $searchText) {
$searchText = $this->webSearch->search("{$modelName} specs datasheet");
}
if ('' === $searchText) {
throw new \RuntimeException("No web search results found for model: {$modelName}");
}
$prompt = <<<PROMPT
Based on the following search results about "{$modelName}", extract and list all technical specifications.
Include: processor, RAM, storage, display, GPU, battery, ports, weight, dimensions, and any other specs found.
Be complete and accurate. Use the search results as your source, not general knowledge.
Search results:
{$searchText}
List all specifications:
PROMPT;
return $this->ollama->generate($this->model, $prompt);
}
}

View file

@ -0,0 +1,53 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\AI;
use Symfony\Contracts\HttpClient\HttpClientInterface;
final class OllamaClient implements OllamaClientInterface
{
public function __construct(
private readonly HttpClientInterface $httpClient,
private readonly string $ollamaBaseUrl,
) {
}
public function generate(string $model, string $prompt): string
{
$response = $this->httpClient->request('POST', $this->ollamaBaseUrl.'/api/generate', [
'json' => [
'model' => $model,
'prompt' => $prompt,
'stream' => false,
],
'timeout' => 120,
]);
/** @var array{response: string} $data */
$data = $response->toArray();
return $data['response'];
}
public function generateWithImage(string $model, string $prompt, string $imagePath): string
{
$imageData = base64_encode((string) file_get_contents($imagePath));
$response = $this->httpClient->request('POST', $this->ollamaBaseUrl.'/api/generate', [
'json' => [
'model' => $model,
'prompt' => $prompt,
'images' => [$imageData],
'stream' => false,
],
'timeout' => 180,
]);
/** @var array{response: string} $data */
$data = $response->toArray();
return $data['response'];
}
}

View file

@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\AI;
interface OllamaClientInterface
{
public function generate(string $model, string $prompt): string;
public function generateWithImage(string $model, string $prompt, string $imagePath): string;
}

View file

@ -0,0 +1,60 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Application\Channel\ChannelAdapterRegistry;
use App\Domain\Article\ArticleStatus;
use App\Domain\Article\Repository\ArticleRepositoryInterface;
use App\Infrastructure\Messenger\Message\DeactivateListingMessage;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class DeactivateListingHandler
{
private const ALERT_THRESHOLD = 5;
public function __construct(
private readonly ArticleRepositoryInterface $articleRepository,
private readonly ChannelAdapterRegistry $adapterRegistry,
private readonly LoggerInterface $logger,
) {
}
public function __invoke(DeactivateListingMessage $message): void
{
$article = $this->articleRepository->findById(Uuid::fromString($message->articleId));
if (null === $article) {
return;
}
if (!$this->adapterRegistry->has($message->platformType)) {
return;
}
$adapter = $this->adapterRegistry->get($message->platformType);
try {
$adapter->deactivateListing($article);
if (0 === $article->getStock() && ArticleStatus::Listed === $article->getStatus()) {
$article->transitionTo(ArticleStatus::Sold);
$this->articleRepository->save($article);
}
} catch (\RuntimeException $e) {
if ($message->attemptNumber >= self::ALERT_THRESHOLD) {
$this->logger->critical('CRITICAL: Failed to deactivate listing after {attempts} attempts. Risk of oversell!', [
'attempts' => $message->attemptNumber,
'articleId' => $message->articleId,
'platformType' => $message->platformType,
'error' => $e->getMessage(),
]);
}
throw $e;
}
}
}

View file

@ -0,0 +1,59 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Application\Article\ArticleService;
use App\Domain\Article\ArticleCondition;
use App\Domain\Pipeline\Repository\AIPipelineJobRepositoryInterface;
use App\Infrastructure\Messenger\Message\DraftArticleMessage;
use App\Infrastructure\Messenger\Message\EbayTextMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class DraftArticleHandler
{
public function __construct(
private readonly ArticleService $articleService,
private readonly AIPipelineJobRepositoryInterface $jobRepository,
private readonly MessageBusInterface $bus,
) {
}
public function __invoke(DraftArticleMessage $message): void
{
$job = $this->jobRepository->findById(Uuid::fromString($message->jobId));
if (null === $job) {
return;
}
$condition = ArticleCondition::tryFrom($message->condition) ?? ArticleCondition::Good;
$article = $this->articleService->create(
articleTypeId: Uuid::fromString($message->articleTypeId),
condition: $condition,
stock: 1,
inventoryNumber: $message->inventoryNumber,
);
if (null !== $message->serialNumber) {
$article->setSerialNumber($message->serialNumber);
}
if ([] !== $message->attributes) {
$this->articleService->updateAttributes($article->getId(), $message->attributes);
}
$job->setArticleId($article->getId());
$job->markCompleted(['articleId' => $article->getId()->toRfc4122()]);
$this->jobRepository->save($job);
$this->bus->dispatch(new EbayTextMessage(
jobId: $message->jobId,
articleId: $article->getId()->toRfc4122(),
));
}
}

View file

@ -0,0 +1,39 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Application\Article\ArticleService;
use App\Domain\Article\Repository\ArticleRepositoryInterface;
use App\Infrastructure\AI\Agent\EbayTextAgent;
use App\Infrastructure\Messenger\Message\EbayTextMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class EbayTextHandler
{
public function __construct(
private readonly EbayTextAgent $ebayTextAgent,
private readonly ArticleRepositoryInterface $articleRepository,
private readonly ArticleService $articleService,
) {
}
public function __invoke(EbayTextMessage $message): void
{
$article = $this->articleRepository->findById(Uuid::fromString($message->articleId));
if (null === $article) {
return;
}
$texts = $this->ebayTextAgent->generate($article);
$this->articleService->setEbayTexts(
articleId: $article->getId(),
title: $texts['title'],
description: $texts['description'],
);
}
}

View file

@ -0,0 +1,51 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Domain\Article\Repository\ArticleTypeRepositoryInterface;
use App\Domain\Pipeline\Repository\AIPipelineJobRepositoryInterface;
use App\Infrastructure\AI\Agent\JsonCodingAgent;
use App\Infrastructure\Messenger\Message\JsonCodingMessage;
use App\Infrastructure\Messenger\Message\ValidationMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class JsonCodingHandler
{
public function __construct(
private readonly JsonCodingAgent $jsonAgent,
private readonly ArticleTypeRepositoryInterface $articleTypeRepository,
private readonly AIPipelineJobRepositoryInterface $jobRepository,
private readonly MessageBusInterface $bus,
) {
}
public function __invoke(JsonCodingMessage $message): void
{
$job = $this->jobRepository->findById(Uuid::fromString($message->jobId));
if (null === $job) {
return;
}
$articleType = $this->articleTypeRepository->findById(Uuid::fromString($message->articleTypeId));
if (null === $articleType) {
$job->markFailed("ArticleType {$message->articleTypeId} not found");
$this->jobRepository->save($job);
return;
}
$attributes = $this->jsonAgent->encode($articleType, $message->specsText, $message->missingFields);
$this->bus->dispatch(new ValidationMessage(
jobId: $message->jobId,
articleTypeId: $message->articleTypeId,
specsText: $message->specsText,
attributes: $attributes,
));
}
}

View file

@ -0,0 +1,130 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Application\Order\CustomerResolverInterface;
use App\Application\Order\ErpAdapterInterface;
use App\Application\Order\InvoiceMailerInterface;
use App\Application\Storage\StorageManagerInterface;
use App\Domain\Article\Repository\ArticleRepositoryInterface;
use App\Domain\Channel\Repository\PlatformRepositoryInterface;
use App\Domain\Order\Invoice;
use App\Domain\Order\Order;
use App\Domain\Order\OrderStatus;
use App\Domain\Order\Repository\InvoiceRepositoryInterface;
use App\Domain\Order\Repository\OrderRepositoryInterface;
use App\Infrastructure\Channel\Ebay\EbayFulfillmentApiClient;
use App\Infrastructure\Messenger\Message\OrderReceivedMessage;
use App\Infrastructure\Messenger\Message\UpdateStockOnChannelsMessage;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
final class OrderReceivedHandler
{
public function __construct(
private readonly EbayFulfillmentApiClient $fulfillmentClient,
private readonly ArticleRepositoryInterface $articles,
private readonly PlatformRepositoryInterface $platforms,
private readonly OrderRepositoryInterface $orders,
private readonly InvoiceRepositoryInterface $invoices,
private readonly CustomerResolverInterface $customerResolver,
private readonly ErpAdapterInterface $erp,
private readonly StorageManagerInterface $storage,
private readonly InvoiceMailerInterface $mailer,
private readonly MessageBusInterface $bus,
private readonly LoggerInterface $logger,
) {
}
public function __invoke(OrderReceivedMessage $message): void
{
if (null !== $this->orders->findByPlatformOrderId($message->platformOrderId)) {
$this->logger->info('OrderReceivedHandler: duplicate message, skipping', [
'platformOrderId' => $message->platformOrderId,
]);
return;
}
$ebayOrder = $this->fulfillmentClient->getOrder($message->platformOrderId);
$article = $this->articles->findByEbayListingId($ebayOrder['ebayListingId']);
if (null === $article) {
throw new UnrecoverableMessageHandlingException("Article not found for eBay listing ID: {$ebayOrder['ebayListingId']}");
}
$platform = $this->platforms->findByType($message->platformType);
if (null === $platform) {
throw new UnrecoverableMessageHandlingException("Platform '{$message->platformType}' not configured in database");
}
$locked = $this->articles->decrementStockAtomic($article->getId());
if (!$locked) {
$this->logger->critical('OVERSTOCK: stock was already 0, sale cannot be fulfilled', [
'articleId' => $article->getId()->toRfc4122(),
'platformOrderId' => $message->platformOrderId,
]);
throw new UnrecoverableMessageHandlingException("Overstock for article {$article->getId()->toRfc4122()} — manual intervention required");
}
$customer = $this->customerResolver->resolve(
platform: $message->platformType,
platformUserId: $ebayOrder['buyerUsername'],
name: $ebayOrder['buyerName'],
email: $ebayOrder['buyerEmail'],
address: [
'street' => $ebayOrder['shippingStreet'],
'city' => $ebayOrder['shippingCity'],
'zip' => $ebayOrder['shippingZip'],
],
);
$order = new Order(
$article,
$customer,
$platform,
$message->platformOrderId,
$ebayOrder['salePrice'],
new \DateTimeImmutable($ebayOrder['saleDate']),
);
$order->setStatus(OrderStatus::Processing);
$this->orders->save($order);
$frappeInvoiceId = $this->erp->createSalesInvoice($order);
$pdfContent = $this->erp->fetchInvoicePdf($frappeInvoiceId);
$tmpFile = tempnam(sys_get_temp_dir(), 'invoice_');
\assert(false !== $tmpFile);
file_put_contents($tmpFile, $pdfContent);
$stored = $this->storage->store($tmpFile, 'invoice-'.$frappeInvoiceId.'.pdf');
unlink($tmpFile);
$invoice = new Invoice($order, $frappeInvoiceId, $stored->storagePath, $stored->filename);
$order->setInvoice($invoice);
$this->invoices->save($invoice);
$this->mailer->sendInvoice($invoice);
$invoice->markAsEmailed();
$this->invoices->save($invoice);
$this->bus->dispatch(new UpdateStockOnChannelsMessage(
articleId: $article->getId()->toRfc4122(),
newStock: $article->getStock(),
));
$order->setStatus(OrderStatus::Completed);
$this->orders->save($order);
$this->logger->info('Order processed successfully', [
'orderId' => $order->getId()->toRfc4122(),
'platformOrderId' => $message->platformOrderId,
'frappeInvoiceId' => $frappeInvoiceId,
]);
}
}

View file

@ -0,0 +1,51 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Domain\Pipeline\Repository\AIPipelineJobRepositoryInterface;
use App\Infrastructure\AI\Agent\OllamaVisionAgent;
use App\Infrastructure\Messenger\Message\PhotoUploadMessage;
use App\Infrastructure\Messenger\Message\SpecsResearchMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class PhotoUploadHandler
{
public function __construct(
private readonly OllamaVisionAgent $visionAgent,
private readonly AIPipelineJobRepositoryInterface $jobRepository,
private readonly MessageBusInterface $bus,
) {
}
public function __invoke(PhotoUploadMessage $message): void
{
$job = $this->jobRepository->findById(Uuid::fromString($message->jobId));
if (null === $job) {
return;
}
$job->markProcessing();
$this->jobRepository->save($job);
$result = $this->visionAgent->analyze($message->storedPhotoPath);
if ('' === $result['model']) {
$job->markNeedsReview('OllamaVisionAgent: no model name detected on nameplate');
$this->jobRepository->save($job);
return;
}
$this->bus->dispatch(new SpecsResearchMessage(
jobId: $message->jobId,
articleTypeId: $message->articleTypeId,
modelName: $result['model'],
serialNumber: $result['serial'],
));
}
}

View file

@ -0,0 +1,50 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Application\Channel\ChannelAdapterRegistry;
use App\Domain\Article\ArticleStatus;
use App\Domain\Article\Repository\ArticleRepositoryInterface;
use App\Domain\Channel\Repository\PlatformRepositoryInterface;
use App\Infrastructure\Messenger\Message\PublishToChannelMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class PublishToChannelHandler
{
public function __construct(
private readonly ArticleRepositoryInterface $articleRepository,
private readonly PlatformRepositoryInterface $platformRepository,
private readonly ChannelAdapterRegistry $adapterRegistry,
) {
}
public function __invoke(PublishToChannelMessage $message): void
{
$article = $this->articleRepository->findById(Uuid::fromString($message->articleId));
if (null === $article || ArticleStatus::Active !== $article->getStatus()) {
return;
}
$platforms = $this->platformRepository->findAll();
foreach ($platforms as $platform) {
if (!$this->adapterRegistry->has($platform->getType())) {
continue;
}
$adapter = $this->adapterRegistry->get($platform->getType());
$listingId = $adapter->publishListing($article);
if ('ebay' === $platform->getType()) {
$article->setEbayListingId($listingId);
}
$article->transitionTo(ArticleStatus::Listed);
$this->articleRepository->save($article);
}
}
}

View file

@ -0,0 +1,39 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Domain\Pipeline\Repository\AIPipelineJobRepositoryInterface;
use App\Infrastructure\Messenger\Message\JsonCodingMessage;
use App\Infrastructure\Messenger\Message\PxeInventoryMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class PxeInventoryHandler
{
public function __construct(
private readonly AIPipelineJobRepositoryInterface $jobRepository,
private readonly MessageBusInterface $bus,
) {
}
public function __invoke(PxeInventoryMessage $message): void
{
$job = $this->jobRepository->findById(Uuid::fromString($message->jobId));
if (null === $job) {
return;
}
$job->markProcessing();
$this->jobRepository->save($job);
$this->bus->dispatch(new JsonCodingMessage(
jobId: $message->jobId,
articleTypeId: $message->articleTypeId,
specsText: $message->pxeDump,
));
}
}

View file

@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Domain\Pipeline\Repository\AIPipelineJobRepositoryInterface;
use App\Infrastructure\AI\Agent\SpecsResearchAgent;
use App\Infrastructure\Messenger\Message\JsonCodingMessage;
use App\Infrastructure\Messenger\Message\SpecsResearchMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class SpecsResearchHandler
{
public function __construct(
private readonly SpecsResearchAgent $specsAgent,
private readonly AIPipelineJobRepositoryInterface $jobRepository,
private readonly MessageBusInterface $bus,
) {
}
public function __invoke(SpecsResearchMessage $message): void
{
$job = $this->jobRepository->findById(Uuid::fromString($message->jobId));
if (null === $job) {
return;
}
try {
$specsText = $this->specsAgent->research($message->modelName);
} catch (\RuntimeException $e) {
$job->markNeedsReview('SpecsResearchAgent: '.$e->getMessage());
$this->jobRepository->save($job);
return;
}
$this->bus->dispatch(new JsonCodingMessage(
jobId: $message->jobId,
articleTypeId: $message->articleTypeId,
specsText: $specsText,
));
}
}

View file

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Application\Channel\ChannelAdapterRegistry;
use App\Domain\Order\Repository\OrderRepositoryInterface;
use App\Infrastructure\Messenger\Message\TrackingPushMessage;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class TrackingPushHandler
{
public function __construct(
private readonly OrderRepositoryInterface $orders,
private readonly ChannelAdapterRegistry $channelAdapters,
private readonly LoggerInterface $logger,
) {
}
public function __invoke(TrackingPushMessage $message): void
{
$order = $this->orders->findById(Uuid::fromString($message->orderId));
if (null === $order) {
throw new UnrecoverableMessageHandlingException("Order {$message->orderId} not found");
}
$platformType = $order->getPlatform()->getType();
$adapter = $this->channelAdapters->get($platformType);
$adapter->pushTracking($order);
$order->markTrackingPushedToEbay();
$this->orders->save($order);
$this->logger->info('Tracking pushed to channel', [
'orderId' => $message->orderId,
'platform' => $platformType,
'trackingNumber' => $message->trackingNumber,
]);
}
}

View file

@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Application\Channel\ChannelAdapterRegistry;
use App\Domain\Article\Repository\ArticleRepositoryInterface;
use App\Domain\Channel\Repository\PlatformRepositoryInterface;
use App\Infrastructure\Messenger\Message\DeactivateListingMessage;
use App\Infrastructure\Messenger\Message\UpdateStockOnChannelsMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class UpdateStockOnChannelsHandler
{
public function __construct(
private readonly ArticleRepositoryInterface $articleRepository,
private readonly PlatformRepositoryInterface $platformRepository,
private readonly ChannelAdapterRegistry $adapterRegistry,
private readonly MessageBusInterface $bus,
) {
}
public function __invoke(UpdateStockOnChannelsMessage $message): void
{
$article = $this->articleRepository->findById(Uuid::fromString($message->articleId));
if (null === $article) {
return;
}
if (0 === $message->newStock) {
$platforms = $this->platformRepository->findAll();
foreach ($platforms as $platform) {
if ($this->adapterRegistry->has($platform->getType())) {
$this->bus->dispatch(new DeactivateListingMessage(
articleId: $message->articleId,
platformType: $platform->getType(),
));
}
}
return;
}
$platforms = $this->platformRepository->findAll();
foreach ($platforms as $platform) {
if (!$this->adapterRegistry->has($platform->getType())) {
continue;
}
$adapter = $this->adapterRegistry->get($platform->getType());
$adapter->updateStock($article, $message->newStock);
}
}
}

View file

@ -0,0 +1,94 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Handler;
use App\Domain\Article\Repository\ArticleTypeRepositoryInterface;
use App\Domain\Pipeline\Repository\AIPipelineJobRepositoryInterface;
use App\Infrastructure\Messenger\Message\DraftArticleMessage;
use App\Infrastructure\Messenger\Message\JsonCodingMessage;
use App\Infrastructure\Messenger\Message\ValidationMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Uid\Uuid;
#[AsMessageHandler]
final class ValidationHandler
{
private const MAX_ATTEMPTS = 3;
public function __construct(
private readonly AIPipelineJobRepositoryInterface $jobRepository,
private readonly MessageBusInterface $bus,
private readonly ?ArticleTypeRepositoryInterface $articleTypeRepository = null,
) {
}
public function __invoke(ValidationMessage $message): void
{
$job = $this->jobRepository->findById(Uuid::fromString($message->jobId));
if (null === $job) {
return;
}
$missing = $this->findMissingFields($message);
if ([] === $missing) {
$this->bus->dispatch(new DraftArticleMessage(
jobId: $message->jobId,
articleTypeId: $message->articleTypeId,
attributes: $message->attributes,
condition: 'good',
inventoryNumber: null,
serialNumber: null,
));
return;
}
if ($job->getAttemptCount() >= self::MAX_ATTEMPTS) {
$job->markNeedsReview('Validation failed after '.self::MAX_ATTEMPTS.' attempts. Missing: '.implode(', ', $missing));
$this->jobRepository->save($job);
return;
}
$job->incrementAttempt($missing);
$this->jobRepository->save($job);
$this->bus->dispatch(new JsonCodingMessage(
jobId: $message->jobId,
articleTypeId: $message->articleTypeId,
specsText: $message->specsText,
missingFields: $missing,
));
}
/** @return list<string> attribute names that are required but not present */
private function findMissingFields(ValidationMessage $message): array
{
// Empty attributes always means retry
if ([] === $message->attributes) {
return ['(no attributes returned by LLM)'];
}
if (null === $this->articleTypeRepository) {
return [];
}
$articleType = $this->articleTypeRepository->findById(Uuid::fromString($message->articleTypeId));
if (null === $articleType) {
return [];
}
$missing = [];
foreach ($articleType->getRequiredAttributeDefinitions() as $def) {
if (!\array_key_exists($def->getId()->toRfc4122(), $message->attributes)) {
$missing[] = $def->getName();
}
}
return $missing;
}
}

View file

@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class DeactivateListingMessage
{
public function __construct(
public string $articleId,
public string $platformType,
public int $attemptNumber = 1,
) {
}
}

View file

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class DraftArticleMessage
{
/** @param array<string, string> $attributes */
public function __construct(
public string $jobId,
public string $articleTypeId,
public array $attributes,
public string $condition,
public ?string $inventoryNumber,
public ?string $serialNumber,
) {
}
}

View file

@ -0,0 +1,14 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class EbayTextMessage
{
public function __construct(
public string $jobId,
public string $articleId,
) {
}
}

View file

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class JsonCodingMessage
{
/** @param list<string> $missingFields */
public function __construct(
public string $jobId,
public string $articleTypeId,
public string $specsText,
public array $missingFields = [],
) {
}
}

View file

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class OrderReceivedMessage
{
/**
* @param array<string, mixed> $rawPayload
*/
public function __construct(
public string $platformOrderId,
public string $platformType,
public array $rawPayload,
) {
}
}

View file

@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class PhotoUploadMessage
{
public function __construct(
public string $jobId,
public string $articleTypeId,
public string $storedPhotoPath,
public string $originalFilename,
) {
}
}

View file

@ -0,0 +1,13 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class PublishToChannelMessage
{
public function __construct(
public string $articleId,
) {
}
}

View file

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class PxeInventoryMessage
{
public function __construct(
public string $jobId,
public string $articleTypeId,
public string $pxeDump,
public string $inventoryNumber,
public string $condition,
) {
}
}

View file

@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class SpecsResearchMessage
{
public function __construct(
public string $jobId,
public string $articleTypeId,
public string $modelName,
public string $serialNumber,
) {
}
}

View file

@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class TrackingPushMessage
{
public function __construct(
public string $orderId,
public string $trackingNumber,
public string $carrier,
) {
}
}

View file

@ -0,0 +1,14 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class UpdateStockOnChannelsMessage
{
public function __construct(
public string $articleId,
public int $newStock,
) {
}
}

View file

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Messenger\Message;
final readonly class ValidationMessage
{
/** @param array<string, string> $attributes */
public function __construct(
public string $jobId,
public string $articleTypeId,
public string $specsText,
public array $attributes,
) {
}
}

View file

@ -0,0 +1,56 @@
<?php
declare(strict_types=1);
namespace App\Tests\Unit\Infrastructure\AI\Agent;
use App\Domain\Article\ArticleType;
use App\Domain\Article\AttributeDefinition;
use App\Domain\Article\AttributeType;
use App\Infrastructure\AI\Agent\JsonCodingAgent;
use App\Infrastructure\AI\OllamaClientInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
final class JsonCodingAgentTest extends TestCase
{
private OllamaClientInterface&MockObject $ollama;
private JsonCodingAgent $agent;
private ArticleType $type;
protected function setUp(): void
{
$this->ollama = $this->createMock(OllamaClientInterface::class);
$this->agent = new JsonCodingAgent($this->ollama, 'llama3.2');
$this->type = new ArticleType('Notebook');
$ramDef = new AttributeDefinition('RAM', AttributeType::String);
$this->type->addAttributeDefinition($ramDef);
}
public function testReturnsParsedAttributes(): void
{
$first = $this->type->getAttributeDefinitions()->first();
\assert($first instanceof AttributeDefinition);
$defId = $first->getId()->toRfc4122();
$this->ollama->method('generate')
->willReturn('```json'."\n".'{"'.$defId.'": "16 GB"}'."\n".'```');
$result = $this->agent->encode($this->type, 'Dell Latitude 5520: 16 GB RAM, Intel i7');
self::assertCount(1, $result);
self::assertSame('16 GB', array_values($result)[0]);
}
public function testExtractsJsonFromMarkdownFences(): void
{
$first = $this->type->getAttributeDefinitions()->first();
\assert($first instanceof AttributeDefinition);
$defId = $first->getId()->toRfc4122();
$this->ollama->method('generate')
->willReturn("Here is the JSON:\n```json\n{\"{$defId}\": \"16 GB\"}\n```\nDone.");
$result = $this->agent->encode($this->type, 'Specs text');
self::assertArrayHasKey($defId, $result);
}
}

View file

@ -0,0 +1,44 @@
<?php
declare(strict_types=1);
namespace App\Tests\Unit\Infrastructure\AI\Agent;
use App\Infrastructure\AI\Agent\OllamaVisionAgent;
use App\Infrastructure\AI\OllamaClientInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
final class OllamaVisionAgentTest extends TestCase
{
private OllamaClientInterface&MockObject $ollama;
private OllamaVisionAgent $agent;
protected function setUp(): void
{
$this->ollama = $this->createMock(OllamaClientInterface::class);
$this->agent = new OllamaVisionAgent($this->ollama, 'llava');
}
public function testParsesModelAndSerialFromResponse(): void
{
$this->ollama->method('generateWithImage')
->willReturn("MODEL: Dell Latitude 5520\nSERIAL: ABC12345");
$result = $this->agent->analyze('/tmp/photo.jpg');
self::assertSame('Dell Latitude 5520', $result['model']);
self::assertSame('ABC12345', $result['serial']);
}
public function testReturnsEmptyStringsWhenNotFound(): void
{
$this->ollama->method('generateWithImage')
->willReturn('I cannot read the nameplate clearly.');
$result = $this->agent->analyze('/tmp/photo.jpg');
self::assertSame('', $result['model']);
self::assertSame('', $result['serial']);
}
}

View file

@ -0,0 +1,85 @@
<?php
declare(strict_types=1);
namespace App\Tests\Unit\Infrastructure\Messenger\Handler;
use App\Domain\Article\ArticleType;
use App\Domain\Article\AttributeDefinition;
use App\Domain\Article\AttributeType;
use App\Domain\Pipeline\AIPipelineJob;
use App\Domain\Pipeline\AIPipelineJobType;
use App\Domain\Pipeline\Repository\AIPipelineJobRepositoryInterface;
use App\Infrastructure\Messenger\Handler\ValidationHandler;
use App\Infrastructure\Messenger\Message\DraftArticleMessage;
use App\Infrastructure\Messenger\Message\JsonCodingMessage;
use App\Infrastructure\Messenger\Message\ValidationMessage;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
final class ValidationHandlerTest extends TestCase
{
private AIPipelineJobRepositoryInterface&MockObject $jobRepo;
private MessageBusInterface&MockObject $bus;
private AIPipelineJob $job;
private ValidationHandler $handler;
protected function setUp(): void
{
$this->jobRepo = $this->createMock(AIPipelineJobRepositoryInterface::class);
$this->bus = $this->createMock(MessageBusInterface::class);
$this->handler = new ValidationHandler($this->jobRepo, $this->bus);
$this->job = new AIPipelineJob(AIPipelineJobType::Photo, ['test' => true]);
}
public function testDispatchesDraftMessageWhenAllAttributesPresent(): void
{
$this->jobRepo->method('findById')->willReturn($this->job);
$type = new ArticleType('Notebook');
$ramDef = new AttributeDefinition('RAM', AttributeType::String);
$cpuDef = new AttributeDefinition('CPU', AttributeType::String);
$type->addAttributeDefinition($ramDef);
$type->addAttributeDefinition($cpuDef);
$attributes = [
$ramDef->getId()->toRfc4122() => '16 GB',
$cpuDef->getId()->toRfc4122() => 'Intel i7',
];
$this->bus->expects(self::once())
->method('dispatch')
->with(self::isInstanceOf(DraftArticleMessage::class))
->willReturn(new Envelope(new \stdClass()));
($this->handler)(new ValidationMessage(
jobId: $this->job->getId()->toRfc4122(),
articleTypeId: $type->getId()->toRfc4122(),
specsText: 'some specs',
attributes: $attributes,
));
}
public function testRetriesJsonCodingWhenFieldsMissingAndUnderLimit(): void
{
$this->jobRepo->method('findById')->willReturn($this->job);
$type = new ArticleType('Notebook');
$type->addAttributeDefinition(new AttributeDefinition('RAM', AttributeType::String));
$this->bus->expects(self::once())
->method('dispatch')
->with(self::isInstanceOf(JsonCodingMessage::class))
->willReturn(new Envelope(new \stdClass()));
($this->handler)(new ValidationMessage(
jobId: $this->job->getId()->toRfc4122(),
articleTypeId: $type->getId()->toRfc4122(),
specsText: 'some specs',
attributes: [],
));
}
}