Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ jobs:
# kotlin cross test are failing -> see THRIFT-5879
server_lang: ['java', 'go', 'rs', 'cpp', 'py', 'rb', 'php', 'nodejs', 'nodets']
# we always use comma join as many client langs as possible, to reduce the number of jobs
client_lang: ['java,kotlin', 'go,rs,cpp,py,php,nodejs,nodets', 'rb']
client_lang: ['java,kotlin', 'go,rs,cpp,py,php,nodejs,nodets,rb']
fail-fast: false
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
Expand Down
17 changes: 13 additions & 4 deletions lib/rb/lib/thrift/multiplexed_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,26 @@ module Thrift
class MultiplexedProcessor
def initialize
@actual_processors = {}
@default_processor = nil
end

def register_processor(service_name, processor)
@actual_processors[service_name] = processor
end

def register_default(processor)
@default_processor = processor
end

def process(iprot, oprot)
name, type, seqid = iprot.read_message_begin
check_type(type)
check_separator(name)
service_name, method = name.split(':')
if name.count(':') < 1
check_default_processor(name)
return @default_processor.process(StoredMessageProtocol.new(iprot, [name, type, seqid]), oprot)
end

service_name, method = name.split(':', 2)
processor(service_name).process(StoredMessageProtocol.new(iprot, [method, type, seqid]), oprot)
end

Expand All @@ -54,8 +63,8 @@ def check_type(type)
end
end

def check_separator(name)
if name.count(':') < 1
def check_default_processor(name)
unless @default_processor
raise Thrift::Exception.new("Service name not found in message name: #{name}. Did you forget to use a Thrift::Protocol::MultiplexedProtocol in your client?")
end
end
Expand Down
75 changes: 75 additions & 0 deletions lib/rb/spec/multiplexed_processor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# frozen_string_literal: true
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

require 'spec_helper'

describe Thrift::MultiplexedProcessor do
before(:each) do
@processor = Thrift::MultiplexedProcessor.new
@iprot = double('MockInputProtocol')
@oprot = double('MockOutputProtocol')
end

it 'dispatches multiplexed calls to the registered service processor' do
actual_processor = double('ActualProcessor')
@processor.register_processor('ThriftTest', actual_processor)

expect(@iprot).to receive(:read_message_begin).and_return(['ThriftTest:testVoid', Thrift::MessageTypes::CALL, 1])
expect(actual_processor).to receive(:process) do |stored_protocol, oprot|
expect(stored_protocol.read_message_begin).to eq(['testVoid', Thrift::MessageTypes::CALL, 1])
expect(oprot).to eq(@oprot)
true
end

expect(@processor.process(@iprot, @oprot)).to eq(true)
end

it 'dispatches non-multiplexed calls to the default processor' do
default_processor = double('DefaultProcessor')
@processor.register_default(default_processor)

expect(@iprot).to receive(:read_message_begin).and_return(['testVoid', Thrift::MessageTypes::CALL, 2])
expect(default_processor).to receive(:process) do |stored_protocol, oprot|
expect(stored_protocol.read_message_begin).to eq(['testVoid', Thrift::MessageTypes::CALL, 2])
expect(oprot).to eq(@oprot)
true
end

expect(@processor.process(@iprot, @oprot)).to eq(true)
end

it 'raises for non-multiplexed calls when no default processor is registered' do
expect(@iprot).to receive(:read_message_begin).and_return(['testVoid', Thrift::MessageTypes::CALL, 3])

expect { @processor.process(@iprot, @oprot) }.to raise_error(
Thrift::Exception,
'Service name not found in message name: testVoid. Did you forget to use a Thrift::Protocol::MultiplexedProtocol in your client?'
)
end

it 'raises for unknown multiplexed service names' do
expect(@iprot).to receive(:read_message_begin).and_return(['Missing:testVoid', Thrift::MessageTypes::CALL, 4])

expect { @processor.process(@iprot, @oprot) }.to raise_error(
Thrift::Exception,
'Service name not found: Missing. Did you forget to call Thrift::MultiplexedProcessor#register_processor?'
)
end
end
28 changes: 27 additions & 1 deletion test/rb/integration/TestClient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
require 'test_helper'
require 'thrift'
require 'thrift_test'
require 'second_service'

$domain_socket = nil
$host = "localhost"
Expand All @@ -41,7 +42,7 @@
puts "\t--domain-socket arg (=) \t Unix domain socket path"
puts "\t--host arg (=localhost) \t Host to connect \t not valid with domain-socket"
puts "\t--port arg (=9090) \t Port number to listen \t not valid with domain-socket"
puts "\t--protocol arg (=binary) \t protocol: accel, binary, compact, json, header"
puts "\t--protocol arg (=binary) \t protocol: accel, binary, compact, json, header, multi, multic, multih, multij"
puts "\t--ssl \t use ssl \t not valid with domain-socket"
puts "\t--transport arg (=buffered) transport: buffered, framed, header, http"
exit
Expand Down Expand Up @@ -94,6 +95,7 @@ def setup
raise 'Unknown transport type'
end

@protocol2 = nil
if $protocolType == "binary"
@protocol = Thrift::BinaryProtocol.new(transportFactory)
elsif $protocolType == "compact"
Expand All @@ -105,10 +107,27 @@ def setup
elsif $protocolType == "header"
# HeaderProtocol wraps its own transport, so pass the selected transport
@protocol = Thrift::HeaderProtocol.new(transportFactory)
elsif $protocolType == "multi"
protocol = Thrift::BinaryProtocol.new(transportFactory)
@protocol = Thrift::MultiplexedProtocol.new(protocol, 'ThriftTest')
@protocol2 = Thrift::MultiplexedProtocol.new(protocol, 'SecondService')
elsif $protocolType == "multic"
protocol = Thrift::CompactProtocol.new(transportFactory)
@protocol = Thrift::MultiplexedProtocol.new(protocol, 'ThriftTest')
@protocol2 = Thrift::MultiplexedProtocol.new(protocol, 'SecondService')
elsif $protocolType == "multih"
protocol = Thrift::HeaderProtocol.new(transportFactory)
@protocol = Thrift::MultiplexedProtocol.new(protocol, 'ThriftTest')
@protocol2 = Thrift::MultiplexedProtocol.new(protocol, 'SecondService')
elsif $protocolType == "multij"
protocol = Thrift::JsonProtocol.new(transportFactory)
@protocol = Thrift::MultiplexedProtocol.new(protocol, 'ThriftTest')
@protocol2 = Thrift::MultiplexedProtocol.new(protocol, 'SecondService')
else
raise 'Unknown protocol type'
end
@client = Thrift::Test::ThriftTest::Client.new(@protocol)
@client2 = Thrift::Test::SecondService::Client.new(@protocol2) if @protocol2
@socket.open
end
end
Expand Down Expand Up @@ -161,6 +180,13 @@ def test_string
assert_equal(test_string, result_string.force_encoding(Encoding::UTF_8))
end

def test_multiplexed
return unless @client2

p 'test_multiplexed'
assert_equal('testString("foobar")', @client2.secondtestString('foobar'))
end

def test_bool
p 'test_bool'
assert_equal(@client.testBool(true), true)
Expand Down
29 changes: 27 additions & 2 deletions test/rb/integration/TestServer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
require 'thrift'
require 'thrift_test'
require 'thrift_test_types'
require 'second_service'
require 'logger'
require 'optparse'

Expand Down Expand Up @@ -107,6 +108,12 @@ def testOneway(arg0)

end

class SecondHandler
def secondtestString(argument)
"testString(\"#{argument}\")"
end
end

options = {
domain_socket: nil,
port: 9090,
Expand Down Expand Up @@ -138,7 +145,7 @@ def testOneway(arg0)
end
opts.on('--domain-socket=PATH', String, 'Unix domain socket path') { |v| options[:domain_socket] = v }
opts.on('--port=PORT', Integer, 'Port number to listen (not valid with domain-socket)') { |v| options[:port] = v }
opts.on('--protocol=PROTO', String, 'protocol: accel, binary, compact, json, header') { |v| options[:protocol] = v }
opts.on('--protocol=PROTO', String, 'protocol: accel, binary, compact, json, header, multi, multic, multih, multij') { |v| options[:protocol] = v }
opts.on('--ssl', 'use ssl (not valid with domain-socket)') { options[:ssl] = true }
opts.on('--transport=TRANSPORT', String, 'transport: buffered, framed, header') { |v| options[:transport] = v }
opts.on('--server-type=TYPE', String, 'type of server: simple, thread-pool, threaded, nonblocking') { |v| options[:server_type] = v }
Expand Down Expand Up @@ -170,7 +177,10 @@ def testOneway(arg0)
exit 1
end

protocol_factory = case options[:protocol].to_s.strip
protocol = options[:protocol].to_s.strip
multiplexed = protocol.start_with?('multi')

protocol_factory = case protocol
when '', 'binary'
Thrift::BinaryProtocolFactory.new
when 'compact'
Expand All @@ -181,6 +191,14 @@ def testOneway(arg0)
Thrift::BinaryProtocolAcceleratedFactory.new
when 'header'
Thrift::HeaderProtocolFactory.new
when 'multi'
Thrift::BinaryProtocolFactory.new
when 'multic'
Thrift::CompactProtocolFactory.new
when 'multih'
Thrift::HeaderProtocolFactory.new
when 'multij'
Thrift::JsonProtocolFactory.new
else
raise "Unknown protocol type '#{options[:protocol]}'"
end
Expand All @@ -202,6 +220,13 @@ def testOneway(arg0)

handler = SimpleHandler.new
processor = Thrift::Test::ThriftTest::Processor.new(handler)
if multiplexed
multiplexed_processor = Thrift::MultiplexedProcessor.new
multiplexed_processor.register_default(processor)
multiplexed_processor.register_processor('ThriftTest', processor)
multiplexed_processor.register_processor('SecondService', Thrift::Test::SecondService::Processor.new(SecondHandler.new))
processor = multiplexed_processor
end

transport = nil
if options[:domain_socket].to_s.strip.empty?
Expand Down
6 changes: 5 additions & 1 deletion test/tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,11 @@
"binary:accel",
"compact",
"json",
"header"
"header",
"multi",
"multic",
"multih",
"multij"
],
"workdir": "rb/gen-rb"
},
Expand Down
Loading