Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,4 @@ dmypy.json
.pyre/

test_cities_delete.csv
test_cities_delete_2.csv
13 changes: 11 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
The data engineering library to build robust, reliable and on time data pipelines in Python.

### To use with Dataplane Data Platform
Install Python Package on Dataplane: https://learn.dataplane.app/installing-python-packages
Install Python Package on Dataplane: https://learn.dataplane.app/installing-python-packages<br />
Recipes to use Dataplane Python Package: https://recipes.dataplane.app/office-365/sharepoint-api

### Run tests in repo root
Expand All @@ -11,9 +11,18 @@ pytest -s
```

### For a specific test

```shell
pytest -s src/dataplane/data_storage/test_sharepoint_upload.py
pytest -s -k test_sharepoint_upload.py
pytest -s -k test_sharepoint.py
```

### Troubleshooting errors
If an error occurs, try removing .pytest_cache and ```__pycache__``` - can happen after devcontainers build

```shell
find . | grep -E "(/__pycache__$|\.pyc$|\.pyo$)" | xargs rm -rf
rm -rf /home/vscode/.local/lib/python3.10/site-packages/
```

### S3 / Minio for testing in VS code devcontainers
Expand Down
2 changes: 1 addition & 1 deletion import_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

start = datetime.now()

from dataplane import hello
# from dataplane import hello

duration = datetime.now() - start

Expand Down
31 changes: 27 additions & 4 deletions src/dataplane/Microsoft/Sharepoint/sharepoint_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
ProxyUrl: Proxy endpoint to use
ProxyMethod: https or http, default https
"""
def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, LocalFilePath, SharepointFilePath, ProxyUse=False, ProxyUrl="", ProxyMethod="https"):
def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, LocalFilePath, SharepointFilePath, Library="root", ProxyUse=False, ProxyUrl="", ProxyMethod="https"):

# Start the timer
start = datetime.now()
Expand Down Expand Up @@ -63,11 +63,34 @@ def sharepoint_download(Host, TenantID, ClientID, Secret, SiteName, LocalFilePat

SiteID = SiteID.json()

# ====== Get Item ID =====

# ====== Library
# Note the target path must have a / prefix
SharepointFilePath = SharepointFilePath.replace(" ", "%20")
url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drive/root:{SharepointFilePath}"
drive = "root"
if Library == "root":
# ====== Create an upload session =====
url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drive/root:{SharepointFilePath}"
else:

drive = ""
driveID = requests.request("GET", f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drives?$select=name,id", headers=headers, json=payload, proxies=proxies)
if driveID.status_code != 200:
duration = datetime.now() - start
return {"result":"Fail", "reason":"Sharepoint get drives", "duration": str(duration), "status": driveID.status_code, "error": driveID.json()}

driveID = driveID.json()
for x in driveID["value"]:
if x["name"] == Library:
drive = x["id"]
break

if drive =="":
duration = datetime.now() - start
return {"result":"Fail", "reason":"Sharepoint no drove found fpr library "+Library, "duration": str(duration)}

url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drives/{drive}/root:{SharepointFilePath}"

# ====== Get Item ID =====


ItemID = requests.request("GET", url, headers=headers, json=payload, proxies=proxies)
Expand Down
31 changes: 26 additions & 5 deletions src/dataplane/Microsoft/Sharepoint/sharepoint_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
FileConflict: "fail (default) | replace | rename"
FileDescription: Sharepoint description for the file
"""
def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath, SourceFilePath, ProxyUse=False, ProxyUrl="", ProxyMethod="https", FileConflict="fail"):
def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath, SourceFilePath, Library="root", ProxyUse=False, ProxyUrl="", ProxyMethod="https", FileConflict="fail"):

# Start the timer
start = datetime.now()
Expand Down Expand Up @@ -64,16 +64,37 @@ def sharepoint_upload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath

SiteID = SiteID.json()

# ====== Create an upload session =====

# print(SiteID)

# ====== Library
# Note the target path must have a / prefix
TargetFilePath = TargetFilePath.replace(" ", "%20")
url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drive/root:{TargetFilePath}:/createUploadSession"
drive = "root"
if Library == "root":
# ====== Create an upload session =====
url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drive/root:{TargetFilePath}:/createUploadSession"
else:

drive = ""
driveID = requests.request("GET", f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drives?$select=name,id", headers=headers, json=payload, proxies=proxies)
if driveID.status_code != 200:
duration = datetime.now() - start
return {"result":"Fail", "reason":"Sharepoint get drives", "duration": str(duration), "status": driveID.status_code, "error": driveID.json()}

driveID = driveID.json()
for x in driveID["value"]:
if x["name"] == Library:
drive = x["id"]
break

if drive =="":
duration = datetime.now() - start
return {"result":"Fail", "reason":"Sharepoint no drove found fpr library "+Library, "duration": str(duration)}

url = f"https://graph.microsoft.com/v1.0/sites/{SiteID['id']}/drives/{drive}/root:{TargetFilePath}:/createUploadSession"

# https://graph.microsoft.com/v1.0/sites/{name}.sharepoint.com/drive/root:/test/testing.xlsx:/createUploadSession


FileSize = os.path.getsize(SourceFilePath)

payload = {
Expand Down
63 changes: 63 additions & 0 deletions src/dataplane/Microsoft/Sharepoint/test_sharepoint_nonroot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@

import os
from .sharepoint_upload import sharepoint_upload
from .sharepoint_download import sharepoint_download
from nanoid import generate
import os
from dotenv import load_dotenv

def test_sharepoint():

# ---------- DATAPLANE RUN ------------

# Dataplane run id
os.environ["DP_RUNID"] = generate('1234567890abcdef', 10)

# Sharepoint connection
load_dotenv()

RUN_ID = os.environ["DP_RUNID"]
HOST = os.getenv('SHAREPOINT_HOST')
AZURE_CLIENT_ID = os.getenv('AZURE_CLIENT_ID')
AZURE_CLIENT_SECRET = os.getenv('AZURE_CLIENT_SECRET')
AZURE_TENANT_ID = os.getenv('AZURE_TENANT_ID')
CURRENT_DIRECTORY = os.path.realpath(os.path.dirname(__file__))

# ---------- STORE File to Sharepoint ------------
# SharepointUpload(Host, TenantID, ClientID, Secret, SiteName, TargetFilePath, SourceFilePath, FileDescription="", ProxyUse=False, ProxyUrl="", ProxyMethod="https", FileConflict="fail")
# print(CURRENT_DIRECTORY)
# Store the data with key hello - run id will be attached
rs = sharepoint_upload(Host=HOST,
TenantID=AZURE_TENANT_ID,
ClientID=AZURE_CLIENT_ID,
Secret=AZURE_CLIENT_SECRET,
SiteName="Dataplane Python",
TargetFilePath=f"/myfile {RUN_ID}.csv",
SourceFilePath=CURRENT_DIRECTORY+"/test_cities.csv",
Library="Doc library 2"
)
print(rs)
assert rs["result"]=="OK"


# ---------- RETRIEVE PARQUET FROM S3 ------------

rs = sharepoint_download(Host=HOST,
TenantID=AZURE_TENANT_ID,
ClientID=AZURE_CLIENT_ID,
Secret=AZURE_CLIENT_SECRET,
SiteName="Dataplane Python",
SharepointFilePath=f"/myfile {RUN_ID}.csv",
LocalFilePath=CURRENT_DIRECTORY+"/test_cities_delete_2.csv",
Library="Doc library 2",
ProxyUse=False, ProxyUrl="", ProxyMethod="https")
print(rs)
assert rs["result"]=="OK"
# Get the data
# rsget = S3Get(StoreKey="s3me", S3Client=S3Connect, Bucket=bucket)
# print(rsget)
# df = rsget["dataframe"]
# print(df.shape[0])
# # Test before and after rows
# assert df.shape[0] == dfrows
# assert rsget["result"]=="OK"
56 changes: 28 additions & 28 deletions src/dataplane/__init__.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,37 @@
# from dataplane.pipelinerun.data_persist.redis_store import (
# pipeline_redis_store,
# pipeline_redis_get,
# )
from dataplane.pipelinerun.data_persist.redis_store import (
pipeline_redis_store,
pipeline_redis_get,
)

# from dataplane.pipelinerun.data_persist.s3_store import (
# pipeline_s3_get,
# pipeline_s3_store,
# )
from dataplane.pipelinerun.data_persist.s3_store import (
pipeline_s3_get,
pipeline_s3_store,
)

# from dataplane.hello import (
# hello,
# )
from dataplane.hello import (
hello,
)

# # Microsoft
# from dataplane.Microsoft.Teams.webhook_send import teams_webhook_send
# from dataplane.Microsoft.Sharepoint.sharepoint_download import sharepoint_download
# from dataplane.Microsoft.Sharepoint.sharepoint_upload import sharepoint_upload
# Microsoft
from dataplane.Microsoft.Teams.webhook_send import teams_webhook_send
from dataplane.Microsoft.Sharepoint.sharepoint_download import sharepoint_download
from dataplane.Microsoft.Sharepoint.sharepoint_upload import sharepoint_upload


# __all__ = [
__all__ = [

# # Test modules
# "hello",
# Test modules
"hello",

# # Pipeline transfers
# "pipeline_redis_store",
# "pipeline_redis_get",
# "pipeline_s3_get",
# "pipeline_s3_store",
# Pipeline transfers
"pipeline_redis_store",
"pipeline_redis_get",
"pipeline_s3_get",
"pipeline_s3_store",

# # Microsoft connectors
# "teams_webhook_send",
# "sharepoint_download",
# "sharepoint_upload",
# Microsoft connectors
"teams_webhook_send",
"sharepoint_download",
"sharepoint_upload",

# ]
]
2 changes: 1 addition & 1 deletion src/dataplane/pipelinerun/data_persist/redis_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import io
from datetime import datetime, timedelta
import redis
import pandas as pd

def RedisCheck(r):
try:
Expand Down Expand Up @@ -61,6 +60,7 @@ def pipeline_redis_get(StoreKey, Redis):
# Retrieve dataframe from key
buffer = io.BytesIO(Redis.get(InsertKey))
buffer.seek(0)
import pandas as pd
df = pd.read_parquet(buffer)

duration = datetime.now() - start
Expand Down
2 changes: 1 addition & 1 deletion src/dataplane/pipelinerun/data_persist/s3_store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@


import pandas as pd
# import requests
import os
from io import BytesIO
Expand Down Expand Up @@ -44,6 +43,7 @@ def pipeline_s3_get(StoreKey, S3Client, Bucket):
# Retrieve dataframe from key
# buffer = BytesIO()
objectGet = S3Client.get_object(Bucket=Bucket, Key=InsertKey, ChecksumMode='ENABLED')["Body"].read()
import pandas as pd
df = pd.read_parquet(BytesIO(objectGet))

duration = datetime.now() - start
Expand Down
2 changes: 1 addition & 1 deletion src/dataplane/pipelinerun/data_persist/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from .redis_store import pipeline_redis_store
from .redis_store import pipeline_redis_get
import redis
import pandas as pd
from datetime import timedelta
from nanoid import generate

Expand All @@ -19,6 +18,7 @@ def test_redis_store():
"calories": [420, 380, 390],
"duration": [50, 40, 45]
}
import pandas as pd
df = pd.DataFrame(data)
dfrows = df.shape[0]

Expand Down
2 changes: 1 addition & 1 deletion src/dataplane/pipelinerun/data_persist/test_s3_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from .s3_store import pipeline_s3_get
import boto3
from botocore.client import Config
import pandas as pd
from nanoid import generate

def test_s3_store():
Expand All @@ -19,6 +18,7 @@ def test_s3_store():
"calories": [420, 380, 390],
"duration": [50, 40, 45]
}
import pandas as pd
df = pd.DataFrame(data)
dfrows = df.shape[0]

Expand Down