diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 50ea868b3..2aa70ba01 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -624,15 +624,15 @@ def _after_hook(self, session): measuring_point_estimator = MeasuringPointEstimator() # add things thate need well id query = session.query(Thing).filter(Thing.thing_type == "water well") - count = query.count() - wells = query.all() - # for j, chunk in enumerate(chunk_by_size(query.all(), 100)): chunk_size = 100 - for j, chunki in enumerate(range(0, count, chunk_size)): - chunk = wells[chunki : chunki + chunk_size] + count = query.count() + processed = 0 + chunk = [] + + def _process_chunk(chunk_index: int, wells_chunk: list[Thing]): step_start_time = time.time() all_objects = [] - for i, well in enumerate(chunk): + for well in wells_chunk: objs = self._after_hook_chunk( well, formations, measuring_point_estimator ) @@ -649,10 +649,21 @@ def _after_hook(self, session): finally: save_time = time.time() - save_time + processed_count = chunk_index * chunk_size + len(wells_chunk) logger.info( - f"After hook: {(j+1)*100}/{count} took {time.time() - step_start_time:.2f}s, " + f"After hook: {processed_count}/{count} took {time.time() - step_start_time:.2f}s, " f"n_objects={len(all_objects)}, save_time={save_time}" ) + return processed_count + + for well in query.yield_per(chunk_size): + chunk.append(well) + if len(chunk) == chunk_size: + processed = _process_chunk(processed // chunk_size, chunk) + chunk = [] + + if chunk: + _process_chunk(processed // chunk_size, chunk) def _after_hook_chunk(self, well, formations, measuring_point_estimator):