diff --git a/examples/utilization/falcon.rb b/examples/utilization/falcon.rb index 2382197..c914dda 100644 --- a/examples/utilization/falcon.rb +++ b/examples/utilization/falcon.rb @@ -15,6 +15,11 @@ def call(env) # Simulate some work sleep(rand * 0.1) + # Delay after response is sent - use to verify whether this counts toward active requests: + # if response_finished = env["rack.response_finished"] + # response_finished << proc{sleep 10} + # end + return [200, {"content-type" => "text/plain"}, ["Hello, World!"]] end end @@ -47,7 +52,7 @@ def call(env) [ Async::Service::Supervisor::UtilizationMonitor.new( path: File.expand_path("utilization.shm", __dir__), - interval: 5.0 + interval: 1.0 ) ] end diff --git a/lib/falcon/body/request_finished.rb b/lib/falcon/body/request_finished.rb new file mode 100644 index 0000000..dbc2aa1 --- /dev/null +++ b/lib/falcon/body/request_finished.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "protocol/http/body/wrapper" + +module Falcon + # @namespace + module Body + # Wraps a response body and decrements a metric after the body is closed. + # + # Runs close on the underlying body first (which invokes rack.response_finished), + # then decrements the metric. Use this so requests_active stays elevated until + # the request is fully finished (including response_finished callbacks). + class RequestFinished < Protocol::HTTP::Body::Wrapper + # Wrap a response body with a metric. If the body is nil or empty, decrements immediately. + # + # @parameter message [Protocol::HTTP::Response] The response whose body to wrap. + # @parameter metric [Async::Utilization::Metric] The metric to decrement when the body is closed. + # @returns [Protocol::HTTP::Response] The message (modified in place). + def self.wrap(message, metric) + if body = message&.body and !body.empty? + message.body = new(body, metric) + else + metric.decrement + end + + message + end + + # @parameter body [Protocol::HTTP::Body::Readable] The body to wrap. + # @parameter metric [Async::Utilization::Metric] The metric to decrement on close. + def initialize(body, metric) + super(body) + + @metric = metric + end + + # @returns [Boolean] False, the wrapper does not support rewinding. + def rewindable? + false + end + + # @returns [Boolean] False, rewinding is not supported. + def rewind + false + end + + # Closes the underlying body (invoking rack.response_finished), then decrements the metric. + # + # @parameter error [Exception, nil] Optional error that caused the close. + def close(error = nil) + super + + @metric&.decrement + @metric = nil + end + end + end +end diff --git a/lib/falcon/server.rb b/lib/falcon/server.rb index 8772504..d034342 100644 --- a/lib/falcon/server.rb +++ b/lib/falcon/server.rb @@ -10,6 +10,7 @@ require "async/http/cache" require "async/utilization" +require_relative "body/request_finished" require_relative "middleware/verbose" require "protocol/rack" @@ -62,11 +63,16 @@ def accept(...) end # Handle a request and track request statistics. + # + # Uses manual increment/decrement so requests_active stays elevated until the + # response body is closed (including rack.response_finished). The + # Body::RequestFinished wrapper runs the decrement after the body closes, + # so response_finished callbacks are counted as active. def call(...) @requests_total_metric.increment - @requests_active_metric.track do - super - end + @requests_active_metric.increment + + return Body::RequestFinished.wrap(super, @requests_active_metric) end # Generates a human-readable string representing the current statistics. diff --git a/releases.md b/releases.md index 8769407..250d87e 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,9 @@ # Releases +## Unreleased + + - `requests_active` is decremented after the response body is closed, including `rack.response_finished` callbacks. + ## v0.55.0 - **Breaking**: Drop dependency on `async-container-supervisor`, you should migrate to `async-service-supervisor` instead. diff --git a/test/falcon/body/request_finished.rb b/test/falcon/body/request_finished.rb new file mode 100644 index 0000000..f000e04 --- /dev/null +++ b/test/falcon/body/request_finished.rb @@ -0,0 +1,93 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "falcon/body/request_finished" +require "protocol/http/body/buffered" +require "protocol/http/response" +require "async/utilization" +require "sus/fixtures/async" + +describe Falcon::Body::RequestFinished do + include Sus::Fixtures::Async::ReactorContext + + let(:registry) {Async::Utilization::Registry.new} + let(:metric) {registry.metric(:requests_active)} + let(:body) {Protocol::HTTP::Body::Buffered.wrap("Hello World")} + let(:response) {Protocol::HTTP::Response[200, {"content-type" => "text/plain"}, body]} + + with ".wrap" do + with "non-empty body" do + it "wraps body and decrements metric when body is closed" do + metric.increment + expect(metric.value).to be == 1 + + wrapped = subject.wrap(response, metric) + expect(wrapped).to be == response + expect(response.body).to be_a(subject) + expect(metric.value).to be == 1 + + response.body.close + expect(metric.value).to be == 0 + end + + it "decrements only once on multiple close calls" do + metric.increment + subject.wrap(response, metric) + + response.body.close + response.body.close + + expect(metric.value).to be == 0 + end + end + + with "empty body" do + let(:body) {Protocol::HTTP::Body::Buffered.new} + + it "decrements immediately" do + metric.increment + expect(metric.value).to be == 1 + + subject.wrap(response, metric) + expect(metric.value).to be == 0 + expect(response.body).to be_equal(body) + end + end + + with "nil body" do + let(:response) {Protocol::HTTP::Response[204, {}, nil]} + + it "decrements immediately" do + metric.increment + expect(metric.value).to be == 1 + + subject.wrap(response, metric) + expect(metric.value).to be == 0 + end + end + + with "nil message" do + it "decrements immediately" do + metric.increment + subject.wrap(nil, metric) + expect(metric.value).to be == 0 + end + end + end + + with "#rewindable?" do + it "returns false" do + subject.wrap(response, metric) + expect(response.body).not.to be(:rewindable?) + end + end + + with "#rewind" do + it "returns false" do + subject.wrap(response, metric) + expect(response.body.rewind).to be == false + end + end +end diff --git a/test/falcon/server.rb b/test/falcon/server.rb index bae2506..be3f643 100644 --- a/test/falcon/server.rb +++ b/test/falcon/server.rb @@ -125,4 +125,28 @@ expect(response.read).to be =~ /Hello World!/ end end + + with "utilization tracking" do + let(:utilization_registry) {Async::Utilization::Registry.new} + + def make_server(endpoint) + ::Falcon::Server.new(middleware, endpoint, utilization_registry: utilization_registry) + end + + let(:app) do + lambda do |env| + [200, {}, ["OK"]] + end + end + + it "decrements requests_active after response body is consumed" do + expect(utilization_registry.metric(:requests_active).value).to be == 0 + + response = client.get("/") + expect(response.read).to be == "OK" + + expect(utilization_registry.metric(:requests_active).value).to be == 0 + expect(utilization_registry.metric(:requests_total).value).to be == 1 + end + end end