diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index 79de1162..0333531b 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -5,7 +5,7 @@ from concurrent import futures from datetime import timedelta from logging import getLogger -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal if TYPE_CHECKING: from collections.abc import Coroutine @@ -14,13 +14,16 @@ class AsyncThread: - """Class for running an asyncio event loop in a separate thread. + """Run an asyncio event loop in a dedicated background thread. - This allows running asynchronous coroutines from synchronous code by executingthem on an event loop - that runs in its own dedicated thread. + This lets synchronous Scrapy callbacks drive asynchronous Apify and Crawlee coroutines. The + scheduler and the HTTP cache storage each own their own `AsyncThread`, so the request queue and + the key-value store never share an event loop; they only share the read-only global + `Configuration`. A single shared loop would also work but would couple their lifecycles. """ - def __init__(self) -> None: + def __init__(self, default_timeout: timedelta = timedelta(seconds=60)) -> None: + self._default_timeout = default_timeout self._eventloop = asyncio.new_event_loop() # Start the event loop in a dedicated daemon thread. @@ -33,7 +36,7 @@ def __init__(self) -> None: def run_coro( self, coro: Coroutine, - timeout: timedelta = timedelta(seconds=60), + timeout: timedelta | Literal['default'] = 'default', ) -> Any: """Run a coroutine on an event loop running in a separate thread. @@ -42,7 +45,8 @@ def run_coro( Args: coro: The coroutine to run. - timeout: The maximum number of seconds to wait for the coroutine to finish. + timeout: The maximum time to wait for the coroutine to finish. Pass `'default'` to use the + `default_timeout` passed to the constructor. Returns: The result returned by the coroutine. @@ -52,6 +56,9 @@ def run_coro( TimeoutError: If the coroutine does not complete within the timeout. Exception: Any exception raised during coroutine execution. """ + if timeout == 'default': + timeout = self._default_timeout + if not self._eventloop.is_running(): raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is not running.')