Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/errors"
require "json"

module ElasticGraph
  module IndexerLambda
    # Resolves the raw body of an SQS message, transparently fetching the real
    # payload from S3 when the SQS extended client offloaded it there.
    #
    # @private
    class SqsMessageBodyLoader
      # Prefix that marks a body as an S3 payload pointer rather than an inline payload.
      S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'

      # @param s3_client [Aws::S3::Client, nil] optional injected client (mainly for tests);
      #   when `nil`, a client is built lazily on the first S3 fetch.
      def initialize(s3_client: nil)
        @s3_client = s3_client
      end

      # Loads the message body for the given SQS record.
      #
      # @param sqs_record [Hash] full SQS record carrying the body
      # @return [String] the inline body, or the payload fetched from S3 when it was offloaded
      def load_body(sqs_record:)
        raw_body = sqs_record.fetch("body")

        if raw_body.start_with?(S3_OFFLOADING_INDICATOR)
          get_payload_from_s3(raw_body)
        else
          raw_body
        end
      end

      private

      # Parses the offloading pointer JSON (`[indicator, {"s3BucketName" => ..., "s3Key" => ...}]`)
      # and downloads the referenced S3 object.
      def get_payload_from_s3(json_string)
        pointer = JSON.parse(json_string)[1]
        bucket = pointer.fetch("s3BucketName")
        key = pointer.fetch("s3Key")

        # The rescue is scoped to the S3 call only, so JSON/pointer errors propagate
        # unchanged (and never reference `Aws::` before the SDK has been loaded).
        begin
          s3_client.get_object(bucket: bucket, key: key).body.read
        rescue Aws::S3::Errors::ServiceError => e
          raise Errors::S3OperationFailedError, "Error reading large message from S3. bucket: `#{bucket}` key: `#{key}` message: `#{e.message}`"
        end
      end

      # The S3 client is lazily initialized because loading the AWS SDK is relatively expensive,
      # and offloaded SQS messages should be uncommon.
      def s3_client
        return @s3_client if @s3_client

        require "aws-sdk-s3"
        @s3_client = Aws::S3::Client.new
      end
    end
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require "elastic_graph/errors"
require "elastic_graph/indexer/indexing_failures_error"
require "elastic_graph/indexer_lambda/jsonl_decoder"
require "elastic_graph/indexer_lambda/sqs_message_body_loader"
require "json"

module ElasticGraph
Expand All @@ -21,12 +22,13 @@ class SqsProcessor
attr_reader :ignore_sqs_latency_timestamps_from_arns

# @param event_payload_decoder [#decode_events] decoder for resolved SQS message bodies
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, s3_client: nil)
# @param message_body_loader [#load_body] loader for SQS message bodies
# @param indexer_processor [Indexer::Processor] processor that indexes the decoded events
# @param logger [::Logger] logger for SQS processing diagnostics
# @param ignore_sqs_latency_timestamps_from_arns [::Set<String>] queue ARNs whose latency timestamps are ignored
# @param event_payload_decoder [#decode_events] decoder for resolved SQS message bodies
# @param message_body_loader [#load_body] loader for SQS message bodies
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, message_body_loader: SqsMessageBodyLoader.new)
  @indexer_processor = indexer_processor
  @logger = logger
  @event_payload_decoder = event_payload_decoder
  @message_body_loader = message_body_loader
  @ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns
end

# Processes the ElasticGraph events in the given `lambda_event`, indexing the data in the datastore.
Expand Down Expand Up @@ -85,7 +87,7 @@ def events_from(lambda_event)

@event_payload_decoder.decode_events(
sqs_record: record,
body: body_from(record.fetch("body"))
body: @message_body_loader.load_body(sqs_record: record)
).map do |event|
ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata)
end
Expand All @@ -97,15 +99,6 @@ def events_from(lambda_event)
end
end

# Marker prefix identifying an SQS body that the Amazon payload-offloading
# library replaced with a pointer to an S3 object.
S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'

# Resolves an SQS body: returns it untouched unless it is an S3 payload pointer,
# in which case the real payload is fetched from S3.
def body_from(body)
  return get_payload_from_s3(body) if body.start_with?(S3_OFFLOADING_INDICATOR)

  body
end

def extract_sqs_metadata(record)
sqs_timestamps = {
"processing_first_attempted_at" => millis_to_iso8601(record.dig("attributes", "ApproximateFirstReceiveTimestamp")),
Expand All @@ -124,28 +117,6 @@ def millis_to_iso8601(millis)
Time.at(seconds, millis, :millisecond).getutc.iso8601(3)
end

# Parses the offloading pointer JSON (`[indicator, {"s3BucketName" => ..., "s3Key" => ...}]`)
# and downloads the referenced object from S3.
#
# @param json_string [String] serialized payload pointer from the SQS body
# @return [String] the payload contents read from S3
# @raise [Errors::S3OperationFailedError] when the S3 read fails
def get_payload_from_s3(json_string)
  pointer = JSON.parse(json_string)[1]
  bucket = pointer.fetch("s3BucketName")
  key = pointer.fetch("s3Key")

  # Scope the rescue to the S3 call so parse/pointer errors propagate unchanged.
  begin
    s3_client.get_object(bucket: bucket, key: key).body.read
  rescue Aws::S3::Errors::ServiceError => e
    raise Errors::S3OperationFailedError, "Error reading large message from S3. bucket: `#{bucket}` key: `#{key}` message: `#{e.message}`"
  end
end

# Lazily builds the S3 client: the AWS SDK is slow to load and is only needed in the
# rare case where a large message was offloaded from SQS to S3.
# See: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
def s3_client
  return @s3_client if @s3_client

  require "aws-sdk-s3"
  @s3_client = Aws::S3::Client.new
end

# Formats the response, including any failures, based on
# https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
def format_response(failures)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module ElasticGraph
  module IndexerLambda
    # Type signature for the loader that resolves SQS message bodies,
    # fetching payloads that were offloaded to S3 when needed.
    class SqsMessageBodyLoader
      def initialize: (?s3_client: Aws::S3::Client?) -> void
      def load_body: (sqs_record: ::Hash[::String, untyped]) -> ::String

      private

      # Injected or lazily-built client; nil until first S3 fetch when not injected.
      @s3_client: Aws::S3::Client?

      # JSON prefix that marks an offloaded body.
      S3_OFFLOADING_INDICATOR: ::String
      def get_payload_from_s3: (::String) -> ::String
      def s3_client: () -> Aws::S3::Client
    end
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,37 @@ module ElasticGraph
) -> ::Array[::Hash[::String, untyped]]
end

# Duck type for objects that can resolve an SQS record's raw body
# (e.g. SqsMessageBodyLoader).
interface _MessageBodyLoader
  def load_body: (
    sqs_record: ::Hash[::String, untyped]
  ) -> ::String
end

def initialize: (
Indexer::Processor,
logger: ::Logger,
ignore_sqs_latency_timestamps_from_arns: ::Set[::String],
?event_payload_decoder: _EventPayloadDecoder,
?s3_client: Aws::S3::Client?,
?message_body_loader: _MessageBodyLoader,
) -> void

def process: (::Hash[::String, untyped], ?refresh_indices: bool) -> void
def process: (
::Hash[::String, untyped],
?refresh_indices: bool
) -> {"batchItemFailures" => ::Array[{"itemIdentifier" => ::String}]}

private

@indexer_processor: Indexer::Processor
@logger: ::Logger
@event_payload_decoder: _EventPayloadDecoder
@s3_client: Aws::S3::Client?
@message_body_loader: _MessageBodyLoader

attr_reader ignore_sqs_latency_timestamps_from_arns: ::Set[::String]

def events_from: (::Hash[::String, untyped]) -> ::Array[::Hash[::String, untyped]]
S3_OFFLOADING_INDICATOR: String
def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped]
def millis_to_iso8601: (::String) -> ::String?
def get_payload_from_s3: (::String) -> ::String
def s3_client: () -> Aws::S3::Client
def format_response: (
::Array[Indexer::FailedEventError]
) -> {"batchItemFailures" => ::Array[{"itemIdentifier" => ::String}]}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "aws-sdk-s3"
require "elastic_graph/errors"
require "elastic_graph/indexer_lambda/sqs_message_body_loader"
require "elastic_graph/spec_support/lambda_function"
require "json"

module ElasticGraph
  module IndexerLambda
    RSpec.describe SqsMessageBodyLoader do
      describe "#load_body" do
        it "returns inline SQS message bodies unchanged" do
          loader = described_class.new

          loaded_body = loader.load_body(sqs_record: {"body" => "{\"field1\":{}}"})

          expect(loaded_body).to eq("{\"field1\":{}}")
        end

        it "retrieves large messages from S3 when an ElasticGraph event was offloaded there" do
          bucket_name = "test-bucket-name"
          s3_key = "88680f6d-53d4-4143-b8c7-f5b1189213b6"
          body = "{\"field1\":{}}\n{\"field2\":{}}"
          # `stub_responses: true` keeps the client fully offline; no real AWS calls are made.
          s3_client = Aws::S3::Client.new(stub_responses: true)
          loader = described_class.new(s3_client: s3_client)

          # An offloaded body is a JSON array: [offloading indicator, S3 pointer hash].
          sqs_record = {
            "body" => JSON.generate([
              "software.amazon.payloadoffloading.PayloadS3Pointer",
              {"s3BucketName" => bucket_name, "s3Key" => s3_key}
            ])
          }

          # Assert (inside the stub) that the loader requests exactly the pointed-to object.
          s3_client.stub_responses(:get_object, ->(context) {
            expect(context.params).to include(bucket: bucket_name, key: s3_key)
            {body: body}
          })

          expect(loader.load_body(sqs_record: sqs_record)).to eq(body)
        end

        it "raises a detailed error when fetching from S3 fails" do
          bucket_name = "test-bucket-name"
          s3_key = "88680f6d-53d4-4143-b8c7-f5b1189213b6"
          s3_client = Aws::S3::Client.new(stub_responses: true)
          loader = described_class.new(s3_client: s3_client)

          sqs_record = {
            "body" => JSON.generate([
              "software.amazon.payloadoffloading.PayloadS3Pointer",
              {"s3BucketName" => bucket_name, "s3Key" => s3_key}
            ])
          }

          # Any string here makes the stub raise a dynamically-defined service error.
          # NOTE(review): the canonical S3 error code is `NoSuchKey` (capital K); the stub
          # works either way, but confirm the lowercase `k` casing was intentional.
          s3_client.stub_responses(:get_object, "NoSuchkey")

          expect {
            loader.load_body(sqs_record: sqs_record)
          }.to raise_error Errors::S3OperationFailedError, a_string_including(
            "Error reading large message from S3. bucket: `#{bucket_name}` key: `#{s3_key}` message: `stubbed-response-error-message`"
          )
        end
      end

      context "when instantiated without an S3 client injection" do
        include_context "lambda function"

        it "lazily creates the S3 client when needed" do
          # Reaches into the private lazy accessor to verify the default client is built on demand.
          expect(described_class.new.send(:s3_client)).to be_a Aws::S3::Client
        end
      end
    end
  end
end
Loading
Loading