diff --git a/tableauserverclient/server/endpoint/datasources_endpoint.py b/tableauserverclient/server/endpoint/datasources_endpoint.py
index 0fb5c7dc9..bac05461c 100644
--- a/tableauserverclient/server/endpoint/datasources_endpoint.py
+++ b/tableauserverclient/server/endpoint/datasources_endpoint.py
@@ -19,7 +19,12 @@
from tableauserverclient.server.endpoint.dqw_endpoint import _DataQualityWarningEndpoint
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api, parameter_added_in
-from tableauserverclient.server.endpoint.exceptions import InternalServerError, MissingRequiredFieldError
+from tableauserverclient.server.endpoint.exceptions import (
+ DUPLICATE_EXTRACT_JOB_CODE,
+ InternalServerError,
+ MissingRequiredFieldError,
+ ServerResponseError,
+)
from tableauserverclient.server.endpoint.permissions_endpoint import _PermissionsEndpoint
from tableauserverclient.server.endpoint.resource_tagger import TaggingMixin
@@ -433,7 +438,7 @@ def update_connections(
return connection_items
@api(version="2.8")
- def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = False) -> JobItem:
+ def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = False) -> JobItem | None:
"""
Refreshes the extract of an existing workbook.
@@ -454,7 +459,13 @@ def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = Fal
id_ = getattr(datasource_item, "id", datasource_item)
url = f"{self.baseurl}/{id_}/refresh"
refresh_req = RequestFactory.Task.refresh_req(incremental, self.parent_srv)
- server_response = self.post_request(url, refresh_req)
+ try:
+ server_response = self.post_request(url, refresh_req)
+ except ServerResponseError as e:
+ if e.code == DUPLICATE_EXTRACT_JOB_CODE:
+ logger.warning(f"{e.summary} {e.detail}")
+ return None
+ raise
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job
diff --git a/tableauserverclient/server/endpoint/exceptions.py b/tableauserverclient/server/endpoint/exceptions.py
index 9d30ce331..154aa4549 100644
--- a/tableauserverclient/server/endpoint/exceptions.py
+++ b/tableauserverclient/server/endpoint/exceptions.py
@@ -1,6 +1,10 @@
from defusedxml.ElementTree import fromstring
from typing import TypeVar
+# Server error code for "extract refresh already queued" — treated as a
+# non-fatal warning rather than an exception in refresh() methods.
+DUPLICATE_EXTRACT_JOB_CODE = "409093"
+
def split_pascal_case(s: str) -> str:
return "".join([f" {c}" if c.isupper() else c for c in s]).strip()
diff --git a/tableauserverclient/server/endpoint/flows_endpoint.py b/tableauserverclient/server/endpoint/flows_endpoint.py
index 2d480e883..ea7b59526 100644
--- a/tableauserverclient/server/endpoint/flows_endpoint.py
+++ b/tableauserverclient/server/endpoint/flows_endpoint.py
@@ -10,7 +10,12 @@
from tableauserverclient.server.endpoint.dqw_endpoint import _DataQualityWarningEndpoint
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api
-from tableauserverclient.server.endpoint.exceptions import InternalServerError, MissingRequiredFieldError
+from tableauserverclient.server.endpoint.exceptions import (
+ DUPLICATE_EXTRACT_JOB_CODE,
+ InternalServerError,
+ MissingRequiredFieldError,
+ ServerResponseError,
+)
from tableauserverclient.server.endpoint.permissions_endpoint import _PermissionsEndpoint
from tableauserverclient.server.endpoint.resource_tagger import _ResourceTagger, TaggingMixin
from tableauserverclient.models import FlowItem, PaginationItem, ConnectionItem, JobItem
@@ -305,7 +310,7 @@ def update_connection(self, flow_item: FlowItem, connection_item: ConnectionItem
return connection
@api(version="3.3")
- def refresh(self, flow_item: FlowItem | str) -> JobItem:
+ def refresh(self, flow_item: FlowItem | str) -> JobItem | None:
"""
Runs the flow to refresh the data.
@@ -324,7 +329,13 @@ def refresh(self, flow_item: FlowItem | str) -> JobItem:
flow_id = getattr(flow_item, "id", flow_item)
url = f"{self.baseurl}/{flow_id}/run"
empty_req = RequestFactory.Empty.empty_req()
- server_response = self.post_request(url, empty_req)
+ try:
+ server_response = self.post_request(url, empty_req)
+ except ServerResponseError as e:
+ if e.code == DUPLICATE_EXTRACT_JOB_CODE:
+ logger.warning(f"{e.summary} {e.detail}")
+ return None
+ raise
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job
diff --git a/tableauserverclient/server/endpoint/workbooks_endpoint.py b/tableauserverclient/server/endpoint/workbooks_endpoint.py
index ce605806f..d6d77e207 100644
--- a/tableauserverclient/server/endpoint/workbooks_endpoint.py
+++ b/tableauserverclient/server/endpoint/workbooks_endpoint.py
@@ -11,6 +11,7 @@
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api, parameter_added_in
from tableauserverclient.server.endpoint.exceptions import (
+ DUPLICATE_EXTRACT_JOB_CODE,
InternalServerError,
MissingRequiredFieldError,
ServerResponseError,
@@ -147,7 +148,7 @@ def refresh(self, workbook_item: WorkbookItem | str, incremental: bool = False)
try:
server_response = self.post_request(url, refresh_req)
except ServerResponseError as e:
- if e.code.startswith("409") and "already" in e.detail:
+ if e.code == DUPLICATE_EXTRACT_JOB_CODE:
logger.warning(f"{e.summary} {e.detail}")
return None
raise
diff --git a/test/assets/datasource_refresh_duplicate.xml b/test/assets/datasource_refresh_duplicate.xml
new file mode 100644
index 000000000..38a258933
--- /dev/null
+++ b/test/assets/datasource_refresh_duplicate.xml
@@ -0,0 +1,3 @@
+
+
+Resource ConflictJob for \'extract\' is already queued. Not queuing a duplicate.
diff --git a/test/assets/flow_refresh_duplicate.xml b/test/assets/flow_refresh_duplicate.xml
new file mode 100644
index 000000000..38a258933
--- /dev/null
+++ b/test/assets/flow_refresh_duplicate.xml
@@ -0,0 +1,3 @@
+
+
+Resource ConflictJob for \'extract\' is already queued. Not queuing a duplicate.
diff --git a/test/test_datasource.py b/test/test_datasource.py
index cb1157bc5..4802c7d33 100644
--- a/test/test_datasource.py
+++ b/test/test_datasource.py
@@ -35,6 +35,7 @@
UPDATE_CONNECTION_XML = TEST_ASSET_DIR / "datasource_connection_update.xml"
UPDATE_CONNECTIONS_XML = TEST_ASSET_DIR / "datasource_connections_update.xml"
UPDATE_CONNECTIONS_NO_AUTH_XML = TEST_ASSET_DIR / "datasource_connections_update_no_auth.xml"
+REFRESH_DUPLICATE_XML = TEST_ASSET_DIR / "datasource_refresh_duplicate.xml"
@pytest.fixture(scope="function")
@@ -473,6 +474,19 @@ def test_refresh_object(server) -> None:
assert "7c3d599e-949f-44c3-94a1-f30ba85757e4" == new_job.id
+def test_refresh_already_running(server) -> None:
+ server.version = "2.8"
+ response_xml = REFRESH_DUPLICATE_XML.read_text()
+ with requests_mock.mock() as m:
+ m.post(
+ server.datasources.baseurl + "/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/refresh",
+ status_code=409,
+ text=response_xml,
+ )
+ refresh_job = server.datasources.refresh("9dbd2263-16b5-46e1-9c43-a76bb8ab65fb")
+ assert refresh_job is None
+
+
def test_datasource_refresh_request_empty(server) -> None:
server.version = "2.8"
item = TSC.DatasourceItem("")
diff --git a/test/test_flow.py b/test/test_flow.py
index 9ebbbe5d6..008c39a96 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -17,6 +17,7 @@
PUBLISH_XML = TEST_ASSET_DIR / "flow_publish.xml"
UPDATE_XML = TEST_ASSET_DIR / "flow_update.xml"
REFRESH_XML = TEST_ASSET_DIR / "flow_refresh.xml"
+REFRESH_DUPLICATE_XML = TEST_ASSET_DIR / "flow_refresh_duplicate.xml"
@pytest.fixture(scope="function")
@@ -232,6 +233,18 @@ def test_refresh_id_str(server: TSC.Server) -> None:
assert format_datetime(refresh_job.flow_run.started_at) == "2018-05-22T13:00:29Z"
+def test_refresh_already_running(server: TSC.Server) -> None:
+ response_xml = REFRESH_DUPLICATE_XML.read_text()
+ with requests_mock.mock() as m:
+ m.post(
+ server.flows.baseurl + "/92967d2d-c7e2-46d0-8847-4802df58f484/run",
+ status_code=409,
+ text=response_xml,
+ )
+ refresh_job = server.flows.refresh("92967d2d-c7e2-46d0-8847-4802df58f484")
+ assert refresh_job is None
+
+
def test_bad_download_response(server: TSC.Server) -> None:
with requests_mock.mock() as m, tempfile.TemporaryDirectory() as td:
m.get(