Skip to content

Commit

Permalink
- added CoFuture init with promise callback
Browse files Browse the repository at this point in the history
- added subscribeCoChannel() for Publisher
- added publisher() to CoChannel
  • Loading branch information
Alex Belozierov committed Jun 12, 2020
1 parent c3da301 commit c680c3d
Show file tree
Hide file tree
Showing 61 changed files with 666 additions and 72 deletions.
19 changes: 19 additions & 0 deletions Sources/SwiftCoroutine/CoFuture/Core/CoFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ extension CoFuture: _CoFutureCancellable {
self.init(_result: nil)
self.parent = parent
}

/// Initializes a future that invokes a promise closure.
/// ```
/// func someAsyncFunc(callback: @escaping (Result<Int, Error>) -> Void) { ... }
///
/// let future = CoFuture(promise: someAsyncFunc)
/// ```
/// - Parameter promise: A closure to fulfill this future.
@inlinable public convenience init<E: Error>(promise: (@escaping (Result<Value, E>) -> Void) -> Void) {
self.init(_result: nil)
promise(setResult2)
}

/// Starts a new coroutine and initializes future with its result.
///
Expand Down Expand Up @@ -154,6 +166,13 @@ extension CoFuture: _CoFutureCancellable {
nodes.close()?.finish(with: result)
}

@inlinable internal func setResult2<E: Error>(_ result: Result<Value, E>) {
switch result {
case .success(let value): setResult(.success(value))
case .failure(let error): setResult(.failure(error))
}
}

// MARK: - Callback

@usableFromInline internal func addCallback(_ callback: @escaping (_Result) -> Void) {
Expand Down
5 changes: 1 addition & 4 deletions Sources/SwiftCoroutine/CoFuture/Core/CoPromise.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ extension CoPromise {
}

@inlinable public func complete<E: Error>(with result: Result<Value, E>) {
switch result {
case .success(let value): setResult(.success(value))
case .failure(let error): setResult(.failure(error))
}
setResult2(result)
}

@inlinable public func success(_ value: Value) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//
// CoChannel+Combine.swift
// SwiftCoroutine
//
// Created by Alex Belozierov on 11.06.2020.
// Copyright © 2020 Alex Belozierov. All rights reserved.
//

#if canImport(Combine)
import Combine

@available(OSX 10.15, iOS 13.0, *)
extension CoChannel {

// MARK: - publisher

/// Returns a publisher that emits elements of this `CoChannel`.
@inlinable public func publisher() -> AnyPublisher<Element, CoChannelError> {
channel.publisher()
}

}

@available(OSX 10.15, iOS 13.0, *)
extension CoChannel.Receiver {

// MARK: - publisher

/// Returns a publisher that emits elements of this `Receiver`.
public func publisher() -> AnyPublisher<Element, CoChannelError> {
CoChannelPublisher(receiver: self).eraseToAnyPublisher()
}

}

@available(OSX 10.15, iOS 13.0, *)
extension Publisher {

/// Attaches `CoChannel.Receiver` as a subscriber and returns it.
public func subscribeCoChannel(buffer: CoChannel<Output>.BufferType = .unlimited) -> CoChannel<Output>.Receiver {
let channel = CoChannel<Output>(bufferType: buffer)
let cancellable = sink(receiveCompletion: { _ in channel.close() },
receiveValue: { channel.offer($0) })
channel.whenCanceled(cancellable.cancel)
return channel.receiver
}

}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// CoChannelPublisher.swift
// SwiftCoroutine
//
// Created by Alex Belozierov on 11.06.2020.
// Copyright © 2020 Alex Belozierov. All rights reserved.
//

#if canImport(Combine)
import Combine

@available(OSX 10.15, iOS 13.0, *)
internal final class CoChannelPublisher<Output> {

internal typealias Failure = CoChannelError
internal let receiver: CoChannel<Output>.Receiver

@inlinable internal init(receiver: CoChannel<Output>.Receiver) {
self.receiver = receiver
}

}

@available(OSX 10.15, iOS 13.0, *)
extension CoChannelPublisher: Publisher {

@inlinable internal func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let subscription = CoChannelSubscription(subscriber: subscriber, receiver: receiver)
subscriber.receive(subscription: subscription)
}

}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//
// CoChannelSubscription.swift
// SwiftCoroutine
//
// Created by Alex Belozierov on 11.06.2020.
// Copyright © 2020 Alex Belozierov. All rights reserved.
//

#if canImport(Combine)
import Combine

@available(OSX 10.15, iOS 13.0, *)
internal final class CoChannelSubscription<S: Subscriber, T>: Subscription where S.Input == T, S.Failure == CoChannelError {

private let receiver: CoChannel<T>.Receiver
private var subscriber: S?

@inlinable internal init(subscriber: S, receiver: CoChannel<T>.Receiver) {
self.receiver = receiver
self.subscriber = subscriber
@inline(__always) func subscribe() {
receiver.whenReceive { result in
guard let subscriber = self.subscriber else { return }
switch result {
case .success(let result):
_ = subscriber.receive(result)
subscribe()
case .failure(let error) where error == .canceled:
subscriber.receive(completion: .failure(error))
case .failure:
subscriber.receive(completion: .finished)
}
}
}
subscribe()
}

@inlinable internal func cancel() {
subscriber = nil
}

@inlinable internal func request(_ demand: Subscribers.Demand) {}

}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import Combine

@available(OSX 10.15, iOS 13.0, *)
final class CoFuturePublisher<Output, Future: CoFuture<Output>> {
internal final class CoFuturePublisher<Output> {

typealias Failure = Error
internal typealias Failure = Error

let future: Future
internal let future: CoFuture<Output>

@inlinable init(future: Future) {
@inlinable internal init(future: CoFuture<Output>) {
self.future = future
}

Expand All @@ -25,8 +25,8 @@ final class CoFuturePublisher<Output, Future: CoFuture<Output>> {
@available(OSX 10.15, iOS 13.0, *)
extension CoFuturePublisher: Publisher {

@inlinable func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let subscription = CoSubscription(subscriber: subscriber, future: future)
@inlinable internal func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let subscription = CoFutureSubscription(subscriber: subscriber, future: future)
subscriber.receive(subscription: subscription)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// CoSubscription.swift
// CoFutureSubscription.swift
// SwiftCoroutine
//
// Created by Alex Belozierov on 15.03.2020.
Expand All @@ -10,7 +10,7 @@
import Combine

@available(OSX 10.15, iOS 13.0, *)
internal final class CoSubscription<S: Subscriber, T>: Subscription where S.Input == T, S.Failure == Error {
internal final class CoFutureSubscription<S: Subscriber, T>: Subscription where S.Input == T, S.Failure == Error {

private let future: CoFuture<T>
private var subscriber: S?
Expand Down
Loading

0 comments on commit c680c3d

Please sign in to comment.