Run fast asynchronous code from synchronous code. Async Services provides a synchronous wrapper for running third-party asynchronous code, or any coroutine for that matter, from synchronous code.
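For readers curious about the general technique, here is a minimal sketch that uses only the standard library. It is not taken from the Async Services source and may differ from how the package works internally. The helper names `start_background_loop`, `run_sync`, and `stop_background_loop` are hypothetical and exist only for this illustration. The idea is to run an event loop in a background thread and block the calling thread until the submitted coroutine finishes.

```python
import asyncio
import threading


def start_background_loop():
    """Start an event loop in a daemon thread and return (loop, thread)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def run_sync(loop, coro, timeout=None):
    """Submit a coroutine to the background loop and block for its result."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result(timeout)


def stop_background_loop(loop, thread):
    """Ask the loop to stop, wait for the thread to exit, then close the loop."""
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


async def greet(seconds=0.01):
    # Stand-in for any third-party coroutine.
    await asyncio.sleep(seconds)
    return "Hello World"


loop, thread = start_background_loop()
try:
    # Plain synchronous call site: no async/await needed here.
    value = run_sync(loop, greet(), timeout=5)
finally:
    stop_background_loop(loop, thread)
print(value)
```

Keeping the loop in its own thread means synchronous callers never have to create or manage an event loop themselves, at the cost of having to start and stop that thread explicitly.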
pip install async_services
Install the project dependencies
pip install -r requirements.txt
pip install -U .
You can run the tests with the following command
pytest .
pycodestyle .
from async_services.core import run_coro, run_manager, stop_manager
from async_services.core.manager import CoroStatus
import asyncio
async def coroutine(seconds=1, raise_exception=False):
    await asyncio.sleep(seconds)
    if raise_exception:
        raise Exception("Sample Exception")
    return "Hello World"
run_manager()
result = run_coro(coroutine(), block=True)
print(result)
assert result[0] == CoroStatus.Completed
assert result[1] == "Hello World"
stop_manager()
The result is a tuple of two values, (status, result). The status is an integer between 0 and 5 that describes the state of the coroutine.
(1, 'Hello World')
There are currently six coroutine statuses:
- Queued = 0 -> Coroutine is still queued, waiting to be executed, or is being executed
- Completed = 1 -> Coroutine completed successfully
- Failed = 2 -> Coroutine completed successfully, but the callback function raised an exception
- Cancelled = 3 -> Coroutine was cancelled
- Timeout = 4 -> Coroutine did not complete in the given time
- CoroutineException = 5 -> Coroutine itself raised an exception
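As a rough illustration of how calling code might branch on these values, the sketch below interprets a (status, result) tuple. The `Status` enum is a hypothetical stand-in that only mirrors the numbers listed above so the example runs without the package installed. In real code you would compare against `CoroStatus` from `async_services.core.manager`. The `describe` helper is likewise an assumption and not part of the library.

```python
from enum import IntEnum


class Status(IntEnum):
    """Hypothetical stand-in mirroring the status codes listed above."""
    Queued = 0
    Completed = 1
    Failed = 2
    Cancelled = 3
    Timeout = 4
    CoroutineException = 5


def describe(outcome):
    """Turn a (status, result) tuple into a short human-readable message."""
    status, result = outcome
    status = Status(status)  # raises ValueError for codes outside 0..5
    if status is Status.Completed:
        return f"completed with {result!r}"
    if status is Status.Queued:
        return "still queued or running"
    # Failed, Cancelled, Timeout and CoroutineException all land here.
    return f"did not complete cleanly: {status.name}"


message = describe((1, "Hello World"))
print(message)
```

Treating every status other than Completed and Queued as a single "did not complete cleanly" case keeps the sketch short. Real code would likely handle Timeout and CoroutineException separately.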
- Ankit Kathuria - Initial work
This project is licensed under the MIT License - see the LICENSE.md file for details
- Hat tip to anyone whose code was used
- Inspiration
- etc