Skip to content

Commit cd70e31

Browse files
committed
Refactor indexer lambda event decoding
1 parent 89b5446 commit cd70e31

6 files changed

Lines changed: 126 additions & 7 deletions

File tree

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
require "json"
10+
11+
module ElasticGraph
12+
module IndexerLambda
13+
# Decodes SQS message payloads encoded as JSON Lines into ElasticGraph events.
14+
#
15+
# `SqsProcessor` accepts alternate decoders that implement the same
16+
# `#decode_events(sqs_record:, body:)` contract and return event hashes.
17+
#
18+
# @private
19+
class JSONLDecoder
20+
# Decodes the given message payload into zero or more ElasticGraph events.
21+
#
22+
# @param sqs_record [Hash] full SQS record carrying the payload
23+
# @param body [String] resolved SQS message body
24+
# @return [Array<Hash>] decoded ElasticGraph events
25+
def decode_events(sqs_record:, body:)
26+
_ = sqs_record
27+
body.split("\n").map { |event| JSON.parse(event) }
28+
end
29+
end
30+
end
31+
end

elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
require "elastic_graph/errors"
1010
require "elastic_graph/indexer/indexing_failures_error"
11+
require "elastic_graph/indexer_lambda/jsonl_decoder"
1112
require "json"
1213

1314
module ElasticGraph
@@ -19,11 +20,13 @@ class SqsProcessor
1920
# @dynamic ignore_sqs_latency_timestamps_from_arns
2021
attr_reader :ignore_sqs_latency_timestamps_from_arns
2122

22-
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, s3_client: nil)
23+
# @param event_payload_decoder [#decode_events] decoder for resolved SQS message bodies
24+
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, s3_client: nil)
2325
@indexer_processor = indexer_processor
2426
@logger = logger
2527
@s3_client = s3_client
2628
@ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns
29+
@event_payload_decoder = event_payload_decoder
2730
end
2831

2932
# Processes the ElasticGraph events in the given `lambda_event`, indexing the data in the datastore.
@@ -80,7 +83,10 @@ def events_from(lambda_event)
8083
sqs_metadata = sqs_metadata.except("latency_timestamps")
8184
end
8285

83-
parse_jsonl(record.fetch("body")).map do |event|
86+
@event_payload_decoder.decode_events(
87+
sqs_record: record,
88+
body: body_from(record.fetch("body"))
89+
).map do |event|
8490
ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata)
8591
end
8692
end.tap do
@@ -93,11 +99,11 @@ def events_from(lambda_event)
9399

94100
S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'
95101

96-
def parse_jsonl(jsonl_string)
97-
if jsonl_string.start_with?(S3_OFFLOADING_INDICATOR)
98-
jsonl_string = get_payload_from_s3(jsonl_string)
102+
def body_from(body)
103+
if body.start_with?(S3_OFFLOADING_INDICATOR)
104+
body = get_payload_from_s3(body)
99105
end
100-
jsonl_string.split("\n").map { |event| JSON.parse(event) }
106+
body
101107
end
102108

103109
def extract_sqs_metadata(record)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module ElasticGraph
2+
module IndexerLambda
3+
class JSONLDecoder
4+
def decode_events: (
5+
sqs_record: ::Hash[::String, untyped],
6+
body: ::String
7+
) -> ::Array[::Hash[::String, untyped]]
8+
end
9+
end
10+
end

elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
module ElasticGraph
22
module IndexerLambda
33
class SqsProcessor
4+
interface _EventPayloadDecoder
5+
def decode_events: (
6+
sqs_record: ::Hash[::String, untyped],
7+
body: ::String
8+
) -> ::Array[::Hash[::String, untyped]]
9+
end
10+
411
def initialize: (
512
Indexer::Processor,
613
logger: ::Logger,
714
ignore_sqs_latency_timestamps_from_arns: ::Set[::String],
15+
?event_payload_decoder: _EventPayloadDecoder,
816
?s3_client: Aws::S3::Client?,
917
) -> void
1018

@@ -14,6 +22,7 @@ module ElasticGraph
1422

1523
@indexer_processor: Indexer::Processor
1624
@logger: ::Logger
25+
@event_payload_decoder: _EventPayloadDecoder
1726
@s3_client: Aws::S3::Client?
1827

1928
attr_reader ignore_sqs_latency_timestamps_from_arns: ::Set[::String]
@@ -22,7 +31,6 @@ module ElasticGraph
2231
S3_OFFLOADING_INDICATOR: String
2332
def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped]
2433
def millis_to_iso8601: (::String) -> ::String?
25-
def parse_jsonl: (::String) -> ::Array[::Hash[::String, untyped]]
2634
def get_payload_from_s3: (::String) -> ::String
2735
def s3_client: () -> Aws::S3::Client
2836
def format_response: (
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
require "elastic_graph/indexer_lambda/jsonl_decoder"
10+
11+
module ElasticGraph
12+
module IndexerLambda
13+
RSpec.describe JSONLDecoder do
14+
describe "#decode_events" do
15+
it "parses JSON Lines payloads into ElasticGraph events" do
16+
decoder = described_class.new
17+
18+
decoded_events = decoder.decode_events(
19+
sqs_record: {"messageId" => "123"},
20+
body: %({"id":"1"}\n{"id":"2","record":{"name":"Widget"}})
21+
)
22+
23+
expect(decoded_events).to eq([
24+
{"id" => "1"},
25+
{"id" => "2", "record" => {"name" => "Widget"}}
26+
])
27+
end
28+
29+
it "returns no events for an empty message body" do
30+
decoder = described_class.new
31+
32+
expect(
33+
decoder.decode_events(
34+
sqs_record: {"messageId" => "123"},
35+
body: ""
36+
)
37+
).to eq([])
38+
end
39+
end
40+
end
41+
end
42+
end

elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,28 @@ module IndexerLambda
7575
], refresh_indices: false)
7676
end
7777

78+
it "decodes message bodies using the configured event payload decoder" do
79+
event_payload_decoder = instance_double(JSONLDecoder, decode_events: [{"field1" => {}}])
80+
sqs_processor = build_sqs_processor(event_payload_decoder: event_payload_decoder)
81+
82+
lambda_event = {
83+
"Records" => [
84+
sqs_message("a", {"field1" => {}})
85+
]
86+
}
87+
88+
sqs_processor.process(lambda_event)
89+
90+
expect(event_payload_decoder).to have_received(:decode_events).with(
91+
sqs_record: lambda_event.fetch("Records").first,
92+
body: "{\"field1\":{}}"
93+
)
94+
95+
expect(indexer_processor).to have_received(:process_returning_failures).with([
96+
{"field1" => {}, "message_id" => "a"}
97+
], refresh_indices: false)
98+
end
99+
78100
it "logs the SQS message ids received in the lambda event and the `sqs_received_at` if available" do
79101
sent_timestamp_millis = "796010423456"
80102
sent_timestamp_iso8601 = "1995-03-24T02:00:23.456Z"

0 commit comments

Comments
 (0)