Managing Async Tasks in a Thread-safe Manner on iOS

Why Manage Async Tasks?

Let’s look at a very typical case. The mobile app fetches data from the cloud through a series of APIs: a separate fetch for each of data types A, B and C. There are no dependencies between these calls, so they are fired simultaneously to reduce overall loading time. As each task completes, the app needs to process its payload. When all tasks have completed, the app notifies dependent components to reload and/or refresh the UI. The completion of each API call may run on a different queue or thread. Mutating shared state from different threads or queues without synchronization can result in fatal crashes or, at a minimum, unexpected and often erroneous application behavior. As a result, we need to manage shared state safely. In this post, we will discuss and compare 3 different approaches.

OperationQueue

There are many ways to manage async tasks. For example, the OperationQueue class is designed to manage multiple operations concurrently or serially. It provides some nice features like chaining dependent tasks and setting the maximum number of concurrent tasks. It appears to be a good solution to our problem. However, each task must be wrapped in an Operation, and each asynchronous Operation subclass must override members such as start(), isAsynchronous, isExecuting and isFinished. This approach requires a lot of work to fit all existing call/task implementations into custom Operation classes.

// Sample code of subclassing Operation class
class AsyncTaskOperation: Operation {
    override var isAsynchronous: Bool {
        .......
    }
    
    override var isExecuting: Bool {
        .......
    }
    
    override var isFinished: Bool {
        .......
    }
        
    override func start() {
        .........
    }

    .............
}
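
To give a sense of how much plumbing the elided parts involve, here is a minimal sketch of one common way to fill them in. It is an illustration under assumptions, not the code from our app: the `task` closure parameter, the lock-protected flags and the `setState` helper are all hypothetical names introduced for this example.

```swift
import Foundation

/// Hypothetical wrapper that turns a callback-based async call into an Operation.
final class AsyncTaskOperation: Operation {
    private let lock = NSLock()
    private var _executing = false
    private var _finished = false
    // The wrapped work; it must call the closure it receives when it is done.
    private let task: (@escaping () -> Void) -> Void

    init(task: @escaping (@escaping () -> Void) -> Void) {
        self.task = task
        super.init()
    }

    override var isAsynchronous: Bool { true }

    override var isExecuting: Bool {
        lock.lock(); defer { lock.unlock() }
        return _executing
    }

    override var isFinished: Bool {
        lock.lock(); defer { lock.unlock() }
        return _finished
    }

    override func start() {
        // A cancelled operation still has to reach the finished state.
        guard !isCancelled else {
            setState(executing: false, finished: true)
            return
        }
        setState(executing: true, finished: false)
        task { self.setState(executing: false, finished: true) }
    }

    // OperationQueue observes these keys, so every change is announced via KVO.
    private func setState(executing: Bool, finished: Bool) {
        willChangeValue(forKey: "isExecuting")
        willChangeValue(forKey: "isFinished")
        lock.lock()
        _executing = executing
        _finished = finished
        lock.unlock()
        didChangeValue(forKey: "isFinished")
        didChangeValue(forKey: "isExecuting")
    }
}

// Usage: two simulated fetches, plus a dependent step that runs after both.
let operationQueue = OperationQueue()
let fetchA = AsyncTaskOperation { done in
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.05) { done() }
}
let fetchB = AsyncTaskOperation { done in
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.02) { done() }
}
let reload = BlockOperation { print("all fetches finished") }
reload.addDependency(fetchA)
reload.addDependency(fetchB)
operationQueue.addOperations([fetchA, fetchB, reload], waitUntilFinished: true)
```

Even in this reduced form, every existing call has to be re-expressed as a closure that reports its own completion, which is the migration cost described above.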

Semaphore

GCD (Grand Central Dispatch) is a low-level framework also designed to manage both synchronous and asynchronous tasks. GCD has very high performance on Apple platforms and is deeply integrated into the Darwin operating system.

Here we use DispatchSemaphore to manage async tasks in the following way:

func start() {
  // a new background queue per call, so the caller's thread is never blocked
  let backgroundQueue = DispatchQueue(label: "start.backgroundQueue")
  backgroundQueue.async {
    // use semaphore to wait for task completion
    let semaphore = DispatchSemaphore(value: 0)
    let numberOfCalls = 3

    asyncCall1(completion: { semaphore.signal() })
    asyncCall2(completion: { semaphore.signal() })
    asyncCall3(completion: { semaphore.signal() })

    for _ in 1...numberOfCalls {
      // suspend current thread and
      // wait for a signal or the timeout
      _ = semaphore.wait(timeout: .now() + some_timeout_seconds)
    }

    // all tasks completed or timed out. The thread is resumed.
    // jump back to main queue
    DispatchQueue.main.async {
      allTasksCompleted()
    }
  }
}

The above code is simple and easy to read. A semaphore is created on a background queue. After all async calls have been fired, the semaphore's wait function is called once per call; each wait suspends the current thread until a signal arrives or the timeout elapses. Once every wait has returned, either because it received a signal or because it timed out, the thread resumes and calls the allTasksCompleted function. While simple, this approach unfortunately also comes with some weaknesses. Although the semaphore is created on a background queue above, issues will surface if start() is called repeatedly before the whole operation completes or times out. To be more precise, new calls block threads faster than the suspended threads are released. Eventually, the app runs out of background threads in the pool, and all of them are stuck in semaphore_wait_trap. While this problem can be avoided by using a single shared serial background queue for all operations, that limits us to a single operation at a time.
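
The waiting step can be sketched in isolation as follows. This is a hedged illustration rather than our production code: `waitForSignals` is a hypothetical helper, and unlike the loop above it measures every wait against one shared deadline so that the total wait cannot exceed the timeout. The simulated calls are plain `asyncAfter` blocks standing in for real API calls.

```swift
import Dispatch

/// Hypothetical helper: waits for `count` signals against a single deadline.
/// Returns true if every signal arrived, false if the deadline passed first.
func waitForSignals(on semaphore: DispatchSemaphore, count: Int, timeout: Double) -> Bool {
    let deadline = DispatchTime.now() + timeout
    for _ in 0..<count {
        // wait(timeout:) blocks the calling thread until a signal or the deadline.
        if semaphore.wait(timeout: deadline) == .timedOut {
            return false
        }
    }
    return true
}

let semaphore = DispatchSemaphore(value: 0)

// Two of the three simulated calls finish quickly; the third never signals.
for delay in [0.01, 0.02] {
    DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
        semaphore.signal()
    }
}

let completed = waitForSignals(on: semaphore, count: 3, timeout: 0.2)
print(completed ? "all tasks completed" : "timed out")
```

Note that the calling thread is still blocked for the entire wait, which is exactly the weakness discussed above; the helper only makes the timeout outcome explicit.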

After exploring the above 2 approaches, we looked for a better solution that meets the following requirements:

  1. Be able to manage async tasks safely.
  2. Be simple and efficient.
  3. Support timeouts.
  4. Avoid blocking a thread while the operation is running.
  5. Be easily unit-testable.

DispatchQueue

DispatchQueue lets developers run code concurrently and take advantage of the multiple cores on modern devices using the concept of queues instead of threads. The queue model is much simpler to use because it abstracts away threads and the locks on shared resources. We will demonstrate how to use a serial queue to keep track of different async tasks while mutating shared state safely. Here we create a new class to manage the state of the async task operation.

class AsyncTaskManager {
   var isRunning: Bool
   
   func startOperation(timeout: TimeInterval?, completion: @escaping ()->Void) -> Bool

   // call startTask when an async task starts
   func startTask()
   // call endTask when an async task ends
   func endTask()
}

Let’s look at the interface of the AsyncTaskManager class. It is designed to be very simple. The startTask() function is called when an async task starts. The endTask() function is called when a task finishes. The startOperation() function starts the whole operation with an optional timeout interval and a completion closure, which is called when all tasks are completed. It returns true if the operation can be started. If an operation is already running, it prevents a new operation from starting before the current one is completed. These functions can be called on any thread or queue.

Using AsyncTaskManager, we can achieve the same effect we previously demonstrated using semaphores:

let manager = AsyncTaskManager()

func start() {
   guard !manager.isRunning else { return }

   // add 10 seconds timeout to the whole operation
   manager.startOperation(timeout: 10, completion: {
     self.completionAllTasks()
   })

   manager.startTask()
   asyncCall1(completion: { manager.endTask() })

   manager.startTask()
   asyncCall2(completion: { manager.endTask() })

   manager.startTask()
   asyncCall3(completion: { manager.endTask() })
}

The AsyncTaskManager class uses one serial (FIFO) queue to manage all state mutations inside the class, thus ensuring thread safety. The integer tasksCount keeps track of the number of actively running tasks. When a new task starts, tasksCount is incremented by 1. When a task finishes, tasksCount is decremented by 1. When tasksCount reaches 0, all tasks have been completed.

class AsyncTaskManager {
  ..........
  private var tasksCount = 0 // number of tasks currently running
  private let queue = DispatchQueue(label: "AsyncTaskManager.queue") // a serial queue to manage state mutation

  var isRunning: Bool {
    // read on the serial queue so the value is never observed mid-mutation
    var result = false
    queue.sync { result = tasksCount > 0 }
    return result
  }

  func startTask() {
    queue.async { self.tasksCount += 1 }
  }

  func endTask() {
    queue.async {
      // runs last, after the decrement below
      defer {
        if self.tasksCount == 0 {
          self.completion?()
          self.completion = nil
        }
      }

      guard self.tasksCount > 0 else { return }

      self.tasksCount -= 1
    }
  }
}

To determine whether an operation is still running, isRunning reads tasksCount inside queue.sync(), so the read happens on the serial queue and cannot overlap with a mutation.
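
The counting idea can be tried out on its own with a small, self-contained sketch. Everything here is an assumption made for illustration: `TaskCounter` and `onAllTasksEnded` are hypothetical names, not part of the class above, and the "tasks" are simply ended from many threads at once to exercise the serial queue.

```swift
import Dispatch

/// Hypothetical minimal counter: every read and write goes through one serial queue.
final class TaskCounter {
    private var tasksCount = 0
    private let queue = DispatchQueue(label: "example.task-counter") // serial by default
    private var completion: (() -> Void)?

    /// Registers a closure that is called once when the count drops back to 0.
    func onAllTasksEnded(_ completion: @escaping () -> Void) {
        queue.async { self.completion = completion }
    }

    var isRunning: Bool {
        queue.sync { tasksCount > 0 }
    }

    func startTask() {
        queue.async { self.tasksCount += 1 }
    }

    func endTask() {
        queue.async {
            guard self.tasksCount > 0 else { return }
            self.tasksCount -= 1
            if self.tasksCount == 0 {
                self.completion?()
                self.completion = nil
            }
        }
    }
}

let counter = TaskCounter()
let finished = DispatchSemaphore(value: 0) // used only so this demo can wait for the result

counter.onAllTasksEnded {
    print("all tasks ended")
    finished.signal()
}

// Register every task first, then end them from many threads at once.
for _ in 0..<100 { counter.startTask() }
DispatchQueue.concurrentPerform(iterations: 100) { _ in
    counter.endTask()
}

finished.wait()
print("isRunning:", counter.isRunning)
```

In this sketch all tasks are registered before any of them can end. If a task could end before the next one is registered, the count would briefly reach 0 and the completion would fire early, so the order of registration matters.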

Timeout Support

For timeout handling, one option is to use a Timer. But a Timer fires on the run loop it was scheduled on, so it has to be created and invalidated on the same thread, which fits poorly with work that hops between queues. Here, we use a simpler solution with queue.asyncAfter(deadline:):

class AsyncTaskManager {
   ..........
   private var completion: ((Bool)->Void)? // pass true if it is called because of timeout
   private var operationId: UUID?

   func prepare(timeout: TimeInterval, completion: @escaping (Bool)->Void) {
     .........
     let identifier = UUID()
     queue.asyncAfter(deadline: .now() + timeout) {
       guard self.operationId == identifier else { return }
       self.allTasksCompleted(timeout: true)
     }
     operationId = identifier 
   }

  func endTask() {
    ........
    if tasksCount == 0 {
      allTasksCompleted(timeout: false)     
    }
  }

  func allTasksCompleted(timeout: Bool) {
    completion?(timeout)
    operationId = nil
    completion = nil
  }
}

An operationId is created before any task begins. 

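1. If the operation completes before the timeout, operationId is set to nil. When the asyncAfter closure runs later, the identifiers no longer match and it returns without doing anything.

2. If the operation doesn’t complete before the timeout, the asyncAfter closure is called while operationId still matches, so the completion closure is called with the timeout flag set to true.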
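
The two cases can be reproduced with a small sketch that isolates the timeout bookkeeping. It is an approximation under stated assumptions: `TimeoutTracker`, `start(timeout:completion:)` and `allTasksEnded()` are hypothetical names, the task counting is left out, and all state is touched only on the serial queue.

```swift
import Foundation

/// Hypothetical sketch of the timeout bookkeeping only; task counting is omitted.
final class TimeoutTracker {
    private let queue = DispatchQueue(label: "example.timeout-tracker") // serial
    private var completion: ((Bool) -> Void)?
    private var operationId: UUID?

    func start(timeout: TimeInterval, completion: @escaping (Bool) -> Void) {
        queue.async {
            let identifier = UUID()
            self.operationId = identifier
            self.completion = completion
            self.queue.asyncAfter(deadline: .now() + timeout) {
                // A stale timeout (operation already finished) is ignored.
                guard self.operationId == identifier else { return }
                self.finish(timedOut: true)
            }
        }
    }

    /// Call when the last task has ended.
    func allTasksEnded() {
        queue.async {
            guard self.operationId != nil else { return }
            self.finish(timedOut: false)
        }
    }

    // Must only be called on `queue`.
    private func finish(timedOut: Bool) {
        completion?(timedOut)
        operationId = nil
        completion = nil
    }
}

let done = DispatchSemaphore(value: 0) // used only so this demo can wait for both results
var fastResults: [Bool] = []
var slowResults: [Bool] = []

// Case 1: the operation ends well before its timeout.
let fast = TimeoutTracker()
fast.start(timeout: 0.2) { timedOut in
    fastResults.append(timedOut)
    done.signal()
}
fast.allTasksEnded()

// Case 2: the operation never ends, so the timeout closure reports it.
let slow = TimeoutTracker()
slow.start(timeout: 0.05) { timedOut in
    slowResults.append(timedOut)
    done.signal()
}

done.wait()
done.wait()
print("fast timed out:", fastResults, "slow timed out:", slowResults)
```

Comparing identifiers means a pending asyncAfter block never needs to be cancelled; it simply finds that its operation is gone and does nothing.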

As you can see, this solution is very lightweight. Mutating an integer in an async closure on a serial queue adds no wait time and very little overhead, and no thread or queue has to be suspended, while timeouts are still supported. The isRunning property on the class can be used as a guard to avoid starting the same operation before the current operation completes.

Summary

We discussed 3 different solutions for managing a series of async tasks. The first approach uses OperationQueue. It is not that easy to use, because it requires each async task to subclass Operation and implement numerous interfaces. The second approach uses DispatchSemaphore. It is very simple to write and understand. However, the semaphore suspends the thread it is waiting on. If it is called on a new thread each time, it can quickly lock up multiple threads in a short period of time. Lastly, we introduced the third approach, which uses a DispatchQueue and a tasksCount counter to achieve the same result.

OperationQueue VS Semaphore VS DispatchQueue

OperationQueue
Pros:
* Designed for operation management.
* Comes with lots of useful features.
Cons:
* Not easy to use. Tasks have to be wrapped into a subclass of Operation.
* Does not support timeouts out of the box.

Semaphore
Pros:
* Simple to use.
* Supports timeouts by default.
Cons:
* Threads are locked up while waiting.
* No simple way to check if the operation is running.

DispatchQueue
Pros:
* Easy to use.
* Lightweight and fast.
* Supports timeouts.
Cons:
* Need to manually maintain the task count and handle timeouts.

This AsyncTaskManager class is very fast and easy to use. It is also very easy to unit-test. In our Livongo mobile app, we replaced the old semaphore implementation with AsyncTaskManager. We are happy with the results and no longer see bugs caused by thread locking.