Skip to content

Commit

Permalink
Merge pull request #24 from patchlevel/support-stream-store
Browse files Browse the repository at this point in the history
Support stream store
  • Loading branch information
DavidBadura authored Jan 15, 2025
2 parents 9420c76 + 601645d commit 550e255
Show file tree
Hide file tree
Showing 9 changed files with 575 additions and 347 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
],
"require": {
"php": "~8.1.0 || ~8.2.0 || ~8.3.0",
"patchlevel/event-sourcing": "^3.6.0",
"patchlevel/event-sourcing-bundle": "^3.0.0",
"symfony/asset": "^5.4.33|^6.4.1|^7.0.1",
"symfony/asset-mapper": "^6.4.1|^7.0.1",
Expand Down
553 changes: 319 additions & 234 deletions composer.lock

Large diffs are not rendered by default.

74 changes: 51 additions & 23 deletions src/Controller/InspectionController.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Aggregate\CustomId;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadataFactory;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamStore;
use Patchlevel\Hydrator\Hydrator;
use Symfony\Component\HttpFoundation\RedirectResponse;
use Symfony\Component\HttpFoundation\Request;
Expand Down Expand Up @@ -71,6 +75,7 @@ public function indexAction(Request $request): Response

public function showAction(Request $request, string $aggregateName, string $aggregateId): Response
{
$criteria = $this->getCriteria($aggregateName, $aggregateId);
$until = null;

if ($request->query->has('until')) {
Expand All @@ -82,18 +87,8 @@ public function showAction(Request $request, string $aggregateName, string $aggr
$aggregateClass = $this->aggregateRootRegistry->aggregateClass($aggregateName);
$aggregate = $this->aggregate($aggregateName, $aggregateId, $until);

$criteria = new Criteria(
new AggregateNameCriterion($aggregateName),
new AggregateIdCriterion($aggregateId),
);

$messages = $this->store->load(
$criteria,
);

$count = $this->store->count(
$criteria,
);
$count = $this->store->count($criteria);
$messages = $this->store->load($criteria);

try {
$serializedError = null;
Expand All @@ -108,10 +103,7 @@ public function showAction(Request $request, string $aggregateName, string $aggr

try {
$snapshotError = null;
$snapshot = $this->snapshotStore->load(
$aggregateClass,
CustomId::fromString($aggregateId),
);
$snapshot = $this->snapshotStore->load($aggregateClass, CustomId::fromString($aggregateId));
} catch (Throwable $e) {
$snapshot = null;
$snapshotError = $e->getMessage();
Expand All @@ -138,11 +130,7 @@ public function showAction(Request $request, string $aggregateName, string $aggr

private function aggregate(string $aggregateName, string $aggregateId, int|null $until = null): AggregateRoot
{
$criteria = new Criteria(
new AggregateNameCriterion($aggregateName),
new AggregateIdCriterion($aggregateId),
);

$criteria = $this->getCriteria($aggregateName, $aggregateId);
$stream = null;

try {
Expand All @@ -160,7 +148,7 @@ private function aggregate(string $aggregateName, string $aggregateId, int|null

return $aggregateClass::createFromEvents(
$this->unpack($stream, $until),
$firstMessage->header(AggregateHeader::class)->playhead - 1,
$this->getPlayhead($firstMessage) - 1,
);
} finally {
$stream?->close();
Expand All @@ -171,11 +159,51 @@ private function aggregate(string $aggregateName, string $aggregateId, int|null
private function unpack(Stream $stream, int|null $until = null): Traversable
{
foreach ($stream as $message) {
if ($until !== null && $message->header(AggregateHeader::class)->playhead > $until) {
if ($until === null) {
yield $message->event();
}

if ($message->hasHeader(AggregateHeader::class) && $message->header(AggregateHeader::class)->playhead > $until) {
break;
}

if ($message->hasHeader(PlayheadHeader::class) && $message->header(PlayheadHeader::class)->playhead > $until) {
break;
}

yield $message->event();
}
}

private function getCriteria(string $aggregateName, string $aggregateId): Criteria
{
if ($this->store instanceof StreamStore) {
return new Criteria(
new StreamCriterion(
$this->aggregateRootMetadataFactory->metadata(
$this->aggregateRootRegistry->aggregateClass($aggregateName),
)->streamName($aggregateId),
),
);
}

return new Criteria(
new AggregateNameCriterion($aggregateName),
new AggregateIdCriterion($aggregateId),
);
}

/**
* @param Message<object> $message
*
* @return positive-int
*/
public function getPlayhead(Message $message): int
{
if ($message->hasHeader(AggregateHeader::class)) {
return $message->header(AggregateHeader::class)->playhead;
}

return $message->header(PlayheadHeader::class)->playhead;
}
}
5 changes: 5 additions & 0 deletions src/Controller/StoreController.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ private function criteria(Request $request): Criteria
$criteriaBuilder->aggregateId($aggregateId);
}

$streamName = $request->query->getString('streamName');
if ($streamName) {
$criteriaBuilder->streamName($streamName);
}

return $criteriaBuilder->build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public function load(array $configs, ContainerBuilder $container): void
new Reference(EventRegistry::class),
new Reference(EventSerializer::class),
new Reference(TokenMapper::class),
new Reference(Store::class),
])
->addTag('twig.extension');

Expand Down
72 changes: 72 additions & 0 deletions src/Twig/EventSourcingAdminExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Patchlevel\EventSourcingAdminBundle\Twig;

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
Expand All @@ -12,6 +13,11 @@
use Patchlevel\EventSourcing\Metadata\Event\EventRegistry;
use Patchlevel\EventSourcing\Serializer\Encoder\JsonEncoder;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\StreamStore;
use Patchlevel\EventSourcingAdminBundle\Message\Header\RequestIdHeader;
use Patchlevel\EventSourcingAdminBundle\TokenMapper;
use Twig\Extension\AbstractExtension;
Expand All @@ -28,6 +34,7 @@ public function __construct(
private readonly EventRegistry $eventRegistry,
private readonly EventSerializer $eventSerializer,
private readonly TokenMapper $tokenMapper,
private readonly Store $store,
) {
}

Expand All @@ -36,6 +43,12 @@ public function getFunctions(): array
{
return [
new TwigFunction('eventsourcing_aggregate_class', $this->aggregateClass(...)),
new TwigFunction('eventsourcing_aggregate_name', $this->aggregateName(...)),
new TwigFunction('eventsourcing_aggregate_id', $this->aggregateId(...)),
new TwigFunction('eventsourcing_uses_stream_store', $this->usesStreamStore(...)),
new TwigFunction('eventsourcing_stream_name', $this->streamName(...)),
new TwigFunction('eventsourcing_playhead', $this->playhead(...)),
new TwigFunction('eventsourcing_recorded_on', $this->recordedOn(...)),
new TwigFunction('eventsourcing_event_class', $this->eventClass(...)),
new TwigFunction('eventsourcing_event_name', $this->eventName(...)),
new TwigFunction('eventsourcing_event_payload', $this->eventPayload(...)),
Expand All @@ -61,6 +74,65 @@ public function aggregateClass(Message $message): string
return $this->aggregateRootRegistry->aggregateClass($message->header(AggregateHeader::class)->aggregateName);
}

/** @param Message<object> $message */
public function aggregateName(Message $message): string
{
return $message->header(AggregateHeader::class)->aggregateName;
}

/** @param Message<object> $message */
public function aggregateId(Message $message): string
{
return $message->header(AggregateHeader::class)->aggregateId;
}

public function usesStreamStore(): bool
{
return $this->store instanceof StreamStore;
}

/** @param Message<object> $message */
public function streamName(Message $message): string
{
try {
return $message->header(StreamNameHeader::class)->streamName;
} catch (HeaderNotFound) {
return $message->header(AggregateHeader::class)->streamName();
}
}

/** @param Message<object> $message */
public function playhead(Message $message): int|null
{
try {
return $message->header(PlayheadHeader::class)->playhead;
} catch (HeaderNotFound) {
}

try {
return $message->header(AggregateHeader::class)->playhead;
} catch (HeaderNotFound) {
}

return null;
}

/** @param Message<object> $message */
public function recordedOn(Message $message): DateTimeImmutable|null
{
try {
return $message->header(RecordedOnHeader::class)->recordedOn;
} catch (HeaderNotFound) {
}

try {
return $message->header(AggregateHeader::class)->recordedOn;
} catch (HeaderNotFound) {
}

return null;
}

/**
* @param Message<T> $message
*
Expand Down
6 changes: 2 additions & 4 deletions templates/inspection/show.html.twig
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@
<div class="flow-root">
<ul role="list" class="-mb-8">
{% for message in messages %}
{% set aggregateHeader = message.header("Patchlevel\\EventSourcing\\Aggregate\\AggregateHeader") %}

<li class="group/message relative {% if until and loop.index > until %}saturate-0{% endif %}">
<span id="{{ loop.index }}" class="absolute -top-48"></span>
<div id="marker-{{ loop.index }}" class="absolute -left-10 z-20 invisible">
Expand All @@ -186,7 +184,7 @@
data-modal="modal-{{ loop.index }}">
{{ heroicon('magnifying-glass', 'h-5 w-5 inline') }}
</button>
<a href="{{ path('patchlevel_event_sourcing_admin_inspection_show', { aggregateId: aggregateId, aggregateName: aggregateName, tab: tab, until: aggregateHeader.playhead }) }}#{{ aggregateHeader.playhead }}"
<a href="{{ path('patchlevel_event_sourcing_admin_inspection_show', { aggregateId: aggregateId, aggregateName: aggregateName, tab: tab, until: eventsourcing_playhead(message) }) }}#{{ eventsourcing_playhead(message) }}"
class="text-gray-500 hover:text-gray-900 invisible group-hover/message:visible"
>
{{ heroicon('play-pause', 'h-5 w-5 inline') }}
Expand All @@ -206,7 +204,7 @@
</dialog>
</div>
<div class="whitespace-nowrap text-right text-xs text-gray-500">
{{ aggregateHeader.recordedOn|date('Y-m-d H:i:s') }}
{{ eventsourcing_recorded_on(message) ? eventsourcing_recorded_on(message)|date('Y-m-d H:i:s') : '-' }}
</div>
</div>
</div>
Expand Down
32 changes: 17 additions & 15 deletions templates/store/detail.html.twig
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
<dl class="divide-y divide-gray-100">
{% set aggregateHeader = message.header("Patchlevel\\EventSourcing\\Aggregate\\AggregateHeader") %}
{% if eventsourcing_uses_stream_store() %}
{{ _self.text('Stream', eventsourcing_stream_name(message)) }}
{% else %}
{{ _self.text('Aggregate', eventsourcing_aggregate_name(message)) }}
{{ _self.text('Aggregate Class', eventsourcing_aggregate_class(message)) }}

{{ _self.text('Aggregate', aggregateHeader.aggregateName) }}
{{ _self.text('Aggregate Class', eventsourcing_aggregate_class(message)) }}

<div class="p-4 sm:grid sm:grid-cols-3 sm:gap-4 sm:px-0 group/parent">
<dt class="text-sm font-medium leading-6 text-gray-900">Aggregate Id</dt>
<dd class="mt-1 text-sm leading-6 text-gray-500 sm:col-span-2 sm:mt-0">
{{ aggregateHeader.aggregateId }}
<a class="group-hover/parent:visible invisible" href="{{ path('patchlevel_event_sourcing_admin_inspection_show', {aggregateName: aggregateHeader.aggregateName, aggregateId: aggregateHeader.aggregateId}) }}">
{{ heroicon('identification', 'h-5 w-5 -mt-1 text-gray-400 inline') }}
</a>
</dd>
</div>
<div class="p-4 sm:grid sm:grid-cols-3 sm:gap-4 sm:px-0 group/parent">
<dt class="text-sm font-medium leading-6 text-gray-900">Aggregate Id</dt>
<dd class="mt-1 text-sm leading-6 text-gray-500 sm:col-span-2 sm:mt-0">
{{ eventsourcing_aggregate_id(message) }}
<a class="group-hover/parent:visible invisible" href="{{ path('patchlevel_event_sourcing_admin_inspection_show', {aggregateName: eventsourcing_aggregate_name(message), aggregateId: eventsourcing_aggregate_id(message)}) }}">
{{ heroicon('identification', 'h-5 w-5 -mt-1 text-gray-400 inline') }}
</a>
</dd>
</div>
{% endif %}

{{ _self.text('Aggregate Playhead', aggregateHeader.playhead) }}
{{ _self.text('Playhead', eventsourcing_playhead(message)) }}

<div class="p-4 sm:grid sm:grid-cols-3 sm:gap-4 sm:px-0 group/parent">
<dt class="text-sm font-medium leading-6 text-gray-900">Event Name</dt>
Expand All @@ -36,7 +38,7 @@
</dd>
</div>

{{ _self.text('Recorded on', aggregateHeader.recordedOn|date('Y-m-d H:i:s')) }}
{{ _self.text('Recorded on', eventsourcing_recorded_on(message)|date('Y-m-d H:i:s')) }}
<div class="p-4 sm:grid sm:grid-cols-3 sm:gap-4 sm:px-0">
<dt class="text-sm font-medium leading-6 text-gray-900">Profiler Token</dt>
<dd class="mt-1 text-sm leading-6 text-gray-500 sm:col-span-2 sm:mt-0">
Expand Down
Loading

0 comments on commit 550e255

Please sign in to comment.