sqs-queue

AWS SQS queue consumer/publisher

Registry: PyPI (pip)
Version: 0.6.4
Maintainers: 4

py-sqs-queue

Simple Python AWS SQS queue consumer and publisher

Installation

pip install sqs-queue

Or, from a source checkout:

python setup.py install

Examples

from sqs_queue import Queue

my_queue = Queue('YOUR_QUEUE_NAME')
for message in my_queue:
    your_process_fn(message)

Or, if you'd like to leave unprocessable messages in the queue to be retried again later:

for message in my_queue:
    try:
        your_process_fn(message)
    except YourRetryableError:
        message.defer()
    except Exception as e:
        logger.warning(e)

And, you can publish to the queue as well:

my_queue.publish({'MessageId': 123, 'Message': '{"foo": "bar"}'})

If you already have a boto3 queue resource, pass it as the queue argument instead of a name:

import boto3
from sqs_queue import Queue

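# boto3's Queue() resource constructor expects a queue URL, so look the queue up by name.
queue_resource = boto3.resource('sqs').get_queue_by_name(QueueName='YOUR_QUEUE_NAME')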

my_queue = Queue(queue=queue_resource)

Configuration

You can put your AWS credentials in environment variables or any of the other places boto3 looks.

Other parameters can be passed to the Queue() constructor, or set with environment variables prefixed by SQS_QUEUE_, e.g. SQS_QUEUE_POLL_WAIT.
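To make the naming convention concrete, here is a minimal sketch of how an option such as poll_wait could be resolved from a SQS_QUEUE_-prefixed environment variable. The resolve_option helper and the precedence order (explicit argument, then environment, then default) are assumptions for illustration, not the package's actual code.

```python
import os

ENV_PREFIX = "SQS_QUEUE_"


def resolve_option(name, explicit=None, default=None, cast=str, environ=None):
    """Resolve one option: explicit argument, then environment, then default.

    This precedence order is an assumption made for illustration.
    """
    environ = os.environ if environ is None else environ
    if explicit is not None:
        return explicit
    raw = environ.get(ENV_PREFIX + name.upper())
    if raw is None:
        return default
    # Environment values are always strings, so convert them to the wanted type.
    return cast(raw)


# Usage with a stand-in environment mapping instead of the real os.environ.
env = {"SQS_QUEUE_POLL_WAIT": "5"}
poll_wait = resolve_option("poll_wait", default=20, cast=int, environ=env)
poll_sleep = resolve_option("poll_sleep", default=40, cast=int, environ=env)
```

Here poll_wait comes from the environment mapping, while poll_sleep falls back to the default because no SQS_QUEUE_POLL_SLEEP entry is set.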

Parameters

poll_wait and poll_sleep

Behind the scenes, the generator polls SQS for new messages. When the queue is empty, each call waits up to poll_wait seconds (20 by default) for new messages; if it times out before any arrive, the generator sleeps for poll_sleep seconds (40 by default) before trying again. Both intervals are configurable:

queue = Queue('YOUR_QUEUE_NAME', poll_wait=20, poll_sleep=40)
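The wait-then-sleep cycle described above can be sketched as a small generator. This is an illustration only: receive is a hypothetical callable standing in for the SQS long-poll request, and the sleep and max_polls parameters exist only so the sketch can run without real delays.

```python
import time


def poll_messages(receive, poll_wait=20, poll_sleep=40, sleep=time.sleep, max_polls=None):
    """Yield messages from `receive`, sleeping after each empty poll.

    `receive(wait_seconds)` is a hypothetical stand-in that blocks for up to
    `wait_seconds` and returns a (possibly empty) list of messages.
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        batch = receive(poll_wait)
        if batch:
            yield from batch
        else:
            # Nothing arrived within poll_wait seconds: back off before retrying.
            sleep(poll_sleep)


# Usage with canned batches and a recorded (not real) sleep.
batches = [["a", "b"], [], ["c"]]
sleeps = []
received = list(
    poll_messages(
        lambda wait: batches.pop(0) if batches else [],
        poll_wait=2,
        poll_sleep=7,
        sleep=sleeps.append,
        max_polls=3,
    )
)
```

The second poll returns nothing, so the generator records one sleep of poll_sleep seconds before the third poll delivers the last message.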

drain

Normally, once the queue is empty, the generator waits for more messages. If you just want to process all existing messages and quit, you can pass this boolean parameter:

queue = Queue('YOUR_QUEUE_NAME', drain=True)

For example, if your queue is long and your consumers are falling behind, you can start a bunch of consumers with drain=True and they'll quit when you've caught up.
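The catch-up pattern can be illustrated without AWS by using an in-memory queue as a stand-in for the backlog. In this sketch, drain_worker and run_drain_consumers are hypothetical names; each worker quits as soon as it finds the backlog empty, which mirrors what the text describes for drain=True.

```python
import queue
import threading


def drain_worker(backlog, handle):
    """Process items until the backlog is empty, then return."""
    while True:
        try:
            item = backlog.get_nowait()
        except queue.Empty:
            return  # Caught up: this consumer quits instead of waiting.
        handle(item)


def run_drain_consumers(items, n_workers=4):
    """Start several draining consumers and wait for all of them to finish."""
    backlog = queue.Queue()
    for item in items:
        backlog.put(item)

    processed = []
    lock = threading.Lock()

    def handle(item):
        with lock:
            processed.append(item)

    workers = [
        threading.Thread(target=drain_worker, args=(backlog, handle))
        for _ in range(n_workers)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return processed


processed = run_drain_consumers(range(100), n_workers=4)
```

Every item is handled exactly once, in no particular order, and all workers exit on their own once the backlog is gone.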

sns

If your SQS queue is fed from an SNS topic, pass this boolean parameter. Each message will then contain just the SNS notification data, so you don't have to extract it from the SQS message and decode it yourself:

queue = Queue('YOUR_QUEUE_NAME', sns=True)

When you use this option, the sns_message_id is added to the notification data; you can use it to make sure each message is processed only once.
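For a sense of what this option saves you from writing, here is a sketch of the manual unwrapping plus a simple once-only filter keyed on sns_message_id. It assumes the standard SNS JSON envelope (raw message delivery disabled) and a JSON object payload; unwrap_sns and process_once are hypothetical helpers, not part of the package.

```python
import json


def unwrap_sns(sqs_body):
    """Extract the notification payload from an SNS envelope in an SQS body."""
    envelope = json.loads(sqs_body)
    # The SNS envelope carries the published payload as a string in "Message".
    data = json.loads(envelope["Message"])
    data["sns_message_id"] = envelope["MessageId"]
    return data


def process_once(notifications, handle, seen=None):
    """Call `handle` for each notification whose sns_message_id is new."""
    seen = set() if seen is None else seen
    for notification in notifications:
        message_id = notification["sns_message_id"]
        if message_id in seen:
            continue  # Duplicate delivery: skip it.
        seen.add(message_id)
        handle(notification)
    return seen


# Usage: the same notification delivered twice is handled once.
body = json.dumps({
    "Type": "Notification",
    "MessageId": "abc-123",
    "Message": json.dumps({"foo": "bar"}),
})
handled = []
process_once([unwrap_sns(body), unwrap_sns(body)], handled.append)
```

An in-memory set only deduplicates within one process; a shared store would be needed to deduplicate across several consumers.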

create

If you pass create=True and your SQS queue name is not found, a queue with that name will be created.

bulk_queue

Pass another Queue as this option, and it will be checked only when the primary "priority" queue is empty. For example:

In [1]:   from sqs_queue import Queue

In [2]:   bulk = Queue(
   ...:       queue_name='bulk',
   ...:       create=True,
   ...:       poll_wait=2
   ...:   )

In [3]:   primary = Queue(
   ...:       queue_name='primary',
   ...:       bulk_queue=bulk,
   ...:       drain=True,
   ...:       create=True,
   ...:       poll_wait=2
   ...:   )

In [5]:   primary.publish('{"type": "priority", "id": 1}')
   ...:   bulk.publish('{"type": "bulk", "id": 1}')
   ...:   bulk.publish('{"type": "bulk", "id": 2}')

In [6]:   for msg in primary:
   ...:       print(msg)

{'type': 'priority', 'id': 1}
{'type': 'bulk', 'id': 1}
{'type': 'bulk', 'id': 2}
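The ordering in the session above follows from a simple rule: look at the primary queue first and fall back to the bulk queue only when the primary is empty. A minimal in-memory sketch of that rule, using deques as stand-ins for the two SQS queues (consume_with_fallback is a hypothetical name, and the real package polls in batches rather than one message at a time):

```python
from collections import deque


def consume_with_fallback(primary, bulk):
    """Yield from `primary` first; fall back to `bulk` only when primary is empty."""
    while primary or bulk:
        # Re-check the primary queue before every message so that newly
        # arrived priority work always jumps ahead of the bulk backlog.
        source = primary if primary else bulk
        yield source.popleft()


primary = deque([{"type": "priority", "id": 1}])
bulk = deque([{"type": "bulk", "id": 1}, {"type": "bulk", "id": 2}])

order = []
for msg in consume_with_fallback(primary, bulk):
    order.append((msg["type"], msg["id"]))
    if msg == {"type": "bulk", "id": 1}:
        # A priority message arriving mid-run is served before the remaining bulk one.
        primary.append({"type": "priority", "id": 2})
```

In this run the second priority message is consumed between the two bulk messages, because the primary queue is consulted again before each fallback.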

bulk_queue_check_pct

When using bulk_queue, the bulk queue is normally only checked when the primary queue is empty. With bulk_queue_check_pct, you can also randomly check the bulk queue after a percentage of non-empty primary queue polls:

primary = Queue(
    queue_name='primary',
    bulk_queue=bulk,
    bulk_queue_check_pct=25
)

This will check the bulk queue after approximately 25% of primary queue polls that returned messages, helping prevent bulk messages from being starved when the primary queue is continuously busy.
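One way to picture the percentage is as a random draw made after each primary poll. The sketch below is an assumption about the general technique, not the package's implementation; should_check_bulk is a hypothetical helper.

```python
import random


def should_check_bulk(primary_had_messages, check_pct, rng=random):
    """Decide whether to poll the bulk queue after a primary poll."""
    if not primary_had_messages:
        # An empty primary poll always falls through to the bulk queue.
        return True
    # rng.random() is uniform on [0, 1), so this is true about check_pct% of the time.
    return rng.random() * 100 < check_pct


# Usage: estimate how often a busy primary queue still yields a bulk check.
rng = random.Random(0)
trials = 10_000
checks = sum(should_check_bulk(True, 25, rng=rng) for _ in range(trials))
fraction = checks / trials
```

With check_pct=25 the measured fraction lands close to 0.25, while 0 disables the extra checks and 100 checks the bulk queue after every primary poll.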
