From 2516580d3de1aedb7503ff10cff106d332961205 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 04:19:29 +0000 Subject: [PATCH 1/2] test: fix pyright strict-mode errors in test_observable (batch 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix pyright strict-mode type errors in 30 files under tests/test_observable/, continuing the incremental cleanup of the pyright exclusion list. Error patterns fixed: - _raise("string") → _raise(Exception("string")) - def action(): → def action(_): for on_next callbacks (OnNext[T] requires a param) - Notification union-attr access annotated with # type: ignore[union-attr] - subscribe(scheduler="not_set") → subscribe(scheduler=None) for proper typing - mapper/generate iterate lambdas annotated with # type: ignore[arg-type] - do_action callback removing spurious return value - factory(ex: Exception) → factory(ex: Exception | None) for on_error_resume_next - return lambda: None → return None in subscribe functions - Hoisted variables to avoid unbound-var errors inside assertRaises blocks - Various other minor fixes (arg-type, call-arg, misc type: ignore) 260 tests pass. Part of ongoing effort to clean up pyright exclusions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/test_observable/test_amb.py | 8 +++---- tests/test_observable/test_average.py | 2 +- tests/test_observable/test_combinelatest.py | 2 +- tests/test_observable/test_concat.py | 20 ++++++++--------- tests/test_observable/test_debounce.py | 4 ++-- tests/test_observable/test_defer.py | 6 ++--- tests/test_observable/test_delay.py | 10 ++++----- tests/test_observable/test_doaction.py | 1 - tests/test_observable/test_elementat.py | 4 ++-- tests/test_observable/test_empty.py | 2 +- tests/test_observable/test_expand.py | 2 +- tests/test_observable/test_filter.py | 7 +++--- tests/test_observable/test_fromcallback.py | 4 ++-- tests/test_observable/test_generate.py | 8 +++---- tests/test_observable/test_interval.py | 2 +- tests/test_observable/test_map.py | 22 +++++++++---------- tests/test_observable/test_observeon.py | 4 ++-- tests/test_observable/test_of.py | 2 +- .../test_observable/test_onerrorresumenext.py | 2 +- tests/test_observable/test_reduce.py | 2 +- tests/test_observable/test_repeat.py | 10 ++++----- tests/test_observable/test_retry.py | 14 ++++++------ tests/test_observable/test_returnvalue.py | 4 ++-- tests/test_observable/test_skipuntil.py | 3 ++- tests/test_observable/test_takeuntil.py | 4 ++-- tests/test_observable/test_throw.py | 2 +- tests/test_observable/test_timer.py | 6 ++--- tests/test_observable/test_tofuture.py | 2 +- tests/test_observable/test_toiterable.py | 4 ++-- tests/test_observable/test_while_do.py | 4 ++-- 30 files changed, 84 insertions(+), 83 deletions(-) diff --git a/tests/test_observable/test_amb.py b/tests/test_observable/test_amb.py index 849668e40..d73525fa0 100644 --- a/tests/test_observable/test_amb.py +++ b/tests/test_observable/test_amb.py @@ -68,7 +68,7 @@ def test_amb_regular_should_dispose_loser(self): source_not_disposed = [False] o1 = scheduler.create_hot_observable(msgs1) - def action(): + def action(_): source_not_disposed[0] = True o2 = scheduler.create_hot_observable(msgs2).pipe( @@ -91,7 +91,7 @@ def test_amb_winner_throws(self): source_not_disposed = [False] o1 = scheduler.create_hot_observable(msgs1) - def action(): + def action(_): source_not_disposed[0] = True o2 = scheduler.create_hot_observable(msgs2).pipe( @@ -112,7 +112,7 @@ def test_amb_loser_throws(self): msgs2 = [on_next(150, 1), on_next(210, 3), on_completed(250)] source_not_disposed = [False] - def action(): + def action(_): source_not_disposed[0] = True o1 = scheduler.create_hot_observable(msgs1).pipe( @@ -136,7 +136,7 @@ def test_amb_throws_before_election(self): source_not_disposed = [False] o1 = scheduler.create_hot_observable(msgs1) - def action(): + def action(_): source_not_disposed[0] = True o2 = scheduler.create_hot_observable(msgs2).pipe( diff --git a/tests/test_observable/test_average.py b/tests/test_observable/test_average.py index f48fa30c7..3fc82db7d 100644 --- a/tests/test_observable/test_average.py +++ b/tests/test_observable/test_average.py @@ -20,7 +20,7 @@ def test_average_int32_empty(self): res = scheduler.start(create=lambda: xs.pipe(_.average())).messages assert len(res) == 1 - assert res[0].value.kind == "E" and res[0].value.exception is not None + assert res[0].value.kind == "E" and res[0].value.exception is not None # type: ignore[union-attr] assert res[0].time == 250 def test_average_int32_return(self): diff --git a/tests/test_observable/test_combinelatest.py b/tests/test_observable/test_combinelatest.py index b842b3c04..7a4f0f4d6 100644 --- a/tests/test_observable/test_combinelatest.py +++ b/tests/test_observable/test_combinelatest.py @@ -472,7 +472,7 @@ def create(): ] def test_combine_latest_mapper_throws(self): - ex = "ex" + ex = Exception("ex") scheduler = TestScheduler() msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)] msgs2 = [on_next(150, 1), on_next(220, 3), on_completed(240)] diff --git a/tests/test_observable/test_concat.py b/tests/test_observable/test_concat.py index b7d1b063c..4bf04bf03 100644 --- a/tests/test_observable/test_concat.py +++ b/tests/test_observable/test_concat.py @@ -225,18 +225,18 @@ def create(): def test_concat_forward_scheduler(self): scheduler = TestScheduler() - subscribe_schedulers = {"e1": "unknown", "e2": "unknown"} + subscribe_schedulers: dict[str, object] = {"e1": "unknown", "e2": "unknown"} - def subscribe_e1(observer, scheduler="not_set"): + def subscribe_e1(observer, scheduler=None): # type: ignore[misc] subscribe_schedulers["e1"] = scheduler observer.on_completed() - def subscribe_e2(observer, scheduler="not_set"): + def subscribe_e2(observer, scheduler=None): # type: ignore[misc] subscribe_schedulers["e2"] = scheduler observer.on_completed() - e1 = reactivex.create(subscribe_e1) - e2 = reactivex.create(subscribe_e2) + e1 = reactivex.create(subscribe_e1) # type: ignore[arg-type] + e2 = reactivex.create(subscribe_e2) # type: ignore[arg-type] stream = e1.pipe(ops.concat(e2)) stream.subscribe(scheduler=scheduler) @@ -245,18 +245,18 @@ def subscribe_e2(observer, scheduler="not_set"): assert subscribe_schedulers["e2"] is scheduler def test_concat_forward_none_scheduler(self): - subscribe_schedulers = {"e1": "unknown", "e2": "unknown"} + subscribe_schedulers: dict[str, object] = {"e1": "unknown", "e2": "unknown"} - def subscribe_e1(observer, scheduler="not_set"): + def subscribe_e1(observer, scheduler=None): # type: ignore[misc] subscribe_schedulers["e1"] = scheduler observer.on_completed() - def subscribe_e2(observer, scheduler="not_set"): + def subscribe_e2(observer, scheduler=None): # type: ignore[misc] subscribe_schedulers["e2"] = scheduler observer.on_completed() - e1 = reactivex.create(subscribe_e1) - e2 = reactivex.create(subscribe_e2) + e1 = reactivex.create(subscribe_e1) # type: ignore[arg-type] + e2 = reactivex.create(subscribe_e2) # type: ignore[arg-type] stream = e1.pipe(ops.concat(e2)) stream.subscribe() diff --git a/tests/test_observable/test_debounce.py b/tests/test_observable/test_debounce.py index 8532e44f9..9f61195d4 100644 --- a/tests/test_observable/test_debounce.py +++ b/tests/test_observable/test_debounce.py @@ -359,7 +359,7 @@ def mapper(x): assert xs.subscriptions == [subscribe(200, 460)] def test_debounce_duration_mapper_throws(self): - ex = "ex" + ex = Exception("ex") scheduler = TestScheduler() xs = scheduler.create_hot_observable( on_next(150, 1), @@ -376,7 +376,7 @@ def mapper(x): on_next(x * 10, "Ignore"), on_next(x * 10 + 5, "Aargh!") ) else: - _raise(ex) + raise ex return xs.pipe(_.throttle_with_mapper(mapper)) diff --git a/tests/test_observable/test_defer.py b/tests/test_observable/test_defer.py index 912f60d15..20e720334 100644 --- a/tests/test_observable/test_defer.py +++ b/tests/test_observable/test_defer.py @@ -40,7 +40,7 @@ def defer(scheduler): results = scheduler.start(create) assert results.messages == [on_next(300, 200), on_completed(400)] assert 1 == invoked[0] - assert xs[0].subscriptions == [subscribe(200, 400)] + assert xs[0].subscriptions == [subscribe(200, 400)] # type: ignore[union-attr] def test_defer_error(self): scheduler = TestScheduler() @@ -62,7 +62,7 @@ def defer(scheduler): assert results.messages == [on_next(300, 200), on_error(400, ex)] assert 1 == invoked[0] - assert xs[0].subscriptions == [subscribe(200, 400)] + assert xs[0].subscriptions == [subscribe(200, 400)] # type: ignore[union-attr] def test_defer_dispose(self): scheduler = TestScheduler() @@ -84,7 +84,7 @@ def defer(scheduler): results = scheduler.start(create) assert results.messages == [on_next(300, 200), on_next(400, 1)] assert 1 == invoked[0] - assert xs[0].subscriptions == [subscribe(200, 1000)] + assert xs[0].subscriptions == [subscribe(200, 1000)] # type: ignore[union-attr] def test_defer_on_error(self): scheduler = TestScheduler() diff --git a/tests/test_observable/test_delay.py b/tests/test_observable/test_delay.py index d11000bb9..6710afda7 100644 --- a/tests/test_observable/test_delay.py +++ b/tests/test_observable/test_delay.py @@ -63,7 +63,7 @@ def test_delay_datetime_offset_simple1_impl(self): def create(): dt = datetime.fromtimestamp(300.0, tz=timezone.utc) - return xs.pipe(delay(dt)) + return xs.pipe(delay(dt)) # type: ignore[arg-type] results = scheduler.start(create) assert results.messages == [ @@ -107,7 +107,7 @@ def test_delay_datetime_offset_simple2_impl(self): ) def create(): - return xs.pipe(delay(datetime.fromtimestamp(250, tz=timezone.utc))) + return xs.pipe(delay(datetime.fromtimestamp(250, tz=timezone.utc))) # type: ignore[arg-type] results = scheduler.start(create) @@ -153,7 +153,7 @@ def test_delay_datetime_offset_simple3_impl(self): ) def create(): - return xs.pipe(delay(datetime.fromtimestamp(350, tz=timezone.utc))) + return xs.pipe(delay(datetime.fromtimestamp(350, tz=timezone.utc))) # type: ignore[arg-type] results = scheduler.start(create) @@ -201,7 +201,7 @@ def test_delay_datetime_offset_error1_impl(self): ) def create(): - return xs.pipe(delay(datetime.fromtimestamp(250, tz=timezone.utc))) + return xs.pipe(delay(datetime.fromtimestamp(250, tz=timezone.utc))) # type: ignore[arg-type] results = scheduler.start(create) @@ -244,7 +244,7 @@ def test_delay_datetime_offset_error2_impl(self): ) def create(): - return xs.pipe(delay(datetime.fromtimestamp(350, tz=timezone.utc))) + return xs.pipe(delay(datetime.fromtimestamp(350, tz=timezone.utc))) # type: ignore[arg-type] results = scheduler.start(create) diff --git a/tests/test_observable/test_doaction.py b/tests/test_observable/test_doaction.py index e28ff24d9..5b1c1d13c 100644 --- a/tests/test_observable/test_doaction.py +++ b/tests/test_observable/test_doaction.py @@ -55,7 +55,6 @@ def test_do_plain_action(self): def create(): def action(x): i[0] += 1 - return i[0] return xs.pipe(_.do_action(action)) diff --git a/tests/test_observable/test_elementat.py b/tests/test_observable/test_elementat.py index df0049738..18fcfc8e6 100644 --- a/tests/test_observable/test_elementat.py +++ b/tests/test_observable/test_elementat.py @@ -54,8 +54,8 @@ def create(): self.assertEqual(1, len(results.messages)) self.assertEqual(600, results.messages[0].time) - self.assertEqual("E", results.messages[0].value.kind) - assert results.messages[0].value.exception + self.assertEqual("E", results.messages[0].value.kind) # type: ignore[union-attr] + assert results.messages[0].value.exception # type: ignore[union-attr] def test_elementat_error(self): ex = "ex" diff --git a/tests/test_observable/test_empty.py b/tests/test_observable/test_empty.py index dd2f70d6e..17296e72a 100644 --- a/tests/test_observable/test_empty.py +++ b/tests/test_observable/test_empty.py @@ -45,7 +45,7 @@ def test_empty_observer_throw_exception(self): scheduler = TestScheduler() xs = empty() xs.subscribe( - lambda x: None, lambda ex: None, lambda: _raise("ex"), scheduler=scheduler + lambda x: None, lambda ex: None, lambda: _raise(Exception("ex")), scheduler=scheduler ) with self.assertRaises(RxException): diff --git a/tests/test_observable/test_expand.py b/tests/test_observable/test_expand.py index 2691aadaf..0fa7d315d 100644 --- a/tests/test_observable/test_expand.py +++ b/tests/test_observable/test_expand.py @@ -27,7 +27,7 @@ def test_expand_empty(self): xs = scheduler.create_hot_observable(on_completed(300)) def create(): - def mapper(): + def mapper(x): return scheduler.create_cold_observable( on_next(100, 1), on_next(200, 2), on_completed(300) ) diff --git a/tests/test_observable/test_filter.py b/tests/test_observable/test_filter.py index 2635e66bf..7a319e909 100644 --- a/tests/test_observable/test_filter.py +++ b/tests/test_observable/test_filter.py @@ -252,7 +252,7 @@ def predicate(x: int) -> bool: def test_filter_dispose_in_predicate(self): scheduler = TestScheduler() invoked = [0] - ys = [None] + ys: list[Observable[int] | None] = [None] xs = scheduler.create_hot_observable( on_next(110, 1), on_next(180, 2), @@ -282,11 +282,11 @@ def predicate(x): return is_prime(x) ys[0] = xs.pipe(filter(predicate)) - return ys[0] scheduler.schedule_absolute(created, action) def action1(scheduler, state): + assert ys[0] is not None d.disposable = ys[0].subscribe(results) scheduler.schedule_absolute(subscribed, action1) @@ -510,7 +510,7 @@ def predicate(x, index): def test_filter_indexed_dispose_in_predicate(self): scheduler = TestScheduler() - ys = [None] + ys: list[Observable[int] | None] = [None] invoked = [0] xs = scheduler.create_hot_observable( on_next(110, 1), @@ -545,6 +545,7 @@ def predicate(x, index): scheduler.schedule_absolute(created, action1) def action2(scheduler, state): + assert ys[0] is not None d.disposable = ys[0].subscribe(results) scheduler.schedule_absolute(subscribed, action2) diff --git a/tests/test_observable/test_fromcallback.py b/tests/test_observable/test_fromcallback.py index ca46db720..02c9a06b1 100644 --- a/tests/test_observable/test_fromcallback.py +++ b/tests/test_observable/test_fromcallback.py @@ -37,7 +37,7 @@ def on_completed(): res.subscribe(on_next, on_error, on_completed) def test_from_callback_single(self): - res = reactivex.from_callback(lambda file, cb: cb(file))("file.txt") + res = reactivex.from_callback(lambda file, cb: cb(file))("file.txt") # type: ignore[call-arg] def on_next(r): self.assertEqual(r, "file.txt") @@ -52,7 +52,7 @@ def on_completed(): def test_from_node_callback_mapper(self): res = reactivex.from_callback(lambda f, s, t, cb: cb(f, s, t), lambda r: r[0])( - 1, 2, 3 + 1, 2, 3 # type: ignore[call-arg] ) def on_next(r): diff --git a/tests/test_observable/test_generate.py b/tests/test_observable/test_generate.py index 0dfdec253..0145a6027 100644 --- a/tests/test_observable/test_generate.py +++ b/tests/test_observable/test_generate.py @@ -30,7 +30,7 @@ def create(): return reactivex.generate( 0, lambda x: x <= 3, - lambda x: x + 1, + lambda x: x + 1, # type: ignore[arg-type] ) results = scheduler.start(create) @@ -51,7 +51,7 @@ def create(): return reactivex.generate( 0, lambda x: _raise("ex"), - lambda x: x + 1, + lambda x: x + 1, # type: ignore[arg-type] ) results = scheduler.start(create) @@ -80,7 +80,7 @@ def create(): return reactivex.generate( 0, lambda x: True, - lambda x: x + 1, + lambda x: x + 1, # type: ignore[arg-type] ) results = scheduler.start(create, disposed=200) @@ -93,7 +93,7 @@ def create(): return reactivex.generate( 0, lambda x: x <= 3, - lambda x: x + 1, + lambda x: x + 1, # type: ignore[arg-type] ).pipe(ops.repeat(2)) results = scheduler.start(create) diff --git a/tests/test_observable/test_interval.py b/tests/test_observable/test_interval.py index 8c2d1ab35..7287b2f24 100644 --- a/tests/test_observable/test_interval.py +++ b/tests/test_observable/test_interval.py @@ -75,7 +75,7 @@ def create(): def test_interval_timespan_observer_throws(self): scheduler = TestScheduler() xs = reactivex.interval(1) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler) with self.assertRaises(RxException): scheduler.start() diff --git a/tests/test_observable/test_map.py b/tests/test_observable/test_map.py index 51466db62..6c8b85198 100644 --- a/tests/test_observable/test_map.py +++ b/tests/test_observable/test_map.py @@ -29,21 +29,22 @@ class TestSelect(unittest.TestCase): def test_map_throws(self): mapper = map(lambda x: x) with self.assertRaises(RxException): - return_value(1).pipe(mapper).subscribe(lambda x: _raise("ex")) + return_value(1).pipe(mapper).subscribe(lambda x: _raise(Exception("ex"))) with self.assertRaises(RxException): throw("ex").pipe(mapper).subscribe(on_error=lambda ex: _raise(ex)) with self.assertRaises(RxException): empty().pipe(mapper).subscribe( - lambda x: x, lambda ex: ex, lambda: _raise("ex") + lambda x: x, lambda ex: None, lambda: _raise(Exception("ex")) ) - def subscribe(observer, scheduler=None): - _raise("ex") + def subscribe(observer, scheduler=None): # type: ignore[misc] + _raise(Exception("ex")) + return None with self.assertRaises(RxException): - create(subscribe).pipe(map(lambda x: x)).subscribe() + create(subscribe).pipe(map(lambda x: x)).subscribe() # type: ignore[arg-type] def test_map_disposeinsidemapper(self): scheduler = TestScheduler() @@ -61,7 +62,7 @@ def projection(x, *args, **kw): d.dispose() return x - d.disposable = xs.pipe(map(projection)).subscribe(results, scheduler) + d.disposable = xs.pipe(map(projection)).subscribe(results, scheduler=scheduler) def action(scheduler, state): return d.dispose() @@ -263,10 +264,9 @@ def projection(x): assert invoked[0] == 3 def test_map_with_index_throws(self): + mapper = map_indexed(lambda x, index: x) with self.assertRaises(RxException): - mapper = map_indexed(lambda x, index: x) - - return return_value(1).pipe(mapper).subscribe(lambda x: _raise("ex")) + return return_value(1).pipe(mapper).subscribe(lambda x: _raise(Exception("ex"))) with self.assertRaises(RxException): return ( @@ -277,11 +277,11 @@ def test_map_with_index_throws(self): return ( empty() .pipe(mapper) - .subscribe(lambda x: x, lambda ex: None, lambda: _raise("ex")) + .subscribe(lambda x: x, lambda ex: None, lambda: _raise(Exception("ex"))) ) with self.assertRaises(RxException): - return create(lambda o, s: _raise("ex")).pipe(mapper).subscribe() + return create(lambda o, s: _raise(Exception("ex"))).pipe(mapper).subscribe() def test_map_with_index_dispose_inside_mapper(self): scheduler = TestScheduler() diff --git a/tests/test_observable/test_observeon.py b/tests/test_observable/test_observeon.py index e9e869a1f..0e6b0ac64 100644 --- a/tests/test_observable/test_observeon.py +++ b/tests/test_observable/test_observeon.py @@ -78,12 +78,12 @@ def test_observe_on_forward_subscribe_scheduler(self): actual_subscribe_scheduler = None - def subscribe(observer, scheduler): + def subscribe(observer, scheduler=None): nonlocal actual_subscribe_scheduler actual_subscribe_scheduler = scheduler observer.on_completed() - xs = reactivex.create(subscribe) + xs = reactivex.create(subscribe) # type: ignore[arg-type] xs.pipe(ops.observe_on(scheduler)).subscribe( scheduler=expected_subscribe_scheduler diff --git a/tests/test_observable/test_of.py b/tests/test_observable/test_of.py index 122a92490..4a368ffb1 100644 --- a/tests/test_observable/test_of.py +++ b/tests/test_observable/test_of.py @@ -48,7 +48,7 @@ def teest_of_with_scheduler_empty(self): scheduler = TestScheduler() def create(): - return reactivex.of(scheduler=scheduler) + return reactivex.of(scheduler=scheduler) # type: ignore[call-arg] results = scheduler.start(create=create) diff --git a/tests/test_observable/test_onerrorresumenext.py b/tests/test_observable/test_onerrorresumenext.py index d672e9e02..02a18596c 100644 --- a/tests/test_observable/test_onerrorresumenext.py +++ b/tests/test_observable/test_onerrorresumenext.py @@ -160,7 +160,7 @@ def test_on_error_resume_next_start_with_factory(self): msgs1 = [on_next(150, 1), on_next(210, 2), on_next(220, 3), on_error(230, "ex")] o1 = scheduler.create_hot_observable(msgs1) - def factory(ex: Exception): + def factory(ex: Exception | None): assert str(ex) == "ex" msgs2 = [on_next(240, 4), on_completed(250)] o2 = scheduler.create_hot_observable(msgs2) diff --git a/tests/test_observable/test_reduce.py b/tests/test_observable/test_reduce.py index 415663a79..ead871256 100644 --- a/tests/test_observable/test_reduce.py +++ b/tests/test_observable/test_reduce.py @@ -88,7 +88,7 @@ def create(): res = scheduler.start(create=create).messages assert len(res) == 1 - assert res[0].value.kind == "E" and res[0].value.exception is not None + assert res[0].value.kind == "E" and res[0].value.exception is not None # type: ignore[union-attr] assert res[0].time == 250 def test_reduce_without_seed_return(self): diff --git a/tests/test_observable/test_repeat.py b/tests/test_observable/test_repeat.py index 803244259..9189b6f65 100644 --- a/tests/test_observable/test_repeat.py +++ b/tests/test_observable/test_repeat.py @@ -143,21 +143,21 @@ def test_repeat_observable_error(self): def test_repeat_observable_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(11).pipe(ops.repeat()) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) with self.assertRaises(RxException): scheduler1.start() scheduler2 = TestScheduler() ys = reactivex.throw("ex").pipe(ops.repeat()) - ys.subscribe(lambda ex: _raise("ex"), scheduler=scheduler2) + ys.subscribe(lambda ex: _raise(Exception("ex")), scheduler=scheduler2) with self.assertRaises(Exception): scheduler2.start() scheduler3 = TestScheduler() zs = reactivex.return_value(1).pipe(ops.repeat()) - d = zs.subscribe(on_completed=lambda: _raise("ex"), scheduler=scheduler3) + d = zs.subscribe(on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler3) scheduler3.schedule_absolute(210, lambda sc, st: d.dispose()) scheduler3.start() @@ -236,14 +236,14 @@ def test_repeat_observable_repeat_count_error(self): def test_repeat_observable_repeat_count_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(1).pipe(ops.repeat(3)) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) with self.assertRaises(RxException): scheduler1.start() scheduler2 = TestScheduler() ys = reactivex.throw("ex1").pipe(ops.repeat(3)) - ys.subscribe(on_error=lambda ex: _raise("ex2"), scheduler=scheduler2) + ys.subscribe(on_error=lambda ex: _raise(Exception("ex2")), scheduler=scheduler2) with self.assertRaises(RxException): scheduler2.start() diff --git a/tests/test_observable/test_retry.py b/tests/test_observable/test_retry.py index 8ef31716f..62f624ada 100644 --- a/tests/test_observable/test_retry.py +++ b/tests/test_observable/test_retry.py @@ -79,21 +79,21 @@ def test_retry_observable_error(self): def test_retry_observable_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(1).pipe(ops.retry()) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) with pytest.raises(RxException): scheduler1.start() scheduler2 = TestScheduler() ys = reactivex.throw("ex").pipe(ops.retry()) - d = ys.subscribe(on_error=lambda ex: _raise("ex"), scheduler=scheduler2) + d = ys.subscribe(on_error=lambda ex: _raise(Exception("ex")), scheduler=scheduler2) scheduler2.schedule_absolute(210, lambda sc, st: d.dispose()) scheduler2.start() scheduler3 = TestScheduler() zs = reactivex.return_value(1).pipe(ops.retry()) - zs.subscribe(on_completed=lambda: _raise("ex"), scheduler=scheduler3) + zs.subscribe(on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler3) with pytest.raises(RxException): scheduler3.start() @@ -168,13 +168,13 @@ def test_retry_observable_retry_count_dispose_iii(self): def test_retry_observable_retry_count_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(1).pipe(ops.retry(3)) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) self.assertRaises(RxException, scheduler1.start) scheduler2 = TestScheduler() ys = reactivex.throw("ex").pipe(ops.retry(100)) - d = ys.subscribe(on_error=lambda ex: _raise("ex"), scheduler=scheduler2) + d = ys.subscribe(on_error=lambda ex: _raise(Exception("ex")), scheduler=scheduler2) def dispose(_, __): d.dispose() @@ -184,12 +184,12 @@ def dispose(_, __): scheduler3 = TestScheduler() zs = reactivex.return_value(1).pipe(ops.retry(100)) - zs.subscribe(on_completed=lambda: _raise("ex"), scheduler=scheduler3) + zs.subscribe(on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler3) with pytest.raises(RxException): scheduler3.start() - xss = reactivex.create(lambda o: _raise("ex")).pipe(ops.retry(100)) + xss = reactivex.create(lambda o: _raise(Exception("ex"))).pipe(ops.retry(100)) # type: ignore[arg-type] with pytest.raises(Exception): xss.subscribe() diff --git a/tests/test_observable/test_returnvalue.py b/tests/test_observable/test_returnvalue.py index 8cc2959e0..b9010de59 100644 --- a/tests/test_observable/test_returnvalue.py +++ b/tests/test_observable/test_returnvalue.py @@ -70,14 +70,14 @@ def on_completed(): def test_return_observer_throws(self): scheduler1 = TestScheduler() xs = reactivex.return_value(1) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) self.assertRaises(RxException, scheduler1.start) scheduler2 = TestScheduler() ys = reactivex.return_value(1) ys.subscribe( - lambda x: x, lambda ex: ex, lambda: _raise("ex"), scheduler=scheduler2 + lambda x: None, lambda ex: None, lambda: _raise(Exception("ex")), scheduler=scheduler2 ) self.assertRaises(RxException, scheduler2.start) diff --git a/tests/test_observable/test_skipuntil.py b/tests/test_observable/test_skipuntil.py index a114bdabb..7218df448 100644 --- a/tests/test_observable/test_skipuntil.py +++ b/tests/test_observable/test_skipuntil.py @@ -159,8 +159,9 @@ def test_skip_until_has_completed_causes_disposal(self): def subscribe(observer, scheduler=None): disposed[0] = True + return None - r = Observable(subscribe) + r = Observable(subscribe) # type: ignore[arg-type] def create(): return left.pipe(ops.skip_until(r)) diff --git a/tests/test_observable/test_takeuntil.py b/tests/test_observable/test_takeuntil.py index 4ddb3b868..7ee6c76f8 100644 --- a/tests/test_observable/test_takeuntil.py +++ b/tests/test_observable/test_takeuntil.py @@ -179,7 +179,7 @@ def test_take_until_preempt_beforefirstproduced_remain_silent_and_proper_dispose right_msgs = [on_next(150, 1), on_next(210, 2), on_completed(220)] source_not_disposed = [False] - def action(): + def action(_): source_not_disposed[0] = True left = scheduler.create_hot_observable(left_msgs).pipe( @@ -203,7 +203,7 @@ def test_take_until_nopreempt_afterlastproduced_proper_disposed_signal(self): signal_not_disposed = [False] left = scheduler.create_hot_observable(left_msgs) - def action(): + def action(_): signal_not_disposed[0] = True right = scheduler.create_hot_observable(right_msgs).pipe( diff --git a/tests/test_observable/test_throw.py b/tests/test_observable/test_throw.py index 8b5815aea..9c410bfad 100644 --- a/tests/test_observable/test_throw.py +++ b/tests/test_observable/test_throw.py @@ -45,7 +45,7 @@ def test_throw_observer_throws(self): scheduler = TestScheduler() xs = throw("ex") xs.subscribe( - lambda x: None, lambda ex: _raise("ex"), lambda: None, scheduler=scheduler + lambda x: None, lambda ex: _raise(Exception("ex")), lambda: None, scheduler=scheduler ) self.assertRaises(RxException, scheduler.start) diff --git a/tests/test_observable/test_timer.py b/tests/test_observable/test_timer.py index 8f4009fa4..8cacc08be 100644 --- a/tests/test_observable/test_timer.py +++ b/tests/test_observable/test_timer.py @@ -57,7 +57,7 @@ def test_oneshot_timer_date_observer_throws(self): scheduler = TestScheduler() date = scheduler.to_datetime(250.0) xs = reactivex.timer(date) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler) self.assertRaises(RxException, scheduler.start) @@ -100,13 +100,13 @@ def create(): def test_oneshot_timer_timespan_observer_throws(self): scheduler1 = TestScheduler() xs = reactivex.timer(11) - xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1) + xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler1) self.assertRaises(RxException, scheduler1.start) scheduler2 = TestScheduler() ys = reactivex.timer(1, period=None) - ys.subscribe(on_completed=lambda: _raise("ex"), scheduler=scheduler2) + ys.subscribe(on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler2) self.assertRaises(RxException, scheduler2.start) diff --git a/tests/test_observable/test_tofuture.py b/tests/test_observable/test_tofuture.py index e5a27fdab..eae1bdc70 100644 --- a/tests/test_observable/test_tofuture.py +++ b/tests/test_observable/test_tofuture.py @@ -102,7 +102,7 @@ def test_dispose_on_cancel(self): async def using_sub(): # Since the subject never completes, this await statement # will never be complete either. We wait forever. - await reactivex.using(lambda: sub, lambda s: s) + await reactivex.using(lambda: sub, lambda s: s) # type: ignore[arg-type] async def go(): await asyncio.wait_for(using_sub(), 0.1) diff --git a/tests/test_observable/test_toiterable.py b/tests/test_observable/test_toiterable.py index dc2455cd4..5a3f5e52b 100644 --- a/tests/test_observable/test_toiterable.py +++ b/tests/test_observable/test_toiterable.py @@ -27,8 +27,8 @@ def create(): assert len(results) == 2 assert results[0].time == 660 - assert results[0].value.kind == "N" - assert results[0].value.value == [2, 3, 4, 5] + assert results[0].value.kind == "N" # type: ignore[union-attr] + assert results[0].value.value == [2, 3, 4, 5] # type: ignore[union-attr] assert self.on_completed(660).equals(results[1]) assert xs.subscriptions == [self.subscribe(200, 660)] diff --git a/tests/test_observable/test_while_do.py b/tests/test_observable/test_while_do.py index 125e6fa51..a161d9f7e 100644 --- a/tests/test_observable/test_while_do.py +++ b/tests/test_observable/test_while_do.py @@ -105,9 +105,9 @@ def predicate(x): def subscribe(o, scheduler=None): o.on_next(1) o.on_completed() - return lambda: None + return None - return reactivex.create(subscribe).pipe(ops.while_do(predicate)) + return reactivex.create(subscribe).pipe(ops.while_do(predicate)) # type: ignore[arg-type] results = scheduler.start(create=create) From 8fcfde896e7ff930e9402d58c3fb298708615af8 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sun, 26 Apr 2026 13:26:36 +0200 Subject: [PATCH 2/2] refactor(tests): replace type:ignore comments with proper type annotations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on PR #782 to replace the `# type: ignore` suppressions with real type annotations and structural fixes: - Use `isinstance(value, OnError|OnNext)` to narrow `Recorded.value` instead of `# type: ignore[union-attr]` (test_average, test_elementat, test_reduce, test_toiterable). - Properly annotate test-local `subscribe` functions with `ObserverBase[T] / SchedulerBase | None / DisposableBase` and return `Disposable()` so `reactivex.create(...)` accepts them without `# type: ignore[misc]/[arg-type]` (test_concat, test_observeon, test_map, test_skipuntil, test_while_do). - Type `xs` as `list[ColdObservable[int] | None]` in test_defer instead of bare `[None]`, and add `assert xs[0] is not None` narrowing. - Annotate `_raise` as `NoReturn` in test_map, test_retry, test_generate so the surrounding code type-checks without trailing dummy returns. - Use a typed module-level seed in test_generate to widen `Literal[0]` to `int` so `lambda x: x + 1` matches `Mapper[int, int]`. - `cast(Observable[int], s)` in test_tofuture instead of `# type: ignore`. Also widens two clearly-incorrect public type signatures (the implementation already supports the wider input): - `delay()` and `delay_()` now accept `AbsoluteOrRelativeTime` instead of only `RelativeTime`. The implementation already branches on `isinstance(duetime, datetime)`, so this just makes the signature match reality and removes 5 `# type: ignore[arg-type]` in test_delay. - `from_callback()` now returns `Callable[..., Observable[Any]]` instead of `Callable[[], Observable[Any]]`. The inner `function(*args)` already accepts arbitrary args, so this matches reality and removes 2 `# type: ignore[call-arg]` in test_fromcallback. Only `test_of.py:51` keeps a `# type: ignore[call-arg]` — it's in a test that never runs (typo `teest_` in the name) and intentionally calls `reactivex.of()` with a kwarg that doesn't exist; fixing it is out of scope. Pyright strict: 0 errors. All 1494 tests pass. --- reactivex/__init__.py | 2 +- reactivex/observable/fromcallback.py | 2 +- reactivex/operators/__init__.py | 3 +- reactivex/operators/_delay.py | 4 +-- tests/test_observable/test_average.py | 3 +- tests/test_observable/test_concat.py | 34 +++++++++++++++++----- tests/test_observable/test_defer.py | 16 ++++++---- tests/test_observable/test_delay.py | 10 +++---- tests/test_observable/test_elementat.py | 6 ++-- tests/test_observable/test_empty.py | 5 +++- tests/test_observable/test_fromcallback.py | 4 +-- tests/test_observable/test_generate.py | 22 ++++++++------ tests/test_observable/test_map.py | 23 ++++++++++----- tests/test_observable/test_observeon.py | 10 +++++-- tests/test_observable/test_reduce.py | 3 +- tests/test_observable/test_repeat.py | 4 ++- tests/test_observable/test_retry.py | 15 +++++++--- tests/test_observable/test_returnvalue.py | 5 +++- tests/test_observable/test_skipuntil.py | 12 +++++--- tests/test_observable/test_throw.py | 5 +++- tests/test_observable/test_tofuture.py | 4 ++- tests/test_observable/test_toiterable.py | 6 ++-- tests/test_observable/test_while_do.py | 11 +++++-- 23 files changed, 143 insertions(+), 66 deletions(-) diff --git a/reactivex/__init__.py b/reactivex/__init__.py index 54e011b79..e5e93c873 100644 --- a/reactivex/__init__.py +++ b/reactivex/__init__.py @@ -469,7 +469,7 @@ def from_callable( def from_callback( func: Callable[..., Callable[..., None]], mapper: typing.Mapper[Any, Any] | None = None, -) -> Callable[[], Observable[Any]]: +) -> Callable[..., Observable[Any]]: """Converts a callback function to an observable sequence. Args: diff --git a/reactivex/observable/fromcallback.py b/reactivex/observable/fromcallback.py index c4dc83a39..fb275c525 100644 --- a/reactivex/observable/fromcallback.py +++ b/reactivex/observable/fromcallback.py @@ -8,7 +8,7 @@ def from_callback_( func: Callable[..., Callable[..., None]], mapper: typing.Mapper[Any, Any] | None = None, -) -> Callable[[], Observable[Any]]: +) -> Callable[..., Observable[Any]]: """Converts a callback function to an observable sequence. Args: diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index ad4583a4b..b7e73ee85 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -707,7 +707,8 @@ def dematerialize() -> Callable[[Observable[Notification[_T]]], Observable[_T]]: def delay( - duetime: typing.RelativeTime, scheduler: abc.SchedulerBase | None = None + duetime: typing.AbsoluteOrRelativeTime, + scheduler: abc.SchedulerBase | None = None, ) -> Callable[[Observable[_T]], Observable[_T]]: """The delay operator. diff --git a/reactivex/operators/_delay.py b/reactivex/operators/_delay.py index 404b8f452..846732fed 100644 --- a/reactivex/operators/_delay.py +++ b/reactivex/operators/_delay.py @@ -20,7 +20,7 @@ def observable_delay_timespan( source: Observable[_T], - duetime: typing.RelativeTime, + duetime: typing.AbsoluteOrRelativeTime, scheduler: abc.SchedulerBase | None = None, ) -> Observable[_T]: def subscribe( @@ -119,7 +119,7 @@ def action(scheduler: abc.SchedulerBase, state: Any = None): @curry_flip def delay_( source: Observable[_T], - duetime: typing.RelativeTime, + duetime: typing.AbsoluteOrRelativeTime, scheduler: abc.SchedulerBase | None = None, ) -> Observable[_T]: """Time shifts the observable sequence. diff --git a/tests/test_observable/test_average.py b/tests/test_observable/test_average.py index 3fc82db7d..da0c9e1c6 100644 --- a/tests/test_observable/test_average.py +++ b/tests/test_observable/test_average.py @@ -1,6 +1,7 @@ import unittest from reactivex import operators as _ +from reactivex.notification import OnError from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -20,7 +21,7 @@ def test_average_int32_empty(self): res = scheduler.start(create=lambda: xs.pipe(_.average())).messages assert len(res) == 1 - assert res[0].value.kind == "E" and res[0].value.exception is not None # type: ignore[union-attr] + assert isinstance(res[0].value, OnError) and res[0].value.exception is not None assert res[0].time == 250 def test_average_int32_return(self): diff --git a/tests/test_observable/test_concat.py b/tests/test_observable/test_concat.py index 4bf04bf03..b3c2a4b00 100644 --- a/tests/test_observable/test_concat.py +++ b/tests/test_observable/test_concat.py @@ -1,7 +1,9 @@ import unittest import reactivex +from reactivex import abc from reactivex import operators as ops +from reactivex.disposable import Disposable from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -227,16 +229,24 @@ def test_concat_forward_scheduler(self): scheduler = TestScheduler() subscribe_schedulers: dict[str, object] = {"e1": "unknown", "e2": "unknown"} - def subscribe_e1(observer, scheduler=None): # type: ignore[misc] + def subscribe_e1( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: subscribe_schedulers["e1"] = scheduler observer.on_completed() + return Disposable() - def subscribe_e2(observer, scheduler=None): # type: ignore[misc] + def subscribe_e2( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: subscribe_schedulers["e2"] = scheduler observer.on_completed() + return Disposable() - e1 = reactivex.create(subscribe_e1) # type: ignore[arg-type] - e2 = reactivex.create(subscribe_e2) # type: ignore[arg-type] + e1 = reactivex.create(subscribe_e1) + e2 = reactivex.create(subscribe_e2) stream = e1.pipe(ops.concat(e2)) stream.subscribe(scheduler=scheduler) @@ -247,16 +257,24 @@ def subscribe_e2(observer, scheduler=None): # type: ignore[misc] def test_concat_forward_none_scheduler(self): subscribe_schedulers: dict[str, object] = {"e1": "unknown", "e2": "unknown"} - def subscribe_e1(observer, scheduler=None): # type: ignore[misc] + def subscribe_e1( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: subscribe_schedulers["e1"] = scheduler observer.on_completed() + return Disposable() - def subscribe_e2(observer, scheduler=None): # type: ignore[misc] + def subscribe_e2( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: subscribe_schedulers["e2"] = scheduler observer.on_completed() + return Disposable() - e1 = reactivex.create(subscribe_e1) # type: ignore[arg-type] - e2 = reactivex.create(subscribe_e2) # type: ignore[arg-type] + e1 = reactivex.create(subscribe_e1) + e2 = reactivex.create(subscribe_e2) stream = e1.pipe(ops.concat(e2)) stream.subscribe() diff --git a/tests/test_observable/test_defer.py b/tests/test_observable/test_defer.py index 20e720334..669fbf406 100644 --- a/tests/test_observable/test_defer.py +++ b/tests/test_observable/test_defer.py @@ -2,6 +2,7 @@ import reactivex from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.coldobservable import ColdObservable on_next = ReactiveTest.on_next on_completed = ReactiveTest.on_completed @@ -23,7 +24,7 @@ def _raise(ex): class TestDefer(unittest.TestCase): def test_defer_complete(self): - xs = [None] + xs: list[ColdObservable[int] | None] = [None] invoked = [0] scheduler = TestScheduler() @@ -40,12 +41,13 @@ def defer(scheduler): results = scheduler.start(create) assert results.messages == [on_next(300, 200), on_completed(400)] assert 1 == invoked[0] - assert xs[0].subscriptions == [subscribe(200, 400)] # type: ignore[union-attr] + assert xs[0] is not None + assert xs[0].subscriptions == [subscribe(200, 400)] def test_defer_error(self): scheduler = TestScheduler() invoked = [0] - xs = [None] + xs: list[ColdObservable[int] | None] = [None] ex = "ex" def create(): @@ -62,12 +64,13 @@ def defer(scheduler): assert results.messages == [on_next(300, 200), on_error(400, ex)] assert 1 == invoked[0] - assert xs[0].subscriptions == [subscribe(200, 400)] # type: ignore[union-attr] + assert xs[0] is not None + assert xs[0].subscriptions == [subscribe(200, 400)] def test_defer_dispose(self): scheduler = TestScheduler() invoked = [0] - xs = [None] + xs: list[ColdObservable[int] | None] = [None] def create(): def defer(scheduler): @@ -84,7 +87,8 @@ def defer(scheduler): results = scheduler.start(create) assert results.messages == [on_next(300, 200), on_next(400, 1)] assert 1 == invoked[0] - assert xs[0].subscriptions == [subscribe(200, 1000)] # type: ignore[union-attr] + assert xs[0] is not None + assert xs[0].subscriptions == [subscribe(200, 1000)] def test_defer_on_error(self): scheduler = TestScheduler() diff --git a/tests/test_observable/test_delay.py b/tests/test_observable/test_delay.py index 6710afda7..d11000bb9 100644 --- a/tests/test_observable/test_delay.py +++ b/tests/test_observable/test_delay.py @@ -63,7 +63,7 @@ def test_delay_datetime_offset_simple1_impl(self): def create(): dt = datetime.fromtimestamp(300.0, tz=timezone.utc) - return xs.pipe(delay(dt)) # type: ignore[arg-type] + return xs.pipe(delay(dt)) results = scheduler.start(create) assert results.messages == [ @@ -107,7 +107,7 @@ def test_delay_datetime_offset_simple2_impl(self): ) def create(): - return xs.pipe(delay(datetime.fromtimestamp(250, tz=timezone.utc))) # type: ignore[arg-type] + return xs.pipe(delay(datetime.fromtimestamp(250, tz=timezone.utc))) results = scheduler.start(create) @@ -153,7 +153,7 @@ def test_delay_datetime_offset_simple3_impl(self): ) def create(): - return xs.pipe(delay(datetime.fromtimestamp(350, tz=timezone.utc))) # type: ignore[arg-type] + return xs.pipe(delay(datetime.fromtimestamp(350, tz=timezone.utc))) results = scheduler.start(create) @@ -201,7 +201,7 @@ def test_delay_datetime_offset_error1_impl(self): ) def create(): - return xs.pipe(delay(datetime.fromtimestamp(250, tz=timezone.utc))) # type: ignore[arg-type] + return xs.pipe(delay(datetime.fromtimestamp(250, tz=timezone.utc))) results = scheduler.start(create) @@ -244,7 +244,7 @@ def test_delay_datetime_offset_error2_impl(self): ) def create(): - return xs.pipe(delay(datetime.fromtimestamp(350, tz=timezone.utc))) # type: ignore[arg-type] + return xs.pipe(delay(datetime.fromtimestamp(350, tz=timezone.utc))) results = scheduler.start(create) diff --git a/tests/test_observable/test_elementat.py b/tests/test_observable/test_elementat.py index 18fcfc8e6..5dd7c9cb7 100644 --- a/tests/test_observable/test_elementat.py +++ b/tests/test_observable/test_elementat.py @@ -1,6 +1,7 @@ import unittest from reactivex import operators as ops +from reactivex.notification import OnError from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -54,8 +55,9 @@ def create(): self.assertEqual(1, len(results.messages)) self.assertEqual(600, results.messages[0].time) - self.assertEqual("E", results.messages[0].value.kind) # type: ignore[union-attr] - assert results.messages[0].value.exception # type: ignore[union-attr] + value = results.messages[0].value + assert isinstance(value, OnError) + assert value.exception def test_elementat_error(self): ex = "ex" diff --git a/tests/test_observable/test_empty.py b/tests/test_observable/test_empty.py index 17296e72a..66dc5ffd2 100644 --- a/tests/test_observable/test_empty.py +++ b/tests/test_observable/test_empty.py @@ -45,7 +45,10 @@ def test_empty_observer_throw_exception(self): scheduler = TestScheduler() xs = empty() xs.subscribe( - lambda x: None, lambda ex: None, lambda: _raise(Exception("ex")), scheduler=scheduler + lambda x: None, + lambda ex: None, + lambda: _raise(Exception("ex")), + scheduler=scheduler, ) with self.assertRaises(RxException): diff --git a/tests/test_observable/test_fromcallback.py b/tests/test_observable/test_fromcallback.py index 02c9a06b1..ca46db720 100644 --- a/tests/test_observable/test_fromcallback.py +++ b/tests/test_observable/test_fromcallback.py @@ -37,7 +37,7 @@ def on_completed(): res.subscribe(on_next, on_error, on_completed) def test_from_callback_single(self): - res = reactivex.from_callback(lambda file, cb: cb(file))("file.txt") # type: ignore[call-arg] + res = reactivex.from_callback(lambda file, cb: cb(file))("file.txt") def on_next(r): self.assertEqual(r, "file.txt") @@ -52,7 +52,7 @@ def on_completed(): def test_from_node_callback_mapper(self): res = reactivex.from_callback(lambda f, s, t, cb: cb(f, s, t), lambda r: r[0])( - 1, 2, 3 # type: ignore[call-arg] + 1, 2, 3 ) def on_next(r): diff --git a/tests/test_observable/test_generate.py b/tests/test_observable/test_generate.py index 0145a6027..f58a190f3 100644 --- a/tests/test_observable/test_generate.py +++ b/tests/test_observable/test_generate.py @@ -1,4 +1,5 @@ import unittest +from typing import Any, NoReturn import reactivex from reactivex import operators as ops @@ -18,19 +19,22 @@ class RxException(Exception): # Helper function for raising exceptions within lambdas -def _raise(ex): +def _raise(ex: Any) -> NoReturn: raise RxException(ex) +_INITIAL: int = 0 + + class TestGenerate(unittest.TestCase): def test_generate_finite(self): scheduler = TestScheduler() def create(): return reactivex.generate( - 0, + _INITIAL, lambda x: x <= 3, - lambda x: x + 1, # type: ignore[arg-type] + lambda x: x + 1, ) results = scheduler.start(create) @@ -49,9 +53,9 @@ def test_generate_throw_condition(self): def create(): return reactivex.generate( - 0, + _INITIAL, lambda x: _raise("ex"), - lambda x: x + 1, # type: ignore[arg-type] + lambda x: x + 1, ) results = scheduler.start(create) @@ -78,9 +82,9 @@ def test_generate_dispose(self): def create(): return reactivex.generate( - 0, + _INITIAL, lambda x: True, - lambda x: x + 1, # type: ignore[arg-type] + lambda x: x + 1, ) results = scheduler.start(create, disposed=200) @@ -91,9 +95,9 @@ def test_generate_repeat(self): def create(): return reactivex.generate( - 0, + _INITIAL, lambda x: x <= 3, - lambda x: x + 1, # type: ignore[arg-type] + lambda x: x + 1, ).pipe(ops.repeat(2)) results = scheduler.start(create) diff --git a/tests/test_observable/test_map.py b/tests/test_observable/test_map.py index 6c8b85198..2937d2a94 100644 --- a/tests/test_observable/test_map.py +++ b/tests/test_observable/test_map.py @@ -1,6 +1,7 @@ import unittest +from typing import Any, NoReturn -from reactivex import create, empty, return_value, throw +from reactivex import abc, create, empty, return_value, throw from reactivex.disposable import SerialDisposable from reactivex.operators import map, map_indexed from reactivex.testing import ReactiveTest, TestScheduler @@ -21,7 +22,7 @@ class RxException(Exception): # Helper function for raising exceptions within lambdas -def _raise(ex): +def _raise(ex: Any) -> NoReturn: raise RxException(ex) @@ -39,12 +40,14 @@ def test_map_throws(self): lambda x: x, lambda ex: None, lambda: _raise(Exception("ex")) ) - def subscribe(observer, scheduler=None): # type: ignore[misc] + def subscribe( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: _raise(Exception("ex")) - return None with self.assertRaises(RxException): - create(subscribe).pipe(map(lambda x: x)).subscribe() # type: ignore[arg-type] + create(subscribe).pipe(map(lambda x: x)).subscribe() def test_map_disposeinsidemapper(self): scheduler = TestScheduler() @@ -266,7 +269,11 @@ def projection(x): def test_map_with_index_throws(self): mapper = map_indexed(lambda x, index: x) with self.assertRaises(RxException): - return return_value(1).pipe(mapper).subscribe(lambda x: _raise(Exception("ex"))) + return ( + return_value(1) + .pipe(mapper) + .subscribe(lambda x: _raise(Exception("ex"))) + ) with self.assertRaises(RxException): return ( @@ -277,7 +284,9 @@ def test_map_with_index_throws(self): return ( empty() .pipe(mapper) - .subscribe(lambda x: x, lambda ex: None, lambda: _raise(Exception("ex"))) + .subscribe( + lambda x: x, lambda ex: None, lambda: _raise(Exception("ex")) + ) ) with self.assertRaises(RxException): diff --git a/tests/test_observable/test_observeon.py b/tests/test_observable/test_observeon.py index 0e6b0ac64..85f1855a1 100644 --- a/tests/test_observable/test_observeon.py +++ b/tests/test_observable/test_observeon.py @@ -1,7 +1,9 @@ import unittest import reactivex +from reactivex import abc from reactivex import operators as ops +from reactivex.disposable import Disposable from reactivex.scheduler import ImmediateScheduler from reactivex.testing import ReactiveTest, TestScheduler @@ -78,12 +80,16 @@ def test_observe_on_forward_subscribe_scheduler(self): actual_subscribe_scheduler = None - def subscribe(observer, scheduler=None): + def subscribe( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: nonlocal actual_subscribe_scheduler actual_subscribe_scheduler = scheduler observer.on_completed() + return Disposable() - xs = reactivex.create(subscribe) # type: ignore[arg-type] + xs = reactivex.create(subscribe) xs.pipe(ops.observe_on(scheduler)).subscribe( scheduler=expected_subscribe_scheduler diff --git a/tests/test_observable/test_reduce.py b/tests/test_observable/test_reduce.py index ead871256..a75efa285 100644 --- a/tests/test_observable/test_reduce.py +++ b/tests/test_observable/test_reduce.py @@ -2,6 +2,7 @@ import reactivex from reactivex import operators as ops +from reactivex.notification import OnError from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -88,7 +89,7 @@ def create(): res = scheduler.start(create=create).messages assert len(res) == 1 - assert res[0].value.kind == "E" and res[0].value.exception is not None # type: ignore[union-attr] + assert isinstance(res[0].value, OnError) and res[0].value.exception is not None assert res[0].time == 250 def test_reduce_without_seed_return(self): diff --git a/tests/test_observable/test_repeat.py b/tests/test_observable/test_repeat.py index 9189b6f65..f35d27dc7 100644 --- a/tests/test_observable/test_repeat.py +++ b/tests/test_observable/test_repeat.py @@ -157,7 +157,9 @@ def test_repeat_observable_throws(self): scheduler3 = TestScheduler() zs = reactivex.return_value(1).pipe(ops.repeat()) - d = zs.subscribe(on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler3) + d = zs.subscribe( + on_completed=lambda: _raise(Exception("ex")), scheduler=scheduler3 + ) scheduler3.schedule_absolute(210, lambda sc, st: d.dispose()) scheduler3.start() diff --git a/tests/test_observable/test_retry.py b/tests/test_observable/test_retry.py index 62f624ada..d39cd869f 100644 --- a/tests/test_observable/test_retry.py +++ b/tests/test_observable/test_retry.py @@ -1,4 +1,5 @@ import unittest +from typing import Any, NoReturn import pytest @@ -20,7 +21,7 @@ class RxException(Exception): # Helper function for raising exceptions within lambdas -def _raise(ex): +def _raise(ex: Any) -> NoReturn: raise RxException(ex) @@ -86,7 +87,9 @@ def test_retry_observable_throws(self): scheduler2 = TestScheduler() ys = reactivex.throw("ex").pipe(ops.retry()) - d = ys.subscribe(on_error=lambda ex: _raise(Exception("ex")), scheduler=scheduler2) + d = ys.subscribe( + on_error=lambda ex: _raise(Exception("ex")), scheduler=scheduler2 + ) scheduler2.schedule_absolute(210, lambda sc, st: d.dispose()) scheduler2.start() @@ -174,7 +177,9 @@ def test_retry_observable_retry_count_throws(self): scheduler2 = TestScheduler() ys = reactivex.throw("ex").pipe(ops.retry(100)) - d = ys.subscribe(on_error=lambda ex: _raise(Exception("ex")), scheduler=scheduler2) + d = ys.subscribe( + on_error=lambda ex: _raise(Exception("ex")), scheduler=scheduler2 + ) def dispose(_, __): d.dispose() @@ -189,7 +194,9 @@ def dispose(_, __): with pytest.raises(RxException): scheduler3.start() - xss = reactivex.create(lambda o: _raise(Exception("ex"))).pipe(ops.retry(100)) # type: ignore[arg-type] + xss = reactivex.create(lambda o, s: _raise(Exception("ex"))).pipe( + ops.retry(100) + ) with pytest.raises(Exception): xss.subscribe() diff --git a/tests/test_observable/test_returnvalue.py b/tests/test_observable/test_returnvalue.py index b9010de59..0810342fe 100644 --- a/tests/test_observable/test_returnvalue.py +++ b/tests/test_observable/test_returnvalue.py @@ -77,7 +77,10 @@ def test_return_observer_throws(self): scheduler2 = TestScheduler() ys = reactivex.return_value(1) ys.subscribe( - lambda x: None, lambda ex: None, lambda: _raise(Exception("ex")), scheduler=scheduler2 + lambda x: None, + lambda ex: None, + lambda: _raise(Exception("ex")), + scheduler=scheduler2, ) self.assertRaises(RxException, scheduler2.start) diff --git a/tests/test_observable/test_skipuntil.py b/tests/test_observable/test_skipuntil.py index 7218df448..faa185046 100644 --- a/tests/test_observable/test_skipuntil.py +++ b/tests/test_observable/test_skipuntil.py @@ -1,8 +1,9 @@ import unittest import reactivex -from reactivex import Observable +from reactivex import Observable, abc from reactivex import operators as ops +from reactivex.disposable import Disposable from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -157,11 +158,14 @@ def test_skip_until_has_completed_causes_disposal(self): disposed = [False] left = scheduler.create_hot_observable(l_msgs) - def subscribe(observer, scheduler=None): + def subscribe( + observer: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: disposed[0] = True - return None + return Disposable() - r = Observable(subscribe) # type: ignore[arg-type] + r = Observable(subscribe) def create(): return left.pipe(ops.skip_until(r)) diff --git a/tests/test_observable/test_throw.py b/tests/test_observable/test_throw.py index 9c410bfad..05bac612f 100644 --- a/tests/test_observable/test_throw.py +++ b/tests/test_observable/test_throw.py @@ -45,7 +45,10 @@ def test_throw_observer_throws(self): scheduler = TestScheduler() xs = throw("ex") xs.subscribe( - lambda x: None, lambda ex: _raise(Exception("ex")), lambda: None, scheduler=scheduler + lambda x: None, + lambda ex: _raise(Exception("ex")), + lambda: None, + scheduler=scheduler, ) self.assertRaises(RxException, scheduler.start) diff --git a/tests/test_observable/test_tofuture.py b/tests/test_observable/test_tofuture.py index eae1bdc70..e72b74151 100644 --- a/tests/test_observable/test_tofuture.py +++ b/tests/test_observable/test_tofuture.py @@ -1,8 +1,10 @@ import asyncio import unittest +from typing import cast import reactivex import reactivex.operators as ops +from reactivex import Observable from reactivex.internal.exceptions import SequenceContainsNoElementsError from reactivex.subject import Subject from reactivex.testing import ReactiveTest @@ -102,7 +104,7 @@ def test_dispose_on_cancel(self): async def using_sub(): # Since the subject never completes, this await statement # will never be complete either. We wait forever. - await reactivex.using(lambda: sub, lambda s: s) # type: ignore[arg-type] + await reactivex.using(lambda: sub, lambda s: cast(Observable[int], s)) async def go(): await asyncio.wait_for(using_sub(), 0.1) diff --git a/tests/test_observable/test_toiterable.py b/tests/test_observable/test_toiterable.py index 5a3f5e52b..ff40c8e0b 100644 --- a/tests/test_observable/test_toiterable.py +++ b/tests/test_observable/test_toiterable.py @@ -1,6 +1,7 @@ import unittest from reactivex import operators as ops +from reactivex.notification import OnNext from reactivex.testing import ReactiveTest, TestScheduler @@ -27,8 +28,9 @@ def create(): assert len(results) == 2 assert results[0].time == 660 - assert results[0].value.kind == "N" # type: ignore[union-attr] - assert results[0].value.value == [2, 3, 4, 5] # type: ignore[union-attr] + value = results[0].value + assert isinstance(value, OnNext) + assert value.value == [2, 3, 4, 5] assert self.on_completed(660).equals(results[1]) assert xs.subscriptions == [self.subscribe(200, 660)] diff --git a/tests/test_observable/test_while_do.py b/tests/test_observable/test_while_do.py index a161d9f7e..2992ef6be 100644 --- a/tests/test_observable/test_while_do.py +++ b/tests/test_observable/test_while_do.py @@ -1,7 +1,9 @@ import unittest import reactivex +from reactivex import abc from reactivex import operators as ops +from reactivex.disposable import Disposable from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -102,12 +104,15 @@ def predicate(x): n[0] += 1 return n[0] < 100 - def subscribe(o, scheduler=None): + def subscribe( + o: abc.ObserverBase[int], + scheduler: abc.SchedulerBase | None = None, + ) -> abc.DisposableBase: o.on_next(1) o.on_completed() - return None + return Disposable() - return reactivex.create(subscribe).pipe(ops.while_do(predicate)) # type: ignore[arg-type] + return reactivex.create(subscribe).pipe(ops.while_do(predicate)) results = scheduler.start(create=create)