Skip to content

Commit

Permalink
Try no compression
Browse files Browse the repository at this point in the history
  • Loading branch information
LukeButters committed Jul 26, 2023
1 parent 4f1f669 commit ae70657
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 20 deletions.
26 changes: 13 additions & 13 deletions source/Halibut.Tests/LowerHalibutLimitsForAllTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ public static void LowerHalibutLimits()

// The following 4 can be overriden, so set them high and let the test author drop the values as needed.
// Also set to a "weird value" to make it more obvious which timeout is at play in tests.
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingRequestQueueTimeout), TimeSpan.FromSeconds(66));
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingRequestMaximumMessageProcessingTimeout), TimeSpan.FromSeconds(66));
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.RetryListeningSleepInterval), TimeSpan.FromSeconds(1));
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.ConnectionErrorRetryTimeout), TimeSpan.FromSeconds(66)); // Must always be greater than the heartbeat timeout.

// Intentionally set higher than the heart beat, since some tests need to determine that the hart beat timeout applies.
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientSendTimeout), HalftheTcpRecieveTimeout + HalftheTcpRecieveTimeout);
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientReceiveTimeout), HalftheTcpRecieveTimeout + HalftheTcpRecieveTimeout);

halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientHeartbeatSendTimeout), TimeSpan.FromSeconds(15));
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientHeartbeatReceiveTimeout), TimeSpan.FromSeconds(15));
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientConnectTimeout), TimeSpan.FromSeconds(20));
halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingQueueWaitTimeout), TimeSpan.FromSeconds(20));
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingRequestQueueTimeout), TimeSpan.FromSeconds(66));
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingRequestMaximumMessageProcessingTimeout), TimeSpan.FromSeconds(66));
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.RetryListeningSleepInterval), TimeSpan.FromSeconds(1));
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.ConnectionErrorRetryTimeout), TimeSpan.FromSeconds(66)); // Must always be greater than the heartbeat timeout.
//
// // Intentionally set higher than the heart beat, since some tests need to determine that the hart beat timeout applies.
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientSendTimeout), HalftheTcpRecieveTimeout + HalftheTcpRecieveTimeout);
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientReceiveTimeout), HalftheTcpRecieveTimeout + HalftheTcpRecieveTimeout);
//
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientHeartbeatSendTimeout), TimeSpan.FromSeconds(15));
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientHeartbeatReceiveTimeout), TimeSpan.FromSeconds(15));
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientConnectTimeout), TimeSpan.FromSeconds(20));
// halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingQueueWaitTimeout), TimeSpan.FromSeconds(20));
}
}
}
2 changes: 1 addition & 1 deletion source/Halibut.Tests/UsageFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task OctopusCanSendMessagesToTentacle_WithEchoService(ClientAndServ
{
using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithStandardServices()
.WithHalibutLoggingLevel(LogLevel.Info)
.WithHalibutLoggingLevel(LogLevel.Trace)
.Build(CancellationToken))
{
var echo = clientAndService.CreateClient<IEchoService>();
Expand Down
5 changes: 3 additions & 2 deletions source/Halibut/Transport/Protocol/MessageExchangeStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ public class MessageExchangeStream : IMessageExchangeStream
public MessageExchangeStream(Stream stream, IMessageSerializer serializer, ILog log)
{
#if NETFRAMEWORK
this.stream = stream;
var myStream = stream;
#else
this.stream = new RewindableBufferStream(stream, HalibutLimits.RewindableBufferStreamSize);
var myStream = new RewindableBufferStream(stream, HalibutLimits.RewindableBufferStreamSize);
#endif
this.stream = myStream.ToRecordingStream("msgex", log);
this.log = log;
streamWriter = new StreamWriter(this.stream, new UTF8Encoding(false)) { NewLine = "\r\n" };
this.serializer = serializer;
Expand Down
52 changes: 48 additions & 4 deletions source/Halibut/Transport/Protocol/MessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.IO;
using System.IO.Compression;
using System.Text;
using Halibut.Transport.Observability;
using Newtonsoft.Json;
using Newtonsoft.Json.Bson;
Expand Down Expand Up @@ -43,13 +44,23 @@ internal MessageSerializer(
{
typeRegistry.AddToMessageContract(types);
}

private void ReallyEndReading(DeflateStream zip)
{
var b = new byte[1024];
var res = zip.Read(b, 0, b.Length);
}

CompressionLevel compressionLevel = CompressionLevel.NoCompression;

public void WriteMessage<T>(Stream stream, T message)
{
using var compressedByteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen);

using (var zip = new DeflateStream(compressedByteCountingStream, CompressionMode.Compress, true))
using (var bson = new BsonDataWriter(zip) { CloseOutput = false })
using (var zip = new DeflateStream(compressedByteCountingStream, compressionLevel, true))
//using (var bson = new BsonDataWriter(zip) { CloseOutput = false })
using (StreamWriter writer = new StreamWriter(zip, Encoding.UTF8, -1, true))
using (var bson = new JsonTextWriter(writer) { CloseOutput = false })
{
// for the moment this MUST be object so that the $type property is included
// If it is not, then an old receiver (eg, old tentacle) will not be able to understand messages from a new sender (server)
Expand All @@ -62,6 +73,14 @@ public void WriteMessage<T>(Stream stream, T message)

public T ReadMessage<T>(Stream stream)
{
if (stream is StreamAndRecord streamAndRecord)
{
if (streamAndRecord.stream is IRewindableBuffer rewindable2)
{
return ReadCompressedMessageRewindable<T>(stream, rewindable2);
}
}

if (stream is IRewindableBuffer rewindable)
{
return ReadCompressedMessageRewindable<T>(stream, rewindable);
Expand All @@ -75,15 +94,22 @@ T ReadCompressedMessage<T>(Stream stream)
using (var compressedByteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen))
using (var zip = new DeflateStream(compressedByteCountingStream, CompressionMode.Decompress, true))
using (var decompressedByteCountingStream = new ByteCountingStream(zip, OnDispose.LeaveInputStreamOpen))
using (var bson = new BsonDataReader(decompressedByteCountingStream) { CloseInput = false })
//using (var bson = new BsonDataReader(decompressedByteCountingStream) { CloseInput = false })
using (StreamReader reader = new StreamReader(zip, Encoding.UTF8, true, -1, true))
using (var bson = new JsonTextReader(reader) {CloseInput = false})
{
var messageEnvelope = DeserializeMessage<T>(bson);

observer.MessageRead(compressedByteCountingStream.BytesRead, decompressedByteCountingStream.BytesRead);

// May be needed with no compression
ReallyEndReading(zip);

return messageEnvelope.Message;
}
}



T ReadCompressedMessageRewindable<T>(Stream stream, IRewindableBuffer rewindable)
{
Expand All @@ -93,13 +119,31 @@ T ReadCompressedMessageRewindable<T>(Stream stream, IRewindableBuffer rewindable
using (var compressedByteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen))
using (var zip = new DeflateStream(compressedByteCountingStream, CompressionMode.Decompress, true))
using (var decompressedObservableStream = new ByteCountingStream(zip, OnDispose.LeaveInputStreamOpen))
using (var bson = new BsonDataReader(decompressedObservableStream) { CloseInput = false })
//using (var bson = new BsonDataReader(decompressedObservableStream) { CloseInput = false })
using (StreamReader reader = new StreamReader(zip, Encoding.UTF8, true, -1, true))
using (var bson = new JsonTextReader(reader) {CloseInput = false})
{
var messageEnvelope = DeserializeMessage<T>(bson);

// May be needed with no compression
//ReallyEndReading(zip);

// Find the unused bytes in the DeflateStream input buffer
if (deflateReflector.TryGetAvailableInputBufferSize(zip, out var unusedBytesCount))
{
if (unusedBytesCount == 0)
{
// Chance of a fix here:
var b = new byte[1024];
var res = zip.Read(b, 0, b.Length);
// Chance of fix END.
deflateReflector.TryGetAvailableInputBufferSize(zip, out unusedBytesCount);
if (unusedBytesCount != 0)
{
// Fix used.
}
}

rewindable.FinishAndRewind(unusedBytesCount);
}
else
Expand Down

0 comments on commit ae70657

Please sign in to comment.