From 76f2753cca0b9e96f1995285a7776df7e7014fb2 Mon Sep 17 00:00:00 2001 From: Jac Fitzgerald Date: Wed, 17 Jun 2026 17:58:44 -0700 Subject: [PATCH] fix: treat duplicate extract job (409093) as warning in datasources and flows Fixes #1090 Co-Authored-By: Claude Sonnet 4.6 --- .../server/endpoint/datasources_endpoint.py | 17 ++++++++++++++--- .../server/endpoint/exceptions.py | 4 ++++ .../server/endpoint/flows_endpoint.py | 17 ++++++++++++++--- .../server/endpoint/workbooks_endpoint.py | 3 ++- test/assets/datasource_refresh_duplicate.xml | 3 +++ test/assets/flow_refresh_duplicate.xml | 3 +++ test/test_datasource.py | 14 ++++++++++++++ test/test_flow.py | 13 +++++++++++++ 8 files changed, 67 insertions(+), 7 deletions(-) create mode 100644 test/assets/datasource_refresh_duplicate.xml create mode 100644 test/assets/flow_refresh_duplicate.xml diff --git a/tableauserverclient/server/endpoint/datasources_endpoint.py b/tableauserverclient/server/endpoint/datasources_endpoint.py index 0fb5c7dc9..bac05461c 100644 --- a/tableauserverclient/server/endpoint/datasources_endpoint.py +++ b/tableauserverclient/server/endpoint/datasources_endpoint.py @@ -19,7 +19,12 @@ from tableauserverclient.server.endpoint.dqw_endpoint import _DataQualityWarningEndpoint from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api, parameter_added_in -from tableauserverclient.server.endpoint.exceptions import InternalServerError, MissingRequiredFieldError +from tableauserverclient.server.endpoint.exceptions import ( + DUPLICATE_EXTRACT_JOB_CODE, + InternalServerError, + MissingRequiredFieldError, + ServerResponseError, +) from tableauserverclient.server.endpoint.permissions_endpoint import _PermissionsEndpoint from tableauserverclient.server.endpoint.resource_tagger import TaggingMixin @@ -433,7 +438,7 @@ def update_connections( return connection_items @api(version="2.8") - def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = False) -> JobItem: + def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = False) -> JobItem | None: """ Refreshes the extract of an existing workbook. @@ -454,7 +459,13 @@ def refresh(self, datasource_item: DatasourceItem | str, incremental: bool = Fal id_ = getattr(datasource_item, "id", datasource_item) url = f"{self.baseurl}/{id_}/refresh" refresh_req = RequestFactory.Task.refresh_req(incremental, self.parent_srv) - server_response = self.post_request(url, refresh_req) + try: + server_response = self.post_request(url, refresh_req) + except ServerResponseError as e: + if e.code == DUPLICATE_EXTRACT_JOB_CODE: + logger.warning(f"{e.summary} {e.detail}") + return None + raise new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0] return new_job diff --git a/tableauserverclient/server/endpoint/exceptions.py b/tableauserverclient/server/endpoint/exceptions.py index 9d30ce331..154aa4549 100644 --- a/tableauserverclient/server/endpoint/exceptions.py +++ b/tableauserverclient/server/endpoint/exceptions.py @@ -1,6 +1,10 @@ from defusedxml.ElementTree import fromstring from typing import TypeVar +# Server error code for "extract refresh already queued" — treated as a +# non-fatal warning rather than an exception in refresh() methods. +DUPLICATE_EXTRACT_JOB_CODE = "409093" + def split_pascal_case(s: str) -> str: return "".join([f" {c}" if c.isupper() else c for c in s]).strip() diff --git a/tableauserverclient/server/endpoint/flows_endpoint.py b/tableauserverclient/server/endpoint/flows_endpoint.py index 2d480e883..ea7b59526 100644 --- a/tableauserverclient/server/endpoint/flows_endpoint.py +++ b/tableauserverclient/server/endpoint/flows_endpoint.py @@ -10,7 +10,12 @@ from tableauserverclient.server.endpoint.dqw_endpoint import _DataQualityWarningEndpoint from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api -from tableauserverclient.server.endpoint.exceptions import InternalServerError, MissingRequiredFieldError +from tableauserverclient.server.endpoint.exceptions import ( + DUPLICATE_EXTRACT_JOB_CODE, + InternalServerError, + MissingRequiredFieldError, + ServerResponseError, +) from tableauserverclient.server.endpoint.permissions_endpoint import _PermissionsEndpoint from tableauserverclient.server.endpoint.resource_tagger import _ResourceTagger, TaggingMixin from tableauserverclient.models import FlowItem, PaginationItem, ConnectionItem, JobItem @@ -305,7 +310,7 @@ def update_connection(self, flow_item: FlowItem, connection_item: ConnectionItem return connection @api(version="3.3") - def refresh(self, flow_item: FlowItem | str) -> JobItem: + def refresh(self, flow_item: FlowItem | str) -> JobItem | None: """ Runs the flow to refresh the data. @@ -324,7 +329,13 @@ def refresh(self, flow_item: FlowItem | str) -> JobItem: flow_id = getattr(flow_item, "id", flow_item) url = f"{self.baseurl}/{flow_id}/run" empty_req = RequestFactory.Empty.empty_req() - server_response = self.post_request(url, empty_req) + try: + server_response = self.post_request(url, empty_req) + except ServerResponseError as e: + if e.code == DUPLICATE_EXTRACT_JOB_CODE: + logger.warning(f"{e.summary} {e.detail}") + return None + raise new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0] return new_job diff --git a/tableauserverclient/server/endpoint/workbooks_endpoint.py b/tableauserverclient/server/endpoint/workbooks_endpoint.py index ce605806f..d6d77e207 100644 --- a/tableauserverclient/server/endpoint/workbooks_endpoint.py +++ b/tableauserverclient/server/endpoint/workbooks_endpoint.py @@ -11,6 +11,7 @@ from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api, parameter_added_in from tableauserverclient.server.endpoint.exceptions import ( + DUPLICATE_EXTRACT_JOB_CODE, InternalServerError, MissingRequiredFieldError, ServerResponseError, @@ -147,7 +148,7 @@ def refresh(self, workbook_item: WorkbookItem | str, incremental: bool = False) try: server_response = self.post_request(url, refresh_req) except ServerResponseError as e: - if e.code.startswith("409") and "already" in e.detail: + if e.code == DUPLICATE_EXTRACT_JOB_CODE: logger.warning(f"{e.summary} {e.detail}") return None raise diff --git a/test/assets/datasource_refresh_duplicate.xml b/test/assets/datasource_refresh_duplicate.xml new file mode 100644 index 000000000..38a258933 --- /dev/null +++ b/test/assets/datasource_refresh_duplicate.xml @@ -0,0 +1,3 @@ + + +Resource ConflictJob for \'extract\' is already queued. Not queuing a duplicate. diff --git a/test/assets/flow_refresh_duplicate.xml b/test/assets/flow_refresh_duplicate.xml new file mode 100644 index 000000000..38a258933 --- /dev/null +++ b/test/assets/flow_refresh_duplicate.xml @@ -0,0 +1,3 @@ + + +Resource ConflictJob for \'extract\' is already queued. Not queuing a duplicate. diff --git a/test/test_datasource.py b/test/test_datasource.py index cb1157bc5..4802c7d33 100644 --- a/test/test_datasource.py +++ b/test/test_datasource.py @@ -35,6 +35,7 @@ UPDATE_CONNECTION_XML = TEST_ASSET_DIR / "datasource_connection_update.xml" UPDATE_CONNECTIONS_XML = TEST_ASSET_DIR / "datasource_connections_update.xml" UPDATE_CONNECTIONS_NO_AUTH_XML = TEST_ASSET_DIR / "datasource_connections_update_no_auth.xml" +REFRESH_DUPLICATE_XML = TEST_ASSET_DIR / "datasource_refresh_duplicate.xml" @pytest.fixture(scope="function") @@ -473,6 +474,19 @@ def test_refresh_object(server) -> None: assert "7c3d599e-949f-44c3-94a1-f30ba85757e4" == new_job.id +def test_refresh_already_running(server) -> None: + server.version = "2.8" + response_xml = REFRESH_DUPLICATE_XML.read_text() + with requests_mock.mock() as m: + m.post( + server.datasources.baseurl + "/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/refresh", + status_code=409, + text=response_xml, + ) + refresh_job = server.datasources.refresh("9dbd2263-16b5-46e1-9c43-a76bb8ab65fb") + assert refresh_job is None + + def test_datasource_refresh_request_empty(server) -> None: server.version = "2.8" item = TSC.DatasourceItem("") diff --git a/test/test_flow.py b/test/test_flow.py index 9ebbbe5d6..008c39a96 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -17,6 +17,7 @@ PUBLISH_XML = TEST_ASSET_DIR / "flow_publish.xml" UPDATE_XML = TEST_ASSET_DIR / "flow_update.xml" REFRESH_XML = TEST_ASSET_DIR / "flow_refresh.xml" +REFRESH_DUPLICATE_XML = TEST_ASSET_DIR / "flow_refresh_duplicate.xml" @pytest.fixture(scope="function") @@ -232,6 +233,18 @@ def test_refresh_id_str(server: TSC.Server) -> None: assert format_datetime(refresh_job.flow_run.started_at) == "2018-05-22T13:00:29Z" +def test_refresh_already_running(server: TSC.Server) -> None: + response_xml = REFRESH_DUPLICATE_XML.read_text() + with requests_mock.mock() as m: + m.post( + server.flows.baseurl + "/92967d2d-c7e2-46d0-8847-4802df58f484/run", + status_code=409, + text=response_xml, + ) + refresh_job = server.flows.refresh("92967d2d-c7e2-46d0-8847-4802df58f484") + assert refresh_job is None + + def test_bad_download_response(server: TSC.Server) -> None: with requests_mock.mock() as m, tempfile.TemporaryDirectory() as td: m.get(