diff --git a/okhttp-sse/api/okhttp-sse.api b/okhttp-sse/api/okhttp-sse.api index d1baa0735188..dbe1dae6fdee 100644 --- a/okhttp-sse/api/okhttp-sse.api +++ b/okhttp-sse/api/okhttp-sse.api @@ -1,12 +1,26 @@ public abstract interface class okhttp3/sse/EventSource { + public static final field Companion Lokhttp3/sse/EventSource$Companion; public abstract fun cancel ()V + public static fun enqueue (Lokhttp3/Call;Lokhttp3/sse/EventSourceListener;)Lokhttp3/sse/EventSource; + public static fun process (Lokhttp3/Response;Lokhttp3/sse/EventSourceListener;)V public abstract fun request ()Lokhttp3/Request; } +public final class okhttp3/sse/EventSource$Companion { + public final fun enqueue (Lokhttp3/Call;Lokhttp3/sse/EventSourceListener;)Lokhttp3/sse/EventSource; + public final fun process (Lokhttp3/Response;Lokhttp3/sse/EventSourceListener;)V +} + public abstract interface class okhttp3/sse/EventSource$Factory { + public static final field Companion Lokhttp3/sse/EventSource$Factory$Companion; + public static fun create (Lokhttp3/Call$Factory;)Lokhttp3/sse/EventSource$Factory; public abstract fun newEventSource (Lokhttp3/Request;Lokhttp3/sse/EventSourceListener;)Lokhttp3/sse/EventSource; } +public final class okhttp3/sse/EventSource$Factory$Companion { + public final fun create (Lokhttp3/Call$Factory;)Lokhttp3/sse/EventSource$Factory; +} + public abstract class okhttp3/sse/EventSourceListener { public fun ()V public fun onClosed (Lokhttp3/sse/EventSource;)V diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSource.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSource.kt index 8235f17b0843..06ddc37abf81 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSource.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSource.kt @@ -15,7 +15,10 @@ */ package okhttp3.sse +import okhttp3.Call import okhttp3.Request +import okhttp3.Response +import okhttp3.sse.internal.RealEventSource interface EventSource { /** Returns the original request that initiated this event source. */ @@ -37,5 +40,40 @@ interface EventSource { request: Request, listener: EventSourceListener, ): EventSource + + companion object { + /** + * Wraps a [Call.Factory] into [EventSource.Factory]. + */ + @JvmStatic + @JvmName("create") + fun Call.Factory.asEventSourceFactory(): Factory = + Factory { request, listener -> + val actualRequest = + if (request.header("Accept") == null) { + request.newBuilder().addHeader("Accept", "text/event-stream").build() + } else { + request + } + + this.newCall(actualRequest).enqueueEventSource(listener) + } + } + } + + companion object { + /** + * Enqueues a [Call] and process it as [EventSource] with [listener]. + */ + @JvmStatic + @JvmName("enqueue") + fun Call.enqueueEventSource(listener: EventSourceListener): EventSource = RealEventSource(this, listener).also(this::enqueue) + + /** + * Processes the existing response with [listener]. + */ + @JvmStatic + @JvmName("process") + fun Response.processEventSource(listener: EventSourceListener) = RealEventSource(this, listener).processResponse(this) } } diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt index 8c595b178f4f..1693a3bfe94d 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt @@ -29,7 +29,11 @@ abstract class EventSourceListener { } /** - * TODO description. + * Invoked when a new event has been sent to the client. + * + * @param id The `id` line of the event, might be null. + * @param type The `event` line of the event, might be null. + * @param data The `data` line of the event. */ open fun onEvent( eventSource: EventSource, @@ -40,7 +44,7 @@ abstract class EventSourceListener { } /** - * TODO description. + * Invoked when the HTTP connection has been closed normally. * * No further calls to this listener will be made. */ diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt index 0725aab820ff..90c01842806c 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt @@ -18,7 +18,8 @@ package okhttp3.sse import okhttp3.Call import okhttp3.OkHttpClient import okhttp3.Response -import okhttp3.sse.internal.RealEventSource +import okhttp3.sse.EventSource.Companion.processEventSource +import okhttp3.sse.EventSource.Factory.Companion.asEventSourceFactory object EventSources { @Deprecated( @@ -26,29 +27,32 @@ object EventSources { level = DeprecationLevel.HIDDEN, ) @JvmStatic - fun createFactory(client: OkHttpClient) = createFactory(client as Call.Factory) + fun createFactory(client: OkHttpClient) = client.asEventSourceFactory() + @Deprecated( + message = "Moved to extension function.", + replaceWith = + ReplaceWith( + expression = "callFactory.asEventSourceFactory()", + imports = ["okhttp3.sse.EventSource.Factory.Companion.asEventSourceFactory"], + ), + level = DeprecationLevel.WARNING, + ) @JvmStatic - fun createFactory(callFactory: Call.Factory): EventSource.Factory = - EventSource.Factory { request, listener -> - val actualRequest = - if (request.header("Accept") == null) { - request.newBuilder().addHeader("Accept", "text/event-stream").build() - } else { - request - } - - RealEventSource(actualRequest, listener).apply { - connect(callFactory) - } - } + fun createFactory(callFactory: Call.Factory): EventSource.Factory = callFactory.asEventSourceFactory() + @Deprecated( + message = "Moved to extension function.", + replaceWith = + ReplaceWith( + expression = "response.processEventSource(listener)", + imports = ["okhttp3.sse.EventSource.Companion.processEventSource"], + ), + level = DeprecationLevel.WARNING, + ) @JvmStatic fun processResponse( response: Response, listener: EventSourceListener, - ) { - val eventSource = RealEventSource(response.request, listener) - eventSource.processResponse(response) - } + ): Unit = response.processEventSource(listener) } diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt index 864eb19b66da..3611fed83e7f 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt @@ -25,22 +25,18 @@ import okhttp3.internal.stripBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener -internal class RealEventSource( +internal class RealEventSource private constructor( + private val call: Call?, private val request: Request, private val listener: EventSourceListener, ) : EventSource, ServerSentEventReader.Callback, Callback { - private var call: Call? = null + constructor(call: Call, listener: EventSourceListener) : this(call, call.request(), listener) - @Volatile private var canceled = false + constructor(response: Response, listener: EventSourceListener) : this(null, response.request, listener) - fun connect(callFactory: Call.Factory) { - call = - callFactory.newCall(request).apply { - enqueue(this@RealEventSource) - } - } + @Volatile private var canceled = false override fun onResponse( call: Call, @@ -49,7 +45,7 @@ internal class RealEventSource( processResponse(response) } - fun processResponse(response: Response) { + internal fun processResponse(response: Response) { response.use { if (!response.isSuccessful) { listener.onFailure(this, null, response) diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/ServerSentEventReader.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/ServerSentEventReader.kt index c4933eb23378..a2a3fbb03540 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/ServerSentEventReader.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/ServerSentEventReader.kt @@ -22,7 +22,7 @@ import okio.BufferedSource import okio.ByteString.Companion.encodeUtf8 import okio.Options -class ServerSentEventReader( +internal class ServerSentEventReader( private val source: BufferedSource, private val callback: Callback, ) { @@ -119,7 +119,7 @@ class ServerSentEventReader( } companion object { - val options = + private val options = Options.of( // 0 "\r\n".encodeUtf8(), diff --git a/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceFactoryTest.java b/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceFactoryTest.java new file mode 100644 index 000000000000..7fd51be47d06 --- /dev/null +++ b/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceFactoryTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2018 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.sse.internal; + +import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; +import mockwebserver3.junit5.StartStop; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.sse.EventSource; +import okhttp3.sse.EventSourceListener; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class EventSourceFactoryTest { + + @StartStop + private final MockWebServer server = new MockWebServer(); + + @Test + public void testEventSourceFactory() throws Exception { + OkHttpClient client = new OkHttpClient(); + EventSource.Factory factory = EventSource.Factory.create(client); + server.enqueue( + new MockResponse.Builder() + .body("data: hello\n\n") + .setHeader("content-type", "text/event-stream") + .build() + ); + Request request = new Request.Builder().url(server.url("/")).build(); + CompletableFuture future = new CompletableFuture<>(); + factory.newEventSource(request, new EventSourceListener() { + @Override + public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) { + try { + assertEquals("text/event-stream", response.request().header("Accept")); + } catch (Exception e) { + future.completeExceptionally(e); + } + } + + @Override + public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) { + try { + assertEquals("hello", data); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } + } + + @Override + public void onClosed(@NotNull EventSource eventSource) { + future.completeExceptionally(new IllegalStateException("closed")); + } + + @Override + public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) { + future.completeExceptionally(t == null ? new NullPointerException() : t); + } + }); + future.get(); + } + +} diff --git a/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceHttpTest.kt b/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceHttpTest.kt index 75d2e67753e8..57499fc7a584 100644 --- a/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceHttpTest.kt +++ b/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceHttpTest.kt @@ -26,7 +26,7 @@ import okhttp3.OkHttpClientTestRule import okhttp3.RecordingEventListener import okhttp3.Request import okhttp3.sse.EventSource -import okhttp3.sse.EventSources.createFactory +import okhttp3.sse.EventSource.Factory.Companion.asEventSourceFactory import okhttp3.testing.PlatformRule import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Tag @@ -268,7 +268,7 @@ class EventSourceHttpTest { builder.header("Accept", accept) } val request = builder.build() - val factory = createFactory(client) + val factory = client.asEventSourceFactory() return factory.newEventSource(request, listener) } } diff --git a/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourcesHttpTest.kt b/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourcesHttpTest.kt index 22573aed1711..fe85623590f4 100644 --- a/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourcesHttpTest.kt +++ b/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourcesHttpTest.kt @@ -20,7 +20,7 @@ import mockwebserver3.MockWebServer import mockwebserver3.junit5.StartStop import okhttp3.OkHttpClientTestRule import okhttp3.Request -import okhttp3.sse.EventSources.processResponse +import okhttp3.sse.EventSource.Companion.processEventSource import okhttp3.testing.PlatformRule import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Tag @@ -66,7 +66,7 @@ class EventSourcesHttpTest { .url(server.url("/")) .build() val response = client.newCall(request).execute() - processResponse(response, listener) + response.processEventSource(listener) listener.assertOpen() listener.assertEvent(null, null, "hey") listener.assertClose() @@ -93,7 +93,7 @@ class EventSourcesHttpTest { .url(server.url("/")) .build() val response = client.newCall(request).execute() - processResponse(response, listener) + response.processEventSource(listener) listener.assertOpen() listener.assertFailure("canceled") }