Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "json"

module ElasticGraph
module IndexerLambda
# Decodes SQS message payloads encoded as JSON Lines into ElasticGraph events.
#
# `SqsProcessor` accepts alternate decoders that implement the same
# `#decode_events(sqs_record:, body:)` contract and return event hashes.
#
# @private
class JSONLDecoder
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is stateless, so we could define this as a module and do def self.decode_events below so that there's no garbage instance created for no reason.

# Decodes the given message payload into zero or more ElasticGraph events.
#
# @param sqs_record [Hash] full SQS record carrying the payload
# @param body [String] resolved SQS message body
# @return [Array<Hash>] decoded ElasticGraph events
def decode_events(sqs_record:, body:)
_ = sqs_record
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this line? It looks like it doesn't do anything...

More generally, how do you expect sqs_record to get used?

body.split("\n").map { |event| JSON.parse(event) }
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
body.split("\n").map { |event| JSON.parse(event) }
body.split("\n").map { |event| ::JSON.parse(event) }

end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

require "elastic_graph/errors"
require "elastic_graph/indexer/indexing_failures_error"
require "elastic_graph/indexer_lambda/jsonl_decoder"
require "json"

module ElasticGraph
Expand All @@ -19,11 +20,13 @@ class SqsProcessor
# @dynamic ignore_sqs_latency_timestamps_from_arns
attr_reader :ignore_sqs_latency_timestamps_from_arns

def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, s3_client: nil)
# @param event_payload_decoder [#decode_events] decoder for resolved SQS message bodies
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, s3_client: nil)
@indexer_processor = indexer_processor
@logger = logger
@s3_client = s3_client
@ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns
@event_payload_decoder = event_payload_decoder
end

# Processes the ElasticGraph events in the given `lambda_event`, indexing the data in the datastore.
Expand Down Expand Up @@ -80,7 +83,10 @@ def events_from(lambda_event)
sqs_metadata = sqs_metadata.except("latency_timestamps")
end

parse_jsonl(record.fetch("body")).map do |event|
@event_payload_decoder.decode_events(
sqs_record: record,
body: body_from(record.fetch("body"))
).map do |event|
ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata)
end
end.tap do
Expand All @@ -93,11 +99,11 @@ def events_from(lambda_event)

S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'

def parse_jsonl(jsonl_string)
if jsonl_string.start_with?(S3_OFFLOADING_INDICATOR)
jsonl_string = get_payload_from_s3(jsonl_string)
def body_from(body)
if body.start_with?(S3_OFFLOADING_INDICATOR)
body = get_payload_from_s3(body)
end
jsonl_string.split("\n").map { |event| JSON.parse(event) }
body
end

def extract_sqs_metadata(record)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module ElasticGraph
module IndexerLambda
class JSONLDecoder
def decode_events: (
sqs_record: ::Hash[::String, untyped],
body: ::String
) -> ::Array[::Hash[::String, untyped]]
Comment on lines +4 to +7
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def decode_events: (
sqs_record: ::Hash[::String, untyped],
body: ::String
) -> ::Array[::Hash[::String, untyped]]
extend _EventPayloadDecoder

This is the RBS equivalent of "implements the interface at the class level".

end
end
end
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
module ElasticGraph
module IndexerLambda
class SqsProcessor
interface _EventPayloadDecoder
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably move out of SqsProcessor so it can be referenced without the SqsProcessor:: prefix. Maybe moved into the jsonl_decoder file?

def decode_events: (
sqs_record: ::Hash[::String, untyped],
body: ::String
) -> ::Array[::Hash[::String, untyped]]
end

def initialize: (
Indexer::Processor,
logger: ::Logger,
ignore_sqs_latency_timestamps_from_arns: ::Set[::String],
?event_payload_decoder: _EventPayloadDecoder,
?s3_client: Aws::S3::Client?,
) -> void

Expand All @@ -14,6 +22,7 @@ module ElasticGraph

@indexer_processor: Indexer::Processor
@logger: ::Logger
@event_payload_decoder: _EventPayloadDecoder
@s3_client: Aws::S3::Client?

attr_reader ignore_sqs_latency_timestamps_from_arns: ::Set[::String]
Expand All @@ -22,7 +31,6 @@ module ElasticGraph
S3_OFFLOADING_INDICATOR: String
def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped]
def millis_to_iso8601: (::String) -> ::String?
def parse_jsonl: (::String) -> ::Array[::Hash[::String, untyped]]
def get_payload_from_s3: (::String) -> ::String
def s3_client: () -> Aws::S3::Client
def format_response: (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/indexer_lambda/jsonl_decoder"

module ElasticGraph
module IndexerLambda
RSpec.describe JSONLDecoder do
describe "#decode_events" do
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
describe "#decode_events" do
describe ".decode_events" do

(If you adopt my suggestion to make it callable directly on JSONLDecoder with no instance needed...).

it "parses JSON Lines payloads into ElasticGraph events" do
decoder = described_class.new

decoded_events = decoder.decode_events(
sqs_record: {"messageId" => "123"},
body: %({"id":"1"}\n{"id":"2","record":{"name":"Widget"}})
)

expect(decoded_events).to eq([
{"id" => "1"},
{"id" => "2", "record" => {"name" => "Widget"}}
])
end

it "returns no events for an empty message body" do
decoder = described_class.new

expect(
decoder.decode_events(
sqs_record: {"messageId" => "123"},
body: ""
)
).to eq([])
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,28 @@ module IndexerLambda
], refresh_indices: false)
end

it "decodes message bodies using the configured event payload decoder" do
event_payload_decoder = instance_double(JSONLDecoder, decode_events: [{"field1" => {}}])
sqs_processor = build_sqs_processor(event_payload_decoder: event_payload_decoder)

lambda_event = {
"Records" => [
sqs_message("a", {"field1" => {}})
]
}

sqs_processor.process(lambda_event)

expect(event_payload_decoder).to have_received(:decode_events).with(
sqs_record: lambda_event.fetch("Records").first,
body: "{\"field1\":{}}"
)

expect(indexer_processor).to have_received(:process_returning_failures).with([
{"field1" => {}, "message_id" => "a"}
], refresh_indices: false)
end

it "logs the SQS message ids received in the lambda event and the `sqs_received_at` if available" do
sent_timestamp_millis = "796010423456"
sent_timestamp_iso8601 = "1995-03-24T02:00:23.456Z"
Expand Down
Loading