From 033ca1982265c9defbf52df12da4d0e276e2b78f Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Fri, 7 Nov 2025 11:07:36 -0500 Subject: [PATCH] rptest: Fix timequery test The test uses constant timestamp to generate records and query them. But the timestamp is too old so retention policy starts to remove data while the test is running. This commit makes the test use proper base timestamp. Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- tests/rptest/tests/timequery_test.py | 40 +++++++++++++--------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py index 8c91abeec8d94..d9ec7571be0ca 100644 --- a/tests/rptest/tests/timequery_test.py +++ b/tests/rptest/tests/timequery_test.py @@ -48,8 +48,10 @@ class BaseTimeQuery: test_context: dict logger: Logger + base_ts = int(time.time() - 600) * 1000 + def _create_and_produce( - self, cluster, cloud_storage, local_retention, base_ts, record_size, msg_count + self, cluster, cloud_storage, local_retention, record_size, msg_count ): topic = TopicSpec(name="tqtopic", partition_count=1, replication_factor=3) self.client().create_topic(topic) @@ -95,20 +97,19 @@ def _create_and_produce( # but small enough that we are getting plenty of distinct batches. batch_max_bytes=record_size * 10, # A totally arbitrary artificial timestamp base in milliseconds - fake_timestamp_ms=base_ts, + fake_timestamp_ms=self.base_ts, ) producer.start() producer.wait() # We know timestamps, they are generated linearly from the # base we provided to kgo-verifier - timestamps = dict((i, base_ts + i) for i in range(0, msg_count)) + timestamps = dict((i, self.base_ts + i) for i in range(0, msg_count)) return topic, timestamps def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool): total_segments = 12 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = self.log_segment_size * 4 kcat = KafkaCat(cluster) @@ -118,13 +119,13 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool): name="tq_empty_topic", partition_count=1, replication_factor=3 ) self.client().create_topic(empty_topic) - offset = kcat.query_offset(empty_topic.name, 0, base_ts) + offset = kcat.query_offset(empty_topic.name, 0, self.base_ts) self.logger.info(f"Time query returned offset {offset}") assert offset == -1, f"Expected -1, got {offset}" # Create a topic and produce a run of messages we will query. topic, timestamps = self._create_and_produce( - cluster, cloud_storage, local_retention, base_ts, record_size, msg_count + cluster, cloud_storage, local_retention, record_size, msg_count ) for k, v in timestamps.items(): self.logger.debug(f" Offset {k} -> Timestamp {v}") @@ -270,11 +271,10 @@ def _test_timequery_below_start_offset(self, cluster): total_segments = 3 local_retain_segments = 1 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size topic, timestamps = self._create_and_produce( - cluster, False, local_retain_segments, base_ts, record_size, msg_count + cluster, False, local_retain_segments, record_size, msg_count ) self.client().alter_topic_config( @@ -295,7 +295,7 @@ def start_offset(): kcat = KafkaCat(cluster) lwm_before = start_offset() - offset = kcat.query_offset(topic.name, 0, base_ts) + offset = kcat.query_offset(topic.name, 0, self.base_ts) lwm_after = start_offset() # When querying before the start of the log, we should get @@ -416,11 +416,10 @@ def test_timequery_with_local_gc(self): self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=False) local_retention = self.log_segment_size * 4 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size topic, timestamps = self._create_and_produce( - self.redpanda, True, local_retention, base_ts, record_size, msg_count + self.redpanda, True, local_retention, record_size, msg_count ) # While waiting for local GC to occur, run several concurrent @@ -494,11 +493,10 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool, spillover: bool): ) total_segments = 12 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = self.log_segment_size * 4 topic, timestamps = self._create_and_produce( - self.redpanda, True, local_retention, base_ts, record_size, msg_count + self.redpanda, True, local_retention, record_size, msg_count ) # Confirm messages written @@ -554,12 +552,11 @@ def test_timequery_with_spillover_gc_delayed(self): self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=True) total_segments = 16 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = self.log_segment_size * 4 topic_retention = self.log_segment_size * 8 topic, timestamps = self._create_and_produce( - self.redpanda, True, local_retention, base_ts, record_size, msg_count + self.redpanda, True, local_retention, record_size, msg_count ) # Confirm messages written @@ -655,11 +652,10 @@ def test_timequery_empty_local_log(self): total_segments = 3 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = 1 # Any value works for this test. topic, timestamps = self._create_and_produce( - self.redpanda, True, local_retention, base_ts, record_size, msg_count + self.redpanda, True, local_retention, record_size, msg_count ) # Confirm messages written @@ -770,6 +766,7 @@ class TestReadReplicaTimeQuery(RedpandaTest): log_segment_size = 1024 * 1024 topic_name = "panda-topic" + base_ts = int(time.time() - 600) * 1000 def __init__(self, test_context): super(TestReadReplicaTimeQuery, self).__init__( @@ -812,7 +809,7 @@ def create_read_replica_topic(self) -> None: return False return True - def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None: + def setup_clusters(self, num_messages=0, partition_count=1) -> None: spec = TopicSpec( name=self.topic_name, partition_count=partition_count, @@ -829,7 +826,7 @@ def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None: msg_size=1024, msg_count=num_messages, batch_max_bytes=10240, - fake_timestamp_ms=base_ts, + fake_timestamp_ms=self.base_ts, ) producer.start() @@ -868,9 +865,8 @@ def query_timestamp(self, ts, kcat_src, kcat_rr): @ducktape_cluster(num_nodes=7) def test_timequery(self): - base_ts = 1664453149000 num_messages = 1000 - self.setup_clusters(base_ts, num_messages, 3) + self.setup_clusters(num_messages, 3) def read_replica_ready(): orig = RpkTool(self.redpanda).describe_topic(self.topic_name) @@ -889,4 +885,4 @@ def read_replica_ready(): kcat1 = KafkaCat(self.redpanda) kcat2 = KafkaCat(self.rr_cluster) for ix in range(0, num_messages, 20): - self.query_timestamp(base_ts + ix, kcat1, kcat2) + self.query_timestamp(self.base_ts + ix, kcat1, kcat2)