From 3077ea44ed77fb7ad4cd0064a97dcccc70229c62 Mon Sep 17 00:00:00 2001 From: Sebastian Villena <97059974+ruisebas@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:46:15 -0500 Subject: [PATCH] fix(IoT): Fixing race conditions during StreamThread cleanup --- AWSIoT/Internal/AWSIoTStreamThread.m | 222 ++++++++-------------- AWSIoTUnitTests/AWSIoTStreamThreadTests.m | 92 ++++----- CHANGELOG.md | 5 +- 3 files changed, 119 insertions(+), 200 deletions(-) diff --git a/AWSIoT/Internal/AWSIoTStreamThread.m b/AWSIoT/Internal/AWSIoTStreamThread.m index 16ab900a89e..059c4d6b8d8 100644 --- a/AWSIoT/Internal/AWSIoTStreamThread.m +++ b/AWSIoT/Internal/AWSIoTStreamThread.m @@ -27,16 +27,13 @@ @interface AWSIoTStreamThread() @property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval; @property(nonatomic, assign) BOOL isRunning; @property(nonatomic, assign) BOOL shouldDisconnect; - -// Add synchronization primitives -@property(nonatomic, strong) dispatch_queue_t cleanupQueue; -@property(nonatomic, strong) dispatch_semaphore_t cleanupSemaphore; -@property(nonatomic, assign) BOOL isCleaningUp; +@property(nonatomic, strong) dispatch_queue_t serialQueue; +@property(nonatomic, assign) BOOL didCleanUp; @end @implementation AWSIoTStreamThread -- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session +- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session decoderInputStream:(nonnull NSInputStream *)decoderInputStream encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream { return [self initWithSession:session @@ -56,184 +53,127 @@ - (instancetype)initWithSession:(nonnull AWSMQTTSession *)session _outputStream = outputStream; _defaultRunLoopTimeInterval = 10; _shouldDisconnect = NO; - _isCleaningUp = NO; - - // Initialize synchronization primitives - _cleanupQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.cleanup", DISPATCH_QUEUE_SERIAL); - _cleanupSemaphore = dispatch_semaphore_create(1); + _serialQueue = 
dispatch_queue_create("com.amazonaws.iot.streamthread.syncQueue", DISPATCH_QUEUE_SERIAL); + _didCleanUp = NO; } return self; } - (void)main { - @autoreleasepool { - AWSDDLogVerbose(@"Started execution of Thread: [%@]", self); - - if (![self setupRunLoop]) { - AWSDDLogError(@"Failed to setup run loop for thread: [%@]", self); - return; - } - - [self startIOOperations]; - - while ([self shouldContinueRunning]) { - @autoreleasepool { - [self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode - beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]]; - } - } - - [self performCleanup]; - - AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self); - } -} - -- (BOOL)setupRunLoop { if (self.isRunning) { - AWSDDLogError(@"Thread already running"); - return NO; + AWSDDLogWarn(@"Attempted to start a thread that is already running: [%@]", self); + return; } - + + AWSDDLogVerbose(@"Started execution of Thread: [%@]", self); + //This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread. self.runLoopForStreamsThread = [NSRunLoop currentRunLoop]; - - // Setup timer with weak reference to prevent retain cycles + + //Set up a default timer to ensure that the RunLoop always has at least one timer on it. This is to prevent the while loop + //below from spinning in a tight loop when all input sources and session timers are shut down during a reconnect sequence. __weak typeof(self) weakSelf = self; self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0] interval:60.0 - target:weakSelf - selector:@selector(timerHandler:) - userInfo:nil - repeats:YES]; - - if (!self.defaultRunLoopTimer) { - AWSDDLogError(@"Failed to create run loop timer"); - return NO; - } + repeats:YES + block:^(NSTimer * _Nonnull timer) { + AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", weakSelf, weakSelf.isRunning ? 
@"YES" : @"NO", weakSelf.isCancelled ? @"YES" : @"NO"); + }]; [self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer forMode:NSDefaultRunLoopMode]; - self.isRunning = YES; - return YES; -} -- (void)startIOOperations { + self.isRunning = YES; if (self.outputStream) { [self.outputStream scheduleInRunLoop:self.runLoopForStreamsThread - forMode:NSDefaultRunLoopMode]; + forMode:NSDefaultRunLoopMode]; [self.outputStream open]; } + + //Update the runLoop and runLoopMode in session. [self.session connectToInputStream:self.decoderInputStream outputStream:self.encoderOutputStream]; + + while ([self shouldContinueRunning]) { + //This will continue to run until the thread is cancelled + //Run one cycle of the runloop. This will return after an input source event or timer event is processed + [self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode + beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]]; + } + + [self cleanUp]; + + AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self); } - (BOOL)shouldContinueRunning { __block BOOL shouldRun; - dispatch_sync(self.cleanupQueue, ^{ + dispatch_sync(self.serialQueue, ^{ shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil; }); return shouldRun; } -- (void)invalidateTimer { - dispatch_sync(self.cleanupQueue, ^{ - if (self.defaultRunLoopTimer) { - [self.defaultRunLoopTimer invalidate]; - self.defaultRunLoopTimer = nil; - } - }); -} - - (void)cancel { AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self); - [self cancelWithDisconnect:NO]; + dispatch_sync(self.serialQueue, ^{ + self.isRunning = NO; + [super cancel]; + }); } - (void)cancelAndDisconnect:(BOOL)shouldDisconnect { - AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", - shouldDisconnect ? @"YES" : @"NO", (NSThread *)self); - [self cancelWithDisconnect:shouldDisconnect]; + AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? 
@"YES" : @"NO", (NSThread *)self); + dispatch_sync(self.serialQueue, ^{ + self.shouldDisconnect = shouldDisconnect; + self.isRunning = NO; + [super cancel]; + }); } -- (void)cancelWithDisconnect:(BOOL)shouldDisconnect { - // Ensure thread-safe property updates - dispatch_sync(self.cleanupQueue, ^{ - if (!self.isCleaningUp) { - self.shouldDisconnect = shouldDisconnect; - self.isRunning = NO; - [super cancel]; - - // Invalidate timer to trigger run loop exit - [self invalidateTimer]; +- (void)cleanUp { + dispatch_sync(self.serialQueue, ^{ + if (self.didCleanUp) { + AWSDDLogVerbose(@"Clean up already called for thread: [%@]", (NSThread *)self); + return; } - }); -} -- (void)performCleanup { - dispatch_semaphore_wait(self.cleanupSemaphore, DISPATCH_TIME_FOREVER); - - if (self.isCleaningUp) { - dispatch_semaphore_signal(self.cleanupSemaphore); - return; - } - - self.isCleaningUp = YES; - dispatch_semaphore_signal(self.cleanupSemaphore); - - dispatch_sync(self.cleanupQueue, ^{ - [self cleanupResources]; - }); -} + self.didCleanUp = YES; + if (self.defaultRunLoopTimer) { + [self.defaultRunLoopTimer invalidate]; + self.defaultRunLoopTimer = nil; + } -- (void)cleanupResources { - if (self.shouldDisconnect) { - [self closeSession]; - [self closeStreams]; - } else { - AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self); - } - - // Handle onStop callback - dispatch_block_t stopBlock = self.onStop; - if (stopBlock) { - self.onStop = nil; - stopBlock(); - } -} + if (self.shouldDisconnect) { + if (self.session) { + [self.session close]; + self.session = nil; + } -- (void)closeSession { - if (self.session) { - [self.session close]; - self.session = nil; - } -} + if (self.outputStream) { + self.outputStream.delegate = nil; + [self.outputStream close]; + [self.outputStream removeFromRunLoop:self.runLoopForStreamsThread + forMode:NSDefaultRunLoopMode]; + self.outputStream = nil; + } -- (void)closeStreams { - if (self.outputStream) { - self.outputStream.delegate 
= nil; - [self.outputStream close]; - [self.outputStream removeFromRunLoop:self.runLoopForStreamsThread - forMode:NSDefaultRunLoopMode]; - self.outputStream = nil; - } - - if (self.decoderInputStream) { - [self.decoderInputStream close]; - self.decoderInputStream = nil; - } - - if (self.encoderOutputStream) { - [self.encoderOutputStream close]; - self.encoderOutputStream = nil; - } -} + if (self.decoderInputStream) { + [self.decoderInputStream close]; + self.decoderInputStream = nil; + } -- (void)timerHandler:(NSTimer*)theTimer { - AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", - self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO"); -} + if (self.encoderOutputStream) { + [self.encoderOutputStream close]; + self.encoderOutputStream = nil; + } + } else { + AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self); + } -- (void)dealloc { - AWSDDLogVerbose(@"Deallocating AWSIoTStreamThread: [%@]", self); + if (self.onStop) { + self.onStop(); + self.onStop = nil; + } + }); } @end diff --git a/AWSIoTUnitTests/AWSIoTStreamThreadTests.m b/AWSIoTUnitTests/AWSIoTStreamThreadTests.m index 11b61fa055c..294bfa8a216 100644 --- a/AWSIoTUnitTests/AWSIoTStreamThreadTests.m +++ b/AWSIoTUnitTests/AWSIoTStreamThreadTests.m @@ -18,17 +18,12 @@ #import "AWSIoTStreamThread.h" @interface AWSIoTStreamThread() - @property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval; @property (nonatomic, assign) BOOL isRunning; -@property (nonatomic, strong) dispatch_queue_t cleanupQueue; -@property (nonatomic, assign) BOOL isCleaningUp; +@property (nonatomic, strong) dispatch_queue_t serialQueue; +@property (nonatomic, assign) BOOL didCleanUp; @property (nonatomic, strong, nullable) NSTimer *defaultRunLoopTimer; @property (nonatomic, strong, nullable) NSRunLoop *runLoopForStreamsThread; - -- (void)invalidateTimer; - - @end @@ -61,7 +56,6 @@ - (void)setUp { 
encoderOutputStream:self.encoderOutputStream outputStream:self.outputStream]; self.thread.defaultRunLoopTimeInterval = 0.1; - [self.thread start]; [self waitForExpectations:@[startExpectation] timeout:1]; } @@ -107,67 +101,51 @@ - (void)testCancelAndDisconnect_shouldCloseStreams_andInvokeOnStop { /// When: The thread is cancelled with disconnect set to NO /// Then: Neither the session nor the streams are closed - (void)testCancel_shouldNotCloseStreams_andInvokeOnStop { - XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"]; - self.thread.onStop = ^{ - [stopExpectation fulfill]; - }; - - __block BOOL didInvokeSessionClose = NO; - OCMStub([self.session close]).andDo(^(NSInvocation *invocation) { - didInvokeSessionClose = YES; - }); - - __block BOOL didInvokeDecoderInputStreamClose = NO; - OCMStub([self.decoderInputStream close]).andDo(^(NSInvocation *invocation) { - didInvokeDecoderInputStreamClose = YES; - }); - - __block BOOL didInvokeEncoderOutputStreamClose = NO; - OCMStub([self.encoderOutputStream close]).andDo(^(NSInvocation *invocation) { - didInvokeEncoderOutputStreamClose = YES; - }); - - __block BOOL didInvokeOutputStreamClose = NO; - OCMStub([self.outputStream close]).andDo(^(NSInvocation *invocation) { - didInvokeOutputStreamClose = YES; - }); - - [self.thread cancelAndDisconnect:NO]; - [self waitForExpectations:@[stopExpectation] timeout:1]; - - XCTAssertFalse(didInvokeSessionClose, @"The `close` method on `session` should not be invoked"); - XCTAssertFalse(didInvokeDecoderInputStreamClose, @"The `close` method on `decoderInputStream` should not be invoked"); - XCTAssertFalse(didInvokeEncoderOutputStreamClose, @"The `close` method on `encoderOutputStream` should not be invoked"); - XCTAssertFalse(didInvokeOutputStreamClose, @"The `close` method on `outputStream` should not be invoked"); -} - -- (void)testCancelAndDisconnect_shouldSetIsCleaningUp { XCTestExpectation *stopExpectation = [self 
expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"]; self.thread.onStop = ^{ [stopExpectation fulfill]; }; - [self.thread cancelAndDisconnect:YES]; + __block BOOL didInvokeSessionClose = NO; + [OCMStub([self.session close]) andDo:^(NSInvocation *invocation) { + didInvokeSessionClose = YES; + }]; - __block BOOL isCleaningUp; - dispatch_sync(self.thread.cleanupQueue, ^{ - isCleaningUp = self.thread.isCleaningUp; - }); + __block BOOL didInvokeDecoderInputStreamClose = NO; + [OCMStub([self.decoderInputStream close]) andDo:^(NSInvocation *invocation) { + didInvokeDecoderInputStreamClose = YES; + }]; + + __block BOOL didInvokeEncoderOutputStreamClose = NO; + [OCMStub([self.encoderOutputStream close]) andDo:^(NSInvocation *invocation) { + didInvokeEncoderOutputStreamClose = YES; + }]; - XCTAssertTrue(isCleaningUp, @"isCleaningUp should be YES during cleanup"); + __block BOOL didInvokeOutputStreamClose = NO; + [OCMStub([self.outputStream close]) andDo:^(NSInvocation *invocation) { + didInvokeOutputStreamClose = YES; + }]; + + [self.thread cancelAndDisconnect:NO]; [self waitForExpectations:@[stopExpectation] timeout:1]; + + XCTAssertFalse(didInvokeSessionClose, @"The `close` method on `session` should not be invoked"); + XCTAssertFalse(didInvokeDecoderInputStreamClose, @"The `close` method on `decoderInputStream` should not be invoked"); + XCTAssertFalse(didInvokeEncoderOutputStreamClose, @"The `close` method on `encoderOutputStream` should not be invoked"); + XCTAssertFalse(didInvokeOutputStreamClose, @"The `close` method on `outputStream` should not be invoked"); } -- (void)testInvalidateTimer_shouldInvalidateAndSetTimerToNil { - [self.thread invalidateTimer]; +- (void)testCancelAndDisconnect_shouldSeDidCleanUp_andInvalidateTimer { + XCTestExpectation *stopExpectation = [self expectationWithDescription:@"AWSIoTStreamThread.onStop expectation"]; + self.thread.onStop = ^{ + [stopExpectation fulfill]; + }; - __block BOOL isTimerInvalidated = NO; - 
OCMStub([self.thread.defaultRunLoopTimer invalidate]).andDo(^(NSInvocation *invocation) { - isTimerInvalidated = YES; - }); + [self.thread cancelAndDisconnect:YES]; + [self waitForExpectations:@[stopExpectation] timeout:1]; + XCTAssertTrue(self.thread.didCleanUp, @"didCleanUp should be YES after cleanup"); XCTAssertNil(self.thread.defaultRunLoopTimer, @"defaultRunLoopTimer should be nil after invalidation"); - XCTAssertTrue(isTimerInvalidated, @"Timer invalidate method should have been called"); } - (void)testRunLoop_shouldInvokeRunModeBeforeDate { @@ -200,7 +178,7 @@ - (void)testCancelAndDisconnect_shouldSynchronizeOnCleanupQueue { // Validate synchronization __block BOOL didSynchronize = NO; - dispatch_sync(self.thread.cleanupQueue, ^{ + dispatch_sync(self.thread.serialQueue, ^{ didSynchronize = YES; }); diff --git a/CHANGELOG.md b/CHANGELOG.md index 85a05f90afb..67d609d59ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,9 @@ ### Bug Fixes - **AWSIoT** - - Fixing a race condition when invalidating/creating the reconnect timer (#5454) - - Fixing a potential race condition in the timer ring queue (#5461) + - Fixing race conditions during cleanup in `AWSIoTStreamThread` (#5477) + - Fixing a race condition when invalidating/creating the reconnect timer in `AWSIoTMQTTClient` (#5454) + - Fixing a potential race condition in the timer ring queue in `AWSMQTTSession` (#5461) ## 2.38.0