Creating an Observable around an async/await method using RxSwift

3

8

I am using the AWS Amplify library (https://github.com/aws-amplify/amplify-swift) to communicate with the Cognito service. Most of its functions have been rewritten to use async/await.

Looking at the following method:

func fetchAuthSession() async throws -> AuthSession {
    return try await Amplify.Auth.fetchAuthSession()
}

How can I wrap the await call to return an Observable<AuthSession> using RxSwift?

Appliance answered 6/12, 2022 at 14:31
16

Use Observable.create to create an Observable.

Use Task { ... } to perform async work.

Use Task { ... } inside Observable.create to perform async work in an Observable.

Something like this should work:

let authSessionObservable: Observable<AuthSession> = Observable.create { observer in
    // Start the async work when an observer subscribes.
    let task = Task {
        do {
            let session = try await Amplify.Auth.fetchAuthSession()
            observer.on(.next(session))
            observer.on(.completed)
        } catch {
            observer.on(.error(error))
        }
    }
    // Cancel the Task when the subscription is disposed.
    return Disposables.create {
        task.cancel()
    }
}
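
For completeness, here is a rough sketch of how such an observable might be consumed. It swaps Amplify for a hypothetical stand-in, `fetchToken()`, so it only needs RxSwift. The `observe(on:)` hop is there because the Task delivers events off the main thread, which matters if you update UI in the handlers.

```swift
import Foundation
import RxSwift

// Hypothetical stand-in for Amplify.Auth.fetchAuthSession().
func fetchToken() async throws -> String {
    // Task.sleep throws CancellationError once the surrounding Task is cancelled.
    try await Task.sleep(nanoseconds: 200_000_000)
    return "token"
}

let tokenObservable: Observable<String> = Observable.create { observer in
    let task = Task {
        do {
            let token = try await fetchToken()
            observer.on(.next(token))
            observer.on(.completed)
        } catch {
            observer.on(.error(error))
        }
    }
    return Disposables.create { task.cancel() }
}

var wasDisposed = false

// The Task starts on subscription, not when the Observable is created.
let subscription = tokenObservable
    .observe(on: MainScheduler.instance) // deliver events on the main thread
    .subscribe(
        onNext: { print("token:", $0) },
        onError: { print("failed:", $0) },
        onDisposed: { wasDisposed = true }
    )

// Disposing before the work finishes cancels the underlying Task.
subscription.dispose()
```

Each subscription starts its own Task, so subscribing twice runs the async call twice.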
Swimming answered 6/12, 2022 at 15:4
6

It might help to see a generic version:

extension Observable {
    static func create(_ fn: @escaping () async throws -> Element) -> Observable<Element> {
        Observable.create { observer in
            let task = Task {
                do {
                    observer.on(.next(try await fn()))
                    observer.on(.completed)
                } catch {
                    observer.on(.error(error))
                }
            }
            return Disposables.create { task.cancel() }
        }
    }
}

Which would be used in this context like this:

func fetchAuthSession() -> Observable<AuthSession> {
    .create(Amplify.Auth.fetchAuthSession)
}
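
If the async function takes arguments, it can't be passed by name, but wrapping the call in a closure should work. The sketch below assumes a helper of the same shape under a different, made-up name (`fromAsync`) and a hypothetical `fetchProfile(userID:)`; neither comes from RxSwift or Amplify. It also checks for cancellation, since `task.cancel()` only sets a flag that the async work has to cooperate with.

```swift
import Foundation
import RxSwift

extension Observable {
    // Same shape as the helper above, under a hypothetical name.
    static func fromAsync(_ work: @escaping () async throws -> Element) -> Observable<Element> {
        Observable.create { observer in
            let task = Task {
                do {
                    let value = try await work()
                    observer.on(.next(value))
                    observer.on(.completed)
                } catch {
                    observer.on(.error(error))
                }
            }
            return Disposables.create { task.cancel() }
        }
    }
}

// Hypothetical async API that takes an argument.
func fetchProfile(userID: Int) async throws -> String {
    // Throws CancellationError if the Task was cancelled before this point.
    try Task.checkCancellation()
    return "profile-\(userID)"
}

// The closure captures the argument and forwards it to the async call.
func profile(for userID: Int) -> Observable<String> {
    .fromAsync { try await fetchProfile(userID: userID) }
}
```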
Unwind answered 6/12, 2022 at 19:44
5

As others have said, you need to wrap your async call in a custom Observable. However, instead of the type-erased Observable class, I'd recommend creating a Single:

extension Single {
    static func fromAsync<T>(_ fn: @escaping () async throws -> T) -> Single<T> {
        .create { observer in
            let task = Task {
                do { try await observer(.success(fn())) }
                catch { observer(.failure(error)) }
            }
            return Disposables.create { task.cancel() }
        }
    }
}

A Single is a better fit because it models an observable that emits either exactly one element or an error. That gives clearer semantics, communicates the intent better, and lets the compiler help more when creating and consuming the observable.

Usage:

let fetchAuthSession = Single.fromAsync { try await Amplify.Auth.fetchAuthSession() }

Or, in a functional programming style:

let fetchAuthSession = Single.fromAsync(Amplify.Auth.fetchAuthSession)
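
On the consuming side, a Single gives you `onSuccess`/`onFailure` handlers instead of `onNext`/`onError`/`onCompleted`. Here is a minimal sketch of that, written with a free function (`singleFromAsync`, a made-up name) and a hypothetical `fetchSessionName()` in place of the Amplify call, so treat it as an illustration rather than the exact helper above.

```swift
import Foundation
import RxSwift

// Hypothetical free-function variant of the helper above.
func singleFromAsync<T>(_ work: @escaping () async throws -> T) -> Single<T> {
    Single<T>.create { observer in
        let task = Task {
            do {
                let value = try await work()
                observer(.success(value))
            } catch {
                observer(.failure(error))
            }
        }
        return Disposables.create { task.cancel() }
    }
}

// Hypothetical stand-in for Amplify.Auth.fetchAuthSession().
func fetchSessionName() async throws -> String {
    "session"
}

let disposeBag = DisposeBag()

// A Single reports one value or one error, so there are only two outcomes to handle.
singleFromAsync(fetchSessionName)
    .subscribe(
        onSuccess: { print("session:", $0) },
        onFailure: { print("failed:", $0) }
    )
    .disposed(by: disposeBag)

// Convert to an Observable where an operator chain expects one.
let sessionObservable: Observable<String> = singleFromAsync(fetchSessionName).asObservable()
```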
Plumlee answered 24/4, 2023 at 5:35
Comment (Duppy): This is exactly what I want.
