Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -371,4 +371,10 @@
<className>com/google/cloud/spanner/ResultSets</className>
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.api.core.ApiFuture closeAsync()</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction;
import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -200,4 +197,11 @@ public interface AsyncTransactionFunction<I, O> {
*/
@Override
void close();

/**
* Closes the transaction manager. If there is an active transaction, it will be rolled back. The
* underlying session will be released back to the session pool. The returned {@link ApiFuture} is
* done when the transaction (if any) has been rolled back.
*/
ApiFuture<Void> closeAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.opencensus.trace.Span;
Expand Down Expand Up @@ -54,7 +55,17 @@ public void setSpan(Span span) {

@Override
public void close() {
closeAsync();
}

@Override
public ApiFuture<Void> closeAsync() {
ApiFuture<Void> res = null;
if (txnState == TransactionState.STARTED) {
res = rollbackAsync();
}
txn.close();
return MoreObjects.firstNonNull(res, ApiFutures.<Void>immediateFuture(null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
Expand Down Expand Up @@ -59,14 +58,41 @@ public void run() {

@Override
public void close() {
delegate.addListener(
new Runnable() {
SpannerApiFutures.get(closeAsync());
}

@Override
public ApiFuture<Void> closeAsync() {
final SettableApiFuture<Void> res = SettableApiFuture.create();
ApiFutures.addCallback(
delegate,
new ApiFutureCallback<AsyncTransactionManagerImpl>() {
@Override
public void run() {
public void onFailure(Throwable t) {
session.close();
}

@Override
public void onSuccess(AsyncTransactionManagerImpl result) {
ApiFutures.addCallback(
result.closeAsync(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
res.setException(t);
}

@Override
public void onSuccess(Void result) {
session.close();
res.set(result);
}
},
MoreExecutors.directExecutor());
}
},
MoreExecutors.directExecutor());
return res;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
Expand All @@ -47,6 +49,8 @@
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -181,6 +185,30 @@ public void onSuccess(long[] input) {
}
}

@Test
public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exception {
AsyncTransactionManager manager = client().transactionManagerAsync();
TransactionContext txn = manager.beginAsync().get();
txn.executeUpdateAsync(UPDATE_STATEMENT).get();
final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector();

SpannerApiFutures.get(manager.closeAsync());
// The mock server should already have the Rollback request, as we are waiting for the returned
// ApiFuture to be done.
mockSpanner.waitForRequestsToContain(
new Predicate<AbstractMessage>() {
@Override
public boolean apply(AbstractMessage input) {
if (input instanceof RollbackRequest) {
RollbackRequest request = (RollbackRequest) input;
return request.getTransactionId().equals(selector.getId());
}
return false;
}
},
0L);
}

@Test
public void asyncTransactionManagerUpdate() throws Exception {
final SettableApiFuture<Long> updateCount = SettableApiFuture.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -1927,6 +1929,23 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
}
}

public void waitForRequestsToContain(
Predicate<? super AbstractMessage> predicate, long timeoutMillis)
throws InterruptedException, TimeoutException {
Stopwatch watch = Stopwatch.createStarted();
while (true) {
Iterable<AbstractMessage> msg = Iterables.filter(getRequests(), predicate);
if (msg.iterator().hasNext()) {
break;
}
Thread.sleep(10L);
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
throw new TimeoutException(
"Timeout while waiting for requests to contain the wanted request");
}
}
}

@Override
public void addResponse(AbstractMessage response) {
throw new UnsupportedOperationException();
Expand Down