I’ve been trying out various ways to extend and build upon the currently available Concurrency APIs introduced in Swift 5.5

I believe the Concurrency APIs are intentionally somewhat minimal so that developers would intentionally fall back on Combine for complex tasks. Maybe I’m right or wrong but I can see how a lot of people will want to extend the Concurrency APIs to suit better their current tasks.

In this post I’ll code a couple of extensions that show how to easily build your own concurrent power-ups.

Async sequence with Array.spread(delay:)

The two initializers AsyncStream(build:) and AsyncStream(unfolding:onCancel:) allow you to easily create push and pull streams over time. But of course those generic initializers might get quite verbose if you use them ad-hoc in your code.

For demos and test apps, I often need to work with some sequence of values that I want to process over time. In this secton I’ll write an extension to easily turn an Array into an asynchronous sequence.

I’ll add a method on Array that takes a time interval in seconds delay and returns an async sequence that emits each array element delay seconds apart:

1
2
3
4
5
extension Array {
  func spread(delay: TimeInterval) -> AsyncStream<Element> {
	
  }
}

The method will return an async stream with the same element type like the array. The stream will emit all the array’s elements with the given delay in-between.

First of all, the method will return a stream that uses a new Task to asynchronously emit elements:

1
2
3
4
5
AsyncStream { continuation in
  Task {
  
  }
}

So far, so good — the rest of code is simply looping over the array and making the task sleep between emitting:

1
2
3
4
for value in self {
  continuation.yield(value)
  try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
}

Note that I don’t use an async loop — this is a synchronous for loop that takes a nap after emitting each element and before moving on to its next iteration.

The completed source code looks like this:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
extension Array {
  func spread(delay: TimeInterval) -> AsyncStream<Element> {
    AsyncStream { continuation in
      Task {
        for value in self {
          continuation.yield(value)
          try await Task.sleep(nanoseconds: 
            UInt64(delay * 1_000_000_000)
          )
        }
      }
    }
  }
}

I can now easily use the new method to asynchronously loop over sequences:

1
2
3
4
for await value in [1, 2, 3].spread(delay: 1.5) {
  print("\(Date().formatted(date: .omitted, 
    time: .standard)): \(value)")
}

and that prints 1, 2, and 3 in the console alongside the timestamp for each element:

10:31:57 AM: 1
10:31:58 AM: 2
10:32:00 AM: 3

Bindings with AsyncSequence.assign(to:on:)

More often than not, especially when testing code in a sample app, I need the async sequence values to update the UI — driving a label, a progress bar or maybe affect the size of a view on screen.

I’ve found Combine’s Publisher.assign(to:on:) to be super useful for binding values to UI components, so as the next step in this article let’s code a simple assign method on AsyncSequence.

I’ll start with defining a new method on AsyncSequence that mimics the Publisher.assign(to:on:) signature:

1
2
3
4
5
6
7
8
extension AsyncSequence {
  public func assign<Root>(
    to keyPath: ReferenceWritableKeyPath<Root, Self.Element>,
    on object: Root
  ) -> Void {

  }
}

The method takes some type and a writeable key path to use for value updates. Publisher.assign(to:on:) returns an AnyCancellable to allow you to do memory management correctly.

In Concurrency we don’t have cancellables but it still might be useful to be able to cancel the binding manually. I’ll adjust the method to return a Task in case I need to manually cancel the binding:

1
2
3
4
5
6
7
8
9
extension AsyncSequence {
  @discardableResult
  public func assign<Root>(
    to keyPath: ReferenceWritableKeyPath<Root, Self.Element>,
    on object: Root
  ) -> Task<Void, Error> {

  }
}

The implementation is an asynchronous task that loops over the sequence and updates the target object whenever a new element comes through:

1
2
3
4
5
Task {
  for try await value in self {
    object[keyPath: keyPath] = value
  }
}

And so, while the implementation is simple, the completed code gets a bit verbose (which is the reason I want this as reusable extension on AsyncSequence):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
extension AsyncSequence {
  @discardableResult
  public func assign<Root>(
    to keyPath: ReferenceWritableKeyPath<Root, Self.Element>,
    on object: Root
  ) -> Task<Void, Error> {
    Task {
      for try await value in self {
        object[keyPath: keyPath] = value
      }
    }
  }
}

Now let’s give everything above a try in a simple SwiftUI app. I’ll use the View.task() modifier to update a text on screen every second and a half, much like I did in the previous section in the console:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
struct ContentView: View {
  @State var counter = ""

  var body: some View {
    Text("Update: \(counter)")
      .task {

        [1, 2, 3, 4, 5, 6]
          .spread(delay: 1.5)
          .map { "\(Date().formatted(date: .omitted, 
             time: .standard)): \($0)" }
          .assign(to: \.counter, on: self)

        }
  }
}

Now I have the async sequence bound to the label in just few lines. I haven’t played much with testing proper cancellation but the code above should get you going with implementing the extensions you need.

For my next post I’ve prepared another example that covers how to add an AsyncSequence extension that produces a new type of sequence — similar to how filter, drop(while:), and others work.

Where to go from here?

If you’d like to support me, get my book on Swift Concurrency:

» swiftconcurrencybook.com «

Interested in discussing the new async/await Swift syntax and concurrency? Hit me up on twitter at https://twitter.com/icanzilb.