Skip to content

Commit

Permalink
Azure 테이블 저장조 파티션을 재구성한다
Browse files Browse the repository at this point in the history
Azure 테이블 저장소 기반 이벤트 저장소 구현체가 영속 이벤트와 발행 대기
이벤트를 하나의 파티션에 기록해 두 이벤트의 기록 작업이 원자성을 가지도록
변경한다.

Issue: #28
  • Loading branch information
gyuwon committed Jan 7, 2018
1 parent b722ced commit d1ab7ba
Show file tree
Hide file tree
Showing 15 changed files with 617 additions and 466 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@

public abstract class AggregateEntity : TableEntity
{
public static string GetPartitionKey(Type aggregateType, Guid aggregateId)
public static string GetPartitionKey(Type sourceType, Guid sourceId)
{
if (aggregateType == null)
if (sourceType == null)
{
throw new ArgumentNullException(nameof(aggregateType));
throw new ArgumentNullException(nameof(sourceType));
}

if (aggregateId == Guid.Empty)
if (sourceId == Guid.Empty)
{
throw new ArgumentException("Value cannot be empty.", nameof(aggregateId));
throw new ArgumentException("Value cannot be empty.", nameof(sourceId));
}

return $"{aggregateType.Name}-{aggregateId:n}";
return $"{sourceType.Name}-{sourceId:n}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,12 @@
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

using static Microsoft.WindowsAzure.Storage.Table.QueryComparisons;
using static Microsoft.WindowsAzure.Storage.Table.TableOperators;
using static Microsoft.WindowsAzure.Storage.Table.TableQuery;

public class AzureEventStore : IAzureEventStore
{
private CloudTable _eventTable;
private IMessageSerializer _serializer;
private readonly CloudTable _eventTable;
private readonly IMessageSerializer _serializer;

public AzureEventStore(
CloudTable eventTable, IMessageSerializer serializer)
public AzureEventStore(CloudTable eventTable, IMessageSerializer serializer)
{
_eventTable = eventTable ?? throw new ArgumentNullException(nameof(eventTable));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
Expand All @@ -40,6 +35,13 @@ public Task SaveEvents<T>(

var domainEvents = events.ToList();

if (domainEvents.Count == 0)
{
return Task.FromResult(true);
}

IDomainEvent firstEvent = domainEvents.First();

for (int i = 0; i < domainEvents.Count; i++)
{
if (domainEvents[i] == null)
Expand All @@ -48,6 +50,20 @@ public Task SaveEvents<T>(
$"{nameof(events)} cannot contain null.",
nameof(events));
}

if (domainEvents[i].Version != firstEvent.Version + i)
{
throw new ArgumentException(
$"Versions of {nameof(events)} must be sequential.",
nameof(events));
}

if (domainEvents[i].SourceId != firstEvent.SourceId)
{
throw new ArgumentException(
$"All events must have the same source id.",
nameof(events));
}
}

return Save<T>(domainEvents, operationId, correlationId, contributor, cancellationToken);
Expand All @@ -61,53 +77,24 @@ private async Task Save<T>(
CancellationToken cancellationToken)
where T : class, IEventSourced
{
if (domainEvents.Any() == false)
{
return;
}

var envelopes = new List<Envelope>(
var envelopes = new List<Envelope<IDomainEvent>>(
from domainEvent in domainEvents
select new Envelope(Guid.NewGuid(), domainEvent, operationId, correlationId, contributor));

await InsertPendingEvents<T>(envelopes, cancellationToken).ConfigureAwait(false);
await InsertEventsAndCorrelation<T>(envelopes, correlationId, cancellationToken).ConfigureAwait(false);
}
let messageId = Guid.NewGuid()
select new Envelope<IDomainEvent>(messageId, domainEvent, operationId, correlationId, contributor));

private Task InsertPendingEvents<T>(
List<Envelope> envelopes,
CancellationToken cancellationToken)
where T : class, IEventSourced
{
var batch = new TableBatchOperation();

foreach (Envelope envelope in envelopes)
foreach (Envelope<IDomainEvent> envelope in envelopes)
{
batch.Insert(PendingEventTableEntity.FromEnvelope<T>(envelope, _serializer));
batch.Insert(PersistentEvent.Create(typeof(T), envelope, _serializer));
batch.Insert(PendingEvent.Create(typeof(T), envelope, _serializer));
}

return _eventTable.ExecuteBatch(batch, cancellationToken);
}

private async Task InsertEventsAndCorrelation<T>(
List<Envelope> envelopes,
Guid? correlationId,
CancellationToken cancellationToken)
where T : class, IEventSourced
{
var batch = new TableBatchOperation();

var firstEvent = (IDomainEvent)envelopes.First().Message;
Guid sourceId = firstEvent.SourceId;

foreach (Envelope envelope in envelopes)
{
batch.Insert(EventTableEntity.FromEnvelope<T>(envelope, _serializer));
}
Guid sourceId = domainEvents.First().SourceId;

if (correlationId.HasValue)
{
batch.Insert(CorrelationTableEntity.Create(typeof(T), sourceId, correlationId.Value));
batch.Insert(Correlation.Create(typeof(T), sourceId, correlationId.Value));
}

try
Expand All @@ -116,9 +103,9 @@ private async Task InsertEventsAndCorrelation<T>(
}
catch (StorageException exception) when (correlationId.HasValue)
{
string filter = CorrelationTableEntity.GetFilter(typeof(T), sourceId, correlationId.Value);
TableQuery<CorrelationTableEntity> query = new TableQuery<CorrelationTableEntity>().Where(filter);
if (await _eventTable.Any(query, cancellationToken))
string filter = Correlation.GetFilter(typeof(T), sourceId, correlationId.Value);
var query = new TableQuery<Correlation> { FilterString = filter };
if (await _eventTable.Any(query, cancellationToken).ConfigureAwait(false))
{
throw new DuplicateCorrelationException(
typeof(T),
Expand Down Expand Up @@ -152,18 +139,8 @@ private async Task<IEnumerable<IDomainEvent>> Load<T>(
CancellationToken cancellationToken)
where T : class, IEventSourced
{
string filter = CombineFilters(
GenerateFilterCondition(
nameof(ITableEntity.PartitionKey),
Equal,
EventTableEntity.GetPartitionKey(typeof(T), sourceId)),
And,
GenerateFilterCondition(
nameof(ITableEntity.RowKey),
GreaterThan,
EventTableEntity.GetRowKey(afterVersion)));

TableQuery<EventTableEntity> query = new TableQuery<EventTableEntity>().Where(filter);
string filter = PersistentEvent.GetFilter(typeof(T), sourceId, afterVersion);
var query = new TableQuery<EventTableEntity> { FilterString = filter };

IEnumerable<EventTableEntity> events = await _eventTable
.ExecuteQuery(query, cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
namespace Khala.EventSourcing.Azure
{
using System;
using Microsoft.WindowsAzure.Storage.Table;

public class Correlation : AggregateEntity
{
public Guid CorrelationId { get; set; }

public static string GetRowKey(Guid correlationId)
{
if (correlationId == Guid.Empty)
{
throw new ArgumentException("Value cannot be empty.", nameof(correlationId));
}

return $"Correlation-{correlationId:n}";
}

public static Correlation Create(
Type sourceType,
Guid sourceId,
Guid correlationId)
{
if (sourceType == null)
{
throw new ArgumentNullException(nameof(sourceType));
}

if (sourceId == Guid.Empty)
{
throw new ArgumentException("Value cannot be empty.", nameof(sourceId));
}

if (correlationId == Guid.Empty)
{
throw new ArgumentException("Value cannot be empty.", nameof(correlationId));
}

return new Correlation
{
PartitionKey = GetPartitionKey(sourceType, sourceId),
RowKey = GetRowKey(correlationId),
CorrelationId = correlationId,
};
}

public static string GetFilter(
Type sourceType,
Guid sourceId,
Guid correlationId)
{
if (sourceType == null)
{
throw new ArgumentNullException(nameof(sourceType));
}

if (sourceId == Guid.Empty)
{
throw new ArgumentException("Value cannot be empty.", nameof(sourceId));
}

if (correlationId == Guid.Empty)
{
throw new ArgumentException("Value cannot be empty.", nameof(correlationId));
}

string partitionCondition = TableQuery.GenerateFilterCondition(
nameof(PartitionKey),
QueryComparisons.Equal,
GetPartitionKey(sourceType, sourceId));

string rowCondition = TableQuery.GenerateFilterCondition(
nameof(RowKey),
QueryComparisons.Equal,
GetRowKey(correlationId));

return TableQuery.CombineFilters(partitionCondition, TableOperators.And, rowCondition);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@
{
using System;
using Khala.Messaging;
using Microsoft.WindowsAzure.Storage.Table;

public class PendingEvent : EventEntity
{
public static string GetRowKey(int version) => $"Pending-{version:D10}";

public static PendingEvent Create(
Type aggregateType,
Type sourceType,
Envelope<IDomainEvent> envelope,
IMessageSerializer serializer)
{
if (aggregateType == null)
if (sourceType == null)
{
throw new ArgumentNullException(nameof(aggregateType));
throw new ArgumentNullException(nameof(sourceType));
}

if (envelope == null)
Expand All @@ -29,7 +30,7 @@ public static PendingEvent Create(

return new PendingEvent
{
PartitionKey = GetPartitionKey(aggregateType, envelope.Message.SourceId),
PartitionKey = GetPartitionKey(sourceType, envelope.Message.SourceId),
RowKey = GetRowKey(envelope.Message.Version),
MessageId = envelope.MessageId,
EventJson = serializer.Serialize(envelope.Message),
Expand All @@ -38,5 +39,36 @@ public static PendingEvent Create(
Contributor = envelope.Contributor,
};
}

public static string GetFilter(Type sourceType, Guid sourceId, int afterVersion = default)
{
if (sourceType == null)
{
throw new ArgumentNullException(nameof(sourceType));
}

if (sourceId == Guid.Empty)
{
throw new ArgumentException("Value cannot be empty.", nameof(sourceId));
}

string partitionCondition = TableQuery.GenerateFilterCondition(
nameof(PartitionKey),
QueryComparisons.Equal,
GetPartitionKey(sourceType, sourceId));

string rowCondition = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition(
nameof(RowKey),
QueryComparisons.GreaterThan,
GetRowKey(afterVersion)),
TableOperators.And,
TableQuery.GenerateFilterCondition(
nameof(RowKey),
QueryComparisons.LessThanOrEqual,
GetRowKey(int.MaxValue)));

return TableQuery.CombineFilters(partitionCondition, TableOperators.And, rowCondition);
}
}
}
Loading

0 comments on commit d1ab7ba

Please sign in to comment.