From 85f9b9d2a018e0b5f5e3778213464b98bd2733d7 Mon Sep 17 00:00:00 2001 From: "Robin F. Pronk" Date: Wed, 27 May 2026 12:40:29 +0200 Subject: [PATCH 1/7] Add basic health metrics to prometheus exporter Signed-off-by: Robin F. Pronk --- go/metrics/prometheus.go | 45 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/go/metrics/prometheus.go b/go/metrics/prometheus.go index e0fae156..b01758a4 100644 --- a/go/metrics/prometheus.go +++ b/go/metrics/prometheus.go @@ -17,9 +17,13 @@ package metrics import ( + "sync/atomic" + "github.com/prometheus/client_golang/prometheus" "github.com/proxysql/golib/log" "github.com/proxysql/orchestrator/go/config" + "github.com/proxysql/orchestrator/go/process" + orcraft "github.com/proxysql/orchestrator/go/raft" ) // Prometheus metric variables, exported so other packages can update them. @@ -54,6 +58,43 @@ var ( Help: "Duration of recovery operations in seconds.", Buckets: prometheus.ExponentialBuckets(0.5, 2, 12), // 0.5s to ~1024s }) + + HealthIsReady = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "orchestrator_health_is_ready", + Help: "1 if the node is ready, 0 otherwise.", + }, func() float64 { + if atomic.LoadInt64(&process.LastContinousCheckHealthy) == 1 { + return 1 + } + return 0 + }) + + HealthIsLive = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "orchestrator_health_is_live", + Help: "1 if the node is live, 0 otherwise.", + }, func() float64 { + return 1 + }) + + RaftIsHealthy = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "orchestrator_raft_is_healthy", + Help: "1 if the node is a healthy raft node, 0 otherwise.", + }, func() float64 { + if orcraft.IsRaftEnabled() && orcraft.IsHealthy() { + return 1 + } + return 0 + }) + + RaftIsLeader = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "orchestrator_raft_is_leader", + Help: "1 if the node is the raft leader, 0 otherwise.", + }, func() float64 { + if orcraft.IsRaftEnabled() && orcraft.IsLeader() { + return 1 + } + return 0 + }) ) // InitPrometheus registers all Prometheus metrics. Should be called once at @@ -70,5 +111,9 @@ func InitPrometheus() { ClustersTotal, RecoveriesTotal, RecoveryDurationSeconds, + HealthIsReady, + HealthIsLive, + RaftIsHealthy, + RaftIsLeader, ) } From 6ee801d357a2f7b5524f50fc6f714627be548805 Mon Sep 17 00:00:00 2001 From: "Robin F. Pronk" Date: Wed, 27 May 2026 12:45:52 +0200 Subject: [PATCH 2/7] Add more useful metrics Signed-off-by: Robin F. Pronk --- go/logic/orchestrator.go | 20 +++++++++++++++ go/metrics/prometheus.go | 54 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index 04f99dca..fb530cc4 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -91,6 +91,7 @@ func init() { ometrics.OnMetricsTick(func() { discoveryQueueLengthGauge.Update(int64(discoveryQueue.QueueLen())) + ometrics.DiscoveryQueueLength.Set(float64(discoveryQueue.QueueLen())) }) ometrics.OnMetricsTick(func() { if recentDiscoveryOperationKeys == nil { @@ -114,6 +115,24 @@ func init() { ometrics.OnMetricsTick(func() { isRaftLeaderGauge.Update(atomic.LoadInt64(&isElectedNode)) }) + ometrics.OnMetricsTick(func() { + if instances, err := inst.ReadDowntimedInstances(""); err == nil { + ometrics.DowntimedInstances.Set(float64(len(instances))) + } + if recoveries, err := ReadActiveRecoveries(); err == nil { + ometrics.ActiveRecoveries.Set(float64(len(recoveries))) + } + if analysis, err := inst.GetReplicationAnalysis("", &inst.ReplicationAnalysisHints{}); err == nil { + issuesCount := make(map[string]int) + for _, a := range analysis { + issuesCount[string(a.Analysis)]++ + } + ometrics.ActiveTopologyIssues.Reset() + for issueType, count := range issuesCount { + ometrics.ActiveTopologyIssues.WithLabelValues(issueType).Set(float64(count)) + } + } + }) } func IsLeader() bool { @@ -192,6 +211,7 @@ func handleDiscoveryRequests() { _ = metrics.Register("discoveries.dead_instances_queue_length", deadInstancesDiscoveryQueueLengthGauge) ometrics.OnMetricsTick(func() { deadInstancesDiscoveryQueueLengthGauge.Update(int64(deadInstancesDiscoveryQueue.QueueLen())) + ometrics.DeadInstancesDiscoveryQueueLength.Set(float64(deadInstancesDiscoveryQueue.QueueLen())) }) // create a pool of discovery workers diff --git a/go/metrics/prometheus.go b/go/metrics/prometheus.go index b01758a4..86e17d32 100644 --- a/go/metrics/prometheus.go +++ b/go/metrics/prometheus.go @@ -95,6 +95,53 @@ var ( } return 0 }) + + DiscoveryQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "orchestrator_discovery_queue_length", + Help: "Length of the discovery queue.", + }) + + DeadInstancesDiscoveryQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "orchestrator_dead_instances_discovery_queue_length", + Help: "Length of the dead instances discovery queue.", + }) + + ActiveRecoveries = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "orchestrator_active_recoveries", + Help: "Number of active (in-progress) recoveries.", + }) + + DowntimedInstances = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "orchestrator_downtimed_instances_total", + Help: "Number of instances currently marked as downtimed.", + }) + + ActiveTopologyIssues = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "orchestrator_active_topology_issues", + Help: "Number of active topology issues, broken down by issue type.", + }, []string{"issue_type"}) + + RaftPeersTotal = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "orchestrator_raft_peers_total", + Help: "Number of known raft peers.", + }, func() float64 { + if orcraft.IsRaftEnabled() { + if peers, err := orcraft.GetPeers(); err == nil { + return float64(len(peers)) + } + } + return 0 + }) + + RaftIsPartOfQuorum = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "orchestrator_raft_is_part_of_quorum", + Help: "1 if the node is part of raft quorum, 0 otherwise.", + }, func() float64 { + if orcraft.IsRaftEnabled() && orcraft.IsPartOfQuorum() { + return 1 + } + return 0 + }) ) // InitPrometheus registers all Prometheus metrics. Should be called once at @@ -115,5 +162,12 @@ func InitPrometheus() { HealthIsLive, RaftIsHealthy, RaftIsLeader, + DiscoveryQueueLength, + DeadInstancesDiscoveryQueueLength, + ActiveRecoveries, + DowntimedInstances, + ActiveTopologyIssues, + RaftPeersTotal, + RaftIsPartOfQuorum, ) } From 01f14af2f330ecf22d8ffdca41f3590f17666b13 Mon Sep 17 00:00:00 2001 From: "Robin F. Pronk" Date: Wed, 27 May 2026 12:47:57 +0200 Subject: [PATCH 3/7] Fix orchestrator_instances_total Signed-off-by: Robin F. Pronk --- go/logic/orchestrator.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index fb530cc4..ed4a369d 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -132,6 +132,14 @@ func init() { ometrics.ActiveTopologyIssues.WithLabelValues(issueType).Set(float64(count)) } } + if clusters, err := inst.ReadClustersInfo(""); err == nil { + ometrics.ClustersTotal.Set(float64(len(clusters))) + var instancesTotal uint + for _, c := range clusters { + instancesTotal += c.CountInstances + } + ometrics.InstancesTotal.Set(float64(instancesTotal)) + } }) } From 38005f5069ec79eeb7d082445deced379c7130fc Mon Sep 17 00:00:00 2001 From: "Robin F. Pronk" Date: Wed, 27 May 2026 12:50:06 +0200 Subject: [PATCH 4/7] Fix discovery related metrics Signed-off-by: Robin F. Pronk --- go/logic/orchestrator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index ed4a369d..7716c23f 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -298,6 +298,7 @@ func DiscoverInstance(instanceKey inst.InstanceKey) { } discoveriesCounter.Inc(1) + ometrics.DiscoveriesTotal.Inc() // First we've ever heard of this instance. Continue investigation: var err error @@ -318,6 +319,7 @@ func DiscoverInstance(instanceKey inst.InstanceKey) { if instance == nil { failedDiscoveriesCounter.Inc(1) + ometrics.DiscoveryErrorsTotal.Inc() _ = discoveryMetrics.Append(&discovery.Metric{ Timestamp: time.Now(), InstanceKey: instanceKey, From 23173b988f8011291261b4b0005d4217f67237ab Mon Sep 17 00:00:00 2001 From: "Robin F. Pronk" Date: Wed, 27 May 2026 12:58:10 +0200 Subject: [PATCH 5/7] Fix orchestrator_recoveries_total and orchestrator_recovery_duration_seconds Signed-off-by: Robin F. Pronk --- go/logic/topology_recovery.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index af8cce1a..03de102d 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -1895,7 +1895,20 @@ func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, cand if isActionableRecovery || util.ClearToLog("executeCheckAndRecoverFunction: recovery", analysisEntry.AnalyzedInstanceKey.StringCode()) { log.Infof("executeCheckAndRecoverFunction: proceeding with %+v recovery on %+v; isRecoverable?: %+v; skipProcesses: %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, isActionableRecovery, skipProcesses) } + startTime := time.Now() recoveryAttempted, topologyRecovery, err = checkAndRecoverFunction(analysisEntry, candidateInstanceKey, forceInstanceRecovery, skipProcesses) + + if recoveryAttempted && topologyRecovery != nil { + duration := time.Since(startTime).Seconds() + ometrics.RecoveryDurationSeconds.Observe(duration) + + result := "success" + if !topologyRecovery.IsSuccessful { + result = "failed" + } + ometrics.RecoveriesTotal.WithLabelValues(string(analysisEntry.Analysis), result).Inc() + } + if !recoveryAttempted { return recoveryAttempted, topologyRecovery, err } From ad282dd1801899c66959ea31cc3121aff4ec6b9b Mon Sep 17 00:00:00 2001 From: "Robin F. Pronk" Date: Wed, 27 May 2026 15:15:20 +0200 Subject: [PATCH 6/7] Add docstring, performance tuning and nil protection Signed-off-by: Robin F. Pronk --- go/logic/orchestrator.go | 63 ++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index 7716c23f..fbab1652 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -90,8 +90,10 @@ func init() { _ = metrics.Register("raft.is_leader", isRaftLeaderGauge) ometrics.OnMetricsTick(func() { - discoveryQueueLengthGauge.Update(int64(discoveryQueue.QueueLen())) - ometrics.DiscoveryQueueLength.Set(float64(discoveryQueue.QueueLen())) + if discoveryQueue != nil { + discoveryQueueLengthGauge.Update(int64(discoveryQueue.QueueLen())) + ometrics.DiscoveryQueueLength.Set(float64(discoveryQueue.QueueLen())) + } }) ometrics.OnMetricsTick(func() { if recentDiscoveryOperationKeys == nil { @@ -115,34 +117,9 @@ func init() { ometrics.OnMetricsTick(func() { isRaftLeaderGauge.Update(atomic.LoadInt64(&isElectedNode)) }) - ometrics.OnMetricsTick(func() { - if instances, err := inst.ReadDowntimedInstances(""); err == nil { - ometrics.DowntimedInstances.Set(float64(len(instances))) - } - if recoveries, err := ReadActiveRecoveries(); err == nil { - ometrics.ActiveRecoveries.Set(float64(len(recoveries))) - } - if analysis, err := inst.GetReplicationAnalysis("", &inst.ReplicationAnalysisHints{}); err == nil { - issuesCount := make(map[string]int) - for _, a := range analysis { - issuesCount[string(a.Analysis)]++ - } - ometrics.ActiveTopologyIssues.Reset() - for issueType, count := range issuesCount { - ometrics.ActiveTopologyIssues.WithLabelValues(issueType).Set(float64(count)) - } - } - if clusters, err := inst.ReadClustersInfo(""); err == nil { - ometrics.ClustersTotal.Set(float64(len(clusters))) - var instancesTotal uint - for _, c := range clusters { - instancesTotal += c.CountInstances - } - ometrics.InstancesTotal.Set(float64(instancesTotal)) - } - }) } +// IsLeader returns true if the current orchestrator instance is the cluster leader. func IsLeader() bool { if orcraft.IsRaftEnabled() { return orcraft.IsLeader() @@ -150,6 +127,8 @@ func IsLeader() bool { return atomic.LoadInt64(&isElectedNode) == 1 } +// IsLeaderOrActive returns true if the current orchestrator instance is either the cluster leader +// or an active member (when Raft is enabled, returns true if part of quorum). func IsLeaderOrActive() bool { if orcraft.IsRaftEnabled() { return orcraft.IsPartOfQuorum() @@ -611,6 +590,7 @@ func ContinuousDiscovery() { raftCaretakingTick := time.Tick(10 * time.Minute) recoveryTick := time.Tick(time.Duration(config.RecoveryPollSeconds) * time.Second) autoPseudoGTIDTick := time.Tick(time.Duration(config.PseudoGTIDIntervalSeconds) * time.Second) + dbMetricsTick := time.Tick(30 * time.Second) var recoveryEntrance int64 var snapshotTopologiesTick <-chan time.Time if config.Config.SnapshotTopologiesIntervalHours > 0 { @@ -642,6 +622,33 @@ func ContinuousDiscovery() { log.Infof("continuous discovery: starting") for { select { + case <-dbMetricsTick: + go func() { + if instances, err := inst.ReadDowntimedInstances(""); err == nil { + ometrics.DowntimedInstances.Set(float64(len(instances))) + } + if recoveries, err := ReadActiveRecoveries(); err == nil { + ometrics.ActiveRecoveries.Set(float64(len(recoveries))) + } + if analysis, err := inst.GetReplicationAnalysis("", &inst.ReplicationAnalysisHints{}); err == nil { + issuesCount := make(map[string]int) + for _, a := range analysis { + issuesCount[string(a.Analysis)]++ + } + ometrics.ActiveTopologyIssues.Reset() + for issueType, count := range issuesCount { + ometrics.ActiveTopologyIssues.WithLabelValues(issueType).Set(float64(count)) + } + } + if clusters, err := inst.ReadClustersInfo(""); err == nil { + ometrics.ClustersTotal.Set(float64(len(clusters))) + var instancesTotal uint + for _, c := range clusters { + instancesTotal += c.CountInstances + } + ometrics.InstancesTotal.Set(float64(instancesTotal)) + } + }() case <-healthTick: go func() { onHealthTick() From 13d98a262cc6b85fc1455676236a57057ac125e4 Mon Sep 17 00:00:00 2001 From: "Robin F. Pronk" Date: Tue, 2 Jun 2026 12:04:10 +0200 Subject: [PATCH 7/7] Share QueueLen invocation Signed-off-by: Robin F. Pronk --- go/logic/orchestrator.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/logic/orchestrator.go b/go/logic/orchestrator.go index fbab1652..a4cca8c7 100644 --- a/go/logic/orchestrator.go +++ b/go/logic/orchestrator.go @@ -91,8 +91,9 @@ func init() { ometrics.OnMetricsTick(func() { if discoveryQueue != nil { - discoveryQueueLengthGauge.Update(int64(discoveryQueue.QueueLen())) - ometrics.DiscoveryQueueLength.Set(float64(discoveryQueue.QueueLen())) + queueLen := discoveryQueue.QueueLen() + discoveryQueueLengthGauge.Update(int64(queueLen)) + ometrics.DiscoveryQueueLength.Set(float64(queueLen)) } }) ometrics.OnMetricsTick(func() {