diff --git a/process_tracker/models/actor.py b/process_tracker/models/actor.py index 7e81932..aff6f4d 100755 --- a/process_tracker/models/actor.py +++ b/process_tracker/models/actor.py @@ -11,3 +11,7 @@ class Actor(Base): actor_id = Column(Integer, Sequence('actor_lkup_actor_id_seq'), primary_key=True) actor_name = Column(String(250), nullable=False, unique=True) + + def __repr__(self): + return "" % (self.actor_id + , self.actor_name) diff --git a/process_tracker/models/extract.py b/process_tracker/models/extract.py index 963cbb9..fdf672c 100755 --- a/process_tracker/models/extract.py +++ b/process_tracker/models/extract.py @@ -2,6 +2,7 @@ # Models for Extract (Data) entities from datetime import datetime +from os.path import join from sqlalchemy import Column, DateTime, ForeignKey, Integer, Sequence, String from sqlalchemy.orm import relationship @@ -18,6 +19,11 @@ class ExtractStatus(Base): extracts = relationship("ExtractProcess") + def __repr__(self): + + return "" % (self.extract_status_id + , self.extract_status_name) + class Extract(Base): @@ -32,6 +38,17 @@ class Extract(Base): extract_process = relationship("ExtractProcess", back_populates='process_extracts') locations = relationship("Location", foreign_keys=[extract_location_id]) + def __repr__(self): + + return "" % (self.extract_id + , self.extract_filename + , self.extract_location_id + , self.extract_status_id) + + def full_filepath(self): + + return join(self.locations.location_path, self.extract_filename) + class ExtractProcess(Base): @@ -45,6 +62,12 @@ class ExtractProcess(Base): process_extracts = relationship('Extract', foreign_keys=[extract_tracking_id]) extract_processes = relationship('ProcessTracking', foreign_keys=[process_tracking_id]) + def __repr__(self): + + return "" % (self.extract_tracking_id + , self.process_tracking_id + , self.extract_process_status_id) + class LocationType(Base): @@ -54,7 +77,12 @@ class LocationType(Base): location_type_name = Column(String(25), unique=True, nullable=False) locations = relationship('Location', back_populates='location_types') - + + def __repr__(self): + + return "" % (self.location_type_id + , self.location_type_name) + class Location(Base): @@ -68,3 +96,9 @@ class Location(Base): extracts = relationship("Extract") location_types = relationship('LocationType', foreign_keys=[location_type]) + + def __repr__(self): + + return "" % (self.location_id + , self.location_name + , self.location_path) \ No newline at end of file diff --git a/process_tracker/process_tracker.py b/process_tracker/process_tracker.py index ddd483f..c5ae735 100755 --- a/process_tracker/process_tracker.py +++ b/process_tracker/process_tracker.py @@ -4,7 +4,6 @@ from datetime import datetime import logging import os -from os.path import join from sqlalchemy.orm import aliased @@ -16,8 +15,7 @@ from process_tracker.models.actor import Actor from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location -from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessDependency, ProcessTracking\ - , ProcessStatus, ProcessSource, ProcessTarget, ProcessType +from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessDependency, ProcessTracking, ProcessStatus, ProcessSource, ProcessTarget, ProcessType from process_tracker.models.source import Source from process_tracker.models.tool import Tool @@ -80,13 +78,10 @@ def change_run_status(self, new_status, end_date=None): :return: """ if end_date is None: - self.logger.info('Eng date was not set. Setting to current timestamp.') end_date = datetime.now() if self.process_status_types[new_status]: - self.logger.info('New status exists. Setting run to %s' % new_status) - self.process_tracking_run.process_status_id = self.process_status_types[new_status] if (self.process_status_types[new_status] == self.process_status_complete) \ @@ -104,7 +99,6 @@ def change_run_status(self, new_status, end_date=None): self.session.commit() else: - self.logger.error('%s is not a valid process status type.' % new_status) raise Exception('%s is not a valid process status type. ' 'Please add the status to process_status_type_lkup' % new_status) @@ -115,23 +109,16 @@ def find_ready_extracts_by_filename(self, filename): :param filename: :return: """ - extract_files = [] - - self.logger.info('Searching for extracts with full/partial filename of %s' % filename) - process_files = self.session.query(Extract.extract_filename, Location.location_path)\ - .join(Location)\ + process_files = self.session.query(Extract)\ .join(ExtractStatus)\ .filter(Extract.extract_filename.like("%" + filename + "%"))\ .filter(ExtractStatus.extract_status_name == 'ready') \ .order_by(Extract.extract_registration_date_time)\ - .order_by(Extract.extract_id) - - for record in process_files: - extract_files.append(join(record.location_path, record.extract_filename)) - self.logger.debug('Found file %s' % record.extract_filename) + .order_by(Extract.extract_id)\ + .all() - return extract_files + return process_files def find_ready_extracts_by_location(self, location): """ @@ -139,33 +126,24 @@ def find_ready_extracts_by_location(self, location): :param location: :return: """ - extract_files = [] - self.logger.info('Searching for extracts found at the location named %s' % location) - - process_files = self.session.query(Extract.extract_filename, Location.location_path)\ + process_files = self.session.query(Extract)\ .join(Location)\ .join(ExtractStatus)\ .filter(ExtractStatus.extract_status_name == 'ready')\ .filter(Location.location_name == location) \ - .order_by(Extract.extract_registration_date_time) - - for record in process_files: - extract_files.append(join(record.location_path, record.extract_filename)) - self.logger.debug('Found file %s' % record.extract_filename) + .order_by(Extract.extract_registration_date_time)\ + .all() - return extract_files + return process_files def find_ready_extracts_by_process(self, extract_process_name): """ For the given named process, find the extracts that are ready for processing. :return: List of OS specific filepaths with filenames. """ - extract_files = [] - self.logger.info('Searching for extracts related to process %s' % extract_process_name) - - process_files = self.session.query(Extract.extract_filename, Location.location_path) \ + process_files = self.session.query(Extract) \ .join(ExtractStatus, Extract.extract_status_id == ExtractStatus.extract_status_id) \ .join(Location, Extract.extract_location_id == Location.location_id) \ .join(ExtractProcess, Extract.extract_id == ExtractProcess.extract_tracking_id) \ @@ -173,13 +151,12 @@ def find_ready_extracts_by_process(self, extract_process_name): .join(Process) \ .filter(Process.process_name == extract_process_name , ExtractStatus.extract_status_name == 'ready') \ - .order_by(Extract.extract_registration_date_time) + .order_by(Extract.extract_registration_date_time)\ + .all() - for record in process_files: - extract_files.append(join(record.location_path, record.extract_filename)) - self.logger.debug('Found file %s' % record.extract_filename) + self.logger.info('Returning extract files by process.') - return extract_files + return process_files def get_latest_tracking_record(self, process): """ @@ -188,14 +165,13 @@ def get_latest_tracking_record(self, process): :type process: integer :return: """ - self.logger.info('Searching for latest process run for process %s' % process.process_name) + instance = self.session.query(ProcessTracking)\ .filter(ProcessTracking.process_id == process.process_id)\ .order_by(ProcessTracking.process_run_id.desc())\ .first() if instance is None: - self.logger.info('Process run not found.') return False return instance @@ -207,12 +183,7 @@ def get_process_status_types(self): """ status_types = {} - self.logger.info('Getting all process status types.') - for record in self.session.query(ProcessStatus): - - self.logger.debug('Found process status %s' % record.process_status_name) - status_types[record.process_status_name] = record.process_status_id return status_types @@ -229,21 +200,14 @@ def raise_run_error(self, error_type_name, error_description=None, fail_run=Fals :type end_date: datetime :return: """ - self.logger.info('Raising run error.') - if end_date is None: - self.logger.info('Setting end date since one was not provided.') end_date = datetime.now() # Need the date to match across all parts of the event in case the run is failed. if error_description is None: - self.logger.info('Setting default error description since one was not provided.') error_description = 'Unspecified error.' - self.logger.info('Getting error type.') - error_type = self.data_store.get_or_create_item(model=ErrorType, create=False, error_type_name=error_type_name) - self.logger.info('Setting run error.') run_error = ErrorTracking(error_type_id=error_type.error_type_id , error_description=error_description , process_tracking_id=self.process_tracking_run.process_tracking_id @@ -256,7 +220,6 @@ def raise_run_error(self, error_type_name, error_description=None, fail_run=Fals if fail_run: self.change_run_status(new_status='failed', end_date=end_date) self.session.commit() - self.logger.error('Process halting. An error triggered the process to fail.') raise Exception('Process halting. An error triggered the process to fail.') def register_extracts_by_location(self, location_path, location_name=None): @@ -266,7 +229,6 @@ def register_extracts_by_location(self, location_path, location_name=None): :param location_path: Path of the location :return: """ - self.logger.info('Getting location info for %s' % location_path) location = LocationTracker(location_path=location_path, location_name=location_name) # if location.location_type.location_type_name == "s3": @@ -286,7 +248,6 @@ def register_extracts_by_location(self, location_path, location_name=None): # , status='ready') # else: for file in os.listdir(location_path): - self.logger.debug('Registering file %s' % file) ExtractTracker(process_run=self , filename=file , location=location @@ -300,15 +261,12 @@ def register_new_process_run(self): child_process = aliased(Process) parent_process = aliased(Process) - self.logger.info('Finding latest process run.') - last_run = self.get_latest_tracking_record(process=self.process) new_run_flag = True new_run_id = 1 # Need to check the status of any dependencies. If dependencies are running or failed, halt this process. - self.logger.info('Checking process dependencies.') dependency_hold = self.session.query(ProcessDependency)\ .join(parent_process, ProcessDependency.parent_process_id == parent_process.process_id)\ @@ -320,18 +278,15 @@ def register_new_process_run(self): .count() if dependency_hold > 0: - self.logger.error('Processes that this process is dependent on are running or failed.') raise Exception('Processes that this process is dependent on are running or failed.') if last_run: # Must validate that the process is not currently running. - self.logger.info('Process run found. Verifying that the process is not running.') if last_run.process_status_id != self.process_status_running: last_run.is_latest_run = False new_run_flag = True new_run_id = last_run.process_run_id + 1 - self.logger.info('Previous process run not running. Creating new run.') else: new_run_flag = False @@ -346,13 +301,12 @@ def register_new_process_run(self): self.session.add(new_run) self.session.commit() - self.logger.info('Process tracking record added for %s' % self.process_name) - return new_run + self.logger.info('Process tracking record added for %s' % self.process_name) + else: - self.logger.error('Process %s is currently running.' % self.process_name) - raise Exception('Process %s is currently running.' % self.process_name) + raise Exception('The process %s is currently running.' % self.process_name) def register_process_sources(self, sources): """ @@ -361,12 +315,10 @@ def register_process_sources(self, sources): :return: List of source objects. """ if isinstance(sources, str): - self.logger.info('Only one source provided. Turning into list for processing.') sources = [sources] source_list = [] for source in sources: - self.logger.debug('Registering source %s' % source) source = self.data_store.get_or_create_item(model=Source, source_name=source) self.data_store.get_or_create_item(model=ProcessSource, source_id=source.source_id @@ -382,12 +334,10 @@ def register_process_targets(self, targets): :return: List of source objects. """ if isinstance(targets, str): - self.logger.info('Only one target provided. Turning into list for processing.') targets = [targets] target_list = [] for target in targets: - self.logger.debug('Registering target %s' % target) source = self.data_store.get_or_create_item(model=Source, source_name=target) self.data_store.get_or_create_item(model=ProcessTarget, target_source_id=source.source_id @@ -410,11 +360,9 @@ def set_process_run_low_high_dates(self, low_date=None, high_date=None): previous_high_date_time = self.process_tracking_run.process_run_low_date_time if low_date is not None and (previous_low_date_time is None or low_date < previous_low_date_time): - self.logger.info('Setting process run low date to %s' % low_date) self.process_tracking_run.process_run_low_date_time = low_date if high_date is not None and (previous_high_date_time is None or high_date > previous_high_date_time): - self.logger.info('Setting process run high date to %s' % high_date) self.process_tracking_run.process_run_high_date_time = high_date self.session.commit() @@ -429,13 +377,10 @@ def set_process_run_record_count(self, num_records): process_run_records = self.process.total_record_count if process_run_records == 0: - self.logger.info('Adding %s records to total record count.' % num_records) + self.process.total_record_count += num_records else: - new_records = num_records - process_run_records - - self.logger.info('Adding %s records to total record count.' % new_records) - self.process.total_record_count = self.process.total_record_count + new_records + self.process.total_record_count = self.process.total_record_count + (num_records - process_run_records) self.process_tracking_run.process_run_record_count = num_records self.session.commit() diff --git a/tests/test_process_tracker.py b/tests/test_process_tracker.py index 8a69572..91dc7da 100755 --- a/tests/test_process_tracker.py +++ b/tests/test_process_tracker.py @@ -129,8 +129,9 @@ def test_find_ready_extracts_by_filename_full(self): expected_result = ['/home/test/extract_dir/test_extract_filename2.csv'] given_result = self.process_tracker.find_ready_extracts_by_filename('test_extract_filename2.csv') + given_result = [record.full_filepath() for record in given_result] - self.assertEqual(expected_result, given_result) + self.assertCountEqual(expected_result, given_result) def test_find_ready_extracts_by_filename_partial(self): """ @@ -161,8 +162,9 @@ def test_find_ready_extracts_by_filename_partial(self): , '/home/test/extract_dir/test_extract_filename3-2.csv'] given_result = self.process_tracker.find_ready_extracts_by_filename('test_extract_filename') + given_result = [record.full_filepath() for record in given_result] - self.assertEqual(expected_result, given_result) + self.assertCountEqual(expected_result, given_result) def test_find_ready_extracts_by_filename_partial_not_descending(self): """ @@ -193,6 +195,7 @@ def test_find_ready_extracts_by_filename_partial_not_descending(self): , '/home/test/extract_dir/test_extract_filename3-1.csv'] given_result = self.process_tracker.find_ready_extracts_by_filename('test_extract_filename') + given_result = [record.full_filepath() for record in given_result] self.assertNotEqual(expected_result, given_result) @@ -225,8 +228,9 @@ def test_find_ready_extracts_by_location(self): , '/home/test/extract_dir/test_extract_filename4-2.csv'] given_result = self.process_tracker.find_ready_extracts_by_location('Test Location') + given_result = [record.full_filepath() for record in given_result] - self.assertEqual(expected_result, given_result) + self.assertCountEqual(expected_result, given_result) def test_find_ready_extracts_by_location_not_descending(self): """ @@ -257,6 +261,7 @@ def test_find_ready_extracts_by_location_not_descending(self): , '/home/test/extract_dir/test_extract_filename4-1.csv'] given_result = self.process_tracker.find_ready_extracts_by_location('Test Location') + given_result = [record.full_filepath() for record in given_result] self.assertNotEqual(expected_result, given_result) @@ -288,8 +293,9 @@ def test_find_ready_extracts_by_process(self): , '/home/test/extract_dir/test_extract_filename5-2.csv'] given_result = self.process_tracker.find_ready_extracts_by_process('Testing Process Tracking Initialization') + given_result = [record.full_filepath() for record in given_result] - self.assertEqual(sorted(expected_result), sorted(given_result)) + self.assertCountEqual(expected_result, given_result) def test_find_ready_extracts_by_process_not_descending(self): """ @@ -323,6 +329,7 @@ def test_find_ready_extracts_by_process_not_descending(self): , '/home/test/extract_dir/test_extract_filename5-1.csv'] given_result = self.process_tracker.find_ready_extracts_by_process('Testing Process Tracking Initialization') + given_result = [record.full_filepath() for record in given_result] self.assertNotEqual(expected_result, given_result) @@ -424,8 +431,8 @@ def test_register_new_process_run_exception(self): with self.assertRaises(Exception) as context: # Running registration a second time to mimic job being run twice self.process_tracker.register_new_process_run() - - return self.assertTrue('Process Testing Process Tracking Initialization ' + print(context.exception) + return self.assertTrue('The process Testing Process Tracking Initialization ' 'is currently running.' in str(context.exception)) def test_register_new_process_run_with_previous_run(self):