Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ cd EventGate
```shell
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
pip3 install -r requirements.txt
```

## Run Pylint Tool Locally
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ jsonschema==4.25.1
PyJWT==2.10.1
requests==2.32.5
boto3==1.40.25
confluent-kafka==2.11.1
confluent-kafka==2.12.1
# psycopg2-binary==2.9.10 # Ideal for local development, but not for long-term production use
psycopg2==2.9.10
74 changes: 50 additions & 24 deletions src/writer_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import json
import logging
import os
import time
from typing import Any, Dict, Optional, Tuple

from confluent_kafka import Producer

try: # KafkaException may not exist in stubbed test module
Expand All @@ -35,8 +35,10 @@ class KafkaException(Exception): # type: ignore


STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "producer": None}
# Configurable flush timeout (seconds) to avoid hanging indefinitely
_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "5"))
# Configurable flush timeouts and retries via env variables to avoid hanging indefinitely
_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7"))
_MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3"))
_RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5"))


def init(logger: logging.Logger, config: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -86,7 +88,6 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
"""
logger = STATE["logger"]
producer: Optional[Producer] = STATE.get("producer") # type: ignore[assignment]

if producer is None:
logger.debug("Kafka producer not initialized - skipping")
return True, None
Expand All @@ -100,23 +101,48 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]
value=json.dumps(message).encode("utf-8"),
callback=lambda err, msg: (errors.append(str(err)) if err is not None else None),
)
try:
remaining = producer.flush(_KAFKA_FLUSH_TIMEOUT_SEC) # type: ignore[arg-type]
except TypeError: # Fallback for stub producers without timeout parameter
remaining = producer.flush() # type: ignore[call-arg]
# remaining can be number of undelivered messages (confluent_kafka returns int)
if not errors and isinstance(remaining, int) and remaining > 0:
timeout_msg = f"Kafka flush timeout after {_KAFKA_FLUSH_TIMEOUT_SEC}s: {remaining} message(s) still pending"
logger.error(timeout_msg)
return False, timeout_msg
except KafkaException as e: # narrow exception capture
err_msg = f"The Kafka writer failed with unknown error: {str(e)}"
logger.exception(err_msg)
return False, err_msg

if errors:
msg = "; ".join(errors)
logger.error(msg)
return False, msg

return True, None

remaining: Optional[int] = None
Comment thread
oto-macenauer-absa marked this conversation as resolved.
Outdated
for attempt in range(1, _MAX_RETRIES + 1):
remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC)
# Treat None (flush returns None in some stubs) as success equivalent to 0 pending
if (remaining is None or remaining == 0) and not errors:
break
if attempt < _MAX_RETRIES:
logger.warning(
"Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES
)
time.sleep(_RETRY_BACKOFF_SEC)

if errors:
err_msg_summary = "; ".join(errors)
logger.error(err_msg_summary)
return False, err_msg_summary

# Log a warning if there are still pending messages after retries
if isinstance(remaining, int) and remaining > 0:
logger.warning(
"Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
)

return True, None

except KafkaException as e:
err_text = f"The Kafka writer failed with a Kafka exception error: {e}"
Comment thread
oto-macenauer-absa marked this conversation as resolved.
Outdated
logger.exception(err_text)
return False, err_text


def flush_with_timeout(producer, timeout: float) -> int:
"""Flush the Kafka producer with a timeout, handling TypeError for stubs.

Args:
producer: Kafka Producer instance.
timeout: Timeout in seconds.
Returns:
Number of messages still pending after flush.
"""
try:
return producer.flush(timeout)
except TypeError: # Fallback for stub producers without timeout parameter
return producer.flush()
Comment thread
tmikula-dev marked this conversation as resolved.
Outdated
81 changes: 80 additions & 1 deletion tests/test_writer_kafka.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import logging
from types import SimpleNamespace
import src.writer_kafka as wk
Expand All @@ -23,6 +22,44 @@ def produce(self, topic, key, value, callback): # noqa: D401
callback("ERR", None)


class FakeProducerFlushSequence(FakeProducerSuccess):
def __init__(self, sequence): # sequence of remaining counts per flush call
super().__init__()
self.sequence = sequence
self.flush_calls = 0

def flush(self, *a, **kw):
# Simulate decreasing remaining messages
if self.flush_calls < len(self.sequence):
val = self.sequence[self.flush_calls]
else:
val = self.sequence[-1]
self.flush_calls += 1
return val


class FakeProducerTimeout(FakeProducerSuccess):
def __init__(self, remaining_value):
super().__init__()
self.remaining_value = remaining_value
self.flush_calls = 0

def flush(self, *a, **kw): # always returns same remaining >0 to force timeout warning
self.flush_calls += 1
return self.remaining_value


class FakeProducerTypeError(FakeProducerSuccess):
def __init__(self):
super().__init__()
self.flush_calls = 0

# Intentionally omit timeout parameter causing TypeError on first attempt inside flush_with_timeout
def flush(self): # noqa: D401
self.flush_calls += 1
return 0


def test_write_skips_when_producer_none(monkeypatch):
wk.STATE["logger"] = logging.getLogger("test")
wk.STATE["producer"] = None
Expand Down Expand Up @@ -60,3 +97,45 @@ def produce(self, *a, **kw): # noqa: D401
wk.STATE["producer"] = RaisingProducer()
ok, err = wk.write("topic", {"d": 4})
assert not ok and "boom" in err


def test_write_flush_retries_until_success(monkeypatch, caplog):
wk.STATE["logger"] = logging.getLogger("test")
caplog.set_level(logging.WARNING)
# Force smaller max retries for deterministic sequence length
monkeypatch.setattr(wk, "_MAX_RETRIES", 5, raising=False)
producer = FakeProducerFlushSequence([5, 4, 3, 1, 0])
wk.STATE["producer"] = producer
ok, err = wk.write("topic", {"e": 5})
assert ok and err is None
# It should break as soon as remaining == 0 (after flush call returning 0)
assert producer.flush_calls == 5 # sequence consumed until 0
# Warnings logged for attempts before success (flush_calls -1) because last attempt didn't warn
warn_messages = [r.message for r in caplog.records if r.levelno == logging.WARNING]
assert any("attempt 1" in m or "attempt 2" in m for m in warn_messages)


def test_write_timeout_warning_when_remaining_after_retries(monkeypatch, caplog):
wk.STATE["logger"] = logging.getLogger("test")
caplog.set_level(logging.WARNING)
monkeypatch.setattr(wk, "_MAX_RETRIES", 3, raising=False)
producer = FakeProducerTimeout(2)
wk.STATE["producer"] = producer
ok, err = wk.write("topic", {"f": 6})
timeout_warnings = [
r.message for r in caplog.records if "timeout" in r.message
] # final warning should mention timeout
assert ok and err is None # function returns success even if timeout warning
assert timeout_warnings, "Expected timeout warning logged"
assert producer.flush_calls == 3 # retried 3 times


def test_flush_with_timeout_typeerror_fallback(monkeypatch):
wk.STATE["logger"] = logging.getLogger("test")
monkeypatch.setattr(wk, "_MAX_RETRIES", 4, raising=False)
producer = FakeProducerTypeError()
wk.STATE["producer"] = producer
ok, err = wk.write("topic", {"g": 7})
assert ok and err is None
# Since flush returns 0 immediately, only one flush call should be needed
assert producer.flush_calls == 1
Loading