diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index bb972f0362..aa32cfcefe 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -49,6 +49,7 @@ kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serializa kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" } kotlinx-serialization-proto = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinx-serialization" } okhttp-client = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" } +okhttp-sse = { module = "com.squareup.okhttp3:okhttp-sse", version.ref = "okhttp" } okhttp-loggingInterceptor = { module = "com.squareup.okhttp3:logging-interceptor", version.ref = "okhttp" } okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp" } junit = { module = "junit:junit", version = "4.13.2" } diff --git a/retrofit-adapters/sse/core/build.gradle b/retrofit-adapters/sse/core/build.gradle new file mode 100644 index 0000000000..b6f72dc012 --- /dev/null +++ b/retrofit-adapters/sse/core/build.gradle @@ -0,0 +1,21 @@ +apply plugin: 'java-library' +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'com.vanniktech.maven.publish' +apply plugin: 'org.jetbrains.dokka' + +dependencies { + api projects.retrofit + api libs.okhttp.sse + compileOnly libs.findBugsAnnotations + + testImplementation libs.junit + testImplementation libs.truth + testImplementation libs.okhttp.mockwebserver + testImplementation projects.retrofitConverters.scalars +} + +jar { + manifest { + attributes 'Automatic-Module-Name': 'retrofit2.adapter.sse-core' + } +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSource.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSource.kt new file mode 100644 index 0000000000..c9d741f78b --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSource.kt @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +import okhttp3.Request + +interface EventSource { + + fun request(): Request + + fun cancel() + + fun subscribe(callback: SseCallback) +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt new file mode 100644 index 0000000000..79b1ef2f72 --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import retrofit2.CallAdapter +import retrofit2.Retrofit +import retrofit2.adapter.sse.internal.EventSourceCallAdapter +import retrofit2.http.GET +import retrofit2.http.Streaming + +object EventSourceCallAdapterFactory : CallAdapter.Factory() { + + override fun get( + returnType: Type, + annotations: Array, + retrofit: Retrofit, + ): CallAdapter<*, *>? { + if (getRawType(returnType) != EventSource::class.java) { + return null + } + + if (returnType !is ParameterizedType) { + error( + "EventSource return type must be parameterized as EventSource" + + " or EventSource", + ) + } + + if (annotations.none { it is Streaming }) { + error("SSE endpoint must be annotated with @Streaming") + } + + if (annotations.none { it is GET }) { + error("SSE endpoint must use @GET method") + } + + val idType = getParameterUpperBound(0, returnType) + val typeType = getParameterUpperBound(1, returnType) + val dataType = getParameterUpperBound(2, returnType) + + return EventSourceCallAdapter( + retrofit, + idType, + typeType, + dataType, + ) + } +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt new file mode 100644 index 0000000000..615ddcdde4 --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +/** + * A server-sent event. + */ +data class ServerSentEvent( + @get:JvmName("id") + val id: ID?, + + @get:JvmName("type") + val type: TYPE?, + + @get:JvmName("data") + val data: DATA, +) diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/SseCallback.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/SseCallback.kt new file mode 100644 index 0000000000..722c85b82b --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/SseCallback.kt @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +interface SseCallback { + + fun onOpen(eventSource: EventSource) { + } + + fun onEvent(eventSource: EventSource, id: ID?, type: TYPE?, data: DATA) { + } + + fun onClosed(eventSource: EventSource) { + } + + fun onFailure(eventSource: EventSource, t: Throwable?) { + } +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/EventSourceCallAdapter.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/EventSourceCallAdapter.kt new file mode 100644 index 0000000000..b521207bb2 --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/EventSourceCallAdapter.kt @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.internal + +import java.lang.reflect.Type +import okhttp3.ResponseBody +import retrofit2.Call +import retrofit2.CallAdapter +import retrofit2.Converter +import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSource + +private val EMPTY_ARRAY = emptyArray() + +class EventSourceCallAdapter( + retrofit: Retrofit, + private val idType: Type, + private val typeType: Type, + private val dataType: Type, +) : CallAdapter> { + + private val idConverter: Converter = retrofit.responseBodyConverter(idType, EMPTY_ARRAY) + private val typeConverter: Converter = retrofit.responseBodyConverter(typeType, EMPTY_ARRAY) + private val dataConverter: Converter = retrofit.responseBodyConverter(dataType, EMPTY_ARRAY) + + override fun responseType(): Type = ResponseBody::class.java + + override fun adapt(call: Call): EventSource { + return RealEventSource(idType, typeType, dataType, idConverter, typeConverter, dataConverter, call) + } +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt new file mode 100644 index 0000000000..0fff69ed72 --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.internal + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import java.lang.reflect.WildcardType +import okhttp3.Request +import okhttp3.ResponseBody +import okhttp3.ResponseBody.Companion.toResponseBody +import okhttp3.sse.EventSourceListener +import okhttp3.sse.EventSources +import retrofit2.Call +import retrofit2.Converter +import retrofit2.Response +import retrofit2.adapter.sse.EventSource +import retrofit2.adapter.sse.SseCallback + +@Suppress("NOTHING_TO_INLINE") +private inline fun conversionError(value: T, type: Type): Nothing = error("Failed to convert $value to $type, actual type is ${value.javaClass}") + +private fun Response.asSse(listener: EventSourceListener) { + val okhttpResponse = raw().newBuilder().body(body() ?: error("Response body is null")).build() + EventSources.processResponse(okhttpResponse, listener) +} + +private fun Type.acceptsString(): Boolean = + when (this) { + String::class.java -> true + Object::class.java -> true + CharSequence::class.java -> true + Comparable::class.java -> true + is ParameterizedType -> rawType === Comparable::class.java && actualTypeArguments[0].acceptsString() + is WildcardType -> upperBounds[0].acceptsString() + else -> false + } + +internal class RealEventSource( + private val idType: Type, + private val typeType: Type, + private val dataType: Type, + private val idConverter: Converter, + private val typeConverter: Converter, + private val dataConverter: Converter, + private val call: Call, +) : EventSource { + override fun request(): Request = call.request() + + override fun cancel() = call.cancel() + + override fun subscribe(callback: SseCallback) { + call.enqueue( + object : retrofit2.Callback { + override fun onResponse(call: Call, response: Response) { + response.asSse( + object : EventSourceListener() { + override fun onOpen(eventSource: okhttp3.sse.EventSource, response: okhttp3.Response) { + callback.onOpen(this@RealEventSource) + } + + override fun onEvent(eventSource: okhttp3.sse.EventSource, id: String?, type: String?, data: String) { + val convertedId = convertId(id) + val convertedType = convertType(type) + val convertedData = convertData(data) + callback.onEvent(this@RealEventSource, convertedId, convertedType, convertedData) + } + + override fun onClosed(eventSource: okhttp3.sse.EventSource) { + callback.onClosed(this@RealEventSource) + } + + override fun onFailure(eventSource: okhttp3.sse.EventSource, t: Throwable?, response: okhttp3.Response?) { + callback.onFailure(this@RealEventSource, t) + } + }, + ) + } + + override fun onFailure(call: Call, t: Throwable) { + callback.onFailure(this@RealEventSource, t) + } + }, + ) + } + + private fun convertId(id: String?): ID? { + @Suppress("UNCHECKED_CAST") + return when { + idType.acceptsString() -> id as ID? + id != null -> idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) + else -> null + } + } + + private fun convertType(type: String?): TYPE? { + @Suppress("UNCHECKED_CAST") + return when { + typeType.acceptsString() -> type as TYPE? + type != null -> typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) + else -> null + } + } + + private fun convertData(data: String): DATA { + @Suppress("UNCHECKED_CAST") + return when { + dataType.acceptsString() -> data as DATA + else -> dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) + } + } +} diff --git a/retrofit-adapters/sse/core/src/test/java/retrofit2/adapter/sse/EventSourceCallAdapterFactoryTest.java b/retrofit-adapters/sse/core/src/test/java/retrofit2/adapter/sse/EventSourceCallAdapterFactoryTest.java new file mode 100644 index 0000000000..560670a736 --- /dev/null +++ b/retrofit-adapters/sse/core/src/test/java/retrofit2/adapter/sse/EventSourceCallAdapterFactoryTest.java @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse; + +import static com.google.common.truth.Truth.assertThat; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.converter.scalars.ScalarsConverterFactory; +import retrofit2.http.GET; +import retrofit2.http.Streaming; + +public class EventSourceCallAdapterFactoryTest { + + @Rule public final MockWebServer server = new MockWebServer(); + + interface Service { + @Streaming + @GET("/") + EventSource sse(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(ScalarsConverterFactory.create()) + .addCallAdapterFactory(EventSourceCallAdapterFactory.INSTANCE) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void simpleEvents() throws Exception { + MockResponse mockResponse = + new MockResponse() + .setHeader("Content-Type", "text/event-stream") + .setBody("id: 1\nevent: type1\ndata: foo\n\nid: 2\ndata: bar\n\n"); + server.enqueue(mockResponse); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + service + .sse() + .subscribe( + new SseCallback() { + private final AtomicInteger count = new AtomicInteger(0); + + @Override + public void onOpen(@NotNull EventSource eventSource) { + assertThat(count.get()).isEqualTo(0); + } + + @Override + public void onEvent( + @NotNull EventSource eventSource, + @Nullable Integer id, + @Nullable String type, + @NotNull String data) { + switch (count.incrementAndGet()) { + case 1: + assertThat(id).isEqualTo(1); + assertThat(type).isEqualTo("type1"); + assertThat(data).isEqualTo("foo"); + break; + case 2: + assertThat(id).isEqualTo(2); + assertThat(type).isEqualTo(null); + assertThat(data).isEqualTo("bar"); + break; + } + } + + @Override + public void onClosed(@NotNull EventSource eventSource) { + completableFuture.complete(null); + } + + @Override + public void onFailure( + @NotNull EventSource eventSource, + @Nullable Throwable t) { + completableFuture.completeExceptionally(t); + } + }); + + completableFuture.get(5, TimeUnit.SECONDS); + } +} diff --git a/retrofit-adapters/sse/juc-flow/build.gradle b/retrofit-adapters/sse/juc-flow/build.gradle new file mode 100644 index 0000000000..2eac5c98d2 --- /dev/null +++ b/retrofit-adapters/sse/juc-flow/build.gradle @@ -0,0 +1,19 @@ +apply plugin: 'java-library' +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'com.vanniktech.maven.publish' +apply plugin: 'org.jetbrains.dokka' + +dependencies { + api projects.retrofit + api projects.retrofitAdapters.sse.core + compileOnly libs.findBugsAnnotations + + testImplementation libs.junit + testImplementation libs.truth + testImplementation libs.okhttp.mockwebserver + testImplementation projects.retrofitConverters.scalars +} + +kotlin { + jvmToolchain 9 +} diff --git a/retrofit-adapters/sse/juc-flow/gradle.properties b/retrofit-adapters/sse/juc-flow/gradle.properties new file mode 100644 index 0000000000..ce4c81fe2a --- /dev/null +++ b/retrofit-adapters/sse/juc-flow/gradle.properties @@ -0,0 +1,3 @@ +POM_ARTIFACT_ID=adapter-sse-juc-flow +POM_NAME=Adapter: SSE JUC flow +POM_DESCRIPTION=A Retrofit CallAdapter for server-sent event (SSE) with Java 9's Flow. diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt new file mode 100644 index 0000000000..75fc036801 --- /dev/null +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.java9 + +import java.lang.reflect.Type +import java.util.concurrent.Executor +import java.util.concurrent.Flow +import java.util.concurrent.SubmissionPublisher +import okhttp3.ResponseBody +import retrofit2.Call +import retrofit2.CallAdapter +import retrofit2.adapter.sse.EventSource +import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.adapter.sse.SseCallback + +internal class SseJucFlowCallAdapter( + private val executor: Executor, + private val maxBufferCapacity: Int, + private val delegation: CallAdapter>, +) : CallAdapter>> { + + override fun responseType(): Type = delegation.responseType() + + override fun adapt( + call: Call, + ): Flow.Publisher> { + val delegate = delegation.adapt(call) + return object : SubmissionPublisher>(executor, maxBufferCapacity) { + override fun subscribe(subscriber: Flow.Subscriber>?) { + super.subscribe(subscriber) + delegate.subscribe(object : SseCallback { + override fun onEvent( + eventSource: EventSource, + id: ID?, + type: TYPE?, + data: DATA, + ) { + submit(ServerSentEvent(id, type, data)) + } + + override fun onClosed(eventSource: EventSource) { + close() + } + + override fun onFailure(eventSource: EventSource, t: Throwable?) { + closeExceptionally(t ?: RuntimeException()) // TODO exception type + } + }) + } + + override fun close() { + delegate.cancel() + super.close() + } + } + } + +} diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt new file mode 100644 index 0000000000..182b89c688 --- /dev/null +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.java9 + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import java.util.concurrent.Flow +import java.util.concurrent.ForkJoinPool +import retrofit2.CallAdapter +import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSource +import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.adapter.sse.internal.EventSourceCallAdapter +import retrofit2.http.GET +import retrofit2.http.Streaming + +class SseJucFlowCallAdapterFactory private constructor( + private val executor: Executor?, + private val maxBufferCapacity: Int, +) : CallAdapter.Factory() { + + companion object { + @JvmStatic + @JvmOverloads + fun create(executor: Executor? = null, maxBufferCapacity: Int = Flow.defaultBufferSize()) = + SseJucFlowCallAdapterFactory(executor, maxBufferCapacity) + } + + override fun get( + returnType: Type, + annotations: Array, + retrofit: Retrofit, + ): CallAdapter<*, *>? { + if (getRawType(returnType) != Flow.Publisher::class.java) { + return null + } + + if (returnType !is ParameterizedType) { + error( + "Flow.Publisher return type must be parameterized as Flow.Publisher or Flow.Publisher", + ) + } + + val innerType = getParameterUpperBound(0, returnType) + + if (getRawType(innerType) != ServerSentEvent::class.java) { + return null + } + + if (innerType !is ParameterizedType) { + error( + "ServerSentEvent must be parameterized as ServerSentEvent" + + " or ServerSentEvent", + ) + } + + if (annotations.none { it is Streaming }) { + error("SSE endpoint must be annotated with @Streaming") + } + + if (annotations.none { it is GET }) { + error("SSE endpoint must use @GET method") + } + + val idType = getParameterUpperBound(0, innerType) + val typeType = getParameterUpperBound(1, innerType) + val dataType = getParameterUpperBound(2, innerType) + + val returnType = object : ParameterizedType { + override fun getRawType(): Type = EventSource::class.java + override fun getOwnerType(): Type? = null + override fun getActualTypeArguments(): Array = arrayOf(idType, typeType, dataType) + } + + val delegation = runCatching { + retrofit.callAdapter(returnType, annotations) as? EventSourceCallAdapter<*, *, *> + }.getOrNull() ?: return null + + return SseJucFlowCallAdapter( + executorOrDefault(retrofit), + maxBufferCapacity, + delegation, + ) + } + + private fun executorOrDefault(retrofit: Retrofit): Executor = + executor ?: retrofit.callbackExecutor() + ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } + ?: Executors.newCachedThreadPool() +} diff --git a/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java new file mode 100644 index 0000000000..da03277301 --- /dev/null +++ b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.java9; + +import static com.google.common.truth.Truth.assertThat; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.adapter.sse.EventSourceCallAdapterFactory; +import retrofit2.adapter.sse.ServerSentEvent; +import retrofit2.converter.scalars.ScalarsConverterFactory; +import retrofit2.http.GET; +import retrofit2.http.Streaming; + +public final class SseJucFlowCallAdapterFactoryTest { + + @Rule public final MockWebServer server = new MockWebServer(); + + interface Service { + @Streaming + @GET("/") + Flow.Publisher> sse(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(ScalarsConverterFactory.create()) + .addCallAdapterFactory(SseJucFlowCallAdapterFactory.create()) + .addCallAdapterFactory(EventSourceCallAdapterFactory.INSTANCE) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void simpleEvents() throws Exception { + MockResponse mockResponse = + new MockResponse() + .setHeader("Content-Type", "text/event-stream") + .setBody("id: 1\nevent: type1\ndata: foo\n\nid: 2\ndata: bar\n\n"); + server.enqueue(mockResponse); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + service + .sse() + .subscribe( + new Flow.Subscriber<>() { + private final AtomicInteger count = new AtomicInteger(0); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(2); + } + + @Override + public void onNext(ServerSentEvent serverSentEvent) { + switch (count.incrementAndGet()) { + case 1: + assertThat(serverSentEvent.id()).isEqualTo(1); + assertThat(serverSentEvent.type()).isEqualTo("type1"); + assertThat(serverSentEvent.data()).isEqualTo("foo"); + break; + case 2: + assertThat(serverSentEvent.id()).isEqualTo(2); + assertThat(serverSentEvent.type()).isEqualTo(null); + assertThat(serverSentEvent.data()).isEqualTo("bar"); + break; + } + } + + @Override + public void onError(Throwable throwable) { + completableFuture.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + completableFuture.complete(null); + } + }); + + completableFuture.get(5, TimeUnit.SECONDS); + } +} diff --git a/retrofit-adapters/sse/ktx-flow/build.gradle b/retrofit-adapters/sse/ktx-flow/build.gradle new file mode 100644 index 0000000000..070c24a3da --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/build.gradle @@ -0,0 +1,17 @@ +apply plugin: 'java-library' +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'com.vanniktech.maven.publish' +apply plugin: 'org.jetbrains.dokka' + +dependencies { + api projects.retrofit + api projects.retrofitAdapters.sse.core + api libs.kotlinx.coroutines + compileOnly libs.findBugsAnnotations + + testImplementation libs.junit + testImplementation libs.truth + testImplementation libs.okhttp.mockwebserver + testImplementation projects.retrofitConverters.scalars + testImplementation projects.retrofitConverters.gson +} diff --git a/retrofit-adapters/sse/ktx-flow/gradle.properties b/retrofit-adapters/sse/ktx-flow/gradle.properties new file mode 100644 index 0000000000..96cd0fd4e1 --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/gradle.properties @@ -0,0 +1,3 @@ +POM_ARTIFACT_ID=adapter-sse-kotlinx-flow +POM_NAME=Adapter: SSE kotlinx flow +POM_DESCRIPTION=A Retrofit CallAdapter for server-sent event (SSE) with kotlinx-coroutine's Flow. diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt new file mode 100644 index 0000000000..a0549fc27f --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.kotlinx + +import java.lang.reflect.Type +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import okhttp3.ResponseBody +import retrofit2.Call +import retrofit2.CallAdapter +import retrofit2.adapter.sse.EventSource +import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.adapter.sse.SseCallback + +internal class SseKtxFlowCallAdapter( + private val delegation: CallAdapter>, +) : CallAdapter>> { + + override fun responseType(): Type = delegation.responseType() + + override fun adapt( + call: Call, + ): Flow> { + val delegate = delegation.adapt(call) + return callbackFlow { + delegate.subscribe( + object : SseCallback { + override fun onEvent( + eventSource: EventSource, + id: ID?, + type: TYPE?, + data: DATA, + ) { + trySendBlocking(ServerSentEvent(id, type, data)) + } + + override fun onClosed(eventSource: EventSource) { + close() + } + + override fun onFailure(eventSource: EventSource, t: Throwable?) { + close(t ?: RuntimeException()) // TODO exception type + } + }, + ) + + awaitClose(call::cancel) + } + } + +} diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt new file mode 100644 index 0000000000..0113630ec0 --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.kotlinx + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import kotlinx.coroutines.flow.Flow +import retrofit2.CallAdapter +import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSource +import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.adapter.sse.internal.EventSourceCallAdapter +import retrofit2.http.GET +import retrofit2.http.Streaming + +object SseKtxFlowCallAdapterFactory : CallAdapter.Factory() { + + override fun get( + returnType: Type, + annotations: Array, + retrofit: Retrofit, + ): CallAdapter<*, *>? { + if (getRawType(returnType) != Flow::class.java) { + return null + } + + if (returnType !is ParameterizedType) { + error( + "Flow return type must be parameterized as Flow or Flow", + ) + } + + val innerType = getParameterUpperBound(0, returnType) + + if (getRawType(innerType) != ServerSentEvent::class.java) { + return null + } + + if (innerType !is ParameterizedType) { + error( + "ServerSentEvent must be parameterized as ServerSentEvent" + + " or ServerSentEvent", + ) + } + + if (annotations.none { it is Streaming }) { + error("SSE endpoint must be annotated with @Streaming") + } + + if (annotations.none { it is GET }) { + error("SSE endpoint must use @GET method") + } + + val idType = getParameterUpperBound(0, innerType) + val typeType = getParameterUpperBound(1, innerType) + val dataType = getParameterUpperBound(2, innerType) + + val returnType = object : ParameterizedType { + override fun getRawType(): Type = EventSource::class.java + override fun getOwnerType(): Type? = null + override fun getActualTypeArguments(): Array = arrayOf(idType, typeType, dataType) + } + + val delegation = runCatching { + retrofit.callAdapter(returnType, annotations) as? EventSourceCallAdapter<*, *, *> + }.getOrNull() ?: return null + + return SseKtxFlowCallAdapter(delegation) + } +} diff --git a/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt new file mode 100644 index 0000000000..52bdd332a8 --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.kotlinx + +import com.google.common.truth.Truth.assertThat +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.runBlocking +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSourceCallAdapterFactory +import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.converter.gson.GsonConverterFactory +import retrofit2.converter.scalars.ScalarsConverterFactory +import retrofit2.create +import retrofit2.http.GET +import retrofit2.http.Streaming + +class SseKtxFlowCallAdapterFactoryTest { + + @Rule + @JvmField + val server: MockWebServer = MockWebServer() + + data class EventData( + val data: String, + ) + + interface Service { + @Streaming + @GET("/") + fun sse(): Flow> + } + + private lateinit var service: Service + + @Before + fun setUp() { + val retrofit = + Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(ScalarsConverterFactory.create()) + .addConverterFactory(GsonConverterFactory.create()) + .addCallAdapterFactory(SseKtxFlowCallAdapterFactory) + .addCallAdapterFactory(EventSourceCallAdapterFactory) + .build() + service = retrofit.create() + } + + @Test + fun simpleEvents() = runBlocking { + val mockResponse = MockResponse() + .setHeader("Content-Type", "text/event-stream") + .setBody( + """ + |id: 1 + |event: TYPE1 + |data: {"data":"foo"} + | + |id: 2 + |data: {"data":"bar"} + | + """.trimMargin(), + ) + server.enqueue(mockResponse) + + var count = 0 + service.sse().collect { serverSentEvent -> + when (++count) { + 1 -> { + assertThat(serverSentEvent.id).isEqualTo(1) + assertThat(serverSentEvent.type).isEqualTo("TYPE1") + assertThat(serverSentEvent.data.data).isEqualTo("foo") + } + 2 -> { + assertThat(serverSentEvent.id).isEqualTo(2) + assertThat(serverSentEvent.type).isEqualTo(null) + assertThat(serverSentEvent.data.data).isEqualTo("bar") + } + } + } + } +} diff --git a/settings.gradle b/settings.gradle index e94afa5343..6ab03cf3d9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -30,6 +30,9 @@ include ':retrofit-adapters:rxjava' include ':retrofit-adapters:rxjava2' include ':retrofit-adapters:rxjava3' include ':retrofit-adapters:scala' +include ':retrofit-adapters:sse:core' +include ':retrofit-adapters:sse:juc-flow' +include ':retrofit-adapters:sse:ktx-flow' include ':retrofit-converters:gson' include ':retrofit-converters:guava'