Skip to content

Commit

Permalink
Merge pull request #4 from ledjon-behluli/fix-pass-through-stream
Browse files Browse the repository at this point in the history
Fix missing streamChannel write for BaseAgent
  • Loading branch information
ledjon-behluli authored Aug 26, 2023
2 parents 6ba4772 + 69e79e5 commit 99afa47
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
7 changes: 3 additions & 4 deletions src/OrleanSpaces/Agents/BaseAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ async ValueTask ISpaceRouter<TTuple, TTemplate>.RouteAction(TupleAction<TTuple>
case TupleActionType.Insert:
{
tuples = tuples.Add(action.Tuple);
if (streamChannel is not null)
{
await streamChannel.Writer.WriteAsync(action.Tuple);
}
await streamChannel.WriteIfNotNull(action.Tuple);
}
break;
case TupleActionType.Remove:
Expand Down Expand Up @@ -86,6 +83,8 @@ public async Task WriteAsync(TTuple tuple)
ThrowHelpers.EmptyTuple(tuple);

await tupleStore.Insert(new(agentId, tuple, TupleActionType.Insert));
await streamChannel.WriteIfNotNull(tuple);

tuples = tuples.Add(tuple);
}

Expand Down
12 changes: 4 additions & 8 deletions src/OrleanSpaces/Agents/SpaceAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ async ValueTask ISpaceRouter<SpaceTuple, SpaceTemplate>.RouteAction(TupleAction<
case TupleActionType.Insert:
{
tuples = tuples.Add(action.Tuple);
if (streamChannel is not null)
{
await streamChannel.Writer.WriteAsync(action.Tuple);
}
await streamChannel.WriteIfNotNull(action.Tuple);
}
break;
case TupleActionType.Remove:
Expand Down Expand Up @@ -81,12 +78,11 @@ public void Unsubscribe(Guid observerId)
public async Task WriteAsync(SpaceTuple tuple)
{
ThrowHelpers.EmptyTuple(tuple);

await tupleStore.Insert(new(agentId, tuple, TupleActionType.Insert));
await streamChannel.WriteIfNotNull(tuple);

tuples = tuples.Add(tuple);
if (streamChannel is not null)
{
await streamChannel.Writer.WriteAsync(tuple);
}
}

public ValueTask EvaluateAsync(Func<Task<SpaceTuple>> evaluation)
Expand Down
11 changes: 11 additions & 0 deletions src/OrleanSpaces/Helpers/Helpers.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using OrleanSpaces.Helpers;
using OrleanSpaces.Tuples;

Expand Down Expand Up @@ -46,4 +47,14 @@ public static bool AllocateAndExecute<T, TConsumer>(this TConsumer consumer, int
return result;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async Task WriteIfNotNull<T>(this Channel<T>? channel, T tuple)
where T : ISpaceTuple
{
if (channel is not null)
{
await channel.Writer.WriteAsync(tuple);
}
}
}

0 comments on commit 99afa47

Please sign in to comment.