Conversation
This PR - Eliminates race condition: Application fully initializes before store queries - More deterministic tests: Waits for RUNNING state instead of hoping timing works out - Especially helps in-memory stores: They need time to rebuild state from scratch - Follows Kafka Streams test best practices: Aligns with patterns used elsewhere in the codebase Reviewers: TengYao Chi <frankvicky@apache.org>
…pache#21720) Implement target assignment batching for consumer groups. Delay computing the next assignment for consumer groups until their assignment interval has elapsed since their last assignment computation finished. We run request tests with assignment batching disabled, except for heartbeat tests, which we run with and without assignment batching. We run a selection of integration and system tests with and without assignment batching. Reviewers: Christo Lolov <lolovc@amazon.com>, David Jacot <djacot@confluent.io>
…pache#21726) see apache#21573 (comment) Replace `Utils.mkMap()` calls with `Map.of()` equivalents throughout the clients module. Use `LinkedHashMap` where iteration order matters (toString tests, partition assignor logic). Reviewers: Christo Lolov <lolovc@amazon.com>, Mickael Maison <mimaison@users.noreply.github.com>, Nilesh Kumar <nileshkumar3@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
We had several issues with offset management that were causing the system tests to fail: 1. An NPE in `RocksDBStore`, caused by a `null` offset being passed to `commit`. 2. Inconsistent behaviour between `RocksDBStore` and `LegacyCheckpointingStateStore` for both `null` offsets and an empty `Map` in `commit`. - This was largely due to these cases not being properly defined in the `commit` method contract; which has now been addressed. 3. No way to wipe offsets from `RocksDBStore` when the store was corrupted. We now interpret an empty `Map` in `commit` as an instruction to wipe all committed offsets. - `ProcessorStateManager` now commits empty offsets to `corrupted` stores to force them to wipe their offsets. 3. `GlobalStateManagerImpl` didn't wipe state under EOS when a store was detected as corrupted. This has now been added, consistent with the behaviour of `ProcessorStateManager` 4. Many `StateStore` implementations that delegate to an internal `RocksDBStore` did not implement either `managesOffsets` or `committedOffset`, despite implementing `commit`. This caused these stores to be incorrectly wrapped in a `LegacyCheckpointingStateStore`, which conflicted with the offsets being tracked in RocksDB. 5. Now that `.checkpoint` files are not used for `RocksDBStore`, some tests no longer make sense. 6. In `GlobalKTableEOSIntegrationTest`, `shouldSkipOverTxMarkersOnRestore` and `shouldSkipOverAbortedMessagesOnRestore` had to be removed, as they depended on the ability to externally preload checkpoint offsets, which is no longer possible now they're stored in RocksDB. 7. `LegacyCheckpointingStateStore` no longer uses the `OFFSET_UNKNOWN` sentinel value, except during migration of old `.checkpoint` files. Reviewers: Bill Bejeck <bbejeck@apache.org>
…e#21453) This change adds a new handleLoadBootstrap callback to RaftClient.Listener objects so that applications can handle bootstrapping snapshots. The snapshot sent to the handleLoadBootstrap callback contains state that has not been committed by KRaft. The snapshot sent to handleLoadSnapshot contains state that has been committed by KRaft. In a future change, QuorumController will use this feature to implement metadata bootstrapping using a KRaft bootstrap snapshot instead of the bootstrap.checkpoint. Reviewers: José Armando García Sancio <jsancio@apache.org>, Kevin Wu <kevin.wu2412@gmail.com>
…pache#21784) This patch removes dead code `isTimeStamped`. Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
…t Race Condition (apache#21809) This patch slightly revises the logic to resolve topicId in an offset commit request. It avoids setting topicId twice if `topic.topicId` already exists, avoiding race condition in between. This is a follow-up of apache#21692 Reviewers: David Jacot <djacot@confluent.io>
…ache#21721) Implement target assignment batching for share groups. Delay computing the next assignment for share groups until their assignment interval has elapsed since their last assignment computation finished. Also fix a bug where we would bump the group epoch on every heartbeat when the target assignment is delayed and there are unassigned initialized share partitions. As a side effect, we bump the epoch fewer times in tests when assignment batching is enabled. After the epoch is bumped by a metadata update, it is not bumped again by share partition initialization if the next assignment has not been computed yet. We run request tests with assignment batching disabled, except for heartbeat tests, which we run with and without assignment batching. We run a selection of integration and system tests with and without assignment batching. Reviewers: Sushant Mahajan <smahajan@confluent.io>, David Jacot <djacot@confluent.io>
Minor improvements in test and events: - fix test to create the actual event used in real life flow - unmodifiable collections in new events, aligned with existing ones Reviewers: Andrew Schofield <aschofield@confluent.io>, Nilesh Kumar <nileshkumar3@gmail.com>
…che#21810) Remove unused `createNetworkClient` overload in `ClientUtils`. Reviewers: TengYao Chi <frankvicky@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
…pache#21722) Implement target assignment batching for streams groups. Delay computing the next assignment for streams groups until their assignment interval has elapsed since their last assignment computation finished. We run request tests with assignment batching disabled, except for heartbeat tests, which we run with and without assignment batching. We run a selection of integration and system tests with and without assignment batching. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, David Jacot <djacot@confluent.io>
Add missing enable_assignment_batching parameter to test_consumer_failure. Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Add some test to dumpLogSegmentsTest to enhance test coverage. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…1780) This PR enables time-ordered window store with headers. Part of KIP-1285. Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Flaky on trunk, log showed poll timer expired, so increasing it to 1000 (aligned with other tests in the file) to ensure that it's still low enough so that when a consumer poll is stopped it eventually hits the poll timer and leaves, but not too low that the other stable consumers may get an expired poll-timer across rebalances. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…pache#21814) Kafka Streams 4.3 moved offset management from per-task `.checkpoint` files into RocksDB column families (KAFKA-17411/KAFKA-19712). An upgrade path exists (`migrateLegacyOffsets()`) to migrate offsets from old `.checkpoint` files into the new system, but there is no downgrade path — if a user rolls back to a pre-4.3 version, the old `.checkpoint` files no longer exist. This change adds downgrade support: when `upgrade.from` is set to a version older than 4.3, a consolidated per-task `.checkpoint` file is written during close so that an older Kafka Streams version can find its offsets. The conversion uses `OFFSET_UNKNOWN` for null offsets, matching the legacy checkpoint format. The downgrade checkpoint is written from `ProcessorStateManager.close()` for regular tasks and `GlobalStateManagerImpl.close()` for global stores. Reviewers: Bill Bejeck <bbejeck@apache.org>
…in storage and metadata (apache#21659) This PR is another one with changing old JDK 8 API to JDK 11. Question: I wonder if we should add also parts in test code or simply divide it? I didn't cover tests in my previous PR so maybe to stay consistent just make changes to production code and then we can simply add into tests multiple modules? Reviewers: Christo Lolov <lolovc@amazon.com>, Ken Huang <s7133700@gmail.com>, Nilesh Kumar <nileshkumar3@gmail.com> --------- Signed-off-by: see-quick <maros.orsak159@gmail.com>
…oad (apache#21627) Add group-level {consumer,share,streams}.assignment.interval.ms config options to control the delay between assignment calculation. These config options override the dynamic broker-level configs. Add group-level {consumer,share,streams}.assignor.offload.enable config options to control whether assignment calculation is offloaded to a group coordinator background thread. These config options override the dynamic broker-level configs. Since the broker-level configs for these group-level configs are dynamic, we have to use a different approach compared to the existing group-level configs. In the interests of getting these new configs into Apache Kafka 4.3, we defer refactoring GroupConfig until a later patch. Reviewers: majialong <majialoong@gmail.com>, Christo Lolov <lolovc@amazon.com>, David Jacot <djacot@confluent.io>
As described in https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=34840886#ReleaseProcess-CutBranches Reviewers: Christo Lolov <christololov@gmail.com>
testFetchEarliestPendingUploadTimestampNoRemoteStorage ~ testTransactionIndexUpdated part. Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Refactor `warnIfConfigDefinedInWrongRole` to ensure consistent warning messages Reviewers: PoAn Yang <payang@apache.org>, khilesh Chaganti <akhileshchg@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
…e#21794) This PR fixes a bug where `RocksDbTimeOrderedSessionBytesStoreSupplier.get` with `withHeaders=true` created segments using `KeyValueSegment` (default-CF only) instead of `SessionSegmentWithHeaders` (dual-CF with lazy migration), which would cause the upgrade path from non-headers to headers format to fail. The fix introduces `RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders`, widens `RocksDBTimeOrderedSessionStore` to accept either segment type, and updates the supplier to create the correct bytes store. Upgrade tests validate lazy migration at both the store layer and DSL supplier path, plus a `TopologyTestDriver` end-to-end test exercises ON_WINDOW_CLOSE with both PLAIN and HEADERS formats. Reviewers: Matthias J. Sax <matthias@confluent.io>, TengYao Chi <frankvicky@apache.org> Co-authored-by: Matthias J. Sax <matthias@confluent.io>
…Metrics (apache#21038) The intention of the `Consumer` API is that after `close()`, `metrics()` should return an empty map of metrics. The lifecycle management of the various metrics managers in the consumer is inconsistent. Some managers remove all the metrics that were created, some registries remove some of them, and some don't make any effort to remove the metrics at all. This change introduces `AbstractConsumerMetricsManager` as a shared base class for consumer metrics managers, consolidating the steps of metric registration and cleanup logic, as well as unit tests to ensure that the different metrics managers perform the cleanup step. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi <frankvicky@apache.org>
…e#21825) `shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled` is incorrect – we removed a check to make it pass for now, but need to add back the correct assertion after the root cause was fixed. ref: https://github.com/apache/kafka/pull/21580/changes#r2921529513 Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
… repartition topics (apache#21817) This PR fixes `StreamsGroupCommand` to include repartition source topics when retrieving committed offsets for streams groups. Offset resets and deletions are still limited exclusively to source topics. Testing - unit tests: - testGetCommittedOffsetsIncludesRepartitionTopics: Verifies that repartition topics are included while changelog and output topics are excluded - testGetCommittedOffsetsWithMultipleSubtopologies: Verifies correct behavior across multiple subtopologies Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Updates the docker_scan.yml to contain 4.0.2 instead of 4.0.1 for CVE scans. Reviewers: Andrew Schofield <aschofield@confluent.io>
…treams (apache#21781) Mark partitions lost to avoid returning buffered records in the case of a race after the callback completes (app thread moves onto polling, while background moves onto clearing the lost assignment) Same fix applied for Consumer and Stream managers (Share has no callbacks so this does not apply) Reviewers: Nilesh Kumar [nileshkumar3@gmail.com](mailto:nileshkumar3@gmail.com), Lan Ding <isDing_L@163.com>, Ken Huang <s7133700@gmail.com>, PoAn Yang <payang@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
Resolves a couple of review comments from apache#21711. The remaining comments are in `ShareConsumerTest` and will be addressed separately. Reviewers: Lianet Magrans <lmagrans@confluent.io>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
…pache#21683) Reviewers: TengYao Chi <frankvicky@confluent.io>
…zers (apache#21706) This patch implements two optimizations, and their JMH benchmarks, and opportunistic refactoring. 1. Skipping headers Previously the raw value extraction in headers-aware deserializers undergoes deserialization and/or copying of headers, while only skipping is required. This happens in one case for both empty and nonempty headers. 2. Extracting value / timestamp after empty headers Empty headers have constant metadata footprint: the headers size is varint-encoded 1 byte of 0, and headers themselves consume no bytes. Based on this invariant, the ByteBuffer-based extraction can be replaced with a direct `System.arraycopy`, which is a Java native method optimized for specific platforms. The optimized headers-aware extraction methods: - rawAggregation - rawTimestampedValue - rawValue / rawPlainValue - ValueTimestampHeadersDeserializer.timestamp - ValueTimestampHeadersDeserializer.headers - AggregationWithHeadersDeserializer.headers **Benchmark:** This patch also includes JMH benchmarks to test the speedup. On my local machine, Optimization 1 speedup is 2-6x speedup. Optimization 2 is 1.2-1.3x. Below is the throughput comparison of a recorded JMH run (higher score is better): ``` Benchmark Mode Cnt Score Error Units RawBytesExtractionBenchmark.testHeadersWithoutHeaders thrpt 15 10158.764 ± 85.564 ops/s RawBytesExtractionBenchmark.testHeadersWithoutHeadersOpt thrpt 15 14824.176 ± 1244.455 ops/s RawBytesExtractionBenchmark.testRawAggregationWithHeaders thrpt 15 1473.459 ± 7.170 ops/s RawBytesExtractionBenchmark.testRawAggregationWithHeadersOpt thrpt 15 11618.187 ± 235.385 ops/s RawBytesExtractionBenchmark.testRawAggregationWithoutHeaders thrpt 15 8337.728 ± 199.919 ops/s RawBytesExtractionBenchmark.testRawAggregationWithoutHeadersOpt thrpt 15 14564.899 ± 186.405 ops/s RawBytesExtractionBenchmark.testRawTimestampedValueWithoutHeaders thrpt 15 10217.292 ± 108.552 ops/s RawBytesExtractionBenchmark.testRawTimestampedValueWithoutHeadersOpt thrpt 15 12121.074 ± 201.235 ops/s RawBytesExtractionBenchmark.testRawValueWithoutHeaders thrpt 15 11632.484 ± 138.505 ops/s RawBytesExtractionBenchmark.testRawValueWithoutHeadersOpt thrpt 15 14669.563 ± 43.458 ops/s RawBytesExtractionBenchmark.testTimestampWithoutHeaders thrpt 15 14858.778 ± 39.301 ops/s RawBytesExtractionBenchmark.testTimestampWithoutHeadersOpt thrpt 15 19832.718 ± 916.980 ops/s JMH benchmarks done ``` **Test:** - `AggregationWithHeadersDeserializer.rawAggregate` - empty headers: `SessionToHeadersStoreAdapterTest.shouldStripHeadersFromRawAggregationValue` - `Utils.rawPlainValue` - empty headers: `UtilsTest.shouldExtractRawValueWithEmptyHeaders` - empty headers, no timestamp: `UtilsTest.testRawPlainValueWithEmptyHeadersAndInvalidTimestamp` - `Utils.rawTimestampedValue` - empty headers: `UtilsTest.testRawTimestampedValueWithEmptyHeaders` - empty headers, no timestamp: `UtilsTest.testRawTimestampedValueWithEmptyHeadersAndInvalidTimestamp` - `Utils.hasEmptyHeadersAndTimestamp` - min size violation - empty headers - non-empty headers **Refactor** - point all calls to raw value (with timestamp and headers) extraction to common one in Utils. Reviewers: Alieh Saeedi <asaeedi@confluent.io>, TengYao Chi <frankvicky@apache.org>, Matthias J. Sax <matthias@confluent.io>
apache#22126) As explianed in KAFKA-20505, there can be a deadlock when future is completed for the request where next set of actions tries to attain lock on purgatory (checkAndComplete/trigger waiting requests). As the lock might not always be released hence a deadlock can happen. The PR moves such futures out of the lock. I have also reviewed other future completions and doesn't seems we need other changes. I have tested using franz-go Kafka test and can't reproduce the issues in 160 continuous runs. Earlier the issue was reproducible between 20-50 consecutive runs. ``` === Run 160 === === RUN TestShareGroupETL === PAUSE TestShareGroupETL === CONT TestShareGroupETL [09:59:38.788 1][INFO] producing to a new topic for the first time, fetching metadata to learn its partitions; topic: f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e ... ... [09:59:43.94 3][INFO] immediate metadata update triggered; why: forced load because we are producing to a topic for the first time [09:59:43.947 3][INFO] done waiting for metadata for new topic; topic: 6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9 share_test.go:507: level 1 phase 2: adding consumers after 122923 consumed [09:59:44.225 3][INFO] flushing [09:59:44.225 4][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization ... ... [09:59:44.226 5][INFO] beginning to manage the share group lifecycle; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35 [09:59:44.226 10][INFO] beginning to manage the share group lifecycle; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35 [09:59:44.227 3][INFO] leaving share group; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, member_id: LP4lpqzQjAm-QxQdCRkSXA== [09:59:44.227 7][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[] [09:59:44.227 4][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[] ... ... [09:59:49.232 7][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e:[1 2]] [09:59:49.232 6][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e:[1]] ... ... [09:59:49.466 18][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization [09:59:49.466 13][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization [09:59:49.466 15][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization [09:59:49.467 16][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6 [09:59:49.467 14][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6 [09:59:49.467 11][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6 ... ... [09:59:49.467 15][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6 [09:59:49.468 13][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[] [09:59:49.469 17][INFO] assigning share partitions; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, assignments: map[] ... ... [09:59:54.472 18][INFO] assigning share partitions; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, assignments: map[5da63960cb4fe97d887d575a73dfbddc51a2eb8071d119b3a5ba5a2b0d87bc7e:[1]] [09:59:54.485 14][INFO] assigning share partitions; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, assignments: map[6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9:[0]] ... ... [09:59:54.494 12][INFO] metadata update triggered; why: reload trigger due to produce topic still not known [09:59:54.495 12][INFO] producer id initialization success; id: 3524, epoch: 0 [09:59:54.5 13][INFO] producing to a new topic for the first time, fetching metadata to learn its partitions; topic: 6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9 [09:59:54.5 13][INFO] immediate metadata update triggered; why: forced load because we are producing to a topic for the first time ... ... [09:59:54.525 11][INFO] leaving share group; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, member_id: _4Jk9faoDdUlGMlSR9zKmg== share_test.go:605: level 2 rebalance 1: killing l2-c1 after 169339 consumed [09:59:55.101 14][INFO] flushing [09:59:55.101 19][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization [09:59:55.102 19][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6 [09:59:55.103 14][INFO] leaving share group; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, member_id: wJibgxG934tiAuqCPloF_w== [09:59:55.107 19][INFO] assigning share partitions; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, assignments: map[] share_test.go:619: level 2 rebalance 2: killing l2-c3 after 375726 consumed [09:59:55.401 18][INFO] flushing ... ... [10:00:00.915 20][INFO] leaving share group; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, member_id: HjzG4-5QwncfYfr8pSmEUQ== share_test.go:377: level 1: 499900 unique keys, 500624 total accepts, 500624 produced, 724 duplicates, 35614 redelivered, max dc 3, consumed 532987 share_test.go:377: level 2: 499900 unique keys, 501513 total accepts, 501513 produced, 1613 duplicates, 20272 redelivered, max dc 2, consumed 518049 share_test.go:704: level 1: 100 purely rejected, 35614 redelivered share_test.go:60: deleting topic f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e share_test.go:61: deleting topic f7e388a2de7ef0814328f9186e8c4b73b1f2437490e1b98730af9fb17ee74175 share_test.go:62: deleting topic 5da63960cb4fe97d887d575a73dfbddc51a2eb8071d119b3a5ba5a2b0d87bc7e share_test.go:63: deleting topic 6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9 share_test.go:64: deleting topic 7e74eb054cbb02e0de5da8a8018115dc01094496222039f323841770b11b8a12 share_test.go:65: deleting topic 4b8c44d4071cd22272ae9ac694342faa3404bd10b479fe88874bdef4a8a4276d --- PASS: TestShareGroupETL (22.73s) PASS ok github.com/twmb/franz-go/pkg/kgo 22.926s ``` Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
…roker when using bootstrap controller (apache#22104) Ref apache#22070 (comment) Remove the pre-flight DescribeConfigs existence check in `alterResourceConfig()` since deleting a non-existent config is idempotent, and the check causes a timeout when the target broker is offline. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Trivial PR to remove an overload of the `Formatter.setMetadataLogDirectory` which takes the metadata log directory as an `Optional<String>`, because it's not used. The overall codebase uses the other overload taking just a String (because most of the callers have a String to pass) and then wrapping it into an `Optional`. Reviewers: Andrew Schofield <aschofield@confluent.io> Signed-off-by: Paolo Patierno <ppatierno@live.com>
Several items in the Kafka docs configuration reference seem to include links to other parts of the documentation that do not work anymore on the new website. This PR updates them in the Java code so that they could be fixed in the next release. It is using the pattern already used for some other configuration otpions, so hopefully it is the correct one. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
… parameterized tests (apache#22112) - Same output topic is used for all parameterizations. Isolating them with creating topic prefixing with app id - wait for restoring task is called so that assert works for all of those 10 unmatched recs Reviewers: Matthias J. Sax <matthias@confluent.io>
…ration tests (apache#22125) Cuts redundant parameterized cases from three slow Streams integration tests without losing meaningful coverage: - IQv2StoreIntegrationTest — skips withHeaders=true rows for kind=PAPI and for global stores. DSL_STORE_FORMAT_CONFIG (the setting withHeaders toggles) only affects DSL stores built without an explicit supplier, so those rows are exact duplicates of the withHeaders=false runs. Reviewers: TengYao Chi <frankvicky@apache.org>, Bill Bejeck <bill@confluent.io>
…pache#21760) Description: This PR fixes a potential NullPointerException in OffsetFetcherUtils.regroupPartitionMapByNode when regrouping partitions by leader during offset reset / list-offsets. Background Partitions are grouped by leader via metadata.fetch().leaderFor(tp). If metadata changes between the initial leader lookup and the regroup step (e.g. leadership change or stale metadata), leaderFor(tp) can return null. The previous implementation used Collectors.groupingBy(..., leaderFor(...)), which throws an NPE when the classifier returns null. Fix OffsetFetcherUtils.regroupPartitionMapByNode Replaced the stream-based grouping with a loop that skips partitions whose leader is null, adds them to a caller-provided partitionsToRetry set, and does not trigger metadata refresh (callers are responsible for retry and metadata). Callers OffsetFetcher (classic consumer): passes partitionsToRetry into the helper; in resetPositionsAsync, when the set is non-empty, calls setNextAllowedRetry(partitionsToRetry, now + retryBackoffMs) and metadata.requestUpdate(false). OffsetsRequestManager (new consumer): passes a local retry set into the helper, then adds skipped partitions to state.remainingToSearch (with timestamp) and calls metadata.requestUpdate(false) when the set is non-empty. This keeps existing retry semantics and avoids the NPE. Tests OffsetFetcherTest.testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup Simulates leaderFor(tp) returning null during regroup (first metadata.fetch() stubbed to a cluster with no partition, then real method). Asserts no exception, partition stays pending reset, and after backoff and a second attempt with valid metadata the offset reset succeeds. OffsetsRequestManagerTest.testFetchOffsetsRegroupSkipsNullLeaderPartition_NoNPE Simulates the same scenario in the fetch-offsets path: currentLeader has a leader but metadata.fetch() returns a cluster where one partition has no leader. Asserts no NPE, one request sent (for the partition with a leader), and that the skipped partition is retried after metadata update and completes successfully. Reviewers: TengYao Chi <frankvicky@apache.org> --------- Co-authored-by: TengYao Chi <kitingiao@gmail.com>
…pache#22140) KIP-1035 implementation added a new path when closing Rocksdb stores: persist a status flag that helps to identify unclean shutdowns at the store level. We should only persist the status as closed if the store was previously open (and fully reachable) because after an unclean shutdown, RocksDB usually executes a background recovery process that causes stalls on the first write. Reviewers: Bill Bejeck <bbejeck@apache.org>, Matthias J. Sax <matthias@confluent.io>
The current **SafeObjectInputStream** uses a denylist based approach - having a fixed denylist to be validated against for deserialization. This is a bad security practise and has also been advised so in the original PR. Making this as a allowlist instead and allowing safe BASE_TYPES which are required by current caller (_org.apache.kafka.connect.storage.FileOffsetBackingStore_) Also providing a `SafeObjectInputStream(InputStream in, Set<String> allowedClasses)` so if any consumer require any specific allowedClasses they can pass in here. Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
…rtitionMetadataClient (apache#21842) ### Description This pull request addresses the redundant thread usage detailed in [KAFKA-19937](https://issues.apache.org/jira/browse/KAFKA-19937) affecting the `PersisterStateManager` and `NetworkPartitionMetadataClient` classes specifically. Presently each creates/manages its own separate `SystemTimerReaper` instances, but rely on identical timers with independent tasks. The changes proposed address this by introducing a new, sharable instance of the thread to reduce overhead. ### Key Changes - Updated `BrokerServer` to create a single shared `SystemTimerReaper` instance used by both `PersisterStateManager` and `NetworkPartitionMetadataClient`, with cleanup in the shutdown path after both components have been stopped. - Moved timer ownership to the caller for the affected classes to the caller (e.g., `PersisterStateManager.stop()` and `NetworkPartitionMetadataClient.close()` no longer close their injected timer, as lifecycle is managed by `BrokerServer`). - This specific timer ownership behavior is documented via JavaDocs for both `PersisterStateManager` and `NetworkPartitionMetadataClient` - Added null validation to the `SystemTimerReaper` constructor arguments. ### Tests and Verification Verified that all existing test suites still pass as expected and added the following to verify new behavior and usage related to the above changes: - Extended `SystemTimerReaperTest.java` to verify null validity and timer-sharing behavior (e.g., two consumers sharing a timer can both schedule and expire tasks independently). - Updated `PersisterStateManagerTest.java` to verify that `stop()` does not close the timer, consistent with the new caller-ownership contract. ### Reviewer(s) Tagging @AndrewJSchofield (initial reporter) Reviewers: Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
…ache#21772) GroupCoordinatorIntegrationTest had several tests that only covered consumer groups but not streams groups. This adds four streams group equivalents to match coverage: - Coordinator failover after compaction with a member joining and leaving - Coordinator failover after compaction with a member leaving and rejoining - Coordinator failover after compaction with a deleted group - Recreating the __consumer_offsets topic with a streams group Reviewers: Matthias J. Sax <matthias@confluent.io>
…yncKafkaConsumer (apache#22018) When a consumer uses manual assignment (assign()) instead of group subscription, the member remains in the UNSUBSCRIBED state. In this state, heartbeats are skipped. Previously, because the heartbeat interval was initialized to 0, the `maximumTimeToWait` calculation would return 0 when heartbeats were skipped. This caused `pollForFetches` to return immediately and enter a busy-loop, consuming excessive CPU. This patch fixes the issue by ensuring `maximumTimeToWait` returns Long.MAX_VALUE whenever `shouldSkipHeartbeat()` is true. Reviewers: Jiayao Sun <jiayao.s@outlook.com>, Kirk True <kirk@kirktrue.pro>, Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
The last part of testRackAwareAssignment was found to be flaky. This
part moves all topic partitions to different racks and waits for
consumer assignments to settle. Each of the three consumers is expected
to revoke all its partitions and be assigned partitions previously held
by another within a 15 second timeout.
This timeout is not always sufficient. The consumer heartbeat interval
is left at the default of 5,000 ms and each consumer polls every
3,000 ms. In the worst case, it takes a consumer around 7,000 ms to
reconcile an assignment change. An additional 3,000 ms round of polling
may be required when a consumer needs to auto-commit offsets. Two rounds
of reconciliation must happen within 15,000 ms.
The timeline of an example failing run looks like:
-02.956 Group coordinator computes target assignment at epoch=6
consumer0=[0] consumer1=[1, 2] consumer2=[3, 4, 5]
+00.000 15 second timeout starts
+03.179 consumer0 heartbeats This is the first heartbeat since
the rack reassignments. +03.179 Group coordinator computes target
assignment at epoch=7 consumer0=[5] consumer1=[3, 4]
consumer2=[0, 1, 2] +03.186 consumer0 heartbeat receives assignment []
+04.151 consumer1 starts poll() +04.877 consumer1 heartbeats +04.878
consumer1 heartbeat receives assignment [] +05.155 consumer1 ends
poll()
+07.259 consumer1 starts poll() +07.259 consumer1 sends auto-commit
with offsets for [1, 2] +07.288 consumer1 receives auto-commit
response +08.263 consumer1 ends poll()
+10.379 consumer1 starts poll() +10.379 consumer1 calls
onPartitionsRevoked with [1, 2] +10.379 consumer1 calls
onPartitionsAssigned with [] +10.382 consumer1 heartbeats with
owned partitions [] +10.387 consumer1 heartbeat receives assignment
[3, 4] +10.483 consumer1 calls onPartitionsAssigned [3, 4] +10.483
consumer1 heartbeats with owned partitions [3, 4] +11.384 consumer1
ends poll()
+15.000 15 second timeout elapses and the test fails +15.300 consumer2
heartbeat receives assignment [0, 1, 2]
To fix the test we:
* Make config changes to reduce the reconciliation time. This also
reduces the test duration from 60 seconds to 20 seconds.
* Disable auto-commit, since the consumers do not consume any records.
* Reduce the heartbeat interval to 1,000 ms.
* Reduce the poll timeouts to 100 ms, so that polls happen every
300 ms.
* Raise the final timeout to 30 seconds, since under heavy CI load, the
reduced intervals above aren't effective.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, David Jacot
<djacot@confluent.io>
…elevant protocol (apache#22144) Ref : https://issues.apache.org/jira/browse/KAFKA-20424 - 18 tests with updated comments (classic only) - 1 test (testAutoCommitSentBeforePositionUpdate) now runs for both protocols. Reviewers: Andrew Schofield <aschofield@confluent.io>
Fix an incorrect assertion in ConsumerGroupHeartbeatRequestTest's testConsumerGroupHeartbeatRebalance. Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch refactors `TxnOffsetCommitResponse.Builder` to mirror the class hierarchy used by `OffsetCommitResponse.Builder`: the `Builder` becomes abstract and the existing topic-name-keyed logic moves to a `TopicNameBuilder` subclass. A new `newBuilder()` factory replaces the only direct `new Builder()` call site in `KafkaApis`. Unit tests covering `addPartition`, `addPartitions`, `merge`, and the null-name guard are added in `TxnOffsetCommitResponseTest`. The change is behavior-preserving. It prepares the ground for a future `TopicIdBuilder` subclass that will be added when `TxnOffsetCommit` gains topic ID support. Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch refactors `TxnOffsetCommitRequest.Builder` to match the factory-method style used by `OffsetCommitRequest.Builder`: the public constructors are replaced by a private one plus a single static `forTopicNames(TxnOffsetCommitRequestData, boolean)` factory. The `getTopics(...)` helper is made public so callers can build the topic list themselves. All call sites in production and test code are migrated to build a `TxnOffsetCommitRequestData` and pass it to the factory. The change is behavior-preserving. It prepares the ground for a future `forTopicIdsOrNames(...)` factory that will be added when `TxnOffsetCommit` gains topic ID support. Reviewers: Andrew Schofield <aschofield@confluent.io>
…OffsetCommit (apache#22159) This patch adds a `topicId` parameter to the `TxnOffsetCommitRequestPartition` overload of `OffsetAndMetadata.fromRequest`, mirroring the existing `OffsetCommitRequestPartition` overload. The single caller in `OffsetMetadataManager.commitTransactionalOffset` passes `Uuid.ZERO_UUID` to preserve the current behavior. The `testFromTransactionalRequest` test is converted to a parameterized test that covers both `Uuid.ZERO_UUID` and a random topic ID. Reviewers: Andrew Schofield <aschofield@confluent.io>
…apache#22134) FYI: apache#22104 (comment) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…he#22135) The `resolve_reviewer` introduced in apache#22108 used local `git log` to find past `Reviewers:` trailers, but the PR linter runs with the default shallow checkout (`fetch-depth: 1`), so older merged PR trailers are not available on the runner. This PR switches reviewer email resolution to GitHub APIs in this order: 1. the reviewer's latest apache/kafka commit author email 2. `gh search commits` for prior `Reviewers:` trailers by display name, accepted only when the matched commit's associated PR has a review from the same GitHub login 3. the reviewer's GitHub public profile email 4. `Name (github:login)` fallback, without tagging the reviewer It also skips `pr-reviewed.yml` artifact creation once the PR is no longer open. For the motivating apache#22108 case, this resolves `@mimaison` as `Mickael Maison <mimaison@apache.org>` instead of falling back to the GitHub handle. `gh search commits` uses GitHub's Search API bucket. In CI it is authenticated via `GH_TOKEN`, so the limit is 30 requests/minute (unauthenticated is 10 requests/minute). This workflow normally resolves one reviewer per run. In local checks, `mjsax` and `mingyen066` resolved via T1 in 0.5-0.8s, while `mimaison` and `UladzislauBlok` resolved via T2's search-plus-review verification in about 3.0s. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…config guide (apache#22067) The Streams configuration guide already lists transaction.timeout.ms and its default, but it is easy to overlook when enabling exactly-once processing. This adds a short note directly under the processing.guarantee section to make it more obvious that users should consider the transaction timeout when tuning EOS applications, including the broker-side constraint (transaction.max.timeout.ms) and the trade-offs of increasing the value. Reviewers: Andrew Schofield <aschofield@confluent.io> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
PR apache#7136 standardised the output format for TopicCommand to print `none` when the leader for a TP is unknown. `KafkaServer#leader` in system-tests however fails to account for this change and possibly panics trying to type cast `none` to an integer. This change fixes the regression so that we parse the value correctly and return None if the leader is undefined. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Federico Valeri <fedevaleri@gmail.com>
MINOR: Fix Javadoc for ConfigCommand.java Ref: checkstyle/checkstyle#19718 Reviewers: Ming-Yen Chung <mingyen066@gmail.com>, Roman Ivanov <ivanov-jr@mail.ru>, Chia-Ping Tsai <chia7712@gmail.com>
KIP-1071 adds new broker side configs: - num.warmup.replicas - max.warmup.replicas This PR adds both configs to the broker. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Lan Ding <isDing_L@163.com>, Chia-Ping Tsai <chia7712@gmail.com>
…e fetch (apache#22170) The PR fixes a suppressed NPE when topic name cannot be resolved during share fetch. The metrics update triggers NPE when topic name is null and the error response gets mapped to UNKNOWN_SERVER_ERROR. Correcting the behaviour to return the correct error code per partition response. Reviewers: Andrew Schofield <aschofield@confluent.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
…2167) * Register TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG in LogConfig. * Update tests. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ming-Yen Chung <mingyen066@gmail.com>
…st (apache#22183) The `emitStrategyAndHeadersMatrix()` currently returns four entries but they are duplicates of ON_WINDOW_UPDATE (and therefore also runs each case twice). This also drops coverage for EmitStrategy.StrategyType.ON_WINDOW_CLOSE, which was previously covered via @EnumSource. Update the method source to generate the full matrix of strategy type(s) and withHeaders values (eg, include both ON_WINDOW_UPDATE and ON_WINDOW_CLOSE, each with withHeaders=false/true) without duplicates. Reviewers: Murali Basani <muralidhar.basani@aiven.io>, Matthias J. Sax <matthias@confluent.io>
The `Uuid#randomUuid` method has been modified to avoid generating UUIDs containing hyphens. Reviewers: Mickael Maison <mickael.maison@gmail.com>, David Arthur <mumrah@gmail.com>, Sean Quah <squah@confluent.io>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Replicated from Apache Kafka PR apache#21313
Original PR: apache#21313
This PR is part of a demo/workshop setup showcasing AI SRE capabilities.