Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ lazy val commonLib = project("common-lib").settings(
"com.gu" %% "pan-domain-auth-play_3-0" % "9.0.0",
"com.amazonaws" % "aws-java-sdk-iam" % awsSdkVersion,
"com.amazonaws" % "aws-java-sdk-s3" % awsSdkVersion,
"software.amazon.awssdk" % "s3" % awsSdkV2Version,
"com.amazonaws" % "aws-java-sdk-ec2" % awsSdkVersion,
"com.amazonaws" % "aws-java-sdk-sqs" % awsSdkVersion,
"com.amazonaws" % "aws-java-sdk-sns" % awsSdkVersion,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.gu.mediaservice.lib

import com.amazonaws.services.s3.model.{DeleteObjectsRequest, MultiObjectDeleteException}

import java.io.File
import com.gu.mediaservice.lib.config.CommonConfig
import com.gu.mediaservice.lib.aws.S3Object
import com.gu.mediaservice.lib.config.CommonConfig
import com.gu.mediaservice.lib.logging.LogMarker
import com.gu.mediaservice.model.{MimeType, Png}
import org.joda.time.DateTime
import com.gu.mediaservice.model.MimeType
import software.amazon.awssdk.services.s3.model._

import java.io.File
import java.time.Instant
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -48,20 +47,20 @@ class ImageIngestOperations(imageBucket: String, thumbnailBucket: String, config
private def bulkDelete(bucket: String, keys: List[String]): Future[Map[String, Boolean]] = keys match {
case Nil => Future.successful(Map.empty)
case _ => Future {
try {
client.deleteObjects(
new DeleteObjectsRequest(bucket).withKeys(keys: _*)
)
val objectsToDelete = keys.map(key => ObjectIdentifier.builder().key(key).build()).asJava
val resp = client.deleteObjects(
DeleteObjectsRequest.builder().bucket(bucket).delete(Delete.builder().objects(objectsToDelete).build()).build()
)
if (resp.hasErrors) {
val errorKeys = resp.errors().asScala.map(_.key()).toSet
logger.warn(s"Partial failure when deleting images from $bucket: ${resp.errors()}")
keys.map { key =>
key -> !errorKeys.contains(key)
}.toMap
} else {
keys.map { key =>
key -> true
}.toMap
} catch {
case partialFailure: MultiObjectDeleteException =>
logger.warn(s"Partial failure when deleting images from $bucket: ${partialFailure.getMessage} ${partialFailure.getErrors}")
val errorKeys = partialFailure.getErrors.asScala.map(_.getKey).toSet
keys.map { key =>
key -> !errorKeys.contains(key)
}.toMap
}
}
}
Expand All @@ -73,8 +72,14 @@ class ImageIngestOperations(imageBucket: String, thumbnailBucket: String, config
def deletePNG(id: String)(implicit logMarker: LogMarker): Future[Unit] = deleteImage(imageBucket, optimisedPngKeyFromId(id))
def deletePNGs(ids: Set[String]) = bulkDelete(imageBucket, ids.map(optimisedPngKeyFromId).toList)

def doesOriginalExist(id: String): Boolean =
client.doesObjectExist(imageBucket, fileKeyFromId(id))
def doesOriginalExist(id: String): Boolean = {
try {
client.headObject(HeadObjectRequest.builder().bucket(imageBucket).key(fileKeyFromId(id)).build())
true
} catch {
case _: NoSuchKeyException => false
}
}
}

sealed trait ImageWrapper {
Expand All @@ -95,7 +100,7 @@ sealed trait StorableImage extends ImageWrapper {
}

case class StorableThumbImage(id: String, file: File, mimeType: MimeType, meta: Map[String, String] = Map.empty) extends StorableImage
case class StorableOriginalImage(id: String, file: File, mimeType: MimeType, lastModified: DateTime, meta: Map[String, String] = Map.empty) extends StorableImage {
case class StorableOriginalImage(id: String, file: File, mimeType: MimeType, lastModified: Instant, meta: Map[String, String] = Map.empty) extends StorableImage {
override def toProjectedS3Object(thumbBucket: String): S3Object = S3Object(
thumbBucket,
ImageIngestOperations.fileKeyFromId(id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import com.gu.mediaservice.lib.aws.S3
import com.gu.mediaservice.lib.config.CommonConfig
import com.gu.mediaservice.lib.logging.{GridLogging, LogMarker}
import com.gu.mediaservice.model.MimeType
import org.slf4j.LoggerFactory
import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, HeadObjectRequest, ListObjectsRequest}

import java.io.File
import scala.jdk.CollectionConverters._
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

// TODO: If deleteObject fails - we should be catching the errors here to avoid them bubbling to the application
class S3ImageStorage(config: CommonConfig) extends S3(config) with ImageStorage with GridLogging {
Expand All @@ -28,19 +28,19 @@ class S3ImageStorage(config: CommonConfig) extends S3(config) with ImageStorage
}

def deleteImage(bucket: String, id: String)(implicit logMarker: LogMarker) = Future {
client.deleteObject(bucket, id)
client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(id).build())
logger.info(logMarker, s"Deleted image $id from bucket $bucket")
}

def deleteVersionedImage(bucket: String, id: String)(implicit logMarker: LogMarker) = Future {
val objectVersion = client.getObjectMetadata(bucket, id).getVersionId
client.deleteVersion(bucket, id, objectVersion)
val objectVersion = client.headObject(HeadObjectRequest.builder().bucket(bucket).key(id).build()).versionId()
client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(id).versionId(objectVersion).build())
logger.info(logMarker, s"Deleted image $id from bucket $bucket (version: $objectVersion)")
}

def deleteFolder(bucket: String, id: String)(implicit logMarker: LogMarker) = Future {
val files = client.listObjects(bucket, id).getObjectSummaries.asScala
files.foreach(file => client.deleteObject(bucket, file.getKey))
val files = client.listObjects(ListObjectsRequest.builder().bucket(bucket).prefix(s"$id/").build()).contents().asScala
files.foreach(file => client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(file.key()).build()))
logger.info(logMarker, s"Deleting images in folder $id from bucket $bucket")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ class KeyStore(bucket: String, config: CommonConfig)(implicit ec: ExecutionConte

def lookupIdentity(key: String): Option[ApiAccessor] = store.get().get(key)

def findKey(prefix: String): Option[String] = s3.syncFindKey(bucket, prefix)

def update(): Unit = {
store.set(fetchAll)
}
Expand Down
114 changes: 60 additions & 54 deletions common-lib/src/main/scala/com/gu/mediaservice/lib/aws/S3.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
package com.gu.mediaservice.lib.aws

import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder, model}
import com.amazonaws.AmazonServiceException
import com.amazonaws.util.IOUtils
import com.amazonaws.{AmazonServiceException, ClientConfiguration}
import com.gu.mediaservice.lib.config.CommonConfig
import com.gu.mediaservice.lib.logging.{GridLogging, LogMarker, Stopwatch}
import com.gu.mediaservice.model._
import org.joda.time.{DateTime, Duration}
import software.amazon.awssdk.core.ResponseInputStream
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, GetObjectResponse, HeadObjectRequest, HeadObjectResponse, ListObjectsRequest, PutObjectRequest, S3Exception, S3Object => S3ObjectSummary}
import software.amazon.awssdk.services.s3.presigner.S3Presigner
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest

import java.io.File
import java.net.URI
import scala.jdk.CollectionConverters._
import java.time.Instant
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters.ScalaDurationOps
import scala.language.implicitConversions

case class S3Object(uri: URI, size: Long, metadata: S3Metadata)

Expand All @@ -25,7 +33,7 @@ object S3Object {
def apply(bucket: String, key: String, size: Long, metadata: S3Metadata): S3Object =
apply(objectUrl(bucket, key), size, metadata)

def apply(bucket: String, key: String, file: File, mimeType: Option[MimeType], lastModified: Option[DateTime],
def apply(bucket: String, key: String, file: File, mimeType: Option[MimeType], lastModified: Option[Instant],
meta: Map[String, String] = Map.empty, cacheControl: Option[String] = None): S3Object = {
S3Object(
bucket,
Expand All @@ -46,95 +54,97 @@ object S3Object {
case class S3Metadata(userMetadata: Map[String, String], objectMetadata: S3ObjectMetadata)

object S3Metadata {
def apply(meta: ObjectMetadata): S3Metadata = {
def fromHeadObjectResponse(hor: HeadObjectResponse): S3Metadata = {
S3Metadata(
meta.getUserMetadata.asScala.toMap,
hor.metadata().asScala.toMap,
S3ObjectMetadata(
contentType = Option(meta.getContentType).filterNot(_.toLowerCase == "application/octet-stream").map(MimeType.apply),
cacheControl = Option(meta.getCacheControl),
lastModified = Option(meta.getLastModified).map(new DateTime(_))
contentType = Option(hor.contentType()).filterNot(_.toLowerCase == "application/octet-stream").map(MimeType.apply),
cacheControl = Option(hor.cacheControl()),
lastModified = Option(hor.lastModified())
)
)
}
}

case class S3ObjectMetadata(contentType: Option[MimeType], cacheControl: Option[String], lastModified: Option[DateTime])
case class S3ObjectMetadata(contentType: Option[MimeType], cacheControl: Option[String], lastModified: Option[Instant])

class S3(config: CommonConfig) extends GridLogging with ContentDisposition with RoundedExpiration {
type Bucket = String
type Key = String
type UserMetadata = Map[String, String]

lazy val client: AmazonS3 = S3Ops.buildS3Client(config)
lazy val client: S3Client = S3Ops.buildS3Client(config)
lazy val presigner = S3Presigner.create()

def signUrl(bucket: Bucket, url: URI, image: Image, expiration: DateTime = cachableExpiration(), imageType: ImageFileType = Source): String = {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

turns out nothing's ever caching the presigned urls, so no need to round to make cacheable?
(if we do need this, it's annoying as v2 sdk only allows you to specify an expiry duration, not an expiry time, and expiry time is calculated internally against a call to Instant.now(), so impossible to guarantee urls match!)

def signUrl(bucket: Bucket, url: URI, image: Image, imageType: ImageFileType = Source): String = {
// get path and remove leading `/`
val key: Key = url.getPath.drop(1)

val contentDisposition = getContentDisposition(image, imageType, config.shortenDownloadFilename)

val headers = new ResponseHeaderOverrides().withContentDisposition(contentDisposition)
val objReq = GetObjectRequest.builder().bucket(bucket).key(key).responseContentDisposition(contentDisposition).build()
val requestt = GetObjectPresignRequest.builder().getObjectRequest(objReq).signatureDuration(10.minutes.toJava).build()

val request = new GeneratePresignedUrlRequest(bucket, key).withExpiration(expiration.toDate).withResponseHeaders(headers)
client.generatePresignedUrl(request).toExternalForm
presigner.presignGetObject(requestt).url().toExternalForm
}

def getObject(bucket: Bucket, url: URI): model.S3Object = {
def getObject(bucket: Bucket, url: URI): ResponseInputStream[GetObjectResponse] = {
// get path and remove leading `/`
val key: Key = url.getPath.drop(1)
client.getObject(new GetObjectRequest(bucket, key))
}

def getObjectAsString(bucket: Bucket, key: String): Option[String] = {
val content = client.getObject(new GetObjectRequest(bucket, key))
val stream = content.getObjectContent

val content = client.getObject(GetObjectRequest.builder().bucket(bucket).key(key).build())
try {
Some(IOUtils.toString(stream).trim)
Some(IOUtils.toString(content).trim)
} catch {
case e: AmazonServiceException if e.getErrorCode == "NoSuchKey" =>
logger.warn(s"Cannot find key: $key in bucket: $bucket")
None
}
finally {
stream.close()
content.close()
}
}

def store(bucket: Bucket, id: Key, file: File, mimeType: Option[MimeType], meta: UserMetadata = Map.empty, cacheControl: Option[String] = None)
(implicit ex: ExecutionContext, logMarker: LogMarker): Future[S3Object] =
Future {
val metadata = new ObjectMetadata
mimeType.foreach(m => metadata.setContentType(m.name))
cacheControl.foreach(metadata.setCacheControl)
metadata.setUserMetadata(meta.asJava)

val fileMarkers = Map(
"bucket" -> bucket,
"fileName" -> id,
"mimeType" -> mimeType.getOrElse("none"),
)
val markers = logMarker ++ fileMarkers

val req = new PutObjectRequest(bucket, id, file).withMetadata(metadata)
val req = {
val builder = PutObjectRequest.builder().bucket(bucket).key(id).metadata(meta.asJava)
mimeType.foreach(mime => builder.contentType(mime.name))
cacheControl.foreach(builder.cacheControl)
builder.build()
}

Stopwatch(s"S3 client.putObject ($req)"){
client.putObject(req)
client.putObject(req, RequestBody.fromFile(file))
// once we've completed the PUT read back to ensure that we are returning reality
val metadata = client.getObjectMetadata(bucket, id)
S3Object(bucket, id, metadata.getContentLength, S3Metadata(metadata))
val response = client.headObject(HeadObjectRequest.builder().bucket(bucket).key(id).build())
S3Object(bucket, id, response.contentLength(), S3Metadata.fromHeadObjectResponse(response))
}(markers)
}

def storeIfNotPresent(bucket: Bucket, id: Key, file: File, mimeType: Option[MimeType], meta: UserMetadata = Map.empty, cacheControl: Option[String] = None)
(implicit ex: ExecutionContext, logMarker: LogMarker): Future[S3Object] = {
Future{
Some(client.getObjectMetadata(bucket, id))
Future {
Some(client.headObject(HeadObjectRequest.builder().bucket(bucket).key(id).build()))
}.recover {
// translate this exception into the object not existing
case as3e:AmazonS3Exception if as3e.getStatusCode == 404 => None
case as3e: S3Exception if as3e.statusCode() == 404 => None
}.flatMap {
case Some(objectMetadata) =>
logger.info(logMarker, s"Skipping storing of S3 file $id as key is already present in bucket $bucket")
Future.successful(S3Object(bucket, id, objectMetadata.getContentLength, S3Metadata(objectMetadata)))
Future.successful(S3Object(bucket, id, objectMetadata.contentLength(), S3Metadata.fromHeadObjectResponse(objectMetadata)))
case None =>
store(bucket, id, file, mimeType, meta, cacheControl)
}
Expand All @@ -143,47 +153,43 @@ class S3(config: CommonConfig) extends GridLogging with ContentDisposition with
def list(bucket: Bucket, prefixDir: String)
(implicit ex: ExecutionContext): Future[List[S3Object]] =
Future {
val req = new ListObjectsRequest().withBucketName(bucket).withPrefix(s"$prefixDir/")
val req = ListObjectsRequest.builder().bucket(bucket).prefix(s"$prefixDir/").build()
val listing = client.listObjects(req)
val summaries = listing.getObjectSummaries.asScala
summaries.map(summary => (summary.getKey, summary)).foldLeft(List[S3Object]()) {
val summaries = listing.contents().asScala
summaries.map(summary => (summary.key(), summary)).foldLeft(List[S3Object]()) {
case (memo: List[S3Object], (key: String, summary: S3ObjectSummary)) =>
S3Object(bucket, key, summary.getSize, getMetadata(bucket, key)) :: memo
S3Object(bucket, key, summary.size(), getMetadata(bucket, key)) :: memo
}
}

def getMetadata(bucket: Bucket, key: Key): S3Metadata = {
val meta = client.getObjectMetadata(bucket, key)
S3Metadata(meta)
val resp = client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build())
S3Metadata.fromHeadObjectResponse(resp)
}

def getUserMetadata(bucket: Bucket, key: Key): Map[Bucket, Bucket] =
client.getObjectMetadata(bucket, key).getUserMetadata.asScala.toMap

def syncFindKey(bucket: Bucket, prefixName: String): Option[Key] = {
val req = new ListObjectsRequest().withBucketName(bucket).withPrefix(s"$prefixName-")
val listing = client.listObjects(req)
val summaries = listing.getObjectSummaries.asScala
summaries.headOption.map(_.getKey)
}

client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()).metadata().asScala.toMap
}

object S3Ops {
// TODO make this localstack friendly
// TODO: Make this region aware - i.e. RegionUtils.getRegion(region).getServiceEndpoint(AmazonS3.ENDPOINT_PREFIX)
val s3Endpoint = "s3.amazonaws.com"

def buildS3Client(config: CommonConfig, localstackAware: Boolean = true, maybeRegionOverride: Option[String] = None): AmazonS3 = {
def buildS3Client(
config: CommonConfig,
localstackAware: Boolean = true,
maybeRegionOverride: Option[Region] = None
): S3Client = {
val builder = config.awsLocalEndpoint match {
case Some(_) if config.isDev =>
// TODO revise closer to the time of deprecation https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/
// `withPathStyleAccessEnabled` for localstack
// see https://github.com/localstack/localstack/issues/1512
AmazonS3ClientBuilder.standard().withPathStyleAccessEnabled(true)
case _ => AmazonS3ClientBuilder.standard()
S3Client.builder().forcePathStyle(true)
case _ => S3Client.builder()
}

config.withAWSCredentials(builder, localstackAware, maybeRegionOverride).build()
config.withAWSCredentialsV2(builder, localstackAware, maybeRegionOverride).build()
}
}
Loading