diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index 7b0ea9becd3ee..736231ce13168 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -17,50 +17,40 @@ from ducktape.tests.test import Test from ducktape.mark.resource import cluster from ducktape.mark import matrix -from ducktape.mark import ignore from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService import time import signal from random import randint -def broker_node(test, topic, broker_type): - """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0 - """ - if broker_type == "leader": - node = test.kafka.leader(topic, partition=0) - elif broker_type == "controller": - node = test.kafka.controller() - else: - raise Exception("Unexpected broker type %s." % (broker_type)) - - return node +def leader_node(test, topic): + """Discover leader for our topic and partition 0.""" + return test.kafka.leader(topic, partition=0) def signal_node(test, node, sig): test.kafka.signal_node(node, sig) -def clean_shutdown(test, topic, broker_type): - """Discover broker node of requested type and shut it down cleanly. - """ - node = broker_node(test, topic, broker_type) +def clean_shutdown(test, topic): + """Discover leader broker node and shut it down cleanly.""" + node = leader_node(test, topic) signal_node(test, node, signal.SIGTERM) -def hard_shutdown(test, topic, broker_type): - """Discover broker node of requested type and shut it down with a hard kill.""" - node = broker_node(test, topic, broker_type) +def hard_shutdown(test, topic): + """Discover leader broker node and shut it down with a hard kill.""" + node = leader_node(test, topic) signal_node(test, node, signal.SIGKILL) -def clean_bounce(test, topic, broker_type): +def clean_bounce(test, topic): """Chase the leader of one partition and restart it cleanly a few times (5 times).""" for i in range(5): - prev_broker_node = broker_node(test, topic, broker_type) + prev_broker_node = leader_node(test, topic) test.kafka.restart_node(prev_broker_node, clean_shutdown=True) -def hard_bounce(test, topic, broker_type): +def hard_bounce(test, topic): """Chase the leader and restart it with a hard kill. Do this a few times (5).""" for i in range(5): - prev_broker_node = broker_node(test, topic, broker_type) + prev_broker_node = leader_node(test, topic) test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL) wait_until(lambda: not test.kafka.pids(prev_broker_node), @@ -145,11 +135,11 @@ def __init__(self, test_context): 'configs': {"min.insync.replicas": 2} } } - def fail_broker_type(self, failure_mode, broker_type): + def fail_leader(self, failure_mode): # Pick a random topic and bounce it's leader topic_index = randint(0, len(self.topics.keys()) - 1) topic = list(self.topics.keys())[topic_index] - failures[failure_mode](self, topic, broker_type) + failures[failure_mode](self, topic) def fail_many_brokers(self, failure_mode, num_failures): many_failures[failure_mode](self, num_failures) @@ -194,7 +184,7 @@ def setup_system(self, start_processor=True, num_threads=3, group_protocol='clas if (start_processor): self.processor1.start() - def collect_results(self, sleep_time_secs): + def collect_results(self): data = {} # End test self.driver.wait() @@ -204,13 +194,7 @@ def collect_results(self, sleep_time_secs): node = self.driver.node - # Success is declared if streams does not crash when sleep time > 0 - # It should give an exception when sleep time is 0 since we kill the brokers immediately - # and the topic manager cannot create internal topics with the desired replication factor - if (sleep_time_secs == 0): - output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-EXCEPTION %s" % self.processor1.STDOUT_FILE, allow_fail=False) - else: - output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) + output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) for line in output_streams: data["Client closed"] = line @@ -229,12 +213,11 @@ def collect_results(self, sleep_time_secs): @cluster(num_nodes=7) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], - broker_type=["leader"], num_threads=[1, 3], sleep_time_secs=[120], metadata_quorum=[quorum.combined_kraft], group_protocol=["classic", "streams"]) - def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum, group_protocol): + def test_broker_type_bounce(self, failure_mode, sleep_time_secs, num_threads, metadata_quorum, group_protocol): """ Start a smoke test client, then kill one particular broker and ensure data is still received Record if records are delivered. @@ -247,34 +230,9 @@ def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, nu time.sleep(sleep_time_secs) # Fail brokers - self.fail_broker_type(failure_mode, broker_type) - - return self.collect_results(sleep_time_secs) - - @ignore - @cluster(num_nodes=7) - @matrix(failure_mode=["clean_shutdown"], - broker_type=["controller"], - sleep_time_secs=[0], - metadata_quorum=[quorum.combined_kraft], - group_protocol=["classic", "streams"]) - def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum, group_protocol): - """ - Start a smoke test client, then kill one particular broker immediately before streams stats - Streams should throw an exception since it cannot create topics with the desired - replication factor of 3 - """ - self.setup_system(start_processor=False, group_protocol=group_protocol) - - # Sleep to allow test to run for a bit - time.sleep(sleep_time_secs) - - # Fail brokers - self.fail_broker_type(failure_mode, broker_type) - - self.processor1.start() + self.fail_leader(failure_mode) - return self.collect_results(sleep_time_secs) + return self.collect_results() @cluster(num_nodes=10) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], @@ -294,7 +252,7 @@ def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, # Fail brokers self.fail_many_brokers(failure_mode, num_failures) - return self.collect_results(120) + return self.collect_results() @cluster(num_nodes=10) @matrix(failure_mode=["clean_bounce", "hard_bounce"], @@ -321,4 +279,4 @@ def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, g # Fail brokers self.fail_many_brokers(failure_mode, num_failures) - return self.collect_results(120) + return self.collect_results()