From ae706573a6a0dead83f811bff993ae89a0dccb8e Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Wed, 26 Jul 2023 11:53:55 +1000 Subject: [PATCH] Try no compression --- .../LowerHalibutLimitsForAllTests.cs | 26 +++++----- source/Halibut.Tests/UsageFixture.cs | 2 +- .../Protocol/MessageExchangeStream.cs | 5 +- .../Transport/Protocol/MessageSerializer.cs | 52 +++++++++++++++++-- 4 files changed, 65 insertions(+), 20 deletions(-) diff --git a/source/Halibut.Tests/LowerHalibutLimitsForAllTests.cs b/source/Halibut.Tests/LowerHalibutLimitsForAllTests.cs index 2d454db9..bb718426 100644 --- a/source/Halibut.Tests/LowerHalibutLimitsForAllTests.cs +++ b/source/Halibut.Tests/LowerHalibutLimitsForAllTests.cs @@ -17,19 +17,19 @@ public static void LowerHalibutLimits() // The following 4 can be overriden, so set them high and let the test author drop the values as needed. // Also set to a "weird value" to make it more obvious which timeout is at play in tests. - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingRequestQueueTimeout), TimeSpan.FromSeconds(66)); - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingRequestMaximumMessageProcessingTimeout), TimeSpan.FromSeconds(66)); - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.RetryListeningSleepInterval), TimeSpan.FromSeconds(1)); - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.ConnectionErrorRetryTimeout), TimeSpan.FromSeconds(66)); // Must always be greater than the heartbeat timeout. - - // Intentionally set higher than the heart beat, since some tests need to determine that the hart beat timeout applies. - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientSendTimeout), HalftheTcpRecieveTimeout + HalftheTcpRecieveTimeout); - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientReceiveTimeout), HalftheTcpRecieveTimeout + HalftheTcpRecieveTimeout); - - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientHeartbeatSendTimeout), TimeSpan.FromSeconds(15)); - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientHeartbeatReceiveTimeout), TimeSpan.FromSeconds(15)); - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientConnectTimeout), TimeSpan.FromSeconds(20)); - halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingQueueWaitTimeout), TimeSpan.FromSeconds(20)); + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingRequestQueueTimeout), TimeSpan.FromSeconds(66)); + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingRequestMaximumMessageProcessingTimeout), TimeSpan.FromSeconds(66)); + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.RetryListeningSleepInterval), TimeSpan.FromSeconds(1)); + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.ConnectionErrorRetryTimeout), TimeSpan.FromSeconds(66)); // Must always be greater than the heartbeat timeout. + // + // // Intentionally set higher than the heart beat, since some tests need to determine that the hart beat timeout applies. + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientSendTimeout), HalftheTcpRecieveTimeout + HalftheTcpRecieveTimeout); + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientReceiveTimeout), HalftheTcpRecieveTimeout + HalftheTcpRecieveTimeout); + // + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientHeartbeatSendTimeout), TimeSpan.FromSeconds(15)); + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientHeartbeatReceiveTimeout), TimeSpan.FromSeconds(15)); + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.TcpClientConnectTimeout), TimeSpan.FromSeconds(20)); + // halibutLimitType.ReflectionSetFieldValue(nameof(HalibutLimits.PollingQueueWaitTimeout), TimeSpan.FromSeconds(20)); } } } \ No newline at end of file diff --git a/source/Halibut.Tests/UsageFixture.cs b/source/Halibut.Tests/UsageFixture.cs index 8a2417d7..29b77fff 100644 --- a/source/Halibut.Tests/UsageFixture.cs +++ b/source/Halibut.Tests/UsageFixture.cs @@ -23,7 +23,7 @@ public async Task OctopusCanSendMessagesToTentacle_WithEchoService(ClientAndServ { using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() - .WithHalibutLoggingLevel(LogLevel.Info) + .WithHalibutLoggingLevel(LogLevel.Trace) .Build(CancellationToken)) { var echo = clientAndService.CreateClient(); diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 6adeed80..f006515e 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -29,10 +29,11 @@ public class MessageExchangeStream : IMessageExchangeStream public MessageExchangeStream(Stream stream, IMessageSerializer serializer, ILog log) { #if NETFRAMEWORK - this.stream = stream; + var myStream = stream; #else - this.stream = new RewindableBufferStream(stream, HalibutLimits.RewindableBufferStreamSize); + var myStream = new RewindableBufferStream(stream, HalibutLimits.RewindableBufferStreamSize); #endif + this.stream = myStream.ToRecordingStream("msgex", log); this.log = log; streamWriter = new StreamWriter(this.stream, new UTF8Encoding(false)) { NewLine = "\r\n" }; this.serializer = serializer; diff --git a/source/Halibut/Transport/Protocol/MessageSerializer.cs b/source/Halibut/Transport/Protocol/MessageSerializer.cs index 14f6852d..579a1325 100644 --- a/source/Halibut/Transport/Protocol/MessageSerializer.cs +++ b/source/Halibut/Transport/Protocol/MessageSerializer.cs @@ -1,6 +1,7 @@ using System; using System.IO; using System.IO.Compression; +using System.Text; using Halibut.Transport.Observability; using Newtonsoft.Json; using Newtonsoft.Json.Bson; @@ -43,13 +44,23 @@ internal MessageSerializer( { typeRegistry.AddToMessageContract(types); } + + private void ReallyEndReading(DeflateStream zip) + { + var b = new byte[1024]; + var res = zip.Read(b, 0, b.Length); + } + + CompressionLevel compressionLevel = CompressionLevel.NoCompression; public void WriteMessage(Stream stream, T message) { using var compressedByteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen); - using (var zip = new DeflateStream(compressedByteCountingStream, CompressionMode.Compress, true)) - using (var bson = new BsonDataWriter(zip) { CloseOutput = false }) + using (var zip = new DeflateStream(compressedByteCountingStream, compressionLevel, true)) + //using (var bson = new BsonDataWriter(zip) { CloseOutput = false }) + using (StreamWriter writer = new StreamWriter(zip, Encoding.UTF8, -1, true)) + using (var bson = new JsonTextWriter(writer) { CloseOutput = false }) { // for the moment this MUST be object so that the $type property is included // If it is not, then an old receiver (eg, old tentacle) will not be able to understand messages from a new sender (server) @@ -62,6 +73,14 @@ public void WriteMessage(Stream stream, T message) public T ReadMessage(Stream stream) { + if (stream is StreamAndRecord streamAndRecord) + { + if (streamAndRecord.stream is IRewindableBuffer rewindable2) + { + return ReadCompressedMessageRewindable(stream, rewindable2); + } + } + if (stream is IRewindableBuffer rewindable) { return ReadCompressedMessageRewindable(stream, rewindable); @@ -75,15 +94,22 @@ T ReadCompressedMessage(Stream stream) using (var compressedByteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen)) using (var zip = new DeflateStream(compressedByteCountingStream, CompressionMode.Decompress, true)) using (var decompressedByteCountingStream = new ByteCountingStream(zip, OnDispose.LeaveInputStreamOpen)) - using (var bson = new BsonDataReader(decompressedByteCountingStream) { CloseInput = false }) + //using (var bson = new BsonDataReader(decompressedByteCountingStream) { CloseInput = false }) + using (StreamReader reader = new StreamReader(zip, Encoding.UTF8, true, -1, true)) + using (var bson = new JsonTextReader(reader) {CloseInput = false}) { var messageEnvelope = DeserializeMessage(bson); observer.MessageRead(compressedByteCountingStream.BytesRead, decompressedByteCountingStream.BytesRead); + + // May be needed with no compression + ReallyEndReading(zip); return messageEnvelope.Message; } } + + T ReadCompressedMessageRewindable(Stream stream, IRewindableBuffer rewindable) { @@ -93,13 +119,31 @@ T ReadCompressedMessageRewindable(Stream stream, IRewindableBuffer rewindable using (var compressedByteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen)) using (var zip = new DeflateStream(compressedByteCountingStream, CompressionMode.Decompress, true)) using (var decompressedObservableStream = new ByteCountingStream(zip, OnDispose.LeaveInputStreamOpen)) - using (var bson = new BsonDataReader(decompressedObservableStream) { CloseInput = false }) + //using (var bson = new BsonDataReader(decompressedObservableStream) { CloseInput = false }) + using (StreamReader reader = new StreamReader(zip, Encoding.UTF8, true, -1, true)) + using (var bson = new JsonTextReader(reader) {CloseInput = false}) { var messageEnvelope = DeserializeMessage(bson); + // May be needed with no compression + //ReallyEndReading(zip); + // Find the unused bytes in the DeflateStream input buffer if (deflateReflector.TryGetAvailableInputBufferSize(zip, out var unusedBytesCount)) { + if (unusedBytesCount == 0) + { + // Chance of a fix here: + var b = new byte[1024]; + var res = zip.Read(b, 0, b.Length); + // Chance of fix END. + deflateReflector.TryGetAvailableInputBufferSize(zip, out unusedBytesCount); + if (unusedBytesCount != 0) + { + // Fix used. + } + } + rewindable.FinishAndRewind(unusedBytesCount); } else