diff --git a/reactivex/__init__.py b/reactivex/__init__.py index 54e011b7..e5e93c87 100644 --- a/reactivex/__init__.py +++ b/reactivex/__init__.py @@ -469,7 +469,7 @@ def from_callable( def from_callback( func: Callable[..., Callable[..., None]], mapper: typing.Mapper[Any, Any] | None = None, -) -> Callable[[], Observable[Any]]: +) -> Callable[..., Observable[Any]]: """Converts a callback function to an observable sequence. Args: diff --git a/reactivex/observable/fromcallback.py b/reactivex/observable/fromcallback.py index c4dc83a3..fb275c52 100644 --- a/reactivex/observable/fromcallback.py +++ b/reactivex/observable/fromcallback.py @@ -8,7 +8,7 @@ def from_callback_( func: Callable[..., Callable[..., None]], mapper: typing.Mapper[Any, Any] | None = None, -) -> Callable[[], Observable[Any]]: +) -> Callable[..., Observable[Any]]: """Converts a callback function to an observable sequence. Args: diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index ad4583a4..b7e73ee8 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -707,7 +707,8 @@ def dematerialize() -> Callable[[Observable[Notification[_T]]], Observable[_T]]: def delay( - duetime: typing.RelativeTime, scheduler: abc.SchedulerBase | None = None + duetime: typing.AbsoluteOrRelativeTime, + scheduler: abc.SchedulerBase | None = None, ) -> Callable[[Observable[_T]], Observable[_T]]: """The delay operator. diff --git a/reactivex/operators/_delay.py b/reactivex/operators/_delay.py index 404b8f45..846732fe 100644 --- a/reactivex/operators/_delay.py +++ b/reactivex/operators/_delay.py @@ -20,7 +20,7 @@ def observable_delay_timespan( source: Observable[_T], - duetime: typing.RelativeTime, + duetime: typing.AbsoluteOrRelativeTime, scheduler: abc.SchedulerBase | None = None, ) -> Observable[_T]: def subscribe( @@ -119,7 +119,7 @@ def action(scheduler: abc.SchedulerBase, state: Any = None): @curry_flip def delay_( source: Observable[_T], - duetime: typing.RelativeTime, + duetime: typing.AbsoluteOrRelativeTime, scheduler: abc.SchedulerBase | None = None, ) -> Observable[_T]: """Time shifts the observable sequence. diff --git a/tests/test_observable/test_amb.py b/tests/test_observable/test_amb.py index 849668e4..d73525fa 100644 --- a/tests/test_observable/test_amb.py +++ b/tests/test_observable/test_amb.py @@ -68,7 +68,7 @@ def test_amb_regular_should_dispose_loser(self): source_not_disposed = [False] o1 = scheduler.create_hot_observable(msgs1) - def action(): + def action(_): source_not_disposed[0] = True o2 = scheduler.create_hot_observable(msgs2).pipe( @@ -91,7 +91,7 @@ def test_amb_winner_throws(self): source_not_disposed = [False] o1 = scheduler.create_hot_observable(msgs1) - def action(): + def action(_): source_not_disposed[0] = True o2 = scheduler.create_hot_observable(msgs2).pipe( @@ -112,7 +112,7 @@ def test_amb_loser_throws(self): msgs2 = [on_next(150, 1), on_next(210, 3), on_completed(250)] source_not_disposed = [False] - def action(): + def action(_): source_not_disposed[0] = True o1 = scheduler.create_hot_observable(msgs1).pipe( @@ -136,7 +136,7 @@ def test_amb_throws_before_election(self): source_not_disposed = [False] o1 = scheduler.create_hot_observable(msgs1) - def action(): + def action(_): source_not_disposed[0] = True o2 = scheduler.create_hot_observable(msgs2).pipe( diff --git a/tests/test_observable/test_average.py b/tests/test_observable/test_average.py index f48fa30c..da0c9e1c 100644 --- a/tests/test_observable/test_average.py +++ b/tests/test_observable/test_average.py @@ -1,6 +1,7 @@ import unittest from reactivex import operators as _ +from reactivex.notification import OnError from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -20,7 +21,7 @@ def test_average_int32_empty(self): res = scheduler.start(create=lambda: xs.pipe(_.average())).messages assert len(res) == 1 - assert res[0].value.kind == "E" and res[0].value.exception is not None + assert isinstance(res[0].value, OnError) and res[0].value.exception is not None assert res[0].time == 250 def test_average_int32_return(self): diff --git a/tests/test_observable/test_combinelatest.py b/tests/test_observable/test_combinelatest.py index b842b3c0..7a4f0f4d 100644 --- a/tests/test_observable/test_combinelatest.py +++ b/tests/test_observable/test_combinelatest.py @@ -472,7 +472,7 @@ def create(): ] def test_combine_latest_mapper_throws(self): - ex = "ex" + ex = Exception("ex") scheduler = TestScheduler() msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)] msgs2 = [on_next(150, 1), on_next(220, 3), on_completed(240)] diff --git a/tests/test_observable/test_concat.py b/tests/test_observable/test_concat.py index b7d1b063..b3c2a4b0 100644 --- a/tests/test_observable/test_concat.py +++ b/tests/test_observable/test_concat.py @@ -1,7 +1,9 @@ import unittest import reactivex +from reactivex import abc from reactivex import operators as ops +from reactivex.disposable import Disposable from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -225,15 +227,23 @@ def create(): def test_concat_forward_scheduler(self): scheduler = TestScheduler() - subscribe_schedulers = {"e1": "unknown", "e2": "unknown"} + subscribe_schedulers: dict[str, object] = {"e1": "unknown", "e2": "unknown"} - def subscribe_e1(observer, scheduler="not_set"): + def subscribe_e1( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: subscribe_schedulers["e1"] = scheduler observer.on_completed() + return Disposable() - def subscribe_e2(observer, scheduler="not_set"): + def subscribe_e2( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: subscribe_schedulers["e2"] = scheduler observer.on_completed() + return Disposable() e1 = reactivex.create(subscribe_e1) e2 = reactivex.create(subscribe_e2) @@ -245,15 +255,23 @@ def subscribe_e2(observer, scheduler="not_set"): assert subscribe_schedulers["e2"] is scheduler def test_concat_forward_none_scheduler(self): - subscribe_schedulers = {"e1": "unknown", "e2": "unknown"} + subscribe_schedulers: dict[str, object] = {"e1": "unknown", "e2": "unknown"} - def subscribe_e1(observer, scheduler="not_set"): + def subscribe_e1( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: subscribe_schedulers["e1"] = scheduler observer.on_completed() + return Disposable() - def subscribe_e2(observer, scheduler="not_set"): + def subscribe_e2( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: subscribe_schedulers["e2"] = scheduler observer.on_completed() + return Disposable() e1 = reactivex.create(subscribe_e1) e2 = reactivex.create(subscribe_e2) diff --git a/tests/test_observable/test_debounce.py b/tests/test_observable/test_debounce.py index 8532e44f..9f61195d 100644 --- a/tests/test_observable/test_debounce.py +++ b/tests/test_observable/test_debounce.py @@ -359,7 +359,7 @@ def mapper(x): assert xs.subscriptions == [subscribe(200, 460)] def test_debounce_duration_mapper_throws(self): - ex = "ex" + ex = Exception("ex") scheduler = TestScheduler() xs = scheduler.create_hot_observable( on_next(150, 1), @@ -376,7 +376,7 @@ def mapper(x): on_next(x * 10, "Ignore"), on_next(x * 10 + 5, "Aargh!") ) else: - _raise(ex) + raise ex return xs.pipe(_.throttle_with_mapper(mapper)) diff --git a/tests/test_observable/test_defer.py b/tests/test_observable/test_defer.py index 912f60d1..669fbf40 100644 --- a/tests/test_observable/test_defer.py +++ b/tests/test_observable/test_defer.py @@ -2,6 +2,7 @@ import reactivex from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.coldobservable import ColdObservable on_next = ReactiveTest.on_next on_completed = ReactiveTest.on_completed @@ -23,7 +24,7 @@ def _raise(ex): class TestDefer(unittest.TestCase): def test_defer_complete(self): - xs = [None] + xs: list[ColdObservable[int] | None] = [None] invoked = [0] scheduler = TestScheduler() @@ -40,12 +41,13 @@ def defer(scheduler): results = scheduler.start(create) assert results.messages == [on_next(300, 200), on_completed(400)] assert 1 == invoked[0] + assert xs[0] is not None assert xs[0].subscriptions == [subscribe(200, 400)] def test_defer_error(self): scheduler = TestScheduler() invoked = [0] - xs = [None] + xs: list[ColdObservable[int] | None] = [None] ex = "ex" def create(): @@ -62,12 +64,13 @@ def defer(scheduler): assert results.messages == [on_next(300, 200), on_error(400, ex)] assert 1 == invoked[0] + assert xs[0] is not None assert xs[0].subscriptions == [subscribe(200, 400)] def test_defer_dispose(self): scheduler = TestScheduler() invoked = [0] - xs = [None] + xs: list[ColdObservable[int] | None] = [None] def create(): def defer(scheduler): @@ -84,6 +87,7 @@ def defer(scheduler): results = scheduler.start(create) assert results.messages == [on_next(300, 200), on_next(400, 1)] assert 1 == invoked[0] + assert xs[0] is not None assert xs[0].subscriptions == [subscribe(200, 1000)] def test_defer_on_error(self): diff --git a/tests/test_observable/test_doaction.py b/tests/test_observable/test_doaction.py index e28ff24d..5b1c1d13 100644 --- a/tests/test_observable/test_doaction.py +++ b/tests/test_observable/test_doaction.py @@ -55,7 +55,6 @@ def test_do_plain_action(self): def create(): def action(x): i[0] += 1 - return i[0] return xs.pipe(_.do_action(action)) diff --git a/tests/test_observable/test_elementat.py b/tests/test_observable/test_elementat.py index df004973..5dd7c9cb 100644 --- a/tests/test_observable/test_elementat.py +++ b/tests/test_observable/test_elementat.py @@ -1,6 +1,7 @@ import unittest from reactivex import operators as ops +from reactivex.notification import OnError from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -54,8 +55,9 @@ def create(): self.assertEqual(1, len(results.messages)) self.assertEqual(600, results.messages[0].time) - self.assertEqual("E", results.messages[0].value.kind) - assert results.messages[0].value.exception + value = results.messages[0].value + assert isinstance(value, OnError) + assert value.exception def test_elementat_error(self): ex = "ex" diff --git a/tests/test_observable/test_empty.py b/tests/test_observable/test_empty.py index dd2f70d6..66dc5ffd 100644 --- a/tests/test_observable/test_empty.py +++ b/tests/test_observable/test_empty.py @@ -45,7 +45,10 @@ def test_empty_observer_throw_exception(self): scheduler = TestScheduler() xs = empty() xs.subscribe( - lambda x: None, lambda ex: None, lambda: _raise("ex"), scheduler=scheduler + lambda x: None, + lambda ex: None, + lambda: _raise(Exception("ex")), + scheduler=scheduler, ) with self.assertRaises(RxException): diff --git a/tests/test_observable/test_expand.py b/tests/test_observable/test_expand.py index 2691aada..0fa7d315 100644 --- a/tests/test_observable/test_expand.py +++ b/tests/test_observable/test_expand.py @@ -27,7 +27,7 @@ def test_expand_empty(self): xs = scheduler.create_hot_observable(on_completed(300)) def create(): - def mapper(): + def mapper(x): return scheduler.create_cold_observable( on_next(100, 1), on_next(200, 2), on_completed(300) ) diff --git a/tests/test_observable/test_filter.py b/tests/test_observable/test_filter.py index 2635e66b..7a319e90 100644 --- a/tests/test_observable/test_filter.py +++ b/tests/test_observable/test_filter.py @@ -252,7 +252,7 @@ def predicate(x: int) -> bool: def test_filter_dispose_in_predicate(self): scheduler = TestScheduler() invoked = [0] - ys = [None] + ys: list[Observable[int] | None] = [None] xs = scheduler.create_hot_observable( on_next(110, 1), on_next(180, 2), @@ -282,11 +282,11 @@ def predicate(x): return is_prime(x) ys[0] = xs.pipe(filter(predicate)) - return ys[0] scheduler.schedule_absolute(created, action) def action1(scheduler, state): + assert ys[0] is not None d.disposable = ys[0].subscribe(results) scheduler.schedule_absolute(subscribed, action1) @@ -510,7 +510,7 @@ def predicate(x, index): def test_filter_indexed_dispose_in_predicate(self): scheduler = TestScheduler() - ys = [None] + ys: list[Observable[int] | None] = [None] invoked = [0] xs = scheduler.create_hot_observable( on_next(110, 1), @@ -545,6 +545,7 @@ def predicate(x, index): scheduler.schedule_absolute(created, action1) def action2(scheduler, state): + assert ys[0] is not None d.disposable = ys[0].subscribe(results) scheduler.schedule_absolute(subscribed, action2) diff --git a/tests/test_observable/test_generate.py b/tests/test_observable/test_generate.py index 0dfdec25..f58a190f 100644 --- a/tests/test_observable/test_generate.py +++ b/tests/test_observable/test_generate.py @@ -1,4 +1,5 @@ import unittest +from typing import Any, NoReturn import reactivex from reactivex import operators as ops @@ -18,17 +19,20 @@ class RxException(Exception): # Helper function for raising exceptions within lambdas -def _raise(ex): +def _raise(ex: Any) -> NoReturn: raise RxException(ex) +_INITIAL: int = 0 + + class TestGenerate(unittest.TestCase): def test_generate_finite(self): scheduler = TestScheduler() def create(): return reactivex.generate( - 0, + _INITIAL, lambda x: x <= 3, lambda x: x + 1, ) @@ -49,7 +53,7 @@ def test_generate_throw_condition(self): def create(): return reactivex.generate( - 0, + _INITIAL, lambda x: _raise("ex"), lambda x: x + 1, ) @@ -78,7 +82,7 @@ def test_generate_dispose(self): def create(): return reactivex.generate( - 0, + _INITIAL, lambda x: True, lambda x: x + 1, ) @@ -91,7 +95,7 @@ def test_generate_repeat(self): def create(): return reactivex.generate( - 0, + _INITIAL, lambda x: x <= 3, lambda x: x + 1, ).pipe(ops.repeat(2)) diff --git a/tests/test_observable/test_interval.py b/tests/test_observable/test_interval.py index 8c2d1ab3..7287b2f2 100644 --- a/tests/test_observable/test_interval.py +++ b/tests/test_observable/test_interval.py @@ -75,7 +75,7 @@ def create(): def test_interval_timespan_observer_throws(self): scheduler = TestScheduler() xs = reactivex.interval(1) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler) with self.assertRaises(RxException): scheduler.start() diff --git a/tests/test_observable/test_map.py b/tests/test_observable/test_map.py index 51466db6..2937d2a9 100644 --- a/tests/test_observable/test_map.py +++ b/tests/test_observable/test_map.py @@ -1,6 +1,7 @@ import unittest +from typing import Any, NoReturn -from reactivex import create, empty, return_value, throw +from reactivex import abc, create, empty, return_value, throw from reactivex.disposable import SerialDisposable from reactivex.operators import map, map_indexed from reactivex.testing import ReactiveTest, TestScheduler @@ -21,7 +22,7 @@ class RxException(Exception): # Helper function for raising exceptions within lambdas -def _raise(ex): +def _raise(ex: Any) -> NoReturn: raise RxException(ex) @@ -29,18 +30,21 @@ class TestSelect(unittest.TestCase): def test_map_throws(self): mapper = map(lambda x: x) with self.assertRaises(RxException): - return_value(1).pipe(mapper).subscribe(lambda x: _raise("ex")) + return_value(1).pipe(mapper).subscribe(lambda x: _raise(Exception("ex"))) with self.assertRaises(RxException): throw("ex").pipe(mapper).subscribe(on_error=lambda ex: _raise(ex)) with self.assertRaises(RxException): empty().pipe(mapper).subscribe( - lambda x: x, lambda ex: ex, lambda: _raise("ex") + lambda x: x, lambda ex: None, lambda: _raise(Exception("ex")) ) - def subscribe(observer, scheduler=None): - _raise("ex") + def subscribe( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: + _raise(Exception("ex")) with self.assertRaises(RxException): create(subscribe).pipe(map(lambda x: x)).subscribe() @@ -61,7 +65,7 @@ def projection(x, *args, **kw): d.dispose() return x - d.disposable = xs.pipe(map(projection)).subscribe(results, scheduler) + d.disposable = xs.pipe(map(projection)).subscribe(results, scheduler=scheduler) def action(scheduler, state): return d.dispose() @@ -263,10 +267,13 @@ def projection(x): assert invoked[0] == 3 def test_map_with_index_throws(self): + mapper = map_indexed(lambda x, index: x) with self.assertRaises(RxException): - mapper = map_indexed(lambda x, index: x) - - return return_value(1).pipe(mapper).subscribe(lambda x: _raise("ex")) + return ( + return_value(1) + .pipe(mapper) + .subscribe(lambda x: _raise(Exception("ex"))) + ) with self.assertRaises(RxException): return ( @@ -277,11 +284,13 @@ def test_map_with_index_throws(self): return ( empty() .pipe(mapper) - .subscribe(lambda x: x, lambda ex: None, lambda: _raise("ex")) + .subscribe( + lambda x: x, lambda ex: None, lambda: _raise(Exception("ex")) + ) ) with self.assertRaises(RxException): - return create(lambda o, s: _raise("ex")).pipe(mapper).subscribe() + return create(lambda o, s: _raise(Exception("ex"))).pipe(mapper).subscribe() def test_map_with_index_dispose_inside_mapper(self): scheduler = TestScheduler() diff --git a/tests/test_observable/test_observeon.py b/tests/test_observable/test_observeon.py index e9e869a1..85f1855a 100644 --- a/tests/test_observable/test_observeon.py +++ b/tests/test_observable/test_observeon.py @@ -1,7 +1,9 @@ import unittest import reactivex +from reactivex import abc from reactivex import operators as ops +from reactivex.disposable import Disposable from reactivex.scheduler import ImmediateScheduler from reactivex.testing import ReactiveTest, TestScheduler @@ -78,10 +80,14 @@ def test_observe_on_forward_subscribe_scheduler(self): actual_subscribe_scheduler = None - def subscribe(observer, scheduler): + def subscribe( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: nonlocal actual_subscribe_scheduler actual_subscribe_scheduler = scheduler observer.on_completed() + return Disposable() xs = reactivex.create(subscribe) diff --git a/tests/test_observable/test_of.py b/tests/test_observable/test_of.py index 122a9249..4a368ffb 100644 --- a/tests/test_observable/test_of.py +++ b/tests/test_observable/test_of.py @@ -48,7 +48,7 @@ def teest_of_with_scheduler_empty(self): scheduler = TestScheduler() def create(): - return reactivex.of(scheduler=scheduler) + return reactivex.of(scheduler=scheduler) # type: ignore[call-arg] results = scheduler.start(create=create) diff --git a/tests/test_observable/test_onerrorresumenext.py b/tests/test_observable/test_onerrorresumenext.py index d672e9e0..02a18596 100644 --- a/tests/test_observable/test_onerrorresumenext.py +++ b/tests/test_observable/test_onerrorresumenext.py @@ -160,7 +160,7 @@ def test_on_error_resume_next_start_with_factory(self): msgs1 = [on_next(150, 1), on_next(210, 2), on_next(220, 3), on_error(230, "ex")] o1 = scheduler.create_hot_observable(msgs1) - def factory(ex: Exception): + def factory(ex: Exception | None): assert str(ex) == "ex" msgs2 = [on_next(240, 4), on_completed(250)] o2 = scheduler.create_hot_observable(msgs2) diff --git a/tests/test_observable/test_reduce.py b/tests/test_observable/test_reduce.py index 415663a7..a75efa28 100644 --- a/tests/test_observable/test_reduce.py +++ b/tests/test_observable/test_reduce.py @@ -2,6 +2,7 @@ import reactivex from reactivex import operators as ops +from reactivex.notification import OnError from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -88,7 +89,7 @@ def create(): res = scheduler.start(create=create).messages assert len(res) == 1 - assert res[0].value.kind == "E" and res[0].value.exception is not None + assert isinstance(res[0].value, OnError) and res[0].value.exception is not None assert res[0].time == 250 def test_reduce_without_seed_return(self): diff --git a/tests/test_observable/test_repeat.py b/tests/test_observable/test_repeat.py index 80324425..f35d27dc 100644 --- a/tests/test_observable/test_repeat.py +++ b/tests/test_observable/test_repeat.py @@ -143,21 +143,23 @@ def test_repeat_observable_error(self): def test_repeat_observable_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(11).pipe(ops.repeat()) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) with self.assertRaises(RxException): scheduler1.start() scheduler2 = TestScheduler() ys = reactivex.throw("ex").pipe(ops.repeat()) - ys.subscribe(lambda ex: _raise("ex"), scheduler=scheduler2) + ys.subscribe(lambda ex: _raise(Exception("ex")), scheduler=scheduler2) with self.assertRaises(Exception): scheduler2.start() scheduler3 = TestScheduler() zs = reactivex.return_value(1).pipe(ops.repeat()) - d = zs.subscribe(on_completed=lambda: _raise("ex"), scheduler=scheduler3) + d = zs.subscribe( + on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler3 + ) scheduler3.schedule_absolute(210, lambda sc, st: d.dispose()) scheduler3.start() @@ -236,14 +238,14 @@ def test_repeat_observable_repeat_count_error(self): def test_repeat_observable_repeat_count_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(1).pipe(ops.repeat(3)) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) with self.assertRaises(RxException): scheduler1.start() scheduler2 = TestScheduler() ys = reactivex.throw("ex1").pipe(ops.repeat(3)) - ys.subscribe(on_error=lambda ex: _raise("ex2"), scheduler=scheduler2) + ys.subscribe(on_error=lambda ex: _raise(Exception("ex2")), scheduler=scheduler2) with self.assertRaises(RxException): scheduler2.start() diff --git a/tests/test_observable/test_retry.py b/tests/test_observable/test_retry.py index 8ef31716..d39cd869 100644 --- a/tests/test_observable/test_retry.py +++ b/tests/test_observable/test_retry.py @@ -1,4 +1,5 @@ import unittest +from typing import Any, NoReturn import pytest @@ -20,7 +21,7 @@ class RxException(Exception): # Helper function for raising exceptions within lambdas -def _raise(ex): +def _raise(ex: Any) -> NoReturn: raise RxException(ex) @@ -79,21 +80,23 @@ def test_retry_observable_error(self): def test_retry_observable_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(1).pipe(ops.retry()) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) with pytest.raises(RxException): scheduler1.start() scheduler2 = TestScheduler() ys = reactivex.throw("ex").pipe(ops.retry()) - d = ys.subscribe(on_error=lambda ex: _raise("ex"), scheduler=scheduler2) + d = ys.subscribe( + on_error=lambda ex: _raise(Exception("ex")), scheduler=scheduler2 + ) scheduler2.schedule_absolute(210, lambda sc, st: d.dispose()) scheduler2.start() scheduler3 = TestScheduler() zs = reactivex.return_value(1).pipe(ops.retry()) - zs.subscribe(on_completed=lambda: _raise("ex"), scheduler=scheduler3) + zs.subscribe(on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler3) with pytest.raises(RxException): scheduler3.start() @@ -168,13 +171,15 @@ def test_retry_observable_retry_count_dispose_iii(self): def test_retry_observable_retry_count_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(1).pipe(ops.retry(3)) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) self.assertRaises(RxException, scheduler1.start) scheduler2 = TestScheduler() ys = reactivex.throw("ex").pipe(ops.retry(100)) - d = ys.subscribe(on_error=lambda ex: _raise("ex"), scheduler=scheduler2) + d = ys.subscribe( + on_error=lambda ex: _raise(Exception("ex")), scheduler=scheduler2 + ) def dispose(_, __): d.dispose() @@ -184,12 +189,14 @@ def dispose(_, __): scheduler3 = TestScheduler() zs = reactivex.return_value(1).pipe(ops.retry(100)) - zs.subscribe(on_completed=lambda: _raise("ex"), scheduler=scheduler3) + zs.subscribe(on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler3) with pytest.raises(RxException): scheduler3.start() - xss = reactivex.create(lambda o: _raise("ex")).pipe(ops.retry(100)) + xss = reactivex.create(lambda o, s: _raise(Exception("ex"))).pipe( + ops.retry(100) + ) with pytest.raises(Exception): xss.subscribe() diff --git a/tests/test_observable/test_returnvalue.py b/tests/test_observable/test_returnvalue.py index 8cc2959e..0810342f 100644 --- a/tests/test_observable/test_returnvalue.py +++ b/tests/test_observable/test_returnvalue.py @@ -70,14 +70,17 @@ def on_completed(): def test_return_observer_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(1) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) self.assertRaises(RxException, scheduler1.start) scheduler2 = TestScheduler() ys = reactivex.return_value(1) ys.subscribe( - lambda x: x, lambda ex: ex, lambda: _raise("ex"), scheduler=scheduler2 + lambda x: None, + lambda ex: None, + lambda: _raise(Exception("ex")), + scheduler=scheduler2, ) self.assertRaises(RxException, scheduler2.start) diff --git a/tests/test_observable/test_skipuntil.py b/tests/test_observable/test_skipuntil.py index a114bdab..faa18504 100644 --- a/tests/test_observable/test_skipuntil.py +++ b/tests/test_observable/test_skipuntil.py @@ -1,8 +1,9 @@ import unittest import reactivex -from reactivex import Observable +from reactivex import Observable, abc from reactivex import operators as ops +from reactivex.disposable import Disposable from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -157,8 +158,12 @@ def test_skip_until_has_completed_causes_disposal(self): disposed = [False] left = scheduler.create_hot_observable(l_msgs) - def subscribe(observer, scheduler=None): + def subscribe( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: disposed[0] = True + return Disposable() r = Observable(subscribe) diff --git a/tests/test_observable/test_takeuntil.py b/tests/test_observable/test_takeuntil.py index 4ddb3b86..7ee6c76f 100644 --- a/tests/test_observable/test_takeuntil.py +++ b/tests/test_observable/test_takeuntil.py @@ -179,7 +179,7 @@ def test_take_until_preempt_beforefirstproduced_remain_silent_and_proper_dispose right_msgs = [on_next(150, 1), on_next(210, 2), on_completed(220)] source_not_disposed = [False] - def action(): + def action(_): source_not_disposed[0] = True left = scheduler.create_hot_observable(left_msgs).pipe( @@ -203,7 +203,7 @@ def test_take_until_nopreempt_afterlastproduced_proper_disposed_signal(self): signal_not_disposed = [False] left = scheduler.create_hot_observable(left_msgs) - def action(): + def action(_): signal_not_disposed[0] = True right = scheduler.create_hot_observable(right_msgs).pipe( diff --git a/tests/test_observable/test_throw.py b/tests/test_observable/test_throw.py index 8b5815ae..05bac612 100644 --- a/tests/test_observable/test_throw.py +++ b/tests/test_observable/test_throw.py @@ -45,7 +45,10 @@ def test_throw_observer_throws(self): scheduler = TestScheduler() xs = throw("ex") xs.subscribe( - lambda x: None, lambda ex: _raise("ex"), lambda: None, scheduler=scheduler + lambda x: None, + lambda ex: _raise(Exception("ex")), + lambda: None, + scheduler=scheduler, ) self.assertRaises(RxException, scheduler.start) diff --git a/tests/test_observable/test_timer.py b/tests/test_observable/test_timer.py index 8f4009fa..8cacc08b 100644 --- a/tests/test_observable/test_timer.py +++ b/tests/test_observable/test_timer.py @@ -57,7 +57,7 @@ def test_oneshot_timer_date_observer_throws(self): scheduler = TestScheduler() date = scheduler.to_datetime(250.0) xs = reactivex.timer(date) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler) self.assertRaises(RxException, scheduler.start) @@ -100,13 +100,13 @@ def create(): def test_oneshot_timer_timespan_observer_throws(self): scheduler1 = TestScheduler() xs = reactivex.timer(11) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) self.assertRaises(RxException, scheduler1.start) scheduler2 = TestScheduler() ys = reactivex.timer(1, period=None) - ys.subscribe(on_completed=lambda: _raise("ex"), scheduler=scheduler2) + ys.subscribe(on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler2) self.assertRaises(RxException, scheduler2.start) diff --git a/tests/test_observable/test_tofuture.py b/tests/test_observable/test_tofuture.py index e5a27fda..e72b7415 100644 --- a/tests/test_observable/test_tofuture.py +++ b/tests/test_observable/test_tofuture.py @@ -1,8 +1,10 @@ import asyncio import unittest +from typing import cast import reactivex import reactivex.operators as ops +from reactivex import Observable from reactivex.internal.exceptions import SequenceContainsNoElementsError from reactivex.subject import Subject from reactivex.testing import ReactiveTest @@ -102,7 +104,7 @@ def test_dispose_on_cancel(self): async def using_sub(): # Since the subject never completes, this await statement # will never be complete either. We wait forever. - await reactivex.using(lambda: sub, lambda s: s) + await reactivex.using(lambda: sub, lambda s: cast(Observable[int], s)) async def go(): await asyncio.wait_for(using_sub(), 0.1) diff --git a/tests/test_observable/test_toiterable.py b/tests/test_observable/test_toiterable.py index dc2455cd..ff40c8e0 100644 --- a/tests/test_observable/test_toiterable.py +++ b/tests/test_observable/test_toiterable.py @@ -1,6 +1,7 @@ import unittest from reactivex import operators as ops +from reactivex.notification import OnNext from reactivex.testing import ReactiveTest, TestScheduler @@ -27,8 +28,9 @@ def create(): assert len(results) == 2 assert results[0].time == 660 - assert results[0].value.kind == "N" - assert results[0].value.value == [2, 3, 4, 5] + value = results[0].value + assert isinstance(value, OnNext) + assert value.value == [2, 3, 4, 5] assert self.on_completed(660).equals(results[1]) assert xs.subscriptions == [self.subscribe(200, 660)] diff --git a/tests/test_observable/test_while_do.py b/tests/test_observable/test_while_do.py index 125e6fa5..2992ef6b 100644 --- a/tests/test_observable/test_while_do.py +++ b/tests/test_observable/test_while_do.py @@ -1,7 +1,9 @@ import unittest import reactivex +from reactivex import abc from reactivex import operators as ops +from reactivex.disposable import Disposable from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -102,10 +104,13 @@ def predicate(x): n[0] += 1 return n[0] < 100 - def subscribe(o, scheduler=None): + def subscribe( + o: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: o.on_next(1) o.on_completed() - return lambda: None + return Disposable() return reactivex.create(subscribe).pipe(ops.while_do(predicate))