From 8b35bdcff1597495ea89d82baba1534af871cb63 Mon Sep 17 00:00:00 2001 From: Rolando Santamaria Maso Date: Thu, 4 Jun 2026 20:59:53 +0200 Subject: [PATCH 01/11] feat(schedule): native in-process scheduler core (phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce internal/schedule — the engine for odek's native cron capability, replacing the Docker + supercronic approach. Running in-process means the host already has resolved config (API key, model, bot token, default chat) in memory, so a scheduled task sees exactly what an interactive one does — no environment-inheritance games, no external cron daemon, no container-only behaviour. This phase is the standalone core only — no CLI or bot wiring yet. - types.go: Job / Delivery / RunState. Definitions and runtime state persist to separate files so a hand-edit never races a state write. - cronexpr.go: stdlib-only 5-field cron parser (ranges, steps, lists, names, @macros) with correct Vixie dom/dow union semantics, timezone-aware Next() via coarse unit-stepping, and a horizon that clears the leap-century gap. - store.go: atomic (temp+rename, 0600) CRUD for schedules.json and schedule-state.json, mirroring session.Store; validates jobs on write. - scheduler.go: firing engine decoupled from the agent/telegram via Runner and Deliverer interfaces. Earliest-fire timer (no per-minute polling), bounded concurrency, per-job overlap guard, missed-run skip/catchup policy, mtime hot-reload, and graceful drain on context cancellation. Tests: 39 cases, 87.9% coverage, green under -race. Parser table tests (ranges/steps/lists/names/macros/dom-dow union/leap day/timezone/errors); engine tests drive reconcile/fireDue directly with explicit clocks plus one real-clock lifecycle test — deterministic, no flaky sleeps. Co-Authored-By: Claude Opus 4.8 (1M context) --- internal/schedule/cronexpr.go | 305 +++++++++++++++++++++++ internal/schedule/cronexpr_test.go | 278 +++++++++++++++++++++ internal/schedule/scheduler.go | 336 +++++++++++++++++++++++++ internal/schedule/scheduler_test.go | 369 ++++++++++++++++++++++++++++ internal/schedule/store.go | 362 +++++++++++++++++++++++++++ internal/schedule/store_test.go | 293 ++++++++++++++++++++++ internal/schedule/types.go | 74 ++++++ 7 files changed, 2017 insertions(+) create mode 100644 internal/schedule/cronexpr.go create mode 100644 internal/schedule/cronexpr_test.go create mode 100644 internal/schedule/scheduler.go create mode 100644 internal/schedule/scheduler_test.go create mode 100644 internal/schedule/store.go create mode 100644 internal/schedule/store_test.go create mode 100644 internal/schedule/types.go diff --git a/internal/schedule/cronexpr.go b/internal/schedule/cronexpr.go new file mode 100644 index 0000000..79598d5 --- /dev/null +++ b/internal/schedule/cronexpr.go @@ -0,0 +1,305 @@ +package schedule + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +// Schedule is a parsed cron expression bound to a timezone. It answers one +// question — "given an instant, when does this next fire?" — via Next. +// +// Supported syntax is standard 5-field Vixie cron: +// +// ┌ minute 0-59 +// │ ┌ hour 0-23 +// │ │ ┌ dom 1-31 +// │ │ │ ┌ month 1-12 or JAN-DEC +// │ │ │ │ ┌ dow 0-6 or SUN-SAT (0 and 7 both mean Sunday) +// * * * * * +// +// Each field accepts: a wildcard "*", a single value, a range "a-b", a step +// "*/n" / "a-b/n" / "a/n" (from a to the field max), and comma-separated lists +// of any of those. Month and day-of-week also accept three-letter names. +// +// Macros: @yearly (@annually), @monthly, @weekly, @daily (@midnight), @hourly. +// +// Day-of-month / day-of-week coupling follows Vixie semantics: when BOTH +// fields are restricted (neither is "*"), a day matches if EITHER field +// matches (union). When at least one is a wildcard, the usual intersection +// applies. This is why "0 0 13 * 5" fires on the 13th OR any Friday, not only +// Friday-the-13th. +type Schedule struct { + minute uint64 // bitset over 0..59 + hour uint64 // bitset over 0..23 + dom uint64 // bitset over 1..31 + month uint64 // bitset over 1..12 + dow uint64 // bitset over 0..6 (Sunday=0) + + domStar bool // dom field was a wildcard ("*" or "*/n") + dowStar bool // dow field was a wildcard + + loc *time.Location + expr string // original expression, for String() +} + +// matchHorizon bounds Next's search. The rarest legitimate expression is +// Feb 29, whose gap can reach 8 years across a non-leap century boundary +// (e.g. 2096 → 2104, since 2100 is not a leap year); 9 years clears it with +// margin. Stepping past the horizon means the expression matches nothing +// reachable (e.g. "0 0 30 2 *", Feb 30) and Next returns the zero time. Even +// then the coarse unit-stepping gives up in a few thousand iterations. +const matchHorizon = 9 * 366 * 24 * time.Hour + +var monthNames = map[string]int{ + "jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6, + "jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12, +} + +var dowNames = map[string]int{ + "sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6, +} + +// Parse compiles a cron expression in UTC. +func Parse(expr string) (*Schedule, error) { + return ParseInLocation(expr, time.UTC) +} + +// ParseInLocation compiles a cron expression bound to loc. A nil loc defaults +// to UTC. The location only affects Next/Matches, not parsing. +func ParseInLocation(expr string, loc *time.Location) (*Schedule, error) { + if loc == nil { + loc = time.UTC + } + raw := strings.TrimSpace(expr) + if raw == "" { + return nil, fmt.Errorf("cron: empty expression") + } + + if strings.HasPrefix(raw, "@") { + expanded, err := expandMacro(raw) + if err != nil { + return nil, err + } + raw = expanded + } + + fields := strings.Fields(raw) + if len(fields) != 5 { + return nil, fmt.Errorf("cron: expected 5 fields, got %d in %q", len(fields), expr) + } + + s := &Schedule{loc: loc, expr: strings.TrimSpace(expr)} + var err error + + if s.minute, _, err = parseField(fields[0], 0, 59, nil); err != nil { + return nil, fmt.Errorf("cron: minute: %w", err) + } + if s.hour, _, err = parseField(fields[1], 0, 23, nil); err != nil { + return nil, fmt.Errorf("cron: hour: %w", err) + } + if s.dom, s.domStar, err = parseField(fields[2], 1, 31, nil); err != nil { + return nil, fmt.Errorf("cron: day-of-month: %w", err) + } + if s.month, _, err = parseField(fields[3], 1, 12, monthNames); err != nil { + return nil, fmt.Errorf("cron: month: %w", err) + } + // Day-of-week accepts 0..7; 7 is a second spelling of Sunday (0). + dowMask, dowStar, err := parseField(fields[4], 0, 7, dowNames) + if err != nil { + return nil, fmt.Errorf("cron: day-of-week: %w", err) + } + if dowMask&(1<<7) != 0 { + dowMask |= 1 << 0 // fold 7 → 0 + dowMask &^= 1 << 7 + } + s.dow, s.dowStar = dowMask, dowStar + + return s, nil +} + +// expandMacro rewrites a @macro into its 5-field equivalent. +func expandMacro(m string) (string, error) { + switch strings.ToLower(m) { + case "@yearly", "@annually": + return "0 0 1 1 *", nil + case "@monthly": + return "0 0 1 * *", nil + case "@weekly": + return "0 0 * * 0", nil + case "@daily", "@midnight": + return "0 0 * * *", nil + case "@hourly": + return "0 * * * *", nil + case "@reboot": + // @reboot has no meaning for a persistent scheduler — the catchup + // flag covers "run if a fire was missed while down". Reject it + // explicitly rather than silently never firing. + return "", fmt.Errorf("cron: @reboot is not supported (use the job's catchup option)") + default: + return "", fmt.Errorf("cron: unknown macro %q", m) + } +} + +// parseField parses one cron field into a bitset over [min,max]. star reports +// whether the field began with "*" (a wildcard), which the caller needs for +// the dom/dow union rule. names, if non-nil, maps lowercased symbolic names +// (e.g. "mon") to values. +func parseField(field string, min, max int, names map[string]int) (mask uint64, star bool, err error) { + if field == "" { + return 0, false, fmt.Errorf("empty field") + } + star = strings.HasPrefix(field, "*") + for item := range strings.SplitSeq(field, ",") { + m, err := parseItem(item, min, max, names) + if err != nil { + return 0, false, err + } + mask |= m + } + return mask, star, nil +} + +// parseItem parses a single comma-separated element: "*", "*/n", "a", "a-b", +// "a-b/n", or "a/n". +func parseItem(item string, min, max int, names map[string]int) (uint64, error) { + rng := item + step := 1 + if before, stepStr, found := strings.Cut(item, "/"); found { + rng = before + n, err := strconv.Atoi(stepStr) + if err != nil || n <= 0 { + return 0, fmt.Errorf("invalid step %q in %q", stepStr, item) + } + step = n + } + + var lo, hi int + switch { + case rng == "*": + lo, hi = min, max + case strings.ContainsRune(rng, '-'): + parts := strings.SplitN(rng, "-", 2) + var err error + if lo, err = parseValue(parts[0], names); err != nil { + return 0, err + } + if hi, err = parseValue(parts[1], names); err != nil { + return 0, err + } + default: + v, err := parseValue(rng, names) + if err != nil { + return 0, err + } + lo = v + // "a/n" means "from a to the maximum, stepping n"; a bare "a" is just a. + if step > 1 { + hi = max + } else { + hi = v + } + } + + if lo < min || hi > max || lo > hi { + return 0, fmt.Errorf("value out of range [%d,%d] in %q", min, max, item) + } + + var mask uint64 + for v := lo; v <= hi; v += step { + mask |= 1 << uint(v) + } + return mask, nil +} + +// parseValue resolves a single token to an int, accepting symbolic names when +// names is non-nil. +func parseValue(tok string, names map[string]int) (int, error) { + tok = strings.TrimSpace(tok) + if tok == "" { + return 0, fmt.Errorf("empty value") + } + if names != nil { + if v, ok := names[strings.ToLower(tok)]; ok { + return v, nil + } + } + v, err := strconv.Atoi(tok) + if err != nil { + return 0, fmt.Errorf("invalid value %q", tok) + } + return v, nil +} + +// Matches reports whether t (in the schedule's location, to the minute) is a +// firing time. +func (s *Schedule) Matches(t time.Time) bool { + t = t.In(s.loc) + if s.minute&(1< maxSleep { + return maxSleep + } + return d +} + +// compile parses a job's cron expression in its timezone (or the supplied +// default when the job specifies none). +func compile(job Job, defaultTZ *time.Location) (*Schedule, error) { + loc := defaultTZ + if job.Timezone != "" { + l, err := time.LoadLocation(job.Timezone) + if err != nil { + return nil, err + } + loc = l + } + return ParseInLocation(job.Cron, loc) +} + +// preview truncates a result for storage in RunState.LastResult. +func preview(s string) string { + r := []rune(s) + if len(r) <= resultPreviewRunes { + return s + } + return string(r[:resultPreviewRunes]) + "…" +} diff --git a/internal/schedule/scheduler_test.go b/internal/schedule/scheduler_test.go new file mode 100644 index 0000000..8494e1a --- /dev/null +++ b/internal/schedule/scheduler_test.go @@ -0,0 +1,369 @@ +package schedule + +import ( + "context" + "strings" + "sync" + "testing" + "time" +) + +// ── Test doubles ──────────────────────────────────────────────────────── + +type fakeRunner struct { + mu sync.Mutex + calls []Job + result string + err error + block chan struct{} // if non-nil, Run blocks on it (simulate a slow job) + started chan string // if non-nil, receives job ID when Run begins +} + +func (f *fakeRunner) Run(_ context.Context, job Job) (string, int64, error) { + f.mu.Lock() + f.calls = append(f.calls, job) + f.mu.Unlock() + if f.started != nil { + f.started <- job.ID + } + if f.block != nil { + <-f.block + } + return f.result, 7, f.err +} + +func (f *fakeRunner) callCount() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.calls) +} + +type fakeDeliverer struct { + mu sync.Mutex + delivered []string + err error +} + +func (f *fakeDeliverer) Deliver(_ Job, result string) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.err != nil { + return f.err + } + f.delivered = append(f.delivered, result) + return nil +} + +// peekNext exposes a job's scheduled next-fire for assertions. +func (s *Scheduler) peekNext(id string) time.Time { + s.mu.Lock() + defer s.mu.Unlock() + return s.next[id] +} + +// addJob is a helper that adds a job and returns its stored copy. +func addJob(t *testing.T, st *Store, j Job) Job { + t.Helper() + got, err := st.Add(j) + if err != nil { + t.Fatalf("Add: %v", err) + } + return got +} + +// ── fireDue ───────────────────────────────────────────────────────────── + +func TestFireDue_RunsAndDelivers(t *testing.T) { + st := newTestStore(t) + job := addJob(t, st, Job{Name: "j", Cron: "* * * * *", Task: "do it", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + + runner := &fakeRunner{result: "hello"} + deliv := &fakeDeliverer{} + s := New(st, runner, deliv, Options{}) + + t0 := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + s.reconcile(t0) + + // Not due yet (next is strictly after t0). + s.fireDue(context.Background(), t0) + s.Wait() + if runner.callCount() != 0 { + t.Fatalf("fired before due: %d calls", runner.callCount()) + } + + // Fire at the scheduled instant. + due := s.peekNext(job.ID) + s.fireDue(context.Background(), due) + s.Wait() + + if runner.callCount() != 1 { + t.Fatalf("expected 1 run, got %d", runner.callCount()) + } + if len(deliv.delivered) != 1 || deliv.delivered[0] != "hello" { + t.Fatalf("delivered = %v", deliv.delivered) + } + state, _ := st.LoadState() + if state[job.ID].LastStatus != StatusOK { + t.Errorf("status = %q, want ok", state[job.ID].LastStatus) + } + if state[job.ID].Runs != 1 { + t.Errorf("runs = %d, want 1", state[job.ID].Runs) + } + if state[job.ID].LastResult != "hello" { + t.Errorf("LastResult = %q", state[job.ID].LastResult) + } + // next must have advanced strictly past the fire instant. + if !s.peekNext(job.ID).After(due) { + t.Errorf("next did not advance after fire") + } +} + +func TestFireDue_RunnerError(t *testing.T) { + st := newTestStore(t) + job := addJob(t, st, Job{Name: "j", Cron: "* * * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + runner := &fakeRunner{err: context.DeadlineExceeded} + deliv := &fakeDeliverer{} + s := New(st, runner, deliv, Options{}) + + t0 := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + s.reconcile(t0) + s.fireDue(context.Background(), s.peekNext(job.ID)) + s.Wait() + + if len(deliv.delivered) != 0 { + t.Error("delivery should not happen when runner errored") + } + state, _ := st.LoadState() + if state[job.ID].LastStatus != StatusError || state[job.ID].LastError == "" { + t.Errorf("expected error state, got %+v", state[job.ID]) + } +} + +func TestFireDue_DeliveryError(t *testing.T) { + st := newTestStore(t) + job := addJob(t, st, Job{Name: "j", Cron: "* * * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + runner := &fakeRunner{result: "ok"} + deliv := &fakeDeliverer{err: context.Canceled} + s := New(st, runner, deliv, Options{}) + + t0 := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + s.reconcile(t0) + s.fireDue(context.Background(), s.peekNext(job.ID)) + s.Wait() + + state, _ := st.LoadState() + if state[job.ID].LastStatus != StatusError { + t.Errorf("status = %q, want error", state[job.ID].LastStatus) + } + if !strings.HasPrefix(state[job.ID].LastError, "delivery:") { + t.Errorf("LastError = %q, want delivery: prefix", state[job.ID].LastError) + } +} + +func TestFireDue_OverlapGuard(t *testing.T) { + st := newTestStore(t) + job := addJob(t, st, Job{Name: "slow", Cron: "* * * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + runner := &fakeRunner{result: "ok", block: make(chan struct{}), started: make(chan string, 1)} + s := New(st, runner, &fakeDeliverer{}, Options{}) + + t0 := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + s.reconcile(t0) + + // First fire starts and blocks inside the runner. + due1 := s.peekNext(job.ID) + s.fireDue(context.Background(), due1) + <-runner.started // ensure it's in flight + + // A second due fire while the first is still running must be skipped. + due2 := s.peekNext(job.ID) + s.fireDue(context.Background(), due2) + if c := runner.callCount(); c != 1 { + t.Fatalf("overlap guard failed: %d concurrent runs", c) + } + + close(runner.block) // let the first finish + s.Wait() + if c := runner.callCount(); c != 1 { + t.Errorf("expected exactly 1 run total, got %d", c) + } +} + +// ── Missed-run policy ─────────────────────────────────────────────────── + +func TestReconcile_MissedSkip(t *testing.T) { + st := newTestStore(t) + job := addJob(t, st, Job{Name: "j", Cron: "0 9 * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true, Catchup: false}) + // Pretend a fire was due in the past while we were down. + past := time.Date(2026, 6, 3, 9, 0, 0, 0, time.UTC) + _ = st.SaveState(RunState{JobID: job.ID, NextRun: past}) + + runner := &fakeRunner{} + s := New(st, runner, &fakeDeliverer{}, Options{}) + now := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + s.reconcile(now) + + // Next must be in the future (forward-scheduled), not the missed instant. + if !s.peekNext(job.ID).After(now) { + t.Errorf("missed fire not skipped forward: next=%v", s.peekNext(job.ID)) + } + s.fireDue(context.Background(), now) + s.Wait() + if runner.callCount() != 0 { + t.Error("missed fire should not run when catchup is off") + } + state, _ := st.LoadState() + if state[job.ID].LastStatus != StatusSkipped { + t.Errorf("status = %q, want skipped", state[job.ID].LastStatus) + } +} + +func TestReconcile_MissedCatchup(t *testing.T) { + st := newTestStore(t) + job := addJob(t, st, Job{Name: "j", Cron: "0 9 * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true, Catchup: true}) + past := time.Date(2026, 6, 3, 9, 0, 0, 0, time.UTC) + _ = st.SaveState(RunState{JobID: job.ID, NextRun: past}) + + runner := &fakeRunner{result: "caught up"} + deliv := &fakeDeliverer{} + s := New(st, runner, deliv, Options{}) + now := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + s.reconcile(now) + + // Catchup schedules an immediate fire. + if s.peekNext(job.ID).After(now) { + t.Fatalf("catchup did not schedule an immediate fire: next=%v", s.peekNext(job.ID)) + } + s.fireDue(context.Background(), now) + s.Wait() + if runner.callCount() != 1 { + t.Errorf("catchup did not run the missed job: %d calls", runner.callCount()) + } +} + +// ── Reconcile lifecycle ───────────────────────────────────────────────── + +func TestReconcile_DropsDisabled(t *testing.T) { + st := newTestStore(t) + job := addJob(t, st, Job{Name: "j", Cron: "* * * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + s := New(st, &fakeRunner{}, &fakeDeliverer{}, Options{}) + now := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + s.reconcile(now) + if s.peekNext(job.ID).IsZero() { + t.Fatal("job not tracked after reconcile") + } + if err := st.SetEnabled(job.ID, false); err != nil { + t.Fatalf("SetEnabled: %v", err) + } + s.reconcile(now) + if !s.peekNext(job.ID).IsZero() { + t.Error("disabled job not dropped from schedule") + } +} + +func TestReconcile_UnchangedKeepsNextFire(t *testing.T) { + st := newTestStore(t) + a := addJob(t, st, Job{Name: "a", Cron: "0 9 * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + s := New(st, &fakeRunner{}, &fakeDeliverer{}, Options{}) + now := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC) + s.reconcile(now) + firstNext := s.peekNext(a.ID) + + // Adding an unrelated job and reconciling again must not shift a's fire. + addJob(t, st, Job{Name: "b", Cron: "0 10 * * *", Task: "y", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + s.reconcile(now.Add(time.Minute)) + if !s.peekNext(a.ID).Equal(firstNext) { + t.Errorf("unrelated reconcile shifted job a: %v != %v", s.peekNext(a.ID), firstNext) + } +} + +func TestReconcile_SkipsInvalidJobWrittenDirectly(t *testing.T) { + // A malformed job that bypassed Validate (e.g. hand-edited file) must be + // skipped without aborting the reconcile of healthy jobs. + st := newTestStore(t) + good := addJob(t, st, Job{Name: "good", Cron: "* * * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + // Write a bad job directly into the doc, sidestepping Add's validation. + doc, _ := st.loadDoc() + doc.Jobs = append(doc.Jobs, Job{ID: "jb-bad", Name: "bad", Cron: "not-a-cron", + Task: "x", Deliver: Delivery{Kind: DeliverStdout}, Enabled: true}) + _ = st.saveDoc(doc) + + s := New(st, &fakeRunner{}, &fakeDeliverer{}, Options{}) + s.reconcile(time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC)) + if s.peekNext(good.ID).IsZero() { + t.Error("good job dropped because a sibling was invalid") + } + if !s.peekNext("jb-bad").IsZero() { + t.Error("invalid job should not be scheduled") + } +} + +func TestReconcile_JobTimezone(t *testing.T) { + berlin, err := time.LoadLocation("Europe/Berlin") + if err != nil { + t.Skipf("tz data unavailable: %v", err) + } + st := newTestStore(t) + // Daily 09:00 Berlin; the engine's DefaultTZ is UTC, so the job's own + // Timezone must win when compiling. + job := addJob(t, st, Job{Name: "tz", Cron: "0 9 * * *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true, Timezone: "Europe/Berlin"}) + s := New(st, &fakeRunner{}, &fakeDeliverer{}, Options{DefaultTZ: time.UTC}) + + // 06:00 UTC = 08:00 Berlin (CEST) → next fire is 09:00 Berlin today. + now := time.Date(2026, 6, 4, 6, 0, 0, 0, time.UTC) + s.reconcile(now) + got := s.peekNext(job.ID) + want := time.Date(2026, 6, 4, 9, 0, 0, 0, berlin) + if !got.Equal(want) { + t.Errorf("next = %v, want %v (job timezone must override DefaultTZ)", got.In(berlin), want) + } +} + +// ── Run lifecycle ─────────────────────────────────────────────────────── + +func TestRun_FiresThenStopsCleanly(t *testing.T) { + st := newTestStore(t) + // Far-future cron, but a missed catchup fire forces an immediate run on + // startup — so we exercise Run's real loop without waiting a wall minute. + job := addJob(t, st, Job{Name: "j", Cron: "0 0 1 1 *", Task: "x", + Deliver: Delivery{Kind: DeliverStdout}, Enabled: true, Catchup: true}) + _ = st.SaveState(RunState{JobID: job.ID, NextRun: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)}) + + runner := &fakeRunner{result: "ok", started: make(chan string, 1)} + s := New(st, runner, &fakeDeliverer{}, Options{ReloadEvery: 20 * time.Millisecond}) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- s.Run(ctx) }() + + select { + case <-runner.started: + // fired as expected + case <-time.After(2 * time.Second): + cancel() + t.Fatal("scheduler did not fire the catchup job") + } + + cancel() + select { + case err := <-done: + if err != context.Canceled { + t.Errorf("Run returned %v, want context.Canceled", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Run did not return after cancel") + } + if runner.callCount() != 1 { + t.Errorf("expected 1 run, got %d", runner.callCount()) + } +} diff --git a/internal/schedule/store.go b/internal/schedule/store.go new file mode 100644 index 0000000..9f792c4 --- /dev/null +++ b/internal/schedule/store.go @@ -0,0 +1,362 @@ +package schedule + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sort" + "sync" + "time" +) + +// File names under ~/.odek. +const ( + schedulesFile = "schedules.json" // job definitions + stateFile = "schedule-state.json" // runtime state, keyed by job ID +) + +// scheduleDoc is the on-disk shape of schedules.json. Wrapping the slice in a +// versioned object leaves room to evolve the format without a breaking change. +type scheduleDoc struct { + Version int `json:"version"` + Jobs []Job `json:"jobs"` +} + +// stateDoc is the on-disk shape of schedule-state.json. +type stateDoc struct { + Version int `json:"version"` + States map[string]RunState `json:"states"` +} + +// Store persists schedule definitions and runtime state as two JSON files +// under a directory (normally ~/.odek). It is a thin, mutex-guarded file +// manager in the same spirit as session.Store: all Job fields are public, so +// callers read a Job, mutate it, and write it back. +type Store struct { + dir string + mu sync.Mutex +} + +// NewStore opens the schedule store rooted at ~/.odek, creating the directory +// if needed. +func NewStore() (*Store, error) { + home, err := os.UserHomeDir() + if err != nil { + return nil, fmt.Errorf("schedule: home dir: %w", err) + } + return NewStoreAt(filepath.Join(home, ".odek")) +} + +// NewStoreAt opens the schedule store rooted at dir. Used by tests and by +// callers that resolve ~/.odek themselves. +func NewStoreAt(dir string) (*Store, error) { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("schedule: create dir: %w", err) + } + return &Store{dir: dir}, nil +} + +// ── Validation ────────────────────────────────────────────────────────── + +// Validate checks that a job is well-formed enough to persist and run: a +// parseable cron expression, a known delivery kind, a non-empty task, and a +// loadable timezone if one is set. It does not assign IDs or defaults. +func (j Job) Validate() error { + if j.Task == "" { + return fmt.Errorf("schedule: job %q has an empty task", j.Name) + } + loc := time.UTC + if j.Timezone != "" { + l, err := time.LoadLocation(j.Timezone) + if err != nil { + return fmt.Errorf("schedule: job %q: invalid timezone %q: %w", j.Name, j.Timezone, err) + } + loc = l + } + if _, err := ParseInLocation(j.Cron, loc); err != nil { + return fmt.Errorf("schedule: job %q: %w", j.Name, err) + } + switch j.Deliver.Kind { + case DeliverTelegram, DeliverStdout, DeliverLog: + case "": + return fmt.Errorf("schedule: job %q has no delivery kind", j.Name) + default: + return fmt.Errorf("schedule: job %q has unknown delivery kind %q", j.Name, j.Deliver.Kind) + } + return nil +} + +// ── Job CRUD ──────────────────────────────────────────────────────────── + +// Add validates and appends a job. If job.ID is empty a stable ID is +// generated; if job.CreatedAt is zero it is stamped with now. The stored job +// (with ID/CreatedAt filled in) is returned. +func (s *Store) Add(job Job) (Job, error) { + if err := job.Validate(); err != nil { + return Job{}, err + } + s.mu.Lock() + defer s.mu.Unlock() + + doc, err := s.loadDoc() + if err != nil { + return Job{}, err + } + if job.ID == "" { + job.ID = newJobID() + } + for _, existing := range doc.Jobs { + if existing.ID == job.ID { + return Job{}, fmt.Errorf("schedule: job ID %q already exists", job.ID) + } + } + if job.CreatedAt.IsZero() { + job.CreatedAt = time.Now().UTC() + } + doc.Jobs = append(doc.Jobs, job) + if err := s.saveDoc(doc); err != nil { + return Job{}, err + } + return job, nil +} + +// List returns all jobs, sorted by creation time then ID for stable output. +func (s *Store) List() ([]Job, error) { + s.mu.Lock() + defer s.mu.Unlock() + doc, err := s.loadDoc() + if err != nil { + return nil, err + } + sort.Slice(doc.Jobs, func(i, j int) bool { + if doc.Jobs[i].CreatedAt.Equal(doc.Jobs[j].CreatedAt) { + return doc.Jobs[i].ID < doc.Jobs[j].ID + } + return doc.Jobs[i].CreatedAt.Before(doc.Jobs[j].CreatedAt) + }) + return doc.Jobs, nil +} + +// Get returns the job with the given ID. The bool reports whether it was found. +func (s *Store) Get(id string) (Job, bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + doc, err := s.loadDoc() + if err != nil { + return Job{}, false, err + } + for _, j := range doc.Jobs { + if j.ID == id { + return j, true, nil + } + } + return Job{}, false, nil +} + +// Put upserts a job by ID: it replaces an existing job with the same ID, or +// appends it if absent. The job is validated first. +func (s *Store) Put(job Job) error { + if err := job.Validate(); err != nil { + return err + } + if job.ID == "" { + return fmt.Errorf("schedule: Put requires a job ID") + } + s.mu.Lock() + defer s.mu.Unlock() + doc, err := s.loadDoc() + if err != nil { + return err + } + for i := range doc.Jobs { + if doc.Jobs[i].ID == job.ID { + if job.CreatedAt.IsZero() { + job.CreatedAt = doc.Jobs[i].CreatedAt + } + doc.Jobs[i] = job + return s.saveDoc(doc) + } + } + if job.CreatedAt.IsZero() { + job.CreatedAt = time.Now().UTC() + } + doc.Jobs = append(doc.Jobs, job) + return s.saveDoc(doc) +} + +// Remove deletes a job (and its runtime state) by ID. Removing a job that +// does not exist returns an error so the CLI can report it. +func (s *Store) Remove(id string) error { + s.mu.Lock() + defer s.mu.Unlock() + doc, err := s.loadDoc() + if err != nil { + return err + } + idx := -1 + for i := range doc.Jobs { + if doc.Jobs[i].ID == id { + idx = i + break + } + } + if idx < 0 { + return fmt.Errorf("schedule: no job with ID %q", id) + } + doc.Jobs = append(doc.Jobs[:idx], doc.Jobs[idx+1:]...) + if err := s.saveDoc(doc); err != nil { + return err + } + // Best-effort cleanup of orphaned runtime state. + sd, err := s.loadState() + if err == nil { + if _, ok := sd.States[id]; ok { + delete(sd.States, id) + _ = s.saveState(sd) + } + } + return nil +} + +// SetEnabled flips a job's Enabled flag. +func (s *Store) SetEnabled(id string, enabled bool) error { + s.mu.Lock() + defer s.mu.Unlock() + doc, err := s.loadDoc() + if err != nil { + return err + } + for i := range doc.Jobs { + if doc.Jobs[i].ID == id { + doc.Jobs[i].Enabled = enabled + return s.saveDoc(doc) + } + } + return fmt.Errorf("schedule: no job with ID %q", id) +} + +// ModTime returns the last-modified time of the schedules file, or the zero +// time if it does not exist yet. The engine polls this for cheap hot-reload +// detection without parsing the file. +func (s *Store) ModTime() time.Time { + info, err := os.Stat(filepath.Join(s.dir, schedulesFile)) + if err != nil { + return time.Time{} + } + return info.ModTime() +} + +// ── Runtime state ─────────────────────────────────────────────────────── + +// LoadState returns runtime state for all jobs, keyed by job ID. A missing +// state file yields an empty (non-nil) map. +func (s *Store) LoadState() (map[string]RunState, error) { + s.mu.Lock() + defer s.mu.Unlock() + sd, err := s.loadState() + if err != nil { + return nil, err + } + return sd.States, nil +} + +// SaveState writes (or replaces) the runtime state for a single job. Other +// jobs' state is preserved. +func (s *Store) SaveState(st RunState) error { + if st.JobID == "" { + return fmt.Errorf("schedule: SaveState requires a JobID") + } + s.mu.Lock() + defer s.mu.Unlock() + sd, err := s.loadState() + if err != nil { + return err + } + sd.States[st.JobID] = st + return s.saveState(sd) +} + +// ── Internal IO (callers hold s.mu) ───────────────────────────────────── + +func (s *Store) loadDoc() (*scheduleDoc, error) { + doc := &scheduleDoc{Version: 1} + if err := readJSON(filepath.Join(s.dir, schedulesFile), doc); err != nil { + return nil, err + } + return doc, nil +} + +func (s *Store) saveDoc(doc *scheduleDoc) error { + if doc.Version == 0 { + doc.Version = 1 + } + return writeJSONAtomic(filepath.Join(s.dir, schedulesFile), doc) +} + +func (s *Store) loadState() (*stateDoc, error) { + sd := &stateDoc{Version: 1, States: map[string]RunState{}} + if err := readJSON(filepath.Join(s.dir, stateFile), sd); err != nil { + return nil, err + } + if sd.States == nil { + sd.States = map[string]RunState{} + } + return sd, nil +} + +func (s *Store) saveState(sd *stateDoc) error { + if sd.Version == 0 { + sd.Version = 1 + } + return writeJSONAtomic(filepath.Join(s.dir, stateFile), sd) +} + +// readJSON decodes path into v. A missing file is not an error — v is left at +// its zero/default value so callers start from an empty document. +func readJSON(path string, v any) error { + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("schedule: read %s: %w", filepath.Base(path), err) + } + if len(data) == 0 { + return nil + } + if err := json.Unmarshal(data, v); err != nil { + return fmt.Errorf("schedule: parse %s: %w", filepath.Base(path), err) + } + return nil +} + +// writeJSONAtomic marshals v and writes it to path via a temp file + rename, +// so a reader never observes a half-written file and a swapped-in symlink is +// replaced rather than followed. Files are 0600 since tasks may reference +// secrets. +func writeJSONAtomic(path string, v any) error { + data, err := json.MarshalIndent(v, "", " ") + if err != nil { + return fmt.Errorf("schedule: marshal %s: %w", filepath.Base(path), err) + } + tmp := path + ".tmp" + if err := os.WriteFile(tmp, data, 0600); err != nil { + os.Remove(tmp) + return fmt.Errorf("schedule: write %s: %w", filepath.Base(path), err) + } + if err := os.Rename(tmp, path); err != nil { + os.Remove(tmp) + return fmt.Errorf("schedule: rename %s: %w", filepath.Base(path), err) + } + return nil +} + +// newJobID returns a stable, collision-resistant job ID like "jb-1a2b3c4d". +func newJobID() string { + buf := make([]byte, 4) + rand.Read(buf) //nolint:errcheck // crypto/rand.Read never returns an error on supported platforms + return "jb-" + hex.EncodeToString(buf) +} diff --git a/internal/schedule/store_test.go b/internal/schedule/store_test.go new file mode 100644 index 0000000..31c60fc --- /dev/null +++ b/internal/schedule/store_test.go @@ -0,0 +1,293 @@ +package schedule + +import ( + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +func newTestStore(t *testing.T) *Store { + t.Helper() + st, err := NewStoreAt(t.TempDir()) + if err != nil { + t.Fatalf("NewStoreAt: %v", err) + } + return st +} + +func sampleJob() Job { + return Job{ + Name: "standup", + Cron: "0 9 * * 1-5", + Task: "Remind me about standup", + Deliver: Delivery{Kind: DeliverStdout}, + Enabled: true, + } +} + +// ── Validation ────────────────────────────────────────────────────────── + +func TestValidate(t *testing.T) { + tests := []struct { + name string + mutate func(*Job) + wantErr bool + }{ + {"valid", func(*Job) {}, false}, + {"empty task", func(j *Job) { j.Task = "" }, true}, + {"bad cron", func(j *Job) { j.Cron = "nope" }, true}, + {"bad timezone", func(j *Job) { j.Timezone = "Mars/Phobos" }, true}, + {"good timezone", func(j *Job) { j.Timezone = "Europe/Berlin" }, false}, + {"empty deliver kind", func(j *Job) { j.Deliver.Kind = "" }, true}, + {"unknown deliver kind", func(j *Job) { j.Deliver.Kind = "carrier-pigeon" }, true}, + {"telegram deliver", func(j *Job) { j.Deliver = Delivery{Kind: DeliverTelegram, ChatID: 42} }, false}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + j := sampleJob() + tc.mutate(&j) + err := j.Validate() + if (err != nil) != tc.wantErr { + t.Errorf("Validate() err = %v, wantErr %v", err, tc.wantErr) + } + }) + } +} + +// ── CRUD ──────────────────────────────────────────────────────────────── + +func TestAdd_AssignsIDAndCreatedAt(t *testing.T) { + st := newTestStore(t) + got, err := st.Add(sampleJob()) + if err != nil { + t.Fatalf("Add: %v", err) + } + if got.ID == "" { + t.Error("Add did not assign an ID") + } + if got.CreatedAt.IsZero() { + t.Error("Add did not stamp CreatedAt") + } +} + +func TestAdd_RejectsInvalid(t *testing.T) { + st := newTestStore(t) + bad := sampleJob() + bad.Cron = "garbage" + if _, err := st.Add(bad); err == nil { + t.Error("Add accepted an invalid job") + } + // Nothing should have been persisted. + jobs, _ := st.List() + if len(jobs) != 0 { + t.Errorf("expected 0 jobs after rejected Add, got %d", len(jobs)) + } +} + +func TestAdd_DuplicateID(t *testing.T) { + st := newTestStore(t) + j := sampleJob() + j.ID = "jb-fixed" + if _, err := st.Add(j); err != nil { + t.Fatalf("first Add: %v", err) + } + if _, err := st.Add(j); err == nil { + t.Error("Add accepted a duplicate ID") + } +} + +func TestGetAndList(t *testing.T) { + st := newTestStore(t) + a, _ := st.Add(sampleJob()) + b := sampleJob() + b.Name = "second" + bAdded, _ := st.Add(b) + + got, ok, err := st.Get(a.ID) + if err != nil || !ok { + t.Fatalf("Get(%s) ok=%v err=%v", a.ID, ok, err) + } + if got.Name != "standup" { + t.Errorf("Get returned wrong job: %+v", got) + } + + _, ok, _ = st.Get("jb-missing") + if ok { + t.Error("Get returned ok for a missing ID") + } + + jobs, _ := st.List() + if len(jobs) != 2 { + t.Fatalf("List len = %d, want 2", len(jobs)) + } + _ = bAdded +} + +func TestPut_UpsertAndPreservesCreatedAt(t *testing.T) { + st := newTestStore(t) + a, _ := st.Add(sampleJob()) + created := a.CreatedAt + + a.Task = "updated task" + a.CreatedAt = time.Time{} // simulate a caller that didn't carry it + if err := st.Put(a); err != nil { + t.Fatalf("Put: %v", err) + } + got, _, _ := st.Get(a.ID) + if got.Task != "updated task" { + t.Errorf("Put did not update task: %q", got.Task) + } + if !got.CreatedAt.Equal(created) { + t.Errorf("Put did not preserve CreatedAt: got %v want %v", got.CreatedAt, created) + } + + // Put with a fresh ID inserts. + n := sampleJob() + n.ID = "jb-new" + if err := st.Put(n); err != nil { + t.Fatalf("Put insert: %v", err) + } + if jobs, _ := st.List(); len(jobs) != 2 { + t.Errorf("expected 2 jobs after Put-insert, got %d", len(jobs)) + } +} + +func TestRemove(t *testing.T) { + st := newTestStore(t) + a, _ := st.Add(sampleJob()) + // Seed runtime state so we can confirm it is cleaned up. + if err := st.SaveState(RunState{JobID: a.ID, LastStatus: StatusOK}); err != nil { + t.Fatalf("SaveState: %v", err) + } + if err := st.Remove(a.ID); err != nil { + t.Fatalf("Remove: %v", err) + } + if jobs, _ := st.List(); len(jobs) != 0 { + t.Errorf("expected 0 jobs after Remove, got %d", len(jobs)) + } + states, _ := st.LoadState() + if _, ok := states[a.ID]; ok { + t.Error("Remove did not clean up runtime state") + } + if err := st.Remove("jb-missing"); err == nil { + t.Error("Remove of missing ID should error") + } +} + +func TestSetEnabled(t *testing.T) { + st := newTestStore(t) + a, _ := st.Add(sampleJob()) + if err := st.SetEnabled(a.ID, false); err != nil { + t.Fatalf("SetEnabled: %v", err) + } + got, _, _ := st.Get(a.ID) + if got.Enabled { + t.Error("SetEnabled(false) did not disable the job") + } + if err := st.SetEnabled("jb-missing", true); err == nil { + t.Error("SetEnabled on missing ID should error") + } +} + +// ── Persistence / round-trip ──────────────────────────────────────────── + +func TestPersistenceRoundTrip(t *testing.T) { + dir := t.TempDir() + st1, _ := NewStoreAt(dir) + a, _ := st1.Add(sampleJob()) + + // Re-open from the same dir — data must survive. + st2, _ := NewStoreAt(dir) + got, ok, err := st2.Get(a.ID) + if err != nil || !ok { + t.Fatalf("reopened Get ok=%v err=%v", ok, err) + } + if got.Cron != "0 9 * * 1-5" || got.Name != "standup" { + t.Errorf("round-trip mismatch: %+v", got) + } +} + +func TestState_RoundTripAndIsolation(t *testing.T) { + st := newTestStore(t) + now := time.Now().UTC().Truncate(time.Second) + if err := st.SaveState(RunState{JobID: "jb-1", LastStatus: StatusOK, LastRun: now}); err != nil { + t.Fatalf("SaveState 1: %v", err) + } + if err := st.SaveState(RunState{JobID: "jb-2", LastStatus: StatusError, LastError: "boom"}); err != nil { + t.Fatalf("SaveState 2: %v", err) + } + // Updating jb-2 must not disturb jb-1. + if err := st.SaveState(RunState{JobID: "jb-2", LastStatus: StatusOK}); err != nil { + t.Fatalf("SaveState 2b: %v", err) + } + states, _ := st.LoadState() + // jb-1 must be untouched by jb-2's writes. + if states["jb-1"].LastStatus != StatusOK || !states["jb-1"].LastRun.Equal(now) { + t.Errorf("jb-1 state corrupted: %+v", states["jb-1"]) + } + // jb-2 was re-saved; a fresh write replaces the whole entry, so the prior + // LastError ("boom") must be gone. + if states["jb-2"].LastStatus != StatusOK { + t.Errorf("jb-2 status not updated: %+v", states["jb-2"]) + } + if states["jb-2"].LastError != "" { + t.Errorf("jb-2 LastError should be cleared by full-entry replace, got %q", states["jb-2"].LastError) + } + if err := st.SaveState(RunState{}); err == nil { + t.Error("SaveState with empty JobID should error") + } +} + +func TestModTime(t *testing.T) { + st := newTestStore(t) + if !st.ModTime().IsZero() { + t.Error("ModTime should be zero before any write") + } + if _, err := st.Add(sampleJob()); err != nil { + t.Fatalf("Add: %v", err) + } + if st.ModTime().IsZero() { + t.Error("ModTime should be set after a write") + } +} + +// ── Atomicity / no temp leftovers ─────────────────────────────────────── + +func TestAtomicWrite_NoTempLeftover(t *testing.T) { + dir := t.TempDir() + st, _ := NewStoreAt(dir) + if _, err := st.Add(sampleJob()); err != nil { + t.Fatalf("Add: %v", err) + } + entries, _ := os.ReadDir(dir) + for _, e := range entries { + if filepath.Ext(e.Name()) == ".tmp" { + t.Errorf("temp file left behind: %s", e.Name()) + } + } +} + +// ── Concurrency (run with -race) ──────────────────────────────────────── + +func TestConcurrentStateWrites(t *testing.T) { + st := newTestStore(t) + var wg sync.WaitGroup + for i := range 20 { + wg.Add(1) + go func(n int) { + defer wg.Done() + id := "jb-" + string(rune('a'+n%5)) + _ = st.SaveState(RunState{JobID: id, Runs: n, LastStatus: StatusOK}) + }(i) + } + wg.Wait() + states, err := st.LoadState() + if err != nil { + t.Fatalf("LoadState: %v", err) + } + if len(states) != 5 { + t.Errorf("expected 5 distinct job states, got %d", len(states)) + } +} diff --git a/internal/schedule/types.go b/internal/schedule/types.go new file mode 100644 index 0000000..50e9228 --- /dev/null +++ b/internal/schedule/types.go @@ -0,0 +1,74 @@ +// Package schedule provides a native, in-process task scheduler for odek. +// +// It runs agent tasks on a cron schedule from inside a long-lived process +// (the Telegram bot, the `odek schedule daemon`, or `odek serve`) and +// delivers each result somewhere (Telegram, stdout, a log file). Running +// in-process is the whole point: the host process has already resolved its +// configuration (API key, model, bot token, default chat) into memory, so a +// scheduled task sees exactly what an interactive one does — no environment +// inheritance games, no external cron daemon, no container-only behaviour. +// +// The package is deliberately decoupled from the agent and Telegram packages. +// The firing engine (Scheduler) talks to the rest of odek through two small +// interfaces, Runner and Deliverer, so it can be unit-tested against fakes +// and reused by every host process unchanged. +// +// Layout on disk (mirrors the rest of ~/.odek): +// +// ~/.odek/schedules.json job definitions (user-editable, 0600) +// ~/.odek/schedule-state.json runtime state: last/next run, status (0600) +// +// Definitions and runtime state are kept in separate files on purpose: the +// definitions file is something a human edits or the CLI rewrites, while the +// state file churns on every fire. Keeping them apart means a hand-edit never +// races with a state write and the definitions file stays diff-clean. +package schedule + +import "time" + +// Delivery kinds. A job's result is routed to exactly one destination. +const ( + DeliverTelegram = "telegram" // send via the bot to ChatID (0 = default_chat_id) + DeliverStdout = "stdout" // print to the daemon's stdout + DeliverLog = "log" // append to the schedule run log +) + +// Run-status values recorded in RunState.LastStatus. +const ( + StatusOK = "ok" // task ran and delivered + StatusError = "error" // task or delivery failed (see LastError) + StatusSkipped = "skipped" // a due fire was intentionally not run (e.g. missed while down, catchup off) +) + +// Delivery describes where a job's result is sent. +type Delivery struct { + Kind string `json:"kind"` // one of the Deliver* constants + ChatID int64 `json:"chat_id,omitempty"` // telegram only; 0 = use the configured default_chat_id +} + +// Job is a single scheduled agent task. Definitions live in schedules.json. +// All fields are exported so the CLI layer can construct and mutate jobs +// directly, matching the convention used by session.Session. +type Job struct { + ID string `json:"id"` // stable short id, e.g. "jb-ab12cd" + Name string `json:"name"` // human-readable label + Cron string `json:"cron"` // 5-field expression or @macro (see cronexpr.go) + Task string `json:"task"` // the prompt handed to the agent + Deliver Delivery `json:"deliver"` // where the result goes + Enabled bool `json:"enabled"` // disabled jobs are parsed but never fired + Catchup bool `json:"catchup,omitempty"` // if a fire was missed while the process was down, run once on startup + Timezone string `json:"timezone,omitempty"` // IANA name (e.g. "Europe/Berlin"); "" = scheduler default + CreatedAt time.Time `json:"created_at"` // when the job was added +} + +// RunState is the mutable runtime state for one job, persisted in +// schedule-state.json keyed by Job.ID. It is updated after every fire. +type RunState struct { + JobID string `json:"job_id"` + LastRun time.Time `json:"last_run,omitzero"` // omitzero (not omitempty) — time.Time is a struct + LastStatus string `json:"last_status,omitempty"` // one of the Status* constants + LastError string `json:"last_error,omitempty"` // populated when LastStatus == StatusError + LastResult string `json:"last_result,omitempty"` // truncated preview of the delivered text + NextRun time.Time `json:"next_run,omitzero"` // computed projected next fire + Runs int `json:"runs,omitempty"` // total successful + failed fires +} From f6753e2261e7c82183ec064d4195b4c4ec06798d Mon Sep 17 00:00:00 2001 From: Rolando Santamaria Maso Date: Thu, 4 Jun 2026 21:10:15 +0200 Subject: [PATCH 02/11] feat(schedule): odek schedule CLI + headless runner/deliverers (phase 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire the scheduler core into the CLI and give it a way to actually run tasks. - cmd/odek/schedule.go: `odek schedule `. * add: flag-parsed (--name/--cron/--deliver/--tz/--catchup/--disabled) with a trailing task; validates and shows the next fire. * list: tabular view with computed next-fire (local time) and last status. * next: previews upcoming fires for a job ID or a raw expression. * run: fires one job immediately and delivers (test a job). * daemon: foreground scheduler with a singleton pid lock (refuses a second instance rather than usurping a live one) and graceful SIGINT/SIGTERM drain. - runTaskHeadless: builds a fresh agent with a silent (io.Discard) renderer, interaction off, and no approver — the resolved danger policy governs what an unattended task may do, mirroring non-interactive `odek run`. - agentRunner / cliDeliverer implement the schedule.Runner / schedule.Deliverer interfaces; delivery routes to stdout, ~/.odek/schedule.log, or Telegram (honouring a per-job chat ID, falling back to default_chat_id). - dispatch + printUsage wired for the new command. Tests cover parseDeliver, deliverString, firstWords, jobSchedule, and the deliverer branches (log append, telegram misconfig errors, unknown kind). Smoke-tested end to end: add/list/next/enable/disable/rm, schedules.json at 0600, and daemon start → second-instance refused → clean SIGINT drain. Co-Authored-By: Claude Opus 4.8 (1M context) --- cmd/odek/dispatch.go | 2 + cmd/odek/main.go | 5 + cmd/odek/schedule.go | 532 ++++++++++++++++++++++++++++++++++++++ cmd/odek/schedule_test.go | 135 ++++++++++ 4 files changed, 674 insertions(+) create mode 100644 cmd/odek/schedule.go create mode 100644 cmd/odek/schedule_test.go diff --git a/cmd/odek/dispatch.go b/cmd/odek/dispatch.go index 12be9cf..95c9db2 100644 --- a/cmd/odek/dispatch.go +++ b/cmd/odek/dispatch.go @@ -51,6 +51,8 @@ func dispatch(args []string) int { return cliExit(mcpCmd(rest)) case "telegram": return cliExit(telegramCmd(rest)) + case "schedule": + return cliExit(scheduleCmd(rest)) default: fmt.Fprintf(os.Stderr, "odek: unknown command %q\n", cmd) printUsage() diff --git a/cmd/odek/main.go b/cmd/odek/main.go index 32155d8..d17f06e 100644 --- a/cmd/odek/main.go +++ b/cmd/odek/main.go @@ -476,6 +476,7 @@ func printUsage() { odek skill odek mcp [--sandbox] odek telegram + odek schedule odek version Commands: @@ -497,6 +498,10 @@ Commands: mcp Start MCP server (Model Context Protocol) over stdio Exposes all built-in tools for Claude Code, Cursor, etc. telegram Start Telegram bot (long-polling mode) + schedule Manage native in-process scheduled tasks (cron) + Subcommands: list, add, rm, enable, disable, run, next, daemon + The daemon (or the Telegram bot) fires jobs and delivers + results to stdout, a log, or a Telegram chat. init Create a config file (default: ./odek.json) version Print version and exit diff --git a/cmd/odek/schedule.go b/cmd/odek/schedule.go new file mode 100644 index 0000000..e10b87c --- /dev/null +++ b/cmd/odek/schedule.go @@ -0,0 +1,532 @@ +package main + +import ( + "context" + "flag" + "fmt" + "io" + "os" + "os/signal" + "path/filepath" + "strconv" + "strings" + "syscall" + "text/tabwriter" + "time" + + "github.com/BackendStack21/odek" + "github.com/BackendStack21/odek/internal/config" + "github.com/BackendStack21/odek/internal/llm" + "github.com/BackendStack21/odek/internal/render" + "github.com/BackendStack21/odek/internal/schedule" + "github.com/BackendStack21/odek/internal/telegram" +) + +// scheduleCmd is the entry point for "odek schedule" — management of native, +// in-process scheduled agent tasks (see internal/schedule). +func scheduleCmd(args []string) error { + if len(args) == 0 { + printScheduleUsage() + return nil + } + + // daemon and run resolve their own config; the rest only touch the store. + switch args[0] { + case "daemon": + return scheduleDaemon(args[1:]) + case "run": + return scheduleRunNow(args[1:]) + } + + st, err := schedule.NewStore() + if err != nil { + return err + } + switch args[0] { + case "list", "ls": + return scheduleList(st) + case "add": + return scheduleAdd(st, args[1:]) + case "rm", "remove", "delete": + return scheduleRemove(st, args[1:]) + case "enable": + return scheduleSetEnabled(st, args[1:], true) + case "disable": + return scheduleSetEnabled(st, args[1:], false) + case "next": + return scheduleNext(st, args[1:]) + default: + return fmt.Errorf("unknown schedule command %q (use list, add, rm, enable, disable, run, next, daemon)", args[0]) + } +} + +func printScheduleUsage() { + fmt.Println(`Usage: odek schedule + +Commands: + list List scheduled jobs (id, next fire, last status) + add --cron "" Add a job (see flags below) + rm Remove a job + enable Enable a job + disable Disable a job (kept, but never fires) + run Run a job once now and deliver (test it) + next Show the next few fire times + daemon Run the scheduler in the foreground + +Add flags: + --name