diff --git a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_message_body_loader.rb b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_message_body_loader.rb new file mode 100644 index 000000000..d7156d2be --- /dev/null +++ b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_message_body_loader.rb @@ -0,0 +1,59 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/errors" +require "json" + +module ElasticGraph + module IndexerLambda + # Resolves the raw body of an SQS message, including fetching offloaded payloads from S3. + # + # @private + class SqsMessageBodyLoader + S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"' + + def initialize(s3_client: nil) + @s3_client = s3_client + end + + # Loads the message body for the given SQS record. + # + # @param sqs_record [Hash] full SQS record carrying the body + # @return [String] resolved SQS message body + def load_body(sqs_record:) + body = sqs_record.fetch("body") + return body unless body.start_with?(S3_OFFLOADING_INDICATOR) + + get_payload_from_s3(body) + end + + private + + def get_payload_from_s3(json_string) + s3_pointer = JSON.parse(json_string)[1] + bucket_name = s3_pointer.fetch("s3BucketName") + object_key = s3_pointer.fetch("s3Key") + + begin + s3_client.get_object(bucket: bucket_name, key: object_key).body.read + rescue Aws::S3::Errors::ServiceError => e + raise Errors::S3OperationFailedError, "Error reading large message from S3. bucket: `#{bucket_name}` key: `#{object_key}` message: `#{e.message}`" + end + end + + # The S3 client is lazily initialized because loading the AWS SDK is relatively expensive, + # and offloaded SQS messages should be uncommon. + def s3_client + @s3_client ||= begin + require "aws-sdk-s3" + Aws::S3::Client.new + end + end + end + end +end diff --git a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb index f3fcb34ad..3d8125dd7 100644 --- a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb +++ b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb @@ -9,6 +9,7 @@ require "elastic_graph/errors" require "elastic_graph/indexer/indexing_failures_error" require "elastic_graph/indexer_lambda/jsonl_decoder" +require "elastic_graph/indexer_lambda/sqs_message_body_loader" require "json" module ElasticGraph @@ -21,12 +22,13 @@ class SqsProcessor attr_reader :ignore_sqs_latency_timestamps_from_arns # @param event_payload_decoder [#decode_events] decoder for resolved SQS message bodies - def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, s3_client: nil) + # @param message_body_loader [#load_body] loader for SQS message bodies + def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, message_body_loader: SqsMessageBodyLoader.new) @indexer_processor = indexer_processor @logger = logger - @s3_client = s3_client @ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns @event_payload_decoder = event_payload_decoder + @message_body_loader = message_body_loader end # Processes the ElasticGraph events in the given `lambda_event`, indexing the data in the datastore. @@ -85,7 +87,7 @@ def events_from(lambda_event) @event_payload_decoder.decode_events( sqs_record: record, - body: body_from(record.fetch("body")) + body: @message_body_loader.load_body(sqs_record: record) ).map do |event| ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata) end @@ -97,15 +99,6 @@ def events_from(lambda_event) end end - S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"' - - def body_from(body) - if body.start_with?(S3_OFFLOADING_INDICATOR) - body = get_payload_from_s3(body) - end - body - end - def extract_sqs_metadata(record) sqs_timestamps = { "processing_first_attempted_at" => millis_to_iso8601(record.dig("attributes", "ApproximateFirstReceiveTimestamp")), @@ -124,28 +117,6 @@ def millis_to_iso8601(millis) Time.at(seconds, millis, :millisecond).getutc.iso8601(3) end - def get_payload_from_s3(json_string) - s3_pointer = JSON.parse(json_string)[1] - bucket_name = s3_pointer.fetch("s3BucketName") - object_key = s3_pointer.fetch("s3Key") - - begin - s3_client.get_object(bucket: bucket_name, key: object_key).body.read - rescue Aws::S3::Errors::ServiceError => e - raise Errors::S3OperationFailedError, "Error reading large message from S3. bucket: `#{bucket_name}` key: `#{object_key}` message: `#{e.message}`" - end - end - - # The s3 client is being lazily initialized, as it's slow to import/init and it will only be used - # in rare scenarios where large messages need offloaded from SQS -> S3. - # See: (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html) - def s3_client - @s3_client ||= begin - require "aws-sdk-s3" - Aws::S3::Client.new - end - end - # Formats the response, including any failures, based on # https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting def format_response(failures) diff --git a/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_message_body_loader.rbs b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_message_body_loader.rbs new file mode 100644 index 000000000..a91fd09e6 --- /dev/null +++ b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_message_body_loader.rbs @@ -0,0 +1,16 @@ +module ElasticGraph + module IndexerLambda + class SqsMessageBodyLoader + def initialize: (?s3_client: Aws::S3::Client?) -> void + def load_body: (sqs_record: ::Hash[::String, untyped]) -> ::String + + private + + @s3_client: Aws::S3::Client? + + S3_OFFLOADING_INDICATOR: ::String + def get_payload_from_s3: (::String) -> ::String + def s3_client: () -> Aws::S3::Client + end + end +end diff --git a/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs index 20aa5cc1a..6d22a8c8a 100644 --- a/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs +++ b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs @@ -8,31 +8,37 @@ module ElasticGraph ) -> ::Array[::Hash[::String, untyped]] end + interface _MessageBodyLoader + def load_body: ( + sqs_record: ::Hash[::String, untyped] + ) -> ::String + end + def initialize: ( Indexer::Processor, logger: ::Logger, ignore_sqs_latency_timestamps_from_arns: ::Set[::String], ?event_payload_decoder: _EventPayloadDecoder, - ?s3_client: Aws::S3::Client?, + ?message_body_loader: _MessageBodyLoader, ) -> void - def process: (::Hash[::String, untyped], ?refresh_indices: bool) -> void + def process: ( + ::Hash[::String, untyped], + ?refresh_indices: bool + ) -> {"batchItemFailures" => ::Array[{"itemIdentifier" => ::String}]} private @indexer_processor: Indexer::Processor @logger: ::Logger @event_payload_decoder: _EventPayloadDecoder - @s3_client: Aws::S3::Client? + @message_body_loader: _MessageBodyLoader attr_reader ignore_sqs_latency_timestamps_from_arns: ::Set[::String] def events_from: (::Hash[::String, untyped]) -> ::Array[::Hash[::String, untyped]] - S3_OFFLOADING_INDICATOR: String def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped] def millis_to_iso8601: (::String) -> ::String? - def get_payload_from_s3: (::String) -> ::String - def s3_client: () -> Aws::S3::Client def format_response: ( ::Array[Indexer::FailedEventError] ) -> {"batchItemFailures" => ::Array[{"itemIdentifier" => ::String}]} diff --git a/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_message_body_loader_spec.rb b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_message_body_loader_spec.rb new file mode 100644 index 000000000..0a1095a6e --- /dev/null +++ b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_message_body_loader_spec.rb @@ -0,0 +1,81 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "aws-sdk-s3" +require "elastic_graph/errors" +require "elastic_graph/indexer_lambda/sqs_message_body_loader" +require "elastic_graph/spec_support/lambda_function" +require "json" + +module ElasticGraph + module IndexerLambda + RSpec.describe SqsMessageBodyLoader do + describe "#load_body" do + it "returns inline SQS message bodies unchanged" do + loader = described_class.new + + loaded_body = loader.load_body(sqs_record: {"body" => "{\"field1\":{}}"}) + + expect(loaded_body).to eq("{\"field1\":{}}") + end + + it "retrieves large messages from S3 when an ElasticGraph event was offloaded there" do + bucket_name = "test-bucket-name" + s3_key = "88680f6d-53d4-4143-b8c7-f5b1189213b6" + body = "{\"field1\":{}}\n{\"field2\":{}}" + s3_client = Aws::S3::Client.new(stub_responses: true) + loader = described_class.new(s3_client: s3_client) + + sqs_record = { + "body" => JSON.generate([ + "software.amazon.payloadoffloading.PayloadS3Pointer", + {"s3BucketName" => bucket_name, "s3Key" => s3_key} + ]) + } + + s3_client.stub_responses(:get_object, ->(context) { + expect(context.params).to include(bucket: bucket_name, key: s3_key) + {body: body} + }) + + expect(loader.load_body(sqs_record: sqs_record)).to eq(body) + end + + it "raises a detailed error when fetching from S3 fails" do + bucket_name = "test-bucket-name" + s3_key = "88680f6d-53d4-4143-b8c7-f5b1189213b6" + s3_client = Aws::S3::Client.new(stub_responses: true) + loader = described_class.new(s3_client: s3_client) + + sqs_record = { + "body" => JSON.generate([ + "software.amazon.payloadoffloading.PayloadS3Pointer", + {"s3BucketName" => bucket_name, "s3Key" => s3_key} + ]) + } + + s3_client.stub_responses(:get_object, "NoSuchkey") + + expect { + loader.load_body(sqs_record: sqs_record) + }.to raise_error Errors::S3OperationFailedError, a_string_including( + "Error reading large message from S3. bucket: `#{bucket_name}` key: `#{s3_key}` message: `stubbed-response-error-message`" + ) + end + end + + context "when instantiated without an S3 client injection" do + include_context "lambda function" + + it "lazily creates the S3 client when needed" do + expect(described_class.new.send(:s3_client)).to be_a Aws::S3::Client + end + end + end + end +end diff --git a/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb index 8d18b94e4..8483c3425 100644 --- a/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb +++ b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb @@ -11,8 +11,6 @@ require "elastic_graph/indexer/processor" require "elastic_graph/indexer_lambda/sqs_processor" require "elastic_graph/spec_support/lambda_function" -require "json" -require "aws-sdk-s3" module ElasticGraph module IndexerLambda @@ -21,7 +19,6 @@ module IndexerLambda let(:indexer_processor) { instance_double(Indexer::Processor, process_returning_failures: []) } describe "#process" do - let(:s3_client) { Aws::S3::Client.new(stub_responses: true) } let(:sqs_processor) { build_sqs_processor } it "processes a lambda event containing a single SQS message with a single ElasticGraph event" do @@ -81,7 +78,7 @@ module IndexerLambda lambda_event = { "Records" => [ - sqs_message("a", {"field1" => {}}) + sqs_message("a", "{\"field1\":{}}") ] } @@ -97,6 +94,27 @@ module IndexerLambda ], refresh_indices: false) end + it "loads message bodies using the configured message body loader" do + message_body_loader = instance_double(SqsMessageBodyLoader, load_body: "{\"field1\":{}}") + sqs_processor = build_sqs_processor(message_body_loader: message_body_loader) + + lambda_event = { + "Records" => [ + sqs_message("a", {"field1" => {}}) + ] + } + + sqs_processor.process(lambda_event) + + expect(message_body_loader).to have_received(:load_body).with( + sqs_record: lambda_event.fetch("Records").first + ) + + expect(indexer_processor).to have_received(:process_returning_failures).with([ + {"field1" => {}, "message_id" => "a"} + ], refresh_indices: false) + end + it "logs the SQS message ids received in the lambda event and the `sqs_received_at` if available" do sent_timestamp_millis = "796010423456" sent_timestamp_iso8601 = "1995-03-24T02:00:23.456Z" @@ -147,55 +165,6 @@ module IndexerLambda expect(indexer_processor).not_to have_received(:process_returning_failures) end - it "retrieves large messages from s3 when an ElasticGraph event was offloaded there" do - bucket_name = "test-bucket-name" - s3_key = "88680f6d-53d4-4143-b8c7-f5b1189213b6" - event_payload = {"test" => "data"} - - lambda_event = { - "Records" => [ - sqs_message("a", JSON.generate([ - "software.amazon.payloadoffloading.PayloadS3Pointer", - {"s3BucketName" => bucket_name, "s3Key" => s3_key} - ])) - ] - } - - s3_client.stub_responses(:get_object, ->(context) { - expect(context.params).to include(bucket: bucket_name, key: s3_key) - {body: jsonl(event_payload)} - }) - - sqs_processor.process(lambda_event) - - expect(indexer_processor).to have_received(:process_returning_failures).with( - [event_payload.merge("message_id" => "a")], - refresh_indices: false - ) - end - - it "throws a detailed error when fetching from s3 fails" do - bucket_name = "test-bucket-name" - s3_key = "88680f6d-53d4-4143-b8c7-f5b1189213b6" - - lambda_event = { - "Records" => [ - sqs_message("a", JSON.generate([ - "software.amazon.payloadoffloading.PayloadS3Pointer", - {"s3BucketName" => bucket_name, "s3Key" => s3_key} - ])) - ] - } - - s3_client.stub_responses(:get_object, "NoSuchkey") - - expect { - sqs_processor.process(lambda_event) - }.to raise_error Errors::S3OperationFailedError, a_string_including( - "Error reading large message from S3. bucket: `#{bucket_name}` key: `#{s3_key}` message: `stubbed-response-error-message`" - ) - end - it "parses and merges SQS timestamps into non-existing `latency_timestamps` field" do approximate_first_receive_timestamp_millis = "1696334412345" sent_timestamp_millis = "796010423456" @@ -361,18 +330,6 @@ module IndexerLambda def failure_of(id, message: "boom", event: {}) instance_double(Indexer::FailedEventError, id: id, message: message, event: event) end - - def build_sqs_processor(**options) - super(s3_client: s3_client, **options) - end - end - - context "when instantiated without an S3 client injection" do - include_context "lambda function" - - it "lazily creates the S3 client when needed" do - expect(build_sqs_processor.send(:s3_client)).to be_a Aws::S3::Client - end end def sqs_message(message_id, *body, event_source_arn: "arn:aws:sqs:us-east-2:123456789012:my-queue", attributes: nil)