Priority queue with public clouds, Dapr and Go
PART 1: THE CHALLENGE
Message queues. A wonderful approach to connecting two or more services leveraging scaling and resiliency.
One of the applications serving client requests (“frontend”) can offload heavy tasks to some more powerful machines for further long processing. Those “Big Servers” also have capacity limitations, and considering the time-consuming nature of such tasks, they all can not fit into the fleet of “Big Servers” at once.
That’s when we need some kind of call center for applications when someone would quickly respond with “Your request is very important to us. We will call you back with the results in a while. Keep in touch.”. And in the background would feed tasks one by one across the “Big Servers” pool.
Here my story begins.
It’s about one monolith application migration to a cloud environment when the customer asked to “add some horizontal scalability”.
Don’t want to bore you with details. Just picture a simple API service accepting client requests to execute a very long task. And those spoiled users of this service do not want to wait the whole period for the task to be processed, but just to click a button and run off for another round of coffee with colleagues. Well, this is understandable. Who wants to wait while the robot works up for about an hour? Yup, me neither.
Long story short, the monolith has been decoupled into two services and linked with a queue (tada!). We all love microservices, right?
The customer then added one more ask: “Would be great if we could deliver this to multiple clouds at once”.
Here’s where Dapr has joined the project and took its place as an abstraction layer between the application and a cloud queue. This article is not about Dapr, so I can only add here it’s a great tool helping to switch between different cloud components (such as AWS SQS or GCP Pub/Sub) with no code changes in the application itself. Some more brief architectural descriptions of the Dapr you will find further down.
The Dapr has very interesting Declarative subscription mode to work with Pub/Sub queues when you don’t need to poll anything from the application side at all. You only need to listen on an HTTP/gRPC endpoint and Dapr will deliver every message to that endpoint and even wait while your application process it and then drop the message from the queue or return it for a retry if processing has failed.
This mode allowed us to minimize code changes in the application, especially considering its legacy nature (it’s Perl, aha).
Since we always claim “we’re agile!”, the customer added another ask: “Would be great to give some users capability to run their tasks with higher priority”.
Easy-peasy! We can just enable Dapr to listen to multiple queues and deliver messages according to the queue priority. What a nice tool….that does not support such a wonderful feature. :(
And none of the existing queueing systems in famous public clouds like AWS/GCP/Azure do not support message prioritization. Some support exists in the RabbitMQ but this was not an option for the project needs because not all cloud environments have Rabbit MQ out of the box and managed service was one of the requirements.
Well, at the end of the day, we got the following requirements list:
- API and “Big Servers” should be decoupled and scaled separately
- Queueing solution should be “cloud agnostic” with support of easy “driver” (i.e. queue type) change
- “Big Servers” should consume tasks from multiple queues considering priority (tasks from “high” priority are VIP citizens and should be served before others)
PART 2: DIY OR DIE
Do It Yourself!. It is always a great decision to reinvent the wheel once again.
Since Dapr was already integrated into all code pieces touching cooperation of the application and cloud resources, there was no way to ditch it for good and rewrite the logic everywhere.
What if Dapr Subscriber the component could be replaced with a custom one? The custom-made service could consume messages posted by Dapr Publisher on the other end and then forward it to the application the same way as Dapr does, keeping the message format Dapr uses?
And inside this custom application, a sophisticated very simple prioritization algorithm can be implemented to push
through messages arriving from a higher priority queue over the “normal” ones.
The queueing app structure could be the following:
- Poll messages across all queues in the “stack” starting from the highest priority queue and moving toward the lowest as no messages left on the current “level”
- Once a message is received, wrap it into Dapr message format and forward it to the application via HTTP, wait until the application process it and return a result
- Delete the message if the response from the application is “success”, otherwise - return the message to the queue for a re-try on another “Big Server” node
Looks pretty simple, right? :)
The only thing to keep in mind is cloud agnosticism when implementing the subscriber,
i.e. define an abstraction layer where any specific implementation could be injected to support a particular cloud service.
Most of the modern development platforms are supported by public cloud providers, so we can pick any SDK and implement
support for any of the queue services, i.e. create multiple “drivers” just like in Dapr and then switch between them in
the configuration.
PART 3: DESIGN FIRST
Now it’s time to code something :)
Where to start? The design of course!
The subscriber service should perform the following operations:
- Poll several message queues for messages considering the priority of each queue
- Forward each message to the application for processing and wait for the result
- Drop the message from the queue in case of success and return it to the queue for retry if else
The first guy coming to the room is the Queue
interface!
It is responsible for managing messages (receiving/deleting/returning).
type Queue interface {
QueueId() string
ReceiveMessage() (Message, error)
DeleteMessage(m Message) error
ReturnMessage(m Message) error
}
And Message
in this case is another interface that will wrap every queue-specific message for easy processing.
For Message
it’s important to know its unique Id
, which queue it belongs to (QueueId
), and the actual Data
to be
forwarded to the application for further processing. Each specific implementation will be different from queue to queue
since they all use different approaches to deliver messages.
type Message interface {
Id() string
QueueId() string
Data() []byte
}
These two interfaces live in queue
package and all implementations should also be law-abiding citizens of this package.
How a message received from the queue should be delivered to your application?
It’s a task for Processor
, one more handy interface!
type Processor interface {
Run(ctx context.Context, msg queue.Message, trans transform.TransformationFunc)
}
It’s on the Processor
to identify the data inside the message and decide how to parse it (or even not to) and where to forward it.
Btw, this guy is also settled in a separate
process
package along with all other implementations.
Take a closer look at the weird parameter trans
of type transform.TransformationFunc
.
This is where we can apply a data transformation function which can help to adjust data from the queue before posting it
to the application endpoint. This helper is used further in the article to adapt to the Dapr data format.
Well, all the vital components are here! Now it’s time to glue all this stuff using a single “director” that will orchestrate all the jobs for us.
Meet the Poller
function living in the poll
package!
type Poller func(ctx context.Context, wg *sync.WaitGroup, queues []queue.Queue, proc process.Processor, trans transform.TransformationFunc)
This function is designed to be running in multiple instances as goroutines
so
you can scale polling to required concurrency (i.e. throughput) by simply adjusting the number of concurrent goroutines
.
Keep in mind, for proper scalability each Poller
should consume only one message at once.
With this approach, the subscriber service can effectively control how many tasks the ”Big Server” can handle simultaneously to not overwhelm pricey resources.
PART 4: IMPLEMENT OR QUIT
What’s next? Yup, you’re right again - the subscriber service can not work with just interfaces.
This article will cover some parts of the implementation. Complete code of the Priority Pub/Sub
you can find in my repository:
https://github.com/burmuley/priority-pubsub
.
Here we only play with Dapr running on top of the AWS cloud.
The AWS SQS Queue
implementation is not
very important for this article. It’s a simple set of AWS API calls with error handling. You can check the source code
in the repository at
https://github.com/Burmuley/priority-pubsub/blob/main/queue/aws_sqs.go
The more important thing is how Dapr uses AWS services for messaging.
Dapr Pub/Sub messaging architecture
First of all, for message publishing Dapr targets an SNS topic (named the same way as the SQS queue). And it assumes that the SQS queue is subscribed to receive messages from the SNS topic.
That way, when you publish a message via Dapr API:
- Dapr wraps the message data into Cloud Events envelope (by default)
- Dapr Publisher pushes the wrapped message into the SNS topic, and this wraps the message into another SNS envelope
- AWS then forwards this message to the subscriber - SQS queue
- The Dapr Subscriber then polls the message from SQS, pulls original message data from the SNS envelope, and passes the original data to the application HTTP endpoint
Note: I could not get why Dapr developers decided to involve SNS instead of simply pushing messages to SQS, maybe it’s the design constraints. If you know for sure the reason - enlighten me in the comments to this article. Thanks! :)
In other words, after the Priority PubSub
service got the message published by Dapr from SQS, it needs to peel off the SNS
skin and pull out the original message before sending it to the application.
HTTP message processor
Note: Full code you can find in this file: process/http.go
The Http
processor is represented with the following structure:
type Http struct {
config HttpConfig
}
And HttpConfig
structure contains everything we need for successful communication with the application:
type HttpRawConfig struct {
SubscriberUrl string
Method string
Timeout int
FatalCodes []int
ContentType string `koanf:"content_type"`
}
Check out the Processor
interface implementation for Http
:
func (r *Http) Run(ctx context.Context, msg queue.Message, trans transform.TransformationFunc) error {
resChan := make(chan error)
data := msg.Data()
{
var err error
if trans != nil {
data, err = trans(data)
if err != nil {
return fmt.Errorf("%w: %w", ErrFatal, err)
}
}
}
go func() {
client := http.Client{
Timeout: time.Duration(r.config.Timeout) * time.Second,
}
req, err := http.NewRequestWithContext(ctx, r.config.Method, r.config.SubscriberUrl, bytes.NewBuffer(data))
if err != nil {
resChan <- fmt.Errorf("%w: %q", ErrFatal, err.Error())
close(resChan)
return
}
resp, err := client.Do(req)
if err != nil {
resChan <- fmt.Errorf("%w: %q", ErrFail, err.Error())
close(resChan)
return
}
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
resChan <- fmt.Errorf("%w: task execution has failed", ErrFail)
close(resChan)
return
}
resChan <- nil
close(resChan)
}()
for {
select {
case res := <-resChan:
return res
case <-ctx.Done():
return ctx.Err()
default:
time.Sleep(5 * time.Second)
}
}
}
At the top of the function, the resChan
channel is defined and then all the HTTP communications happen in another goroutine
.
This is done to be able to monitor for external signals from the context
that is passed to the Run
function.
With the context
it is possible to cancel the HTTP request and the Run
function in one shot, for example in the event
of the service stop.
Same time, this resChan
is used to receive an error
after HTTP communication is done, which indicates the status
of the task (whether it was finished successfully or failed and we need to retry the task once again).
You might have noticed a weird field FatalCodes
in the HttpConfig
structure. This is the list of the HTTP response codes the application can use when the message should not be returned to the queue for another retry.
For example, this approach can be used to limit the number of retries for a particular message.
Another interesting point here is the trans
parameter
(of type transform.TransformationFunc
)
which is the main player here when we need to process Dapr messages wrapped into several envelopes.
As I mentioned before, we need to pull off the original message from SNS “wrapper”.
And the TransformationFunc
implementation transform.DaprAws
does the perfect job here easy and simple way.
func DaprAws(b []byte) ([]byte, error) {
var snsEnvelope struct {
Message string `json:"Message"`
}
err := json.Unmarshal(b, &snsEnvelope)
if err != nil {
return nil, fmt.Errorf("data transformation error: %w", err)
}
return []byte(snsEnvelope.Message), nil
}
The beauty of the TransformationFunc
solution is that we can switch it on or off right in the Priority PubSub
service configuration file.
As you can see, the Processor
interface gives you complete control and flexibility on how you want to process the message.
You can even send the message to a printer and then await a call on PBX on a special number defined in the message for the response.
Or translate it into Morse code and pass it to the Moon and wait for aliens to arrive (but I guess for this to work there are no meaningful message processing timeouts in any of the Pub/Sub queues;) ).
Simple polling algorithm
Now it’s time to talk about the Poller
function, the glue layer for Queue
and Processor
.
Just to recap the function signature:
type Poller func(ctx context.Context, wg *sync.WaitGroup, queues []queue.Queue, proc process.Processor, trans transform.TransformationFunc)
Again, for the sake of brevity, I’ll not post here the full function code.
In the repository, you can find the full implementation of
Simple Poller
.
This Poller
continuously checks for messages across all the queues
in the list
(it’s assumed they are sorted from high to low priority) and when it’s received one simply passes it to the proc
function
locking for a wait on the result. Once the result is here - the error analysis is performed and then Poller
decides whether
the Message
should be deleted from the Queue
or sent back for another retry.
The function to poll messages considering Queue
priority is incredibly simple:
func receiveMessage(queues []queue.Queue) (queue.Message, error) {
for _, q := range queues {
message, err := q.ReceiveMessage()
if err != nil {
if errors.Is(err, queue.ErrNoMessages) {
continue
}
return nil, err
}
return message, nil
}
return nil, queue.ErrNoMessages
}
It starts from the first Queue
from the queues
list and there is a message present - return it immediately, if not -
checking the next Queue
in the list and so on.
Please note, that ErrNoMessages
returned by the receiveMessage
function is a special state indicating that there
were no messages in all queues
defined in the list. The’ Poller’ uses this “flag” to jump to another iteration.
Summary
That’s all, folks! :)
You have just observed how the priority queue solution has been built in a few simple steps.
On the output, we got a scalable service that can be used to implement the Priority Queue pattern. The current implementation is pretty weak in features and only covers the project needs. If I see any interest in this service I will continue its development to the best of my ability and availability.