Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite streaming API to use websocket #383

Merged
merged 29 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1c048ec
Add API entities for streaming methods
PattaFeuFeu Dec 8, 2023
71e0c58
Implement streaming methods using WebSocket connection
PattaFeuFeu Dec 9, 2023
55d122f
Reuse access token from client initialisation
PattaFeuFeu Dec 9, 2023
c40d291
Add remaining documentation to public methods
PattaFeuFeu Dec 9, 2023
a1825f7
Remove obsolete streaming methods
PattaFeuFeu Dec 9, 2023
b85ffe2
Update documentation
PattaFeuFeu Dec 9, 2023
2711183
Propagate messages with binary string and those that cannot be parsed
PattaFeuFeu Dec 9, 2023
35cefa5
Remove unnecessarily duplicated code
PattaFeuFeu Dec 9, 2023
afad08a
Differentiate between technical and API events
PattaFeuFeu Dec 9, 2023
264041d
Try to parse EventType-specific data classes from raw stream events
PattaFeuFeu Dec 9, 2023
70d5e09
Limit visibility of EventType and StreamType to library only
PattaFeuFeu Dec 9, 2023
d061590
Move WebSocketEvent to entity/streaming package
PattaFeuFeu Dec 9, 2023
f7be595
Test Rx web socket implementation
PattaFeuFeu Dec 9, 2023
ce0ed57
Remove WebSocket close logging
PattaFeuFeu Dec 9, 2023
c392b7a
Add unit tests for non-Rx websocket streaming method
PattaFeuFeu Dec 10, 2023
f9ef845
Add checkServerHealth method
PattaFeuFeu Dec 11, 2023
7aea548
Use instance-specific streaming URL from API-retrieved Instance info
PattaFeuFeu Dec 16, 2023
12ba6c1
Turn MastodonClient properties to val if they won't change after build
PattaFeuFeu Dec 16, 2023
9908997
(unrelated) Add code folding suggestion to be able to fold methods
PattaFeuFeu Dec 16, 2023
27af088
Parse EventType directly and turn it non-nullable
PattaFeuFeu Dec 17, 2023
664ba8a
Add data class for AnnouncementReactionReceived payload
PattaFeuFeu Dec 17, 2023
47eef95
Close web socket in case of failure
PattaFeuFeu Dec 17, 2023
e3debd1
Review: Clarify “Available since” comments—Mastodon server, not BigBone
PattaFeuFeu Dec 18, 2023
fd47aa1
Review: Clarify server health method returns
PattaFeuFeu Dec 18, 2023
75be158
Review: Replace builder for streaming URL
PattaFeuFeu Dec 18, 2023
858b841
Review: Clarify server health method returns in Rx function
PattaFeuFeu Dec 18, 2023
f6d5c11
Review: Move WebSocketCallback to api.entity.streaming package
PattaFeuFeu Dec 19, 2023
91ba533
Review: Remove need for useStreamingApi call
PattaFeuFeu Dec 19, 2023
eb70dda
Merge branch 'master' into improve-streaming-apis
PattaFeuFeu Dec 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 11 additions & 34 deletions USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,6 @@ public class GetRawJson {

## Streaming API

v1.0.0 or later

__Kotlin__

```kotlin
Expand All @@ -218,47 +216,26 @@ val client: MastodonClient = MastodonClient.Builder(instanceHostname)
.useStreamingApi()
.build()

val handler = object : Handler {
override fun onStatus(status: Status) {
println(status.content)
}
override fun onNotification(notification: Notification) {/* no op */}
override fun onDelete(id: String) {/* no op */}
}

try {
val shutdownable = client.streaming.localPublic(handler)
Thread.sleep(10000L)
shutdownable.shutdown()
} catch(e: BigBoneRequestException) {
e.printStackTrace()
client.streaming.federatedPublic(
onlyMedia = false,
callback = { event: WebSocketEvent ->
println(event)
}
).use {
Thread.sleep(15_000L)
}
```

__Java__

```java
MastodonClient client = new MastodonClient.Builder(instanceHostname)
final MastodonClient client = new MastodonClient.Builder(instanceHostname)
.accessToken(accessToken)
.useStreamingApi()
.build();
Handler handler = new Handler() {
@Override
public void onStatus(@NotNull Status status) {
System.out.println(status.getContent());
}

@Override
public void onNotification(@NotNull Notification notification) {/* no op */}
@Override
public void onDelete(String id) {/* no op */}
};

try {
Shutdownable shutdownable = client.streaming().localPublic(handler);
Thread.sleep(10000L);
shutdownable.shutdown();
} catch (Exception e) {
e.printStackTrace();
// Start federated timeline streaming and stop after 20 seconds
try (Closeable ignored = client.streaming().federatedPublic(false, System.out::println)) {
Thread.sleep(20_000L);
}
```
130 changes: 95 additions & 35 deletions bigbone-rx/src/main/kotlin/social/bigbone/rx/RxStreamingMethods.kt
Original file line number Diff line number Diff line change
@@ -1,62 +1,122 @@
package social.bigbone.rx

import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import social.bigbone.MastodonClient
import social.bigbone.api.Handler
import social.bigbone.api.Shutdownable
import social.bigbone.api.entity.Notification
import social.bigbone.api.entity.Status
import social.bigbone.api.WebSocketCallback
import social.bigbone.api.entity.streaming.MastodonApiEvent.GenericMessage
import social.bigbone.api.entity.streaming.MastodonApiEvent.StreamEvent
import social.bigbone.api.entity.streaming.TechnicalEvent.Closed
import social.bigbone.api.entity.streaming.TechnicalEvent.Closing
import social.bigbone.api.entity.streaming.TechnicalEvent.Failure
import social.bigbone.api.entity.streaming.TechnicalEvent.Open
import social.bigbone.api.entity.streaming.WebSocketEvent
import social.bigbone.api.method.StreamingMethods
import java.io.Closeable

/**
* Reactive implementation of [StreamingMethods].
* Allows access to API methods with endpoints having an "api/vX/streaming" prefix.
* @see <a href="https://docs.joinmastodon.org/methods/streaming/">Mastodon streaming API methods</a>
* @see <a href="https://docs.joinmastodon.org/methods/streaming/#streams">Mastodon streaming API methods</a>
*/
class RxStreamingMethods(client: MastodonClient) {

private val streamingMethods = StreamingMethods(client)

private fun stream(f: (Handler) -> Shutdownable): Flowable<Status> {
return Flowable.create<Status>({ emitter ->
val shutdownable = f(object : Handler {
override fun onStatus(status: Status) {
emitter.onNext(status)
}
/**
* Verify that the streaming service is alive before connecting to it.
*
* @return Completable that will complete if the server is “healthy” or emit an error via onError otherwise.
* @see <a href="https://docs.joinmastodon.org/methods/streaming/#health">streaming/#health API documentation</a>
*/
fun checkServerHealth(): Completable = Completable.fromAction {
streamingMethods.checkServerHealth()
}

override fun onNotification(notification: Notification) {
// no op
}
/**
* Stream all public posts known to this server. Analogous to the federated timeline.
*
* @param onlyMedia Filter for media attachments. Analogous to the federated timeline with “only media” enabled.
*/
fun federatedPublic(onlyMedia: Boolean): Flowable<WebSocketEvent> = stream {
streamingMethods.federatedPublic(onlyMedia, it)
}

override fun onDelete(id: String) {
// no op
}
})
emitter.setCancellable {
shutdownable.shutdown()
}
}, BackpressureStrategy.LATEST)
/**
* Stream all public posts originating from this server. Analogous to the local timeline.
*
* @param onlyMedia Filter for media attachments. Analogous to the local timeline with “only media” enabled.
*/
fun localPublic(onlyMedia: Boolean): Flowable<WebSocketEvent> = stream {
streamingMethods.localPublic(onlyMedia, it)
}

private fun statusStream(f: (Handler) -> Shutdownable): Flowable<Status> {
return stream { handler ->
f(handler)
}
/**
* Stream all public posts originating from other servers.
*
* @param onlyMedia Filter for media attachments.
*/
fun remotePublic(onlyMedia: Boolean): Flowable<WebSocketEvent> = stream {
streamingMethods.remotePublic(onlyMedia, it)
}

private fun tagStream(tag: String, f: (String, Handler) -> Shutdownable): Flowable<Status> {
return stream { handler ->
f(tag, handler)
}
/**
* Stream all public posts using the hashtag [tagName].
*
* @param tagName Hashtag the public posts you want to stream should have.
* @param onlyFromThisServer Filter for public posts originating from this server.
*/
fun hashtag(
tagName: String,
onlyFromThisServer: Boolean
): Flowable<WebSocketEvent> = stream { callback ->
streamingMethods.hashtag(
tagName = tagName,
onlyFromThisServer = onlyFromThisServer,
callback = callback
)
}

fun localPublic(): Flowable<Status> = statusStream(streamingMethods::localPublic)
/**
* Stream all events related to the current user, such as home feed updates and notifications.
*/
fun user(): Flowable<WebSocketEvent> = stream(streamingMethods::user)

fun federatedPublic(): Flowable<Status> = statusStream(streamingMethods::federatedPublic)
/**
* Stream all notifications for the current user.
*/
fun userNotifications(): Flowable<WebSocketEvent> = stream(streamingMethods::userNotifications)

fun localHashtag(tag: String): Flowable<Status> = tagStream(tag, streamingMethods::localHashtag)
/**
* Stream updates to the list with [listId].
*
* @param listId List you want to receive updates for.
*/
fun list(listId: String): Flowable<WebSocketEvent> = stream { callback ->
streamingMethods.list(
listId = listId,
callback = callback
)
}

fun federatedHashtag(tag: String): Flowable<Status> = tagStream(tag, streamingMethods::federatedHashtag)
/**
* Stream all updates to direct conversations.
*/
fun directConversations(): Flowable<WebSocketEvent> = stream(streamingMethods::directConversations)

// TODO user streaming
private fun stream(streamMethod: (WebSocketCallback) -> Closeable): Flowable<WebSocketEvent> =
Flowable.create({ emitter ->
val closeable = streamMethod { webSocketEvent ->
when (webSocketEvent) {
is Closed -> emitter.onComplete()
is Failure -> emitter.tryOnError(webSocketEvent.error)
Open,
is Closing,
is StreamEvent,
is GenericMessage -> emitter.onNext(webSocketEvent)
}
}
emitter.setCancellable { closeable.close() }
}, BackpressureStrategy.BUFFER)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package social.bigbone.rx

import io.mockk.mockk
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subscribers.TestSubscriber
import okio.IOException
import org.junit.jupiter.api.Test
import social.bigbone.api.entity.streaming.MastodonApiEvent
import social.bigbone.api.entity.streaming.ParsedStreamEvent
import social.bigbone.api.entity.streaming.StreamType
import social.bigbone.api.entity.streaming.TechnicalEvent
import social.bigbone.api.entity.streaming.WebSocketEvent
import social.bigbone.rx.testtool.MockClient

class RxStreamingMethodsTest {

private val testScheduler = TestScheduler()

@Test
fun `Given client returning OK, when checking server health, then complete without errors`() {
val client = MockClient.mockClearText(clearTextResponse = "OK")
val streamingMethods = RxStreamingMethods(client)

val serverHealth = streamingMethods.checkServerHealth().test()

with(serverHealth) {
assertComplete()
assertNoErrors()
}
}

@Test
fun `Given websocket with 6 events lined up, when streaming federated public timeline, then expect emissions and no errors`() {
val mockedEvents: List<WebSocketEvent> = listOf(
TechnicalEvent.Open,
MastodonApiEvent.StreamEvent(
ParsedStreamEvent.FiltersChanged,
listOf(StreamType.PUBLIC)
),
MastodonApiEvent.StreamEvent(
ParsedStreamEvent.StatusCreated(mockk()),
listOf(StreamType.PUBLIC)
),
MastodonApiEvent.StreamEvent(
ParsedStreamEvent.StatusDeleted(deletedStatusId = "12345"),
listOf(StreamType.PUBLIC)
),
MastodonApiEvent.StreamEvent(
ParsedStreamEvent.AnnouncementDeleted(deletedAnnouncementId = "54321"),
listOf(StreamType.PUBLIC)
),
MastodonApiEvent.StreamEvent(
ParsedStreamEvent.StatusCreated(createdStatus = mockk()),
listOf(StreamType.PUBLIC)
)
)
val client = MockClient.mockWebSocket(events = mockedEvents)
val streamingMethods = RxStreamingMethods(client)

val testSubscriber: TestSubscriber<WebSocketEvent> = streamingMethods
.federatedPublic(onlyMedia = false)
.subscribeOn(testScheduler)
.observeOn(testScheduler)
.test()
testScheduler.triggerActions()

with(testSubscriber) {
assertValueCount(mockedEvents.size)
assertValueSequence(mockedEvents)
assertNotComplete()
assertNoErrors()
cancel()
}
}

@Test
fun `Given websocket with failure, when streaming federated public timeline, then expect error is propagated`() {
val expectedError = IOException("Expected")
val mockedEvents: List<WebSocketEvent> = listOf(
TechnicalEvent.Open,
TechnicalEvent.Failure(expectedError)
)
val client = MockClient.mockWebSocket(events = mockedEvents)
val streamingMethods = RxStreamingMethods(client)

val testSubscriber: TestSubscriber<WebSocketEvent> = streamingMethods
.federatedPublic(onlyMedia = false)
.subscribeOn(testScheduler)
.observeOn(testScheduler)
.test()
testScheduler.triggerActions()

with(testSubscriber) {
assertNotComplete()
assertError(expectedError)
cancel()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,57 @@ import okhttp3.RequestBody
import okhttp3.Response
import okhttp3.ResponseBody
import okhttp3.ResponseBody.Companion.toResponseBody
import okhttp3.WebSocket
import social.bigbone.MastodonClient
import social.bigbone.Parameters
import social.bigbone.api.WebSocketCallback
import social.bigbone.api.entity.streaming.WebSocketEvent
import social.bigbone.api.exception.BigBoneRequestException
import java.net.SocketTimeoutException

object MockClient {

/**
* Mocks a [MastodonClient] for functions testing the websocket streaming APIs.
*
* @param events [WebSocketEvent]s that should be lined up to be returned by the [WebSocketCallback]
*/
fun mockWebSocket(events: Collection<WebSocketEvent>): MastodonClient {
val webSocket = mockk<WebSocket> {
every { close(any<Int>(), any<String>()) } returns true
}
return mockk<MastodonClient> {
every { stream(any<Parameters>(), any<WebSocketCallback>()) } answers {
events.forEach { event: WebSocketEvent -> secondArg<WebSocketCallback>().onEvent(event) }
webSocket
}
}
}

fun mockClearText(
clearTextResponse: String,
requestUrl: String = "https://example.com"
): MastodonClient {
val response: Response = Response.Builder()
.code(200)
.message("OK")
.request(Request.Builder().url(requestUrl).build())
.protocol(Protocol.HTTP_1_1)
.body(clearTextResponse.toResponseBody())
.build()

return mockk {
every { get(any<String>(), any()) } returns response
every {
this@mockk["performAction"](
any<String>(),
any<MastodonClient.Method>(),
any<Parameters>()
)
} returns response
}
}

fun mock(
jsonName: String,
maxId: String? = null,
Expand Down
Loading