diff --git a/src/apify_client/_streamed_log.py b/src/apify_client/_streamed_log.py
index afdc5860..92178e9a 100644
--- a/src/apify_client/_streamed_log.py
+++ b/src/apify_client/_streamed_log.py
@@ -218,8 +218,9 @@ async def _stream_log(self) -> None:
         async with self._log_client.stream(raw=True) as log_stream:
             if not log_stream:
                 return
-            async for data in log_stream.aiter_bytes():
-                self._process_new_data(data)
-
-            # If the stream is finished, then the last part will be also processed.
-            self._log_buffer_content(include_last_part=True)
+            try:
+                async for data in log_stream.aiter_bytes():
+                    self._process_new_data(data)
+            finally:
+                # Flush the last buffered part even if the task is cancelled by `stop()`.
+                self._log_buffer_content(include_last_part=True)
diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py
index 46eaa3a0..30da1b6a 100644
--- a/tests/unit/test_logging.py
+++ b/tests/unit/test_logging.py
@@ -719,6 +719,10 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
     assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'
 
 
+_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
+_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'
+
+
 def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
     """Register the minimal run and actor endpoints required by `get_streamed_log`."""
     httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
@@ -774,6 +778,49 @@ def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
 
 
 @pytest.mark.usefixtures('propagate_stream_logs')
+async def test_streamed_log_async_stop_flushes_buffered_tail(
+    caplog: LogCaptureFixture,
+    httpserver: HTTPServer,
+) -> None:
+    """Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
+    stop_emitting = threading.Event()
+
+    def _tail_handler(_request: Request) -> Response:
+        def generate_logs() -> Iterator[bytes]:
+            yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
+            # Second marker has no trailing newline/next-marker, so it stays in the buffer.
+            yield _TAIL_SECOND_MESSAGE.encode()
+            # Block until the test tears the server down (or stop releases us).
+            stop_emitting.wait(timeout=30)
+
+        return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
+
+    httpserver.expect_request(
+        f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
+    ).respond_with_handler(_tail_handler)
+    _register_run_and_actor_endpoints(httpserver)
+
+    api_url = httpserver.url_for('/').removesuffix('/')
+    run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
+    streamed_log = await run_client.get_streamed_log()
+
+    logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
+
+    try:
+        with caplog.at_level(logging.DEBUG, logger=logger_name):
+            async with streamed_log:
+                # Wait long enough for both chunks to arrive and be processed.
+                await asyncio.sleep(1)
+            # Context exit calls stop(), which cancels the task mid-stream.
+    finally:
+        stop_emitting.set()
+
+    messages = [record.message for record in caplog.records]
+    assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}'
+    assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'
+
+
+@pytest.mark.usefixtures('propagate_stream_logs')
 def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
     httpserver: HTTPServer,
     monkeypatch: pytest.MonkeyPatch,