Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion reactivex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ def from_callable(
def from_callback(
func: Callable[..., Callable[..., None]],
mapper: typing.Mapper[Any, Any] | None = None,
) -> Callable[[], Observable[Any]]:
) -> Callable[..., Observable[Any]]:
"""Converts a callback function to an observable sequence.

Args:
Expand Down
2 changes: 1 addition & 1 deletion reactivex/observable/fromcallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
def from_callback_(
func: Callable[..., Callable[..., None]],
mapper: typing.Mapper[Any, Any] | None = None,
) -> Callable[[], Observable[Any]]:
) -> Callable[..., Observable[Any]]:
"""Converts a callback function to an observable sequence.

Args:
Expand Down
3 changes: 2 additions & 1 deletion reactivex/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ def dematerialize() -> Callable[[Observable[Notification[_T]]], Observable[_T]]:


def delay(
duetime: typing.RelativeTime, scheduler: abc.SchedulerBase | None = None
duetime: typing.AbsoluteOrRelativeTime,
scheduler: abc.SchedulerBase | None = None,
) -> Callable[[Observable[_T]], Observable[_T]]:
"""The delay operator.

Expand Down
4 changes: 2 additions & 2 deletions reactivex/operators/_delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

def observable_delay_timespan(
source: Observable[_T],
duetime: typing.RelativeTime,
duetime: typing.AbsoluteOrRelativeTime,
scheduler: abc.SchedulerBase | None = None,
) -> Observable[_T]:
def subscribe(
Expand Down Expand Up @@ -119,7 +119,7 @@ def action(scheduler: abc.SchedulerBase, state: Any = None):
@curry_flip
def delay_(
source: Observable[_T],
duetime: typing.RelativeTime,
duetime: typing.AbsoluteOrRelativeTime,
scheduler: abc.SchedulerBase | None = None,
) -> Observable[_T]:
"""Time shifts the observable sequence.
Expand Down
8 changes: 4 additions & 4 deletions tests/test_observable/test_amb.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_amb_regular_should_dispose_loser(self):
source_not_disposed = [False]
o1 = scheduler.create_hot_observable(msgs1)

def action():
def action(_):
source_not_disposed[0] = True

o2 = scheduler.create_hot_observable(msgs2).pipe(
Expand All @@ -91,7 +91,7 @@ def test_amb_winner_throws(self):
source_not_disposed = [False]
o1 = scheduler.create_hot_observable(msgs1)

def action():
def action(_):
source_not_disposed[0] = True

o2 = scheduler.create_hot_observable(msgs2).pipe(
Expand All @@ -112,7 +112,7 @@ def test_amb_loser_throws(self):
msgs2 = [on_next(150, 1), on_next(210, 3), on_completed(250)]
source_not_disposed = [False]

def action():
def action(_):
source_not_disposed[0] = True

o1 = scheduler.create_hot_observable(msgs1).pipe(
Expand All @@ -136,7 +136,7 @@ def test_amb_throws_before_election(self):
source_not_disposed = [False]
o1 = scheduler.create_hot_observable(msgs1)

def action():
def action(_):
source_not_disposed[0] = True

o2 = scheduler.create_hot_observable(msgs2).pipe(
Expand Down
3 changes: 2 additions & 1 deletion tests/test_observable/test_average.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest

from reactivex import operators as _
from reactivex.notification import OnError
from reactivex.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
Expand All @@ -20,7 +21,7 @@ def test_average_int32_empty(self):
res = scheduler.start(create=lambda: xs.pipe(_.average())).messages

assert len(res) == 1
assert res[0].value.kind == "E" and res[0].value.exception is not None
assert isinstance(res[0].value, OnError) and res[0].value.exception is not None
assert res[0].time == 250

def test_average_int32_return(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_observable/test_combinelatest.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def create():
]

def test_combine_latest_mapper_throws(self):
ex = "ex"
ex = Exception("ex")
scheduler = TestScheduler()
msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)]
msgs2 = [on_next(150, 1), on_next(220, 3), on_completed(240)]
Expand Down
30 changes: 24 additions & 6 deletions tests/test_observable/test_concat.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import unittest

import reactivex
from reactivex import abc
from reactivex import operators as ops
from reactivex.disposable import Disposable
from reactivex.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
Expand Down Expand Up @@ -225,15 +227,23 @@ def create():

def test_concat_forward_scheduler(self):
scheduler = TestScheduler()
subscribe_schedulers = {"e1": "unknown", "e2": "unknown"}
subscribe_schedulers: dict[str, object] = {"e1": "unknown", "e2": "unknown"}

def subscribe_e1(observer, scheduler="not_set"):
def subscribe_e1(
observer: abc.ObserverBase[int],
scheduler: abc.SchedulerBase | None = None,
) -> abc.DisposableBase:
subscribe_schedulers["e1"] = scheduler
observer.on_completed()
return Disposable()

def subscribe_e2(observer, scheduler="not_set"):
def subscribe_e2(
observer: abc.ObserverBase[int],
scheduler: abc.SchedulerBase | None = None,
) -> abc.DisposableBase:
subscribe_schedulers["e2"] = scheduler
observer.on_completed()
return Disposable()

e1 = reactivex.create(subscribe_e1)
e2 = reactivex.create(subscribe_e2)
Expand All @@ -245,15 +255,23 @@ def subscribe_e2(observer, scheduler="not_set"):
assert subscribe_schedulers["e2"] is scheduler

def test_concat_forward_none_scheduler(self):
subscribe_schedulers = {"e1": "unknown", "e2": "unknown"}
subscribe_schedulers: dict[str, object] = {"e1": "unknown", "e2": "unknown"}

def subscribe_e1(observer, scheduler="not_set"):
def subscribe_e1(
observer: abc.ObserverBase[int],
scheduler: abc.SchedulerBase | None = None,
) -> abc.DisposableBase:
subscribe_schedulers["e1"] = scheduler
observer.on_completed()
return Disposable()

def subscribe_e2(observer, scheduler="not_set"):
def subscribe_e2(
observer: abc.ObserverBase[int],
scheduler: abc.SchedulerBase | None = None,
) -> abc.DisposableBase:
subscribe_schedulers["e2"] = scheduler
observer.on_completed()
return Disposable()

e1 = reactivex.create(subscribe_e1)
e2 = reactivex.create(subscribe_e2)
Expand Down
4 changes: 2 additions & 2 deletions tests/test_observable/test_debounce.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def mapper(x):
assert xs.subscriptions == [subscribe(200, 460)]

def test_debounce_duration_mapper_throws(self):
ex = "ex"
ex = Exception("ex")
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
on_next(150, 1),
Expand All @@ -376,7 +376,7 @@ def mapper(x):
on_next(x * 10, "Ignore"), on_next(x * 10 + 5, "Aargh!")
)
else:
_raise(ex)
raise ex

return xs.pipe(_.throttle_with_mapper(mapper))

Expand Down
10 changes: 7 additions & 3 deletions tests/test_observable/test_defer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import reactivex
from reactivex.testing import ReactiveTest, TestScheduler
from reactivex.testing.coldobservable import ColdObservable

on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed
Expand All @@ -23,7 +24,7 @@ def _raise(ex):

class TestDefer(unittest.TestCase):
def test_defer_complete(self):
xs = [None]
xs: list[ColdObservable[int] | None] = [None]
invoked = [0]
scheduler = TestScheduler()

Expand All @@ -40,12 +41,13 @@ def defer(scheduler):
results = scheduler.start(create)
assert results.messages == [on_next(300, 200), on_completed(400)]
assert 1 == invoked[0]
assert xs[0] is not None
assert xs[0].subscriptions == [subscribe(200, 400)]

def test_defer_error(self):
scheduler = TestScheduler()
invoked = [0]
xs = [None]
xs: list[ColdObservable[int] | None] = [None]
ex = "ex"

def create():
Expand All @@ -62,12 +64,13 @@ def defer(scheduler):

assert results.messages == [on_next(300, 200), on_error(400, ex)]
assert 1 == invoked[0]
assert xs[0] is not None
assert xs[0].subscriptions == [subscribe(200, 400)]

def test_defer_dispose(self):
scheduler = TestScheduler()
invoked = [0]
xs = [None]
xs: list[ColdObservable[int] | None] = [None]

def create():
def defer(scheduler):
Expand All @@ -84,6 +87,7 @@ def defer(scheduler):
results = scheduler.start(create)
assert results.messages == [on_next(300, 200), on_next(400, 1)]
assert 1 == invoked[0]
assert xs[0] is not None
assert xs[0].subscriptions == [subscribe(200, 1000)]

def test_defer_on_error(self):
Expand Down
1 change: 0 additions & 1 deletion tests/test_observable/test_doaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def test_do_plain_action(self):
def create():
def action(x):
i[0] += 1
return i[0]

return xs.pipe(_.do_action(action))

Expand Down
6 changes: 4 additions & 2 deletions tests/test_observable/test_elementat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest

from reactivex import operators as ops
from reactivex.notification import OnError
from reactivex.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
Expand Down Expand Up @@ -54,8 +55,9 @@ def create():

self.assertEqual(1, len(results.messages))
self.assertEqual(600, results.messages[0].time)
self.assertEqual("E", results.messages[0].value.kind)
assert results.messages[0].value.exception
value = results.messages[0].value
assert isinstance(value, OnError)
assert value.exception

def test_elementat_error(self):
ex = "ex"
Expand Down
5 changes: 4 additions & 1 deletion tests/test_observable/test_empty.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ def test_empty_observer_throw_exception(self):
scheduler = TestScheduler()
xs = empty()
xs.subscribe(
lambda x: None, lambda ex: None, lambda: _raise("ex"), scheduler=scheduler
lambda x: None,
lambda ex: None,
lambda: _raise(Exception("ex")),
scheduler=scheduler,
)

with self.assertRaises(RxException):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_observable/test_expand.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_expand_empty(self):
xs = scheduler.create_hot_observable(on_completed(300))

def create():
def mapper():
def mapper(x):
return scheduler.create_cold_observable(
on_next(100, 1), on_next(200, 2), on_completed(300)
)
Expand Down
7 changes: 4 additions & 3 deletions tests/test_observable/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def predicate(x: int) -> bool:
def test_filter_dispose_in_predicate(self):
scheduler = TestScheduler()
invoked = [0]
ys = [None]
ys: list[Observable[int] | None] = [None]
xs = scheduler.create_hot_observable(
on_next(110, 1),
on_next(180, 2),
Expand Down Expand Up @@ -282,11 +282,11 @@ def predicate(x):
return is_prime(x)

ys[0] = xs.pipe(filter(predicate))
return ys[0]

scheduler.schedule_absolute(created, action)

def action1(scheduler, state):
assert ys[0] is not None
d.disposable = ys[0].subscribe(results)

scheduler.schedule_absolute(subscribed, action1)
Expand Down Expand Up @@ -510,7 +510,7 @@ def predicate(x, index):

def test_filter_indexed_dispose_in_predicate(self):
scheduler = TestScheduler()
ys = [None]
ys: list[Observable[int] | None] = [None]
invoked = [0]
xs = scheduler.create_hot_observable(
on_next(110, 1),
Expand Down Expand Up @@ -545,6 +545,7 @@ def predicate(x, index):
scheduler.schedule_absolute(created, action1)

def action2(scheduler, state):
assert ys[0] is not None
d.disposable = ys[0].subscribe(results)

scheduler.schedule_absolute(subscribed, action2)
Expand Down
14 changes: 9 additions & 5 deletions tests/test_observable/test_generate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
from typing import Any, NoReturn

import reactivex
from reactivex import operators as ops
Expand All @@ -18,17 +19,20 @@ class RxException(Exception):


# Helper function for raising exceptions within lambdas
def _raise(ex):
def _raise(ex: Any) -> NoReturn:
raise RxException(ex)


_INITIAL: int = 0


class TestGenerate(unittest.TestCase):
def test_generate_finite(self):
scheduler = TestScheduler()

def create():
return reactivex.generate(
0,
_INITIAL,
lambda x: x <= 3,
lambda x: x + 1,
)
Expand All @@ -49,7 +53,7 @@ def test_generate_throw_condition(self):

def create():
return reactivex.generate(
0,
_INITIAL,
lambda x: _raise("ex"),
lambda x: x + 1,
)
Expand Down Expand Up @@ -78,7 +82,7 @@ def test_generate_dispose(self):

def create():
return reactivex.generate(
0,
_INITIAL,
lambda x: True,
lambda x: x + 1,
)
Expand All @@ -91,7 +95,7 @@ def test_generate_repeat(self):

def create():
return reactivex.generate(
0,
_INITIAL,
lambda x: x <= 3,
lambda x: x + 1,
).pipe(ops.repeat(2))
Expand Down
2 changes: 1 addition & 1 deletion tests/test_observable/test_interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def create():
def test_interval_timespan_observer_throws(self):
scheduler = TestScheduler()
xs = reactivex.interval(1)
xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler)
xs.subscribe(lambda x: _raise(Exception("ex")), scheduler=scheduler)

with self.assertRaises(RxException):
scheduler.start()
Loading
Loading