Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions bundle/deployer/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"os"
"path"
"path/filepath"
"strings"

Expand Down Expand Up @@ -71,7 +70,7 @@ func Create(ctx context.Context, env, localRoot, remoteRoot string, wsc *databri
if err != nil {
return nil, err
}
newLocker := CreateLocker(user.UserName, remoteRoot)
newLocker, err := CreateLocker(user.UserName, remoteRoot, wsc)
if err != nil {
return nil, err
}
Expand All @@ -89,15 +88,16 @@ func (b *Deployer) DefaultTerraformRoot() string {
}

func (b *Deployer) tfStateRemotePath() string {
return path.Join(b.remoteRoot, ".bundle", "terraform.tfstate")
// Note: remote paths are scoped to `remoteRoot` through the locker. Also see [Create].
return ".bundle/terraform.tfstate"
Comment thread
pietern marked this conversation as resolved.
}

func (b *Deployer) tfStateLocalPath() string {
return filepath.Join(b.DefaultTerraformRoot(), "terraform.tfstate")
}

func (b *Deployer) LoadTerraformState(ctx context.Context) error {
bytes, err := b.locker.GetRawJsonFileContent(ctx, b.wsc, b.tfStateRemotePath())
bytes, err := b.locker.GetRawJsonFileContent(ctx, b.tfStateRemotePath())
if err != nil {
// If remote tf state is absent, use local tf state
if strings.Contains(err.Error(), "File not found.") {
Expand All @@ -119,15 +119,15 @@ func (b *Deployer) SaveTerraformState(ctx context.Context) error {
if err != nil {
return err
}
return b.locker.PutFile(ctx, b.wsc, b.tfStateRemotePath(), bytes)
return b.locker.PutFile(ctx, b.tfStateRemotePath(), bytes)
}

func (d *Deployer) Lock(ctx context.Context, isForced bool) error {
return d.locker.Lock(ctx, d.wsc, isForced)
return d.locker.Lock(ctx, isForced)
}

func (d *Deployer) Unlock(ctx context.Context) error {
return d.locker.Unlock(ctx, d.wsc)
return d.locker.Unlock(ctx)
}

func (d *Deployer) ApplyTerraformConfig(ctx context.Context, configPath, terraformBinaryPath string, isForced bool) (DeploymentStatus, error) {
Expand Down
113 changes: 65 additions & 48 deletions bundle/deployer/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"path"
"io"
"strings"
"time"

"github.com/databricks/bricks/utilities"
"github.com/databricks/bricks/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -41,6 +39,8 @@ import (
// of exclusive access that other clients with non forcefully acquired
// locks might have
type Locker struct {
filer filer.Filer

// scope of the locker
TargetDir string
// Active == true implies exclusive access to TargetDir for the client.
Expand All @@ -63,24 +63,31 @@ type LockState struct {
User string
}

// don't need to hold lock on TargetDir to read locker state
func GetActiveLockState(ctx context.Context, wsc *databricks.WorkspaceClient, path string) (*LockState, error) {
bytes, err := utilities.GetRawJsonFileContent(ctx, wsc, path)
// GetActiveLockState returns current lock state, irrespective of us holding it.
func (locker *Locker) GetActiveLockState(ctx context.Context) (*LockState, error) {
reader, err := locker.filer.Read(ctx, locker.RemotePath())
if err != nil {
return nil, err
}

bytes, err := io.ReadAll(reader)
if err != nil {
return nil, err
}

remoteLock := LockState{}
err = json.Unmarshal(bytes, &remoteLock)
if err != nil {
return nil, err
}

return &remoteLock, nil
}

// asserts whether lock is held by locker. Returns descriptive error with current
// holder details if locker does not hold the lock
func (locker *Locker) assertLockHeld(ctx context.Context, wsc *databricks.WorkspaceClient) error {
activeLockState, err := GetActiveLockState(ctx, wsc, locker.RemotePath())
func (locker *Locker) assertLockHeld(ctx context.Context) error {
activeLockState, err := locker.GetActiveLockState(ctx)
if err != nil && strings.Contains(err.Error(), "File not found.") {
return fmt.Errorf("no active lock on target dir: %s", err)
}
Expand All @@ -97,53 +104,58 @@ func (locker *Locker) assertLockHeld(ctx context.Context, wsc *databricks.Worksp
}

// idempotent function since overwrite is set to true
func (locker *Locker) PutFile(ctx context.Context, wsc *databricks.WorkspaceClient, pathToFile string, content []byte) error {
func (locker *Locker) PutFile(ctx context.Context, pathToFile string, content []byte) error {
if !locker.Active {
return fmt.Errorf("failed to put file. deploy lock not held")
}
apiClient, err := client.New(wsc.Config)
if err != nil {
return err
}
apiPath := fmt.Sprintf(
"/api/2.0/workspace-files/import-file/%s?overwrite=true",
strings.TrimLeft(pathToFile, "/"))

err = apiClient.Do(ctx, http.MethodPost, apiPath, bytes.NewReader(content), nil)
if err != nil {
// retry after creating parent dirs
err = wsc.Workspace.MkdirsByPath(ctx, path.Dir(pathToFile))
if err != nil {
return fmt.Errorf("could not mkdir to put file: %s", err)
}
err = apiClient.Do(ctx, http.MethodPost, apiPath, bytes.NewReader(content), nil)
}
return err
return locker.filer.Write(ctx, pathToFile, bytes.NewReader(content), filer.OverwriteIfExists, filer.CreateParentDirectories)
}

func (locker *Locker) GetRawJsonFileContent(ctx context.Context, wsc *databricks.WorkspaceClient, path string) ([]byte, error) {
func (locker *Locker) GetRawJsonFileContent(ctx context.Context, path string) ([]byte, error) {
if !locker.Active {
return nil, fmt.Errorf("failed to get file. deploy lock not held")
}
return utilities.GetRawJsonFileContent(ctx, wsc, path)
reader, err := locker.filer.Read(ctx, path)
if err != nil {
return nil, err
}
return io.ReadAll(reader)
}

func (locker *Locker) Lock(ctx context.Context, wsc *databricks.WorkspaceClient, isForced bool) error {
func (locker *Locker) Lock(ctx context.Context, isForced bool) error {
newLockerState := LockState{
ID: locker.State.ID,
AcquisitionTime: time.Now(),
IsForced: isForced,
User: locker.State.User,
}
bytes, err := json.Marshal(newLockerState)
buf, err := json.Marshal(newLockerState)
if err != nil {
return err
}
err = utilities.WriteFile(ctx, wsc, locker.RemotePath(), bytes, isForced)
if err != nil && !strings.Contains(err.Error(), fmt.Sprintf("%s already exists", locker.RemotePath())) {
return err

var modes = []filer.WriteMode{
// Always create parent directory if it doesn't yet exist.
filer.CreateParentDirectories,
}

// Only overwrite lock file if `isForced`.
if isForced {
modes = append(modes, filer.OverwriteIfExists)
}

err = locker.filer.Write(ctx, locker.RemotePath(), bytes.NewReader(buf), modes...)
if err != nil {
// If the write failed because the lock file already exists, don't return
// the error and instead fall through to [assertLockHeld] below.
// This function will return a more descriptive error message that includes
// details about the current holder of the lock.
if !errors.As(err, &filer.FileAlreadyExistsError{}) {
return err
}
}
err = locker.assertLockHeld(ctx, wsc)

err = locker.assertLockHeld(ctx)
if err != nil {
return err
}
Expand All @@ -153,20 +165,15 @@ func (locker *Locker) Lock(ctx context.Context, wsc *databricks.WorkspaceClient,
return nil
}

func (locker *Locker) Unlock(ctx context.Context, wsc *databricks.WorkspaceClient) error {
func (locker *Locker) Unlock(ctx context.Context) error {
if !locker.Active {
return fmt.Errorf("unlock called when lock is not held")
}
err := locker.assertLockHeld(ctx, wsc)
err := locker.assertLockHeld(ctx)
if err != nil {
return fmt.Errorf("unlock called when lock is not held: %s", err)
}
err = wsc.Workspace.Delete(ctx,
workspace.Delete{
Path: locker.RemotePath(),
Recursive: false,
},
)
err = locker.filer.Delete(ctx, locker.RemotePath())
if err != nil {
return err
}
Expand All @@ -175,16 +182,26 @@ func (locker *Locker) Unlock(ctx context.Context, wsc *databricks.WorkspaceClien
}

func (locker *Locker) RemotePath() string {
return path.Join(locker.TargetDir, ".bundle/deploy.lock")
// Note: remote paths are scoped to `targetDir`. Also see [CreateLocker].
return ".bundle/deploy.lock"
Comment thread
pietern marked this conversation as resolved.
}

func CreateLocker(user string, targetDir string) *Locker {
return &Locker{
func CreateLocker(user string, targetDir string, w *databricks.WorkspaceClient) (*Locker, error) {
filer, err := filer.NewWorkspaceFilesClient(w, targetDir)
if err != nil {
return nil, err
}

locker := &Locker{
filer: filer,

TargetDir: targetDir,
Active: false,
State: &LockState{
ID: uuid.New(),
User: user,
},
}

return locker, nil
}
31 changes: 17 additions & 14 deletions internal/locker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math/rand"
"os"
"os/exec"
"path"
"path/filepath"
"sync"
"testing"
Expand Down Expand Up @@ -68,12 +67,16 @@ func TestAccLock(t *testing.T) {
// 50 lockers try to acquire a lock at the same time
numConcurrentLocks := 50

var err error
// Keep single locker unlocked.
// We use this to check on the current lock through GetActiveLockState.
locker, err := deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc)
require.NoError(t, err)

lockerErrs := make([]error, numConcurrentLocks)
lockers := make([]*deployer.Locker, numConcurrentLocks)

for i := 0; i < numConcurrentLocks; i++ {
lockers[i] = deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot)
lockers[i], err = deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc)
require.NoError(t, err)
}

var wg sync.WaitGroup
Expand All @@ -83,7 +86,7 @@ func TestAccLock(t *testing.T) {
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
lockerErrs[currentIndex] = lockers[currentIndex].Lock(ctx, wsc, false)
lockerErrs[currentIndex] = lockers[currentIndex].Lock(ctx, false)
}()
}
wg.Wait()
Expand All @@ -107,7 +110,7 @@ func TestAccLock(t *testing.T) {
assert.Equal(t, 1, countActive, "Exactly one locker should successfull acquire the lock")

// test remote lock matches active lock
remoteLocker, err := deployer.GetActiveLockState(ctx, wsc, lockers[indexOfActiveLocker].RemotePath())
remoteLocker, err := locker.GetActiveLockState(ctx)
require.NoError(t, err)
assert.Equal(t, remoteLocker.ID, lockers[indexOfActiveLocker].State.ID, "remote locker id does not match active locker")
assert.True(t, remoteLocker.AcquisitionTime.Equal(lockers[indexOfActiveLocker].State.AcquisitionTime), "remote locker acquisition time does not match active locker")
Expand All @@ -118,7 +121,7 @@ func TestAccLock(t *testing.T) {
continue
}
assert.NotEqual(t, remoteLocker.ID, lockers[i].State.ID)
err := lockers[i].Unlock(ctx, wsc)
err := lockers[i].Unlock(ctx)
assert.ErrorContains(t, err, "unlock called when lock is not held")
}

Expand All @@ -127,16 +130,16 @@ func TestAccLock(t *testing.T) {
if i == indexOfActiveLocker {
continue
}
err := lockers[i].PutFile(ctx, wsc, path.Join(remoteProjectRoot, "foo.json"), []byte(`'{"surname":"Khan", "name":"Shah Rukh"}`))
err := lockers[i].PutFile(ctx, "foo.json", []byte(`'{"surname":"Khan", "name":"Shah Rukh"}`))
assert.ErrorContains(t, err, "failed to put file. deploy lock not held")
}

// active locker file write succeeds
err = lockers[indexOfActiveLocker].PutFile(ctx, wsc, path.Join(remoteProjectRoot, "foo.json"), []byte(`{"surname":"Khan", "name":"Shah Rukh"}`))
err = lockers[indexOfActiveLocker].PutFile(ctx, "foo.json", []byte(`{"surname":"Khan", "name":"Shah Rukh"}`))
assert.NoError(t, err)

// active locker file read succeeds with expected results
bytes, err := lockers[indexOfActiveLocker].GetRawJsonFileContent(ctx, wsc, path.Join(remoteProjectRoot, "foo.json"))
bytes, err := lockers[indexOfActiveLocker].GetRawJsonFileContent(ctx, "foo.json")
var res map[string]string
json.Unmarshal(bytes, &res)
assert.NoError(t, err)
Expand All @@ -148,21 +151,21 @@ func TestAccLock(t *testing.T) {
if i == indexOfActiveLocker {
continue
}
_, err = lockers[i].GetRawJsonFileContent(ctx, wsc, path.Join(remoteProjectRoot, "foo.json"))
_, err = lockers[i].GetRawJsonFileContent(ctx, "foo.json")
assert.ErrorContains(t, err, "failed to get file. deploy lock not held")
}

// Unlock active lock and check it becomes inactive
err = lockers[indexOfActiveLocker].Unlock(ctx, wsc)
err = lockers[indexOfActiveLocker].Unlock(ctx)
assert.NoError(t, err)
remoteLocker, err = deployer.GetActiveLockState(ctx, wsc, lockers[indexOfActiveLocker].RemotePath())
remoteLocker, err = locker.GetActiveLockState(ctx)
assert.ErrorContains(t, err, "File not found.", "remote lock file not deleted on unlock")
assert.Nil(t, remoteLocker)
assert.False(t, lockers[indexOfActiveLocker].Active)

// A locker that failed to acquire the lock should now be able to acquire it
assert.False(t, lockers[indexOfAnInactiveLocker].Active)
err = lockers[indexOfAnInactiveLocker].Lock(ctx, wsc, false)
err = lockers[indexOfAnInactiveLocker].Lock(ctx, false)
assert.NoError(t, err)
assert.True(t, lockers[indexOfAnInactiveLocker].Active)
}
5 changes: 4 additions & 1 deletion libs/filer/workspace_files_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (
"golang.org/x/exp/slices"
)

// WorkspaceFilesClient implements the Files-in-Workspace API.
// WorkspaceFilesClient implements the files-in-workspace API.

// NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled.
// It can access any workspace path if files-in-workspace is enabled.
type WorkspaceFilesClient struct {
workspaceClient *databricks.WorkspaceClient
apiClient *client.DatabricksClient
Expand Down
Loading