Skip to content

Commit

Permalink
fix(IoT): fixing race condition in AWSIoTStreamThread .cxx_destruct a…
Browse files Browse the repository at this point in the history
…ws-amplify#5452

Related issue:
aws-amplify#5452

Description of changes:

1. Addition of Synchronization Primitives
New Properties:
 - dispatch_queue_t cleanupQueue
 - dispatch_semaphore_t cleanupSemaphore
 - BOOL isCleaningUp
Purpose: Ensures thread-safe access and modification of critical properties like isRunning, shouldDisconnect, and defaultRunLoopTimer.
Synchronization prevents race conditions during cleanup and cancellation processes.

2. Enhanced shouldContinueRunning Method

Before: Used direct property access without synchronization
After: Introduced synchronization using dispatch_sync for thread-safe checks
Purpose:Prevents inconsistencies if multiple threads attempt to read/write properties simultaneously.

3. Cleanup Enhancements

performCleanup and cleanupResources:
Added explicit synchronization: dispatch_sync and dispatch_semaphore ensure cleanup operations are thread-safe and do not overlap if called multiple times.
Handles complex cleanup sequences safely, such as invalidating timers, disconnecting streams, and deallocating the session.
Purpose: Ensures that cleanup actions (e.g., closing streams and invalidating timers) are thread-safe and only executed once.

4. Timer Initialization
Weak Reference to Prevent Retain Cycles: The timer in setupRunLoop now uses a __weak reference to avoid retain cycles
Before: Used a strong reference (target:self), which could result in a retain cycle.
Purpose: Avoids potential memory leaks by ensuring the thread does not retain itself via the timer.

5. Improved cancel Method
Before: Simple isRunning flag and direct super cancel call
After: Introduced thread-safe handling and ensured timer invalidation
Purpose: Prevents race conditions when canceling the thread, ensuring timers are invalidated and properties are safely updated.
  • Loading branch information
Andrei Konovalov committed Dec 4, 2024
1 parent b7504ff commit b7dc4fa
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 87 deletions.
2 changes: 1 addition & 1 deletion AWSIoT/Internal/AWSIoTStreamThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NS_ASSUME_NONNULL_BEGIN

@interface AWSIoTStreamThread : NSThread

@property(strong, nullable) void (^onStop)(void);
@property(nonatomic, copy, nullable) void (^onStop)(void);

-(instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
Expand Down
214 changes: 150 additions & 64 deletions AWSIoT/Internal/AWSIoTStreamThread.m
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ @interface AWSIoTStreamThread()
@property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
@property(nonatomic, assign) BOOL isRunning;
@property(nonatomic, assign) BOOL shouldDisconnect;

// Add synchronization primitives
@property(nonatomic, strong) dispatch_queue_t cleanupQueue;
@property(nonatomic, strong) dispatch_semaphore_t cleanupSemaphore;
@property(nonatomic, assign) BOOL isCleaningUp;
@end

@implementation AWSIoTStreamThread

- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream {
return [self initWithSession:session
Expand All @@ -40,114 +45,195 @@ - (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
outputStream:nil];
}

-(instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream; {
- (instancetype)initWithSession:(nonnull AWSMQTTSession *)session
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream
outputStream:(nullable NSOutputStream *)outputStream {
if (self = [super init]) {
_session = session;
_decoderInputStream = decoderInputStream;
_encoderOutputStream = encoderOutputStream;
_outputStream = outputStream;
_defaultRunLoopTimeInterval = 10;
_shouldDisconnect = NO;
_isCleaningUp = NO;

// Initialize synchronization primitives
_cleanupQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.cleanup", DISPATCH_QUEUE_SERIAL);
_cleanupSemaphore = dispatch_semaphore_create(1);
}
return self;
}

- (void)main {
AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);
//This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread.
self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];
@autoreleasepool {
AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);

if (![self setupRunLoop]) {
AWSDDLogError(@"Failed to setup run loop for thread: [%@]", self);
return;
}

[self startIOOperations];

while ([self shouldContinueRunning]) {
@autoreleasepool {
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
}
}

[self performCleanup];

AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
}
}

//Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop
//below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence.
- (BOOL)setupRunLoop {
if (self.isRunning) {
AWSDDLogError(@"Thread already running");
return NO;
}

self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];

// Setup timer with weak reference to prevent retain cycles
__weak typeof(self) weakSelf = self;
self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0]
interval:60.0
target:self
selector:@selector(timerHandler:)
userInfo:nil
repeats:YES];
interval:60.0
target:weakSelf
selector:@selector(timerHandler:)
userInfo:nil
repeats:YES];

if (!self.defaultRunLoopTimer) {
AWSDDLogError(@"Failed to create run loop timer");
return NO;
}
[self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer
forMode:NSDefaultRunLoopMode];

self.isRunning = YES;
return YES;
}

- (void)startIOOperations {
if (self.outputStream) {
[self.outputStream scheduleInRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
forMode:NSDefaultRunLoopMode];
[self.outputStream open];
}

//Update the runLoop and runLoopMode in session.
[self.session connectToInputStream:self.decoderInputStream
outputStream:self.encoderOutputStream];
}

while (self.isRunning && !self.isCancelled) {
//This will continue run until the thread is cancelled
//Run one cycle of the runloop. This will return after a input source event or timer event is processed
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
}

[self cleanUp];
- (BOOL)shouldContinueRunning {
__block BOOL shouldRun;
dispatch_sync(self.cleanupQueue, ^{
shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil;
});
return shouldRun;
}

AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
- (void)invalidateTimer {
dispatch_sync(self.cleanupQueue, ^{
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}
});
}

- (void)cancel {
AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self);
self.isRunning = NO;
[super cancel];
[self cancelWithDisconnect:NO];
}

- (void)cancelAndDisconnect:(BOOL)shouldDisconnect {
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]",
shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
[self cancelWithDisconnect:shouldDisconnect];
}

- (void)cleanUp {
if (self.defaultRunLoopTimer) {
[self.defaultRunLoopTimer invalidate];
self.defaultRunLoopTimer = nil;
}

if (self.shouldDisconnect) {
if (self.session) {
[self.session close];
self.session = nil;
}

if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
- (void)cancelWithDisconnect:(BOOL)shouldDisconnect {
// Ensure thread-safe property updates
dispatch_sync(self.cleanupQueue, ^{
if (!self.isCleaningUp) {
self.shouldDisconnect = shouldDisconnect;
self.isRunning = NO;
[super cancel];

// Invalidate timer to trigger run loop exit
[self invalidateTimer];
}
});
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}
- (void)performCleanup {
dispatch_semaphore_wait(self.cleanupSemaphore, DISPATCH_TIME_FOREVER);

if (self.isCleaningUp) {
dispatch_semaphore_signal(self.cleanupSemaphore);
return;
}

self.isCleaningUp = YES;
dispatch_semaphore_signal(self.cleanupSemaphore);

dispatch_sync(self.cleanupQueue, ^{
[self cleanupResources];
});
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
- (void)cleanupResources {
if (self.shouldDisconnect) {
[self closeSession];
[self closeStreams];
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
}

if (self.onStop) {
self.onStop();

// Handle onStop callback
dispatch_block_t stopBlock = self.onStop;
if (stopBlock) {
self.onStop = nil;
stopBlock();
}
}

- (void)closeSession {
if (self.session) {
[self.session close];
self.session = nil;
}
}

- (void)closeStreams {
if (self.outputStream) {
self.outputStream.delegate = nil;
[self.outputStream close];
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
self.outputStream = nil;
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
}

- (void)timerHandler:(NSTimer*)theTimer {
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@",
self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
}

- (void)dealloc {
AWSDDLogVerbose(@"Deallocating AWSIoTStreamThread: [%@]", self);
}

@end
Loading

0 comments on commit b7dc4fa

Please sign in to comment.