feat: live pipeline status notifications via SSE

Adds real-time toast notifications to all admin pages for AI pipeline
job progress. The browser subscribes to an SSE endpoint
(GET /admin/pipeline/events) which polls the DB every 2 seconds and
emits events whenever a job's step or status changes.

- AIPipelineJob gains updatedAt (migration 20260519020000), bumped on
  every state-change method, with an index for efficient polling
- AIPipelineJobRepositoryInterface/Doctrine get findUpdatedSince()
- PipelineStreamController streams SSE with per-connection dedup and
  auto-reconnect (retry: 3000); streams for 90 s then signals reconnect
- pipeline-notifications.js handles EventSource, shows colour-coded
  toasts (queued/processing/completed/failed/needs_review) and is loaded
  globally via DashboardController::configureAssets()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Simon Kuehn 2026-05-18 07:36:03 +00:00
parent a38fe7e72d
commit 4c16f8cd68
7 changed files with 350 additions and 0 deletions

View file

@ -0,0 +1,29 @@
<?php
declare(strict_types=1);
namespace DoctrineMigrations;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\Migrations\AbstractMigration;
final class Version20260519020000 extends AbstractMigration
{
public function getDescription(): string
{
return 'Add updated_at column to ai_pipeline_jobs for SSE change detection';
}
public function up(Schema $schema): void
{
$this->addSql("ALTER TABLE app.ai_pipeline_jobs ADD COLUMN updated_at TIMESTAMP(0) WITHOUT TIME ZONE NOT NULL DEFAULT NOW()");
$this->addSql("COMMENT ON COLUMN app.ai_pipeline_jobs.updated_at IS '(DC2Type:datetime_immutable)'");
$this->addSql('CREATE INDEX idx_pipeline_jobs_updated_at ON app.ai_pipeline_jobs (updated_at)');
}
public function down(Schema $schema): void
{
$this->addSql('DROP INDEX app.idx_pipeline_jobs_updated_at');
$this->addSql('ALTER TABLE app.ai_pipeline_jobs DROP COLUMN updated_at');
}
}

View file

@ -0,0 +1,158 @@
(function () {
'use strict';
// ── Styles ──────────────────────────────────────────────────────────────
const css = `
#pipeline-toasts {
position: fixed;
bottom: 1.5rem;
right: 1.5rem;
z-index: 9999;
display: flex;
flex-direction: column-reverse;
gap: .5rem;
max-width: 22rem;
pointer-events: none;
}
.pipeline-toast {
display: flex;
align-items: flex-start;
gap: .6rem;
padding: .65rem .9rem;
border-radius: .45rem;
background: #1e293b;
color: #f1f5f9;
font-size: .82rem;
line-height: 1.35;
box-shadow: 0 4px 16px rgba(0,0,0,.35);
pointer-events: all;
opacity: 0;
transform: translateX(110%);
transition: opacity .25s ease, transform .25s ease;
}
.pipeline-toast.visible {
opacity: 1;
transform: translateX(0);
}
.pipeline-toast.hiding {
opacity: 0;
transform: translateX(110%);
}
.pipeline-toast .pt-icon {
font-size: 1rem;
flex-shrink: 0;
margin-top: .05rem;
}
.pipeline-toast.status-queued { border-left: 3px solid #60a5fa; }
.pipeline-toast.status-processing { border-left: 3px solid #f59e0b; }
.pipeline-toast.status-completed { border-left: 3px solid #22c55e; }
.pipeline-toast.status-failed { border-left: 3px solid #ef4444; background: #2d1a1a; }
.pipeline-toast.status-needs_review { border-left: 3px solid #f97316; }
`;
const ICONS = {
queued: '🕐',
processing: '⚙️',
completed: '✅',
failed: '❌',
needs_review: '⚠️',
};
const DISPLAY_SECONDS = {
queued: 5,
processing: 6,
completed: 8,
failed: 12,
needs_review: 12,
};
// ── DOM setup ────────────────────────────────────────────────────────────
function inject() {
const style = document.createElement('style');
style.textContent = css;
document.head.appendChild(style);
const container = document.createElement('div');
container.id = 'pipeline-toasts';
document.body.appendChild(container);
return container;
}
function showToast(container, event) {
const status = event.status ?? 'queued';
const icon = ICONS[status] ?? '🔔';
const ttl = (DISPLAY_SECONDS[status] ?? 6) * 1000;
const toast = document.createElement('div');
toast.className = `pipeline-toast status-${status}`;
toast.innerHTML =
`<span class="pt-icon">${icon}</span>` +
`<span>${escapeHtml(event.message)}</span>`;
container.appendChild(toast);
// Animate in
requestAnimationFrame(() => {
requestAnimationFrame(() => toast.classList.add('visible'));
});
// Animate out and remove
const hide = () => {
toast.classList.add('hiding');
toast.addEventListener('transitionend', () => toast.remove(), { once: true });
};
const timer = setTimeout(hide, ttl);
toast.addEventListener('click', () => { clearTimeout(timer); hide(); });
}
function escapeHtml(str) {
return str
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;');
}
// ── SSE connection ───────────────────────────────────────────────────────
function connect(container, lastEventId) {
const url = lastEventId
? `/admin/pipeline/events?lastEventId=${encodeURIComponent(lastEventId)}`
: '/admin/pipeline/events';
const es = new EventSource(url);
let storedLastId = lastEventId ?? null;
es.onmessage = (e) => {
if (e.lastEventId) storedLastId = e.lastEventId;
try {
const event = JSON.parse(e.data);
if (event && event.message) showToast(container, event);
} catch (_) { /* ignore malformed frames */ }
};
es.addEventListener('reconnect', () => {
es.close();
// Small delay so the server-side loop has fully exited
setTimeout(() => connect(container, storedLastId), 500);
});
es.onerror = () => {
es.close();
// Browser will retry via EventSource built-in retry, but we also
// reconnect manually to pass the last event ID properly.
setTimeout(() => connect(container, storedLastId), 4000);
};
}
// ── Boot ─────────────────────────────────────────────────────────────────
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', boot);
} else {
boot();
}
function boot() {
const container = inject();
connect(container, null);
}
}());

View file

@ -48,6 +48,9 @@ class AIPipelineJob
#[ORM\Column(type: 'datetime_immutable')] #[ORM\Column(type: 'datetime_immutable')]
private \DateTimeImmutable $createdAt; private \DateTimeImmutable $createdAt;
#[ORM\Column(type: 'datetime_immutable')]
private \DateTimeImmutable $updatedAt;
#[ORM\Column(type: 'datetime_immutable', nullable: true)] #[ORM\Column(type: 'datetime_immutable', nullable: true)]
private ?\DateTimeImmutable $completedAt = null; private ?\DateTimeImmutable $completedAt = null;
@ -59,6 +62,7 @@ class AIPipelineJob
$this->inputData = $inputData; $this->inputData = $inputData;
$this->status = AIPipelineJobStatus::Queued; $this->status = AIPipelineJobStatus::Queued;
$this->createdAt = new \DateTimeImmutable(); $this->createdAt = new \DateTimeImmutable();
$this->updatedAt = new \DateTimeImmutable();
} }
public function getId(): Uuid public function getId(): Uuid
@ -139,6 +143,7 @@ class AIPipelineJob
{ {
$this->currentStep = $step; $this->currentStep = $step;
$this->outputData[$step] = $data; $this->outputData[$step] = $data;
$this->updatedAt = new \DateTimeImmutable();
} }
public function resetForRetry(): void public function resetForRetry(): void
@ -146,6 +151,7 @@ class AIPipelineJob
$this->status = AIPipelineJobStatus::Queued; $this->status = AIPipelineJobStatus::Queued;
$this->errorMessage = null; $this->errorMessage = null;
$this->completedAt = null; $this->completedAt = null;
$this->updatedAt = new \DateTimeImmutable();
} }
public function getCreatedAt(): \DateTimeImmutable public function getCreatedAt(): \DateTimeImmutable
@ -153,6 +159,11 @@ class AIPipelineJob
return $this->createdAt; return $this->createdAt;
} }
public function getUpdatedAt(): \DateTimeImmutable
{
return $this->updatedAt;
}
public function getCompletedAt(): ?\DateTimeImmutable public function getCompletedAt(): ?\DateTimeImmutable
{ {
return $this->completedAt; return $this->completedAt;
@ -161,6 +172,7 @@ class AIPipelineJob
public function markProcessing(): void public function markProcessing(): void
{ {
$this->status = AIPipelineJobStatus::Processing; $this->status = AIPipelineJobStatus::Processing;
$this->updatedAt = new \DateTimeImmutable();
} }
/** @param array<string, mixed> $outputData */ /** @param array<string, mixed> $outputData */
@ -169,6 +181,7 @@ class AIPipelineJob
$this->status = AIPipelineJobStatus::Completed; $this->status = AIPipelineJobStatus::Completed;
$this->outputData = array_merge($this->outputData, $outputData); $this->outputData = array_merge($this->outputData, $outputData);
$this->completedAt = new \DateTimeImmutable(); $this->completedAt = new \DateTimeImmutable();
$this->updatedAt = new \DateTimeImmutable();
} }
public function markFailed(string $reason): void public function markFailed(string $reason): void
@ -176,6 +189,7 @@ class AIPipelineJob
$this->status = AIPipelineJobStatus::Failed; $this->status = AIPipelineJobStatus::Failed;
$this->errorMessage = $reason; $this->errorMessage = $reason;
$this->completedAt = new \DateTimeImmutable(); $this->completedAt = new \DateTimeImmutable();
$this->updatedAt = new \DateTimeImmutable();
} }
public function markNeedsReview(string $reason): void public function markNeedsReview(string $reason): void
@ -183,6 +197,7 @@ class AIPipelineJob
$this->status = AIPipelineJobStatus::NeedsReview; $this->status = AIPipelineJobStatus::NeedsReview;
$this->errorMessage = $reason; $this->errorMessage = $reason;
$this->completedAt = new \DateTimeImmutable(); $this->completedAt = new \DateTimeImmutable();
$this->updatedAt = new \DateTimeImmutable();
} }
/** @param list<string> $missing */ /** @param list<string> $missing */

View file

@ -17,5 +17,8 @@ interface AIPipelineJobRepositoryInterface
public function findByArticleId(Uuid $articleId): ?AIPipelineJob; public function findByArticleId(Uuid $articleId): ?AIPipelineJob;
/** @return list<AIPipelineJob> */
public function findUpdatedSince(\DateTimeImmutable $since): array;
public function save(AIPipelineJob $job): void; public function save(AIPipelineJob $job): void;
} }

View file

@ -5,6 +5,7 @@ declare(strict_types=1);
namespace App\Infrastructure\Http\Controller\Admin; namespace App\Infrastructure\Http\Controller\Admin;
use EasyCorp\Bundle\EasyAdminBundle\Attribute\AdminDashboard; use EasyCorp\Bundle\EasyAdminBundle\Attribute\AdminDashboard;
use EasyCorp\Bundle\EasyAdminBundle\Config\Assets;
use EasyCorp\Bundle\EasyAdminBundle\Config\Dashboard; use EasyCorp\Bundle\EasyAdminBundle\Config\Dashboard;
use EasyCorp\Bundle\EasyAdminBundle\Config\MenuItem; use EasyCorp\Bundle\EasyAdminBundle\Config\MenuItem;
use EasyCorp\Bundle\EasyAdminBundle\Config\UserMenu; use EasyCorp\Bundle\EasyAdminBundle\Config\UserMenu;
@ -30,6 +31,11 @@ final class DashboardController extends AbstractDashboardController
]); ]);
} }
public function configureAssets(): Assets
{
return Assets::new()->addJsFile('js/admin/pipeline-notifications.js');
}
public function configureDashboard(): Dashboard public function configureDashboard(): Dashboard
{ {
return Dashboard::new() return Dashboard::new()

View file

@ -0,0 +1,126 @@
<?php
declare(strict_types=1);
namespace App\Infrastructure\Http\Controller\Admin;
use App\Domain\Pipeline\AIPipelineJob;
use App\Domain\Pipeline\AIPipelineJobStatus;
use App\Domain\Pipeline\Repository\AIPipelineJobRepositoryInterface;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\StreamedResponse;
use Symfony\Component\Routing\Attribute\Route;
use Symfony\Component\Security\Http\Attribute\IsGranted;
#[IsGranted('ROLE_USER')]
final class PipelineStreamController extends AbstractController
{
private const array STEP_LABELS = [
'vision' => 'Foto analysiert',
'specs_research' => 'Specs recherchiert',
'json_coding' => 'Attribute kodiert',
'draft_article' => 'Artikel erstellt',
'ebay_text' => 'eBay-Texte generiert',
'validation' => 'Validierung',
];
public function __construct(
private readonly AIPipelineJobRepositoryInterface $jobRepository,
) {
}
#[Route('/admin/pipeline/events', name: 'admin_pipeline_events', methods: ['GET'])]
public function events(Request $request): StreamedResponse
{
$response = new StreamedResponse(function () use ($request): void {
// Disable output buffering so events reach the browser immediately
while (ob_get_level() > 0) {
ob_end_flush();
}
$lastEventId = $request->headers->get('Last-Event-ID')
?? $request->query->get('lastEventId');
$since = $lastEventId
? \DateTimeImmutable::createFromFormat('U.u', $lastEventId) ?: new \DateTimeImmutable()
: new \DateTimeImmutable();
// Per-connection state: jobId => [step, status] to suppress duplicates
/** @var array<string, array{step: ?string, status: string}> $seen */
$seen = [];
$startTime = time();
// Tell the client to reconnect after 3 s if the stream drops
echo "retry: 3000\n\n";
flush();
while (!connection_aborted() && (time() - $startTime) < 90) {
$now = new \DateTimeImmutable();
$jobs = $this->jobRepository->findUpdatedSince($since);
$since = $now;
foreach ($jobs as $job) {
$jobId = $job->getId()->toRfc4122();
$step = $job->getCurrentStep();
$status = $job->getStatus()->value;
$prev = $seen[$jobId] ?? null;
if ($prev !== null && $prev['step'] === $step && $prev['status'] === $status) {
continue;
}
$seen[$jobId] = ['step' => $step, 'status' => $status];
$event = $this->buildEvent($job, $step, $status);
$id = $now->format('U.u');
echo "id: {$id}\n";
echo 'data: '.json_encode($event, \JSON_THROW_ON_ERROR)."\n\n";
flush();
}
sleep(2);
}
// Signal the client that the stream ended normally — it will reconnect
echo "event: reconnect\ndata: {}\n\n";
flush();
});
$response->headers->set('Content-Type', 'text/event-stream');
$response->headers->set('Cache-Control', 'no-cache');
$response->headers->set('X-Accel-Buffering', 'no');
return $response;
}
/** @return array{jobId: string, label: string, status: string, step: ?string, message: string} */
private function buildEvent(AIPipelineJob $job, ?string $step, string $status): array
{
$inv = $job->getInventoryNumber() ?? substr($job->getId()->toRfc4122(), 0, 8);
$label = "Job #{$inv}";
$message = match (true) {
$status === AIPipelineJobStatus::Queued->value => "{$label} zur Pipeline hinzugefügt",
$status === AIPipelineJobStatus::Processing->value && $step === null
=> "{$label} gestartet",
$status === AIPipelineJobStatus::Processing->value
=> "{$label}: ".self::STEP_LABELS[$step] ?? $step,
$status === AIPipelineJobStatus::Completed->value => "{$label} abgeschlossen ✓",
$status === AIPipelineJobStatus::Failed->value => "{$label} fehlgeschlagen: ".($job->getErrorMessage() ?? ''),
$status === AIPipelineJobStatus::NeedsReview->value
=> "{$label} benötigt manuelle Prüfung",
default => "{$label}: {$status}",
};
return [
'jobId' => $job->getId()->toRfc4122(),
'label' => $label,
'status' => $status,
'step' => $step,
'message' => $message,
];
}
}

View file

@ -37,6 +37,19 @@ final class DoctrineAIPipelineJobRepository implements AIPipelineJobRepositoryIn
); );
} }
/** @return list<AIPipelineJob> */
public function findUpdatedSince(\DateTimeImmutable $since): array
{
/** @var list<AIPipelineJob> */
return $this->em->getRepository(AIPipelineJob::class)
->createQueryBuilder('j')
->where('j.updatedAt >= :since')
->setParameter('since', $since)
->orderBy('j.updatedAt', 'ASC')
->getQuery()
->getResult();
}
public function save(AIPipelineJob $job): void public function save(AIPipelineJob $job): void
{ {
$this->em->persist($job); $this->em->persist($job);