diff --git a/migrations/Version20260519020000.php b/migrations/Version20260519020000.php new file mode 100644 index 0000000..0b7c2c4 --- /dev/null +++ b/migrations/Version20260519020000.php @@ -0,0 +1,29 @@ +addSql("ALTER TABLE app.ai_pipeline_jobs ADD COLUMN updated_at TIMESTAMP(0) WITHOUT TIME ZONE NOT NULL DEFAULT NOW()"); + $this->addSql("COMMENT ON COLUMN app.ai_pipeline_jobs.updated_at IS '(DC2Type:datetime_immutable)'"); + $this->addSql('CREATE INDEX idx_pipeline_jobs_updated_at ON app.ai_pipeline_jobs (updated_at)'); + } + + public function down(Schema $schema): void + { + $this->addSql('DROP INDEX app.idx_pipeline_jobs_updated_at'); + $this->addSql('ALTER TABLE app.ai_pipeline_jobs DROP COLUMN updated_at'); + } +} diff --git a/public/js/admin/pipeline-notifications.js b/public/js/admin/pipeline-notifications.js new file mode 100644 index 0000000..e83096d --- /dev/null +++ b/public/js/admin/pipeline-notifications.js @@ -0,0 +1,158 @@ +(function () { + 'use strict'; + + // ── Styles ────────────────────────────────────────────────────────────── + const css = ` +#pipeline-toasts { + position: fixed; + bottom: 1.5rem; + right: 1.5rem; + z-index: 9999; + display: flex; + flex-direction: column-reverse; + gap: .5rem; + max-width: 22rem; + pointer-events: none; +} +.pipeline-toast { + display: flex; + align-items: flex-start; + gap: .6rem; + padding: .65rem .9rem; + border-radius: .45rem; + background: #1e293b; + color: #f1f5f9; + font-size: .82rem; + line-height: 1.35; + box-shadow: 0 4px 16px rgba(0,0,0,.35); + pointer-events: all; + opacity: 0; + transform: translateX(110%); + transition: opacity .25s ease, transform .25s ease; +} +.pipeline-toast.visible { + opacity: 1; + transform: translateX(0); +} +.pipeline-toast.hiding { + opacity: 0; + transform: translateX(110%); +} +.pipeline-toast .pt-icon { + font-size: 1rem; + flex-shrink: 0; + margin-top: .05rem; +} +.pipeline-toast.status-queued { border-left: 3px solid #60a5fa; } +.pipeline-toast.status-processing { border-left: 3px solid #f59e0b; } +.pipeline-toast.status-completed { border-left: 3px solid #22c55e; } +.pipeline-toast.status-failed { border-left: 3px solid #ef4444; background: #2d1a1a; } +.pipeline-toast.status-needs_review { border-left: 3px solid #f97316; } +`; + + const ICONS = { + queued: '🕐', + processing: '⚙️', + completed: '✅', + failed: '❌', + needs_review: '⚠️', + }; + + const DISPLAY_SECONDS = { + queued: 5, + processing: 6, + completed: 8, + failed: 12, + needs_review: 12, + }; + + // ── DOM setup ──────────────────────────────────────────────────────────── + function inject() { + const style = document.createElement('style'); + style.textContent = css; + document.head.appendChild(style); + + const container = document.createElement('div'); + container.id = 'pipeline-toasts'; + document.body.appendChild(container); + return container; + } + + function showToast(container, event) { + const status = event.status ?? 'queued'; + const icon = ICONS[status] ?? '🔔'; + const ttl = (DISPLAY_SECONDS[status] ?? 6) * 1000; + + const toast = document.createElement('div'); + toast.className = `pipeline-toast status-${status}`; + toast.innerHTML = + `${icon}` + + `${escapeHtml(event.message)}`; + + container.appendChild(toast); + + // Animate in + requestAnimationFrame(() => { + requestAnimationFrame(() => toast.classList.add('visible')); + }); + + // Animate out and remove + const hide = () => { + toast.classList.add('hiding'); + toast.addEventListener('transitionend', () => toast.remove(), { once: true }); + }; + const timer = setTimeout(hide, ttl); + toast.addEventListener('click', () => { clearTimeout(timer); hide(); }); + } + + function escapeHtml(str) { + return str + .replace(/&/g, '&') + .replace(//g, '>') + .replace(/"/g, '"'); + } + + // ── SSE connection ─────────────────────────────────────────────────────── + function connect(container, lastEventId) { + const url = lastEventId + ? `/admin/pipeline/events?lastEventId=${encodeURIComponent(lastEventId)}` + : '/admin/pipeline/events'; + + const es = new EventSource(url); + let storedLastId = lastEventId ?? null; + + es.onmessage = (e) => { + if (e.lastEventId) storedLastId = e.lastEventId; + try { + const event = JSON.parse(e.data); + if (event && event.message) showToast(container, event); + } catch (_) { /* ignore malformed frames */ } + }; + + es.addEventListener('reconnect', () => { + es.close(); + // Small delay so the server-side loop has fully exited + setTimeout(() => connect(container, storedLastId), 500); + }); + + es.onerror = () => { + es.close(); + // Browser will retry via EventSource built-in retry, but we also + // reconnect manually to pass the last event ID properly. + setTimeout(() => connect(container, storedLastId), 4000); + }; + } + + // ── Boot ───────────────────────────────────────────────────────────────── + if (document.readyState === 'loading') { + document.addEventListener('DOMContentLoaded', boot); + } else { + boot(); + } + + function boot() { + const container = inject(); + connect(container, null); + } +}()); diff --git a/src/Domain/Pipeline/AIPipelineJob.php b/src/Domain/Pipeline/AIPipelineJob.php index 929ed9e..1af8761 100644 --- a/src/Domain/Pipeline/AIPipelineJob.php +++ b/src/Domain/Pipeline/AIPipelineJob.php @@ -48,6 +48,9 @@ class AIPipelineJob #[ORM\Column(type: 'datetime_immutable')] private \DateTimeImmutable $createdAt; + #[ORM\Column(type: 'datetime_immutable')] + private \DateTimeImmutable $updatedAt; + #[ORM\Column(type: 'datetime_immutable', nullable: true)] private ?\DateTimeImmutable $completedAt = null; @@ -59,6 +62,7 @@ class AIPipelineJob $this->inputData = $inputData; $this->status = AIPipelineJobStatus::Queued; $this->createdAt = new \DateTimeImmutable(); + $this->updatedAt = new \DateTimeImmutable(); } public function getId(): Uuid @@ -139,6 +143,7 @@ class AIPipelineJob { $this->currentStep = $step; $this->outputData[$step] = $data; + $this->updatedAt = new \DateTimeImmutable(); } public function resetForRetry(): void @@ -146,6 +151,7 @@ class AIPipelineJob $this->status = AIPipelineJobStatus::Queued; $this->errorMessage = null; $this->completedAt = null; + $this->updatedAt = new \DateTimeImmutable(); } public function getCreatedAt(): \DateTimeImmutable @@ -153,6 +159,11 @@ class AIPipelineJob return $this->createdAt; } + public function getUpdatedAt(): \DateTimeImmutable + { + return $this->updatedAt; + } + public function getCompletedAt(): ?\DateTimeImmutable { return $this->completedAt; @@ -161,6 +172,7 @@ class AIPipelineJob public function markProcessing(): void { $this->status = AIPipelineJobStatus::Processing; + $this->updatedAt = new \DateTimeImmutable(); } /** @param array $outputData */ @@ -169,6 +181,7 @@ class AIPipelineJob $this->status = AIPipelineJobStatus::Completed; $this->outputData = array_merge($this->outputData, $outputData); $this->completedAt = new \DateTimeImmutable(); + $this->updatedAt = new \DateTimeImmutable(); } public function markFailed(string $reason): void @@ -176,6 +189,7 @@ class AIPipelineJob $this->status = AIPipelineJobStatus::Failed; $this->errorMessage = $reason; $this->completedAt = new \DateTimeImmutable(); + $this->updatedAt = new \DateTimeImmutable(); } public function markNeedsReview(string $reason): void @@ -183,6 +197,7 @@ class AIPipelineJob $this->status = AIPipelineJobStatus::NeedsReview; $this->errorMessage = $reason; $this->completedAt = new \DateTimeImmutable(); + $this->updatedAt = new \DateTimeImmutable(); } /** @param list $missing */ diff --git a/src/Domain/Pipeline/Repository/AIPipelineJobRepositoryInterface.php b/src/Domain/Pipeline/Repository/AIPipelineJobRepositoryInterface.php index 53523be..4071354 100644 --- a/src/Domain/Pipeline/Repository/AIPipelineJobRepositoryInterface.php +++ b/src/Domain/Pipeline/Repository/AIPipelineJobRepositoryInterface.php @@ -17,5 +17,8 @@ interface AIPipelineJobRepositoryInterface public function findByArticleId(Uuid $articleId): ?AIPipelineJob; + /** @return list */ + public function findUpdatedSince(\DateTimeImmutable $since): array; + public function save(AIPipelineJob $job): void; } diff --git a/src/Infrastructure/Http/Controller/Admin/DashboardController.php b/src/Infrastructure/Http/Controller/Admin/DashboardController.php index 6b28198..ea6a8e1 100644 --- a/src/Infrastructure/Http/Controller/Admin/DashboardController.php +++ b/src/Infrastructure/Http/Controller/Admin/DashboardController.php @@ -5,6 +5,7 @@ declare(strict_types=1); namespace App\Infrastructure\Http\Controller\Admin; use EasyCorp\Bundle\EasyAdminBundle\Attribute\AdminDashboard; +use EasyCorp\Bundle\EasyAdminBundle\Config\Assets; use EasyCorp\Bundle\EasyAdminBundle\Config\Dashboard; use EasyCorp\Bundle\EasyAdminBundle\Config\MenuItem; use EasyCorp\Bundle\EasyAdminBundle\Config\UserMenu; @@ -30,6 +31,11 @@ final class DashboardController extends AbstractDashboardController ]); } + public function configureAssets(): Assets + { + return Assets::new()->addJsFile('js/admin/pipeline-notifications.js'); + } + public function configureDashboard(): Dashboard { return Dashboard::new() diff --git a/src/Infrastructure/Http/Controller/Admin/PipelineStreamController.php b/src/Infrastructure/Http/Controller/Admin/PipelineStreamController.php new file mode 100644 index 0000000..c87be9f --- /dev/null +++ b/src/Infrastructure/Http/Controller/Admin/PipelineStreamController.php @@ -0,0 +1,126 @@ + 'Foto analysiert', + 'specs_research' => 'Specs recherchiert', + 'json_coding' => 'Attribute kodiert', + 'draft_article' => 'Artikel erstellt', + 'ebay_text' => 'eBay-Texte generiert', + 'validation' => 'Validierung', + ]; + + public function __construct( + private readonly AIPipelineJobRepositoryInterface $jobRepository, + ) { + } + + #[Route('/admin/pipeline/events', name: 'admin_pipeline_events', methods: ['GET'])] + public function events(Request $request): StreamedResponse + { + $response = new StreamedResponse(function () use ($request): void { + // Disable output buffering so events reach the browser immediately + while (ob_get_level() > 0) { + ob_end_flush(); + } + + $lastEventId = $request->headers->get('Last-Event-ID') + ?? $request->query->get('lastEventId'); + + $since = $lastEventId + ? \DateTimeImmutable::createFromFormat('U.u', $lastEventId) ?: new \DateTimeImmutable() + : new \DateTimeImmutable(); + + // Per-connection state: jobId => [step, status] to suppress duplicates + /** @var array $seen */ + $seen = []; + + $startTime = time(); + + // Tell the client to reconnect after 3 s if the stream drops + echo "retry: 3000\n\n"; + flush(); + + while (!connection_aborted() && (time() - $startTime) < 90) { + $now = new \DateTimeImmutable(); + $jobs = $this->jobRepository->findUpdatedSince($since); + $since = $now; + + foreach ($jobs as $job) { + $jobId = $job->getId()->toRfc4122(); + $step = $job->getCurrentStep(); + $status = $job->getStatus()->value; + + $prev = $seen[$jobId] ?? null; + if ($prev !== null && $prev['step'] === $step && $prev['status'] === $status) { + continue; + } + + $seen[$jobId] = ['step' => $step, 'status' => $status]; + + $event = $this->buildEvent($job, $step, $status); + $id = $now->format('U.u'); + + echo "id: {$id}\n"; + echo 'data: '.json_encode($event, \JSON_THROW_ON_ERROR)."\n\n"; + flush(); + } + + sleep(2); + } + + // Signal the client that the stream ended normally — it will reconnect + echo "event: reconnect\ndata: {}\n\n"; + flush(); + }); + + $response->headers->set('Content-Type', 'text/event-stream'); + $response->headers->set('Cache-Control', 'no-cache'); + $response->headers->set('X-Accel-Buffering', 'no'); + + return $response; + } + + /** @return array{jobId: string, label: string, status: string, step: ?string, message: string} */ + private function buildEvent(AIPipelineJob $job, ?string $step, string $status): array + { + $inv = $job->getInventoryNumber() ?? substr($job->getId()->toRfc4122(), 0, 8); + $label = "Job #{$inv}"; + + $message = match (true) { + $status === AIPipelineJobStatus::Queued->value => "{$label} zur Pipeline hinzugefügt", + $status === AIPipelineJobStatus::Processing->value && $step === null + => "{$label} gestartet", + $status === AIPipelineJobStatus::Processing->value + => "{$label}: ".self::STEP_LABELS[$step] ?? $step, + $status === AIPipelineJobStatus::Completed->value => "{$label} abgeschlossen ✓", + $status === AIPipelineJobStatus::Failed->value => "{$label} fehlgeschlagen: ".($job->getErrorMessage() ?? ''), + $status === AIPipelineJobStatus::NeedsReview->value + => "{$label} benötigt manuelle Prüfung", + default => "{$label}: {$status}", + }; + + return [ + 'jobId' => $job->getId()->toRfc4122(), + 'label' => $label, + 'status' => $status, + 'step' => $step, + 'message' => $message, + ]; + } +} diff --git a/src/Infrastructure/Persistence/Repository/DoctrineAIPipelineJobRepository.php b/src/Infrastructure/Persistence/Repository/DoctrineAIPipelineJobRepository.php index 8e67333..50b4e93 100644 --- a/src/Infrastructure/Persistence/Repository/DoctrineAIPipelineJobRepository.php +++ b/src/Infrastructure/Persistence/Repository/DoctrineAIPipelineJobRepository.php @@ -37,6 +37,19 @@ final class DoctrineAIPipelineJobRepository implements AIPipelineJobRepositoryIn ); } + /** @return list */ + public function findUpdatedSince(\DateTimeImmutable $since): array + { + /** @var list */ + return $this->em->getRepository(AIPipelineJob::class) + ->createQueryBuilder('j') + ->where('j.updatedAt >= :since') + ->setParameter('since', $since) + ->orderBy('j.updatedAt', 'ASC') + ->getQuery() + ->getResult(); + } + public function save(AIPipelineJob $job): void { $this->em->persist($job);