diff --git a/process_tracker/data_store.py b/process_tracker/data_store.py index bdd0949..f895a91 100755 --- a/process_tracker/data_store.py +++ b/process_tracker/data_store.py @@ -28,7 +28,7 @@ ] preload_process_status_types = ["running", "completed", "failed"] preload_process_types = ["extract", "load"] -preload_system_keys = [{"key": "version", "value": "0.1.0"}] +preload_system_keys = [{"key": "version", "value": "0.2.0"}] relational_stores = ["postgresql", "mysql", "oracle", "mssql", "snowflake"] nonrelational_stores = [] diff --git a/process_tracker/process_tracker.py b/process_tracker/process_tracker.py index 7ec46c6..33a9455 100755 --- a/process_tracker/process_tracker.py +++ b/process_tracker/process_tracker.py @@ -98,6 +98,20 @@ def __init__( self.process_tracking_run = self.register_new_process_run() + @staticmethod + def bulk_change_extract_status(extracts, extract_status): + """ + Given a set of extract objects, update the extract process record to reflect the association and updated status + as well as the extract record's' status. + :param extracts: List of Extract SQLAlchemy objects to be bulk updated. + :param extract_status: The status to change the extract files to. + :type extract_status: str + :return: + """ + + for extract in extracts: + extract.change_extract_status(new_status=extract_status) + def change_run_status(self, new_status, end_date=None): """ Change a process tracking run record from 'running' to another status. @@ -155,6 +169,8 @@ def find_ready_extracts_by_filename(self, filename): .all() ) + self.logger.info("Returning extract files by filename.") + return process_files def find_ready_extracts_by_location(self, location_name=None, location_path=None): @@ -195,6 +211,7 @@ def find_ready_extracts_by_location(self, location_name=None, location_path=None "A location name or path must be provided. Please try again." ) + self.logger.info("Returning extract files by location.") return process_files def find_ready_extracts_by_process(self, extract_process_name): diff --git a/tests/test_process_tracker.py b/tests/test_process_tracker.py index 487ecd6..661d342 100755 --- a/tests/test_process_tracker.py +++ b/tests/test_process_tracker.py @@ -112,6 +112,46 @@ def tearDown(self): self.session.query(ErrorType).delete() self.session.commit() + def test_bulk_change_extract_status(self): + """ + Testing that bulk change occurs when extracts provided. + :return: + """ + extract = ExtractTracker( + process_run=self.process_tracker, + filename="test_extract_filename2.csv", + location_name="Test Location", + location_path="/home/test/extract_dir", + ) + + extract2 = ExtractTracker( + process_run=self.process_tracker, + filename="test_extract_filename3.csv", + location_name="Test Location", + location_path="/home/test/extract_dir", + ) + + extracts = [extract, extract2] + + self.process_tracker.bulk_change_extract_status( + extracts=extracts, extract_status="loading" + ) + + given_result = ( + self.session.query(ExtractProcess) + .join(ExtractStatus) + .filter( + ExtractProcess.process_tracking_id + == self.process_tracker.process_tracking_run.process_tracking_id + ) + .filter(ExtractStatus.extract_status_name == "loading") + .count() + ) + + expected_result = 2 + + self.assertEqual(expected_result, given_result) + def test_change_status_invalid_type(self): """ Testing that if an invalid process status type is passed, it will trigger an exception.