Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/apify_client/_streamed_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,9 @@ async def _stream_log(self) -> None:
async with self._log_client.stream(raw=True) as log_stream:
if not log_stream:
return
async for data in log_stream.aiter_bytes():
self._process_new_data(data)

# If the stream is finished, then the last part will be also processed.
self._log_buffer_content(include_last_part=True)
try:
async for data in log_stream.aiter_bytes():
self._process_new_data(data)
finally:
# Flush the last buffered part even if the task is cancelled by `stop()`.
self._log_buffer_content(include_last_part=True)
102 changes: 102 additions & 0 deletions tests/unit/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import json
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -716,3 +717,104 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
elapsed = time.monotonic() - start

assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'


_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'


def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
"""Register the minimal run and actor endpoints required by `get_streamed_log`."""
httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
{
'data': {
'id': _MOCKED_RUN_ID,
'actId': _MOCKED_ACTOR_ID,
'userId': 'test_user_id',
'startedAt': '2019-11-30T07:34:24.202Z',
'finishedAt': '2019-12-12T09:30:12.202Z',
'status': 'RUNNING',
'statusMessage': 'Running',
'isStatusMessageTerminal': False,
'meta': {'origin': 'WEB'},
'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
'buildId': 'test_build_id',
'generalAccess': 'RESTRICTED',
'defaultKeyValueStoreId': 'test_kvs_id',
'defaultDatasetId': 'test_dataset_id',
'defaultRequestQueueId': 'test_rq_id',
'buildNumber': '0.0.1',
'containerUrl': 'https://test.runs.apify.net',
}
}
)
httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(
{
'data': {
'id': _MOCKED_ACTOR_ID,
'userId': 'test_user_id',
'name': _MOCKED_ACTOR_NAME,
'username': 'test_user',
'isPublic': False,
'createdAt': '2019-07-08T11:27:57.401Z',
'modifiedAt': '2019-07-08T14:01:05.546Z',
'stats': {
'totalBuilds': 0,
'totalRuns': 0,
'totalUsers': 0,
'totalUsers7Days': 0,
'totalUsers30Days': 0,
'totalUsers90Days': 0,
'totalMetamorphs': 0,
'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
},
'versions': [],
'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
'deploymentKey': 'test_key',
}
}
)


@pytest.mark.usefixtures('propagate_stream_logs')
async def test_streamed_log_async_stop_flushes_buffered_tail(
caplog: LogCaptureFixture,
httpserver: HTTPServer,
) -> None:
"""Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
stop_emitting = threading.Event()

def _tail_handler(_request: Request) -> Response:
def generate_logs() -> Iterator[bytes]:
yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
# Second marker has no trailing newline/next-marker, so it stays in the buffer.
yield _TAIL_SECOND_MESSAGE.encode()
# Block until the test tears the server down (or stop releases us).
stop_emitting.wait(timeout=30)

return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')

httpserver.expect_request(
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
).respond_with_handler(_tail_handler)
_register_run_and_actor_endpoints(httpserver)

api_url = httpserver.url_for('/').removesuffix('/')
run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
streamed_log = await run_client.get_streamed_log()

logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'

try:
with caplog.at_level(logging.DEBUG, logger=logger_name):
async with streamed_log:
# Wait long enough for both chunks to arrive and be processed.
await asyncio.sleep(1)
# Context exit calls stop(), which cancels the task mid-stream.
finally:
stop_emitting.set()

messages = [record.message for record in caplog.records]
assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}'
assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'
Loading