Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions process_tracker/models/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ class Actor(Base):

actor_id = Column(Integer, Sequence('actor_lkup_actor_id_seq'), primary_key=True)
actor_name = Column(String(250), nullable=False, unique=True)

def __repr__(self):
    """Return a short debug string carrying the actor's id and name."""
    shown_fields = (self.actor_id, self.actor_name)
    return "<Actor id=%s, name=%s>" % shown_fields
36 changes: 35 additions & 1 deletion process_tracker/models/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Models for Extract (Data) entities

from datetime import datetime
from os.path import join

from sqlalchemy import Column, DateTime, ForeignKey, Integer, Sequence, String
from sqlalchemy.orm import relationship
Expand All @@ -18,6 +19,11 @@ class ExtractStatus(Base):

extracts = relationship("ExtractProcess")

def __repr__(self):
    """Return a short debug string carrying the extract status id and name."""
    shown_fields = (self.extract_status_id, self.extract_status_name)
    return "<Extract Status id=%s, name=%s>" % shown_fields


class Extract(Base):

Expand All @@ -32,6 +38,17 @@ class Extract(Base):
extract_process = relationship("ExtractProcess", back_populates='process_extracts')
locations = relationship("Location", foreign_keys=[extract_location_id])

def __repr__(self):
    """Return a debug string with the extract's id, filename, location id and status id."""
    template = "<Extract id=%s, filename=%s, location=%s, status=%s>"
    shown_fields = (
        self.extract_id,
        self.extract_filename,
        self.extract_location_id,
        self.extract_status_id,
    )
    return template % shown_fields

def full_filepath(self):
    """Return the extract's filename joined onto its related location's path."""
    directory = self.locations.location_path
    return join(directory, self.extract_filename)


class ExtractProcess(Base):

Expand All @@ -45,6 +62,12 @@ class ExtractProcess(Base):
process_extracts = relationship('Extract', foreign_keys=[extract_tracking_id])
extract_processes = relationship('ProcessTracking', foreign_keys=[process_tracking_id])

def __repr__(self):
    """Return a debug string with the extract id, process run id and extract status id."""
    template = "<ExtractProcess extract=%s, process_run=%s, extract_status=%s>"
    shown_fields = (
        self.extract_tracking_id,
        self.process_tracking_id,
        self.extract_process_status_id,
    )
    return template % shown_fields


class LocationType(Base):

Expand All @@ -54,7 +77,12 @@ class LocationType(Base):
location_type_name = Column(String(25), unique=True, nullable=False)

locations = relationship('Location', back_populates='location_types')


def __repr__(self):
    """Return a short debug string carrying the location type's id and name."""
    shown_fields = (self.location_type_id, self.location_type_name)
    return "<LocationType id=%s, name=%s>" % shown_fields


class Location(Base):

Expand All @@ -68,3 +96,9 @@ class Location(Base):
extracts = relationship("Extract")

location_types = relationship('LocationType', foreign_keys=[location_type])

def __repr__(self):
    """Return a debug string with the location's id, name and location type.

    The ``type=`` field is filled from ``self.location_type`` so the label
    matches the value; it was previously filled from ``self.location_path``.
    """
    shown_fields = (self.location_id, self.location_name, self.location_type)
    return "<Location id=%s, name=%s, type=%s>" % shown_fields
95 changes: 20 additions & 75 deletions process_tracker/process_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from datetime import datetime
import logging
import os
from os.path import join

from sqlalchemy.orm import aliased

Expand All @@ -16,8 +15,7 @@

from process_tracker.models.actor import Actor
from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location
from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessDependency, ProcessTracking\
, ProcessStatus, ProcessSource, ProcessTarget, ProcessType
from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessDependency, ProcessTracking, ProcessStatus, ProcessSource, ProcessTarget, ProcessType
from process_tracker.models.source import Source
from process_tracker.models.tool import Tool

Expand Down Expand Up @@ -80,13 +78,10 @@ def change_run_status(self, new_status, end_date=None):
:return:
"""
if end_date is None:
self.logger.info('Eng date was not set. Setting to current timestamp.')
end_date = datetime.now()

if self.process_status_types[new_status]:

self.logger.info('New status exists. Setting run to %s' % new_status)

self.process_tracking_run.process_status_id = self.process_status_types[new_status]

if (self.process_status_types[new_status] == self.process_status_complete) \
Expand All @@ -104,7 +99,6 @@ def change_run_status(self, new_status, end_date=None):
self.session.commit()

else:
self.logger.error('%s is not a valid process status type.' % new_status)
raise Exception('%s is not a valid process status type. '
'Please add the status to process_status_type_lkup' % new_status)

Expand All @@ -115,71 +109,54 @@ def find_ready_extracts_by_filename(self, filename):
:param filename:
:return:
"""
extract_files = []

self.logger.info('Searching for extracts with full/partial filename of %s' % filename)

process_files = self.session.query(Extract.extract_filename, Location.location_path)\
.join(Location)\
process_files = self.session.query(Extract)\
.join(ExtractStatus)\
.filter(Extract.extract_filename.like("%" + filename + "%"))\
.filter(ExtractStatus.extract_status_name == 'ready') \
.order_by(Extract.extract_registration_date_time)\
.order_by(Extract.extract_id)

for record in process_files:
extract_files.append(join(record.location_path, record.extract_filename))
self.logger.debug('Found file %s' % record.extract_filename)
.order_by(Extract.extract_id)\
.all()

return extract_files
return process_files

def find_ready_extracts_by_location(self, location):
    """
    For the given location name, find all matching extracts that are ready for processing
    :param location: Location name, compared for equality with Location.location_name.
    :return: List of Extract records whose status name is 'ready', ordered by
             extract_registration_date_time.
    """
    # Query whole Extract rows (not filename/path tuples) so callers receive
    # the mapped objects; .all() materialises the query into a list.
    process_files = self.session.query(Extract)\
        .join(Location)\
        .join(ExtractStatus)\
        .filter(ExtractStatus.extract_status_name == 'ready')\
        .filter(Location.location_name == location)\
        .order_by(Extract.extract_registration_date_time)\
        .all()

    return process_files

def find_ready_extracts_by_process(self, extract_process_name):
    """
    For the given named process, find the extracts that are ready for processing.
    :param extract_process_name: Process name, compared for equality with Process.process_name.
    :return: List of Extract records whose status name is 'ready', ordered by
             extract_registration_date_time.
    """
    # Query whole Extract rows (not filename/path tuples) so callers receive
    # the mapped objects; .all() materialises the query into a list.
    process_files = self.session.query(Extract) \
        .join(ExtractStatus, Extract.extract_status_id == ExtractStatus.extract_status_id) \
        .join(Location, Extract.extract_location_id == Location.location_id) \
        .join(ExtractProcess, Extract.extract_id == ExtractProcess.extract_tracking_id) \
        .join(ProcessTracking) \
        .join(Process) \
        .filter(Process.process_name == extract_process_name
                , ExtractStatus.extract_status_name == 'ready') \
        .order_by(Extract.extract_registration_date_time) \
        .all()

    self.logger.info('Returning extract files by process.')

    return process_files

def get_latest_tracking_record(self, process):
"""
Expand All @@ -188,14 +165,13 @@ def get_latest_tracking_record(self, process):
:type process: integer
:return:
"""
self.logger.info('Searching for latest process run for process %s' % process.process_name)

instance = self.session.query(ProcessTracking)\
.filter(ProcessTracking.process_id == process.process_id)\
.order_by(ProcessTracking.process_run_id.desc())\
.first()

if instance is None:
self.logger.info('Process run not found.')
return False

return instance
Expand All @@ -207,12 +183,7 @@ def get_process_status_types(self):
"""
status_types = {}

self.logger.info('Getting all process status types.')

for record in self.session.query(ProcessStatus):

self.logger.debug('Found process status %s' % record.process_status_name)

status_types[record.process_status_name] = record.process_status_id

return status_types
Expand All @@ -229,21 +200,14 @@ def raise_run_error(self, error_type_name, error_description=None, fail_run=Fals
:type end_date: datetime
:return:
"""
self.logger.info('Raising run error.')

if end_date is None:
self.logger.info('Setting end date since one was not provided.')
end_date = datetime.now() # Need the date to match across all parts of the event in case the run is failed.

if error_description is None:
self.logger.info('Setting default error description since one was not provided.')
error_description = 'Unspecified error.'

self.logger.info('Getting error type.')

error_type = self.data_store.get_or_create_item(model=ErrorType, create=False, error_type_name=error_type_name)

self.logger.info('Setting run error.')
run_error = ErrorTracking(error_type_id=error_type.error_type_id
, error_description=error_description
, process_tracking_id=self.process_tracking_run.process_tracking_id
Expand All @@ -256,7 +220,6 @@ def raise_run_error(self, error_type_name, error_description=None, fail_run=Fals
if fail_run:
self.change_run_status(new_status='failed', end_date=end_date)
self.session.commit()
self.logger.error('Process halting. An error triggered the process to fail.')
raise Exception('Process halting. An error triggered the process to fail.')

def register_extracts_by_location(self, location_path, location_name=None):
Expand All @@ -266,7 +229,6 @@ def register_extracts_by_location(self, location_path, location_name=None):
:param location_path: Path of the location
:return:
"""
self.logger.info('Getting location info for %s' % location_path)
location = LocationTracker(location_path=location_path, location_name=location_name)

# if location.location_type.location_type_name == "s3":
Expand All @@ -286,7 +248,6 @@ def register_extracts_by_location(self, location_path, location_name=None):
# , status='ready')
# else:
for file in os.listdir(location_path):
self.logger.debug('Registering file %s' % file)
ExtractTracker(process_run=self
, filename=file
, location=location
Expand All @@ -300,15 +261,12 @@ def register_new_process_run(self):
child_process = aliased(Process)
parent_process = aliased(Process)

self.logger.info('Finding latest process run.')

last_run = self.get_latest_tracking_record(process=self.process)

new_run_flag = True
new_run_id = 1

# Need to check the status of any dependencies. If dependencies are running or failed, halt this process.
self.logger.info('Checking process dependencies.')

dependency_hold = self.session.query(ProcessDependency)\
.join(parent_process, ProcessDependency.parent_process_id == parent_process.process_id)\
Expand All @@ -320,18 +278,15 @@ def register_new_process_run(self):
.count()

if dependency_hold > 0:
self.logger.error('Processes that this process is dependent on are running or failed.')
raise Exception('Processes that this process is dependent on are running or failed.')

if last_run:
# Must validate that the process is not currently running.
self.logger.info('Process run found. Verifying that the process is not running.')

if last_run.process_status_id != self.process_status_running:
last_run.is_latest_run = False
new_run_flag = True
new_run_id = last_run.process_run_id + 1
self.logger.info('Previous process run not running. Creating new run.')
else:
new_run_flag = False

Expand All @@ -346,13 +301,12 @@ def register_new_process_run(self):
self.session.add(new_run)
self.session.commit()

self.logger.info('Process tracking record added for %s' % self.process_name)

return new_run

self.logger.info('Process tracking record added for %s' % self.process_name)

else:
self.logger.error('Process %s is currently running.' % self.process_name)
raise Exception('Process %s is currently running.' % self.process_name)
raise Exception('The process %s is currently running.' % self.process_name)

def register_process_sources(self, sources):
"""
Expand All @@ -361,12 +315,10 @@ def register_process_sources(self, sources):
:return: List of source objects.
"""
if isinstance(sources, str):
self.logger.info('Only one source provided. Turning into list for processing.')
sources = [sources]
source_list = []

for source in sources:
self.logger.debug('Registering source %s' % source)
source = self.data_store.get_or_create_item(model=Source, source_name=source)

self.data_store.get_or_create_item(model=ProcessSource, source_id=source.source_id
Expand All @@ -382,12 +334,10 @@ def register_process_targets(self, targets):
:return: List of source objects.
"""
if isinstance(targets, str):
self.logger.info('Only one target provided. Turning into list for processing.')
targets = [targets]
target_list = []

for target in targets:
self.logger.debug('Registering target %s' % target)
source = self.data_store.get_or_create_item(model=Source, source_name=target)

self.data_store.get_or_create_item(model=ProcessTarget, target_source_id=source.source_id
Expand All @@ -410,11 +360,9 @@ def set_process_run_low_high_dates(self, low_date=None, high_date=None):
previous_high_date_time = self.process_tracking_run.process_run_low_date_time

if low_date is not None and (previous_low_date_time is None or low_date < previous_low_date_time):
self.logger.info('Setting process run low date to %s' % low_date)
self.process_tracking_run.process_run_low_date_time = low_date

if high_date is not None and (previous_high_date_time is None or high_date > previous_high_date_time):
self.logger.info('Setting process run high date to %s' % high_date)
self.process_tracking_run.process_run_high_date_time = high_date

self.session.commit()
Expand All @@ -429,13 +377,10 @@ def set_process_run_record_count(self, num_records):
process_run_records = self.process.total_record_count

if process_run_records == 0:
self.logger.info('Adding %s records to total record count.' % num_records)

self.process.total_record_count += num_records
else:
new_records = num_records - process_run_records

self.logger.info('Adding %s records to total record count.' % new_records)
self.process.total_record_count = self.process.total_record_count + new_records
self.process.total_record_count = self.process.total_record_count + (num_records - process_run_records)

self.process_tracking_run.process_run_record_count = num_records
self.session.commit()
Loading