Skip to content

KAFKA-20649: KIP-1352: Ranges and Range Aggregations in the Kafka Streams DSL#22441

Draft
buseynehannes wants to merge 7 commits into
apache:trunkfrom
buseynehannes:KIP/1352-range-aggregations
Draft

KAFKA-20649: KIP-1352: Ranges and Range Aggregations in the Kafka Streams DSL#22441
buseynehannes wants to merge 7 commits into
apache:trunkfrom
buseynehannes:KIP/1352-range-aggregations

Conversation

@buseynehannes
Copy link
Copy Markdown

This is a draft PR/Proof of Concept for KIP-1352.

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1352%3A+Ranges+and+Range+Aggregations+in+the+Kafka+Streams+DSL
Jira: https://issues.apache.org/jira/browse/KAFKA-12345
Discuss Thread: (Link to your mailing list thread once it appears in the archives)

Note: This PR is currently a WIP to demonstrate the RangeStoreProcessor and RangeAggregateProcessor topology split, as well as the CloseableIterator lifecycle management.

buseynehannes and others added 6 commits June 1, 2026 13:53
Introduces rangeOver() on KGroupedStream, which lets users aggregate a
sliding window of records relative to each incoming record (the "anchor")
without waiting for a window boundary to close.

Public API additions:
- Range<K,V>               — abstract base; defines gracePeriodMs, fetch(),
                             and retentionMs() = rangeRetentionMs() + grace
- EventTimeRange           — time-bounded range [anchor.ts-before, anchor.ts+after];
                             static factories ofTimeBoundsWithNoGrace / AndGrace
- EventCountRange          — count-bounded range (before records back, after
                             records forward) bounded by maxTimeBefore;
                             static factories ofCountBoundsWithNoGrace / AndGrace
- RangeAggregator<K,V,VR>  — @FunctionalInterface (anchor, Iterable<Record>) -> VR
- RangedKStream<K,V>       — aggregate(RangeAggregator) and count()
- KGroupedStream.rangeOver(Range) / rangeOver(Range, Materialized)

Internal topology (two-processor design):
- KStreamRangeBuffer       — writes records to a retainDuplicates WindowStore,
                             enforces grace period, forwards downstream
- KStreamRangeAggregate    — on each record calls range.fetch() against the
                             shared buffer store, applies the aggregator
- RangeWindowStoreMaterializer — configures the WindowStore (plain, not
                             timestamped; retainDuplicates=true enforced)
- RangedKStreamImpl        — wires aggregate() calls as separate downstream
                             ProcessorGraphNodes sharing the same buffer store
- GroupedStreamAggregateBuilder.prepareRangedBuffer() — handles repartitioning
                             before attaching the buffer processor node

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Brings over EventTimeRange, EventCountRange, and processing flow
diagrams (PNG + PlantUML source) from KIP/trying_to_create_my_own_kip.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Four corrections vs. the actual implementation:
- Drop arrows no longer point back to upstream (records are discarded, not returned)
- Null key/value drop condition added before the grace period check
- Removed incorrect RAP->WindowStore close() call (iterator consumed inside apply())
- forward() label corrected to anchor.withValue(result)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ources

Range.fetch() now returns CloseableIterator<Record<K,V>> instead of
Iterable<Record<K,V>>, matching the KIP spec. The aggregate processor
wraps it in a try-with-resources block, guaranteeing the underlying
RocksDB cursor is always closed after aggregation — even if the user's
aggregator exits early or throws.

- New public interface CloseableIterator<T> (extends Iterator + AutoCloseable)
- LimitingWindowStoreIterator and ConcatenatingIterator implement it
- Both expose an idempotent close() that delegates to the inner store iterator
- KStreamRangeAggregate.process() closes the iterator via try-with-resources

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
KIP docs and diagrams belong on the Confluence wiki, not in the repo.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ArrayList.add(0, e) is O(n) per prepend (shifts all elements), giving
O(before²) total. ArrayDeque.addFirst() is O(1) amortized with no
shifting; its iterator() already walks head-to-tail in the correct
ascending order.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@github-actions github-actions Bot added triage PRs from the community streams labels Jun 1, 2026
The lambda () -> it handed a bare CloseableIterator to the aggregator,
silently returning the same exhausted cursor on a second iterator() call.
Replace with SingleUseIterable, which throws IllegalStateException if
iterator() is called more than once — failing fast rather than silently
producing empty results.

Document the single-use contract on RangeAggregator.apply().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant