Scenario:

You are using AWS SQS, the managed FIFO message queuing service that lets you decouple and scale microservices. You notice that some messages fail to process (for various reasons) and end up in the dead-letter FIFO queue configured for the source FIFO queue. Message order is not a concern for your application, so you want these messages relayed back to the source queue. There are a few options for doing this-

  1. Set up a Lambda function to automate the process. In this article, I will share the scripts and steps to do this.
  2. If you want to review each message to find the cause of the failure before it is replayed, you can make the replay a manual process by creating a pipeline on GitLab.
  3. There may be other, better options that I am not aware of yet.

https://youtu.be/bjaxo1HE9-M

Solution:

Steps->

1. Set up the queues and the IAM role (skip this step if you already have a source queue, a DLQ, and an IAM role with access to these queues).

>> Log in to the AWS console and open the SQS service to create the source FIFO queue. Follow the screenshot highlights to create the FIFO queue-
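If you prefer to script this step, here is a minimal boto3 sketch of what the screenshots do (the queue name source_queue.fifo matches the one used later in this article, and I am assuming content-based deduplication is enabled, since the replay code below does not pass a MessageDeduplicationId)-

import boto3

sqs = boto3.client('sqs')

# FIFO queue names must end in ".fifo"
response = sqs.create_queue(
    QueueName='source_queue.fifo',
    Attributes={
        'FifoQueue': 'true',
        # SQS derives the deduplication ID from the message body, so
        # senders do not have to pass MessageDeduplicationId explicitly
        'ContentBasedDeduplication': 'true'
    }
)
source_queue_url = response['QueueUrl']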


>> Now create a DLQ for the source queue and then configure the source queue to use it. Follow the screenshots-


After creating the DLQ, edit the source queue (source_queue.fifo) to configure it with the DLQ and save.
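Continuing the boto3 sketch, the same configuration looks like this (the DLQ name and the maxReceiveCount of 1 are my assumptions; pick a receive count that suits your retry policy)-

import json

# A FIFO source queue requires a FIFO DLQ
dlq = sqs.create_queue(
    QueueName='source_queue_dlq.fifo',
    Attributes={'FifoQueue': 'true', 'ContentBasedDeduplication': 'true'}
)
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq['QueueUrl'],
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# Point the source queue at the DLQ; after maxReceiveCount failed
# receives, SQS moves a message to the DLQ
sqs.set_queue_attributes(
    QueueUrl=source_queue_url,
    Attributes={'RedrivePolicy': json.dumps({
        'deadLetterTargetArn': dlq_arn,
        'maxReceiveCount': '1'
    })}
)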


>> Create IAM Role for Lambda-


Copy the ARN of the role, then edit the access policies of both the SQS source queue and the SQS DLQ to grant it access.
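As a sketch, the role and its permissions can be created with boto3 as well (the role and policy names are hypothetical; the essential part is that Lambda can assume the role, read from the DLQ, and send to the source queue)-

iam = boto3.client('iam')

# Trust policy so the Lambda service can assume this role
role = iam.create_role(
    RoleName='sqs-dlq-replay-role',  # hypothetical name
    AssumeRolePolicyDocument=json.dumps({
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Principal': {'Service': 'lambda.amazonaws.com'},
            'Action': 'sts:AssumeRole'
        }]
    })
)
role_arn = role['Role']['Arn']

# CloudWatch Logs permissions for the function's own logging
iam.attach_role_policy(
    RoleName='sqs-dlq-replay-role',
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
)

source_queue_arn = sqs.get_queue_attributes(
    QueueUrl=source_queue_url,
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']

# The SQS trigger reads from the DLQ; the handler sends back to the source queue
iam.put_role_policy(
    RoleName='sqs-dlq-replay-role',
    PolicyName='sqs-dlq-replay',
    PolicyDocument=json.dumps({
        'Version': '2012-10-17',
        'Statement': [
            {'Effect': 'Allow',
             'Action': ['sqs:ReceiveMessage', 'sqs:DeleteMessage',
                        'sqs:GetQueueAttributes'],
             'Resource': dlq_arn},
            {'Effect': 'Allow',
             'Action': 'sqs:SendMessage',
             'Resource': source_queue_arn}
        ]
    })
)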


>> Create the Lambda function (as shown in the screenshots)-
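Scripted, creating the function looks roughly like this (the function name, runtime, and zip packaging are my assumptions; lambda_function.py contains the handler code shown below)-

import io
import zipfile

lambda_client = boto3.client('lambda')

# Package the handler file into an in-memory zip
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as archive:
    archive.write('lambda_function.py')

lambda_client.create_function(
    FunctionName='sqs-dlq-replay',  # hypothetical name
    Runtime='python3.12',
    Role=role_arn,
    Handler='lambda_function.lambda_handler',
    Code={'ZipFile': buf.getvalue()}
)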


Add an SQS trigger so Lambda is notified when a message arrives in the DLQ.
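In boto3 terms, the trigger is an event source mapping from the DLQ to the function (names as in the sketches above)-

lambda_client.create_event_source_mapping(
    EventSourceArn=dlq_arn,
    FunctionName='sqs-dlq-replay',
    BatchSize=10  # SQS delivers up to 10 messages per invocation
)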


Add this Python code to the Lambda function (note: MessageGroupId is required for FIFO messages)-

import base64
import logging
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

SQS = boto3.client('sqs')

def lambda_handler(event, context):
    for record in event['Records']:
        # A message in a DLQ carries the ARN of the queue it originally came from
        source_arn = record['attributes']['DeadLetterQueueSourceArn']
        # ARN format: arn:aws:sqs:<region>:<account-id>:<queue-name>
        _, _, service, region, account_id, queue_name = source_arn.split(':')
        source_queue_url = f'https://{service}.{region}.amazonaws.com/{account_id}/{queue_name}'
        logger.info('Replaying message to main SQS queue %s', source_queue_url)

        # The Lambda event delivers attributes with lowercase keys
        # (stringValue/dataType), while send_message expects CamelCase
        # (StringValue/DataType), so convert them
        attributes = {}
        for name, attr in record.get('messageAttributes', {}).items():
            converted = {'DataType': attr['dataType']}
            if attr.get('stringValue') is not None:
                converted['StringValue'] = attr['stringValue']
            if attr.get('binaryValue') is not None:
                # Binary values arrive base64-encoded in the event
                converted['BinaryValue'] = base64.b64decode(attr['binaryValue'])
            attributes[name] = converted

        # MessageGroupId is mandatory for FIFO queues; no MessageDeduplicationId
        # is passed, so content-based deduplication must be enabled on the queue
        SQS.send_message(
            QueueUrl=source_queue_url,
            MessageBody=record['body'],
            MessageGroupId=record['attributes']['MessageGroupId'],
            MessageAttributes=attributes
        )


Now send some messages to the source FIFO queue. Poll them without deleting them; once maxReceiveCount is exceeded, SQS moves them to the DLQ FIFO queue. The messages become visible in the DLQ FIFO queue after 5 minutes, which matters because a FIFO queue's deduplication interval is 5 minutes: a message replayed to the source FIFO queue within 5 minutes of the original send is treated as a duplicate and dropped. Lambda triggers automatically as soon as messages are visible in the DLQ FIFO queue; it replays them to the source FIFO queue, and the SQS trigger removes them from the DLQ FIFO queue once the function completes successfully.
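To exercise the whole flow from a script, something like this should work, reusing the queue URL from the earlier sketches-

# Send a test message; MessageGroupId is mandatory for FIFO queues
sqs.send_message(
    QueueUrl=source_queue_url,
    MessageBody='test message for DLQ replay',
    MessageGroupId='replay-test'
)

# Receive without deleting; once the receive count exceeds maxReceiveCount,
# SQS moves the message to the DLQ and the Lambda trigger takes over
sqs.receive_message(QueueUrl=source_queue_url, MaxNumberOfMessages=1)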