Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion examples/utilization/falcon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def call(env)
# Simulate some work
sleep(rand * 0.1)

# Delay after response is sent - use to verify whether this counts toward active requests:
# if response_finished = env["rack.response_finished"]
# response_finished << proc{sleep 10}
# end

return [200, {"content-type" => "text/plain"}, ["Hello, World!"]]
end
end
Expand Down Expand Up @@ -47,7 +52,7 @@ def call(env)
[
Async::Service::Supervisor::UtilizationMonitor.new(
path: File.expand_path("utilization.shm", __dir__),
interval: 5.0
interval: 1.0
)
]
end
Expand Down
61 changes: 61 additions & 0 deletions lib/falcon/body/request_finished.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

require "protocol/http/body/wrapper"

module Falcon
# @namespace
module Body
# Wraps a response body and decrements a metric after the body is closed.
#
# Runs close on the underlying body first (which invokes rack.response_finished),
# then decrements the metric. Use this so requests_active stays elevated until
# the request is fully finished (including response_finished callbacks).
class RequestFinished < Protocol::HTTP::Body::Wrapper
# Wrap a response body with a metric. If the body is nil or empty, decrements immediately.
#
# @parameter message [Protocol::HTTP::Response] The response whose body to wrap.
# @parameter metric [Async::Utilization::Metric] The metric to decrement when the body is closed.
# @returns [Protocol::HTTP::Response] The message (modified in place).
def self.wrap(message, metric)
if body = message&.body and !body.empty?
message.body = new(body, metric)
else
metric.decrement
end

message
end

# @parameter body [Protocol::HTTP::Body::Readable] The body to wrap.
# @parameter metric [Async::Utilization::Metric] The metric to decrement on close.
def initialize(body, metric)
super(body)

@metric = metric
end

# @returns [Boolean] False, the wrapper does not support rewinding.
def rewindable?
false
end

# @returns [Boolean] False, rewinding is not supported.
def rewind
false
end

# Closes the underlying body (invoking rack.response_finished), then decrements the metric.
#
# @parameter error [Exception, nil] Optional error that caused the close.
def close(error = nil)
super

@metric&.decrement
@metric = nil
end
end
end
end
12 changes: 9 additions & 3 deletions lib/falcon/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

require "async/http/cache"
require "async/utilization"
require_relative "body/request_finished"
require_relative "middleware/verbose"
require "protocol/rack"

Expand Down Expand Up @@ -62,11 +63,16 @@ def accept(...)
end

# Handle a request and track request statistics.
#
# Uses manual increment/decrement so requests_active stays elevated until the
# response body is closed (including rack.response_finished). The
# Body::RequestFinished wrapper runs the decrement after the body closes,
# so response_finished callbacks are counted as active.
def call(...)
@requests_total_metric.increment
@requests_active_metric.track do
super
end
@requests_active_metric.increment

return Body::RequestFinished.wrap(super, @requests_active_metric)
end

# Generates a human-readable string representing the current statistics.
Expand Down
4 changes: 4 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Releases

## Unreleased

- `requests_active` is decremented after the response body is closed, including `rack.response_finished` callbacks.

## v0.55.0

- **Breaking**: Drop dependency on `async-container-supervisor`, you should migrate to `async-service-supervisor` instead.
Expand Down
93 changes: 93 additions & 0 deletions test/falcon/body/request_finished.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require "falcon/body/request_finished"
require "protocol/http/body/buffered"
require "protocol/http/response"
require "async/utilization"
require "sus/fixtures/async"

describe Falcon::Body::RequestFinished do
include Sus::Fixtures::Async::ReactorContext

let(:registry) {Async::Utilization::Registry.new}
let(:metric) {registry.metric(:requests_active)}
let(:body) {Protocol::HTTP::Body::Buffered.wrap("Hello World")}
let(:response) {Protocol::HTTP::Response[200, {"content-type" => "text/plain"}, body]}

with ".wrap" do
with "non-empty body" do
it "wraps body and decrements metric when body is closed" do
metric.increment
expect(metric.value).to be == 1

wrapped = subject.wrap(response, metric)
expect(wrapped).to be == response
expect(response.body).to be_a(subject)
expect(metric.value).to be == 1

response.body.close
expect(metric.value).to be == 0
end

it "decrements only once on multiple close calls" do
metric.increment
subject.wrap(response, metric)

response.body.close
response.body.close

expect(metric.value).to be == 0
end
end

with "empty body" do
let(:body) {Protocol::HTTP::Body::Buffered.new}

it "decrements immediately" do
metric.increment
expect(metric.value).to be == 1

subject.wrap(response, metric)
expect(metric.value).to be == 0
expect(response.body).to be_equal(body)
end
end

with "nil body" do
let(:response) {Protocol::HTTP::Response[204, {}, nil]}

it "decrements immediately" do
metric.increment
expect(metric.value).to be == 1

subject.wrap(response, metric)
expect(metric.value).to be == 0
end
end

with "nil message" do
it "decrements immediately" do
metric.increment
subject.wrap(nil, metric)
expect(metric.value).to be == 0
end
end
end

with "#rewindable?" do
it "returns false" do
subject.wrap(response, metric)
expect(response.body).not.to be(:rewindable?)
end
end

with "#rewind" do
it "returns false" do
subject.wrap(response, metric)
expect(response.body.rewind).to be == false
end
end
end
24 changes: 24 additions & 0 deletions test/falcon/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,28 @@
expect(response.read).to be =~ /Hello World!/
end
end

with "utilization tracking" do
let(:utilization_registry) {Async::Utilization::Registry.new}

def make_server(endpoint)
::Falcon::Server.new(middleware, endpoint, utilization_registry: utilization_registry)
end

let(:app) do
lambda do |env|
[200, {}, ["OK"]]
end
end

it "decrements requests_active after response body is consumed" do
expect(utilization_registry.metric(:requests_active).value).to be == 0

response = client.get("/")
expect(response.read).to be == "OK"

expect(utilization_registry.metric(:requests_active).value).to be == 0
expect(utilization_registry.metric(:requests_total).value).to be == 1
end
end
end
Loading