Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ exclude = [
"dist",
"notebooks",
"examples",
# Test directories - remove as each stage is completed
# Stage 1 complete, Stage 2 partial (test_integration complete)
# test_subject and test_scheduler now pass ruff checks
"tests/test_observable",
]

[tool.ruff.lint]
Expand Down Expand Up @@ -109,7 +105,7 @@ asyncio_mode = "strict"
include = ["reactivex", "tests"]
exclude = [
# Stage 1 complete, Stage 2 complete (test_subject done)
# test_scheduler still has strict-mode pyright errors
# test_scheduler and test_observable still have strict-mode pyright errors
"tests/test_scheduler",
"tests/test_observable",
]
Expand Down
4 changes: 2 additions & 2 deletions tests/test_observable/test_amb.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
class TestAmb(unittest.TestCase):
def test_amb_never2(self):
scheduler = TestScheduler()
l = reactivex.never()
left = reactivex.never()
r = reactivex.never()

def create():
return l.pipe(ops.amb(r))
return left.pipe(ops.amb(r))

results = scheduler.start(create)
assert results.messages == []
Expand Down
2 changes: 1 addition & 1 deletion tests/test_observable/test_average.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def test_average_int32_empty(self):
res = scheduler.start(create=lambda: xs.pipe(_.average())).messages

assert len(res) == 1
assert res[0].value.kind == "E" and res[0].value.exception != None
assert res[0].value.kind == "E" and res[0].value.exception is not None
assert res[0].time == 250

def test_average_int32_return(self):
Expand Down
1 change: 0 additions & 1 deletion tests/test_observable/test_catch.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ def handler(e, source):
assert handler_called[0]

def test_catch_error_specific_caught_immediate(self):
ex = "ex"
handler_called = [False]
scheduler = TestScheduler()
msgs2 = [on_next(240, 4), on_completed(250)]
Expand Down
9 changes: 6 additions & 3 deletions tests/test_observable/test_combination_fluent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"""

import reactivex as rx
from reactivex import Observable, operators as ops
from reactivex import Observable
from reactivex import operators as ops


class TestMergeMethodChaining:
Expand Down Expand Up @@ -164,7 +165,8 @@ def test_join_equivalence(self) -> None:
right: Observable[int] = rx.of(4, 5, 6)

# Use rx.never() for duration to keep windows open
duration_mapper = lambda _: rx.never()
def duration_mapper(_):
return rx.never()

fluent_result: Observable[tuple[int, int]] = left.join(
right, duration_mapper, duration_mapper
Expand Down Expand Up @@ -192,7 +194,8 @@ def test_group_join_equivalence(self) -> None:
right: Observable[int] = rx.of(4, 5)

# Use rx.never() for duration to keep windows open
duration_mapper = lambda _: rx.never()
def duration_mapper(_):
return rx.never()

fluent_result: Observable[tuple[int, Observable[int]]] = left.group_join(
right, duration_mapper, duration_mapper
Expand Down
3 changes: 2 additions & 1 deletion tests/test_observable/test_conditional_fluent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"""

import reactivex as rx
from reactivex import Observable, operators as ops
from reactivex import Observable
from reactivex import operators as ops


class TestDefaultIfEmptyMethodChaining:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_observable/test_connectableobservable.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

class MySubject(Observable, ObserverBase):
def __init__(self):
super(MySubject, self).__init__()
super().__init__()

self.dispose_on_map = {}
self.subscribe_count = 0
Expand Down Expand Up @@ -82,7 +82,7 @@ def test_connectable_observable_connected(self):
subject = MySubject()

conn = ConnectableObservable(xs, subject)
disconnect = conn.connect(scheduler)
conn.connect(scheduler)

res = scheduler.start(lambda: conn)

Expand Down
3 changes: 1 addition & 2 deletions tests/test_observable/test_debounce.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import unittest

from reactivex import empty, never
from reactivex import empty, never, throw
from reactivex import operators as _
from reactivex import throw
from reactivex.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
Expand Down
3 changes: 1 addition & 2 deletions tests/test_observable/test_defaultifempty.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import unittest
from typing import Optional

from reactivex import Observable
from reactivex import operators as ops
Expand Down Expand Up @@ -27,7 +26,7 @@ def test_default_if_empty_non_empty1(self):
on_completed(420),
)

def create() -> Observable[Optional[int]]:
def create() -> Observable[int | None]:
return xs.pipe(ops.default_if_empty())

results = scheduler.start(create)
Expand Down
30 changes: 15 additions & 15 deletions tests/test_observable/test_doaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def on_completed():
# def test_do_next_error(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_error(250, ex))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_error(250, ex)) # noqa: E501
# i = [0]
# sum = [2 + 3 + 4 + 5]
# saw_error = False
Expand All @@ -156,7 +156,7 @@ def on_completed():

# def test_do_next_error_not(self):
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_completed(250)) # noqa: E501
# i = [0]
# sum = [2 + 3 + 4 + 5]
# saw_error = False
Expand All @@ -175,7 +175,7 @@ def on_completed():

# def test_do_next_error_completed(self):
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_completed(250)) # noqa: E501
# i = [0]
# sum = [2 + 3 + 4 + 5]
# saw_error = False
Expand All @@ -198,7 +198,7 @@ def on_completed():
# def test_do_next_error_completed_error(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_error(250, ex))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_error(250, ex)) # noqa: E501
# i = [0]
# sum = [2 + 3 + 4 + 5]
# saw_error = False
Expand Down Expand Up @@ -238,7 +238,7 @@ def on_completed():
# def test_Do_Observer_SomeDataWithError(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_error(250, ex))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_error(250, ex)) # noqa: E501
# i = [0]
# sum = [2 + 3 + 4 + 5]
# saw_error = False
Expand All @@ -260,7 +260,7 @@ def on_completed():

# def test_do_observer_some_data_with_error(self):
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_next(220, 3), on_next(230, 4), on_next(240, 5), on_completed(250)) # noqa: E501
# i = [0]
# sum = [2 + 3 + 4 + 5]
# saw_error = False
Expand All @@ -283,7 +283,7 @@ def on_completed():
# def test_do1422_next_next_throws(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250)) # noqa: E501
# results = scheduler.start(create)
# return xs.do_action(function () {
# raise Exception(ex)
Expand All @@ -294,7 +294,7 @@ def on_completed():
# def test_do1422_next_completed_next_throws(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250)) # noqa: E501
# results = scheduler.start(create)
# return xs.do_action(function () {
# throw ex
Expand All @@ -305,7 +305,7 @@ def on_completed():
# def test_do1422_next_completed_completed_throws(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250)) # noqa: E501
# results = scheduler.start(create)
# return xs.do_action(function () { }, _undefined, function () {
# throw ex
Expand All @@ -316,7 +316,7 @@ def on_completed():
# def test_do1422_next_error_next_throws(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250)) # noqa: E501
# results = scheduler.start(create)
# return xs.do_action(function () {
# raise Exception(ex)
Expand All @@ -341,7 +341,7 @@ def on_completed():
# var ex, results, scheduler, xs
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250)) # noqa: E501
# results = scheduler.start(create)
# return xs.do_action(function () {
# raise Exception(ex)
Expand All @@ -365,7 +365,7 @@ def on_completed():
# def test_do1422_next_error_completed_completed_throws(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250)) # noqa: E501
# results = scheduler.start(create)
# return xs.do_action(function () { }, function () { }, function () {
# raise Exception(ex)
Expand All @@ -376,7 +376,7 @@ def on_completed():
# def test_do1422_observer_next_throws(self):
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250)) # noqa: E501
# results = scheduler.start(create)
# return xs.do_action(Observer.create(function () {
# raise Exception(ex)
Expand All @@ -401,9 +401,9 @@ def on_completed():
# var ex, results, scheduler, xs
# ex = 'ex'
# scheduler = TestScheduler()
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250))
# xs = scheduler.create_hot_observable(on_next(150, 1), on_next(210, 2), on_completed(250)) # noqa: E501
# results = scheduler.start(create)
# return xs.do_action(Observer.create(function () { }, function () { }, function () {
# return xs.do_action(Observer.create(function () { }, function () { }, function () { # noqa: E501
# raise Exception(ex)
# }))

Expand Down
3 changes: 2 additions & 1 deletion tests/test_observable/test_error_handling_fluent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"""

import reactivex as rx
from reactivex import Observable, operators as ops
from reactivex import Observable
from reactivex import operators as ops


class TestCatchMethodChaining:
Expand Down
19 changes: 12 additions & 7 deletions tests/test_observable/test_filtering_fluent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
ensuring they produce identical results to the pipe-based functional syntax.
"""

from typing import Callable

import reactivex as rx
from reactivex import Observable, operators as ops
from reactivex import Observable
from reactivex import operators as ops


class TestFilterMethodChaining:
Expand All @@ -16,7 +15,9 @@ class TestFilterMethodChaining:
def test_filter_equivalence(self) -> None:
"""Verify fluent and functional styles are equivalent."""
source: Observable[int] = rx.of(1, 2, 3, 4, 5, 6)
predicate: Callable[[int], bool] = lambda x: x % 2 == 0

def predicate(x: int) -> bool:
return x % 2 == 0

# Fluent style
fluent_result: Observable[int] = source.filter(predicate)
Expand Down Expand Up @@ -138,7 +139,9 @@ def test_first_equivalence(self) -> None:
def test_first_with_predicate(self) -> None:
"""Test first with a predicate."""
source: Observable[int] = rx.of(1, 2, 3, 4, 5)
predicate: Callable[[int], bool] = lambda x: x > 3

def predicate(x: int) -> bool:
return x > 3

result: Observable[int] = source.first(predicate)

Expand Down Expand Up @@ -173,7 +176,9 @@ def test_last_equivalence(self) -> None:
def test_last_with_predicate(self) -> None:
"""Test last with a predicate."""
source: Observable[int] = rx.of(1, 2, 3, 4, 5)
predicate: Callable[[int], bool] = lambda x: x < 4

def predicate(x: int) -> bool:
return x < 4

result: Observable[int] = source.last(predicate)

Expand Down Expand Up @@ -792,7 +797,7 @@ def test_single_or_default_async_with_multiple_elements(self) -> None:
assert "more than one element" in str(pipe_errors[0]).lower()

def test_single_or_default_async_equivalence(self) -> None:
"""Verify single_or_default_async fluent and functional styles are equivalent."""
"""Verify single_or_default_async fluent and functional styles are equivalent.""" # noqa: E501
source: Observable[int] = rx.of(5)

fluent_result: Observable[int] = source.single_or_default_async(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_observable/test_fromfuture.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def on_next(x):
success[0] = False

def on_error(err):
success[1] = type(err) == asyncio.CancelledError
success[1] = isinstance(err, asyncio.CancelledError)

def on_completed():
success[2] = False
Expand Down
1 change: 0 additions & 1 deletion tests/test_observable/test_generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def create():

def test_generate_dispose(self):
scheduler = TestScheduler()
ex = "ex"

def create():
return reactivex.generate(
Expand Down
Loading