Verifying PubSub Services from Rails using Redis

Vipul

By Vipul

on May 9, 2015

Let's say that we have a Service that reads and writes to Redis. We have BaseRedisService for managing connection, RedisWriterService to write to Redis and RedisReaderService to read from Redis.

1require 'redis'
2
3# Base class to manage connection
4class BaseRedisService
5 REDIS_HOST = '127.0.0.1'
6
7 def connection
8   if !(defined?(@@connection) && @@connection)
9     @@connection =  Redis.new({ host: REDIS_HOST })
10   end
11   @@connection
12 end
13
14end
15
1# Class to write to Redis
2class RedisWriterService <  BaseRedisService
3 attr_reader :key, :value
4
5 def initialize key, value
6   @key, @value = key, value
7 end
8
9 def process
10   connection.set key, value
11 end
12
13end
14
1# Class to read from Redis
2class RedisReaderService < BaseRedisService
3attr_reader :key
4
5def initialize key
6  @key = key
7end
8
9def process
10  connection.get key
11end
12
13end
14

A test for the above mentioned case might look like this.

1
2require 'test_helper'
3
4class RedisPubSubServiceTest < ActiveSupport::TestCase
5
6def test_writing_and_reading_value_using_redis
7  value = 'Vipul A M'
8
9  RedisWriterService.new('name', value).process
10  assert_equal value, RedisReaderService.new('name').process
11end
12
13end
14

But now, we need to add PubSub to the service and verify that the service sends proper messages to Redis. For verifying from such a service, the service would need to listen to messages sent to Redis. Problem is that Redis listens in a loop. We would need to explicitly release control from listening and allow our tests to go ahead and verify some scenario.

Lets look at how these services would look.

1class RedisPublisherService < BaseRedisService
2attr_reader :options, :channel
3
4def initialize channel, options
5  @channel     = channel
6  @options = options
7end
8
9def process
10  connection.publish(options, channel)
11end
12
13end

and our Subscriber looks like this.

1class RedisSubscriberService < BaseRedisService
2attr_reader :channel
3
4def initialize channel
5  @channel = channel
6end
7
8def process
9  connection.subscribe(channel) do |on|
10    on.message do |channel, message|
11      puts message
12    end
13  end
14end
15
16end

Notice that we don't have control over returning value from the loop easily. Right now we just print on receiving a new message.

Now, lets start persisting our messages to some array in our Service. Also we will start exposing this from a thread variable so that it could be accessed from outside of execution of this listen loop.

1class RedisSubscriberService < BaseRedisService
2attr_reader :channel, :messages, :messages_count
3
4def initialize channel, messages_count = 5
5  @channel        = channel
6  @messages       = []
7  @messages_count = messages_count
8end
9
10def process
11  connection.subscribe(channel) do |on|
12    on.message do |channel, message|
13      messages.unshift message
14      Thread.current[:messages] = messages
15    end
16  end
17end
18
19end

We now have a way to access message state from the service to read any messages received by it. Lets say we define a new SubscriberService from a Thread, we could read messages like this.

1subscriber = Thread.new { RedisSubscriberService.new('payment_alerts').process }
2# Print first message from messages received
3puts subscriber[:messages].first
4

Armed with this, we can now define a set of Rails helpers to use in our Rails tests.

1
2module SubscriberHelpers
3THREAD_PROCESS_TIMEOUT = 0.5
4
5def setup_subscriber channel = 'test_channel'
6  @subscriber = Thread.new { RedisSubscriberService.new(channel).process }
7end
8
9def teardown_subscriber
10  @subscriber.kill
11end
12
13def with_subscriber
14  yield
15  @subscriber.join THREAD_PROCESS_TIMEOUT
16end
17
18def message_from_subscription
19  @subscriber[:messages].first
20end
21end

Notice the with_subscriber method. It executes some code passed to it, then passes method execution to the subscriber process to read any messages sent and store onto messages store.

The count of the variable THREAD_PROCESS_TIMEOUT, can be experimented to set to a value that suites the system that's being verified.

In our tests, we can now verify PubSub as-

1
2require 'test_helper'
3
4class RedisPubSubServiceTest < ActiveSupport::TestCase
5include SubscriberHelpers
6
7def setup
8  setup_subscriber
9end
10
11def teardown
12  teardown_subscriber
13end
14
15def test_writing_and_reading_back_values_from_pub_sub
16  value = 'Vipul A M'
17
18  with_subscriber do
19    RedisPublisherService.new('test_channel', value).process
20  end
21
22  assert_equal value, message_from_subscription
23end
24end

Summary

We took a look at how PubSub based services can be verified by using threads and exposing messages from them, for verification. These can be tailored to support any similar PubSub service like Redis, and can be used to easily verify values being published from our services from Rails tests.

If this blog was helpful, check out our full blog archive.

Stay up to date with our blogs.

Subscribe to receive email notifications for new blog posts.