From b21b03ded075192293775df6c0cc914dd7ec05e1 Mon Sep 17 00:00:00 2001 From: Shubha Rajan Date: Mon, 9 Mar 2020 13:42:17 -0700 Subject: [PATCH 1/2] refactored and increased time limit for inspect samples --- .../dlp/snippets/InspectBigQueryTable.java | 61 ++++++++++-------- .../dlp/snippets/InspectDatastoreEntity.java | 62 +++++++++++-------- .../java/dlp/snippets/InspectGcsFile.java | 58 ++++++++++------- 3 files changed, 104 insertions(+), 77 deletions(-) diff --git a/dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java b/dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java index 951be74e270..7a2c5272833 100644 --- a/dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java +++ b/dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java @@ -18,11 +18,11 @@ // [START dlp_inspect_bigquery] +import com.google.api.core.SettableApiFuture; import com.google.cloud.dlp.v2.DlpServiceClient; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; -import com.google.common.util.concurrent.SettableFuture; import com.google.privacy.dlp.v2.Action; import com.google.privacy.dlp.v2.BigQueryOptions; import com.google.privacy.dlp.v2.BigQueryTable; @@ -54,10 +54,9 @@ public static void inspectBigQueryTable() String projectId = "your-project-id"; String bigQueryDatasetId = "your-bigquery-dataset-id"; String bigQueryTableId = "your-bigquery-table-id"; - String pubSubTopicId = "your-pubsub-topic-id"; - String pubSubSubscriptionId = "your-pubsub-subscription-id"; - inspectBigQueryTable( - projectId, bigQueryDatasetId, bigQueryTableId, pubSubTopicId, pubSubSubscriptionId); + String topicId = "your-pubsub-topic-id"; + String subscriptionId = "your-pubsub-subscription-id"; + inspectBigQueryTable(projectId, bigQueryDatasetId, bigQueryTableId, topicId, subscriptionId); } // Inspects a BigQuery Table @@ -65,8 +64,8 @@ public static void inspectBigQueryTable( String projectId, String bigQueryDatasetId, String bigQueryTableId, - String pubSubTopicId, - String pubSubSubscriptionName) + String topicId, + String subscriptionId) throws ExecutionException, InterruptedException, IOException { // Initialize client that will be used to send requests. This client only needs to be created // once, and can be reused for multiple requests. After completing all of your requests, call @@ -98,7 +97,7 @@ public static void inspectBigQueryTable( InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build(); // Specify the action that is triggered when the job completes. - String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId); + String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId); Action.PublishToPubSub publishToPubSub = Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build(); Action action = Action.newBuilder().setPubSub(publishToPubSub).build(); @@ -119,36 +118,32 @@ public static void inspectBigQueryTable( .build(); // Use the client to send the request. - final DlpJob job = dlp.createDlpJob(createDlpJobRequest); - System.out.println("Job created: " + job.getName()); + final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest); + System.out.println("Job created: " + dlpJob.getName()); + + // Set up a Pub/Sub subscriber to listen on the job completion status + final SettableApiFuture done = SettableApiFuture.create(); - // Set up a Pub/Sub subscriber to listen for the job completion status - SettableFuture jobDone = SettableFuture.create(); ProjectSubscriptionName subscriptionName = - ProjectSubscriptionName.of(projectId, pubSubSubscriptionName); - MessageReceiver handleMessage = + ProjectSubscriptionName.of(projectId, subscriptionId); + + MessageReceiver messageHandler = (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> { - String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName"); - if (job.getName().equals(messageAttribute)) { - jobDone.set(null); - ackReplyConsumer.ack(); - } else { - ackReplyConsumer.nack(); - } + handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer); }; - Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build(); - subscriber.startAsync(); // Let the subscriber listen to messages + Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build(); + subscriber.startAsync(); // Wait for the original job to complete try { - jobDone.get(10, TimeUnit.MINUTES); + done.get(15, TimeUnit.MINUTES); } catch (TimeoutException e) { - System.out.println("Job was not completed after 10 minutes."); + System.out.println("Job was not completed after 15 minutes."); return; } // Get the latest state of the job from the service - GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build(); + GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build(); DlpJob completedJob = dlp.getDlpJob(request); // Parse the response and process results. @@ -161,5 +156,19 @@ public static void inspectBigQueryTable( } } } + // handleMessage injects the job and settableFuture into the message reciever interface + private static void handleMessage( + DlpJob job, + SettableApiFuture done, + PubsubMessage pubsubMessage, + AckReplyConsumer ackReplyConsumer) { + String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName"); + if (job.getName().equals(messageAttribute)) { + done.set(true); + ackReplyConsumer.ack(); + } else { + ackReplyConsumer.nack(); + } + } } // [END dlp_inspect_bigquery] diff --git a/dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java b/dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java index 1d35fd7c34c..f878424ebde 100644 --- a/dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java +++ b/dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java @@ -18,11 +18,11 @@ // [START dlp_inspect_datastore] +import com.google.api.core.SettableApiFuture; import com.google.cloud.dlp.v2.DlpServiceClient; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; -import com.google.common.util.concurrent.SettableFuture; import com.google.privacy.dlp.v2.Action; import com.google.privacy.dlp.v2.CreateDlpJobRequest; import com.google.privacy.dlp.v2.DatastoreOptions; @@ -55,10 +55,9 @@ public static void insepctDatastoreEntity() String projectId = "your-project-id"; String datastoreNamespace = "your-datastore-namespace"; String datastoreKind = "your-datastore-kind"; - String pubSubTopicId = "your-pubsub-topic-id"; - String pubSubSubscriptionId = "your-pubsub-subscription-id"; - insepctDatastoreEntity( - projectId, datastoreNamespace, datastoreKind, pubSubTopicId, pubSubSubscriptionId); + String topicId = "your-pubsub-topic-id"; + String subscriptionId = "your-pubsub-subscription-id"; + insepctDatastoreEntity(projectId, datastoreNamespace, datastoreKind, topicId, subscriptionId); } // Inspects a Datastore Entity. @@ -66,8 +65,8 @@ public static void insepctDatastoreEntity( String projectId, String datastoreNamespce, String datastoreKind, - String pubSubTopicId, - String pubSubSubscriptionName) + String topicId, + String subscriptionId) throws ExecutionException, InterruptedException, IOException { // Initialize client that will be used to send requests. This client only needs to be created // once, and can be reused for multiple requests. After completing all of your requests, call @@ -99,7 +98,7 @@ public static void insepctDatastoreEntity( InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build(); // Specify the action that is triggered when the job completes. - String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId); + String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId); Action.PublishToPubSub publishToPubSub = Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build(); Action action = Action.newBuilder().setPubSub(publishToPubSub).build(); @@ -120,37 +119,32 @@ public static void insepctDatastoreEntity( .build(); // Use the client to send the request. - final DlpJob job = dlp.createDlpJob(createDlpJobRequest); - System.out.println("Job created: " + job.getName()); + final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest); + System.out.println("Job created: " + dlpJob.getName()); + + // Set up a Pub/Sub subscriber to listen on the job completion status + final SettableApiFuture done = SettableApiFuture.create(); - // Set up a Pub/Sub subscriber to listen for the job completion status - SettableFuture jobDone = SettableFuture.create(); ProjectSubscriptionName subscriptionName = - ProjectSubscriptionName.of(projectId, pubSubSubscriptionName); - MessageReceiver handleMessage = + ProjectSubscriptionName.of(projectId, subscriptionId); + + MessageReceiver messageHandler = (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> { - String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName"); - if (job.getName().equals(messageAttribute)) { - jobDone.set(null); - ackReplyConsumer.ack(); - } else { - ackReplyConsumer.nack(); - ; - } + handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer); }; - Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build(); - subscriber.startAsync(); // Let the subscriber listen to messages + Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build(); + subscriber.startAsync(); // Wait for the original job to complete try { - jobDone.get(10, TimeUnit.MINUTES); + done.get(15, TimeUnit.MINUTES); } catch (TimeoutException e) { - System.out.println("Job was not completed after 10 minutes."); + System.out.println("Job was not completed after 15 minutes."); return; } // Get the latest state of the job from the service - GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build(); + GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build(); DlpJob completedJob = dlp.getDlpJob(request); // Parse the response and process results. @@ -163,5 +157,19 @@ public static void insepctDatastoreEntity( } } } + // handleMessage injects the job and settableFuture into the message reciever interface + private static void handleMessage( + DlpJob job, + SettableApiFuture done, + PubsubMessage pubsubMessage, + AckReplyConsumer ackReplyConsumer) { + String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName"); + if (job.getName().equals(messageAttribute)) { + done.set(true); + ackReplyConsumer.ack(); + } else { + ackReplyConsumer.nack(); + } + } } // [END dlp_inspect_datastore] diff --git a/dlp/src/main/java/dlp/snippets/InspectGcsFile.java b/dlp/src/main/java/dlp/snippets/InspectGcsFile.java index 1d3d6d571ee..3f15658fdc6 100644 --- a/dlp/src/main/java/dlp/snippets/InspectGcsFile.java +++ b/dlp/src/main/java/dlp/snippets/InspectGcsFile.java @@ -17,7 +17,7 @@ package dlp.snippets; // [START dlp_inspect_gcs] - +import com.google.api.core.SettableApiFuture; import com.google.cloud.dlp.v2.DlpServiceClient; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; @@ -52,14 +52,14 @@ public static void inspectGcsFile() throws InterruptedException, ExecutionExcept // TODO(developer): Replace these variables before running the sample. String projectId = "your-project-id"; String gcsUri = "gs://" + "your-bucket-name" + "/path/to/your/file.txt"; - String pubSubTopicId = "your-pubsub-topic-id"; - String pubSubSubscriptionId = "your-pubsub-subscription-id"; - inspectGcsFile(projectId, gcsUri, pubSubTopicId, pubSubSubscriptionId); + String topicId = "your-pubsub-topic-id"; + String subscriptionId = "your-pubsub-subscription-id"; + inspectGcsFile(projectId, gcsUri, topicId, subscriptionId); } // Inspects a file in a Google Cloud Storage Bucket. public static void inspectGcsFile( - String projectId, String gcsUri, String pubSubTopicId, String pubSubSubscriptionName) + String projectId, String gcsUri, String topicId, String subscriptionId) throws ExecutionException, InterruptedException, IOException { // Initialize client that will be used to send requests. This client only needs to be created // once, and can be reused for multiple requests. After completing all of your requests, call @@ -84,7 +84,7 @@ public static void inspectGcsFile( InspectConfig.newBuilder().addAllInfoTypes(infoTypes).setIncludeQuote(true).build(); // Specify the action that is triggered when the job completes. - String pubSubTopic = String.format("projects/%s/topics/%s", projectId, pubSubTopicId); + String pubSubTopic = String.format("projects/%s/topics/%s", projectId, topicId); Action.PublishToPubSub publishToPubSub = Action.PublishToPubSub.newBuilder().setTopic(pubSubTopic).build(); Action action = Action.newBuilder().setPubSub(publishToPubSub).build(); @@ -105,36 +105,32 @@ public static void inspectGcsFile( .build(); // Use the client to send the request. - final DlpJob job = dlp.createDlpJob(createDlpJobRequest); - System.out.println("Job created: " + job.getName()); + final DlpJob dlpJob = dlp.createDlpJob(createDlpJobRequest); + System.out.println("Job created: " + dlpJob.getName()); + + // Set up a Pub/Sub subscriber to listen on the job completion status + final SettableApiFuture done = SettableApiFuture.create(); - // Set up a Pub/Sub subscriber to listen for the job completion status - SettableFuture jobDone = SettableFuture.create(); ProjectSubscriptionName subscriptionName = - ProjectSubscriptionName.of(projectId, pubSubSubscriptionName); - MessageReceiver handleMessage = + ProjectSubscriptionName.of(projectId, subscriptionId); + + MessageReceiver messageHandler = (PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) -> { - String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName"); - if (job.getName().equals(messageAttribute)) { - jobDone.set(null); - ackReplyConsumer.ack(); - } else { - ackReplyConsumer.nack(); - } + handleMessage(dlpJob, done, pubsubMessage, ackReplyConsumer); }; - Subscriber subscriber = Subscriber.newBuilder(subscriptionName, handleMessage).build(); - subscriber.startAsync(); // Let the subscriber listen to messages + Subscriber subscriber = Subscriber.newBuilder(subscriptionName, messageHandler).build(); + subscriber.startAsync(); // Wait for the original job to complete try { - jobDone.get(10, TimeUnit.MINUTES); + done.get(15, TimeUnit.MINUTES); } catch (TimeoutException e) { - System.out.println("Job was not completed after 10 minutes."); + System.out.println("Job was not completed after 15 minutes."); return; } // Get the latest state of the job from the service - GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(job.getName()).build(); + GetDlpJobRequest request = GetDlpJobRequest.newBuilder().setName(dlpJob.getName()).build(); DlpJob completedJob = dlp.getDlpJob(request); // Parse the response and process results. @@ -147,5 +143,19 @@ public static void inspectGcsFile( } } } + // handleMessage injects the job and settableFuture into the message reciever interface + private static void handleMessage( + DlpJob job, + SettableApiFuture done, + PubsubMessage pubsubMessage, + AckReplyConsumer ackReplyConsumer) { + String messageAttribute = pubsubMessage.getAttributesMap().get("DlpJobName"); + if (job.getName().equals(messageAttribute)) { + done.set(true); + ackReplyConsumer.ack(); + } else { + ackReplyConsumer.nack(); + } + } } // [END dlp_inspect_gcs] From abeb641094aa6d570ff3316302fcca0d74d0a25f Mon Sep 17 00:00:00 2001 From: Shubha Rajan Date: Mon, 9 Mar 2020 14:27:00 -0700 Subject: [PATCH 2/2] fixed checkstyle violations --- dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java | 1 + dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java | 1 + dlp/src/main/java/dlp/snippets/InspectGcsFile.java | 1 + 3 files changed, 3 insertions(+) diff --git a/dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java b/dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java index 7a2c5272833..2e7f2a968d9 100644 --- a/dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java +++ b/dlp/src/main/java/dlp/snippets/InspectBigQueryTable.java @@ -156,6 +156,7 @@ public static void inspectBigQueryTable( } } } + // handleMessage injects the job and settableFuture into the message reciever interface private static void handleMessage( DlpJob job, diff --git a/dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java b/dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java index f878424ebde..ef6810b49b2 100644 --- a/dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java +++ b/dlp/src/main/java/dlp/snippets/InspectDatastoreEntity.java @@ -157,6 +157,7 @@ public static void insepctDatastoreEntity( } } } + // handleMessage injects the job and settableFuture into the message reciever interface private static void handleMessage( DlpJob job, diff --git a/dlp/src/main/java/dlp/snippets/InspectGcsFile.java b/dlp/src/main/java/dlp/snippets/InspectGcsFile.java index 3f15658fdc6..5b9a7a01618 100644 --- a/dlp/src/main/java/dlp/snippets/InspectGcsFile.java +++ b/dlp/src/main/java/dlp/snippets/InspectGcsFile.java @@ -143,6 +143,7 @@ public static void inspectGcsFile( } } } + // handleMessage injects the job and settableFuture into the message reciever interface private static void handleMessage( DlpJob job,