KombuMessageHandler doesn't handle ServerTimeout #436

Open
aides opened this issue May 29, 2020 · 1 comment
Labels
bug good starter A good place to start devving on baseplate.py

Comments

@aides
Contributor

aides commented May 29, 2020

Repro steps

  1. Set up a consumer via KombuQueueConsumerFactory.new(...) as in the docs:
    https://baseplate.readthedocs.io/en/stable/api/baseplate/frameworks/queue_consumer/kombu.html

  2. Emulate a long-running operation in the handle_fn, e.g.

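from time import sleep
from typing import Any

import kombu
from baseplate import RequestContext

def process_links(
    context: RequestContext,
    body: Any,
    message: kombu.Message,
) -> None:
    sleep(11)  # default server_timeout is 10 seconds, so this just exceeds it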
  3. Send the message to the queue.
  4. See that the ServerTimeout error is logged in the consumer.
  5. Send another message to the queue.

Actual

The second message is never consumed.

Expected

The second message is consumed and the ServerTimeout error is logged again.

Details

What I found is that ServerTimeout inherits from BaseException (https://github.com/reddit/baseplate.py/blob/master/baseplate/observers/timeout.py#L18), but MessageHandler only catches exceptions that inherit from Exception:
https://github.com/reddit/baseplate.py/blob/master/baseplate/frameworks/queue_consumer/kombu.py#L113

So the ServerTimeout exception propagates all the way up and crashes the thread the QueueConsumer is running in: https://github.com/reddit/baseplate.py/blob/master/baseplate/server/queue_consumer.py#L217
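
To illustrate the mechanism outside of baseplate, here is a minimal, self-contained sketch. `FakeServerTimeout`, `handle`, and `worker_loop` are hypothetical stand-ins, not baseplate code; the only assumption carried over from the issue is that the timeout class derives from BaseException while the handler catches Exception.

```python
import threading
from typing import List


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for ServerTimeout (derives from BaseException)."""


def handle(raise_timeout: bool) -> str:
    """Hypothetical handler that, like the one in the issue, catches Exception only."""
    try:
        if raise_timeout:
            raise FakeServerTimeout("simulated timeout")
        raise ValueError("ordinary failure")
    except Exception as exc:
        return f"handled {type(exc).__name__}"


def worker_loop(results: List[str]) -> None:
    # Simplified consumer loop: the second "message" times out.
    for raise_timeout in (False, True, False):
        results.append(handle(raise_timeout))


results: List[str] = []
escaped: List[str] = []

# Record whatever escapes the thread instead of printing a traceback.
threading.excepthook = lambda args: escaped.append(args.exc_type.__name__)

thread = threading.Thread(target=worker_loop, args=(results,))
thread.start()
thread.join()

print(results)            # only the first message was handled
print(escaped)            # the timeout escaped the except clause
print(thread.is_alive())  # the worker thread is gone
```

The third message is never processed because the thread has already exited, which matches the "Actual" behaviour above.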

Proposal

I propose handling ServerTimeout in MessageHandler like any other exception.

try:
    ...
except (Exception, ServerTimeout) as exc:
    ...
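
A rough sketch of how that could behave, using hypothetical stand-ins (`FakeServerTimeout`, `FakeMessage`, `handle`) rather than the real baseplate or kombu classes. It assumes a failed message is requeued and a successful one is acked; the real MessageHandler may do more than this.

```python
from typing import Callable, List


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for ServerTimeout (derives from BaseException)."""


class FakeMessage:
    """Hypothetical stand-in for kombu.Message that records its outcome."""

    def __init__(self, body: str) -> None:
        self.body = body
        self.outcome = "pending"

    def ack(self) -> None:
        self.outcome = "acked"

    def requeue(self) -> None:
        self.outcome = "requeued"


def handle(message: FakeMessage, handler_fn: Callable[[str], None]) -> None:
    try:
        handler_fn(message.body)
    except (Exception, FakeServerTimeout):
        # Treat the timeout like any other failure: requeue and keep going.
        message.requeue()
    else:
        message.ack()


def handler_fn(body: str) -> None:
    # Simulate a handler that times out on one particular message.
    if body == "slow":
        raise FakeServerTimeout("simulated timeout")


messages: List[FakeMessage] = [FakeMessage("slow"), FakeMessage("fast")]
for message in messages:
    handle(message, handler_fn)

outcomes = [m.outcome for m in messages]
print(outcomes)
```

With the timeout caught, the loop survives the slow message and goes on to process the next one.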
@pacejackson
Contributor

pacejackson commented Jun 4, 2020

One thing to note: I think that with your proposal the message still wouldn't be consumed, because an error requeues the message and, in your test code, it will always time out.

I think your proposal makes sense. In reality, a worker thread crashing should cause the entire queue consumer server to shut down (which should trigger it to restart)... but I think that logic might have the same problem. 😅

def watcher(fn: Callable) -> Callable:
    """Terminates the server (gracefully) if `fn` raises an Exception.
    Used to monitor the pump and handler threads for Exceptions so we can
    shut down the server if one of them exits unexpectedly.
    """
    def _run_and_terminate(*a: Any, **kw: Any) -> Any:
        try:
            return fn(*a, **kw)
        except Exception:
            logger.exception("Unhandled error in pump or handler thread, terminating.")
            self._terminate()
    return _run_and_terminate

Maybe the thing to do is also make the watcher closure catch BaseException rather than Exception?
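
Something along these lines, as a standalone sketch rather than a patch. `make_watcher`, the `terminate` callback, and `FakeServerTimeout` are hypothetical names introduced here so the example runs without the server class; in the real code the closure calls `self._terminate()`.

```python
import logging
from typing import Any, Callable, List

logger = logging.getLogger(__name__)


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for ServerTimeout (derives from BaseException)."""


def make_watcher(terminate: Callable[[], None]) -> Callable[[Callable], Callable]:
    """Build a watcher that calls `terminate` if the wrapped function raises."""

    def watcher(fn: Callable) -> Callable:
        def _run_and_terminate(*a: Any, **kw: Any) -> Any:
            try:
                return fn(*a, **kw)
            except BaseException:
                # BaseException also covers timeouts that bypass `except Exception`.
                logger.exception("Unhandled error in pump or handler thread, terminating.")
                terminate()

        return _run_and_terminate

    return watcher


def failing_handler() -> None:
    raise FakeServerTimeout("simulated timeout")


terminated: List[bool] = []
watched = make_watcher(lambda: terminated.append(True))(failing_handler)
watched()
print(terminated)
```

One trade-off to keep in mind: `except BaseException` also catches KeyboardInterrupt, SystemExit, and GeneratorExit, so the watcher would treat those as reasons to terminate as well.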

@spladug spladug added bug good starter A good place to start devving on baseplate.py labels Nov 10, 2020