Skip to content

Commit 759b9de

Browse files
committed
Extract S3 body loading from indexer lambda processor
1 parent cd70e31 commit 759b9de

6 files changed

Lines changed: 195 additions & 105 deletions

File tree

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
require "elastic_graph/errors"
10+
require "json"
11+
12+
module ElasticGraph
  module IndexerLambda
    # Turns an SQS record into the raw message body that should be decoded. A body that carries
    # an S3 offloading pointer instead of the payload itself is resolved by downloading the
    # S3 object the pointer refers to.
    #
    # @private
    class SqsMessageBodyLoader
      # Prefix identifying a body as an S3 pointer rather than an inline payload.
      S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'

      # @param s3_client [Aws::S3::Client, nil] client used to fetch offloaded payloads; when
      #   omitted, one is created the first time it is needed
      def initialize(s3_client: nil)
        @s3_client = s3_client
      end

      # Resolves the message body of the given SQS record.
      #
      # @param sqs_record [Hash] full SQS record carrying the body
      # @return [String] the inline body, or the S3 object contents when the body is an offloading pointer
      # @raise [KeyError] when the record has no `"body"` entry
      # @raise [Errors::S3OperationFailedError] when the offloaded payload cannot be read from S3
      def load_body(sqs_record:)
        raw_body = sqs_record.fetch("body")

        if raw_body.start_with?(S3_OFFLOADING_INDICATOR)
          get_payload_from_s3(raw_body)
        else
          raw_body
        end
      end

      private

      # Downloads the S3 object referenced by an offloading pointer.
      #
      # The pointer is a JSON array whose second element is a hash naming the bucket and key.
      def get_payload_from_s3(json_string)
        pointer = JSON.parse(json_string)[1]
        bucket = pointer.fetch("s3BucketName")
        key = pointer.fetch("s3Key")

        # Only the S3 call is wrapped, so pointer-parsing failures surface as-is.
        begin
          response = s3_client.get_object(bucket: bucket, key: key)
          response.body.read
        rescue Aws::S3::Errors::ServiceError => error
          raise Errors::S3OperationFailedError, "Error reading large message from S3. bucket: `#{bucket}` key: `#{key}` message: `#{error.message}`"
        end
      end

      # Returns the injected S3 client, or builds one on first use. The AWS SDK is only required
      # here because loading it is relatively expensive and offloaded SQS messages should be uncommon.
      def s3_client
        return @s3_client if @s3_client

        require "aws-sdk-s3"
        @s3_client = Aws::S3::Client.new
      end
    end
  end
end

elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
require "elastic_graph/errors"
1010
require "elastic_graph/indexer/indexing_failures_error"
1111
require "elastic_graph/indexer_lambda/jsonl_decoder"
12+
require "elastic_graph/indexer_lambda/sqs_message_body_loader"
1213
require "json"
1314

1415
module ElasticGraph
@@ -21,12 +22,13 @@ class SqsProcessor
2122
attr_reader :ignore_sqs_latency_timestamps_from_arns
2223

2324
# @param event_payload_decoder [#decode_events] decoder for resolved SQS message bodies
24-
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, s3_client: nil)
25+
# @param message_body_loader [#load_body] loader for SQS message bodies
26+
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, message_body_loader: SqsMessageBodyLoader.new)
2527
@indexer_processor = indexer_processor
2628
@logger = logger
27-
@s3_client = s3_client
2829
@ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns
2930
@event_payload_decoder = event_payload_decoder
31+
@message_body_loader = message_body_loader
3032
end
3133

3234
# Processes the ElasticGraph events in the given `lambda_event`, indexing the data in the datastore.
@@ -85,7 +87,7 @@ def events_from(lambda_event)
8587

8688
@event_payload_decoder.decode_events(
8789
sqs_record: record,
88-
body: body_from(record.fetch("body"))
90+
body: @message_body_loader.load_body(sqs_record: record)
8991
).map do |event|
9092
ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata)
9193
end
@@ -97,15 +99,6 @@ def events_from(lambda_event)
9799
end
98100
end
99101

100-
S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'
101-
102-
def body_from(body)
103-
if body.start_with?(S3_OFFLOADING_INDICATOR)
104-
body = get_payload_from_s3(body)
105-
end
106-
body
107-
end
108-
109102
def extract_sqs_metadata(record)
110103
sqs_timestamps = {
111104
"processing_first_attempted_at" => millis_to_iso8601(record.dig("attributes", "ApproximateFirstReceiveTimestamp")),
@@ -124,28 +117,6 @@ def millis_to_iso8601(millis)
124117
Time.at(seconds, millis, :millisecond).getutc.iso8601(3)
125118
end
126119

127-
def get_payload_from_s3(json_string)
128-
s3_pointer = JSON.parse(json_string)[1]
129-
bucket_name = s3_pointer.fetch("s3BucketName")
130-
object_key = s3_pointer.fetch("s3Key")
131-
132-
begin
133-
s3_client.get_object(bucket: bucket_name, key: object_key).body.read
134-
rescue Aws::S3::Errors::ServiceError => e
135-
raise Errors::S3OperationFailedError, "Error reading large message from S3. bucket: `#{bucket_name}` key: `#{object_key}` message: `#{e.message}`"
136-
end
137-
end
138-
139-
# The s3 client is being lazily initialized, as it's slow to import/init and it will only be used
140-
# in rare scenarios where large messages need offloaded from SQS -> S3.
141-
# See: (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html)
142-
def s3_client
143-
@s3_client ||= begin
144-
require "aws-sdk-s3"
145-
Aws::S3::Client.new
146-
end
147-
end
148-
149120
# Formats the response, including any failures, based on
150121
# https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
151122
def format_response(failures)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
module ElasticGraph
  module IndexerLambda
    # Resolves the raw body of an SQS message, fetching it from S3 when the body is an offloading pointer.
    class SqsMessageBodyLoader
      # Prefix that marks an SQS message body as a pointer to a payload stored in S3.
      S3_OFFLOADING_INDICATOR: ::String

      # The S3 client is optional.
      def initialize: (?s3_client: Aws::S3::Client?) -> void

      # Returns the resolved message body for the given SQS record.
      def load_body: (sqs_record: ::Hash[::String, untyped]) -> ::String

      private

      @s3_client: Aws::S3::Client?

      def get_payload_from_s3: (::String) -> ::String

      def s3_client: () -> Aws::S3::Client
    end
  end
end

elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,37 @@ module ElasticGraph
88
) -> ::Array[::Hash[::String, untyped]]
99
end
1010

11+
interface _MessageBodyLoader
12+
def load_body: (
13+
sqs_record: ::Hash[::String, untyped]
14+
) -> ::String
15+
end
16+
1117
def initialize: (
1218
Indexer::Processor,
1319
logger: ::Logger,
1420
ignore_sqs_latency_timestamps_from_arns: ::Set[::String],
1521
?event_payload_decoder: _EventPayloadDecoder,
16-
?s3_client: Aws::S3::Client?,
22+
?message_body_loader: _MessageBodyLoader,
1723
) -> void
1824

19-
def process: (::Hash[::String, untyped], ?refresh_indices: bool) -> void
25+
def process: (
26+
::Hash[::String, untyped],
27+
?refresh_indices: bool
28+
) -> {"batchItemFailures" => ::Array[{"itemIdentifier" => ::String}]}
2029

2130
private
2231

2332
@indexer_processor: Indexer::Processor
2433
@logger: ::Logger
2534
@event_payload_decoder: _EventPayloadDecoder
26-
@s3_client: Aws::S3::Client?
35+
@message_body_loader: _MessageBodyLoader
2736

2837
attr_reader ignore_sqs_latency_timestamps_from_arns: ::Set[::String]
2938

3039
def events_from: (::Hash[::String, untyped]) -> ::Array[::Hash[::String, untyped]]
31-
S3_OFFLOADING_INDICATOR: String
3240
def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped]
3341
def millis_to_iso8601: (::String) -> ::String?
34-
def get_payload_from_s3: (::String) -> ::String
35-
def s3_client: () -> Aws::S3::Client
3642
def format_response: (
3743
::Array[Indexer::FailedEventError]
3844
) -> {"batchItemFailures" => ::Array[{"itemIdentifier" => ::String}]}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
require "aws-sdk-s3"
10+
require "elastic_graph/errors"
11+
require "elastic_graph/indexer_lambda/sqs_message_body_loader"
12+
require "elastic_graph/spec_support/lambda_function"
13+
require "json"
14+
15+
module ElasticGraph
  module IndexerLambda
    RSpec.describe SqsMessageBodyLoader do
      describe "#load_body" do
        it "returns inline SQS message bodies unchanged" do
          loader = described_class.new

          loaded_body = loader.load_body(sqs_record: {"body" => "{\"field1\":{}}"})

          expect(loaded_body).to eq("{\"field1\":{}}")
        end

        context "when the SQS message body is an S3 offloading pointer" do
          let(:bucket_name) { "test-bucket-name" }
          let(:s3_key) { "88680f6d-53d4-4143-b8c7-f5b1189213b6" }
          # A stubbed client, so no example makes a real S3 request.
          let(:s3_client) { Aws::S3::Client.new(stub_responses: true) }
          let(:loader) { described_class.new(s3_client: s3_client) }

          # An SQS record whose body is a pointer naming the bucket and key,
          # rather than the payload itself.
          let(:sqs_record) do
            {
              "body" => JSON.generate([
                "software.amazon.payloadoffloading.PayloadS3Pointer",
                {"s3BucketName" => bucket_name, "s3Key" => s3_key}
              ])
            }
          end

          it "retrieves large messages from S3 when an ElasticGraph event was offloaded there" do
            body = "{\"field1\":{}}\n{\"field2\":{}}"

            s3_client.stub_responses(:get_object, ->(context) {
              expect(context.params).to include(bucket: bucket_name, key: s3_key)
              {body: body}
            })

            expect(loader.load_body(sqs_record: sqs_record)).to eq(body)
          end

          it "raises a detailed error when fetching from S3 fails" do
            # `NoSuchKey` is the error code S3 returns for a missing object.
            s3_client.stub_responses(:get_object, "NoSuchKey")

            expect {
              loader.load_body(sqs_record: sqs_record)
            }.to raise_error Errors::S3OperationFailedError, a_string_including(
              "Error reading large message from S3. bucket: `#{bucket_name}` key: `#{s3_key}` message: `stubbed-response-error-message`"
            )
          end
        end
      end

      context "when instantiated without an S3 client injection" do
        include_context "lambda function"

        it "lazily creates the S3 client when needed" do
          expect(described_class.new.send(:s3_client)).to be_a Aws::S3::Client
        end
      end
    end
  end
end

0 commit comments

Comments
 (0)