diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py index e5a689a6e8b06..040b60a2a647e 100644 --- a/tests/rptest/tests/timequery_test.py +++ b/tests/rptest/tests/timequery_test.py @@ -48,8 +48,10 @@ class BaseTimeQuery: test_context: dict logger: Logger + base_ts = int(time.time() - 600) * 1000 + def _create_and_produce( - self, cluster, cloud_storage, local_retention, base_ts, record_size, msg_count + self, cluster, cloud_storage, local_retention, record_size, msg_count ): topic = TopicSpec(name="tqtopic", partition_count=1, replication_factor=3) self.client().create_topic(topic) @@ -93,20 +95,19 @@ def _create_and_produce( # but small enough that we are getting plenty of distinct batches. batch_max_bytes=record_size * 10, - # A totally arbitrary artificial timestamp base in milliseconds - fake_timestamp_ms=base_ts, + # An artificial timestamp base in milliseconds, derived from the + # current time (10 minutes in the past) at class-definition time + fake_timestamp_ms=self.base_ts, ) producer.start() producer.wait() # We know timestamps, they are generated linearly from the # base we provided to kgo-verifier - timestamps = dict((i, base_ts + i) for i in range(0, msg_count)) + timestamps = dict((i, self.base_ts + i) for i in range(0, msg_count)) return topic, timestamps def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool): total_segments = 12 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = self.log_segment_size * 4 kcat = KafkaCat(cluster) @@ -116,13 +117,13 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool): name="tq_empty_topic", partition_count=1, replication_factor=3 ) self.client().create_topic(empty_topic) - offset = kcat.query_offset(empty_topic.name, 0, base_ts) + offset = kcat.query_offset(empty_topic.name, 0, self.base_ts) self.logger.info(f"Time query returned offset {offset}") assert offset == -1, f"Expected -1, got {offset}" # Create a topic and produce a run of messages we will query. 
topic, timestamps = self._create_and_produce( - cluster, cloud_storage, local_retention, base_ts, record_size, msg_count + cluster, cloud_storage, local_retention, record_size, msg_count ) for k, v in timestamps.items(): self.logger.debug(f" Offset {k} -> Timestamp {v}") @@ -268,11 +269,10 @@ def _test_timequery_below_start_offset(self, cluster): total_segments = 3 local_retain_segments = 1 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size topic, timestamps = self._create_and_produce( - cluster, False, local_retain_segments, base_ts, record_size, msg_count + cluster, False, local_retain_segments, record_size, msg_count ) self.client().alter_topic_config( @@ -293,7 +293,7 @@ def start_offset(): kcat = KafkaCat(cluster) lwm_before = start_offset() - offset = kcat.query_offset(topic.name, 0, base_ts) + offset = kcat.query_offset(topic.name, 0, self.base_ts) lwm_after = start_offset() # When querying before the start of the log, we should get @@ -409,11 +409,10 @@ def test_timequery_with_local_gc(self): self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=False) local_retention = self.log_segment_size * 4 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size topic, timestamps = self._create_and_produce( - self.redpanda, True, local_retention, base_ts, record_size, msg_count + self.redpanda, True, local_retention, record_size, msg_count ) # While waiting for local GC to occur, run several concurrent @@ -487,11 +486,10 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool, spillover: bool): ) total_segments = 12 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = self.log_segment_size * 4 topic, timestamps = self._create_and_produce( - self.redpanda, True, local_retention, base_ts, record_size, msg_count + self.redpanda, True, local_retention, 
record_size, msg_count ) # Confirm messages written @@ -547,12 +545,11 @@ def test_timequery_with_spillover_gc_delayed(self): self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=True) total_segments = 16 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = self.log_segment_size * 4 topic_retention = self.log_segment_size * 8 topic, timestamps = self._create_and_produce( - self.redpanda, True, local_retention, base_ts, record_size, msg_count + self.redpanda, True, local_retention, record_size, msg_count ) # Confirm messages written @@ -648,11 +645,10 @@ def test_timequery_empty_local_log(self): total_segments = 3 record_size = 1024 - base_ts = 1664453149000 msg_count = (self.log_segment_size * total_segments) // record_size local_retention = 1 # Any value works for this test. topic, timestamps = self._create_and_produce( - self.redpanda, True, local_retention, base_ts, record_size, msg_count + self.redpanda, True, local_retention, record_size, msg_count ) # Confirm messages written @@ -748,6 +744,7 @@ class TestReadReplicaTimeQuery(RedpandaTest): log_segment_size = 1024 * 1024 topic_name = "panda-topic" + base_ts = int(time.time() - 600) * 1000 def __init__(self, test_context): super(TestReadReplicaTimeQuery, self).__init__( @@ -790,7 +787,7 @@ def create_read_replica_topic(self) -> None: return False return True - def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None: + def setup_clusters(self, num_messages=0, partition_count=1) -> None: spec = TopicSpec( name=self.topic_name, partition_count=partition_count, @@ -807,7 +804,7 @@ def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None: msg_size=1024, msg_count=num_messages, batch_max_bytes=10240, - fake_timestamp_ms=base_ts, + fake_timestamp_ms=self.base_ts, ) producer.start() @@ -846,9 +843,8 @@ def query_timestamp(self, ts, kcat_src, kcat_rr): @ducktape_cluster(num_nodes=7) def 
test_timequery(self): - base_ts = 1664453149000 num_messages = 1000 - self.setup_clusters(base_ts, num_messages, 3) + self.setup_clusters(num_messages, 3) def read_replica_ready(): orig = RpkTool(self.redpanda).describe_topic(self.topic_name) @@ -867,4 +863,4 @@ def read_replica_ready(): kcat1 = KafkaCat(self.redpanda) kcat2 = KafkaCat(self.rr_cluster) for ix in range(0, num_messages, 20): - self.query_timestamp(base_ts + ix, kcat1, kcat2) + self.query_timestamp(self.base_ts + ix, kcat1, kcat2)