Working with Redis queues using rmq library in Golang
Rmq : Redis Messaging Queue
Github : https://github.com/adjust/rmq
Section 1 : Lets look at adding new items into a Redis queue from a Go routine
The following is a producer routine in Golang. It will create a queue named “things” inside the Redis server, and store items inside it. Redis implements the queue as a double-ended Queue allowing push and pop from both ends of the same queue. The items are timestamps recorded by the Go routine.
package main
import(
"fmt"
"time"
"strconv"
"github.com/adjust/rmq"
)
func main(){
connection := rmq.OpenConnection("test_producer", "tcp","localhost:6379", 1)
things := connection.OpenQueue("things")
defer connection.Close()
for i:=0;i<100;i++{
things.Publish(time.Now().String())
if i%5==0{
fmt.Println("Published "+strconv.Itoa(i)+" items")
}
time.Sleep(time.Second)
}
if !connection.Close(){
fmt.Println("Failed to close connection")
}else{
fmt.Println("Closed connection")
}
}
In the above snippet, “test_producer” is the name of the connection initiated from the Go routine to Redis server, “localhost:6379” is the address where the Redis server is hosted (and listening for new connections), “1” is the name of the database in Redis which is underlying for the caches being used by this program.
“things” is an identifier used to recognize the list in the Go routine. The queue name used in Redis server is derived from this identifier, and is actually “rmq::queue::[things]::ready”. The string inside [ ] is the queue name from the Go routine implementing the “rmq” package.
It is possible to track these values additions in the “rmq::queue::[things]::ready” queue in the Redis server using the ‘monitor’ utility in Redis.
$ redis-cli
127.0.0.1:6379>
When logging into Redis, the session is considered a “client” of the Redis server, which communicated over TCP. A session of “redis-cli” is a connection on the localhost.
By default, the redis-cli session is launched to connect to database “0”. In the example Go routine above, the database used is “1”. To connect to database “1”, use the “SELECT <index>” command.
$ redis-cli
127.0.0.1:6379> LLEN "rmq::queue::[things]::ready"
(integer) 0
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> LLEN "rmq::queue::[things]::ready"
(integer) 77
127.0.0.1:6379[1]> RPOP "rmq::queue::[things]::ready"
"2017-11-05 20:33:14.704133958 +0000 UTC"
Once the user changes the current database being used, the redis-cli session prompt also changes to reflect the name of the database being used. The LLEN command returns the length of the list mentioned. The RPOP command removes 1 item from the right end of the list and returns it.
Section 2: Adding and consuming items from a buffered queue in Redis
Let's try to add an item to a queue via one Go process, while another Go process tries to read (pop) items out from the queue. The 2 processes are separate and identify as 2 different clients to the Redis server instance. Using the same producer process from above, let's write a consumer process.
Note, the items are added into the queue as "strings" converted from time.Time in producer and back to time.Time in consumer.
$ cat producer.go
package main
import(
"fmt"
"time"
"github.com/adjust/rmq"
)
type Consumer struct{}
func main(){
connection := rmq.OpenConnection("test_consumer", "tcp", "localhost:6379", 1)
things := connection.OpenQueue("things")
defer connection.Close()
things.StartConsuming(10, time.Millisecond)
things.AddConsumer("consumer", &Consumer{})
select {} // block forever ;
}
var layout string = "2006-01-02 15:04:05.999999999 -0700 UTC"
func (consumer *Consumer) Consume(delivery rmq.Delivery) {
if t,err := time.Parse(layout, delivery.Payload()); err!=nil{
fmt.Println("Error = ", err)
}else{
fmt.Println(time.Since(t))
}
time.Sleep(time.Microsecond)
delivery.Ack()
}
Using these producer and consumer snippets, I am trying to get the difference between the timestamp when the message was produced to the time when the message was processed at the consumer, effectively yielding the delay in storing and retrieving an element from a queue in Redis.
For records, I get the following numbers on an Amazon EC2 instance, for the delay in writing and reading items from a queue in Redis:
$ go run consumer01.go
1.254667ms
187.863µs
295.619µs
953.573µs
492.762µs
401.866µs
863.34µs
293.802µs
462.159µs
458.827µs
774.988µs
895.418µs
482.816µs
212.149µs
768.191µs
309.082µs
729.349µs
480.413µs
751.638µs
772.174µs
633.271µs
1.141673ms
335.671µs
284.16µs
Now, let us look at the messages exchanged during these transactions made by the producer and the consumer on Redis:
1509922520.935075 [1 127.0.0.1:46996] "LLEN" "rmq::queue::[things]::ready"
1509922520.935968 [1 127.0.0.1:46998] "SET" "rmq::connection::test_producer-qVWom2::heartbeat" "1" "EX" "60"
1509922520.936278 [1 127.0.0.1:46998] "LPUSH" "rmq::queue::[things]::ready" "2017-11-05 22:55:20.936248195 +0000 UTC"
1509922520.936403 [1 127.0.0.1:46996] "LLEN" "rmq::queue::[things]::ready"
1509922520.936497 [1 127.0.0.1:46996] "RPOPLPUSH" "rmq::queue::[things]::ready" "rmq::connection::test_consumer-nYS5jf::queue::[things]::unacked"
1509922520.936609 [1 127.0.0.1:46996] "LLEN" "rmq::queue::[things]::ready"
1509922520.936818 [1 127.0.0.1:46996] "LREM" "rmq::connection::test_consumer-nYS5jf::queue::[things]::unacked" "1" "2017-11-05 22:55:20.936248195 +0000 UTC"
1509922520.937825 [1 127.0.0.1:46996] "LLEN" "rmq::queue::[things]::ready"
The LLEN queries are from the consumer process. The producer maintains a heartbeat. The double-ended queue is maintained by pushing in elements from the left and popping from the right. We will look into the details of each operation and how they correspond to the commands run from the Go process, in a later section.
1509922520.935968 [1 127.0.0.1:46998] "SET" "rmq::connection::test_producer-qVWom2::heartbeat" "1" "EX" "60"
1509922520.936278 [1 127.0.0.1:46998] "LPUSH" "rmq::queue::[things]::ready" "2017-11-05 22:55:20.936248195 +0000 UTC"
1509922520.936403 [1 127.0.0.1:46996] "LLEN" "rmq::queue::[things]::ready"
1509922520.936497 [1 127.0.0.1:46996] "RPOPLPUSH" "rmq::queue::[things]::ready" "rmq::connection::test_consumer-nYS5jf::queue::[things]::unacked"
1509922520.936609 [1 127.0.0.1:46996] "LLEN" "rmq::queue::[things]::ready"
1509922520.936818 [1 127.0.0.1:46996] "LREM" "rmq::connection::test_consumer-nYS5jf::queue::[things]::unacked" "1" "2017-11-05 22:55:20.936248195 +0000 UTC"
1509922520.937825 [1 127.0.0.1:46996] "LLEN" "rmq::queue::[things]::ready"
The LLEN queries are from the consumer process. The producer maintains a heartbeat. The double-ended queue is maintained by pushing in elements from the left and popping from the right. We will look into the details of each operation and how they correspond to the commands run from the Go process, in a later section.
Comments
Post a Comment