Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
/doc/
/pkg/
/spec/reports/
/spec/examples.txt
/tmp/
1 change: 1 addition & 0 deletions .ruby-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2.5.0
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
language: ruby
rvm:
- 2.1.9
- 2.2.5
- 2.3.1
- 2.4.1
- 2.5.0
before_install: gem update bundler
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Ability to specify `last_id` of a channel
- Get access to `message_id` if specified in the callback
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ source 'https://rubygems.org'

# Specify your gem's dependencies in message_bus-client.gemspec
gemspec

gem "pry-byebug"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ The API is mostly equivalent with the JavaScript client:

```ruby
client = MessageBus::Client.new('http://chat.samsaffron.com/')
client.subscribe('/message') do |payload|
client.subscribe('/message') do |payload, message_id|
# Do stuff
end

Expand Down
15 changes: 12 additions & 3 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@ task :server do
end

at_exit do
$stderr.puts "Killing pid #{pid}"
Process.kill('KILL', pid)
Process.wait(pid)
if RbConfig::CONFIG["host_os"] =~ /mswin|mingw/
$stderr.puts "Killing pid #{pid}"
Process.kill('KILL', pid)
Process.wait(pid)
else
child_pid = `pgrep -P #{pid}`.to_i
pids = [child_pid, pid]
pids.each do |pid|
$stderr.puts "Killing pid #{pid}"
Process.kill('KILL', pid)
end
end
end

sleep 3
Expand Down
1 change: 1 addition & 0 deletions lib/message_bus/client.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'excon'
require 'json'
require "securerandom"

require 'message_bus/client/version'
require 'message_bus/client/configuration'
Expand Down
23 changes: 13 additions & 10 deletions lib/message_bus/client/message_handler.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
module MessageBus::Client::MessageHandler
SubscribedChannel = Struct.new(:callbacks, :last_id) do
def initialize(last_id = -1)
def initialize(last_id)
self.callbacks = []
self.last_id = last_id
end

def callback(payload)
def callback(payload, message_id)
callbacks.each do |callback|
callback.call(payload)
callback.call(payload, message_id)
end
end
end
Expand All @@ -20,14 +20,16 @@ def initialize(base_url)

@pending_messages = []
@subscribed_channels = {}
@subscribed_channels.default_proc = proc do |hash, key|
hash[key] = SubscribedChannel.new
end
@payload = String.new
end

def subscribe(channel, &callback)
@subscribed_channels[channel].callbacks << callback
def subscribe(channel, last_id=-1, &callback)
subscribed_channel = @subscribed_channels[channel]
if subscribed_channel.nil?
subscribed_channel = @subscribed_channels[channel] =
SubscribedChannel.new(last_id)
end
subscribed_channel.callbacks << callback
end

def unsubscribe
Expand Down Expand Up @@ -80,8 +82,9 @@ def handle_message(message)
subscription = @subscribed_channels[message['channel']]
return unless subscription

subscription.last_id = message['message_id']
subscription.callback(message['data'])
message_id = message['message_id']
subscription.last_id = message_id
subscription.callback(message['data'], message_id)
end

def handle_status_message(message)
Expand Down
65 changes: 0 additions & 65 deletions spec/coverage_helper.rb

This file was deleted.

20 changes: 18 additions & 2 deletions spec/message_bus/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def write_message(message, user = 'message_bus-client')
end
end

it 'receives messages' do
it 'receives new messages by default (last_id of -1)' do
subject.start

text = "Hello World! #{Random.rand}"
Expand Down Expand Up @@ -72,7 +72,7 @@ def write_message(message, user = 'message_bus-client')
subject.stop
end

it 'receives messages' do
it 'receives new messages by default (last_id of -1)' do
subject.start

text = "Hello World! #{Random.rand}"
Expand Down Expand Up @@ -113,4 +113,20 @@ def write_message(message, user = 'message_bus-client')
subject.resume
expect(result).to eq(true)
end

it 'allows subscription exposing the message_id' do
subject.start

text = "Hello World! #{Random.rand}"
result = false
subject.subscribe('/message') do |payload, message_id|
result = true if payload['data'] == text
expect(message_id).to be_an Integer
end

until result
write_message(text) # Keep writing because the message bus might not have started.
sleep(1)
end
end
end
2 changes: 1 addition & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
$LOAD_PATH.unshift File.expand_path('../../lib', __FILE__)
require 'coverage_helper'
require 'message_bus/client'
require "pry-byebug"

# This file was generated by the `rspec --init` command. Conventionally, all
# specs live under a `spec` directory, which RSpec adds to the `$LOAD_PATH`.
Expand Down