Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,29 @@ def _create_and_install_waiters(fs, return_when):

return waiter


def _yield_finished_futures(fs, waiter, ref_collect):
"""
Iterate on the list *fs*, yielding finished futures one by one in
reverse order.
Before yielding a future, *waiter* is removed from its waiters
and the future is removed from each set in the collection of sets
*ref_collect*.

The aim of this function is to avoid keeping stale references after
the future is yielded and before the iterator resumes.
"""
while fs:
f = fs[-1]
for futures_set in ref_collect:
futures_set.remove(f)
with f._condition:
f._waiters.remove(waiter)
del f
# Careful not to keep a reference to the popped value
yield fs.pop()


def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.

Expand All @@ -194,16 +217,19 @@ def as_completed(fs, timeout=None):
end_time = timeout + time.time()

fs = set(fs)
total_futures = len(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

finished = list(finished)
try:
for future in finished:
yield future
for f in _yield_finished_futures(finished, waiter,
ref_collect=(fs,)):
f = [f]

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this? Why not just yield f?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh...because otherwise the future would still be referenced by the generator. Gotcha.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. Maybe Python 4 will consider fixing iteration target vars leaking to the scope :-).

yield f.pop()

while pending:
if timeout is None:
Expand All @@ -213,7 +239,7 @@ def as_completed(fs, timeout=None):
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
len(pending), total_futures))

waiter.event.wait(wait_timeout)

Expand All @@ -222,11 +248,15 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = []
waiter.event.clear()

for future in finished:
yield future
pending.remove(future)
# reverse to keep finishing order
finished.reverse()
for f in _yield_finished_futures(finished, waiter,
ref_collect=(fs, pending)):
f = [f]

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

yield f.pop()

finally:
# Remove waiter from unfinished futures
for f in fs:
with f._condition:
f._waiters.remove(waiter)
Expand Down Expand Up @@ -600,11 +630,14 @@ def map(self, fn, *iterables, **kwargs):
# before the first iterator value is required.
def result_iterator():
try:
for future in fs:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield future.result()
yield fs.pop().result()
else:
yield future.result(end_time - time.time())
yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
Expand Down