Understanding Swift Concurrency’s AsyncStream and AsyncThrowingStream
Published on: January 2, 2023
In an earlier post, I wrote about different ways that you can bridge your existing asynchronous code over to Swift’s new Concurrency system that leverages async / await. The mechanisms shown there work great for code that produces a single result that can be modeled as a single value.
Since writing this post, we've gained AsyncStream.makeStream, which makes stream creation a lot smoother. Learn more in this post.
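For reference, here is a minimal sketch of makeStream, assuming Swift 5.9 or later where AsyncStream.makeStream(of:bufferingPolicy:) is available. It returns the stream and its continuation as a tuple, so you get hold of the continuation without writing a build closure. makeGreetings and collect are hypothetical helpers for this example.

```swift
// Assumes Swift 5.9+, where AsyncStream.makeStream(of:bufferingPolicy:) exists.
func makeGreetings() -> AsyncStream<String> {
    // makeStream returns (stream, continuation) in one step.
    let (stream, continuation) = AsyncStream.makeStream(of: String.self)
    continuation.yield("Hello")
    continuation.yield("World")
    // Finishing lets a for-await loop over the stream end normally.
    continuation.finish()
    return stream
}

// Hypothetical helper that gathers every value the stream produces.
func collect(_ stream: AsyncStream<String>) async -> [String] {
    var result: [String] = []
    for await value in stream {
        result.append(value)
    }
    return result
}
```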
However, in some cases this isn’t possible because your existing code provides multiple values over time. This is the case for things like download progress, the user’s current location, and other similar situations.
Generally speaking, these kinds of patterns would be modeled as AsyncSequence objects that you can iterate over using an asynchronous for loop. A basic example of this would be the lines property on URL:
```swift
import Foundation

let url = URL(string: "https://donnywals.com")!
// This loop needs an async context, such as a Task.
for try await line in url.lines {
    // use line
}
```
But what’s the best way to build your own async sequences? Implementing the AsyncSequence protocol and building your own AsyncIterator sounds tedious and error-prone. Luckily, there’s no reason for you to be doing any of that.
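To give a sense of what that involves, here is a minimal hand-written async sequence. Countdown and countdownValues are hypothetical names for this sketch; the sequence counts down from a starting value and ends by returning nil from its iterator.

```swift
// A hypothetical hand-written async sequence that counts down to 1.
struct Countdown: AsyncSequence {
    typealias Element = Int
    let start: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        var current: Int

        // Returning nil tells the for-await loop that the sequence is finished.
        mutating func next() async -> Int? {
            guard current > 0 else { return nil }
            defer { current -= 1 }
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(current: start)
    }
}

// Hypothetical helper that collects every value from the countdown.
func countdownValues() async -> [Int] {
    var values: [Int] = []
    for await value in Countdown(start: 3) {
        values.append(value)
    }
    return values
}
```

Even this tiny sequence needs two types and hand-managed iterator state, which is exactly the boilerplate AsyncStream lets you skip.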
In this post, I will show you how you can leverage Swift’s AsyncStream to build custom async sequences that produce values whenever you need them to.
Producing a simple async stream
An async stream can be produced in various ways. The easiest way to create an async stream is to use the AsyncStream(unfolding:) initializer. Its usage looks as follows:
```swift
let stream = AsyncStream(unfolding: {
    return Int.random(in: 0..<Int.max)
})
```
Of course, this example isn’t particularly useful on its own, but it does show how simple the concept of AsyncStream(unfolding:) is. We use this version of AsyncStream whenever we can produce and return values for our async stream on demand. The closure that’s passed to unfolding is async, which means that we can await asynchronous operations from within our unfolding closure. Your unfolding closure is called every time the stream is expected to produce a new value. In practice this means that your closure is called, you perform some work, you return a value, and then your closure is called again when the consumer asks for the next value. This repeats until the consuming for loop stops iterating, the task that contains your async for loop is cancelled, or you return nil from your unfolding closure.
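Here is a slightly more complete sketch of the unfolding approach that awaits work before producing each value and ends the stream by returning nil. CounterState, makeSlowCounter and collectCounter are hypothetical names, and the short Task.sleep only stands in for real asynchronous work. The counter lives in an actor so the unfolding closure doesn’t have to capture a mutable local variable.

```swift
// Hypothetical state holder; the actor keeps the counter safe to mutate
// from the unfolding closure.
actor CounterState {
    private var next = 1
    private let limit: Int

    init(limit: Int) {
        self.limit = limit
    }

    // Returns the next number, or nil once the limit has been passed.
    func advance() -> Int? {
        guard next <= limit else { return nil }
        defer { next += 1 }
        return next
    }
}

func makeSlowCounter(upTo limit: Int) -> AsyncStream<Int> {
    let state = CounterState(limit: limit)
    return AsyncStream<Int>(unfolding: {
        // The closure is async, so it can await other work before producing a value.
        try? await Task.sleep(nanoseconds: 10_000_000)
        // Returning nil from the unfolding closure ends the stream.
        return await state.advance()
    })
}

// Hypothetical helper that collects everything the stream produces.
func collectCounter(upTo limit: Int) async -> [Int] {
    var values: [Int] = []
    for await value in makeSlowCounter(upTo: limit) {
        values.append(value)
    }
    return values
}
```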
The AsyncStream(unfolding:) way to produce a stream of values is convenient, and it’s particularly useful in situations where:
- You want to perform async work that needs to be awaited to produce elements
- You need to handle back pressure when bridging an API you own
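The back pressure point can be illustrated with a small sketch: because the unfolding closure only runs when the consumer asks for another element, taking three values from the stream should run the producer exactly three times. ProductionLog and demandDrivenDemo are hypothetical names for this example.

```swift
// Hypothetical producer that records how often it was asked for a value.
actor ProductionLog {
    private(set) var produced = 0

    func produce() -> Int {
        produced += 1
        return produced
    }
}

func demandDrivenDemo() async -> (values: [Int], produced: Int) {
    let log = ProductionLog()
    // The unfolding closure only runs when the consumer requests the next element.
    let stream = AsyncStream<Int>(unfolding: { await log.produce() })

    var values: [Int] = []
    // Take three values, then stop iterating.
    for await value in stream.prefix(3) {
        values.append(value)
    }
    let produced = await log.produced
    return (values, produced)
}
```

Nothing is produced ahead of the consumer, so a slow consumer naturally slows the producer down.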
When you’re bridging an existing API that’s based on delegates or callbacks, you probably won’t be able to use AsyncStream(unfolding:). While it’s the simplest and least error-prone way to build an async stream, it’s also the way that I’ve found to be most limiting, and it often doesn’t fit well when bridging existing code over to Swift Concurrency.
More flexibility can be found in the continuation-based API for AsyncStream.
Producing an async stream with a continuation
When an asynchronous closure doesn’t quite fit your use case for creating your own async stream, a continuation based approach might be a much better solution for you. With a continuation you have the ability to construct an async stream object and send values over the async stream whenever values become available.
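We can do this by creating an AsyncStream with the initializer that takes a build closure. In the standard library this initializer is AsyncStream(_:bufferingPolicy:_:), and the build closure is passed as its last, unlabeled argument: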
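```swift
let stream2 = AsyncStream<Int> { cont in
    cont.yield(Int.random(in: 0..<Int.max))
    // Without finish(), a for-await loop would keep waiting for another value.
    cont.finish()
}
```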
The example above creates an AsyncStream that produces a single integer value. This value is produced by calling yield on the continuation. Every time we have a value to send, we should call yield on the continuation with the value that we want to send.
If we’re building an AsyncStream that wraps a delegate-based API, we can hold on to our continuation in the delegate object and call yield whenever a relevant delegate method is called.
For example, we could call continuation.yield from within a CLLocationManagerDelegate whenever a new user location is made available to us:
```swift
import CoreLocation

class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
    // The build closure runs once, when `stream` is first accessed,
    // and hands us the continuation we use to send values.
    lazy var stream: AsyncStream<CLLocation> = {
        AsyncStream { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
            self.continuation = continuation
        }
    }()
    var continuation: AsyncStream<CLLocation>.Continuation?

    func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
        // Forward every reported location to the stream.
        for location in locations {
            continuation?.yield(location)
        }
    }
}
```
The example above is a very naive starting point for creating an async stream of user locations. There are a couple of things we don’t take into account, such as starting and stopping location updates or asking for location permission.
At its core though, this example is a great starting point for experimenting with async streams.
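Since CoreLocation needs a device and location permission, here is a platform-independent sketch of the same delegate-to-stream pattern that you can experiment with anywhere. TemperatureSensor, TemperatureSensorDelegate and TemperatureStream are hypothetical types standing in for CLLocationManager and its delegate; the bridge stores the continuation and yields a value from every delegate callback.

```swift
// A hypothetical delegate-based API, standing in for something like CLLocationManager.
protocol TemperatureSensorDelegate: AnyObject {
    func sensor(_ sensor: TemperatureSensor, didMeasure celsius: Double)
}

final class TemperatureSensor {
    weak var delegate: TemperatureSensorDelegate?

    // Simulates hardware reporting a few readings through the delegate.
    func simulate(readings: [Double]) {
        for reading in readings {
            delegate?.sensor(self, didMeasure: reading)
        }
    }
}

// Bridges the delegate callbacks into an AsyncStream by holding on to the continuation.
final class TemperatureStream: TemperatureSensorDelegate {
    private var continuation: AsyncStream<Double>.Continuation?

    lazy var stream: AsyncStream<Double> = AsyncStream { continuation in
        self.continuation = continuation
    }

    func sensor(_ sensor: TemperatureSensor, didMeasure celsius: Double) {
        // Every delegate call becomes one value on the stream.
        continuation?.yield(celsius)
    }
}

func readTemperatures() async -> [Double] {
    let sensor = TemperatureSensor()
    let bridge = TemperatureStream()
    sensor.delegate = bridge
    // Accessing `stream` runs the build closure and stores the continuation.
    let stream = bridge.stream
    sensor.simulate(readings: [20.5, 21.0, 21.5])

    var readings: [Double] = []
    // The stream is never finished here, so take exactly the three expected values.
    for await value in stream.prefix(3) {
        readings.append(value)
    }
    return readings
}
```

Note that stream is read before the sensor starts reporting. If the delegate fired before stream was first accessed, continuation would still be nil and those readings would be silently dropped.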
Note that this approach will not wait for consumers of your async stream to fully consume a value before you can send your next value down the stream. Instead, all values that you send are buffered in your async stream by default, which may or may not be what you want.
In practical terms this means that when you send values down your stream faster than the consuming for loop can process them, you will end up with a buffer filled with values that will be delivered to the consuming for loop with a delay. This might be exactly what you need, but if the values you send are time-sensitive and ephemeral, it might make sense to drop values when the consuming for loop isn’t ready to receive them.
We could decide that we never want to hold on to more than 1 location and that we only want to buffer the last known location to avoid processing stale data. We can do this by setting a buffering policy on our async stream:
```swift
lazy var stream: AsyncStream<CLLocation> = {
    AsyncStream(bufferingPolicy: .bufferingNewest(1)) { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
        self.continuation = continuation
    }
}()
```
This code passes a bufferingPolicy of .bufferingNewest(1) to our AsyncStream. This means that we will only buffer a single value if the consuming for loop isn’t processing items fast enough, and we will discard older values in favor of keeping only the latest location.
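To see the buffering policy in action, this sketch yields three values before anyone iterates and then finishes the stream. With .bufferingNewest(1) only the last value should survive, while the default .unbounded policy keeps all three. drain(policy:) is a hypothetical helper for this example.

```swift
func drain(policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        // Nobody is iterating yet, so every value goes into the buffer.
        // With .bufferingNewest(1), each yield replaces the buffered value.
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```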
When your stream comes to a natural end, you can call finish() on your continuation to end the stream of values.
If your stream might fail with an error, you can create an AsyncThrowingStream instead of an AsyncStream. The key difference is that consumers of a throwing stream must await new values using try await instead of just await. To make your stream throw an error, you can either call finish(throwing:) on your continuation or call yield(with:) with a Result that represents a failure.
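Here is a minimal sketch of a throwing stream that yields a couple of values and then fails. ReadingError, makeReadings and consume are hypothetical names. Values yielded before the failure are delivered first, and the error is then thrown from the for try await loop.

```swift
// A hypothetical error type for this example.
struct ReadingError: Error, Equatable {
    let message: String
}

func makeReadings(valuesBeforeFailure count: Int) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream<Int, Error> { continuation in
        for index in 0..<count {
            continuation.yield(index + 1)
        }
        // Ends the stream with an error. An alternative would be:
        // continuation.yield(with: .failure(ReadingError(message: "sensor disconnected")))
        continuation.finish(throwing: ReadingError(message: "sensor disconnected"))
    }
}

func consume() async -> (values: [Int], error: ReadingError?) {
    var values: [Int] = []
    do {
        // Consumers of a throwing stream need `try await`.
        for try await value in makeReadings(valuesBeforeFailure: 2) {
            values.append(value)
        }
        return (values, nil)
    } catch let error as ReadingError {
        return (values, error)
    } catch {
        return (values, nil)
    }
}
```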
While the basics of building an AsyncStream aren’t particularly complex, we do need to think carefully about how we manage the lifecycles of the things we create. This matters especially because our continuations shouldn’t outlive our streams, which is a very easy mistake to make when you’re bridging existing delegate-based code.
Managing your stream’s lifecycle
There are essentially two ways for an async stream to end. First, the stream might naturally stop producing values because no further values can be produced. In that case you call finish on your continuation, and you can perform any cleanup that you need at the same time. For example, you could set the continuation that you’re holding on to back to nil to make sure you can’t accidentally use it anymore.
Alternatively, your stream can end because the task that’s used to run your async stream is cancelled. Consider the following:
```swift
let locations = AsyncLocationStream()

let task = Task {
    for await location in locations.stream {
        print(location)
    }
}

task.cancel()
```
When something like the above happens, we will want to make sure that we don’t call yield on our continuation anymore unless we start a new stream with a new, active continuation.
We can detect and respond to the end of our stream by setting an onTermination handler on our continuation:
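```swift
self.continuation?.onTermination = { [weak self] termination in
    // termination is either .finished or .cancelled
    print(termination)
    // Release the continuation so we can't yield into a stream that has ended.
    self?.continuation = nil
}
```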
Ideally, we set this handler immediately when we first create our async stream.
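As a rough sketch of that advice, the example below registers the handler inside the build closure, never finishes the stream, and then cancels the consuming task. TerminationLog and cancellationDemo are hypothetical; the lock-protected log exists only because the termination handler is @Sendable and may run on a different thread than the code that reads the log.

```swift
import Foundation

// A small lock-protected box so the @Sendable termination handler can record events.
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var events: [String] = []

    func record(_ event: String) {
        lock.lock()
        defer { lock.unlock() }
        events.append(event)
    }

    var recorded: [String] {
        lock.lock()
        defer { lock.unlock() }
        return events
    }
}

func cancellationDemo() async -> [String] {
    let log = TerminationLog()
    let stream = AsyncStream<Int> { continuation in
        // Set the handler as soon as the continuation exists.
        continuation.onTermination = { termination in
            if case .cancelled = termination {
                log.record("cancelled")
            } else {
                log.record("finished")
            }
        }
        continuation.yield(1)
        // No finish(): the stream stays open until the consuming task is cancelled.
    }

    let task = Task {
        for await _ in stream {}
    }
    task.cancel()
    // Wait for the loop to end; cancellation terminates the stream.
    await task.value
    return log.recorded
}
```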
In addition to the stream being cancelled or otherwise going out of scope, we could break out of our loop, which will eventually cause our task to finish. Generally speaking, this will not end your async stream, so if you want breaking out of your loop to end your stream, you will need to take care of that yourself.
Personally, I’ve found that the easiest way to make sure you do some cleanup is to have a method on your stream-producing object that cancels the stream, instead of just breaking out of an async for loop. That way, you can perform cleanup and avoid having a stream that keeps sending values even though nobody is listening.
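A minimal sketch of that idea: a hypothetical TickProducer owns its continuation and exposes a stop() method that finishes the stream and releases the continuation, so any value emitted afterwards is simply ignored. emit(_:) is a hypothetical stand-in for whatever callback or delegate method would deliver real values.

```swift
// Hypothetical producer that owns its stream and exposes an explicit stop().
final class TickProducer {
    private var continuation: AsyncStream<Int>.Continuation?
    private(set) var isRunning = false

    func start() -> AsyncStream<Int> {
        let stream = AsyncStream<Int> { continuation in
            self.continuation = continuation
        }
        isRunning = true
        return stream
    }

    // Stands in for the callback or delegate method that delivers new values.
    func emit(_ value: Int) {
        continuation?.yield(value)
    }

    // Ends the stream for consumers and releases the continuation.
    // Real cleanup (stopping timers, sockets, location updates) would go here too.
    func stop() {
        continuation?.finish()
        continuation = nil
        isRunning = false
    }
}

func stopDemo() async -> (values: [Int], running: Bool) {
    let producer = TickProducer()
    let stream = producer.start()
    producer.emit(1)
    producer.emit(2)
    producer.stop()
    producer.emit(3) // ignored: the continuation has already been released

    var values: [Int] = []
    for await value in stream {
        values.append(value)
    }
    return (values, producer.isRunning)
}
```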
It’s also important to bear in mind that the pattern I showed earlier only works if a single consumer uses your location stream object. You cannot have multiple for loops iterating over a single stream in Swift Concurrency because, by default, async sequences lack the ability to share their iterations with multiple loops.
If you're interested in seeing a practical application of async streams to bridge existing code into Swift Concurrency, take a look at this post where I use AsyncStream to iterate over incoming web socket messages.
In Summary
In this post, you learned a lot about async streams and how you can produce your own async sequences. First, you saw the unfolding approach of building an async stream, and you learned that this approach is relatively straightforward but might not be very useful for people who need to bridge existing delegate- or callback-based APIs.
After exploring unfolding for a bit, we took a look at the build closure for async streams. You learned that this approach leverages a continuation object that can be called to produce values if and when needed.
You saw a very rudimentary example of an object that would bridge a CLLocationManager into async/await, and you learned a bit about correctly managing your continuations to prevent sending values into an already completed stream.
If you have any questions or comments for me about this post, please feel free to reach out on Twitter or on Mastodon.