Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queue flushing signal handler #792

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from

Conversation

atai92
Copy link
Contributor

@atai92 atai92 commented May 17, 2023

Closes #

💸 TL;DR

Add a queue flushing signal handler, so when the event publisher receives a SIGINT/SIGTERM signal it'll continue flushing the queue, so events aren't dropped on shutdown.

📜 Details

Design Doc

Jira

🧪 Testing Steps / Validation

Tested using the included Dockerfile using the following steps:

docker run --sysctl fs.mqueue.msg_max=10000 --sysctl fs.mqueue.msgsize_max=102400 -it -v ~/baseplate.py:/src --ulimit msgqueue=-1 c725da7d50d0
python -m baseplate.sidecars.event_publisher --queue-name test --debug test.ini

# In a separate tab, exec into the container
docker exec -it 07e5b9a2bb37 /bin/bash
python
# Within Python, ran the code below
from baseplate.lib.message_queue import MessageQueue
import time

queue_name = "test"

event_queue = MessageQueue(
        "/events-" + queue_name,
        max_messages=10000,
        max_message_size=102400,
    )


message = b'[{"source":"test","action":"test","noun":"test","client_timestamp":1683823393779,"uuid":"20de9dec-6632-44fa-a3a3-ca29d177776d","action_info":{"reason":"test"}}]'

current_time = time.time()
while time.time() - current_time < 5:
    event_queue.put(message)

This screenshot below shows the reproduction of a bug where we would exit from a BatchFull exception:
Screenshot 2023-05-16 at 5 16 01 PM

This screenshot below shows that the BatchFull exception is no longer present using the same steps above and sending a SIGINT to event_publisher will still cause it to wait for the POSIX queue to be empty before exiting:
Screenshot 2023-05-16 at 5 21 55 PM

✅ Checks

  • CI tests (if present) are passing
  • Adheres to code style for repo
  • Contributor License Agreement (CLA) completed if not a Reddit employee

@atai92 atai92 requested a review from a team as a code owner May 17, 2023 00:23
@KTAtkinson KTAtkinson requested review from redloaf and removed request for maeivysea May 18, 2023 16:46
try:
message = event_queue.get(timeout=0.2)
except TimedOutError:
if len(batcher.serialize()) > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The length here will always be greater than 0 because we add a header. We might want to add a items_count function to the batcher to count the number of items currently in the queue. It could use the length of batcher._items.

Copy link

@redloaf redloaf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants