Skip to content

Commit

Permalink
combineLatest extensions for Observable and Flowable (ReactiveX#116)
Browse files Browse the repository at this point in the history
* combineLatest extensions for Observable and Flowable

* jdk8
  • Loading branch information
stepango authored and thomasnield committed May 3, 2017
1 parent 7d6cdaf commit 28db815
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: java

jdk:
- oraclejdk7
- oraclejdk8

sudo: false
# as per http://blog.travis-ci.com/2014-12-17-faster-builds-with-container-based-infrastructure/
Expand Down
14 changes: 14 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/flowable.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.reactivex.rxkotlin

import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Function3


fun BooleanArray.toFlowable(): Flowable<Boolean> = asIterable().toFlowable()
Expand Down Expand Up @@ -63,6 +65,18 @@ private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}

/**
* Combine latest operator that produces [Pair]
*/
fun <T : Any, R : Any> Flowable<T>.combineLatest(flowable: Flowable<R>): Flowable<Pair<T, R>>
= Flowable.combineLatest(this, flowable, BiFunction(::Pair))

/**
* Combine latest operator that produces [Triple]
*/
fun <T : Any, R : Any, U : Any> Flowable<T>.combineLatest(flowable1: Flowable<R>, flowable2: Flowable<U>): Flowable<Triple<T, R, U>>
= Flowable.combineLatest(this, flowable1, flowable2, Function3(::Triple))

//EXTENSION FUNCTION OPERATORS

/**
Expand Down
13 changes: 13 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/observable.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.reactivex.rxkotlin

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Function3


fun BooleanArray.toObservable(): Observable<Boolean> = asIterable().toObservable()
Expand Down Expand Up @@ -63,6 +65,17 @@ private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}

/**
* Combine latest operator that produces [Pair]
*/
fun <T : Any, R : Any> Observable<T>.combineLatest(observable: Observable<R>): Observable<Pair<T, R>>
= Observable.combineLatest(this, observable, BiFunction(::Pair))

/**
* Combine latest operator that produces [Triple]
*/
fun <T : Any, R : Any, U : Any> Observable<T>.combineLatest(observable1: Observable<R>, observable2: Observable<U>): Observable<Triple<T, R, U>>
= Observable.combineLatest(this, observable1, observable2, Function3(::Triple))

// EXTENSION FUNCTION OPERATORS

Expand Down
78 changes: 49 additions & 29 deletions src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
package io.reactivex.rxkotlin

import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import org.junit.Assert
import io.reactivex.Flowable.create
import io.reactivex.FlowableEmitter
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Ignore
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger

class FlowableTest {

private fun <T: Any> bufferedFlowable(source: (io.reactivex.FlowableEmitter<T>) -> Unit) =
io.reactivex.Flowable.create(source, io.reactivex.BackpressureStrategy.BUFFER)
private fun <T : Any> bufferedFlowable(source: (FlowableEmitter<T>) -> Unit) =
create(source, BackpressureStrategy.BUFFER)

@org.junit.Test fun testCreation() {
val o0: io.reactivex.Flowable<Int> = io.reactivex.Flowable.empty()
val o0: Flowable<Int> = Flowable.empty()
val list = bufferedFlowable<Int> { s ->
s.onNext(1)
s.onNext(777)
s.onComplete()
}.toList().blockingGet()
org.junit.Assert.assertEquals(listOf(1, 777), list)
val o1: io.reactivex.Flowable<Int> = listOf(1, 2, 3).toFlowable()
val o2: io.reactivex.Flowable<List<Int>> = io.reactivex.Flowable.just(listOf(1, 2, 3))

val o3: io.reactivex.Flowable<Int> = io.reactivex.Flowable.defer { bufferedFlowable<Int> { s -> s.onNext(1) } }
val o4: io.reactivex.Flowable<Int> = Array(3) { 0 }.toFlowable()
val o5: io.reactivex.Flowable<Int> = IntArray(3).toFlowable()

org.junit.Assert.assertNotNull(o0)
org.junit.Assert.assertNotNull(o1)
org.junit.Assert.assertNotNull(o2)
org.junit.Assert.assertNotNull(o3)
org.junit.Assert.assertNotNull(o4)
org.junit.Assert.assertNotNull(o5)
assertEquals(listOf(1, 777), list)
val o1: Flowable<Int> = listOf(1, 2, 3).toFlowable()
val o2: Flowable<List<Int>> = Flowable.just(listOf(1, 2, 3))

val o3: Flowable<Int> = Flowable.defer { bufferedFlowable<Int> { s -> s.onNext(1) } }
val o4: Flowable<Int> = Array(3) { 0 }.toFlowable()
val o5: Flowable<Int> = IntArray(3).toFlowable()

assertNotNull(o0)
assertNotNull(o1)
assertNotNull(o2)
assertNotNull(o3)
assertNotNull(o4)
assertNotNull(o5)
}

@org.junit.Test fun testExampleFromReadme() {
Expand All @@ -48,34 +52,34 @@ class FlowableTest {
map { it.toString() }.
blockingGet()

Assert.assertEquals("Hello", result)
assertEquals("Hello", result)
}

@Test fun iteratorFlowable() {
Assert.assertEquals(listOf(1, 2, 3), listOf(1, 2, 3).iterator().toFlowable().toList().blockingGet())
assertEquals(listOf(1, 2, 3), listOf(1, 2, 3).iterator().toFlowable().toList().blockingGet())
}

@Test fun intProgressionStep1Empty() {
Assert.assertEquals(listOf(1), (1..1).toFlowable().toList().blockingGet())
assertEquals(listOf(1), (1..1).toFlowable().toList().blockingGet())
}

@Test fun intProgressionStep1() {
Assert.assertEquals((1..10).toList(), (1..10).toFlowable().toList().blockingGet())
assertEquals((1..10).toList(), (1..10).toFlowable().toList().blockingGet())
}

@Test fun intProgressionDownTo() {
Assert.assertEquals((1 downTo 10).toList(), (1 downTo 10).toFlowable().toList().blockingGet())
assertEquals((1 downTo 10).toList(), (1 downTo 10).toFlowable().toList().blockingGet())
}

@Ignore("Too slow")
@Test fun intProgressionOverflow() {
Assert.assertEquals((0..10).toList().reversed(), (-10..Integer.MAX_VALUE).toFlowable().skip(Integer.MAX_VALUE.toLong()).map { Integer.MAX_VALUE - it }.toList().blockingGet())
assertEquals((0..10).toList().reversed(), (-10..Integer.MAX_VALUE).toFlowable().skip(Integer.MAX_VALUE.toLong()).map { Integer.MAX_VALUE - it }.toList().blockingGet())
}


@Test fun testFold() {
val result = listOf(1, 2, 3).toFlowable().reduce(0) { acc, e -> acc + e }.blockingGet()
Assert.assertEquals(6, result)
assertEquals(6, result)
}

@Test fun `kotlin sequence should produce expected items and flowable be able to handle em`() {
Expand All @@ -93,24 +97,24 @@ class FlowableTest {
toList().
subscribe()

Assert.assertEquals(100, generated.get())
assertEquals(100, generated.get())
}

@Test fun testFlatMapSequence() {
Assert.assertEquals(
assertEquals(
listOf(1, 2, 3, 2, 3, 4, 3, 4, 5),
listOf(1, 2, 3).toFlowable().flatMapSequence { listOf(it, it + 1, it + 2).asSequence() }.toList().blockingGet()
)
}

@Test fun testCombineLatest() {
val list = listOf(1, 2, 3, 2, 3, 4, 3, 4, 5)
Assert.assertEquals(list, list.map { Flowable.just(it) }.combineLatest { it }.blockingFirst())
assertEquals(list, list.map { Flowable.just(it) }.combineLatest { it }.blockingFirst())
}

@Test fun testZip() {
val list = listOf(1, 2, 3, 2, 3, 4, 3, 4, 5)
Assert.assertEquals(list, list.map { Flowable.just(it) }.zip { it }.blockingFirst())
assertEquals(list, list.map { Flowable.just(it) }.zip { it }.blockingFirst())
}

@Test fun testCast() {
Expand All @@ -129,4 +133,20 @@ class FlowableTest {
flowable.test()
.assertError(ClassCastException::class.java)
}

@Test fun combineLatestPair() {
Flowable.just(3)
.combineLatest(Flowable.just(10))
.map { (x, y) -> x * y }
.test()
.assertValues(30)
}

@Test fun combineLatestTriple() {
Flowable.just(3)
.combineLatest(Flowable.just(10), Flowable.just(20))
.map { (x, y, z) -> x * y * z }
.test()
.assertValues(600)
}
}
16 changes: 15 additions & 1 deletion src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.reactivex.rxkotlin

import io.reactivex.Observable
import io.reactivex.observers.TestObserver
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Ignore
Expand Down Expand Up @@ -172,7 +171,22 @@ class ObservableTest {
.assertValues(BigDecimal.valueOf(15, 1), 2, BigDecimal.valueOf(42), 15)
.assertNoErrors()
.assertComplete()
}

@Test fun combineLatestPair() {
Observable.just(3)
.combineLatest(Observable.just(10))
.map { (x, y) -> x * y }
.test()
.assertValues(30)
}

@Test fun combineLatestTriple() {
Observable.just(3)
.combineLatest(Observable.just(10), Observable.just(20))
.map { (x, y, z) -> x * y * z }
.test()
.assertValues(600)
}

}

0 comments on commit 28db815

Please sign in to comment.