diff --git a/README.md b/README.md index 4a845f0..aad0b8b 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,7 @@ Referring $SPARK_HOME to the Spark installation directory. | kinesis.client.numRetries | 3 | Maximum Number of retries for Kinesis API requests | | kinesis.client.retryIntervalMs | 1000 | Cool-off period before retrying Kinesis API | | kinesis.client.maxRetryIntervalMs | 10000 | Max Cool-off period between 2 retries | +| kinesis.client.clientExecutionTimeout | 0 | Amazon SDK client execution timeout | | kinesis.client.avoidEmptyBatches| false | Avoid creating an empty microbatch job by checking upfront if there are any unread data in the stream before the batch is started ## Kinesis Sink Configuration diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala index da9120d..ff9202d 100644 --- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala +++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala @@ -22,7 +22,7 @@ import java.util import java.util.{ArrayList, Locale} import java.util.concurrent.{Executors, ThreadFactory} -import com.amazonaws.AbortedException +import com.amazonaws.{AbortedException, ClientConfiguration, ClientConfigurationFactory} import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model.{GetRecordsRequest, ListShardsRequest, Shard, _} @@ -72,13 +72,20 @@ private[kinesis] case class KinesisReader( readerOptions.getOrElse("client.maxRetryIntervalMs".toLowerCase(Locale.ROOT), "10000").toLong } + private val clientExecutionTimeout: Int = { + readerOptions.get("client.clientExecutionTimeout".toLowerCase(Locale.ROOT)) + .fold(ClientConfiguration.DEFAULT_CLIENT_EXECUTION_TIMEOUT)(_.toInt) + } + private val maxSupportedShardsPerStream = 10000; private var _amazonClient: AmazonKinesisClient = null private def 
getAmazonClient(): AmazonKinesisClient = { if (_amazonClient == null) { - _amazonClient = new AmazonKinesisClient(kinesisCredsProvider.provider) + val config = new ClientConfigurationFactory().getConfig + config.setClientExecutionTimeout(clientExecutionTimeout) + _amazonClient = new AmazonKinesisClient(kinesisCredsProvider.provider, config) _amazonClient.setEndpoint(endpointUrl) } _amazonClient diff --git a/src/test/scala/org/apache/spark/sql/kinesis/KinesisSourceSuite.scala b/src/test/scala/org/apache/spark/sql/kinesis/KinesisSourceSuite.scala index 9900b51..37b82d2 100644 --- a/src/test/scala/org/apache/spark/sql/kinesis/KinesisSourceSuite.scala +++ b/src/test/scala/org/apache/spark/sql/kinesis/KinesisSourceSuite.scala @@ -124,6 +124,9 @@ class KinesisSourceOptionsSuite extends StreamTest with SharedSparkSession { ("kinesis.executor.maxFetchTimeInMs", "10000"), ("kinesis.client.numRetries", "2") ) + testKinesisOptions( + ("kinesis.client.clientExecutionTimeout", "20000") + ) } test("test for failOnDataLoss") {