Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion process_tracker/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
]
preload_process_status_types = ["running", "completed", "failed"]
preload_process_types = ["extract", "load"]
preload_system_keys = [{"key": "version", "value": "0.1.0"}]
preload_system_keys = [{"key": "version", "value": "0.2.0"}]

relational_stores = ["postgresql", "mysql", "oracle", "mssql", "snowflake"]
nonrelational_stores = []
Expand Down
17 changes: 17 additions & 0 deletions process_tracker/process_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ def __init__(

self.process_tracking_run = self.register_new_process_run()

@staticmethod
def bulk_change_extract_status(extracts, extract_status):
"""
Given a set of extract objects, update the extract process record to reflect the association and updated status
as well as the extract record's' status.
:param extracts: List of Extract SQLAlchemy objects to be bulk updated.
:param extract_status: The status to change the extract files to.
:type extract_status: str
:return:
"""

for extract in extracts:
extract.change_extract_status(new_status=extract_status)

def change_run_status(self, new_status, end_date=None):
"""
Change a process tracking run record from 'running' to another status.
Expand Down Expand Up @@ -155,6 +169,8 @@ def find_ready_extracts_by_filename(self, filename):
.all()
)

self.logger.info("Returning extract files by filename.")

return process_files

def find_ready_extracts_by_location(self, location_name=None, location_path=None):
Expand Down Expand Up @@ -195,6 +211,7 @@ def find_ready_extracts_by_location(self, location_name=None, location_path=None
"A location name or path must be provided. Please try again."
)

self.logger.info("Returning extract files by location.")
return process_files

def find_ready_extracts_by_process(self, extract_process_name):
Expand Down
40 changes: 40 additions & 0 deletions tests/test_process_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,46 @@ def tearDown(self):
self.session.query(ErrorType).delete()
self.session.commit()

def test_bulk_change_extract_status(self):
"""
Testing that bulk change occurs when extracts provided.
:return:
"""
extract = ExtractTracker(
process_run=self.process_tracker,
filename="test_extract_filename2.csv",
location_name="Test Location",
location_path="/home/test/extract_dir",
)

extract2 = ExtractTracker(
process_run=self.process_tracker,
filename="test_extract_filename3.csv",
location_name="Test Location",
location_path="/home/test/extract_dir",
)

extracts = [extract, extract2]

self.process_tracker.bulk_change_extract_status(
extracts=extracts, extract_status="loading"
)

given_result = (
self.session.query(ExtractProcess)
.join(ExtractStatus)
.filter(
ExtractProcess.process_tracking_id
== self.process_tracker.process_tracking_run.process_tracking_id
)
.filter(ExtractStatus.extract_status_name == "loading")
.count()
)

expected_result = 2

self.assertEqual(expected_result, given_result)

def test_change_status_invalid_type(self):
"""
Testing that if an invalid process status type is passed, it will trigger an exception.
Expand Down