Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions okhttp-sse/api/okhttp-sse.api
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
public abstract interface class okhttp3/sse/EventSource {
public static final field Companion Lokhttp3/sse/EventSource$Companion;
public abstract fun cancel ()V
public static fun create (Lokhttp3/Call;Lokhttp3/sse/EventSourceListener;)Lokhttp3/sse/EventSource;
public static fun create (Lokhttp3/Response;Lokhttp3/sse/EventSourceListener;)Lokhttp3/sse/EventSource;
public abstract fun request ()Lokhttp3/Request;
}

public final class okhttp3/sse/EventSource$Companion {
public final fun create (Lokhttp3/Call;Lokhttp3/sse/EventSourceListener;)Lokhttp3/sse/EventSource;
public final fun create (Lokhttp3/Response;Lokhttp3/sse/EventSourceListener;)Lokhttp3/sse/EventSource;
}

public abstract interface class okhttp3/sse/EventSource$Factory {
public static final field Companion Lokhttp3/sse/EventSource$Factory$Companion;
public static fun create (Lokhttp3/Call$Factory;)Lokhttp3/sse/EventSource$Factory;
public abstract fun newEventSource (Lokhttp3/Request;Lokhttp3/sse/EventSourceListener;)Lokhttp3/sse/EventSource;
}

public final class okhttp3/sse/EventSource$Factory$Companion {
public final fun create (Lokhttp3/Call$Factory;)Lokhttp3/sse/EventSource$Factory;
}

public abstract class okhttp3/sse/EventSourceListener {
public fun <init> ()V
public fun onClosed (Lokhttp3/sse/EventSource;)V
Expand Down
43 changes: 43 additions & 0 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/EventSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package okhttp3.sse

import okhttp3.Call
import okhttp3.Request
import okhttp3.Response
import okhttp3.sse.internal.RealEventSource

interface EventSource {
/** Returns the original request that initiated this event source. */
Expand All @@ -37,5 +40,45 @@ interface EventSource {
request: Request,
listener: EventSourceListener,
): EventSource

companion object {

/**
* Wraps a [Call.Factory] into [EventSource.Factory].
*/
@JvmStatic
@JvmName("create")
fun Call.Factory.asEventSourceFactory(): Factory =
Comment thread
yschimke marked this conversation as resolved.
Factory { request, listener ->
val actualRequest =
if (request.header("Accept") == null) {
request.newBuilder().addHeader("Accept", "text/event-stream").build()
} else {
request
}

this.newCall(actualRequest).toEventSource(listener)
}
}
}

companion object {
/**
* Creates a new [EventSource] from the [Call] and immediately enqueue it.
*/
@JvmStatic
@JvmName("create")
fun Call.toEventSource(listener: EventSourceListener): EventSource {
Comment thread
yschimke marked this conversation as resolved.
Outdated
return RealEventSource(this, listener).also(this::enqueue)
}

/**
* Creates a new [EventSource] from the existing [Response].
*/
@JvmStatic
@JvmName("create")
fun Response.toEventSource(listener: EventSourceListener): EventSource {
return RealEventSource(this, listener)
}
}
}
8 changes: 6 additions & 2 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ abstract class EventSourceListener {
}

/**
* TODO description.
* Invoked when a new event has been sent to the client.
*
* @param id The `id` line of the event, might be null.
* @param type The `event` line of the event, might be null.
* @param data The `data` line of the event.
*/
open fun onEvent(
eventSource: EventSource,
Expand All @@ -40,7 +44,7 @@ abstract class EventSourceListener {
}

/**
* TODO description.
* Invoked when the HTTP connection has been closed normally.
*
* No further calls to this listener will be made.
*/
Expand Down
37 changes: 22 additions & 15 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package okhttp3.sse
import okhttp3.Call
import okhttp3.OkHttpClient
import okhttp3.Response
import okhttp3.sse.EventSource.Factory.Companion.asEventSourceFactory
import okhttp3.sse.internal.RealEventSource

object EventSources {
Expand All @@ -26,29 +27,35 @@ object EventSources {
level = DeprecationLevel.HIDDEN,
)
@JvmStatic
fun createFactory(client: OkHttpClient) = createFactory(client as Call.Factory)
fun createFactory(client: OkHttpClient) = client.asEventSourceFactory()

@Deprecated(
message = "Moved to extension function.",
replaceWith =
ReplaceWith(
expression = "callFactory.asEventSourceFactory()",
imports = ["okhttp3.sse.EventSource.Factory.Companion.asEventSourceFactory"],
),
level = DeprecationLevel.WARNING,
)
@JvmStatic
fun createFactory(callFactory: Call.Factory): EventSource.Factory =
EventSource.Factory { request, listener ->
val actualRequest =
if (request.header("Accept") == null) {
request.newBuilder().addHeader("Accept", "text/event-stream").build()
} else {
request
}

RealEventSource(actualRequest, listener).apply {
connect(callFactory)
}
}
callFactory.asEventSourceFactory()

@Deprecated(
message = "Moved to extension function.",
replaceWith =
ReplaceWith(
expression = "response.toEventSource(listener)",
Comment thread
yschimke marked this conversation as resolved.
Outdated
imports = ["okhttp3.sse.EventSource.Companion.toEventSource"],
),
level = DeprecationLevel.WARNING,
)
@JvmStatic
fun processResponse(
response: Response,
listener: EventSourceListener,
) {
val eventSource = RealEventSource(response.request, listener)
eventSource.processResponse(response)
RealEventSource(response, listener)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,30 @@ import okhttp3.internal.stripBody
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener

internal class RealEventSource(
internal class RealEventSource private constructor(
private val call: Call?,
private val request: Request,
private val listener: EventSourceListener,
) : EventSource,
ServerSentEventReader.Callback,
Callback {
private var call: Call? = null

@Volatile private var canceled = false
constructor(call: Call, listener: EventSourceListener) : this(call, call.request(), listener)

fun connect(callFactory: Call.Factory) {
call =
callFactory.newCall(request).apply {
enqueue(this@RealEventSource)
}
constructor(response: Response, listener: EventSourceListener) : this(null, response.request, listener) {
this.processResponse(response)
}

@Volatile private var canceled = false

override fun onResponse(
call: Call,
response: Response,
) {
processResponse(response)
}

fun processResponse(response: Response) {
private fun processResponse(response: Response) {
response.use {
if (!response.isSuccessful) {
listener.onFailure(this, null, response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import okhttp3.OkHttpClientTestRule
import okhttp3.RecordingEventListener
import okhttp3.Request
import okhttp3.sse.EventSource
import okhttp3.sse.EventSources.createFactory
import okhttp3.sse.EventSource.Factory.Companion.asEventSourceFactory
import okhttp3.testing.PlatformRule
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Tag
Expand Down Expand Up @@ -268,7 +268,7 @@ class EventSourceHttpTest {
builder.header("Accept", accept)
}
val request = builder.build()
val factory = createFactory(client)
val factory = client.asEventSourceFactory()
return factory.newEventSource(request, listener)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import mockwebserver3.MockWebServer
import mockwebserver3.junit5.StartStop
import okhttp3.OkHttpClientTestRule
import okhttp3.Request
import okhttp3.sse.EventSources.processResponse
import okhttp3.sse.EventSource.Companion.toEventSource
import okhttp3.testing.PlatformRule
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Tag
Expand Down Expand Up @@ -66,7 +66,7 @@ class EventSourcesHttpTest {
.url(server.url("/"))
.build()
val response = client.newCall(request).execute()
processResponse(response, listener)
response.toEventSource(listener)
listener.assertOpen()
listener.assertEvent(null, null, "hey")
listener.assertClose()
Expand All @@ -93,7 +93,7 @@ class EventSourcesHttpTest {
.url(server.url("/"))
.build()
val response = client.newCall(request).execute()
processResponse(response, listener)
response.toEventSource(listener)
listener.assertOpen()
listener.assertFailure("canceled")
}
Expand Down
Loading