Scenario:

You are using AWS SQS, the managed message queuing service that lets you decouple and scale microservices, with FIFO queues. Sometimes messages fail to be processed (for various reasons) and land in the dead-letter FIFO queue (DLQ) configured for the source FIFO queue. Message order does not matter for your application, so you want these messages relayed back to the source queue. There are a few options for doing this:

  1. Set up a Lambda function to automate the replay (a rough sketch follows this list).
  2. If you want to review each message to find the cause of the failure before it is replayed, you can set up the replay as a manual process by creating a pipeline on GitLab. In this article, I will share the scripts and steps to do this.
  3. There may be some other, better option that I am not aware of yet.
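
For context, option 1 could look roughly like the sketch below; the function, its SQS event source mapping on the DLQ, and the SOURCE_QUEUE_URL environment variable are all assumptions of mine, not something set up in this article.

# Hypothetical sketch of option 1: the DLQ is configured as this Lambda
# function's SQS event source, and each record is re-sent to the source
# queue. On success, Lambda removes the records from the DLQ automatically.
import os
import boto3

sqs = boto3.client('sqs')

def handler(event, context):
    # SOURCE_QUEUE_URL is an assumed environment variable on the function
    queue_url = os.environ['SOURCE_QUEUE_URL']
    for record in event['Records']:
        sqs.send_message(
            QueueUrl=queue_url,
            MessageGroupId=record['attributes']['MessageGroupId'],
            MessageBody=record['body']
        )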

Solution:

Steps:

1. Set up the queues and an IAM user (skip this step if you already have a source queue, a DLQ, and an IAM user with access to both).

>> Log in to the AWS console and select the SQS service to create the source FIFO queue (source_queue.fifo). Follow the screenshot highlights to create the FIFO queue.
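
If you prefer scripting this step over clicking through the console, a minimal boto3 sketch could look like the following (enabling content-based deduplication is my assumption, so that senders do not have to supply a MessageDeduplicationId; the region matches the replay script later in this article):

import boto3

sqs = boto3.client('sqs', region_name='ap-southeast-2')

# Create the source FIFO queue; the .fifo suffix is mandatory
source = sqs.create_queue(
    QueueName='source_queue.fifo',
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true',  # assumption, see lead-in
    }
)
print(source['QueueUrl'])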

>> Now create a DLQ (dlq.fifo) for the source queue, then configure the source queue to use it. Follow the screenshots.

After creating the DLQ, edit the source queue (source_queue.fifo) to configure it with the DLQ and save.
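
The equivalent boto3 sketch for this step (the maxReceiveCount of 3 is an arbitrary assumption; a message moves to the DLQ after failing that many receives):

import json
import boto3

sqs = boto3.client('sqs', region_name='ap-southeast-2')

# A FIFO queue requires a FIFO DLQ
dlq = sqs.create_queue(
    QueueName='dlq.fifo',
    Attributes={'FifoQueue': 'true', 'ContentBasedDeduplication': 'true'}
)

# Look up the DLQ's ARN and point the source queue's redrive policy at it
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq['QueueUrl'],
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

source_url = sqs.get_queue_url(QueueName='source_queue.fifo')['QueueUrl']
sqs.set_queue_attributes(
    QueueUrl=source_url,
    Attributes={'RedrivePolicy': json.dumps({
        'deadLetterTargetArn': dlq_arn,
        'maxReceiveCount': '3'  # assumption, tune for your consumers
    })}
)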

>> Create an IAM SQS access policy and attach it to a user (as shown in the screenshots).

Create an IAM policy from this JSON document (be sure to replace the account number 704982002588 with your own):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:GetQueueUrl",
                "sqs:ListDeadLetterSourceQueues",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueAttributes",
                "sqs:ListQueueTags",
                "sqs:SetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:*:704982002588:*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "sqs:ListQueues",
            "Resource": "*"
        }
    ]
}

Create an IAM user and attach this policy to it. Note the AWS security credentials (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY); we will need them to set up GitLab.
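
This step can also be scripted. In the hedged boto3 sketch below, the policy name, user name, and local policy file are placeholders of my own, not part of the original setup:

import boto3

iam = boto3.client('iam')

# The JSON policy document shown above, saved to a local file (assumption)
with open('sqs-access-policy.json') as f:
    policy_doc = f.read()

policy = iam.create_policy(PolicyName='sqs-replay-policy',
                           PolicyDocument=policy_doc)
iam.create_user(UserName='sqs-replay-user')
iam.attach_user_policy(UserName='sqs-replay-user',
                       PolicyArn=policy['Policy']['Arn'])

# These credentials go into GitLab as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
keys = iam.create_access_key(UserName='sqs-replay-user')
print(keys['AccessKey']['AccessKeyId'])
print(keys['AccessKey']['SecretAccessKey'])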

2. Now create a GitLab repository and pipeline, and configure the variables.

>> Create a GitLab repository and place these files in it.

.gitlab-ci.yml

---
stages:
  - msgreplay

variables:
  AWS_SQS_Source_Queue_Name:
    value: "source_queue.fifo"  
    description: "AWS SQS Source Queue Name"
  AWS_SQS_DLQ_Queue_Name:
    value: "dlq.fifo"  
    description: "AWS SQS DLQ Queue Name"

replay:
  stage: msgreplay
  image: python:latest
  # Run only when triggered manually from the GitLab UI
  when: manual
  script:
    - pip3 install -r requirements.txt
    - python3 aws-sqs-replay-msg.py

requirements.txt

boto3

aws-sqs-replay-msg.py (you can change the AWS region to match yours)

import logging
import os

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

# Get boto3 client handle for sqs
sqs = boto3.client(
    'sqs',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name='ap-southeast-2'
)


# Get the SQS queue URL for a given queue name
def get_sqs_queue_url(sqs, sqs_queue_name):
    try:
        sqs_queue_url = sqs.get_queue_url(QueueName=sqs_queue_name)
    except ClientError:
        logger.exception('Could not find queue URL.')
        raise
    else:
        return sqs_queue_url

# Receive SQS messages from a queue (up to 10 messages at a time)
def receive_queue_message(sqs, queue_url):
    try:
        response = sqs.receive_message(QueueUrl=queue_url,
                                       AttributeNames=['All'],
                                       MaxNumberOfMessages=10,
                                       WaitTimeSeconds=10)
    except ClientError:
        logger.exception(f'Could not receive messages from - {queue_url}.')
        raise
    else:
        return response


# Send a message to a given FIFO queue (MessageGroupId is required).
# Note: this assumes the source queue has content-based deduplication
# enabled; otherwise send_message also needs a MessageDeduplicationId.
def send_queue_message(sqs, queue_url, msg_group_id, msg_body):
    try:
        response = sqs.send_message(QueueUrl=queue_url,
                                    MessageGroupId=msg_group_id,
                                    MessageBody=msg_body)
    except ClientError:
        logger.exception(f'Could not send message to - {queue_url}.')
        raise
    else:
        return response


# Delete the specified message from the specified queue
def delete_dlq_queue_message(sqs, queue_url, receipt_handle):
    try:
        response = sqs.delete_message(QueueUrl=queue_url,
                                      ReceiptHandle=receipt_handle)
    except ClientError:
        logger.exception(f'Could not delete the message from - {queue_url}.')
        raise
    else:
        return response



if __name__ == '__main__':

    sqs_dlq_queue_url = get_sqs_queue_url(sqs, os.environ.get('AWS_SQS_DLQ_Queue_Name'))
    print(sqs_dlq_queue_url)
    sqs_source_queue_url = get_sqs_queue_url(sqs, os.environ.get('AWS_SQS_Source_Queue_Name'))
    print(sqs_source_queue_url)
    sqs_dlq_queue_messages = receive_queue_message(sqs, sqs_dlq_queue_url['QueueUrl'])
    print(sqs_dlq_queue_messages)

    # Replay one batch (up to 10 messages) from the DLQ to the source queue,
    # deleting each message from the DLQ only after it was sent successfully.
    # Re-run the pipeline if the DLQ holds more than 10 messages.
    if sqs_dlq_queue_messages.get('Messages'):
        for message in sqs_dlq_queue_messages['Messages']:
            response = send_queue_message(sqs, sqs_source_queue_url['QueueUrl'], message['Attributes']['MessageGroupId'], message['Body'])
            if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                delete_dlq_queue_message(sqs, sqs_dlq_queue_url['QueueUrl'], message['ReceiptHandle'])
    else:
        # Fail the job so it is obvious that nothing was replayed
        raise ValueError("No messages available in {}".format(os.environ.get('AWS_SQS_DLQ_Queue_Name')))

Configure AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as masked CI/CD variables in the project settings (Settings > CI/CD > Variables).

Now the GitLab pipeline is set up and ready to run. Create some messages in the source queue and force them to the DLQ for testing. Run the GitLab pipeline manually and verify that the messages from the DLQ are replayed to the source queue and removed from the DLQ.
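
One way to force test messages into the DLQ: send a message to the source queue, then receive it repeatedly without deleting it until its receive count exceeds the queue's maxReceiveCount. A rough sketch, assuming the queue settings from step 1:

import boto3

sqs = boto3.client('sqs', region_name='ap-southeast-2')
queue_url = sqs.get_queue_url(QueueName='source_queue.fifo')['QueueUrl']

# With content-based deduplication enabled on the queue,
# no MessageDeduplicationId is needed here
sqs.send_message(QueueUrl=queue_url,
                 MessageGroupId='test-group',
                 MessageBody='test message for DLQ replay')

# Receive without deleting; once ApproximateReceiveCount exceeds
# maxReceiveCount (3 in the earlier sketch), SQS moves it to dlq.fifo
for _ in range(4):
    sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1,
                        VisibilityTimeout=0, WaitTimeSeconds=5)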