From 248c48fb666b81fadd67060a997516473c751e69 Mon Sep 17 00:00:00 2001 From: "Oh, Hoon" Date: Thu, 12 Mar 2026 19:00:34 +0000 Subject: [PATCH 01/10] Merged PR 4967286: Catch Exceptions from openBatchSession. --- **Work Item:** #11316434 #11406267 --- **Summary** -- Kyuubi Batch Service creates `batchExecutor` to submit a batch job. However, when `openBatchSession` fails, the executor dies and never recovers. This can result in jobs stuck in the PENDING state and no executor left to submit jobs that are still in the INITIALIZED state. We added error handling logic to handle `openBatchSession` failures. **Problem** -- `openBatchSession` may fail if the connections-per-user limit is exceeded or a metadata query fails. When `openBatchSession` fails, the executor dies and does not recover unless we restart the Kyuubi server pods. --- **Approach** --- Add a try block to catch exceptions when `openBatchSession` fails. We move the job from PENDING to ERROR to avoid PENDING jobs taking up submitter executors for a long period of time. **Code Change** -- Fixed the argument order passed to `withUpdateCount` in `JDBCMetadataStore` (target state, identifier, from state) so that it matches the placeholders of the UPDATE statement. Added a try/catch block in `KyuubiBatchService` to catch all exceptions raised while opening a batch session; the submitter then continues with the next job. --- **Concern** It's not clear whether we should set the failed job to ERROR or INITIALIZED (INITIALIZED would allow it to retry). 
**Test** Related work items: #11406267 --- .../kyuubi/server/KyuubiBatchService.scala | 112 +++++++++++------- .../server/metadata/MetadataManager.scala | 4 + .../metadata/jdbc/JDBCMetadataStore.scala | 2 +- .../server/api/v1/BatchesResourceSuite.scala | 67 +++++++++++ 4 files changed, 139 insertions(+), 46 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index 38bb999c342..cbe5de1cecb 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -60,57 +60,79 @@ class KyuubiBatchService( } override def start(): Unit = { + val UNINITIALIZED_BATCH_ID = "UNINITIALIZED_BATCH_ID" assert(running.compareAndSet(false, true)) val submitTask: Runnable = () => { restFrontend.waitForServerStarted() while (running.get) { - metadataManager.pickBatchForSubmitting(kyuubiInstance) match { - case None => Thread.sleep(1000) - case Some(metadata) => - val batchId = metadata.identifier - info(s"$batchId is picked for submission.") - val batchSession = sessionManager.createBatchSession( - metadata.username, - "anonymous", - metadata.ipAddress, - metadata.requestConf, - metadata.engineType, - Option(metadata.requestName), - metadata.resource, - metadata.className, - metadata.requestArgs, - Some(metadata), - fromRecovery = false) - sessionManager.openBatchSession(batchSession) - var submitted = false - while (!submitted) { // block until batch job submitted - submitted = metadataManager.getBatchSessionMetadata(batchId) match { - case Some(metadata) if OperationState.isTerminal(metadata.opState) => - true - case Some(metadata) if metadata.opState == OperationState.RUNNING => - metadata.appState match { - // app that is not submitted to resource manager - case None | Some(ApplicationState.NOT_FOUND) => false - // app that is pending in 
resource manager while the local startup - // process is alive. For example, in Spark YARN cluster mode, if set - // spark.yarn.submit.waitAppCompletion=false, the local spark-submit - // process exits immediately once Application goes ACCEPTED status, - // even no resource could be allocated for the AM container. - case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive => - false - // not sure, added for safe - case Some(ApplicationState.UNKNOWN) => false - case _ => true - } - case Some(_) => - false - case None => - error(s"$batchId does not existed in metastore, assume it is finished") - true + var batchId = UNINITIALIZED_BATCH_ID + try { + metadataManager.pickBatchForSubmitting(kyuubiInstance) match { + case None => Thread.sleep(1000) + case Some(metadata) => + batchId = metadata.identifier + info(s"$batchId is picked for submission.") + val batchSession = sessionManager.createBatchSession( + metadata.username, + "anonymous", + metadata.ipAddress, + metadata.requestConf, + metadata.engineType, + Option(metadata.requestName), + metadata.resource, + metadata.className, + metadata.requestArgs, + Some(metadata), + fromRecovery = false) + sessionManager.openBatchSession(batchSession) + var submitted = false + while (!submitted) { // block until batch job submitted + submitted = metadataManager.getBatchSessionMetadata(batchId) match { + case Some(metadata) if OperationState.isTerminal(metadata.opState) => + true + case Some(metadata) if metadata.opState == OperationState.RUNNING => + metadata.appState match { + // app that is not submitted to resource manager + case None | Some(ApplicationState.NOT_FOUND) => false + // app that is pending in resource manager while the local startup + // process is alive. 
For example, in Spark YARN cluster mode, if set + // spark.yarn.submit.waitAppCompletion=false, the local spark-submit + // process exits immediately once Application goes ACCEPTED status, + // even no resource could be allocated for the AM container. + case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive => + false + // not sure, added for safe + case Some(ApplicationState.UNKNOWN) => false + case _ => true + } + case Some(_) => + false + case None => + error(s"$batchId does not existed in metastore, assume it is finished") + true + } + if (!submitted) Thread.sleep(1000) + } + info(s"$batchId is submitted or finished.") + } + } catch { + // GEICO: If the batch session is not opened, + // reinitialize the batch state to ERROR + // This can be due to a DB error or connection limits + case e: Exception => + if (batchId == UNINITIALIZED_BATCH_ID) { + error(s"Error picking batch for submission", e) + } else { + error(s"Error opening batch session for $batchId", e) + try { + metadataManager.failScheduledBatch(batchId) + info(s"$batchId is marked as ERROR") + } catch { + case ex: Exception => + error(s"Unable to modify metadata for $batchId to ERROR", ex) } - if (!submitted) Thread.sleep(1000) } - info(s"$batchId is submitted or finished.") + Thread.sleep(1000) } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala index c5182979ee7..85495c11ecd 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala @@ -181,6 +181,10 @@ class MetadataManager extends AbstractService("MetadataManager") { _metadataStore.transformMetadataState(batchId, "INITIALIZED", "CANCELED") } + def failScheduledBatch(batchId: String): Boolean = { + _metadataStore.transformMetadataState(batchId, "PENDING", "ERROR") + } + 
def getBatchesRecoveryMetadata( state: String, kyuubiInstance: String, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index b7e83cd563d..a9ee8fa9569 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -235,7 +235,7 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { targetState: String): Boolean = { val query = s"UPDATE $METADATA_TABLE SET state = ? WHERE identifier = ? AND state = ?" JdbcUtils.withConnection { connection => - withUpdateCount(connection, query, fromState, identifier, targetState) { updateCount => + withUpdateCount(connection, query, targetState, identifier, fromState) { updateCount => updateCount == 1 } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index f836f6f1572..ce59aed11e5 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -44,6 +44,7 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState} import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.server.KyuubiRestFrontendService import org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader, AUTHORIZATION_HEADER} +import org.apache.kyuubi.server.metadata.MetadataManager import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} import org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl, AuthUtils} import org.apache.kyuubi.session.{KyuubiBatchSession, 
KyuubiSessionManager, SessionHandle, SessionType} @@ -72,6 +73,72 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase { Utils.tryLogNonFatalError { sessionManager.closeSession(session.handle) } } } + + test("KyuubiBatchService catch block when openBatchSession fails during metadata update") { + val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager] + val realMetadataManager = sessionManager.metadataManager.get + + val wrapperMetadataManager = new MetadataManager { + override def updateMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = { + throw new RuntimeException("test metadata update failure") + } + override def insertMetadata(metadata: Metadata, asyncRetryOnError: Boolean = true): Unit = { + realMetadataManager.insertMetadata(metadata, asyncRetryOnError) + } + override def getBatch(batchId: String) = realMetadataManager.getBatch(batchId) + } + wrapperMetadataManager.initialize(sessionManager.getConf) + + val originalMetadataManager = sessionManager.metadataManager + try { + sessionManager.metadataManager = Some(wrapperMetadataManager) + + val requestObj = newSparkBatchRequest(Map("spark.master" -> "local")) + val response = webTarget.path("api/v1/batches") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous")) + .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE)) + assert(response.getStatus == 200) + val batch = response.readEntity(classOf[Batch]) + val batchId = batch.getId + assert(batch.getState === "INITIALIZED") + + eventually(timeout(15.seconds), interval(1.second)) { + val batchInfoResponse = webTarget.path(s"api/v1/batches/$batchId") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous")) + .get() + assert(batchInfoResponse.getStatus == 200) + val batchInfo = batchInfoResponse.readEntity(classOf[Batch]) + assert( + batchInfo.getState === "INITIALIZED", + "Batch should 
remain INITIALIZED after openBatchSession failed and catch block ran " + + "(failScheduledBatch only transitions PENDING->ERROR)") + } + + sessionManager.metadataManager = originalMetadataManager + + val requestObj2 = newSparkBatchRequest(Map("spark.master" -> "local")) + val response2 = webTarget.path("api/v1/batches") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous")) + .post(Entity.entity(requestObj2, MediaType.APPLICATION_JSON_TYPE)) + assert(response2.getStatus == 200) + val batch2Id = response2.readEntity(classOf[Batch]).getId + eventually(timeout(30.seconds), interval(1.second)) { + val batch2InfoResponse = webTarget.path(s"api/v1/batches/$batch2Id") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous")) + .get() + val batch2Info = batch2InfoResponse.readEntity(classOf[Batch]) + assert( + batch2Info.getState === "PENDING" || batch2Info.getState === "RUNNING", + "Second batch should be processed (batch service still running after catch)") + } + } finally { + sessionManager.metadataManager = originalMetadataManager + } + } } abstract class BatchesResourceSuiteBase extends KyuubiFunSuite From 5d6f5dd30b06755ddb20a76a1d412412fbc38b3c Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Thu, 16 Apr 2026 10:07:43 -0400 Subject: [PATCH 02/10] More cleanups --- .../org/apache/kyuubi/server/KyuubiBatchService.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index cbe5de1cecb..e0a103c747a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -111,14 +111,12 @@ class KyuubiBatchService( error(s"$batchId does not existed in metastore, assume it 
is finished") true } - if (!submitted) Thread.sleep(1000) - } + if (!submitted) Thread.sleep(1000) } info(s"$batchId is submitted or finished.") } } catch { - // GEICO: If the batch session is not opened, - // reinitialize the batch state to ERROR - // This can be due to a DB error or connection limits + // If the batch session failed to open, reinitialize the batch state to ERROR + // This can be due to a DB error or batch_connection_limits exceeded case e: Exception => if (batchId == UNINITIALIZED_BATCH_ID) { error(s"Error picking batch for submission", e) @@ -126,7 +124,6 @@ class KyuubiBatchService( error(s"Error opening batch session for $batchId", e) try { metadataManager.failScheduledBatch(batchId) - info(s"$batchId is marked as ERROR") } catch { case ex: Exception => error(s"Unable to modify metadata for $batchId to ERROR", ex) From 175fee038de5911c26a880a7890641ee51b4e19d Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Thu, 16 Apr 2026 15:30:43 -0400 Subject: [PATCH 03/10] Format cleanups --- .../scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index e0a103c747a..1edb0b5dc19 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -111,7 +111,8 @@ class KyuubiBatchService( error(s"$batchId does not existed in metastore, assume it is finished") true } - if (!submitted) Thread.sleep(1000) } + if (!submitted) Thread.sleep(1000) + } info(s"$batchId is submitted or finished.") } } catch { From 6504d27e98f6289de8f7db8b4bf5a5fbdfce63b3 Mon Sep 17 00:00:00 2001 From: Hoon Oh <92890928+oh0873@users.noreply.github.com> Date: Mon, 20 Apr 2026 09:41:38 -0400 Subject: [PATCH 04/10] Potential fix for pull request finding 
Grammar Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index 1edb0b5dc19..85373269436 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -108,7 +108,7 @@ class KyuubiBatchService( case Some(_) => false case None => - error(s"$batchId does not existed in metastore, assume it is finished") + error(s"$batchId does not exist in metastore, assume it is finished") true } if (!submitted) Thread.sleep(1000) From a3449d6fc719590021018bb8ac78042582384938 Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Mon, 20 Apr 2026 10:21:40 -0400 Subject: [PATCH 05/10] fixing testcase --- .../org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index ce59aed11e5..7e9ed14c951 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -87,7 +87,6 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase { } override def getBatch(batchId: String) = realMetadataManager.getBatch(batchId) } - wrapperMetadataManager.initialize(sessionManager.getConf) val originalMetadataManager = sessionManager.metadataManager try { From b06a0f41b5e932c8d4fd916d24f1659ad526b3f5 Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Mon, 20 Apr 2026 10:22:26 -0400 Subject: [PATCH 06/10] fixing 
testcase-2 --- .../apache/kyuubi/server/api/v1/BatchesResourceSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 7e9ed14c951..637562f98ec 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -110,9 +110,9 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase { assert(batchInfoResponse.getStatus == 200) val batchInfo = batchInfoResponse.readEntity(classOf[Batch]) assert( - batchInfo.getState === "INITIALIZED", - "Batch should remain INITIALIZED after openBatchSession failed and catch block ran " + - "(failScheduledBatch only transitions PENDING->ERROR)") + batchInfo.getState === "ERROR", + "Batch should eventually become ERROR after being picked and failed by the " + + "catch path, rather than remaining stuck in PENDING") } sessionManager.metadataManager = originalMetadataManager From 303f4f8737714d8b898d606ab04858f9ceaa6465 Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Mon, 20 Apr 2026 10:35:36 -0400 Subject: [PATCH 07/10] Handling InterruptedException. 
--- .../apache/kyuubi/server/KyuubiBatchService.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index 85373269436..ade7e3bbe9e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -116,6 +116,19 @@ class KyuubiBatchService( info(s"$batchId is submitted or finished.") } } catch { + case e: InterruptedException => + if (batchId == UNINITIALIZED_BATCH_ID) { + error(s"Interrupted while picking batch for submission", e) + } else { + error(s"Interrupted while opening batch session for $batchId", e) + try { + metadataManager.failScheduledBatch(batchId) + } catch { + case ex: Exception => + error(s"Unable to modify metadata for $batchId to ERROR", ex) + } + } + throw e // If the batch session failed to open, reinitialize the batch state to ERROR // This can be due to a DB error or batch_connection_limits exceeded case e: Exception => From ca25fb2da18fef9906f216d38002d71be74ba2f2 Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Tue, 21 Apr 2026 13:33:17 -0400 Subject: [PATCH 08/10] Remove Sleep --- .../scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 1 - .../apache/kyuubi/server/api/v1/BatchesResourceSuite.scala | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index ade7e3bbe9e..a59ff5ad4c9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -143,7 +143,6 @@ class KyuubiBatchService( error(s"Unable to modify metadata for $batchId to ERROR", ex) } } - 
Thread.sleep(1000) } } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 637562f98ec..abba4addfd5 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -111,8 +111,8 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase { val batchInfo = batchInfoResponse.readEntity(classOf[Batch]) assert( batchInfo.getState === "ERROR", - "Batch should eventually become ERROR after being picked and failed by the " + - "catch path, rather than remaining stuck in PENDING") + "Batch should eventually become ERROR after being picked and failed by the " + + "catch path, rather than remaining stuck in PENDING") } sessionManager.metadataManager = originalMetadataManager From 4a0889414595557a825c5855e44d0811ce68d97d Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Tue, 21 Apr 2026 14:18:07 -0400 Subject: [PATCH 09/10] Added comment and detailed error message --- .../apache/kyuubi/server/KyuubiBatchService.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index a59ff5ad4c9..f43dd5e2cbb 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -125,7 +125,10 @@ class KyuubiBatchService( metadataManager.failScheduledBatch(batchId) } catch { case ex: Exception => - error(s"Unable to modify metadata for $batchId to ERROR", ex) + error( + s"Unable to modify metadata for $batchId to ERROR; " + + "an administrator may need to reset the batch state manually.", + ex) } } throw e @@ -140,9 
+143,14 @@ class KyuubiBatchService( metadataManager.failScheduledBatch(batchId) } catch { case ex: Exception => - error(s"Unable to modify metadata for $batchId to ERROR", ex) + error( + s"Unable to modify metadata for $batchId to ERROR; " + + "an administrator may need to reset the batch state manually.", + ex) } } + // sleep 1 second to avoid excessive retries during transient network/DB failures + Thread.sleep(1000) } } } From c4f0db411a435d58e900b505d667460754583c44 Mon Sep 17 00:00:00 2001 From: Hoon Oh Date: Tue, 21 Apr 2026 14:19:20 -0400 Subject: [PATCH 10/10] Formatting --- .../scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index f43dd5e2cbb..db996af96fc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -149,7 +149,7 @@ class KyuubiBatchService( ex) } } - // sleep 1 second to avoid excessive retries during transient network/DB failures + // sleep 1 second to avoid excessive retries during transient network/DB failures Thread.sleep(1000) } }