diff --git a/src/v/kafka/client/errors.h b/src/v/kafka/client/errors.h index 7947ea7164e12..1ed949b193ca4 100644 --- a/src/v/kafka/client/errors.h +++ b/src/v/kafka/client/errors.h @@ -113,6 +113,7 @@ inline bool is_retriable_error(kafka::error_code ec) { case error_code::duplicate_resource: case error_code::unacceptable_credential: case error_code::transactional_id_not_found: + case error_code::rebootstrap_required: return false; } } diff --git a/src/v/kafka/protocol/errors.cc b/src/v/kafka/protocol/errors.cc index 6eb0d6c4ab9dd..3b12be17c8ebc 100644 --- a/src/v/kafka/protocol/errors.cc +++ b/src/v/kafka/protocol/errors.cc @@ -201,6 +201,8 @@ std::string_view error_code_to_str(error_code error) { return "unknown_topic_id"; case error_code::transactional_id_not_found: return "transactional_id_not_found"; + case error_code::rebootstrap_required: + return "rebootstrap_required"; default: return "unknown_error_code"; } @@ -319,6 +321,7 @@ bool is_retriable(error_code error) { case error_code::duplicate_resource: case error_code::unacceptable_credential: case error_code::transactional_id_not_found: + case error_code::rebootstrap_required: break; } return false; diff --git a/src/v/kafka/protocol/errors.h b/src/v/kafka/protocol/errors.h index 36210c4f33068..a87b1e57ddf3e 100644 --- a/src/v/kafka/protocol/errors.h +++ b/src/v/kafka/protocol/errors.h @@ -240,6 +240,9 @@ enum class error_code : int16_t { unknown_topic_id = 100, // The transactional_id could not be found for describe tx request. transactional_id_not_found = 105, + // Client metadata is stale; the client should rebootstrap to obtain new + // metadata. Introduced by KIP-1102. 
+ rebootstrap_required = 129, }; std::string_view error_code_to_str(error_code error); diff --git a/src/v/kafka/protocol/metadata.h b/src/v/kafka/protocol/metadata.h index 48daccf56bed9..09780966702ee 100644 --- a/src/v/kafka/protocol/metadata.h +++ b/src/v/kafka/protocol/metadata.h @@ -51,7 +51,7 @@ struct metadata_request { metadata_request copy() const { static_assert( - api_type::max_valid == api_version(12), + api_type::max_valid == api_version(13), "Please update the metadata_request::copy method when updating the " "Metadata API"); return { diff --git a/src/v/kafka/protocol/schemata/metadata_request.json b/src/v/kafka/protocol/schemata/metadata_request.json index 6709d3e86990f..aa8cb24a344d4 100644 --- a/src/v/kafka/protocol/schemata/metadata_request.json +++ b/src/v/kafka/protocol/schemata/metadata_request.json @@ -18,7 +18,7 @@ "type": "request", "listeners": ["zkBroker", "broker"], "name": "MetadataRequest", - "validVersions": "0-12", + "validVersions": "0-13", "flexibleVersions": "9+", "fields": [ // In version 0, an empty array indicates "request metadata for all topics." In version 1 and @@ -39,6 +39,8 @@ // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed // by the DescribeCluster API (KIP-700). // Version 12 supports topic Id. + // + // Version 13 has no request-side changes (top-level ErrorCode added to the response only, KIP-1102). { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+", "about": "The topics to fetch metadata for.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." 
}, diff --git a/src/v/kafka/protocol/schemata/metadata_response.json b/src/v/kafka/protocol/schemata/metadata_response.json index 3d2adffeaddbb..8a8e85a230df5 100644 --- a/src/v/kafka/protocol/schemata/metadata_response.json +++ b/src/v/kafka/protocol/schemata/metadata_response.json @@ -42,7 +42,9 @@ // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed // by the DescribeCluster API (KIP-700). // Version 12 supports topicId. - "validVersions": "0-12", + // + // Version 13 supports top-level error code in the response (KIP-1102). + "validVersions": "0-13", "flexibleVersions": "9+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, @@ -93,6 +95,8 @@ "about": "32-bit bitfield to represent authorized operations for this topic." } ]}, { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648", - "about": "32-bit bitfield to represent authorized operations for this cluster." } + "about": "32-bit bitfield to represent authorized operations for this cluster." }, + { "name": "ErrorCode", "type": "int16", "versions": "13+", "ignorable": true, + "about": "The top-level error code, or 0 if there was no error." 
} ] } \ No newline at end of file diff --git a/src/v/kafka/server/handlers/metadata.h b/src/v/kafka/server/handlers/metadata.h index f5359771bc9a1..e79e4d38a1f86 100644 --- a/src/v/kafka/server/handlers/metadata.h +++ b/src/v/kafka/server/handlers/metadata.h @@ -27,7 +27,7 @@ memory_estimate_fn metadata_memory_estimator; using metadata_handler = single_stage_handler< metadata_api, 0, - 12, + 13, metadata_memory_estimator, default_scheduling_group_provider, latency_hist::yes>; diff --git a/src/v/pandaproxy/error.cc b/src/v/pandaproxy/error.cc index bfcc5e24ea757..9b9f66569a456 100644 --- a/src/v/pandaproxy/error.cc +++ b/src/v/pandaproxy/error.cc @@ -277,6 +277,7 @@ std::error_condition make_error_condition(std::error_code ec) { case kec::group_subscribed_to_topic: case kec::unstable_offset_commit: case kec::no_reassignment_in_progress: + case kec::rebootstrap_required: return rec::kafka_error; case kec::network_exception: case kec::coordinator_load_in_progress: diff --git a/tests/docker/ducktape-deps/kcl b/tests/docker/ducktape-deps/kcl index e7d83b3f265fd..195ec5eb8c1ec 100644 --- a/tests/docker/ducktape-deps/kcl +++ b/tests/docker/ducktape-deps/kcl @@ -1,4 +1,4 @@ #!/usr/bin/env bash set -e -go install github.com/twmb/kcl@v0.16.0 +go install github.com/twmb/kcl@v0.18.0 mv /root/go/bin/kcl /usr/local/bin/ diff --git a/tests/rptest/clients/kcl.py b/tests/rptest/clients/kcl.py index 0115ab52ffadc..2808ba577e23f 100644 --- a/tests/rptest/clients/kcl.py +++ b/tests/rptest/clients/kcl.py @@ -20,6 +20,17 @@ from ducktape.utils.util import wait_until +# Kafka error codes → names, used to translate kcl v0.18's numeric `error` field +# back into the names tests grep for in RuntimeError messages (e.g. "INVALID_CONFIG"). +# kcl v0.16 printed names directly; v0.18 only prints codes. Unknown codes fall back +# to "ERROR_" so the numeric value still surfaces. 
+_KAFKA_ERROR_NAMES = { + 0: "NONE", + 40: "INVALID_CONFIG", + 89: "THROTTLING_QUOTA_EXCEEDED", +} + + class KclPartitionOffset(NamedTuple): broker: str topic: str @@ -135,8 +146,10 @@ def list_offsets(self, topics: str | list[str]) -> list[KclPartitionOffset]: lines = self._cmd(cmd).splitlines() ret: list[KclPartitionOffset] = [] for l in lines: + # kcl v0.18 list-offsets output: BROKER TOPIC PARTITION START STABLE END ERROR. + # The STABLE column was added; we don't surface it in KclPartitionOffset. m = re.match( - r" *(?P<broker>\d+) +(?P<topic>.+?) +(?P<partition>\d+) +(?P<start_offset>-?\d*?) +(?P<end_offset>-?\d*?) +(?P<error>.*) *", + r" *(?P<broker>\d+) +(?P<topic>.+?) +(?P<partition>\d+) +(?P<start_offset>-?\d*?) +(?P<stable_offset>-?\d*?) +(?P<end_offset>-?\d*?) +(?P<error>.*) *", l, ) if m: @@ -190,7 +203,7 @@ def _alter_config( :param entity_type: one of 'broker', 'topic' :param entity: string-izable entity, or None to omit """ - cmd = ["admin", "configs", "alter"] + cmd = ["--format=json", "admin", "configs", "alter"] if entity_type == "broker": cmd.append("-tb") @@ -201,21 +214,38 @@ def _alter_config( if incremental: cmd.append("-i") + # `-s key=value` is the new clean flag and implicitly sends + # IncrementalAlterConfigs (kafka API key 44). + for k, v in values.items(): + cmd.extend(["-s", f"{k}={v}"]) else: # By default, non-incremental AlterConfig will prompt on stdin (and hang) - cmd.append("--no-confirm") - for k, v in values.items(): - cmd.extend(["-k", f"s:{k}={v}" if incremental else f"{k}={v}"]) + cmd.append("--yes") + # `-k key=value` is the deprecated --kv syntax preserved in v0.18. + # It still sends the legacy AlterConfigs (kafka API key 33) on the + # wire -- which is the path tests like ClusterConfigTest.test_alter_configs + # depend on to verify redpanda's rejection. v0.18's new --set/--delete + # flags auto-enable incremental mode and so cannot reach the legacy API. 
+ for k, v in values.items(): + cmd.extend(["-k", f"{k}={v}"]) if entity: # cmd needs to be string, so handle things like broker=1 cmd.append(str(entity)) - r = self._cmd(cmd, attempts=1, node=node) - if "OK" not in r: - raise RuntimeError(r) - else: - return r + # kcl v0.18's text output no longer carries the kafka error name (it + # prints the numeric code and, for broker alters, an empty message on + # success). Parse the JSON response so we can synthesize an "OK" string + # on success and a RuntimeError whose message contains the kafka error + # name on failure -- the contract callers have relied on since v0.16. + raw = self._cmd(cmd, attempts=1, node=node) + result = json.loads(raw)["results"][0] + error_code = result["error"] + error_message = result["error_message"] + if error_code == 0: + return f"OK: {error_message}" if error_message else "OK" + error_name = _KAFKA_ERROR_NAMES.get(error_code, f"ERROR_{error_code}") + raise RuntimeError(f"{error_name}: {error_message}") def alter_broker_config( self, values: dict[str, Any], incremental: bool, broker: Any | None = None @@ -241,7 +271,7 @@ def delete_broker_config(self, keys: list[str], incremental: bool) -> str: if incremental: cmd.append("-i") for k in keys: - cmd.extend(["-k", f"d:{k}" if incremental else k]) + cmd.extend(["--delete", k]) return self._cmd(cmd, attempts=1) @@ -264,7 +294,22 @@ def describe_topic( if with_types: cmd.append("--with-types") - return self._cmd(cmd, attempts=1, node=node) + # kcl v0.18 puts the property table on stdout and (when --with-docs is + # set) the doc-strings section on stderr. Ask _cmd to merge stderr + # into stdout in that case so callers see the same single-blob output + # v0.16 produced. 
+ stderr = subprocess.STDOUT if with_docs else subprocess.PIPE + output = self._cmd(cmd, attempts=1, node=node, stderr=stderr) + # kcl v0.18 prefixes the response with a "KEY [TYPE] VALUE SOURCE" + # header row; drop the first line so callers parsing the table see + # only data rows. Slice the original string to preserve trailing + # newlines that callers rely on when re-splitting (a `\n`.join here + # would collapse a trailing empty line). + if output.lstrip().startswith("KEY"): + newline_idx = output.find("\n") + if newline_idx != -1: + output = output[newline_idx + 1 :] + return output def offset_delete( self, group: str, topic_partitions: dict[str, list[int]] @@ -290,8 +335,15 @@ def offset_delete( ) ) - cmd = ["group", "offset-delete", "-j", group] + request_args_w_flags - return json.loads(self._cmd(cmd, attempts=5)) + # kcl v0.18 exits non-zero on per-item failures but still emits valid + # JSON on stdout; catch the CalledProcessError and parse e.output so + # callers can inspect per-item errors. + cmd = ["--format=json", "group", "offset-delete", group] + request_args_w_flags + try: + raw = self._cmd(cmd, attempts=1) + except subprocess.CalledProcessError as e: + raw = e.output + return json.loads(raw) def sasl_options(self) -> list[str]: if self.sasl_enabled(): @@ -351,6 +403,10 @@ def has_partition_err(line: str, err: str) -> bool: def do_alter_partitions() -> bool: nonlocal lines lines = self._cmd(cmd).splitlines() + # kcl v0.18 prefixes the response with a "TOPIC PARTITION STATUS + # DETAIL" header row; drop it so callers see only data rows. 
+ if lines and lines[0].lstrip().startswith("TOPIC"): + lines = lines[1:] # Check for errors here instead of outside the KCL wrapper # because test writers can use method params to account for their expectations @@ -449,15 +505,43 @@ def _cmd( input: str | None = None, attempts: int = 5, node: Any | None = None, + as_version: str | None = None, + stderr: int = subprocess.PIPE, ) -> str: """ :param attempts: how many times to try before giving up (1 for no retries) - :return: stdout string + :param as_version: if set, pass ``--as-version `` as a global kcl + flag (e.g. ``"2.7.0"``). This caps the franz-go + client's per-API max-versions to that Kafka + release's table. Used by ``RawKCL`` to target + specific request versions without invoking kcl's + raw-req version-pin code path, which anchors + MinVersions to ``kversion.Stable()`` and would + mandate Metadata v13 from the broker. + :param stderr: passed through to ``subprocess.check_output``. The + default (``subprocess.PIPE``) captures stderr + separately so ``CalledProcessError.stderr`` + carries kcl's error text (v0.18 puts server-side + messages like "CLUSTER_AUTHORIZATION_FAILED" on + stderr; tests reading ``e.stderr`` pick those + up). Pass ``subprocess.STDOUT`` for the rare + command whose successful output is split across + streams (``admin configs describe --with-docs`` + puts the property table on stdout and the doc + strings on stderr) -- the merged blob is then + returned as the function's result. + :return: stdout (or stdout+stderr merged, depending on ``stderr``). + One v0.18 command (``group offset-delete``) exits non-zero + on per-item failures while still emitting a valid JSON body + on stdout; that call site wraps this helper in + ``try/except CalledProcessError`` and reads the response + from ``e.output``. 
""" brokers = node.name if node is not None else self._redpanda.brokers() cmd = ( ["kcl", "-X", f"seed_brokers={brokers}", "--no-config-file"] + + (["--as-version", as_version] if as_version is not None else []) + self.sasl_options() + cmd ) @@ -466,7 +550,10 @@ def _cmd( for retry in reversed(range(attempts)): try: res = subprocess.check_output( - cmd, text=True, input=input, stderr=subprocess.STDOUT + cmd, + text=True, + input=input, + stderr=stderr, ) self._redpanda.logger.debug(res) return res @@ -521,6 +608,60 @@ def create_topics( except Exception: return [] + @staticmethod + def _unwrap_raw_response(raw_json: str) -> Any: + """kcl v0.18 wraps `misc raw-req` output in + ``{"_command": ..., "_version": ..., "response": ...}``; return just + the response payload so callers don't need to know about the envelope. + """ + parsed: dict[str, Any] = json.loads(raw_json) + if "_command" in parsed and "response" in parsed: + return parsed["response"] + return parsed + + # Lookup table: (api_key, request_version) -> the smallest Kafka release tag + # whose per-API max version table has `max[api_key] == request_version`. Used + # by raw_* methods to target a specific request version via kcl's + # `--as-version` global flag (which calls `kgo.MaxVersions` with the release + # tag's table). Capping a single client invocation at exactly the requested + # version is functionally equivalent to pinning, because the broker supports + # at-or-above that version in every test scenario. + # + # `--as-version` is used instead of `-v` / JSON `"Version"` because those + # routes through kcl's raw-req pin code path (misc.go:303-316), which anchors + # MinVersions to `kversion.Stable()` and mandates Metadata v13 from the + # broker -- breaking requests against pre-v13 redpandas in upgrade tests. 
+ _AS_VERSION_BY_API: dict[tuple[int, int], str] = { + # CreateTopics (19) + (19, 5): "2.4.0", + (19, 6): "2.7.0", + (19, 7): "2.8.0", + # DeleteTopics (20) + (20, 4): "2.4.0", + (20, 5): "2.7.0", + # CreatePartitions (37) + (37, 2): "2.5.0", + (37, 3): "2.7.0", + # AlterConfigs (33) + (33, 0): "0.11.0", + (33, 1): "2.0.0", + # FindCoordinator (10) + (10, 3): "2.4.0", + # JoinGroup (11) + (11, 5): "2.3.0", + } + + @classmethod + def _as_version_for(cls, api_key: int, request_version: int) -> str: + try: + return cls._AS_VERSION_BY_API[(api_key, request_version)] + except KeyError: + raise ValueError( + f"no --as-version mapping for api_key={api_key} " + f"version={request_version}; add an entry to " + f"RawKCL._AS_VERSION_BY_API" + ) + def raw_create_topics( self, version: int, @@ -531,7 +672,6 @@ def raw_create_topics( "version out of supported redpanda range for this API" ) create_topics_request = { - "Version": version, "ValidateOnly": validate_only, "TimeoutMillis": 60000, "Topics": [ @@ -543,24 +683,27 @@ def raw_create_topics( for t in topics ], } - return self._cmd( + res = self._cmd( ["misc", "raw-req", "-b", str(self._controller_id()), "-k", "19"], input=json.dumps(create_topics_request), + as_version=self._as_version_for(19, version), ) + return json.dumps(self._unwrap_raw_response(res)) def raw_delete_topics(self, version: int, topics: list[str]) -> str: assert version >= 0 and version <= 5, ( "version out of supported redpanda range for this API" ) delete_topics_request = { - "Version": version, "TimeoutMillis": 15000, "TopicNames": topics, } - return self._cmd( + res = self._cmd( ["misc", "raw-req", "-b", str(self._controller_id()), "-k", "20"], input=json.dumps(delete_topics_request), + as_version=self._as_version_for(20, version), ) + return json.dumps(self._unwrap_raw_response(res)) def raw_create_partitions( self, version: int, topics: list[KclCreatePartitionsRequestTopic] @@ -569,15 +712,16 @@ def raw_create_partitions( "version out of supported 
redpanda range for this API" ) create_partitions_request: dict[str, Any] = { - "Version": version, "ValidateOnly": False, "TimeoutMillis": 15000, "Topics": [{"Topic": t.name, "Count": t.num_partitions} for t in topics], } - return self._cmd( + res = self._cmd( ["misc", "raw-req", "-b", str(self._controller_id()), "-k", "37"], input=json.dumps(create_partitions_request), + as_version=self._as_version_for(37, version), ) + return json.dumps(self._unwrap_raw_response(res)) def raw_alter_topic_config( self, version: int, topic: str, configs: dict[str, Any] @@ -586,7 +730,6 @@ def raw_alter_topic_config( "version out of supported redpanda range for this API" ) alter_configs_request: dict[str, Any] = { - "Version": version, "TimeoutMillis": 15000, "Resources": [ {"ResourceType": "TOPIC", "ResourceName": topic, "Configs": []} @@ -599,10 +742,12 @@ def raw_alter_topic_config( ] self._redpanda.logger.info(f"DBG: {json.dumps(alter_configs_request)}") - return self._cmd( + res = self._cmd( ["misc", "raw-req", "-b", str(self._controller_id()), "-k", "33"], input=json.dumps(alter_configs_request), + as_version=self._as_version_for(33, version), ) + return json.dumps(self._unwrap_raw_response(res)) def raw_alter_quotas( self, body: dict[str, Any], node: Any | None = None @@ -612,26 +757,39 @@ def raw_alter_quotas( input=json.dumps(body), node=node, ) - return json.loads(res) + return self._unwrap_raw_response(res) def raw_describe_quotas(self, body: dict[str, Any]) -> dict[str, Any]: res = self._cmd( ["misc", "raw-req", "-b", str(self._controller_id()), "-k", "48"], input=json.dumps(body), ) - return json.loads(res) + return self._unwrap_raw_response(res) - def raw_find_coordinator(self, body: dict[str, Any]) -> dict[str, Any]: - res = self._cmd(["misc", "raw-req", "-k", "10"], input=json.dumps(body)) - return json.loads(res) + def raw_find_coordinator( + self, body: dict[str, Any], version: int | None = None + ) -> dict[str, Any]: + res = self._cmd( + ["misc", "raw-req", "-k", 
"10"], + input=json.dumps(body), + as_version=self._as_version_for(10, version) + if version is not None + else None, + ) + return self._unwrap_raw_response(res) - def raw_join_group(self, body: dict[str, Any]) -> dict[str, Any]: + def raw_join_group( + self, body: dict[str, Any], version: int | None = None + ) -> dict[str, Any]: res = self.raw_find_coordinator( - {"Version": 3, "CoordinatorKey": body["Group"], "CoordinatorType": 0} + {"CoordinatorKey": body["Group"], "CoordinatorType": 0}, + version=3, ) - res = self._cmd( ["misc", "raw-req", "-b", str(res["NodeID"]), "-k", "11"], input=json.dumps(body), + as_version=self._as_version_for(11, version) + if version is not None + else None, ) - return json.loads(res) + return self._unwrap_raw_response(res) diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py index bfcc9e6724eae..a5560b812dc09 100644 --- a/tests/rptest/tests/consumer_group_test.py +++ b/tests/rptest/tests/consumer_group_test.py @@ -764,13 +764,13 @@ def test_last_member_expiry_with_pending_member(self): # pending members** resp = self.kcl.raw_join_group( { - "Version": 5, "Group": group, "SessionTimeoutMillis": 60000, "RebalanceTimeoutMillis": 60000, "ProtocolType": "consumer", "Protocols": [{"Name": "range"}], - } + }, + version=5, ) self.redpanda.logger.debug(f"JoinGroupResponse: {resp}") member_id_required = 79 @@ -970,10 +970,12 @@ def create_consumer(instance_id: int) -> Consumer: self.logger.info("Waiting for group to become stable") wait_until( - lambda: self.admin_client.describe_consumer_groups(group_ids=[group])[group] - .result() - .state - == ConsumerGroupState.STABLE, + lambda: ( + self.admin_client.describe_consumer_groups(group_ids=[group])[group] + .result() + .state + == ConsumerGroupState.STABLE + ), 20, 1, retry_on_exc=True, @@ -1171,10 +1173,12 @@ def get_coordinator(): moved = move_partition(topic="__consumer_offsets", partition=0) assert moved, "Failed to move coordinator" wait_until( 
- lambda: self.admin_client.describe_consumer_groups(group_ids=[group])[group] - .result() - .state - == ConsumerGroupState.STABLE, + lambda: ( + self.admin_client.describe_consumer_groups(group_ids=[group])[group] + .result() + .state + == ConsumerGroupState.STABLE + ), 20, 1, retry_on_exc=True, diff --git a/tests/rptest/tests/partition_reassignments_test.py b/tests/rptest/tests/partition_reassignments_test.py index 3bbd9be70f6f4..766cae831ea1b 100644 --- a/tests/rptest/tests/partition_reassignments_test.py +++ b/tests/rptest/tests/partition_reassignments_test.py @@ -501,7 +501,9 @@ def test_disable_alter_reassignment_api(self): kcl.alter_partition_reassignments({}) assert "alter partition reassignments should have failed" except subprocess.CalledProcessError as e: - assert "AlterPartitionReassignment API is disabled. See" in e.output + # kcl v0.18 writes server-side error text to stderr (v0.16 merged + # stderr into stdout, so the same message used to land in e.output). + assert "AlterPartitionReassignment API is disabled. See" in e.stderr @cluster(num_nodes=6) def test_add_partitions_with_inprogress_reassignments(self): @@ -671,7 +673,9 @@ def user_exists(): f"AlterPartition with user {username} passed. Expected fail." ) except subprocess.CalledProcessError as e: - if e.output.startswith("CLUSTER_AUTHORIZATION_FAILED"): + # kcl v0.18 writes server-side error text to stderr (v0.16 merged + # stderr into stdout, so the same message used to land in e.output). + if e.stderr.startswith("CLUSTER_AUTHORIZATION_FAILED"): pass else: raise @@ -683,7 +687,7 @@ def user_exists(): f"ListPartition with user {username} passed. Expected fail." ) except subprocess.CalledProcessError as e: - if e.output.startswith("CLUSTER_AUTHORIZATION_FAILED"): + if e.stderr.startswith("CLUSTER_AUTHORIZATION_FAILED"): pass else: raise