Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions transfers/well_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,15 +624,15 @@ def _after_hook(self, session):
measuring_point_estimator = MeasuringPointEstimator()
# add things that need well id
query = session.query(Thing).filter(Thing.thing_type == "water well")
count = query.count()
wells = query.all()
# for j, chunk in enumerate(chunk_by_size(query.all(), 100)):
chunk_size = 100
for j, chunki in enumerate(range(0, count, chunk_size)):
chunk = wells[chunki : chunki + chunk_size]
count = query.count()
processed = 0
chunk = []

def _process_chunk(chunk_index: int, wells_chunk: list[Thing]):
step_start_time = time.time()
all_objects = []
for i, well in enumerate(chunk):
for well in wells_chunk:
objs = self._after_hook_chunk(
well, formations, measuring_point_estimator
)
Expand All @@ -649,10 +649,21 @@ def _after_hook(self, session):
finally:
save_time = time.time() - save_time

processed_count = chunk_index * chunk_size + len(wells_chunk)
logger.info(
f"After hook: {(j+1)*100}/{count} took {time.time() - step_start_time:.2f}s, "
f"After hook: {processed_count}/{count} took {time.time() - step_start_time:.2f}s, "
f"n_objects={len(all_objects)}, save_time={save_time}"
)
return processed_count

for well in query.yield_per(chunk_size):
chunk.append(well)
if len(chunk) == chunk_size:
processed = _process_chunk(processed // chunk_size, chunk)
Comment on lines +657 to +660

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid committing while streaming yield_per results

The new `for well in query.yield_per(chunk_size)` loop keeps the SELECT cursor open while `_process_chunk` commits the session. On PostgreSQL and other backends, committing a transaction closes the streaming cursor, so once more than `chunk_size` wells exist the next fetch will raise or silently stop and only the first batch will be processed. The previous list-based approach materialized all wells before each commit and did not hit this cursor-closing behavior. Consider deferring the commit until after iteration or reading wells in a separate session/connection that is not committed mid-stream.

Useful? React with 👍 / 👎.

chunk = []

if chunk:
_process_chunk(processed // chunk_size, chunk)

def _after_hook_chunk(self, well, formations, measuring_point_estimator):

Expand Down