From 990a763b115a747e86126639f9e54d945f06db7c Mon Sep 17 00:00:00 2001 From: Alex Meadows Date: Fri, 7 Jun 2019 11:36:20 -0400 Subject: [PATCH] process_tracker_python-21 Add or Delete Process Dependencies From CLI :sparkles: Ability to create and delete Process Dependencies from CLI Process Dependencies can now be managed via the CLI tool. Closes:#21 --- README.md | 1 + process_tracker/cli.py | 28 +++- process_tracker/data_store.py | 282 ++++++++++++++++++++-------------- tests/test_cli.py | 113 +++++++++++++- 4 files changed, 302 insertions(+), 122 deletions(-) diff --git a/README.md b/README.md index ee4cfae..4f84325 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ Data integration process management made easy! [![PyPI version](https://badge.fury.io/py/processtracker.svg)](https://badge.fury.io/py/processtracker) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black) [![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0) +[![Documentation Status](https://readthedocs.org/projects/process-tracker/badge/?version=latest)](https://process-tracker.readthedocs.io/en/latest/?badge=latest) This is the Python implementation of the ProcessTracker framework. ProcessTracker builds a standard framework that is tool agnostic. If you are working with data integration/cleansing processes within Python (i.e. using PySpark, Pandas, etc.) diff --git a/process_tracker/cli.py b/process_tracker/cli.py index ec566f2..f1d7c0a 100644 --- a/process_tracker/cli.py +++ b/process_tracker/cli.py @@ -44,31 +44,51 @@ def main(): @main.command() @click.option("-t", "--topic", help="The topic being created") @click.option("-n", "--name", help="The name for the topic.") -def create(topic, name): +@click.option( + "-p", "--parent", help="The parent process' name, if creating a process dependency" +) +@click.option( + "-c", "--child", help="The child process' name, if creating a process dependency" +) +def create(topic, name, parent=None, child=None): """ Create an item that is within the valid topics list. :param topic: The name of the topic. :type topic: string :param name: The name of the topic item to be added. :type name: string + :param parent: The parent process' name, if creating a process dependency + :type parent: string + :param child: The child process' name, if creating a process dependency + :type child: string """ click.echo("Attempting to create %s with name %s" % (topic, name)) - data_store.topic_creator(topic=topic, name=name) + data_store.topic_creator(topic=topic, name=name, parent=parent, child=child) @main.command() @click.option("-t", "--topic", help="The topic being created") @click.option("-n", "--name", help="The name for the topic.") -def delete(topic, name): +@click.option( + "-p", "--parent", help="The parent process' name, if deleting a process dependency" +) +@click.option( + "-c", "--child", help="The child process' name, if deleting a process dependency" +) +def delete(topic, name, parent=None, child=None): """ Delete an item that is within the valid topics list and not a pre-loaded item. :param topic: The name of the topic. :type topic: string :param name: The name of the topic item to be deleted. :type name: string + :param parent: The parent process' name, if deleting a process dependency + :type parent: string + :param child: The child process' name, if deleting a process dependency + :type child: string """ click.echo("Attempting to delete %s with name %s" % (topic, name)) - data_store.topic_deleter(topic=topic, name=name) + data_store.topic_deleter(topic=topic, name=name, parent=parent, child=child) @main.command() diff --git a/process_tracker/data_store.py b/process_tracker/data_store.py index f895a91..5954d7b 100755 --- a/process_tracker/data_store.py +++ b/process_tracker/data_store.py @@ -3,7 +3,7 @@ from click import ClickException from sqlalchemy import create_engine, MetaData -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import aliased, sessionmaker from sqlalchemy_utils import database_exists from process_tracker.utilities.settings import SettingsManager @@ -11,7 +11,13 @@ from process_tracker.models.model_base import Base from process_tracker.models.actor import Actor from process_tracker.models.extract import ExtractStatus -from process_tracker.models.process import ErrorType, ProcessType, ProcessStatus +from process_tracker.models.process import ( + ErrorType, + Process, + ProcessDependency, + ProcessType, + ProcessStatus, +) from process_tracker.models.source import Source from process_tracker.models.system import System from process_tracker.models.tool import Tool @@ -30,10 +36,7 @@ preload_process_types = ["extract", "load"] preload_system_keys = [{"key": "version", "value": "0.2.0"}] -relational_stores = ["postgresql", "mysql", "oracle", "mssql", "snowflake"] -nonrelational_stores = [] - -supported_data_stores = relational_stores + nonrelational_stores +supported_data_stores = ["postgresql", "mysql", "oracle", "mssql", "snowflake"] class DataStore: @@ -167,47 +170,74 @@ def initialize_data_store(self, overwrite=False): self.logger.debug("Finished the initialization check.") - def topic_creator(self, topic, name): + def topic_creator(self, topic, name, parent=None, child=None): """ For the command line tool, validate the topic and create the new instance. - :param topic: - :param name: + :param topic: The name of the topic. + :type topic: string + :param name: The name of the topic item to be added. + :type name: string + :param parent: The parent process' name, if creating a process dependency + :type parent: string + :param child: The child process' name, if creating a process dependency + :type child: string :return: """ self.logger.info("Attempting to create %s item: %s" % (topic, name)) if self.topic_validator(topic=topic): - try: - if topic == "actor": - item = self.get_or_create_item(model=Actor, actor_name=name) - self.logger.info("Actor created: %s" % item.__repr__) - if topic == "extract status": - item = self.get_or_create_item( - model=ExtractStatus, extract_status_name=name - ) - self.logger.info("Extract Status created: %s" % item.__repr__) - if topic == "error type": - item = self.get_or_create_item( - model=ErrorType, error_type_name=name - ) - self.logger.info("Error Type created: %s" % item.__repr__) - if topic == "process type": - item = self.get_or_create_item( - model=ProcessType, process_type_name=name - ) - self.logger.info("Process Type created: %s" % item.__repr__) - if topic == "process status": - item = self.get_or_create_item( - model=ProcessStatus, process_status_name=name - ) - self.logger.info("Process Status created: %s" % item.__repr__) - if topic == "source": - item = self.get_or_create_item(model=Source, source_name=name) - self.logger.info("Source created: %s" % item.__repr__) - if topic == "tool": - item = self.get_or_create_item(model=Tool, tool_name=name) - self.logger.info("Tool created: %s" % item.__repr__) - finally: + + if topic == "actor": + item = self.get_or_create_item(model=Actor, actor_name=name) + self.logger.info("Actor created: %s" % item.__repr__) + + elif topic == "extract status": + item = self.get_or_create_item( + model=ExtractStatus, extract_status_name=name + ) + self.logger.info("Extract Status created: %s" % item.__repr__) + + elif topic == "error type": + item = self.get_or_create_item(model=ErrorType, error_type_name=name) + self.logger.info("Error Type created: %s" % item.__repr__) + + elif topic == "process dependency": + parent_process = self.get_or_create_item( + model=Process, process_name=parent, create=False + ) + child_process = self.get_or_create_item( + model=Process, process_name=child, create=False + ) + + item = self.get_or_create_item( + model=ProcessDependency, + parent_process_id=parent_process.process_id, + child_process_id=child_process.process_id, + ) + + self.logger.info("Process Dependency created: %s" % item.__repr__) + + elif topic == "process type": + item = self.get_or_create_item( + model=ProcessType, process_type_name=name + ) + self.logger.info("Process Type created: %s" % item.__repr__) + + elif topic == "process status": + item = self.get_or_create_item( + model=ProcessStatus, process_status_name=name + ) + self.logger.info("Process Status created: %s" % item.__repr__) + + elif topic == "source": + item = self.get_or_create_item(model=Source, source_name=name) + self.logger.info("Source created: %s" % item.__repr__) + + elif topic == "tool": + item = self.get_or_create_item(model=Tool, tool_name=name) + self.logger.info("Tool created: %s" % item.__repr__) + + else: ClickException("Invalid topic type.").show() self.logger.error("Invalid topic type.") @@ -218,13 +248,17 @@ def topic_creator(self, topic, name): return item - def topic_deleter(self, topic, name): + def topic_deleter(self, topic, name, parent=None, child=None): """ For the command line tool, validate that the topic name is not a default value and if not, delete it. :param topic: The SQLAlchemy object type :type topic: SQLAlchemy object :param name: Name of the item to be deleted. :type name: string + :param parent: The parent process' name, if deleting a process dependency + :type parent: string + :param child: The child process' name, if deleting a process dependency + :type child: string :return: """ item_delete = False @@ -251,6 +285,26 @@ def topic_deleter(self, topic, name): ErrorType.error_type_name == name ).delete() self.logger.info("%s %s deleted." % (topic, name)) + elif topic == "process dependency": + item_delete = True + + parent_process = self.get_or_create_item( + model=Process, process_name=parent, create=False + ) + + child_process = self.get_or_create_item( + model=Process, process_name=child, create=False + ) + + item = self.get_or_create_item( + model=ProcessDependency, + parent_process_id=parent_process.process_id, + child_process_id=child_process.process_id, + ) + + self.session.delete(item) + + self.logger.info("%s %s - %s deleted." % (topic, parent, child)) elif topic == "process type" and name not in preload_process_types: item_delete = True @@ -291,6 +345,8 @@ def topic_deleter(self, topic, name): if item_delete: self.session.commit() + return "blarg" + def topic_updater(self, topic, initial_name, name): """ For the command line tool, validate that the topic name is not a default value and if not, update it. @@ -389,6 +445,7 @@ def topic_validator(self, topic): "actor", "error type", "extract status", + "process dependency", "process status", "process type", "source", @@ -453,94 +510,85 @@ def verify_and_connect_to_data_store(self): raise Exception(errors) if data_store_type in supported_data_stores: - engine = "" - meta = "" - session = "" + self.logger.info("Data store is supported.") + self.logger.info("Data store is %s" % data_store_type) - if data_store_type in relational_stores: - - self.logger.info("Data store is relational.") - self.logger.info("Data store is %s" % data_store_type) - - if ( - data_store_type == "postgresql" - or data_store_type == "oracle" - or data_store_type == "snowflake" - ): - - engine = create_engine( - data_store_type - + "://" - + data_store_username - + ":" - + data_store_password - + "@" - + data_store_host - + "/" - + data_store_name - ) - - elif data_store_type == "mysql": - - engine = create_engine( - "mysql+pymysql://" - + data_store_username - + ":" - + data_store_password - + "@" - + data_store_host - + "/" - + data_store_name - ) - elif data_store_type == "mssql": - - engine = create_engine( - "mssql+pymssql://" - + data_store_username - + ":" - + data_store_password - + "@" - + data_store_host - + "/" - + data_store_name - ) - - else: - self.logger.error("Data store type valid but not configured.") - raise Exception("Data store type valid but not configured.") - - self.logger.info( - "Attempting to connect to data store %s, found at %s:%s" - % (data_store_name, data_store_host, data_store_port) + if ( + data_store_type == "postgresql" + or data_store_type == "oracle" + or data_store_type == "snowflake" + ): + + engine = create_engine( + data_store_type + + "://" + + data_store_username + + ":" + + data_store_password + + "@" + + data_store_host + + "/" + + data_store_name ) - if database_exists(engine.url): + elif data_store_type == "mysql": + + engine = create_engine( + "mysql+pymysql://" + + data_store_username + + ":" + + data_store_password + + "@" + + data_store_host + + "/" + + data_store_name + ) + elif data_store_type == "mssql": + + engine = create_engine( + "mssql+pymssql://" + + data_store_username + + ":" + + data_store_password + + "@" + + data_store_host + + "/" + + data_store_name + ) - self.logger.info("Data store exists. Continuing to work.") + else: + self.logger.error("Data store type valid but not configured.") + raise Exception("Data store type valid but not configured.") + + self.logger.info( + "Attempting to connect to data store %s, found at %s:%s" + % (data_store_name, data_store_host, data_store_port) + ) - else: + if database_exists(engine.url): - self.logger.error( - "Data store does not exist. Please create and try again." - ) - raise Exception( - "Data store does not exist. Please create and try again." - ) + self.logger.info("Data store exists. Continuing to work.") - session = sessionmaker(bind=engine) + else: + + self.logger.error( + "Data store does not exist. Please create and try again." + ) + raise Exception( + "Data store does not exist. Please create and try again." + ) - session = session(expire_on_commit=False) + session = sessionmaker(bind=engine) - if data_store_type == "postgresql": - session.execute("SET search_path TO %s" % data_store_name) - elif data_store_type == "mysql": - session.execute("USE %s" % data_store_name) + session = session(expire_on_commit=False) - meta = MetaData(schema="process_tracking") + if data_store_type == "postgresql": + session.execute("SET search_path TO %s" % data_store_name) + elif data_store_type == "mysql": + session.execute("USE %s" % data_store_name) - elif data_store_type in nonrelational_stores: - session = "" + meta = MetaData(schema="process_tracking") data_store = dict() data_store["engine"] = engine diff --git a/tests/test_cli.py b/tests/test_cli.py index 34a0692..e0c1416 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -5,11 +5,17 @@ from process_tracker.cli import main from process_tracker.data_store import DataStore +from process_tracker.process_tracker import ProcessTracker from process_tracker.utilities.logging import console from process_tracker.models.actor import Actor from process_tracker.models.extract import ExtractStatus -from process_tracker.models.process import ErrorType, ProcessStatus, ProcessType +from process_tracker.models.process import ( + ErrorType, + ProcessDependency, + ProcessStatus, + ProcessType, +) from process_tracker.models.source import Source from process_tracker.models.tool import Tool @@ -119,6 +125,58 @@ def test_create_error_type(self): # expected_result = "Invalid topic type." # # return self.assertEqual(expected_result, given_result) + def test_create_process_dependency(self): + """ + Testing that when creating a process dependency record, it is created. + :return: + """ + parent_process = ProcessTracker( + process_name="Testing Process Tracking Dependency Parent", + process_type="Extract", + actor_name="UnitTesting", + tool_name="Spark", + sources="Unittests", + targets="Unittests", + ) + + parent_process.change_run_status("completed") + + child_process = ProcessTracker( + process_name="Testing Process Tracking Dependency Child", + process_type="Extract", + actor_name="UnitTesting", + tool_name="Spark", + sources="Unittests", + targets="Unittests", + ) + + child_process.change_run_status("completed") + + result = self.runner.invoke( + main, + 'create -t "process dependency" -p "%s" -c "%s"' + % (parent_process.process_name, child_process.process_name), + ) + + instance = ( + self.session.query(ProcessDependency) + .filter( + ProcessDependency.parent_process_id == parent_process.process.process_id + ) + .filter( + ProcessDependency.child_process_id == child_process.process.process_id + ) + .first() + ) + + given_result = [instance.parent_process_id, instance.child_process_id] + expected_result = [ + parent_process.process.process_id, + child_process.process.process_id, + ] + + self.assertEqual(expected_result, given_result) + self.assertEqual(0, result.exit_code) def test_create_process_type(self): """ @@ -294,6 +352,59 @@ def test_delete_error_type_protected(self): # # return self.assertEqual(expected_result, given_result) + def test_delete_process_dependency(self): + """ + Testing that when deleting a process dependency record, it is deleted. + :return: + """ + parent_process = ProcessTracker( + process_name="Testing Process Tracking Dependency Parent", + process_type="Extract", + actor_name="UnitTesting", + tool_name="Spark", + sources="Unittests", + targets="Unittests", + ) + + parent_process.change_run_status("completed") + + child_process = ProcessTracker( + process_name="Testing Process Tracking Dependency Child", + process_type="Extract", + actor_name="UnitTesting", + tool_name="Spark", + sources="Unittests", + targets="Unittests", + ) + + child_process.change_run_status("completed") + + self.runner.invoke( + main, + 'create -t "process dependency" -p "%s" -c "%s"' + % (parent_process.process_name, child_process.process_name), + ) + + result = self.runner.invoke( + main, + 'delete -t "process dependency" -p "%s" -c "%s"' + % (parent_process.process_name, child_process.process_name), + ) + + instance = ( + self.session.query(ProcessDependency) + .filter( + ProcessDependency.parent_process_id == parent_process.process.process_id + ) + .filter( + ProcessDependency.child_process_id == child_process.process.process_id + ) + .first() + ) + + self.assertEqual(None, instance) + self.assertEqual(0, result.exit_code) + def test_delete_process_type(self): """ Testing that when deleting an process type record not on the protected list, it is deleted.