diff --git a/.gitignore b/.gitignore index 8345be0..11b87d6 100644 --- a/.gitignore +++ b/.gitignore @@ -129,3 +129,4 @@ dmypy.json .pyre/ test_cities_delete.csv +test_cities_delete_2.csv \ No newline at end of file diff --git a/README.md b/README.md index e57edf2..5998dea 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ The data engineering library to build robust, reliable and on time data pipelines in Python. ### To use with Dataplane Data Platform -Install Python Package on Dataplane: https://learn.dataplane.app/installing-python-packages +Install Python Package on Dataplane: https://learn.dataplane.app/installing-python-packages
Recipes to use Dataplane Python Package: https://recipes.dataplane.app/office-365/sharepoint-api ### Run tests in repo root @@ -11,9 +11,18 @@ pytest -s ``` ### For a specific test + ```shell pytest -s src/dataplane/data_storage/test_sharepoint_upload.py -pytest -s -k test_sharepoint_upload.py +pytest -s -k test_sharepoint.py +``` + +### Troubleshooting errors +If an error occurs, try removing .pytest_cache and ```__pycache__``` - can happen after devcontainers build + +```shell +find . | grep -E "(/__pycache__$|\.pyc$|\.pyo$)" | xargs rm -rf +rm -rf /home/vscode/.local/lib/python3.10/site-packages/ ``` ### S3 / Minio for testing in VS code devcontainers diff --git a/import_test.py b/import_test.py index d81b27d..e51109c 100644 --- a/import_test.py +++ b/import_test.py @@ -2,7 +2,7 @@ start = datetime.now() -from dataplane import hello +# from dataplane import hello duration = datetime.now() - start diff --git a/src/dataplane/Microsoft/Sharepoint/sharepoint_download.py b/src/dataplane/Microsoft/Sharepoint/sharepoint_download.py index 69b0b19..e8f1909 100644 --- a/src/dataplane/Microsoft/Sharepoint/sharepoint_download.py +++ b/src/dataplane/Microsoft/Sharepoint/sharepoint_download.py @@ -14,7 +14,7 @@ ProxyUrl: Proxy endpoint to use ProxyMethod: https or http, default https """ -def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, LocalFilePath, SharepointFilePath, ProxyUse=False, ProxyUrl="", ProxyMethod="https"): +def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, LocalFilePath, SharepointFilePath, Library="root", ProxyUse=False, ProxyUrl="", ProxyMethod="https"): # Start the timer start = datetime.now() @@ -63,11 +63,34 @@ def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, LocalFilePat SiteID = SiteID.json() - # ====== Get Item ID ===== - + # ====== Library # Note the target path must have a / prefix SharepointFilePath = SharepointFilePath.replace(" ", "%20") - url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drive/root:{SharepointFilePath}" + drive = "root" + if Library == "root": + # ====== Create an upload session ===== + url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drive/root:{SharepointFilePath}" + else: + + drive = "" + driveID = requests.request("GET", f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drives?$select=name,id", headers=headers, json=payload, proxies=proxies) + if driveID.status_code != 200: + duration = datetime.now() - start + return {"result":"Fail", "reason":"Sharepoint get drives", "duration": str(duration), "status": driveID.status_code, "error": driveID.json()} + + driveID = driveID.json() + for x in driveID["value"]: + if x["name"] == Library: + drive = x["id"] + break + + if drive =="": + duration = datetime.now() - start + return {"result":"Fail", "reason":"Sharepoint no drove found fpr library "+Library, "duration": str(duration)} + + url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drives/{drive}/root:{SharepointFilePath}" + + # ====== Get Item ID ===== ItemID = requests.request("GET", url, headers=headers, json=payload, proxies=proxies) diff --git a/src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py b/src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py index 6f81261..71d221b 100644 --- a/src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py +++ b/src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py @@ -15,7 +15,7 @@ FileConflict: "fail (default) | replace | rename" FileDescription: Sharepoint description for the file """ -def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath, SourceFilePath, ProxyUse=False, ProxyUrl="", ProxyMethod="https", FileConflict="fail"): +def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath, SourceFilePath, Library="root", ProxyUse=False, ProxyUrl="", ProxyMethod="https", FileConflict="fail"): # Start the timer start = datetime.now() @@ -64,16 +64,37 @@ def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath SiteID = SiteID.json() - # ====== Create an upload session ===== - + # print(SiteID) + # ====== Library # Note the target path must have a / prefix TargetFilePath = TargetFilePath.replace(" ", "%20") - url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drive/root:{TargetFilePath}:/createUploadSession" + drive = "root" + if Library == "root": + # ====== Create an upload session ===== + url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drive/root:{TargetFilePath}:/createUploadSession" + else: + + drive = "" + driveID = requests.request("GET", f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drives?$select=name,id", headers=headers, json=payload, proxies=proxies) + if driveID.status_code != 200: + duration = datetime.now() - start + return {"result":"Fail", "reason":"Sharepoint get drives", "duration": str(duration), "status": driveID.status_code, "error": driveID.json()} + + driveID = driveID.json() + for x in driveID["value"]: + if x["name"] == Library: + drive = x["id"] + break + + if drive =="": + duration = datetime.now() - start + return {"result":"Fail", "reason":"Sharepoint no drove found fpr library "+Library, "duration": str(duration)} + + url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drives/{drive}/root:{TargetFilePath}:/createUploadSession" # https://graph.microsoft.com/v1.0/sites/{name}.sharepoint.com/drive/root:/test/testing.xlsx:/createUploadSession - FileSize = os.path.getsize(SourceFilePath) payload = { diff --git a/src/dataplane/Microsoft/Sharepoint/test_sharepoint_nonroot.py b/src/dataplane/Microsoft/Sharepoint/test_sharepoint_nonroot.py new file mode 100644 index 0000000..a0822a6 --- /dev/null +++ b/src/dataplane/Microsoft/Sharepoint/test_sharepoint_nonroot.py @@ -0,0 +1,63 @@ + +import os +from .sharepoint_upload import sharepoint_upload +from .sharepoint_download import sharepoint_download +from nanoid import generate +import os +from dotenv import load_dotenv + +def test_sharepoint(): + + # ---------- DATAPLANE RUN ------------ + + # Dataplane run id + os.environ["DP_RUNID"] = generate('1234567890abcdef', 10) + + # Sharepoint connection + load_dotenv() + + RUN_ID = os.environ["DP_RUNID"] + HOST = os.getenv('SHAREPOINT_HOST') + AZURE_CLIENT_ID = os.getenv('AZURE_CLIENT_ID') + AZURE_CLIENT_SECRET = os.getenv('AZURE_CLIENT_SECRET') + AZURE_TENANT_ID = os.getenv('AZURE_TENANT_ID') + CURRENT_DIRECTORY = os.path.realpath(os.path.dirname(__file__)) + + # ---------- STORE File to Sharepoint ------------ + # SharepointUpload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath, SourceFilePath, FileDescription="", ProxyUse=False, ProxyUrl="", ProxyMethod="https", FileConflict="fail") + # print(CURRENT_DIRECTORY) + # Store the data with key hello - run id will be attached + rs = sharepoint_upload(Host=HOST, + TenantID=AZURE_TENANT_ID, + ClientID=AZURE_CLIENT_ID, + Secret=AZURE_CLIENT_SECRET, + SiteName="Dataplane Python", + TargetFilePath=f"/myfile {RUN_ID}.csv", + SourceFilePath=CURRENT_DIRECTORY+"/test_cities.csv", + Library="Doc library 2" + ) + print(rs) + assert rs["result"]=="OK" + + + # ---------- RETRIEVE PARQUET FROM S3 ------------ + + rs = sharepoint_download(Host=HOST, + TenantID=AZURE_TENANT_ID, + ClientID=AZURE_CLIENT_ID, + Secret=AZURE_CLIENT_SECRET, + SiteName="Dataplane Python", + SharepointFilePath=f"/myfile {RUN_ID}.csv", + LocalFilePath=CURRENT_DIRECTORY+"/test_cities_delete_2.csv", + Library="Doc library 2", + ProxyUse=False, ProxyUrl="", ProxyMethod="https") + print(rs) + assert rs["result"]=="OK" + # Get the data + # rsget = S3Get(StoreKey="s3me", S3Client=S3Connect, Bucket=bucket) + # print(rsget) + # df = rsget["dataframe"] + # print(df.shape[0]) + # # Test before and after rows + # assert df.shape[0] == dfrows + # assert rsget["result"]=="OK" \ No newline at end of file diff --git a/src/dataplane/__init__.py b/src/dataplane/__init__.py index 8877f60..d11436f 100644 --- a/src/dataplane/__init__.py +++ b/src/dataplane/__init__.py @@ -1,37 +1,37 @@ -# from dataplane.pipelinerun.data_persist.redis_store import ( -# pipeline_redis_store, -# pipeline_redis_get, -# ) +from dataplane.pipelinerun.data_persist.redis_store import ( + pipeline_redis_store, + pipeline_redis_get, +) -# from dataplane.pipelinerun.data_persist.s3_store import ( -# pipeline_s3_get, -# pipeline_s3_store, -# ) +from dataplane.pipelinerun.data_persist.s3_store import ( + pipeline_s3_get, + pipeline_s3_store, +) -# from dataplane.hello import ( -# hello, -# ) +from dataplane.hello import ( + hello, +) -# # Microsoft -# from dataplane.Microsoft.Teams.webhook_send import teams_webhook_send -# from dataplane.Microsoft.Sharepoint.sharepoint_download import sharepoint_download -# from dataplane.Microsoft.Sharepoint.sharepoint_upload import sharepoint_upload +# Microsoft +from dataplane.Microsoft.Teams.webhook_send import teams_webhook_send +from dataplane.Microsoft.Sharepoint.sharepoint_download import sharepoint_download +from dataplane.Microsoft.Sharepoint.sharepoint_upload import sharepoint_upload -# __all__ = [ +__all__ = [ -# # Test modules -# "hello", + # Test modules + "hello", -# # Pipeline transfers -# "pipeline_redis_store", -# "pipeline_redis_get", -# "pipeline_s3_get", -# "pipeline_s3_store", + # Pipeline transfers + "pipeline_redis_store", + "pipeline_redis_get", + "pipeline_s3_get", + "pipeline_s3_store", -# # Microsoft connectors -# "teams_webhook_send", -# "sharepoint_download", -# "sharepoint_upload", + # Microsoft connectors + "teams_webhook_send", + "sharepoint_download", + "sharepoint_upload", -# ] \ No newline at end of file + ] \ No newline at end of file diff --git a/src/dataplane/pipelinerun/data_persist/redis_store.py b/src/dataplane/pipelinerun/data_persist/redis_store.py index 636d57d..211abef 100644 --- a/src/dataplane/pipelinerun/data_persist/redis_store.py +++ b/src/dataplane/pipelinerun/data_persist/redis_store.py @@ -2,7 +2,6 @@ import io from datetime import datetime, timedelta import redis -import pandas as pd def RedisCheck(r): try: @@ -61,6 +60,7 @@ def pipeline_redis_get(StoreKey, Redis): # Retrieve dataframe from key buffer = io.BytesIO(Redis.get(InsertKey)) buffer.seek(0) + import pandas as pd df = pd.read_parquet(buffer) duration = datetime.now() - start diff --git a/src/dataplane/pipelinerun/data_persist/s3_store.py b/src/dataplane/pipelinerun/data_persist/s3_store.py index a9e2c7c..b8e6d60 100644 --- a/src/dataplane/pipelinerun/data_persist/s3_store.py +++ b/src/dataplane/pipelinerun/data_persist/s3_store.py @@ -1,6 +1,5 @@ -import pandas as pd # import requests import os from io import BytesIO @@ -44,6 +43,7 @@ def pipeline_s3_get(StoreKey, S3Client, Bucket): # Retrieve dataframe from key # buffer = BytesIO() objectGet = S3Client.get_object(Bucket=Bucket, Key=InsertKey, ChecksumMode='ENABLED')["Body"].read() + import pandas as pd df = pd.read_parquet(BytesIO(objectGet)) duration = datetime.now() - start diff --git a/src/dataplane/pipelinerun/data_persist/test_redis.py b/src/dataplane/pipelinerun/data_persist/test_redis.py index 4e9671e..55681d1 100644 --- a/src/dataplane/pipelinerun/data_persist/test_redis.py +++ b/src/dataplane/pipelinerun/data_persist/test_redis.py @@ -3,7 +3,6 @@ from .redis_store import pipeline_redis_store from .redis_store import pipeline_redis_get import redis -import pandas as pd from datetime import timedelta from nanoid import generate @@ -19,6 +18,7 @@ def test_redis_store(): "calories": [420, 380, 390], "duration": [50, 40, 45] } + import pandas as pd df = pd.DataFrame(data) dfrows = df.shape[0] diff --git a/src/dataplane/pipelinerun/data_persist/test_s3_store.py b/src/dataplane/pipelinerun/data_persist/test_s3_store.py index 0b83c69..70e60e1 100644 --- a/src/dataplane/pipelinerun/data_persist/test_s3_store.py +++ b/src/dataplane/pipelinerun/data_persist/test_s3_store.py @@ -4,7 +4,6 @@ from .s3_store import pipeline_s3_get import boto3 from botocore.client import Config -import pandas as pd from nanoid import generate def test_s3_store(): @@ -19,6 +18,7 @@ def test_s3_store(): "calories": [420, 380, 390], "duration": [50, 40, 45] } + import pandas as pd df = pd.DataFrame(data) dfrows = df.shape[0]