diff --git a/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallEnqueueObservable.java b/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallEnqueueObservable.java index a6bac4e8b0..1fbc37f274 100644 --- a/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallEnqueueObservable.java +++ b/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallEnqueueObservable.java @@ -16,7 +16,7 @@ package retrofit2.adapter.rxjava2; import io.reactivex.Observable; -import io.reactivex.Observer; +import io.reactivex.ObservableEmitter; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.CompositeException; import io.reactivex.exceptions.Exceptions; @@ -25,33 +25,29 @@ import retrofit2.Callback; import retrofit2.Response; -final class CallEnqueueObservable extends Observable> { - private final Call originalCall; +final class CallEnqueueObservable { - CallEnqueueObservable(Call originalCall) { - this.originalCall = originalCall; - } - - @Override - protected void subscribeActual(Observer> observer) { - // Since Call is a one-shot type, clone it for each new observer. - Call call = originalCall.clone(); - CallCallback callback = new CallCallback<>(call, observer); - observer.onSubscribe(callback); - if (!callback.isDisposed()) { - call.enqueue(callback); - } + public static Observable> create(Call originalCall) { + return Observable.create(emitter -> { + // Since Call is a one-shot type, clone it for each new observer. + Call call = originalCall.clone(); + CallCallback callback = new CallCallback<>(call, emitter); + emitter.setDisposable(callback); + if (!callback.isDisposed()) { + call.enqueue(callback); + } + }); } private static final class CallCallback implements Disposable, Callback { private final Call call; - private final Observer> observer; + private final ObservableEmitter> emitter; private volatile boolean disposed; boolean terminated = false; - CallCallback(Call call, Observer> observer) { + CallCallback(Call call, ObservableEmitter> emitter) { this.call = call; - this.observer = observer; + this.emitter = emitter; } @Override @@ -59,11 +55,11 @@ public void onResponse(Call call, Response response) { if (disposed) return; try { - observer.onNext(response); + emitter.onNext(response); if (!disposed) { terminated = true; - observer.onComplete(); + emitter.onComplete(); } } catch (Throwable t) { Exceptions.throwIfFatal(t); @@ -71,7 +67,7 @@ public void onResponse(Call call, Response response) { RxJavaPlugins.onError(t); } else if (!disposed) { try { - observer.onError(t); + emitter.onError(t); } catch (Throwable inner) { Exceptions.throwIfFatal(inner); RxJavaPlugins.onError(new CompositeException(t, inner)); @@ -85,7 +81,7 @@ public void onFailure(Call call, Throwable t) { if (call.isCanceled()) return; try { - observer.onError(t); + emitter.onError(t); } catch (Throwable inner) { Exceptions.throwIfFatal(inner); RxJavaPlugins.onError(new CompositeException(t, inner)); diff --git a/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallExecuteObservable.java b/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallExecuteObservable.java index 6915e309e4..2cfad98465 100644 --- a/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallExecuteObservable.java +++ b/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallExecuteObservable.java @@ -16,7 +16,6 @@ package retrofit2.adapter.rxjava2; import io.reactivex.Observable; -import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.CompositeException; import io.reactivex.exceptions.Exceptions; @@ -24,46 +23,42 @@ import retrofit2.Call; import retrofit2.Response; -final class CallExecuteObservable extends Observable> { - private final Call originalCall; +final class CallExecuteObservable { - CallExecuteObservable(Call originalCall) { - this.originalCall = originalCall; - } - - @Override - protected void subscribeActual(Observer> observer) { - // Since Call is a one-shot type, clone it for each new observer. - Call call = originalCall.clone(); - CallDisposable disposable = new CallDisposable(call); - observer.onSubscribe(disposable); - if (disposable.isDisposed()) { - return; - } - - boolean terminated = false; - try { - Response response = call.execute(); - if (!disposable.isDisposed()) { - observer.onNext(response); - } - if (!disposable.isDisposed()) { - terminated = true; - observer.onComplete(); + static Observable> create(Call originalCall) { + return Observable.create(emitter -> { + // Since Call is a one-shot type, clone it for each new observer. + Call call = originalCall.clone(); + CallDisposable disposable = new CallDisposable(call); + emitter.setDisposable(disposable); + if (disposable.isDisposed()) { + return; } - } catch (Throwable t) { - Exceptions.throwIfFatal(t); - if (terminated) { - RxJavaPlugins.onError(t); - } else if (!disposable.isDisposed()) { - try { - observer.onError(t); - } catch (Throwable inner) { - Exceptions.throwIfFatal(inner); - RxJavaPlugins.onError(new CompositeException(t, inner)); + + boolean terminated = false; + try { + Response response = call.execute(); + if (!disposable.isDisposed()) { + emitter.onNext(response); + } + if (!disposable.isDisposed()) { + terminated = true; + emitter.onComplete(); + } + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + if (terminated) { + RxJavaPlugins.onError(t); + } else if (!disposable.isDisposed()) { + try { + emitter.onError(t); + } catch (Throwable inner) { + Exceptions.throwIfFatal(inner); + RxJavaPlugins.onError(new CompositeException(t, inner)); + } } } - } + }); } private static final class CallDisposable implements Disposable { diff --git a/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/RxJava2CallAdapter.java b/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/RxJava2CallAdapter.java index b6bec97c5e..0a3d64ab9f 100644 --- a/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/RxJava2CallAdapter.java +++ b/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/RxJava2CallAdapter.java @@ -19,8 +19,11 @@ import io.reactivex.Observable; import io.reactivex.Scheduler; import io.reactivex.plugins.RxJavaPlugins; + import java.lang.reflect.Type; + import javax.annotation.Nullable; + import retrofit2.Call; import retrofit2.CallAdapter; import retrofit2.Response; @@ -37,15 +40,15 @@ final class RxJava2CallAdapter implements CallAdapter { private final boolean isCompletable; RxJava2CallAdapter( - Type responseType, - @Nullable Scheduler scheduler, - boolean isAsync, - boolean isResult, - boolean isBody, - boolean isFlowable, - boolean isSingle, - boolean isMaybe, - boolean isCompletable) { + Type responseType, + @Nullable Scheduler scheduler, + boolean isAsync, + boolean isResult, + boolean isBody, + boolean isFlowable, + boolean isSingle, + boolean isMaybe, + boolean isCompletable) { this.responseType = responseType; this.scheduler = scheduler; this.isAsync = isAsync; @@ -65,7 +68,7 @@ public Type responseType() { @Override public Object adapt(Call call) { Observable> responseObservable = - isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call); + isAsync ? CallEnqueueObservable.create(call):CallExecuteObservable.create(call); Observable observable; if (isResult) {