Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,18 @@
* to provide common properties and methods for building chunk-oriented steps.
*
* @author Mahmoud Ben Hassine
* @author Yanming Zhou
* @since 6.0
*/
public class ChunkOrientedStepBuilder<I, O> extends StepBuilderHelper<ChunkOrientedStepBuilder<I, O>> {

private final int chunkSize;

private @Nullable ItemReader<I> reader;
private @Nullable ItemReader<? extends I> reader;

private @Nullable ItemProcessor<I, O> processor;
private @Nullable ItemProcessor<? super I, ? extends O> processor;

private @Nullable ItemWriter<O> writer;
private @Nullable ItemWriter<? super O> writer;

private PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();

Expand Down Expand Up @@ -155,7 +156,7 @@ protected ChunkOrientedStepBuilder<I, O> self() {
* @param reader an item reader
* @return this for fluent chaining
*/
public ChunkOrientedStepBuilder<I, O> reader(ItemReader<I> reader) {
public ChunkOrientedStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
this.reader = reader;
return self();
}
Expand All @@ -167,7 +168,7 @@ public ChunkOrientedStepBuilder<I, O> reader(ItemReader<I> reader) {
* @param processor an item processor
* @return this for fluent chaining
*/
public ChunkOrientedStepBuilder<I, O> processor(ItemProcessor<I, O> processor) {
public ChunkOrientedStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
this.processor = processor;
return self();
}
Expand All @@ -179,7 +180,7 @@ public ChunkOrientedStepBuilder<I, O> processor(ItemProcessor<I, O> processor) {
* @param writer an item writer
* @return this for fluent chaining
*/
public ChunkOrientedStepBuilder<I, O> writer(ItemWriter<O> writer) {
public ChunkOrientedStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
this.writer = writer;
return self();
}
Expand Down Expand Up @@ -395,7 +396,6 @@ public ChunkOrientedStepBuilder<I, O> observationRegistry(ObservationRegistry ob
return self();
}

@SuppressWarnings("unchecked")
public ChunkOrientedStep<I, O> build() {
Assert.notNull(this.reader, "Item reader must not be null");
Assert.notNull(this.writer, "Item writer must not be null");
Expand Down Expand Up @@ -457,6 +457,7 @@ public ChunkOrientedStep<I, O> build() {
return chunkOrientedStep;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void registerTypedListener(StepListener stepListener, ChunkOrientedStep<I, O> chunkOrientedStep) {
if (stepListener instanceof ItemReadListener listener) {
chunkOrientedStep.registerItemReadListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
* @author Mahmoud Ben Hassine
* @author Andrey Litvitski
* @author xeounxzxu
* @author Yanming Zhou
* @since 6.0
*/
public class ChunkOrientedStep<I, O> extends AbstractStep {
Expand All @@ -99,16 +100,16 @@ public class ChunkOrientedStep<I, O> extends AbstractStep {
/*
* Step Input / Output parameters
*/
private final ItemReader<I> itemReader;
private final ItemReader<? extends I> itemReader;

private final CompositeItemReadListener<I> compositeItemReadListener = new CompositeItemReadListener<>();

@SuppressWarnings("unchecked")
private ItemProcessor<I, O> itemProcessor = item -> (O) item;
private ItemProcessor<? super I, ? extends O> itemProcessor = item -> (O) item;

private final CompositeItemProcessListener<I, O> compositeItemProcessListener = new CompositeItemProcessListener<>();

private final ItemWriter<O> itemWriter;
private final ItemWriter<? super O> itemWriter;

private final CompositeItemWriteListener<O> compositeItemWriteListener = new CompositeItemWriteListener<>();

Expand Down Expand Up @@ -167,8 +168,8 @@ public class ChunkOrientedStep<I, O> extends AbstractStep {
* @param itemWriter the item writer to write items
* @param jobRepository the job repository to use for this step
*/
public ChunkOrientedStep(String name, int chunkSize, ItemReader<I> itemReader, ItemWriter<O> itemWriter,
JobRepository jobRepository) {
public ChunkOrientedStep(String name, int chunkSize, ItemReader<? extends I> itemReader,
ItemWriter<? super O> itemWriter, JobRepository jobRepository) {
super(jobRepository);
this.chunkSize = chunkSize;
this.itemReader = itemReader;
Expand All @@ -180,7 +181,7 @@ public ChunkOrientedStep(String name, int chunkSize, ItemReader<I> itemReader, I
* Set the item processor to use for processing items.
* @param itemProcessor the item processor to set
*/
public void setItemProcessor(ItemProcessor<I, O> itemProcessor) {
public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
Assert.notNull(itemProcessor, "Item processor must not be null");
this.itemProcessor = itemProcessor;
}
Expand Down
Loading