Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 18 additions & 22 deletions tests/rptest/tests/timequery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ class BaseTimeQuery:
test_context: dict
logger: Logger

base_ts = int(time.time() - 600) * 1000

def _create_and_produce(
self, cluster, cloud_storage, local_retention, base_ts, record_size, msg_count
self, cluster, cloud_storage, local_retention, record_size, msg_count
):
topic = TopicSpec(name="tqtopic", partition_count=1, replication_factor=3)
self.client().create_topic(topic)
Expand Down Expand Up @@ -93,20 +95,19 @@ def _create_and_produce(
# but small enough that we are getting plenty of distinct batches.
batch_max_bytes=record_size * 10,
# Artificial base timestamp, in milliseconds, for the produced records
fake_timestamp_ms=base_ts,
fake_timestamp_ms=self.base_ts,
)
producer.start()
producer.wait()

# We know the timestamps: they are generated linearly from the
# base we provided to kgo-verifier
timestamps = dict((i, base_ts + i) for i in range(0, msg_count))
timestamps = dict((i, self.base_ts + i) for i in range(0, msg_count))
return topic, timestamps

def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
total_segments = 12
record_size = 1024
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size
local_retention = self.log_segment_size * 4
kcat = KafkaCat(cluster)
Expand All @@ -116,13 +117,13 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
name="tq_empty_topic", partition_count=1, replication_factor=3
)
self.client().create_topic(empty_topic)
offset = kcat.query_offset(empty_topic.name, 0, base_ts)
offset = kcat.query_offset(empty_topic.name, 0, self.base_ts)
self.logger.info(f"Time query returned offset {offset}")
assert offset == -1, f"Expected -1, got {offset}"

# Create a topic and produce a run of messages we will query.
topic, timestamps = self._create_and_produce(
cluster, cloud_storage, local_retention, base_ts, record_size, msg_count
cluster, cloud_storage, local_retention, record_size, msg_count
)
for k, v in timestamps.items():
self.logger.debug(f" Offset {k} -> Timestamp {v}")
Expand Down Expand Up @@ -268,11 +269,10 @@ def _test_timequery_below_start_offset(self, cluster):
total_segments = 3
local_retain_segments = 1
record_size = 1024
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size

topic, timestamps = self._create_and_produce(
cluster, False, local_retain_segments, base_ts, record_size, msg_count
cluster, False, local_retain_segments, record_size, msg_count
)

self.client().alter_topic_config(
Expand All @@ -293,7 +293,7 @@ def start_offset():

kcat = KafkaCat(cluster)
lwm_before = start_offset()
offset = kcat.query_offset(topic.name, 0, base_ts)
offset = kcat.query_offset(topic.name, 0, self.base_ts)
lwm_after = start_offset()

# When querying before the start of the log, we should get
Expand Down Expand Up @@ -409,11 +409,10 @@ def test_timequery_with_local_gc(self):
self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=False)
local_retention = self.log_segment_size * 4
record_size = 1024
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size

topic, timestamps = self._create_and_produce(
self.redpanda, True, local_retention, base_ts, record_size, msg_count
self.redpanda, True, local_retention, record_size, msg_count
)

# While waiting for local GC to occur, run several concurrent
Expand Down Expand Up @@ -487,11 +486,10 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool, spillover: bool):
)
total_segments = 12
record_size = 1024
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size
local_retention = self.log_segment_size * 4
topic, timestamps = self._create_and_produce(
self.redpanda, True, local_retention, base_ts, record_size, msg_count
self.redpanda, True, local_retention, record_size, msg_count
)

# Confirm messages written
Expand Down Expand Up @@ -547,12 +545,11 @@ def test_timequery_with_spillover_gc_delayed(self):
self.set_up_cluster(cloud_storage=True, batch_cache=False, spillover=True)
total_segments = 16
record_size = 1024
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size
local_retention = self.log_segment_size * 4
topic_retention = self.log_segment_size * 8
topic, timestamps = self._create_and_produce(
self.redpanda, True, local_retention, base_ts, record_size, msg_count
self.redpanda, True, local_retention, record_size, msg_count
)

# Confirm messages written
Expand Down Expand Up @@ -648,11 +645,10 @@ def test_timequery_empty_local_log(self):

total_segments = 3
record_size = 1024
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size
local_retention = 1 # Any value works for this test.
topic, timestamps = self._create_and_produce(
self.redpanda, True, local_retention, base_ts, record_size, msg_count
self.redpanda, True, local_retention, record_size, msg_count
)

# Confirm messages written
Expand Down Expand Up @@ -748,6 +744,7 @@ class TestReadReplicaTimeQuery(RedpandaTest):

log_segment_size = 1024 * 1024
topic_name = "panda-topic"
base_ts = int(time.time() - 600) * 1000

def __init__(self, test_context):
super(TestReadReplicaTimeQuery, self).__init__(
Expand Down Expand Up @@ -790,7 +787,7 @@ def create_read_replica_topic(self) -> None:
return False
return True

def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None:
def setup_clusters(self, num_messages=0, partition_count=1) -> None:
spec = TopicSpec(
name=self.topic_name,
partition_count=partition_count,
Expand All @@ -807,7 +804,7 @@ def setup_clusters(self, base_ts, num_messages=0, partition_count=1) -> None:
msg_size=1024,
msg_count=num_messages,
batch_max_bytes=10240,
fake_timestamp_ms=base_ts,
fake_timestamp_ms=self.base_ts,
)

producer.start()
Expand Down Expand Up @@ -846,9 +843,8 @@ def query_timestamp(self, ts, kcat_src, kcat_rr):

@ducktape_cluster(num_nodes=7)
def test_timequery(self):
base_ts = 1664453149000
num_messages = 1000
self.setup_clusters(base_ts, num_messages, 3)
self.setup_clusters(num_messages, 3)

def read_replica_ready():
orig = RpkTool(self.redpanda).describe_topic(self.topic_name)
Expand All @@ -867,4 +863,4 @@ def read_replica_ready():
kcat1 = KafkaCat(self.redpanda)
kcat2 = KafkaCat(self.rr_cluster)
for ix in range(0, num_messages, 20):
self.query_timestamp(base_ts + ix, kcat1, kcat2)
self.query_timestamp(self.base_ts + ix, kcat1, kcat2)