Skip to content

Commit 97f085a

Browse files
authored
Merge pull request #73 from OpenDataAlex/process_tracker_python-12
process_tracker_python-12 Handling Process Dependency failures
2 parents 1cf04ce + 1669d15 commit 97f085a

8 files changed

Lines changed: 191 additions & 12 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,5 @@ pip-selfcheck.json
173173
.idea/misc.xml
174174
.idea/modules.xml
175175
.idea/process_tracker_python.iml
176+
/tests/s3:/
177+
/tests/s3:\/

configs/mysql_config.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[DEFAULT]
22
log_level = ERROR
3+
max_sequential_failures = 5
34
data_store_type = mysql
45
data_store_username = pt_admin
56
data_store_password = Testing1!

configs/postgres_config.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[DEFAULT]
22
log_level = ERROR
3+
max_sequential_failures = 5
34
data_store_type = postgresql
45
data_store_username = pt_admin
56
data_store_password = Testing1!

dbscripts/mysql_process_tracker_defaults.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ INSERT INTO process_tracker.extract_status_lkup (extract_status_id, extract_stat
1111
INSERT INTO process_tracker.process_status_lkup (process_status_id, process_status_name) VALUES (default, 'running');
1212
INSERT INTO process_tracker.process_status_lkup (process_status_id, process_status_name) VALUES (default, 'completed');
1313
INSERT INTO process_tracker.process_status_lkup (process_status_id, process_status_name) VALUES (default, 'failed');
14+
INSERT INTO process_tracker.process_status_lkup (process_status_id, process_status_name) VALUES (default, 'on hold');
1415

1516
INSERT INTO process_tracker.error_type_lkup (error_type_id, error_type_name) VALUES (default, 'File Error');
1617
INSERT INTO process_tracker.error_type_lkup (error_type_id, error_type_name) VALUES (default, 'Data Error');

dbscripts/postgresql_process_tracker_defaults.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ INSERT INTO process_tracker.extract_status_lkup (extract_status_id, extract_stat
1111
INSERT INTO process_tracker.process_status_lkup (process_status_id, process_status_name) VALUES (default, 'running');
1212
INSERT INTO process_tracker.process_status_lkup (process_status_id, process_status_name) VALUES (default, 'completed');
1313
INSERT INTO process_tracker.process_status_lkup (process_status_id, process_status_name) VALUES (default, 'failed');
14+
INSERT INTO process_tracker.process_status_lkup (process_status_id, process_status_name) VALUES (default, 'on hold');
15+
1416

1517
INSERT INTO process_tracker.error_type_lkup (error_type_id, error_type_name) VALUES (default, 'File Error');
1618
INSERT INTO process_tracker.error_type_lkup (error_type_id, error_type_name) VALUES (default, 'Data Error');

process_tracker/process_tracker.py

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,8 @@ def __init__(
8282
:type dataset_types: list
8383
"""
8484
self.config_location = config_location
85-
log_level = SettingsManager(
86-
config_location=self.config_location
87-
).determine_log_level()
85+
self.config = SettingsManager(config_location=self.config_location)
86+
log_level = self.config.determine_log_level()
8887

8988
self.logger = logging.getLogger(__name__)
9089
self.logger.setLevel(log_level)
@@ -144,6 +143,7 @@ def __init__(
144143
self.process_status_running = self.process_status_types["running"]
145144
self.process_status_complete = self.process_status_types["completed"]
146145
self.process_status_failed = self.process_status_types["failed"]
146+
self.process_status_hold = self.process_status_types["on hold"]
147147

148148
self.process_tracking_run = self.register_new_process_run()
149149

@@ -206,6 +206,61 @@ def change_run_status(self, new_status, end_date=None):
206206
else:
207207
raise Exception("The provided status type %s is invalid." % new_status)
208208

209+
def determine_hold_status(self, last_run_status, last_run_id):
210+
"""
211+
Based on the setting 'max_concurrent_failures', count the number of failures for that number of process runs.
212+
If the counts match, process will remain on hold. If last run is 'on_hold' process will remain on hold.
213+
:param last_run_status: The status of the previous run
214+
:param last_run_id: The process_run_id of the previous run
215+
:return:
216+
"""
217+
self.logger.debug("Determining if process should be put on or remain on hold.")
218+
219+
max_concurrent_failures = int(
220+
self.config.config["DEFAULT"]["max_sequential_failures"]
221+
)
222+
223+
self.logger.debug("Max Concurrent failures is %s" % max_concurrent_failures)
224+
# last_runs = (
225+
# self.session.query(ProcessTracking.process_tracking_id)
226+
# .join(Process)
227+
# .filter(Process.process_name == self.process_name)
228+
# .order_by(ProcessTracking.process_run_id.desc())
229+
# .limit(max_concurrent_failures)
230+
# .subquery()
231+
# )
232+
233+
failure_count = (
234+
self.session.query(ProcessTracking)
235+
.join(Process)
236+
.filter(Process.process_name == self.process_name)
237+
.filter(
238+
ProcessTracking.process_run_id > (last_run_id - max_concurrent_failures)
239+
)
240+
.filter(ProcessTracking.process_status_id == self.process_status_failed)
241+
.count()
242+
)
243+
244+
# failure_count = (
245+
# self.session.query(ProcessTracking)
246+
# .filter(ProcessTracking.process_tracking_id.in_(last_runs))
247+
# .filter(ProcessTracking.process_status_id == self.process_status_failed)
248+
# .count()
249+
# )
250+
251+
self.logger.debug("Number of failures in past runs is %s" % failure_count)
252+
253+
if last_run_status == self.process_status_hold:
254+
self.logger.error("Last run still in hold status. Need to remain in hold.")
255+
return True
256+
elif failure_count == max_concurrent_failures:
257+
self.logger.error(
258+
"Number of failures has reached max_concurrent_failures. Putting process on hold until resolved."
259+
)
260+
return True
261+
else:
262+
return False
263+
209264
def find_extracts_by_filename(self, filename, status="ready"):
210265
"""
211266
For the given filename, or filename part, find all matching extracts that are ready for processing.
@@ -326,9 +381,6 @@ def get_latest_tracking_record(self, process):
326381
.first()
327382
)
328383

329-
if instance is None:
330-
return False
331-
332384
return instance
333385

334386
def get_process_status_types(self):
@@ -480,16 +532,28 @@ def register_new_process_run(self):
480532
"Processes that this process is dependent on are running or failed."
481533
)
482534

483-
if last_run:
535+
if last_run is not None and last_run:
484536
# Must validate that the process is not currently running.
485537

486-
if last_run.process_status_id != self.process_status_running:
538+
if (
539+
last_run.process_status_id != self.process_status_running
540+
and last_run.process_status_id != self.process_status_hold
541+
):
487542
last_run.is_latest_run = False
488543
new_run_flag = True
489544
new_run_id = last_run.process_run_id + 1
490545
else:
491546
new_run_flag = False
492547

548+
if self.determine_hold_status(
549+
last_run_status=last_run.process_status_id,
550+
last_run_id=last_run.process_run_id,
551+
):
552+
self.logger.error(
553+
"Process is on hold due to number of concurrent failures or previous run is in on hold status."
554+
)
555+
new_run_flag = False
556+
493557
if new_run_flag:
494558
new_run = ProcessTracking(
495559
process_id=self.process.process_id,
@@ -508,7 +572,9 @@ def register_new_process_run(self):
508572
return new_run
509573

510574
else:
511-
raise Exception("The process %s is currently running." % self.process_name)
575+
raise Exception(
576+
"The process %s is currently running or on hold." % self.process_name
577+
)
512578

513579
def register_process_dataset_types(self, dataset_types):
514580
"""

tests/test_process_tracker.py

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,28 @@ def tearDown(self):
114114
self.session.query(ErrorType).delete()
115115
self.session.commit()
116116

117+
def process_run_setup(self, process_name, status, num_runs):
118+
"""
119+
Helper function to setup mutliple process tracking runs for a given process and set to a given status. Used to
120+
test the on_hold status logic.
121+
:param process_name: Name of the process that runs will be created of.
122+
:param status: Status the runs should be changed to
123+
:param num_runs: Number of runs that should be created
124+
:return:
125+
"""
126+
i = 1
127+
while i <= num_runs:
128+
process_run = ProcessTracker(
129+
process_name=process_name,
130+
process_type="Extract",
131+
actor_name="UnitTesting",
132+
tool_name="Spark",
133+
)
134+
135+
process_run.change_run_status(new_status=status)
136+
i += 1
137+
time.sleep(2)
138+
117139
def test_bulk_change_extract_status(self):
118140
"""
119141
Testing that bulk change occurs when extracts provided.
@@ -644,6 +666,90 @@ def test_initializing_process_tracking(self):
644666

645667
self.assertEqual(expected_result, given_result)
646668

669+
@unittest.skip("Issue with hanging queries on database.")
670+
def test_process_on_hold_max_failures(self):
671+
"""
672+
Testing that when number of failed processes matches the maximum_sequential_failures (default 5), process run
673+
goes on_hold.
674+
:return:
675+
"""
676+
677+
self.process_run_setup(
678+
process_name="On Hold Max Failures Test", status="failed", num_runs=5
679+
)
680+
681+
with self.assertRaises(Exception) as context:
682+
ProcessTracker(
683+
process_name="On Hold Max Failures Test",
684+
process_type="Extract",
685+
actor_name="UnitTesting",
686+
tool_name="Spark",
687+
)
688+
689+
self.assertTrue(
690+
"The process On Hold Max Failures Test is currently running or on_hold."
691+
in str(context.exception)
692+
)
693+
694+
@unittest.skip("Issue with hanging queries on database.")
695+
def test_process_on_hold_under_max_failures(self):
696+
"""
697+
Testing that when number of failed processes is less than the maximum_sequential_failures (default 5), process run
698+
continues.
699+
:return:
700+
"""
701+
process_name = "On Hold Under Max Failures Test"
702+
self.process_run_setup(process_name=process_name, status="failed", num_runs=3)
703+
704+
process_run = ProcessTracker(
705+
process_name=process_name,
706+
process_type="Extract",
707+
actor_name="UnitTesting",
708+
tool_name="Spark",
709+
)
710+
711+
current_run_status = (
712+
self.session.query(ProcessTracking)
713+
.join(Process)
714+
.filter(Process.process_name == process_name)
715+
.filter(ProcessTracking.is_latest_run == True)
716+
)
717+
given_result = current_run_status[0].process_status_id
718+
719+
expected_result = process_run.process_status_running
720+
721+
self.assertEqual(expected_result, given_result)
722+
723+
def test_process_on_hold_previous_run_on_hold(self):
724+
"""
725+
If the previous run does not get moved from on_hold status, then the next run will not kick off and the process
726+
will remain on_hold.
727+
:return:
728+
"""
729+
process_name = "On Hold Previous Run Test"
730+
731+
process_run = ProcessTracker(
732+
process_name=process_name,
733+
process_type="Extract",
734+
actor_name="UnitTesting",
735+
tool_name="Spark",
736+
)
737+
738+
process_run.change_run_status(new_status="on hold")
739+
740+
with self.assertRaises(Exception) as context:
741+
ProcessTracker(
742+
process_name=process_name,
743+
process_type="Extract",
744+
actor_name="UnitTesting",
745+
tool_name="Spark",
746+
)
747+
748+
self.assertTrue(
749+
"The process On Hold Previous Run Test is currently running or on hold."
750+
in str(context.exception)
751+
)
752+
647753
def test_register_extracts_by_location_local_file_count(self):
648754
"""
649755
Testing that when the location is local, all the extracts are counted and registered in the location's file count.
@@ -865,8 +971,8 @@ def test_register_new_process_run_exception(self):
865971
self.process_tracker.register_new_process_run()
866972

867973
return self.assertTrue(
868-
"The process Testing Process Tracking Initialization "
869-
"is currently running." in str(context.exception)
974+
"The process Testing Process Tracking Initialization is currently running or on hold."
975+
in str(context.exception)
870976
)
871977

872978
def test_register_new_process_run_with_previous_run(self):

tests/utilities/test_utilities.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def test_determine_low_high_date_invalid_date_type(self):
2626
utilities.determine_low_high_date(
2727
date=lower_low_date, previous_date=low_date, date_type="blarg"
2828
)
29-
print(context.exception)
29+
3030
return self.assertTrue(
3131
"blarg is not a valid date_type." in str(context.exception)
3232
)

0 commit comments

Comments
 (0)