From d79c460bf0d12b272e6ec6db5d55d394918a7beb Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Fri, 9 Oct 2020 12:21:34 +0200 Subject: [PATCH 1/3] fix: AsyncTransactionManager should rollback on close AsyncTransctionManager should rollback the transaction if close is called while the transaction is still in state STARTED. Failing to do so, will keep the transaction open on the backend for longer than necessary, holding on to locks until it is garbage collected after 10 seconds. Fixes #504 --- .../spanner/AsyncTransactionManagerImpl.java | 3 +++ .../SessionPoolAsyncTransactionManager.java | 13 +++++++--- .../spanner/AsyncTransactionManagerTest.java | 26 +++++++++++++++++++ .../cloud/spanner/MockSpannerServiceImpl.java | 19 ++++++++++++++ 4 files changed, 58 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 082fa827e73..59f3e00a430 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -54,6 +54,9 @@ public void setSpan(Span span) { @Override public void close() { + if (txnState == TransactionState.STARTED) { + rollbackAsync(); + } txn.close(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 55b6102a270..9aa0a4e2f20 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -59,10 +59,17 @@ public void run() { @Override public void close() { - delegate.addListener( - new Runnable() { + ApiFutures.addCallback( + delegate, + new ApiFutureCallback() { @Override - public void run() { + public void onFailure(Throwable t) { + session.close(); + } + + @Override + public void onSuccess(AsyncTransactionManagerImpl result) { + result.close(); session.close(); } }, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index c7b95f33f63..7854a0db1f4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -36,7 +36,9 @@ import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.Options.ReadOption; +import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Range; @@ -47,6 +49,8 @@ import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.RollbackRequest; +import com.google.spanner.v1.TransactionSelector; import io.grpc.Status; import java.util.Arrays; import java.util.Collection; @@ -181,6 +185,28 @@ public void onSuccess(long[] input) { } } + @Test + public void asyncTransactionManager_shouldRollbackOnClose() throws Exception { + AsyncTransactionManager manager = client().transactionManagerAsync(); + TransactionContext txn = manager.beginAsync().get(); + txn.executeUpdateAsync(UPDATE_STATEMENT).get(); + final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector(); + + manager.close(); + mockSpanner.waitForRequestsToContain( + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + if (input instanceof RollbackRequest) { + RollbackRequest request = (RollbackRequest) input; + return request.getTransactionId().equals(selector.getId()); + } + return false; + } + }, + 5000L); + } + @Test public void asyncTransactionManagerUpdate() throws Exception { final SettableApiFuture updateCount = SettableApiFuture.create(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 4f55cd5ebd7..5ecf9607a49 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -23,8 +23,10 @@ import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.AbstractMessage; import com.google.protobuf.ByteString; @@ -1927,6 +1929,23 @@ public void waitForRequestsToContain(Class type, long } } + public void waitForRequestsToContain( + Predicate predicate, long timeoutMillis) + throws InterruptedException, TimeoutException { + Stopwatch watch = Stopwatch.createStarted(); + while (true) { + Iterable msg = Iterables.filter(getRequests(), predicate); + if (msg.iterator().hasNext()) { + break; + } + Thread.sleep(10L); + if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) { + throw new TimeoutException( + "Timeout while waiting for requests to contain the wanted request"); + } + } + } + @Override public void addResponse(AbstractMessage response) { throw new UnsupportedOperationException(); From daed417fdc2453d0365c545083b673eac3c06763 Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Tue, 13 Oct 2020 15:58:14 +0200 Subject: [PATCH 2/3] feat: return rollback result from close --- .../spanner/AsyncTransactionManager.java | 10 +++++--- .../spanner/AsyncTransactionManagerImpl.java | 10 +++++++- .../SessionPoolAsyncTransactionManager.java | 25 ++++++++++++++++--- .../spanner/AsyncTransactionManagerTest.java | 8 +++--- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java index d519c68013f..02d4a9dbd23 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java @@ -18,9 +18,6 @@ import com.google.api.core.ApiFuture; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction; -import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture; -import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; import com.google.cloud.spanner.TransactionManager.TransactionState; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -200,4 +197,11 @@ public interface AsyncTransactionFunction { */ @Override void close(); + + /** + * Closes the transaction manager. If there is an active transaction, it will be rolled back. The + * underlying session will be released back to the session pool. The returned {@link ApiFuture} is + * done when the transaction (if any) has been rolled back. + */ + ApiFuture closeAsync(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 59f3e00a430..350349af163 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -24,6 +24,7 @@ import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager; import com.google.cloud.spanner.TransactionManager.TransactionState; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import io.opencensus.trace.Span; @@ -54,10 +55,17 @@ public void setSpan(Span span) { @Override public void close() { + closeAsync(); + } + + @Override + public ApiFuture closeAsync() { + ApiFuture res = null; if (txnState == TransactionState.STARTED) { - rollbackAsync(); + res = rollbackAsync(); } txn.close(); + return MoreObjects.firstNonNull(res, ApiFutures.immediateFuture(null)); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 9aa0a4e2f20..54b621b93b8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -22,7 +22,6 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager; import com.google.cloud.spanner.TransactionManager.TransactionState; @@ -59,6 +58,12 @@ public void run() { @Override public void close() { + SpannerApiFutures.get(closeAsync()); + } + + @Override + public ApiFuture closeAsync() { + final SettableApiFuture res = SettableApiFuture.create(); ApiFutures.addCallback( delegate, new ApiFutureCallback() { @@ -69,11 +74,25 @@ public void onFailure(Throwable t) { @Override public void onSuccess(AsyncTransactionManagerImpl result) { - result.close(); - session.close(); + ApiFutures.addCallback( + result.closeAsync(), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable t) { + res.setException(t); + } + + @Override + public void onSuccess(Void result) { + session.close(); + res.set(result); + } + }, + MoreExecutors.directExecutor()); } }, MoreExecutors.directExecutor()); + return res; } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index 7854a0db1f4..bf1f214a715 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -186,13 +186,15 @@ public void onSuccess(long[] input) { } @Test - public void asyncTransactionManager_shouldRollbackOnClose() throws Exception { + public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exception { AsyncTransactionManager manager = client().transactionManagerAsync(); TransactionContext txn = manager.beginAsync().get(); txn.executeUpdateAsync(UPDATE_STATEMENT).get(); final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector(); - manager.close(); + SpannerApiFutures.get(manager.closeAsync()); + // The mock server should already have the Rollback request, as we are waiting for the returned + // ApiFuture to be done. mockSpanner.waitForRequestsToContain( new Predicate() { @Override @@ -204,7 +206,7 @@ public boolean apply(AbstractMessage input) { return false; } }, - 5000L); + 0L); } @Test From 0614cc4ca8b180f9880affb8d40f695b03fd752c Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Tue, 13 Oct 2020 17:00:28 +0200 Subject: [PATCH 3/3] fix: add ignored diff to clirr --- google-cloud-spanner/clirr-ignored-differences.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index cfbcb88f852..1f7beb76e9a 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -371,4 +371,10 @@ com/google/cloud/spanner/ResultSets com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider) + + + 7012 + com/google/cloud/spanner/AsyncTransactionManager + com.google.api.core.ApiFuture closeAsync() +