Use typing.Protocol to strictly enforce callback function signatures #2091

Microwave-WYB opened this issue May 17, 2024 · 0 comments
The current callback implementation is confusing. Callbacks must be created with the Callback class, but the class does not specify which parameters a callback function must accept. The only way to learn the expected callback signature is to read the documentation.

To avoid using invalid callback functions in my project, I had to define my own typing.Protocol classes to enforce the callback signatures.

This gives my library static checking: the type checker detects invalid callback functions and the editor reports them as errors.
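
To illustrate the kind of error a type checker can catch, here is a minimal sketch. `Job` and `Connection` are hypothetical stand-ins for `rq.job.Job` and `redis.Redis` so the snippet runs without either library installed, and `register` is an illustrative helper, not part of rq.

```python
from typing import Protocol


class Job:
    """Hypothetical stand-in for rq.job.Job."""


class Connection:
    """Hypothetical stand-in for redis.Redis."""


class StoppedCallback(Protocol):
    """Callable that accepts (job, connection) and returns None."""

    def __call__(self, job: Job, connection: Connection) -> None: ...


def register(callback: StoppedCallback) -> StoppedCallback:
    """Illustrative helper: statically accepts only matching callables."""
    return callback


def good(job: Job, connection: Connection) -> None:
    print("stopped:", job)


def bad(job: Job) -> None:  # missing the `connection` parameter
    print("stopped:", job)


register(good)  # accepted by the type checker
register(bad)   # flagged by mypy/pyright as an incompatible argument type
```

The second call still runs, because the annotation is not checked at runtime; the mismatch only shows up in the type checker or editor.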

I would suggest implementing something similar in the rq library.

```python
"""RQ task"""

from dataclasses import dataclass
from typing import Any, Callable, Protocol, runtime_checkable

import rq
from redis import Redis
from rq.job import Job


@runtime_checkable
class SuccessCallback(Protocol):
    """Protocol for success callbacks."""

    def __call__(self, job: Job, connection: Redis, *args: Any, **kwargs: Any) -> None: ...


@runtime_checkable
class FailureCallback(Protocol):
    """Protocol for failure callbacks."""

    def __call__(
        self,
        job: Job,
        connection: Redis,
        type: Any,  # pylint: disable=redefined-builtin
        value: Any,
        traceback: Any,
    ) -> None: ...


@runtime_checkable
class StoppedCallback(Protocol):
    """Protocol for stopped callbacks."""

    def __call__(self, job: Job, connection: Redis) -> None: ...


@dataclass
class Task:
    """Template class for a task to be submitted to an RQ queue."""

    queue: str | rq.Queue
    handler: Callable
    args: tuple | None = None
    kwargs: dict[str, Any] | None = None
    on_success: SuccessCallback | None = None
    on_failure: FailureCallback | None = None
    on_stopped: StoppedCallback | None = None

    def submit(self) -> Job:
        """Submit the task to the queue."""
        if isinstance(self.queue, str):
            self.queue = rq.Queue(self.queue)

        return self.queue.enqueue(
            self.handler,
            args=self.args,
            kwargs=self.kwargs,
            on_success=self.on_success,
            on_failure=self.on_failure,
            on_stopped=self.on_stopped,
        )
```
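
One caveat about `@runtime_checkable`, shown in the sketch below: `isinstance()` against a protocol only checks that the required members exist, not their signatures, so the strict enforcement described here comes from the static type checker. The `Any` annotations are a simplification so the snippet runs without rq or redis installed.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class StoppedCallback(Protocol):
    """Same shape as the protocol above, with Any in place of Job/Redis."""

    def __call__(self, job: Any, connection: Any) -> None: ...


def wrong_signature() -> None:
    """Takes no arguments, so it does not match the protocol statically."""


# isinstance() only verifies that a __call__ member is present.
print(isinstance(wrong_signature, StoppedCallback))  # True
print(isinstance(42, StoppedCallback))               # False: int is not callable
```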