From ec37f4d39bb01f1e25c74e084ae86c065ae14ea4 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 12:40:00 +0200 Subject: [PATCH 1/5] Speed up sync integration tests This change implements: * Channels for line-by-line output from stdout/stderr * A function to wait for a sync step to complete (using above) * Ensure all tests are prefixed `TestAccSync` * Use temporary paths in WSFS instead of cloning a repo --- internal/helpers.go | 45 +++++- internal/sync_test.go | 334 ++++++++++++++++++------------------------ 2 files changed, 182 insertions(+), 197 deletions(-) diff --git a/internal/helpers.go b/internal/helpers.go index b51d005b27e..feb839711a1 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "fmt" + "io" "math/rand" "os" "path/filepath" @@ -54,18 +55,51 @@ type cobraTestRunner struct { stdout bytes.Buffer stderr bytes.Buffer + // Line-by-line output. + // Background goroutines populate these channels by reading from stdout/stderr pipes. + stdoutLines <-chan string + stderrLines <-chan string + errch <-chan error } +func consumeLines(ctx context.Context, r io.Reader) <-chan string { + ch := make(chan string, 1000) + go func() { + defer close(ch) + scanner := bufio.NewScanner(r) + for scanner.Scan() { + select { + case <-ctx.Done(): + return + case ch <- scanner.Text(): + } + } + }() + return ch +} + func (t *cobraTestRunner) RunBackground() { + var stdoutR, stderrR io.Reader + var stdoutW, stderrW io.WriteCloser + stdoutR, stdoutW = io.Pipe() + stderrR, stderrW = io.Pipe() root := root.RootCmd - root.SetOut(&t.stdout) - root.SetErr(&t.stderr) + root.SetOut(stdoutW) + root.SetErr(stderrW) root.SetArgs(t.args) errch := make(chan error) ctx, cancel := context.WithCancel(context.Background()) + // Tee stdout/stderr to buffers. + stdoutR = io.TeeReader(stdoutR, &t.stdout) + stderrR = io.TeeReader(stderrR, &t.stderr) + + // Consume stdout/stderr line-by-line. + t.stdoutLines = consumeLines(ctx, stdoutR) + t.stderrLines = consumeLines(ctx, stderrR) + // Run command in background. go func() { cmd, err := root.ExecuteContextC(ctx) @@ -73,6 +107,10 @@ func (t *cobraTestRunner) RunBackground() { t.Logf("Error running command: %s", err) } + // Close pipes to signal EOF. + stdoutW.Close() + stderrW.Close() + if t.stdout.Len() > 0 { // Make a copy of the buffer such that it remains "unread". scanner := bufio.NewScanner(bytes.NewBuffer(t.stdout.Bytes())) @@ -127,6 +165,9 @@ func (c *cobraTestRunner) Eventually(condition func() bool, waitFor time.Duratio ticker := time.NewTicker(tick) defer ticker.Stop() + // Kick off condition check immediately. + go func() { ch <- condition() }() + for tick := ticker.C; ; { select { case err := <-c.errch: diff --git a/internal/sync_test.go b/internal/sync_test.go index bcff7b22a6b..ece52620021 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -25,8 +25,7 @@ import ( ) var ( - repoUrl = "https://github.com/databricks/databricks-empty-ide-project.git" - repoFiles = []string{"README-IDE.md"} + repoUrl = "https://github.com/databricks/databricks-empty-ide-project.git" ) // This test needs auth env vars to run. @@ -59,7 +58,7 @@ func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Contex return localRoot, remoteRoot } -type assertSync struct { +type syncTest struct { t *testing.T c *cobraTestRunner w *databricks.WorkspaceClient @@ -67,7 +66,54 @@ type assertSync struct { remoteRoot string } -func (a *assertSync) remoteDirContent(ctx context.Context, relativeDir string, expectedFiles []string) { +func setupSyncTest(t *testing.T, args ...string) *syncTest { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + w := databricks.Must(databricks.NewWorkspaceClient()) + localRoot := t.TempDir() + remoteRoot := temporaryWorkspaceDir(t, w) + + // Prepend common arguments. + args = append([]string{ + "sync", + localRoot, + remoteRoot, + "--output", + "json", + }, args...) + + c := NewCobraTestRunner(t, args...) + c.RunBackground() + + return &syncTest{ + t: t, + c: c, + w: w, + localRoot: localRoot, + remoteRoot: remoteRoot, + } +} + +func (s *syncTest) waitForCompletionMarker() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + s.t.Fatal("timed out waiting for sync to complete") + case line := <-s.c.stdoutLines: + var event sync.EventBase + err := json.Unmarshal([]byte(line), &event) + require.NoError(s.t, err) + if event.Type == sync.EventTypeComplete { + return + } + } + } +} + +func (a *syncTest) remoteDirContent(ctx context.Context, relativeDir string, expectedFiles []string) { remoteDir := path.Join(a.remoteRoot, relativeDir) a.c.Eventually(func() bool { objects, err := a.w.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{ @@ -92,7 +138,7 @@ func (a *assertSync) remoteDirContent(ctx context.Context, relativeDir string, e } } -func (a *assertSync) remoteFileContent(ctx context.Context, relativePath string, expectedContent string) { +func (a *syncTest) remoteFileContent(ctx context.Context, relativePath string, expectedContent string) { filePath := path.Join(a.remoteRoot, relativePath) // Remove leading "/" so we can use it in the URL. @@ -113,7 +159,7 @@ func (a *assertSync) remoteFileContent(ctx context.Context, relativePath string, }, 30*time.Second, 5*time.Second) } -func (a *assertSync) objectType(ctx context.Context, relativePath string, expected string) { +func (a *syncTest) objectType(ctx context.Context, relativePath string, expected string) { path := path.Join(a.remoteRoot, relativePath) a.c.Eventually(func() bool { @@ -125,7 +171,7 @@ func (a *assertSync) objectType(ctx context.Context, relativePath string, expect }, 30*time.Second, 5*time.Second) } -func (a *assertSync) language(ctx context.Context, relativePath string, expected string) { +func (a *syncTest) language(ctx context.Context, relativePath string, expected string) { path := path.Join(a.remoteRoot, relativePath) a.c.Eventually(func() bool { @@ -137,7 +183,7 @@ func (a *assertSync) language(ctx context.Context, relativePath string, expected }, 30*time.Second, 5*time.Second) } -func (a *assertSync) snapshotContains(files []string) { +func (a *syncTest) snapshotContains(files []string) { snapshotPath := filepath.Join(a.localRoot, ".databricks/sync-snapshots", sync.GetFileName(a.w.Config.Host, a.remoteRoot)) assert.FileExists(a.t, snapshotPath) @@ -160,176 +206,128 @@ func (a *assertSync) snapshotContains(files []string) { assert.Equal(a.t, len(files), len(s.LastUpdatedTimes)) } -func TestAccFullFileSync(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncFullFileSync(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--full", "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--full", "--watch") // .gitignore is created by the sync process to enforce .databricks is not synced - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) // New file - localFilePath := filepath.Join(localRepoPath, "foo.txt") + localFilePath := filepath.Join(assertSync.localRoot, "foo.txt") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, "foo.txt", ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{"foo.txt", ".gitignore"}) assertSync.remoteFileContent(ctx, "foo.txt", "") // Write to file f.Overwrite(t, `{"statement": "Mi Gente"}`) + assertSync.waitForCompletionMarker() assertSync.remoteFileContent(ctx, "foo.txt", `{"statement": "Mi Gente"}`) // Write again f.Overwrite(t, `{"statement": "Young Dumb & Broke"}`) + assertSync.waitForCompletionMarker() assertSync.remoteFileContent(ctx, "foo.txt", `{"statement": "Young Dumb & Broke"}`) // delete f.Remove(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) } -func TestAccIncrementalFileSync(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalFileSync(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // .gitignore is created by the sync process to enforce .databricks is not synced - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) // New file - localFilePath := filepath.Join(localRepoPath, "foo.txt") + localFilePath := filepath.Join(assertSync.localRoot, "foo.txt") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, "foo.txt", ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{"foo.txt", ".gitignore"}) assertSync.remoteFileContent(ctx, "foo.txt", "") - assertSync.snapshotContains(append(repoFiles, "foo.txt", ".gitignore")) + assertSync.snapshotContains([]string{"foo.txt", ".gitignore"}) // Write to file f.Overwrite(t, `{"statement": "Mi Gente"}`) + assertSync.waitForCompletionMarker() assertSync.remoteFileContent(ctx, "foo.txt", `{"statement": "Mi Gente"}`) // Write again f.Overwrite(t, `{"statement": "Young Dumb & Broke"}`) + assertSync.waitForCompletionMarker() assertSync.remoteFileContent(ctx, "foo.txt", `{"statement": "Young Dumb & Broke"}`) // delete f.Remove(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) - assertSync.snapshotContains(append(repoFiles, ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) + assertSync.snapshotContains([]string{".gitignore"}) } -func TestAccNestedFolderSync(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncNestedFolderSync(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // .gitignore is created by the sync process to enforce .databricks is not synced - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) // New file - localFilePath := filepath.Join(localRepoPath, "dir1/dir2/dir3/foo.txt") + localFilePath := filepath.Join(assertSync.localRoot, "dir1/dir2/dir3/foo.txt") err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) assert.NoError(t, err) f := testfile.CreateFile(t, localFilePath) defer f.Close(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "dir1")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "dir1"}) assertSync.remoteDirContent(ctx, "dir1", []string{"dir2"}) assertSync.remoteDirContent(ctx, "dir1/dir2", []string{"dir3"}) assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{"foo.txt"}) - assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("dir1/dir2/dir3/foo.txt"))) + assertSync.snapshotContains([]string{".gitignore", filepath.FromSlash("dir1/dir2/dir3/foo.txt")}) // delete f.Remove(t) + assertSync.waitForCompletionMarker() // directories are not cleaned up right now. This is not ideal assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{}) - assertSync.snapshotContains(append(repoFiles, ".gitignore")) + assertSync.snapshotContains([]string{".gitignore"}) } -func TestAccNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // .gitignore is created by the sync process to enforce .databricks is not synced - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) // New file - localFilePath := filepath.Join(localRepoPath, "dir1/a b+c/c+d e/e+f g#i.txt") + localFilePath := filepath.Join(assertSync.localRoot, "dir1/a b+c/c+d e/e+f g#i.txt") err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) assert.NoError(t, err) f := testfile.CreateFile(t, localFilePath) defer f.Close(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "dir1")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "dir1"}) assertSync.remoteDirContent(ctx, "dir1", []string{"a b+c"}) assertSync.remoteDirContent(ctx, "dir1/a b+c", []string{"c+d e"}) assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{"e+f g#i.txt"}) - assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("dir1/a b+c/c+d e/e+f g#i.txt"))) + assertSync.snapshotContains([]string{".gitignore", filepath.FromSlash("dir1/a b+c/c+d e/e+f g#i.txt")}) // delete f.Remove(t) + assertSync.waitForCompletionMarker() // directories are not cleaned up right now. This is not ideal assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{}) - assertSync.snapshotContains(append(repoFiles, ".gitignore")) + assertSync.snapshotContains([]string{".gitignore"}) } // sync does not clean up empty directories from the workspace file system. @@ -341,166 +339,112 @@ func TestAccNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { // // In the above scenario sync should delete the empty folder and add foo to the remote // file system -func TestAccIncrementalFileOverwritesFolder(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalFileOverwritesFolder(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // create foo/bar.txt - localFilePath := filepath.Join(localRepoPath, "foo/bar.txt") + localFilePath := filepath.Join(assertSync.localRoot, "foo/bar.txt") err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) assert.NoError(t, err) f := testfile.CreateFile(t, localFilePath) defer f.Close(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) assertSync.remoteDirContent(ctx, "foo", []string{"bar.txt"}) - assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("foo/bar.txt"))) + assertSync.snapshotContains([]string{".gitignore", filepath.FromSlash("foo/bar.txt")}) // delete foo/bar.txt f.Remove(t) - os.Remove(filepath.Join(localRepoPath, "foo")) + os.Remove(filepath.Join(assertSync.localRoot, "foo")) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "foo", []string{}) assertSync.objectType(ctx, "foo", "DIRECTORY") - assertSync.snapshotContains(append(repoFiles, ".gitignore")) + assertSync.snapshotContains([]string{".gitignore"}) - f2 := testfile.CreateFile(t, filepath.Join(localRepoPath, "foo")) + f2 := testfile.CreateFile(t, filepath.Join(assertSync.localRoot, "foo")) defer f2.Close(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) assertSync.objectType(ctx, "foo", "FILE") - assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo")) + assertSync.snapshotContains([]string{".gitignore", "foo"}) } -func TestAccIncrementalSyncPythonNotebookToFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalSyncPythonNotebookToFile(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) + assertSync := setupSyncTest(t, "--watch") // create python notebook - localFilePath := filepath.Join(localRepoPath, "foo.py") + localFilePath := filepath.Join(assertSync.localRoot, "foo.py") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) f.Overwrite(t, "# Databricks notebook source") - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } - // notebook was uploaded properly - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) assertSync.objectType(ctx, "foo", "NOTEBOOK") assertSync.language(ctx, "foo", "PYTHON") - assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) + assertSync.snapshotContains([]string{".gitignore", "foo.py"}) // convert to vanilla python file f.Overwrite(t, "# No longer a python notebook") + assertSync.waitForCompletionMarker() assertSync.objectType(ctx, "foo.py", "FILE") - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo.py")) - assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo.py"}) + assertSync.snapshotContains([]string{".gitignore", "foo.py"}) // delete the vanilla python file f.Remove(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) - assertSync.snapshotContains(append(repoFiles, ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) + assertSync.snapshotContains([]string{".gitignore"}) } -func TestAccIncrementalSyncFileToPythonNotebook(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalSyncFileToPythonNotebook(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // create vanilla python file - localFilePath := filepath.Join(localRepoPath, "foo.py") + localFilePath := filepath.Join(assertSync.localRoot, "foo.py") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) + assertSync.waitForCompletionMarker() // assert file upload - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo.py")) + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo.py"}) assertSync.objectType(ctx, "foo.py", "FILE") - assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) + assertSync.snapshotContains([]string{".gitignore", "foo.py"}) // convert to notebook f.Overwrite(t, "# Databricks notebook source") + assertSync.waitForCompletionMarker() assertSync.objectType(ctx, "foo", "NOTEBOOK") assertSync.language(ctx, "foo", "PYTHON") - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) - assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) + assertSync.snapshotContains([]string{".gitignore", "foo.py"}) } -func TestAccIncrementalSyncPythonNotebookDelete(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalSyncPythonNotebookDelete(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) + assertSync := setupSyncTest(t, "--watch") // create python notebook - localFilePath := filepath.Join(localRepoPath, "foo.py") + localFilePath := filepath.Join(assertSync.localRoot, "foo.py") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) f.Overwrite(t, "# Databricks notebook source") - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync.waitForCompletionMarker() // notebook was uploaded properly - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) + assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) assertSync.objectType(ctx, "foo", "NOTEBOOK") assertSync.language(ctx, "foo", "PYTHON") // Delete notebook f.Remove(t) - assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) } func TestAccSyncEnsureRemotePathIsUsableIfRepoDoesntExist(t *testing.T) { From 5059d643a0e7b2782c3595e8546980c5b4e6dc43 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 12:57:48 +0200 Subject: [PATCH 2/5] Revert append(repoFiles, ...) change --- internal/sync_test.go | 89 ++++++++++++++++++++++--------------------- 1 file changed, 45 insertions(+), 44 deletions(-) diff --git a/internal/sync_test.go b/internal/sync_test.go index ece52620021..35c9fff299a 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -25,7 +25,8 @@ import ( ) var ( - repoUrl = "https://github.com/databricks/databricks-empty-ide-project.git" + repoUrl = "https://github.com/databricks/databricks-empty-ide-project.git" + repoFiles = []string{} ) // This test needs auth env vars to run. @@ -212,14 +213,14 @@ func TestAccSyncFullFileSync(t *testing.T) { // .gitignore is created by the sync process to enforce .databricks is not synced assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) // New file localFilePath := filepath.Join(assertSync.localRoot, "foo.txt") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{"foo.txt", ".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, "foo.txt", ".gitignore")) assertSync.remoteFileContent(ctx, "foo.txt", "") // Write to file @@ -235,7 +236,7 @@ func TestAccSyncFullFileSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) } func TestAccSyncIncrementalFileSync(t *testing.T) { @@ -244,16 +245,16 @@ func TestAccSyncIncrementalFileSync(t *testing.T) { // .gitignore is created by the sync process to enforce .databricks is not synced assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) // New file localFilePath := filepath.Join(assertSync.localRoot, "foo.txt") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{"foo.txt", ".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, "foo.txt", ".gitignore")) assertSync.remoteFileContent(ctx, "foo.txt", "") - assertSync.snapshotContains([]string{"foo.txt", ".gitignore"}) + assertSync.snapshotContains(append(repoFiles, "foo.txt", ".gitignore")) // Write to file f.Overwrite(t, `{"statement": "Mi Gente"}`) @@ -268,8 +269,8 @@ func TestAccSyncIncrementalFileSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) - assertSync.snapshotContains([]string{".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + assertSync.snapshotContains(append(repoFiles, ".gitignore")) } func TestAccSyncNestedFolderSync(t *testing.T) { @@ -278,7 +279,7 @@ func TestAccSyncNestedFolderSync(t *testing.T) { // .gitignore is created by the sync process to enforce .databricks is not synced assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) // New file localFilePath := filepath.Join(assertSync.localRoot, "dir1/dir2/dir3/foo.txt") @@ -287,18 +288,18 @@ func TestAccSyncNestedFolderSync(t *testing.T) { f := testfile.CreateFile(t, localFilePath) defer f.Close(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "dir1"}) - assertSync.remoteDirContent(ctx, "dir1", []string{"dir2"}) - assertSync.remoteDirContent(ctx, "dir1/dir2", []string{"dir3"}) - assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{"foo.txt"}) - assertSync.snapshotContains([]string{".gitignore", filepath.FromSlash("dir1/dir2/dir3/foo.txt")}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "dir1")) + assertSync.remoteDirContent(ctx, "dir1", append(repoFiles, "dir2")) + assertSync.remoteDirContent(ctx, "dir1/dir2", append(repoFiles, "dir3")) + assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", append(repoFiles, "foo.txt")) + assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("dir1/dir2/dir3/foo.txt"))) // delete f.Remove(t) assertSync.waitForCompletionMarker() // directories are not cleaned up right now. This is not ideal - assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{}) - assertSync.snapshotContains([]string{".gitignore"}) + assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", append(repoFiles)) + assertSync.snapshotContains(append(repoFiles, ".gitignore")) } func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { @@ -307,7 +308,7 @@ func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { // .gitignore is created by the sync process to enforce .databricks is not synced assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) // New file localFilePath := filepath.Join(assertSync.localRoot, "dir1/a b+c/c+d e/e+f g#i.txt") @@ -316,18 +317,18 @@ func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { f := testfile.CreateFile(t, localFilePath) defer f.Close(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "dir1"}) - assertSync.remoteDirContent(ctx, "dir1", []string{"a b+c"}) - assertSync.remoteDirContent(ctx, "dir1/a b+c", []string{"c+d e"}) - assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{"e+f g#i.txt"}) - assertSync.snapshotContains([]string{".gitignore", filepath.FromSlash("dir1/a b+c/c+d e/e+f g#i.txt")}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "dir1")) + assertSync.remoteDirContent(ctx, "dir1", append(repoFiles, "a b+c")) + assertSync.remoteDirContent(ctx, "dir1/a b+c", append(repoFiles, "c+d e")) + assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", append(repoFiles, "e+f g#i.txt")) + assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("dir1/a b+c/c+d e/e+f g#i.txt"))) // delete f.Remove(t) assertSync.waitForCompletionMarker() // directories are not cleaned up right now. This is not ideal - assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{}) - assertSync.snapshotContains([]string{".gitignore"}) + assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", append(repoFiles)) + assertSync.snapshotContains(append(repoFiles, ".gitignore")) } // sync does not clean up empty directories from the workspace file system. @@ -350,24 +351,24 @@ func TestAccSyncIncrementalFileOverwritesFolder(t *testing.T) { f := testfile.CreateFile(t, localFilePath) defer f.Close(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) - assertSync.remoteDirContent(ctx, "foo", []string{"bar.txt"}) - assertSync.snapshotContains([]string{".gitignore", filepath.FromSlash("foo/bar.txt")}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) + assertSync.remoteDirContent(ctx, "foo", append(repoFiles, "bar.txt")) + assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("foo/bar.txt"))) // delete foo/bar.txt f.Remove(t) os.Remove(filepath.Join(assertSync.localRoot, "foo")) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "foo", []string{}) + assertSync.remoteDirContent(ctx, "foo", append(repoFiles)) assertSync.objectType(ctx, "foo", "DIRECTORY") - assertSync.snapshotContains([]string{".gitignore"}) + assertSync.snapshotContains(append(repoFiles, ".gitignore")) f2 := testfile.CreateFile(t, filepath.Join(assertSync.localRoot, "foo")) defer f2.Close(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) assertSync.objectType(ctx, "foo", "FILE") - assertSync.snapshotContains([]string{".gitignore", "foo"}) + assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo")) } func TestAccSyncIncrementalSyncPythonNotebookToFile(t *testing.T) { @@ -382,23 +383,23 @@ func TestAccSyncIncrementalSyncPythonNotebookToFile(t *testing.T) { // notebook was uploaded properly assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) assertSync.objectType(ctx, "foo", "NOTEBOOK") assertSync.language(ctx, "foo", "PYTHON") - assertSync.snapshotContains([]string{".gitignore", "foo.py"}) + assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) // convert to vanilla python file f.Overwrite(t, "# No longer a python notebook") assertSync.waitForCompletionMarker() assertSync.objectType(ctx, "foo.py", "FILE") - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo.py"}) - assertSync.snapshotContains([]string{".gitignore", "foo.py"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo.py")) + assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) // delete the vanilla python file f.Remove(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) - assertSync.snapshotContains([]string{".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + assertSync.snapshotContains(append(repoFiles, ".gitignore")) } func TestAccSyncIncrementalSyncFileToPythonNotebook(t *testing.T) { @@ -412,17 +413,17 @@ func TestAccSyncIncrementalSyncFileToPythonNotebook(t *testing.T) { assertSync.waitForCompletionMarker() // assert file upload - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo.py"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo.py")) assertSync.objectType(ctx, "foo.py", "FILE") - assertSync.snapshotContains([]string{".gitignore", "foo.py"}) + assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) // convert to notebook f.Overwrite(t, "# Databricks notebook source") assertSync.waitForCompletionMarker() assertSync.objectType(ctx, "foo", "NOTEBOOK") assertSync.language(ctx, "foo", "PYTHON") - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) - assertSync.snapshotContains([]string{".gitignore", "foo.py"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) + assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) } func TestAccSyncIncrementalSyncPythonNotebookDelete(t *testing.T) { @@ -437,14 +438,14 @@ func TestAccSyncIncrementalSyncPythonNotebookDelete(t *testing.T) { assertSync.waitForCompletionMarker() // notebook was uploaded properly - assertSync.remoteDirContent(ctx, "", []string{".gitignore", "foo"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) assertSync.objectType(ctx, "foo", "NOTEBOOK") assertSync.language(ctx, "foo", "PYTHON") // Delete notebook f.Remove(t) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "", []string{".gitignore"}) + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) } func TestAccSyncEnsureRemotePathIsUsableIfRepoDoesntExist(t *testing.T) { From b8fb17916f837e39e68332622ab7770b396fe53b Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 13:00:30 +0200 Subject: [PATCH 3/5] Fixes --- internal/sync_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/sync_test.go b/internal/sync_test.go index 35c9fff299a..89b318a9706 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -289,16 +289,16 @@ func TestAccSyncNestedFolderSync(t *testing.T) { defer f.Close(t) assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "dir1")) - assertSync.remoteDirContent(ctx, "dir1", append(repoFiles, "dir2")) - assertSync.remoteDirContent(ctx, "dir1/dir2", append(repoFiles, "dir3")) - assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", append(repoFiles, "foo.txt")) + assertSync.remoteDirContent(ctx, "dir1", []string{"dir2"}) + assertSync.remoteDirContent(ctx, "dir1/dir2", []string{"dir3"}) + assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{"foo.txt"}) assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("dir1/dir2/dir3/foo.txt"))) // delete f.Remove(t) assertSync.waitForCompletionMarker() // directories are not cleaned up right now. This is not ideal - assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", append(repoFiles)) + assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{}) assertSync.snapshotContains(append(repoFiles, ".gitignore")) } @@ -318,16 +318,16 @@ func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { defer f.Close(t) assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "dir1")) - assertSync.remoteDirContent(ctx, "dir1", append(repoFiles, "a b+c")) - assertSync.remoteDirContent(ctx, "dir1/a b+c", append(repoFiles, "c+d e")) - assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", append(repoFiles, "e+f g#i.txt")) + assertSync.remoteDirContent(ctx, "dir1", []string{"a b+c"}) + assertSync.remoteDirContent(ctx, "dir1/a b+c", []string{"c+d e"}) + assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{"e+f g#i.txt"}) assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("dir1/a b+c/c+d e/e+f g#i.txt"))) // delete f.Remove(t) assertSync.waitForCompletionMarker() // directories are not cleaned up right now. This is not ideal - assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", append(repoFiles)) + assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{}) assertSync.snapshotContains(append(repoFiles, ".gitignore")) } @@ -352,14 +352,14 @@ func TestAccSyncIncrementalFileOverwritesFolder(t *testing.T) { defer f.Close(t) assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) - assertSync.remoteDirContent(ctx, "foo", append(repoFiles, "bar.txt")) + assertSync.remoteDirContent(ctx, "foo", []string{"bar.txt"}) assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("foo/bar.txt"))) // delete foo/bar.txt f.Remove(t) os.Remove(filepath.Join(assertSync.localRoot, "foo")) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "foo", append(repoFiles)) + assertSync.remoteDirContent(ctx, "foo", []string{}) assertSync.objectType(ctx, "foo", "DIRECTORY") assertSync.snapshotContains(append(repoFiles, ".gitignore")) From bc94841b70fc3beb9ba82592bc8aac2864b4d6ae Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 13:09:24 +0200 Subject: [PATCH 4/5] Drain stdout/stderr pipes on command termination --- internal/helpers.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/helpers.go b/internal/helpers.go index feb839711a1..5ec738afdd0 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -111,6 +111,12 @@ func (t *cobraTestRunner) RunBackground() { stdoutW.Close() stderrW.Close() + // Read remaining data from pipes to ensure it ends up in the buffers. + // We rely on [consumeLines] to read everything but upon cancellation + // it terminates and we might have a few extra bytes in the pipe. + io.ReadAll(stdoutR) + io.ReadAll(stderrR) + if t.stdout.Len() > 0 { // Make a copy of the buffer such that it remains "unread". scanner := bufio.NewScanner(bytes.NewBuffer(t.stdout.Bytes())) From 35ef339254a51dfe52710b1ba37472f7d7ce2649 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 14:31:38 +0200 Subject: [PATCH 5/5] Synchronize on completion of the consumeLines goroutines --- internal/helpers.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/helpers.go b/internal/helpers.go index 5ec738afdd0..1f756871997 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -63,10 +64,12 @@ type cobraTestRunner struct { errch <-chan error } -func consumeLines(ctx context.Context, r io.Reader) <-chan string { +func consumeLines(ctx context.Context, wg *sync.WaitGroup, r io.Reader) <-chan string { ch := make(chan string, 1000) + wg.Add(1) go func() { defer close(ch) + defer wg.Done() scanner := bufio.NewScanner(r) for scanner.Scan() { select { @@ -97,8 +100,9 @@ func (t *cobraTestRunner) RunBackground() { stderrR = io.TeeReader(stderrR, &t.stderr) // Consume stdout/stderr line-by-line. - t.stdoutLines = consumeLines(ctx, stdoutR) - t.stderrLines = consumeLines(ctx, stderrR) + var wg sync.WaitGroup + t.stdoutLines = consumeLines(ctx, &wg, stdoutR) + t.stderrLines = consumeLines(ctx, &wg, stderrR) // Run command in background. go func() { @@ -111,11 +115,9 @@ func (t *cobraTestRunner) RunBackground() { stdoutW.Close() stderrW.Close() - // Read remaining data from pipes to ensure it ends up in the buffers. - // We rely on [consumeLines] to read everything but upon cancellation - // it terminates and we might have a few extra bytes in the pipe. - io.ReadAll(stdoutR) - io.ReadAll(stderrR) + // Wait for the [consumeLines] routines to finish now that + // the pipes they're reading from have closed. + wg.Wait() if t.stdout.Len() > 0 { // Make a copy of the buffer such that it remains "unread".