Building an AsyncSequence with AsyncStream.makeStream
Published on: March 25, 2024
A while ago, I published a post that explains how you can use AsyncStream to build your own asynchronous sequences in Swift Concurrency. Since writing that post, a new approach to creating AsyncStream objects has been introduced that makes building streams more convenient.
In this post, I’ll expand on what we’ve already covered in the previous post so that we don’t have to go over everything from scratch.
By the end of this post you will understand the new and more convenient makeStream method that was added to AsyncStream. You’ll learn how and when it makes sense to build your own async streams, and I will reiterate some of their gotchas to help you avoid mistakes that I’ve had to make in the past.
Reviewing the older situation
While I won’t explain the old approach in detail, a quick recap is useful to refresh your memory. And if you weren’t familiar with the old approach, the recap will help put the improvements in Swift 5.9 into perspective.
Pre-Swift 5.9 we could create our AsyncStream objects as follows:
let stream = AsyncStream(unfolding: {
  return Int.random(in: 0..<Int.max)
})
The approach shown here is the simplest way to build an async stream but also the least flexible.
In short, the closure that we pass to unfolding here will be called every time we’re expected to asynchronously produce a new value for our stream. Once the value is produced, you return it so that the for loop iterating over this sequence can use the value. To terminate your async stream, you return nil from your closure to indicate that there are no further values to be produced.
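To make the nil-terminates-the-stream behavior concrete, here’s a minimal sketch of a finite stream built with unfolding. The Countdown actor is a hypothetical helper that only exists for this illustration; it holds the mutable counter so that the closure itself doesn’t need to capture mutable state.

```swift
// Hypothetical helper: an actor that owns the mutable counter.
actor Countdown {
  private var remaining: Int

  init(from start: Int) {
    remaining = start
  }

  // Returns the next value, or nil once the countdown is exhausted.
  func next() -> Int? {
    guard remaining > 0 else { return nil }
    defer { remaining -= 1 }
    return remaining
  }
}

let countdown = Countdown(from: 3)

// The closure is called each time the consumer asks for a new element.
// Returning nil ends the stream, which ends the for loop below.
let stream = AsyncStream<Int>(unfolding: { await countdown.next() })

var received: [Int] = []
for await value in stream {
  received.append(value)
}
print(received) // [3, 2, 1]
```

Because the closure eventually returns nil, the for loop exits on its own and the code after it runs.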
This approach lacks some flexibility and isn’t a good fit for bridging things like delegate based code into Swift Concurrency.
A more useful and flexible way to build an AsyncStream that can bridge a callback based API like CLLocationManagerDelegate looks as follows:
import CoreLocation

class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
  // Created lazily so that the closure can refer to self.
  lazy var stream: AsyncStream<CLLocation> = {
    AsyncStream { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
      // Store the continuation so the delegate method can yield values later.
      self.continuation = continuation
    }
  }()
  var continuation: AsyncStream<CLLocation>.Continuation?

  func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
    for location in locations {
      continuation?.yield(location)
    }
  }
}
This code does a little bit more than build an async stream so let’s go over it in a bit more detail.
First, there’s a lazy var that’s used to create an instance of AsyncStream. When we create the async stream, we pass the AsyncStream initializer a closure. This closure receives a continuation object that we can use to push values onto our AsyncStream. Because we’re bridging a callback based API we need access to the continuation from outside of the initial closure so we assign the continuation to a var on the AsyncLocationStream object.
Next, we have the didUpdateLocations delegate method. From that method, we call yield on the continuation to push every received location onto our AsyncStream, which allows anybody that’s writing a for loop over the stream property to receive locations. Here’s what that would look like in a simplified example:
let locationStream = AsyncLocationStream()
for await value in locationStream.stream {
  print("location received", value)
}
While this all works perfectly fine, there’s this optional continuation that we’re dealing with. Luckily, the new makeStream approach takes care of this.
Creating a stream with makeStream
In essence, a makeStream based AsyncStream works identically to the one you saw earlier.
We still work with a continuation that’s used to yield values to whoever is iterating our stream. In order to end the stream we call finish on the continuation, and to handle someone cancelling their Task or breaking out of the for loop you can still use onTermination on the continuation to perform cleanup. We’ll take a look at onTermination in the next section.
For now, let’s focus on seeing how makeStream allows us to rewrite the example you just saw to be a bit cleaner.
class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
  let stream: AsyncStream<CLLocation>
  private let continuation: AsyncStream<CLLocation>.Continuation

  override init() {
    let (stream, continuation) = AsyncStream.makeStream(of: CLLocation.self)
    self.stream = stream
    self.continuation = continuation
    super.init()
  }

  func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
    for location in locations {
      continuation.yield(location)
    }
  }
}
We’ve written a little bit more code than we had before but the code we have now is slightly cleaner and more readable.
Instead of a lazy var we can now define two let properties, which fit much better with what we’re trying to do. Additionally, we create our AsyncStream and its continuation in a single line of code instead of needing a closure to lift the continuation from our closure onto our class.
Everything else remains pretty much the same. We still call yield to push values onto our stream, and we still use finish to end our stream (we’re not calling that in the snippet above).
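Since the location example never calls finish, here’s a minimal sketch of the full yield and finish cycle with makeStream. The numbers stream and its values are made up for illustration, and the sketch relies on AsyncStream’s default unbounded buffering to hold on to values that are yielded before anybody iterates.

```swift
// Create the stream and its continuation in one go.
let (numbers, continuation) = AsyncStream.makeStream(of: Int.self)

// Values yielded before iteration starts are buffered by default.
continuation.yield(1)
continuation.yield(2)

// Finishing the continuation ends the stream once the buffer is drained.
continuation.finish()

var collected: [Int] = []
for await number in numbers {
  collected.append(number)
}
print(collected) // [1, 2]
```

Without the call to finish, the for loop would receive both values and then wait indefinitely for a third one.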
While this is all very convenient, AsyncStream.makeStream comes with the same memory and lifecycle related issues as its older counterparts. Let’s take a brief look at these issues and how to fix them in the next section.
Avoiding memory leaks and infinite loops
When we’re iterating an async sequence from within a task, it’s reasonable to expect that at some point the object we’re iterating goes out of scope and that our iteration stops.
For example, if we’re leveraging the AsyncLocationStream you saw before from within a ViewModel we’d want the location updates to stop automatically whenever the screen, its ViewModel, and the AsyncLocationStream go out of scope.
In reality, these objects will go out of scope but any task that’s iterating the AsyncLocationStream's stream won’t end until the stream’s continuation is explicitly ended. I’ve explored this phenomenon more in depth in this post where I dig into lifecycle management for async sequences.
Let’s look at an example that demonstrates this effect. We’ll look at a dummy LocationProvider first.
import Combine
import Foundation

class LocationProvider {
  let locations: AsyncStream<UUID>
  private let continuation: AsyncStream<UUID>.Continuation
  // Must be a var: it's assigned in startUpdates(), after init has completed.
  private var cancellable: AnyCancellable?

  init() {
    let stream = AsyncStream.makeStream(of: UUID.self)
    locations = stream.stream
    continuation = stream.continuation
  }

  deinit {
    print("location provider is gone")
  }

  func startUpdates() {
    // Simulate location updates by yielding a new UUID every second.
    cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .sink(receiveValue: { [weak self] _ in
        print("will send")
        self?.continuation.yield(UUID())
      })
  }
}
The object above creates an AsyncStream just like you saw before. When we call startUpdates we start simulating receiving location updates. Every second, we send a new UUID onto our stream.
To make the test realistic, I’ve added a MyViewModel object that would normally serve as the interface in between the location provider and the view:
class MyViewModel {
  let locationProvider = LocationProvider()

  var locations: AsyncStream<UUID> {
    locationProvider.locations
  }

  deinit {
    print("view model is gone")
  }

  init() {
    locationProvider.startUpdates()
  }
}
We’re not doing anything special in this code so let’s move on to creating the test scenario itself:
var viewModel: MyViewModel? = MyViewModel()

let sampleTask = Task {
  guard let locations = viewModel?.locations else { return }
  print("before for loop")
  for await location in locations {
    print(location)
  }
  print("after for loop")
}

Task {
  try await Task.sleep(for: .seconds(2))
  viewModel = nil
}
In our test, we set up two tasks. The first one iterates over our AsyncStream and prints a string before and after the loop.
We have a second task that runs in parallel. This task will wait for two seconds and then it sets the viewModel property to nil. This simulates a screen going away and the view model being deallocated because of it.
Let’s look at the printed results for this code:
before for loop
will send
B9BED2DE-B929-47A6-B47D-C28AD723FCB1
will send
FCE7DAD1-D47C-4D03-81FD-42B0BA38F976
view model is gone
location provider is gone
Notice how we’re not seeing after for loop printed here.
This means that while the view model and location provider both get deallocated as expected, we’re not seeing the for loop end like we’d want to.
To fix this, we need to make sure that we finish our continuation when the location provider is deallocated:
class LocationProvider {
  // ...

  deinit {
    print("location provider is gone")
    continuation.finish()
  }

  // ...
}
In the deinit for LocationProvider we can call continuation.finish() which will fix the leak that we just saw. If we run the code again, we’ll see the following output:
before for loop
will send
B3DE2994-E0E1-4397-B04E-448047315133
will send
D790D3FA-FE40-4182-9F58-1FEC93335F18
view model is gone
location provider is gone
after for loop
So that fixed our for loop sitting and waiting for a value that would never come (and our Task being stuck forever as a result). However, we’re not out of the woods yet. Let’s change the test setup a little bit. Instead of deallocating the view model, let’s try cancelling the Task that we created to iterate the AsyncStream.
var viewModel: MyViewModel? = MyViewModel()

let sampleTask = Task {
  guard let locations = viewModel?.locations else { return }
  print("before for loop")
  for await location in locations {
    print(location)
  }
  print("after for loop")
}

Task {
  try await Task.sleep(for: .seconds(2))
  sampleTask.cancel()
}
Running the code now results in the following output:
before for loop
will send
0B6E962F-F2ED-4C33-8155-140DB94F3AE0
will send
1E195613-2CE1-4763-80C4-590083E4353E
after for loop
will send
will send
will send
will send
So while our loop ended, the location updates don’t stop. We can add an onTermination closure to our continuation to be notified of an ended for loop (which happens when you cancel a Task that’s iterating an async sequence):
class LocationProvider {
  // ...

  func startUpdates() {
    cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .sink(receiveValue: { [weak self] _ in
        print("will send")
        self?.continuation.yield(UUID())
      })

    // Stop the timer when the stream ends, for example because the consuming task was cancelled.
    continuation.onTermination = { [weak self] _ in
      self?.cancellable = nil
    }
  }
}
With this code in place, we can now handle both a task getting cancelled as well as our LocationProvider being deallocated.
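The onTermination closure also receives a value that tells you why the stream ended, which can be handy when a cancelled consumer needs different cleanup than a call to finish. The sketch below is a standalone illustration and not part of LocationProvider; the ticks stream and the second reasons stream (used only to report the termination reason back to the caller) are hypothetical names chosen for this example.

```swift
let (ticks, continuation) = AsyncStream.makeStream(of: Int.self)

// Illustrative only: a second stream that reports why `ticks` ended.
let (reasons, reasonSink) = AsyncStream.makeStream(of: String.self)

continuation.onTermination = { termination in
  switch termination {
  case .cancelled:
    // The task iterating the stream was cancelled.
    reasonSink.yield("cancelled")
  case .finished:
    // finish() was called on the continuation.
    reasonSink.yield("finished")
  @unknown default:
    reasonSink.yield("unknown")
  }
  reasonSink.finish()
}

// Nothing is ever yielded to `ticks`, so this loop only ends through cancellation.
let consumer = Task {
  for await tick in ticks {
    print("tick", tick)
  }
}

consumer.cancel()
await consumer.value

var observed: [String] = []
for await reason in reasons {
  observed.append(reason)
}
print(observed) // ["cancelled"]
```

If you replace consumer.cancel() with continuation.finish(), the closure receives .finished instead.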
Whenever you’re writing your own async streams it’s important that you test what happens when the owner of your continuation is deallocated (you’ll usually want to finish your continuation) or when the for loop that iterates your stream is ended (you’ll want to perform some cleanup as needed).
Making mistakes here is quite easy so be sure to keep an eye out!
In Summary
In this post, you saw the new and more convenient AsyncStream.makeStream method in action. You learned that this method replaces a less convenient AsyncStream initializer that forced us to manually store a continuation outside of the closure which would usually lead to having a lazy var for the stream and an optional for the continuation.
After showing you how you can use AsyncStream.makeStream, you learned about some of the gotchas that come with async streams in general. I showed you how you can test for these gotchas, and how you can fix them to make sure that your streams end and clean up as and when you expect.