• Home
  • Testimonials
  • Blog
  • Contact Us

At a Glance of a Key

Crafting Dreams into Ventures, Code into Excellence: Your Journey to Success

SQS Benchmark (with large messages)

2017-07-27 Development 2 Comments 4 minute read

Amazon Simple Queue Service (Amazon SQS) is a scalable and fully managed message queuing service that allows users to transmit any amount of data through the web without administrative responsibility.

Recently, I tried to evaluate whether the SQS service would fit my needs for a design I’m working on. I was interested in its throughput and latency with large messages at different concurrency levels.

I searched the internet and found some nice benchmarks for SQS (for example, an article from SoftwareMill on “Amazon’s SQS performance and latency”), but all of them used 100-byte messages.

Note: If you don’t want to read the whole technical part, the benchmark results can be found here.

I gave up searching pretty quickly and implemented my own benchmark tool.
AWS has a nice C++ SDK that is pretty simple to use.

The benchmark tool sends messages to an SQS queue and, in addition to the queue URL, takes 3 arguments:

  • Queue Depth – The concurrency we want to use when sending messages.
  • Message size in KB
  • Number of messages

The AWS credentials can be supplied in many ways that are described here.

First, we create a random buffer of the given size and reuse it for all the messages (I think it is safe to assume that the SDK does not implement client-side de-duplication). Performance is not a concern for this step, so I just picked an example from the internet and used it (can be found here).
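As a rough illustration of this step, here is a minimal sketch of generating such a buffer with the standard library. It is not the code used by the tool; `makeRandomBuffer` is a hypothetical helper name, and copying the bytes into whatever buffer type the SDK expects is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <vector>

// Hypothetical helper (not taken from the benchmark tool): returns a buffer
// of sizeKB kilobytes filled with pseudo-random bytes.
std::vector<unsigned char> makeRandomBuffer(std::size_t sizeKB)
{
    assert(sizeKB > 0);

    std::random_device seed;
    std::mt19937 engine(seed());
    // uniform_int_distribution is not defined for char types, so draw ints
    // in the byte range and narrow them.
    std::uniform_int_distribution<int> byteValue(0, 255);

    std::vector<unsigned char> buffer(sizeKB * 1024);
    for (auto& b : buffer) {
        b = static_cast<unsigned char>(byteValue(engine));
    }
    return buffer;
}
```

Calling `makeRandomBuffer(64)` yields a 65,536-byte buffer that can be generated once and reused for every message.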

To make things easier, I’ve created a class called “MessageSender” that is responsible for all the activity related to SQS (sending messages, aggregating results, measuring latencies, etc.).

Measuring latency is simple and can be done by using std::chrono::high_resolution_clock and the following pattern:

auto start = std::chrono::high_resolution_clock::now();
startAsyncOperation([start]()
    {
        auto finish = std::chrono::high_resolution_clock::now();
        auto latency = std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count();
    });
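To make the pattern concrete, here is a small self-contained sketch. `fakeAsyncOperation` is a hypothetical stand-in for a real asynchronous call (it just sleeps and then invokes the callback on the calling thread), and I use `std::chrono::steady_clock` here as an assumption on my side, because it is guaranteed to be monotonic, which `high_resolution_clock` is not on every platform.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for an asynchronous API: does some "work" and then
// invokes the completion callback (synchronously, to keep the sketch simple).
void fakeAsyncOperation(std::chrono::milliseconds work, const std::function<void()>& onDone)
{
    std::this_thread::sleep_for(work);
    onDone();
}

// Returns the time in milliseconds between issuing the operation and the
// moment its completion callback runs.
int64_t measureLatencyMs(std::chrono::milliseconds work)
{
    int64_t latencyMs = -1;
    const auto start = Clock::now();
    fakeAsyncOperation(work, [start, &latencyMs]() {
        const auto finish = Clock::now();
        latencyMs = std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count();
    });
    assert(latencyMs >= 0); // the callback has run by the time we get here
    return latencyMs;
}
```

The important detail is that `start` is captured by value in the lambda, so each request carries its own start time no matter how many requests are in flight.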

All the communication with SQS is done using the SQS client provided by the SDK. Several constructors are available, and the default constructor usually gets the job done. In our example, we’ll pass a client configuration with the options we want to specify.

void MessageSender::createSQSClient()
{
    Aws::Client::ClientConfiguration conf;

    conf.region = Aws::Region::EU_WEST_1;
    conf.scheme = Aws::Http::Scheme::HTTPS;
    conf.verifySSL = false;

    m_client = std::unique_ptr<Aws::SQS::SQSClient>(new Aws::SQS::SQSClient(conf));
}

Messages are sent asynchronously, so we don’t need to build a threading model and can leave this job to the SDK. The SQS message body cannot contain arbitrary binary data, so I’ve used a message attribute to carry the actual data I want to send (the random buffer we generated earlier):

void MessageSender::sendMessage()
{
    Aws::SQS::Model::SendMessageRequest request;
    request.SetQueueUrl(m_url);
    request.SetMessageBody("");

    Aws::SQS::Model::MessageAttributeValue data;
    data.SetDataType("Binary");
    data.SetBinaryValue(m_message);
    request.AddMessageAttributes("data", data);

    auto start = std::chrono::high_resolution_clock::now();
    m_client->SendMessageAsync(request, [this, start](const Aws::SQS::SQSClient*,
                                                      const Aws::SQS::Model::SendMessageRequest&,
                                                      const Aws::SQS::Model::SendMessageOutcome& response,
                                                      const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)
        {
            onMessageSent(start, response);
        });
}

When MessageSender finishes its work, it calls a callback and passes the results in a struct that contains the following members:

struct SendResults
{
    uint32_t concurrency;
    uint32_t messageSizeKB;

    uint64_t failures;
    uint64_t messages;
    double totalSizeMB;
    double throughput;
    Counter latency;

    uint64_t testDurationMs;
};

Please note that the type of the latency field is Counter. It is another class that I wrote in order to maintain extra information regarding the latency (min, max, mean, stddev).
Access to this struct is safe because a mutex in MessageSender guards every update. To make Counter thread-safe on its own, an internal mutex can be introduced.
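The Counter implementation is not shown here, so the following is only a sketch of how such a class could track min, max, mean and standard deviation in a single pass. The class name `LatencyCounter` and the use of Welford's online algorithm are my assumptions for the illustration, not necessarily what the tool does. Like the original, it relies on the caller for locking.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <limits>

// Hypothetical latency statistics holder. Not thread-safe on its own: the
// caller is expected to hold a lock around submit().
class LatencyCounter
{
public:
    // Adds one sample and updates the running statistics (Welford's method).
    void submit(double value)
    {
        ++m_count;
        m_min = std::min(m_min, value);
        m_max = std::max(m_max, value);

        const double delta = value - m_mean;
        m_mean += delta / static_cast<double>(m_count);
        m_m2 += delta * (value - m_mean);
    }

    uint64_t count() const { return m_count; }
    double min() const { assert(m_count > 0); return m_min; }
    double max() const { assert(m_count > 0); return m_max; }
    double mean() const { return m_mean; }

    // Population standard deviation; 0 when there are no samples.
    double stddev() const
    {
        return m_count > 0 ? std::sqrt(m_m2 / static_cast<double>(m_count)) : 0.0;
    }

private:
    uint64_t m_count = 0;
    double m_min = std::numeric_limits<double>::max();
    double m_max = std::numeric_limits<double>::lowest();
    double m_mean = 0.0;
    double m_m2 = 0.0; // sum of squared differences from the running mean
};
```

Keeping running values instead of storing every sample means memory use stays constant regardless of how many messages the benchmark sends.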

In order to implement the requested queue depth (concurrency), we’ll maintain two class members: m_inflight and m_left. m_left holds the number of messages that have not been sent yet (or are still in flight), and m_inflight holds the number of messages that are currently in flight (it should be equal to the concurrency most of the time).
m_left is initialized to the number of requested messages, and each time we get a response we decrement it. Once m_left reaches zero, we don’t have any more messages to send, so we start decrementing the m_inflight member until we reach the response for the last message (m_left == 0 && m_inflight == 1). Now we know that everything is done, and we can calculate the results and finish the benchmark run.
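The bookkeeping can be tried out without SQS at all. The sketch below is a simplified, single-threaded variant and not the exact logic of MessageSender: here the counter of remaining messages is decremented when a message is sent rather than when its response arrives, and completion is detected when nothing is left and nothing is in flight. `DepthTracker`, `SimulationResult` and `simulate` are hypothetical names, and a real implementation would need a mutex around the tracker.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper that keeps the two counters described above.
class DepthTracker
{
public:
    explicit DepthTracker(uint64_t messages) : m_left(messages) {}

    // Reserves a slot for one more send; returns false if nothing is left.
    bool tryStartSend()
    {
        if (m_left == 0) return false;
        --m_left;
        ++m_inflight;
        return true;
    }

    // Records a response; returns true if it was the last outstanding one.
    bool onResponse()
    {
        assert(m_inflight > 0);
        --m_inflight;
        return m_left == 0 && m_inflight == 0;
    }

    uint32_t inflight() const { return m_inflight; }

private:
    uint64_t m_left;         // messages not sent yet
    uint32_t m_inflight = 0; // messages sent and awaiting a response
};

struct SimulationResult
{
    uint64_t sent;
    uint32_t peakInflight;
};

// Simulates a run in which every response immediately triggers the next send.
SimulationResult simulate(uint64_t messages, uint32_t concurrency)
{
    DepthTracker tracker(messages);
    SimulationResult result{0, 0};

    // Initial burst: fill the queue depth.
    for (uint32_t i = 0; i < concurrency && tracker.tryStartSend(); ++i) {
        ++result.sent;
    }
    result.peakInflight = tracker.inflight();

    while (tracker.inflight() > 0) {
        if (tracker.onResponse()) break;          // last response: we are done
        if (tracker.tryStartSend()) ++result.sent; // refill the freed slot
        result.peakInflight = std::max(result.peakInflight, tracker.inflight());
    }
    return result;
}
```

With 10 messages and a concurrency of 3, the simulation sends exactly 10 messages and never has more than 3 in flight, which is the invariant the two counters are meant to protect.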

The whole sequence implementation in MessageSender looks like this:

void MessageSender::start()
{
    m_startTime = std::chrono::high_resolution_clock::now();

    uint32_t jobs = m_concurrency;
    for(uint32_t i = 0; i < jobs; ++i) {
        sendMessage();
    }
}

void MessageSender::sendMessage() {
    Aws::SQS::Model::SendMessageRequest request;
    request.SetQueueUrl(m_url);
    request.SetMessageBody("");

    Aws::SQS::Model::MessageAttributeValue data;
    data.SetDataType("Binary");
    data.SetBinaryValue(m_message);

    request.AddMessageAttributes("data", data);

    auto start = std::chrono::high_resolution_clock::now();
    m_client->SendMessageAsync(request, [this, start](const Aws::SQS::SQSClient*,
                                                      const Aws::SQS::Model::SendMessageRequest&,
                                                      const Aws::SQS::Model::SendMessageOutcome& response,
                                                      const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)
        {
            onMessageSent(start, response);
        });
}

void MessageSender::onMessageSent(const std::chrono::time_point<std::chrono::high_resolution_clock>& start, const Aws::SQS::Model::SendMessageOutcome& response)
{
    auto finish = std::chrono::high_resolution_clock::now();

    std::unique_lock<std::mutex> guard(m_lock);
    bool result = response.IsSuccess();
    if (__glibc_likely(result)) {
        ++(m_results.messages);

        auto latency = std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count();
        m_results.latency.submit(latency);
    }
    else {
        std::cerr << "Error sending message to " << m_url << ": " << response.GetError().GetMessage() << std::endl;
        ++(m_results.failures);
    }

    bool done = (m_left == 0) && (m_inflight == 1);
    if (done) {
        m_finishTime = finish;
        guard.unlock();

        complete();
        return;
    }

    // Nothing to do but there are still inflight requests
    if (m_left == 0) {
        --m_inflight;
        return;
    }

    --m_left;
    guard.unlock();
    sendMessage();
}

void MessageSender::complete()
{
    // The sender owns itself: it is deleted when this function returns.
    std::unique_ptr<MessageSender> kill(this);
    m_results.totalSizeMB = m_results.messages * m_message.GetLength() / (1024.0 * 1024);

    m_results.testDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_finishTime - m_startTime).count();
    m_results.throughput = m_results.totalSizeMB / (m_results.testDurationMs / 1000.0);

    m_callback(m_results);
}

And the usage is simple as well:

void runBenchmark(const std::string& queueUrl, uint32_t concurrency, uint32_t sizeKB, uint64_t messages)
{
    Synchronizer sync;
    MessageSender *sender = new MessageSender(queueUrl, concurrency, sizeKB, messages, [&sync](const SendResults& results)
    {
        showResults(results);
        sync.notify();
    });

    sender->start();
    sync.wait();
}

Synchronizer is another utility class that makes the thread sleep until it is notified. This way we don’t finish the runBenchmark function until the lambda function is called.
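The Synchronizer source is not listed in this post, so here is only a sketch of how such a class could be built from a mutex and a condition variable. The `m_notified` flag, the `isNotified()` accessor and the `runHandshake()` demo are my assumptions for the illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot synchronizer: wait() blocks until notify() is called.
class Synchronizer
{
public:
    void notify()
    {
        // Notify while holding the lock so a waiter cannot return (and
        // destroy this object) before notify_all() has finished.
        std::lock_guard<std::mutex> lock(m_mutex);
        m_notified = true;
        m_cv.notify_all();
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate handles spurious wakeups and a notify() that
        // happened before wait() was called.
        m_cv.wait(lock, [this]() { return m_notified; });
    }

    bool isNotified()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_notified;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_notified = false;
};

// Demo: a worker thread plays the role of the completion callback.
void runHandshake()
{
    Synchronizer sync;
    std::thread worker([&sync]() { sync.notify(); });
    sync.wait(); // returns only after the worker has called notify()
    worker.join();
    assert(sync.isNotified());
}
```

The boolean flag matters: without it, a notification that arrives before the main thread starts waiting would be lost and the benchmark would hang.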

Running the benchmark tool requires some arguments:

[22:00 alexander ~/Projects/sqs-benchmark-tool ]$ ./benchmark
Usage: ./benchmark <queueUrl> <qd> <sizeKB> <messages>

For example, running the tool with 20,000 messages of 64KB and QD=64:

[22:00 alexander ~/Projects/sqs-benchmark-tool ]$ ./benchmark <put-url-here> 64 64 20000

Generating 64KB buffer
Initializing MessageSender: QD=64, messageSize=64 KB, numOfMessages=20000
--------------------------------------------------------
Benchmark results for 64 KB messages with QD=64
Duration: 32.354 sec
Transferred: 1250 MB
Messages: 20000
Failures: 0
Latency [ms]: min=33, max=856, mean=102.54, stdev=40.0585
Throughput: 312.5 MBit/sec

The full source code can be found here. Feel free to modify and use it.
Building the project is done using CMake:

cmake -Daws-sdk-cpp_DIR=/path/to/aws-sdk-cpp .
make -j8

In order to get some numbers, I’ve created an SQS queue and a c4.2xlarge instance and ran the benchmark tool with various message sizes and concurrency levels.
The results I got can be found here.

– Alexander


