From e784301290758825bc5a6cb8949aa7496684c428 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Fri, 15 Sep 2023 17:11:25 +0200 Subject: [PATCH 1/4] Enable environment overrides for job tasks --- bundle/config/resources.go | 11 +++++ bundle/config/resources/job.go | 33 +++++++++++++ bundle/config/resources/job_test.go | 47 +++++++++++++++++++ bundle/config/root.go | 5 ++ .../tests/override_job_tasks/databricks.yml | 44 +++++++++++++++++ bundle/tests/override_job_tasks_test.go | 39 +++++++++++++++ 6 files changed, 179 insertions(+) create mode 100644 bundle/tests/override_job_tasks/databricks.yml create mode 100644 bundle/tests/override_job_tasks_test.go diff --git a/bundle/config/resources.go b/bundle/config/resources.go index c239b510b94..1c8da94cc99 100644 --- a/bundle/config/resources.go +++ b/bundle/config/resources.go @@ -141,3 +141,14 @@ func (r *Resources) MergeJobClusters() error { } return nil } + +// MergeJobTasks iterates over all jobs and merges their job tasks. +// This is called after applying the target overrides. +func (r *Resources) MergeJobTasks() error { + for _, job := range r.Jobs { + if err := job.MergeJobTasks(); err != nil { + return err + } + } + return nil +} diff --git a/bundle/config/resources/job.go b/bundle/config/resources/job.go index 66705afb2f4..6269f324f6e 100644 --- a/bundle/config/resources/job.go +++ b/bundle/config/resources/job.go @@ -47,3 +47,36 @@ func (j *Job) MergeJobClusters() error { j.JobClusters = output return nil } + +// MergeJobTasks merges job tasks with the same key. +// The job tasks field is a slice, and as such, overrides are appended to it. +// We can identify a job task by its task key, however, so we can use this key +// to figure out which definitions are actually overrides and merge them. +func (j *Job) MergeJobTasks() error { + keys := make(map[string]*jobs.Task) + tasks := make([]jobs.Task, 0, len(j.Tasks)) + + // Target overrides are always appended, so we can iterate in natural order to + // first find the base definition, and merge instances we encounter later. + for i := range j.Tasks { + key := j.Tasks[i].TaskKey + + // Register job task with key if not yet seen before. + ref, ok := keys[key] + if !ok { + tasks = append(tasks, j.Tasks[i]) + keys[key] = &j.Tasks[i] + continue + } + + // Merge this instance into the reference. + err := mergo.Merge(ref, &j.Tasks[i], mergo.WithOverride, mergo.WithAppendSlice) + if err != nil { + return err + } + } + + // Overwrite resulting slice. + j.Tasks = tasks + return nil +} diff --git a/bundle/config/resources/job_test.go b/bundle/config/resources/job_test.go index 2ff3205e0d3..9ce31a5bae4 100644 --- a/bundle/config/resources/job_test.go +++ b/bundle/config/resources/job_test.go @@ -55,3 +55,50 @@ func TestJobMergeJobClusters(t *testing.T) { jc1 := j.JobClusters[1].NewCluster assert.Equal(t, "10.4.x-scala2.12", jc1.SparkVersion) } + +func TestJobMergeJobTasks(t *testing.T) { + j := &Job{ + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + TaskKey: "foo", + NewCluster: &compute.ClusterSpec{ + SparkVersion: "13.3.x-scala2.12", + NodeTypeId: "i3.xlarge", + NumWorkers: 2, + }, + }, + { + TaskKey: "bar", + NewCluster: &compute.ClusterSpec{ + SparkVersion: "10.4.x-scala2.12", + }, + }, + { + TaskKey: "foo", + NewCluster: &compute.ClusterSpec{ + NodeTypeId: "i3.2xlarge", + NumWorkers: 4, + }, + }, + }, + }, + } + + err := j.MergeJobTasks() + require.NoError(t, err) + + assert.Len(t, j.Tasks, 2) + assert.Equal(t, "foo", j.Tasks[0].TaskKey) + assert.Equal(t, "bar", j.Tasks[1].TaskKey) + + // This job task was merged with a subsequent one. + task0 := j.Tasks[0].NewCluster + assert.Equal(t, "13.3.x-scala2.12", task0.SparkVersion) + assert.Equal(t, "i3.2xlarge", task0.NodeTypeId) + assert.Equal(t, 4, task0.NumWorkers) + + // This job task was left untouched. + task1 := j.Tasks[1].NewCluster + assert.Equal(t, "10.4.x-scala2.12", task1.SparkVersion) +} diff --git a/bundle/config/root.go b/bundle/config/root.go index 465d8a62e18..b1b877d0b00 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -242,6 +242,11 @@ func (r *Root) MergeTargetOverrides(target *Target) error { if err != nil { return err } + + err = r.Resources.MergeJobTasks() + if err != nil { + return err + } } if target.Variables != nil { diff --git a/bundle/tests/override_job_tasks/databricks.yml b/bundle/tests/override_job_tasks/databricks.yml new file mode 100644 index 00000000000..fb7170b79b7 --- /dev/null +++ b/bundle/tests/override_job_tasks/databricks.yml @@ -0,0 +1,44 @@ +bundle: + name: override_job_tasks + +workspace: + host: https://acme.cloud.databricks.com/ + +resources: + jobs: + foo: + name: job + tasks: + - task_key: key1 + new_cluster: + spark_version: 13.3.x-scala2.12 + spark_python_task: + python_file: ./test1.py + - task_key: key2 + new_cluster: + spark_version: 13.3.x-scala2.12 + spark_python_task: + python_file: ./test2.py + +environments: + development: + resources: + jobs: + foo: + tasks: + - task_key: key1 + new_cluster: + node_type_id: i3.xlarge + num_workers: 1 + + staging: + resources: + jobs: + foo: + tasks: + - task_key: key2 + new_cluster: + node_type_id: i3.2xlarge + num_workers: 4 + spark_python_task: + python_file: ./test3.py diff --git a/bundle/tests/override_job_tasks_test.go b/bundle/tests/override_job_tasks_test.go new file mode 100644 index 00000000000..1ff044fe7b3 --- /dev/null +++ b/bundle/tests/override_job_tasks_test.go @@ -0,0 +1,39 @@ +package config_tests + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestOverrideJobTasksDev(t *testing.T) { + b := loadTarget(t, "./override_job_tasks", "development") + assert.Equal(t, "job", b.Config.Resources.Jobs["foo"].Name) + assert.Len(t, b.Config.Resources.Jobs["foo"].Tasks, 2) + + tasks := b.Config.Resources.Jobs["foo"].Tasks + assert.Equal(t, tasks[0].TaskKey, "key1") + assert.Equal(t, tasks[0].NewCluster.NodeTypeId, "i3.xlarge") + assert.Equal(t, tasks[0].NewCluster.NumWorkers, 1) + assert.Equal(t, tasks[0].SparkPythonTask.PythonFile, "./test1.py") + + assert.Equal(t, tasks[1].TaskKey, "key2") + assert.Equal(t, tasks[1].NewCluster.SparkVersion, "13.3.x-scala2.12") + assert.Equal(t, tasks[1].SparkPythonTask.PythonFile, "./test2.py") +} + +func TestOverrideJobTasksStaging(t *testing.T) { + b := loadTarget(t, "./override_job_tasks", "staging") + assert.Equal(t, "job", b.Config.Resources.Jobs["foo"].Name) + assert.Len(t, b.Config.Resources.Jobs["foo"].Tasks, 2) + + tasks := b.Config.Resources.Jobs["foo"].Tasks + assert.Equal(t, tasks[0].TaskKey, "key1") + assert.Equal(t, tasks[0].NewCluster.SparkVersion, "13.3.x-scala2.12") + assert.Equal(t, tasks[0].SparkPythonTask.PythonFile, "./test1.py") + + assert.Equal(t, tasks[1].TaskKey, "key2") + assert.Equal(t, tasks[1].NewCluster.NodeTypeId, "i3.2xlarge") + assert.Equal(t, tasks[1].NewCluster.NumWorkers, 4) + assert.Equal(t, tasks[1].SparkPythonTask.PythonFile, "./test3.py") +} From 703b5e7fe5d0e628b1188230811c24c8cccf8639 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Mon, 18 Sep 2023 09:40:24 +0200 Subject: [PATCH 2/4] renamed environments to targets --- bundle/tests/override_job_tasks/databricks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bundle/tests/override_job_tasks/databricks.yml b/bundle/tests/override_job_tasks/databricks.yml index fb7170b79b7..ddee2879354 100644 --- a/bundle/tests/override_job_tasks/databricks.yml +++ b/bundle/tests/override_job_tasks/databricks.yml @@ -20,7 +20,7 @@ resources: spark_python_task: python_file: ./test2.py -environments: +targets: development: resources: jobs: From 86919b95187fa9bafd07f52740efa91c4ed0f91b Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Mon, 18 Sep 2023 09:42:36 +0200 Subject: [PATCH 3/4] job tasks to tasks rename --- bundle/config/resources.go | 2 +- bundle/config/resources/job.go | 8 ++++---- bundle/config/resources/job_test.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/bundle/config/resources.go b/bundle/config/resources.go index 1c8da94cc99..6d9a5e19f68 100644 --- a/bundle/config/resources.go +++ b/bundle/config/resources.go @@ -142,7 +142,7 @@ func (r *Resources) MergeJobClusters() error { return nil } -// MergeJobTasks iterates over all jobs and merges their job tasks. +// MergeJobTasks iterates over all jobs and merges their tasks. // This is called after applying the target overrides. func (r *Resources) MergeJobTasks() error { for _, job := range r.Jobs { diff --git a/bundle/config/resources/job.go b/bundle/config/resources/job.go index 6269f324f6e..939a040b352 100644 --- a/bundle/config/resources/job.go +++ b/bundle/config/resources/job.go @@ -48,9 +48,9 @@ func (j *Job) MergeJobClusters() error { return nil } -// MergeJobTasks merges job tasks with the same key. -// The job tasks field is a slice, and as such, overrides are appended to it. -// We can identify a job task by its task key, however, so we can use this key +// MergeJobTasks merges tasks with the same key. +// The tasks field is a slice, and as such, overrides are appended to it. +// We can identify a task by its task key, however, so we can use this key // to figure out which definitions are actually overrides and merge them. func (j *Job) MergeJobTasks() error { keys := make(map[string]*jobs.Task) @@ -61,7 +61,7 @@ func (j *Job) MergeJobTasks() error { for i := range j.Tasks { key := j.Tasks[i].TaskKey - // Register job task with key if not yet seen before. + // Register the task with key if not yet seen before. ref, ok := keys[key] if !ok { tasks = append(tasks, j.Tasks[i]) diff --git a/bundle/config/resources/job_test.go b/bundle/config/resources/job_test.go index 9ce31a5bae4..b38ff310bb1 100644 --- a/bundle/config/resources/job_test.go +++ b/bundle/config/resources/job_test.go @@ -92,13 +92,13 @@ func TestJobMergeJobTasks(t *testing.T) { assert.Equal(t, "foo", j.Tasks[0].TaskKey) assert.Equal(t, "bar", j.Tasks[1].TaskKey) - // This job task was merged with a subsequent one. + // This task was merged with a subsequent one. task0 := j.Tasks[0].NewCluster assert.Equal(t, "13.3.x-scala2.12", task0.SparkVersion) assert.Equal(t, "i3.2xlarge", task0.NodeTypeId) assert.Equal(t, 4, task0.NumWorkers) - // This job task was left untouched. + // This task was left untouched. task1 := j.Tasks[1].NewCluster assert.Equal(t, "10.4.x-scala2.12", task1.SparkVersion) } From 817c3365f82bfd88835fbac82ef27e0039099f9f Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Mon, 18 Sep 2023 16:04:48 +0200 Subject: [PATCH 4/4] renamed methods --- bundle/config/resources.go | 6 +++--- bundle/config/resources/job.go | 4 ++-- bundle/config/resources/job_test.go | 4 ++-- bundle/config/root.go | 2 +- bundle/tests/override_job_tasks_test.go | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/bundle/config/resources.go b/bundle/config/resources.go index 6d9a5e19f68..48621e443f5 100644 --- a/bundle/config/resources.go +++ b/bundle/config/resources.go @@ -142,11 +142,11 @@ func (r *Resources) MergeJobClusters() error { return nil } -// MergeJobTasks iterates over all jobs and merges their tasks. +// MergeTasks iterates over all jobs and merges their tasks. // This is called after applying the target overrides. -func (r *Resources) MergeJobTasks() error { +func (r *Resources) MergeTasks() error { for _, job := range r.Jobs { - if err := job.MergeJobTasks(); err != nil { + if err := job.MergeTasks(); err != nil { return err } } diff --git a/bundle/config/resources/job.go b/bundle/config/resources/job.go index 939a040b352..7fc5b7610bf 100644 --- a/bundle/config/resources/job.go +++ b/bundle/config/resources/job.go @@ -48,11 +48,11 @@ func (j *Job) MergeJobClusters() error { return nil } -// MergeJobTasks merges tasks with the same key. +// MergeTasks merges tasks with the same key. // The tasks field is a slice, and as such, overrides are appended to it. // We can identify a task by its task key, however, so we can use this key // to figure out which definitions are actually overrides and merge them. -func (j *Job) MergeJobTasks() error { +func (j *Job) MergeTasks() error { keys := make(map[string]*jobs.Task) tasks := make([]jobs.Task, 0, len(j.Tasks)) diff --git a/bundle/config/resources/job_test.go b/bundle/config/resources/job_test.go index b38ff310bb1..818d2ac2169 100644 --- a/bundle/config/resources/job_test.go +++ b/bundle/config/resources/job_test.go @@ -56,7 +56,7 @@ func TestJobMergeJobClusters(t *testing.T) { assert.Equal(t, "10.4.x-scala2.12", jc1.SparkVersion) } -func TestJobMergeJobTasks(t *testing.T) { +func TestJobMergeTasks(t *testing.T) { j := &Job{ JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ @@ -85,7 +85,7 @@ func TestJobMergeJobTasks(t *testing.T) { }, } - err := j.MergeJobTasks() + err := j.MergeTasks() require.NoError(t, err) assert.Len(t, j.Tasks, 2) diff --git a/bundle/config/root.go b/bundle/config/root.go index b1b877d0b00..32883c746e2 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -243,7 +243,7 @@ func (r *Root) MergeTargetOverrides(target *Target) error { return err } - err = r.Resources.MergeJobTasks() + err = r.Resources.MergeTasks() if err != nil { return err } diff --git a/bundle/tests/override_job_tasks_test.go b/bundle/tests/override_job_tasks_test.go index 1ff044fe7b3..82da04da265 100644 --- a/bundle/tests/override_job_tasks_test.go +++ b/bundle/tests/override_job_tasks_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestOverrideJobTasksDev(t *testing.T) { +func TestOverrideTasksDev(t *testing.T) { b := loadTarget(t, "./override_job_tasks", "development") assert.Equal(t, "job", b.Config.Resources.Jobs["foo"].Name) assert.Len(t, b.Config.Resources.Jobs["foo"].Tasks, 2) @@ -22,7 +22,7 @@ func TestOverrideJobTasksDev(t *testing.T) { assert.Equal(t, tasks[1].SparkPythonTask.PythonFile, "./test2.py") } -func TestOverrideJobTasksStaging(t *testing.T) { +func TestOverrideTasksStaging(t *testing.T) { b := loadTarget(t, "./override_job_tasks", "staging") assert.Equal(t, "job", b.Config.Resources.Jobs["foo"].Name) assert.Len(t, b.Config.Resources.Jobs["foo"].Tasks, 2)