Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions tableauserverclient/server/endpoint/datasources_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@

from tableauserverclient.server.endpoint.dqw_endpoint import _DataQualityWarningEndpoint
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api, parameter_added_in
from tableauserverclient.server.endpoint.exceptions import InternalServerError, MissingRequiredFieldError
from tableauserverclient.server.endpoint.exceptions import (
DUPLICATE_EXTRACT_JOB_CODE,
InternalServerError,
MissingRequiredFieldError,
ServerResponseError,
)
from tableauserverclient.server.endpoint.permissions_endpoint import _PermissionsEndpoint
from tableauserverclient.server.endpoint.resource_tagger import TaggingMixin

Expand Down Expand Up @@ -433,7 +438,7 @@ def update_connections(
return connection_items

@api(version="2.8")
def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = False) -> JobItem:
def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = False) -> JobItem | None:
"""
Refreshes the extract of an existing workbook.

Expand All @@ -454,7 +459,13 @@ def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = Fal
id_ = getattr(datasource_item, "id", datasource_item)
url = f"{self.baseurl}/{id_}/refresh"
refresh_req = RequestFactory.Task.refresh_req(incremental, self.parent_srv)
server_response = self.post_request(url, refresh_req)
try:
server_response = self.post_request(url, refresh_req)
except ServerResponseError as e:
if e.code == DUPLICATE_EXTRACT_JOB_CODE:
logger.warning(f"{e.summary} {e.detail}")
return None
raise
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job

Expand Down
4 changes: 4 additions & 0 deletions tableauserverclient/server/endpoint/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from defusedxml.ElementTree import fromstring
from typing import TypeVar

# Server error code for "extract refresh already queued" — treated as a
# non-fatal warning rather than an exception in refresh() methods.
DUPLICATE_EXTRACT_JOB_CODE = "409093"


def split_pascal_case(s: str) -> str:
return "".join([f" {c}" if c.isupper() else c for c in s]).strip()
Expand Down
17 changes: 14 additions & 3 deletions tableauserverclient/server/endpoint/flows_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

from tableauserverclient.server.endpoint.dqw_endpoint import _DataQualityWarningEndpoint
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api
from tableauserverclient.server.endpoint.exceptions import InternalServerError, MissingRequiredFieldError
from tableauserverclient.server.endpoint.exceptions import (
DUPLICATE_EXTRACT_JOB_CODE,
InternalServerError,
MissingRequiredFieldError,
ServerResponseError,
)
from tableauserverclient.server.endpoint.permissions_endpoint import _PermissionsEndpoint
from tableauserverclient.server.endpoint.resource_tagger import _ResourceTagger, TaggingMixin
from tableauserverclient.models import FlowItem, PaginationItem, ConnectionItem, JobItem
Expand Down Expand Up @@ -305,7 +310,7 @@ def update_connection(self, flow_item: FlowItem, connection_item: ConnectionItem
return connection

@api(version="3.3")
def refresh(self, flow_item: FlowItem | str) -> JobItem:
def refresh(self, flow_item: FlowItem | str) -> JobItem | None:
"""
Runs the flow to refresh the data.

Expand All @@ -324,7 +329,13 @@ def refresh(self, flow_item: FlowItem | str) -> JobItem:
flow_id = getattr(flow_item, "id", flow_item)
url = f"{self.baseurl}/{flow_id}/run"
empty_req = RequestFactory.Empty.empty_req()
server_response = self.post_request(url, empty_req)
try:
server_response = self.post_request(url, empty_req)
except ServerResponseError as e:
if e.code == DUPLICATE_EXTRACT_JOB_CODE:
logger.warning(f"{e.summary} {e.detail}")
return None
raise
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job

Expand Down
3 changes: 2 additions & 1 deletion tableauserverclient/server/endpoint/workbooks_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api, parameter_added_in
from tableauserverclient.server.endpoint.exceptions import (
DUPLICATE_EXTRACT_JOB_CODE,
InternalServerError,
MissingRequiredFieldError,
ServerResponseError,
Expand Down Expand Up @@ -147,7 +148,7 @@ def refresh(self, workbook_item: WorkbookItem | str, incremental: bool = False)
try:
server_response = self.post_request(url, refresh_req)
except ServerResponseError as e:
if e.code.startswith("409") and "already" in e.detail:
if e.code == DUPLICATE_EXTRACT_JOB_CODE:
logger.warning(f"{e.summary} {e.detail}")
return None
raise
Expand Down
3 changes: 3 additions & 0 deletions test/assets/datasource_refresh_duplicate.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<?xml version='1.0' encoding='UTF-8'?>
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api https://help.tableau.com/samples/en-us/rest_api/ts-api_3_27.xsd">
<error code="409093"><summary>Resource Conflict</summary><detail>Job for \'extract\' is already queued. Not queuing a duplicate.</detail></error></tsResponse>
3 changes: 3 additions & 0 deletions test/assets/flow_refresh_duplicate.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<?xml version='1.0' encoding='UTF-8'?>
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api https://help.tableau.com/samples/en-us/rest_api/ts-api_3_27.xsd">
<error code="409093"><summary>Resource Conflict</summary><detail>Job for \'extract\' is already queued. Not queuing a duplicate.</detail></error></tsResponse>
14 changes: 14 additions & 0 deletions test/test_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
UPDATE_CONNECTION_XML = TEST_ASSET_DIR / "datasource_connection_update.xml"
UPDATE_CONNECTIONS_XML = TEST_ASSET_DIR / "datasource_connections_update.xml"
UPDATE_CONNECTIONS_NO_AUTH_XML = TEST_ASSET_DIR / "datasource_connections_update_no_auth.xml"
REFRESH_DUPLICATE_XML = TEST_ASSET_DIR / "datasource_refresh_duplicate.xml"


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -473,6 +474,19 @@ def test_refresh_object(server) -> None:
assert "7c3d599e-949f-44c3-94a1-f30ba85757e4" == new_job.id


def test_refresh_already_running(server) -> None:
server.version = "2.8"
response_xml = REFRESH_DUPLICATE_XML.read_text()
with requests_mock.mock() as m:
m.post(
server.datasources.baseurl + "/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/refresh",
status_code=409,
text=response_xml,
)
refresh_job = server.datasources.refresh("9dbd2263-16b5-46e1-9c43-a76bb8ab65fb")
assert refresh_job is None


def test_datasource_refresh_request_empty(server) -> None:
server.version = "2.8"
item = TSC.DatasourceItem("")
Expand Down
13 changes: 13 additions & 0 deletions test/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
PUBLISH_XML = TEST_ASSET_DIR / "flow_publish.xml"
UPDATE_XML = TEST_ASSET_DIR / "flow_update.xml"
REFRESH_XML = TEST_ASSET_DIR / "flow_refresh.xml"
REFRESH_DUPLICATE_XML = TEST_ASSET_DIR / "flow_refresh_duplicate.xml"


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -232,6 +233,18 @@ def test_refresh_id_str(server: TSC.Server) -> None:
assert format_datetime(refresh_job.flow_run.started_at) == "2018-05-22T13:00:29Z"


def test_refresh_already_running(server: TSC.Server) -> None:
response_xml = REFRESH_DUPLICATE_XML.read_text()
with requests_mock.mock() as m:
m.post(
server.flows.baseurl + "/92967d2d-c7e2-46d0-8847-4802df58f484/run",
status_code=409,
text=response_xml,
)
refresh_job = server.flows.refresh("92967d2d-c7e2-46d0-8847-4802df58f484")
assert refresh_job is None


def test_bad_download_response(server: TSC.Server) -> None:
with requests_mock.mock() as m, tempfile.TemporaryDirectory() as td:
m.get(
Expand Down
Loading