diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java index ac9a626a06..3ed2af08fc 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java @@ -68,6 +68,13 @@ default StepExecution createStepExecution(String stepName, JobExecution jobExecu @Deprecated(since = "6.0", forRemoval = true) StepExecution getStepExecution(JobExecution jobExecution, long stepExecutionId); + /** + * Because it may be possible that the status of a StepExecution is updated while + * running, the following method will synchronize only the status and version fields. + * @param stepExecution to be updated. + */ + void synchronizeStatus(StepExecution stepExecution); + /** * Retrieve the last {@link StepExecution} for a given {@link JobInstance} ordered by * creation time and then id. diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java index e6371bddb2..b828d53ea4 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java @@ -19,9 +19,7 @@ import java.sql.ResultSet; import java.sql.Timestamp; import java.sql.Types; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -90,6 +88,12 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " WHERE STEP_EXECUTION_ID = ?"; + private static final String GET_VERSION_AND_STATUS = """ + SELECT VERSION, STATUS + FROM %PREFIX%STEP_EXECUTION + WHERE STEP_EXECUTION_ID = ? + """; + private static final String GET_LAST_STEP_EXECUTION = """ SELECT SE.STEP_EXECUTION_ID, SE.STEP_NAME, SE.START_TIME, SE.END_TIME, SE.STATUS, SE.COMMIT_COUNT, SE.READ_COUNT, SE.FILTER_COUNT, SE.WRITE_COUNT, SE.EXIT_CODE, SE.EXIT_MESSAGE, SE.READ_SKIP_COUNT, SE.WRITE_SKIP_COUNT, SE.PROCESS_SKIP_COUNT, SE.ROLLBACK_COUNT, SE.LAST_UPDATED, SE.VERSION, SE.CREATE_TIME, JE.JOB_EXECUTION_ID, JE.START_TIME, JE.END_TIME, JE.STATUS, JE.EXIT_CODE, JE.EXIT_MESSAGE, JE.CREATE_TIME, JE.LAST_UPDATED, JE.VERSION FROM %PREFIX%JOB_EXECUTION JE @@ -308,6 +312,20 @@ public StepExecution getStepExecution(JobExecution jobExecution, long stepExecut } } + @Override + public void synchronizeStatus(StepExecution stepExecution) { + getJdbcTemplate().query(getQuery(GET_VERSION_AND_STATUS), rs -> { + Integer currentVersion = rs.getInt("VERSION"); + if (!Objects.equals(currentVersion, stepExecution.getVersion())) { + BatchStatus currentStatus = BatchStatus.valueOf(rs.getString("STATUS")); + if (currentStatus.isGreaterThan(stepExecution.getStatus())) { + stepExecution.upgradeStatus(currentStatus); + } + stepExecution.setVersion(currentVersion); + } + }, stepExecution.getId()); + } + @Nullable @Override public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java index d56ffcee0f..113b882620 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java @@ -110,6 +110,17 @@ public StepExecution getStepExecution(JobExecution jobExecution, long stepExecut return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null; } + @Override + public void synchronizeStatus(StepExecution stepExecution) { + StepExecution currentStepExecution = getStepExecution(stepExecution.getId()); + if (currentStepExecution != null && currentStepExecution.getStatus().isGreaterThan(stepExecution.getStatus())) { + stepExecution.upgradeStatus(currentStepExecution.getStatus()); + } + // TODO the contract mentions to update the version as well. Double check if this + // is needed as the version is not used in the tests following the call sites of + // synchronizeStatus + } + @Nullable @Override public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java index 6234889564..b20218daaf 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java @@ -159,15 +159,11 @@ public void update(StepExecution stepExecution) { stepExecution.setLastUpdated(LocalDateTime.now()); - StepExecution latestStepExecution = getStepExecution(stepExecution.getId()); - Assert.state(latestStepExecution != null, - "StepExecution with id " + stepExecution.getId() + "not found. Batch metadata state may be corrupted."); - - if (latestStepExecution.getJobExecution().isStopped() || latestStepExecution.getJobExecution().isStopping()) { - Integer version = latestStepExecution.getVersion(); - if (version != null) { - stepExecution.setVersion(version); - } + JobExecution jobExecution = stepExecution.getJobExecution(); + this.jobExecutionDao.synchronizeStatus(jobExecution); + + if (jobExecution.isStopped() || jobExecution.isStopping()) { + this.stepExecutionDao.synchronizeStatus(stepExecution); stepExecution.setTerminateOnly(); } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoStepExecutionDaoIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoStepExecutionDaoIntegrationTests.java index 1251fc0c2a..49fcb1a4a3 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoStepExecutionDaoIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoStepExecutionDaoIntegrationTests.java @@ -161,6 +161,54 @@ void testConcurrentModificationException() { assertThrows(OptimisticLockingFailureException.class, () -> dao.updateStepExecution(exec2)); } + /** + * Successful synchronization from STARTED to STOPPING status. + */ + @Test + void testSynchronizeStatusUpgrade() { + + StepExecution exec1 = dao.createStepExecution("step", jobExecution); + exec1.setStatus(BatchStatus.STOPPING); + dao.updateStepExecution(exec1); + + StepExecution exec2 = dao.getStepExecution(exec1.getId()); + assertNotNull(exec2); + exec2.setStatus(BatchStatus.STARTED); + // exec2.setVersion(7); + // assertNotSame(exec1.getVersion(), exec2.getVersion()); + assertNotSame(exec1.getStatus(), exec2.getStatus()); + + dao.synchronizeStatus(exec2); + + // assertEquals(exec1.getVersion(), exec2.getVersion()); + assertEquals(exec1.getStatus(), exec2.getStatus()); + } + + /** + * UNKNOWN status won't be changed by synchronizeStatus, because it is the 'largest' + * BatchStatus (will not downgrade). + */ + @Test + void testSynchronizeStatusDowngrade() { + + StepExecution exec1 = dao.createStepExecution("step", jobExecution); + exec1.setStatus(BatchStatus.STARTED); + dao.updateStepExecution(exec1); + + StepExecution exec2 = dao.getStepExecution(exec1.getId()); + assertNotNull(exec2); + + exec2.setStatus(BatchStatus.UNKNOWN); + // exec2.setVersion(7); + // assertNotSame(exec1.getVersion(), exec2.getVersion()); + assertTrue(exec1.getStatus().isLessThan(exec2.getStatus())); + + dao.synchronizeStatus(exec2); + + // assertEquals(exec1.getVersion(), exec2.getVersion()); + assertEquals(BatchStatus.UNKNOWN, exec2.getStatus()); + } + private void assertStepExecutionsAreEqual(StepExecution expected, StepExecution actual) { assertEquals(expected.getId(), actual.getId()); assertTemporalEquals(expected.getStartTime(), actual.getStartTime());