diff --git a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/jsonl_decoder.rb b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/jsonl_decoder.rb new file mode 100644 index 000000000..4f629d5a0 --- /dev/null +++ b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/jsonl_decoder.rb @@ -0,0 +1,31 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "json" + +module ElasticGraph + module IndexerLambda + # Decodes SQS message payloads encoded as JSON Lines into ElasticGraph events. + # + # `SqsProcessor` accepts alternate decoders that implement the same + # `#decode_events(sqs_record:, body:)` contract and return event hashes. + # + # @private + class JSONLDecoder + # Decodes the given message payload into zero or more ElasticGraph events. + # + # @param sqs_record [Hash] full SQS record carrying the payload + # @param body [String] resolved SQS message body + # @return [Array] decoded ElasticGraph events + def decode_events(sqs_record:, body:) + _ = sqs_record + body.split("\n").map { |event| JSON.parse(event) } + end + end + end +end diff --git a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb index 635b6dd0c..f3fcb34ad 100644 --- a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb +++ b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb @@ -8,6 +8,7 @@ require "elastic_graph/errors" require "elastic_graph/indexer/indexing_failures_error" +require "elastic_graph/indexer_lambda/jsonl_decoder" require "json" module ElasticGraph @@ -19,11 +20,13 @@ class SqsProcessor # @dynamic ignore_sqs_latency_timestamps_from_arns attr_reader :ignore_sqs_latency_timestamps_from_arns - def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, s3_client: nil) + # @param event_payload_decoder [#decode_events] decoder for resolved SQS message bodies + def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, s3_client: nil) @indexer_processor = indexer_processor @logger = logger @s3_client = s3_client @ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns + @event_payload_decoder = event_payload_decoder end # Processes the ElasticGraph events in the given `lambda_event`, indexing the data in the datastore. @@ -80,7 +83,10 @@ def events_from(lambda_event) sqs_metadata = sqs_metadata.except("latency_timestamps") end - parse_jsonl(record.fetch("body")).map do |event| + @event_payload_decoder.decode_events( + sqs_record: record, + body: body_from(record.fetch("body")) + ).map do |event| ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata) end end.tap do @@ -93,11 +99,11 @@ def events_from(lambda_event) S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"' - def parse_jsonl(jsonl_string) - if jsonl_string.start_with?(S3_OFFLOADING_INDICATOR) - jsonl_string = get_payload_from_s3(jsonl_string) + def body_from(body) + if body.start_with?(S3_OFFLOADING_INDICATOR) + body = get_payload_from_s3(body) end - jsonl_string.split("\n").map { |event| JSON.parse(event) } + body end def extract_sqs_metadata(record) diff --git a/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/jsonl_decoder.rbs b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/jsonl_decoder.rbs new file mode 100644 index 000000000..2672820fd --- /dev/null +++ b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/jsonl_decoder.rbs @@ -0,0 +1,10 @@ +module ElasticGraph + module IndexerLambda + class JSONLDecoder + def decode_events: ( + sqs_record: ::Hash[::String, untyped], + body: ::String + ) -> ::Array[::Hash[::String, untyped]] + end + end +end diff --git a/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs index 65df06668..20aa5cc1a 100644 --- a/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs +++ b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs @@ -1,10 +1,18 @@ module ElasticGraph module IndexerLambda class SqsProcessor + interface _EventPayloadDecoder + def decode_events: ( + sqs_record: ::Hash[::String, untyped], + body: ::String + ) -> ::Array[::Hash[::String, untyped]] + end + def initialize: ( Indexer::Processor, logger: ::Logger, ignore_sqs_latency_timestamps_from_arns: ::Set[::String], + ?event_payload_decoder: _EventPayloadDecoder, ?s3_client: Aws::S3::Client?, ) -> void @@ -14,6 +22,7 @@ module ElasticGraph @indexer_processor: Indexer::Processor @logger: ::Logger + @event_payload_decoder: _EventPayloadDecoder @s3_client: Aws::S3::Client? attr_reader ignore_sqs_latency_timestamps_from_arns: ::Set[::String] @@ -22,7 +31,6 @@ module ElasticGraph S3_OFFLOADING_INDICATOR: String def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped] def millis_to_iso8601: (::String) -> ::String? - def parse_jsonl: (::String) -> ::Array[::Hash[::String, untyped]] def get_payload_from_s3: (::String) -> ::String def s3_client: () -> Aws::S3::Client def format_response: ( diff --git a/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/jsonl_decoder_spec.rb b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/jsonl_decoder_spec.rb new file mode 100644 index 000000000..6c4c88058 --- /dev/null +++ b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/jsonl_decoder_spec.rb @@ -0,0 +1,42 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/indexer_lambda/jsonl_decoder" + +module ElasticGraph + module IndexerLambda + RSpec.describe JSONLDecoder do + describe "#decode_events" do + it "parses JSON Lines payloads into ElasticGraph events" do + decoder = described_class.new + + decoded_events = decoder.decode_events( + sqs_record: {"messageId" => "123"}, + body: %({"id":"1"}\n{"id":"2","record":{"name":"Widget"}}) + ) + + expect(decoded_events).to eq([ + {"id" => "1"}, + {"id" => "2", "record" => {"name" => "Widget"}} + ]) + end + + it "returns no events for an empty message body" do + decoder = described_class.new + + expect( + decoder.decode_events( + sqs_record: {"messageId" => "123"}, + body: "" + ) + ).to eq([]) + end + end + end + end +end diff --git a/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb index eb1343e5d..8d18b94e4 100644 --- a/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb +++ b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb @@ -75,6 +75,28 @@ module IndexerLambda ], refresh_indices: false) end + it "decodes message bodies using the configured event payload decoder" do + event_payload_decoder = instance_double(JSONLDecoder, decode_events: [{"field1" => {}}]) + sqs_processor = build_sqs_processor(event_payload_decoder: event_payload_decoder) + + lambda_event = { + "Records" => [ + sqs_message("a", {"field1" => {}}) + ] + } + + sqs_processor.process(lambda_event) + + expect(event_payload_decoder).to have_received(:decode_events).with( + sqs_record: lambda_event.fetch("Records").first, + body: "{\"field1\":{}}" + ) + + expect(indexer_processor).to have_received(:process_returning_failures).with([ + {"field1" => {}, "message_id" => "a"} + ], refresh_indices: false) + end + it "logs the SQS message ids received in the lambda event and the `sqs_received_at` if available" do sent_timestamp_millis = "796010423456" sent_timestamp_iso8601 = "1995-03-24T02:00:23.456Z"