diff --git a/docs/named-channels.md b/docs/named-channels.md new file mode 100644 index 00000000..f3369a65 --- /dev/null +++ b/docs/named-channels.md @@ -0,0 +1,80 @@ +# Named Replication Channels + +MySQL 5.7+ supports multi-source replication, where a single replica can replicate from multiple masters simultaneously. Each replication connection is identified by a **named channel**. Orchestrator supports discovery, management, and failover of instances that use named replication channels. + +## Overview + +In a traditional single-source topology, a replica has exactly one `SHOW SLAVE STATUS` row. With multi-source replication, each channel appears as a separate row in `SHOW SLAVE STATUS`, each with its own IO thread, SQL thread, binlog coordinates, and lag. + +Orchestrator handles multi-source replicas by: + +1. Discovering all replication channels on each instance. +2. Selecting one channel as the **managed channel** for topology purposes. +3. Using channel-aware SQL operations (`FOR CHANNEL`) during failover and topology changes. +4. Preserving non-managed channels during promotion and recovery. + +## Discovery + +When orchestrator reads an instance's replication status via `SHOW SLAVE STATUS`, it parses all rows. Each row becomes a `ChannelStatus` entry stored in the instance's `ReplicationChannels` slice. + +For single-source instances (0 or 1 channels), behavior is identical to previous versions. For multi-source instances (2+ channels), orchestrator selects a **canonical channel** to represent the instance's primary replication relationship: + +1. The default channel (empty name `""`) is preferred. +2. Otherwise, the first non-Group-Replication-internal channel is selected. +3. As a fallback, the first channel is used. + +The selected channel's status (IO/SQL thread state, binlog coordinates, lag, master key, etc.) populates the instance's top-level replication fields. This ensures backward compatibility with all existing topology logic. 
+ +## Managed Channel Name + +The `ManagedChannelName` field on an instance indicates which channel orchestrator manages for topology operations. When this field is empty, all operations use standard SQL without a `FOR CHANNEL` clause (single-source behavior). + +When `ManagedChannelName` is set (multi-source instances), orchestrator appends `FOR CHANNEL '<channel_name>'` to replication commands, ensuring only the managed channel is affected. Other channels remain untouched. + +## Channel-Aware Operations + +The following operations are channel-aware: + +- `STOP SLAVE` / `STOP REPLICA` -- stops only the managed channel +- `START SLAVE` / `START REPLICA` -- starts only the managed channel +- `CHANGE MASTER TO` -- targets only the managed channel +- `RESET SLAVE` / `RESET REPLICA` -- resets only the managed channel + +All of these append `FOR CHANNEL '<channel_name>'` when a channel name is specified. + +## Failover with Multi-Source Replicas + +### Dead Master Recovery + +When a master dies and orchestrator initiates recovery (`recoverDeadMaster`), replicas are regrouped using GTID or Pseudo-GTID. The underlying `StopReplication`, `ChangeMasterTo`, and `StartReplication` calls all respect the `ManagedChannelName`, so only the dead master's channel is affected on multi-source replicas. Other channels (replicating from other masters) continue operating normally. + +### Candidate Selection + +A multi-source replica that replicates from the dead master on one of its replication channels is a valid promotion candidate. During promotion, only the managed channel is modified; all other channels are preserved. + +### Graceful Master Takeover + +`GracefulMasterTakeover` uses `ChangeMasterToForChannel` and `StartReplicationForChannel` with the managed channel name. This ensures that when the demoted master is reconfigured to replicate from the promoted instance, only the relevant channel is set up, and any other channels on the demoted master remain intact. 
+ +## API Endpoints + +### V1 API + +- `GET /api/instance-channels/{host}/{port}` -- Returns the `ReplicationChannels` slice as JSON for the given instance. Each entry includes channel name, master key, IO/SQL thread state, binlog coordinates, lag, and error information. + +- `GET /api/instance/{host}/{port}` -- The standard instance endpoint now includes `ReplicationChannels` and `ManagedChannelName` in its JSON response. + +### V2 API + +- `GET /api/v2/instances/{host}/{port}/channels` -- Returns channels in the V2 response envelope (`{"status": "ok", "data": [...]}`). + +## Group Replication Channels + +Group Replication uses internal channels named `group_replication_applier` and `group_replication_recovery`. These are automatically detected and excluded from canonical channel selection. Orchestrator will not select a GR internal channel as the managed channel unless no other channels exist. + +## Limitations + +- Orchestrator manages exactly one channel per instance for topology purposes. Manual management of other channels is expected. +- Channel-aware operations require MySQL 5.7+ or MariaDB 10.1+ (which support the `FOR CHANNEL` syntax). +- The backend database stores channel information in the `database_instance_channels` table. Ensure schema migrations have been applied. +- Multi-source replicas where multiple channels point to the same cluster may cause unexpected behavior in topology analysis. diff --git a/go/http/api.go b/go/http/api.go index 2096b89d..cd2ea45d 100644 --- a/go/http/api.go +++ b/go/http/api.go @@ -251,6 +251,23 @@ func (this *HttpAPI) Instance(w http.ResponseWriter, r *http.Request) { renderJSON(w, http.StatusOK, instance) } +// InstanceChannels returns the replication channels for a given instance. +// For multi-source replicas, this returns all named replication channels and their status. 
+func (this *HttpAPI) InstanceChannels(w http.ResponseWriter, r *http.Request) { + instanceKey, err := this.getInstanceKey(chi.URLParam(r, "host"), chi.URLParam(r, "port")) + + if err != nil { + Respond(w, &APIResponse{Code: ERROR, Message: err.Error()}) + return + } + instance, found, err := inst.ReadInstance(&instanceKey) + if (!found) || (err != nil) { + Respond(w, &APIResponse{Code: ERROR, Message: fmt.Sprintf("Cannot read instance: %+v", instanceKey)}) + return + } + renderJSON(w, http.StatusOK, instance.ReplicationChannels) +} + // AsyncDiscover issues an asynchronous read on an instance. This is // useful for bulk loads of a new set of instances and will not block // if the instance is slow to respond or not reachable. @@ -3926,6 +3943,7 @@ func (this *HttpAPI) RegisterRequests(router chi.Router) { this.registerAPIRequest(router, "masters", this.Masters) this.registerAPIRequest(router, "master/{clusterHint}", this.ClusterMaster) this.registerAPIRequest(router, "instance-replicas/{host}/{port}", this.InstanceReplicas) + this.registerAPIRequest(router, "instance-channels/{host}/{port}", this.InstanceChannels) this.registerAPIRequest(router, "all-instances", this.AllInstances) this.registerAPIRequest(router, "downtimed", this.Downtimed) this.registerAPIRequest(router, "downtimed/{clusterHint}", this.Downtimed) diff --git a/go/http/apiv2.go b/go/http/apiv2.go index 8b80b707..746553f9 100644 --- a/go/http/apiv2.go +++ b/go/http/apiv2.go @@ -80,6 +80,7 @@ func RegisterV2Routes(r chi.Router) { // Instance endpoints r.Get("/instances/{host}/{port}", V2Instance) + r.Get("/instances/{host}/{port}/channels", V2InstanceChannels) // Recovery endpoints r.Get("/recoveries", V2Recoveries) @@ -219,6 +220,34 @@ func V2Status(w http.ResponseWriter, r *http.Request) { respondOK(w, health) } +// V2InstanceChannels returns the replication channels for a specific MySQL instance. +// For multi-source replicas, this returns all named replication channels and their status. 
+func V2InstanceChannels(w http.ResponseWriter, r *http.Request) { + host := chi.URLParam(r, "host") + port := chi.URLParam(r, "port") + + instanceKey, err := inst.NewResolveInstanceKeyStrings(host, port) + if err != nil { + respondError(w, http.StatusBadRequest, "INVALID_INSTANCE", fmt.Sprintf("Invalid instance key: %v", err)) + return + } + instanceKey, err = inst.FigureInstanceKey(instanceKey, nil) + if err != nil { + respondError(w, http.StatusBadRequest, "INVALID_INSTANCE", fmt.Sprintf("Cannot resolve instance: %v", err)) + return + } + instance, found, err := inst.ReadInstance(instanceKey) + if err != nil { + respondError(w, http.StatusInternalServerError, "INSTANCE_READ_ERROR", fmt.Sprintf("Failed to read instance: %v", err)) + return + } + if !found { + respondNotFound(w, fmt.Sprintf("Instance not found: %s:%s", host, port)) + return + } + respondOK(w, instance.ReplicationChannels) +} + // V2ProxySQLServers returns all servers from ProxySQL's runtime_mysql_servers table. func V2ProxySQLServers(w http.ResponseWriter, r *http.Request) { hook := proxysql.GetHook() diff --git a/go/inst/channel_test.go b/go/inst/channel_test.go new file mode 100644 index 00000000..5cee61b1 --- /dev/null +++ b/go/inst/channel_test.go @@ -0,0 +1,211 @@ +package inst + +import ( + "database/sql" + "testing" + + test "github.com/proxysql/golib/tests" +) + +func TestSelectCanonicalChannelIndexEmpty(t *testing.T) { + channels := []ChannelStatus{} + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, -1) +} + +func TestSelectCanonicalChannelIndexSingleDefault(t *testing.T) { + channels := []ChannelStatus{ + {ChannelName: "", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}}, + } + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, 0) +} + +func TestSelectCanonicalChannelIndexPrefersDefault(t *testing.T) { + channels := []ChannelStatus{ + {ChannelName: "channel_a", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}}, + 
{ChannelName: "", MasterKey: InstanceKey{Hostname: "master2", Port: 3306}}, + {ChannelName: "channel_b", MasterKey: InstanceKey{Hostname: "master3", Port: 3306}}, + } + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, 1) +} + +func TestSelectCanonicalChannelIndexSkipsGRChannels(t *testing.T) { + channels := []ChannelStatus{ + {ChannelName: "group_replication_applier", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}}, + {ChannelName: "group_replication_recovery", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}}, + {ChannelName: "my_channel", MasterKey: InstanceKey{Hostname: "master2", Port: 3306}}, + } + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, 2) +} + +func TestSelectCanonicalChannelIndexAllGR(t *testing.T) { + channels := []ChannelStatus{ + {ChannelName: "group_replication_applier", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}}, + {ChannelName: "group_replication_recovery", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}}, + } + // When only GR channels exist, falls back to index 0 + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, 0) +} + +func TestSelectCanonicalChannelIndexMultipleNamed(t *testing.T) { + channels := []ChannelStatus{ + {ChannelName: "channel_a", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}}, + {ChannelName: "channel_b", MasterKey: InstanceKey{Hostname: "master2", Port: 3306}}, + } + // No default channel, no GR channels: picks the first one + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, 0) +} + +func TestIsGRInternalChannel(t *testing.T) { + cs := ChannelStatus{ChannelName: "group_replication_applier"} + test.S(t).ExpectTrue(cs.IsGRInternalChannel()) + + cs2 := ChannelStatus{ChannelName: "group_replication_recovery"} + test.S(t).ExpectTrue(cs2.IsGRInternalChannel()) + + cs3 := ChannelStatus{ChannelName: "my_channel"} + test.S(t).ExpectFalse(cs3.IsGRInternalChannel()) + + cs4 := 
ChannelStatus{ChannelName: ""} + test.S(t).ExpectFalse(cs4.IsGRInternalChannel()) +} + +func TestForChannelClause(t *testing.T) { + test.S(t).ExpectEquals(forChannelClause(""), "") + test.S(t).ExpectEquals(forChannelClause("my_channel"), " FOR CHANNEL 'my_channel'") + test.S(t).ExpectEquals(forChannelClause("group_replication_applier"), " FOR CHANNEL 'group_replication_applier'") +} + +func TestSingleSourceBehaviorUnchanged(t *testing.T) { + // Verify that an instance with no replication channels behaves identically + // to pre-channel-support behavior + instance := NewInstance() + instance.Key = InstanceKey{Hostname: "replica1", Port: 3306} + instance.MasterKey = InstanceKey{Hostname: "master1", Port: 3306} + instance.ReadBinlogCoordinates = BinlogCoordinates{LogFile: "mysql-bin.000001", LogPos: 100} + instance.Version = "5.7.35" + + // No channels set + test.S(t).ExpectEquals(len(instance.ReplicationChannels), 0) + test.S(t).ExpectEquals(instance.ManagedChannelName, "") + test.S(t).ExpectTrue(instance.IsReplica()) +} + +func TestMultiSourceInstanceManagedChannel(t *testing.T) { + // Verify that when ReplicationChannels has multiple entries and we pick canonical, + // the ManagedChannelName reflects the chosen channel + channels := []ChannelStatus{ + { + ChannelName: "channel_a", + MasterKey: InstanceKey{Hostname: "master1", Port: 3306}, + ReplicationIOThreadRunning: true, + ReplicationSQLThreadRunning: true, + SecondsBehindMaster: sql.NullInt64{Int64: 0, Valid: true}, + }, + { + ChannelName: "channel_b", + MasterKey: InstanceKey{Hostname: "master2", Port: 3306}, + ReplicationIOThreadRunning: true, + ReplicationSQLThreadRunning: true, + SecondsBehindMaster: sql.NullInt64{Int64: 5, Valid: true}, + }, + } + + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, 0) + + // The canonical channel should be channel_a (first non-GR channel) + ch := channels[idx] + test.S(t).ExpectEquals(ch.ChannelName, "channel_a") + 
test.S(t).ExpectEquals(ch.MasterKey.Hostname, "master1") +} + +func TestChannelAwareSQLGeneration(t *testing.T) { + qsp := GetQueryStringProvider("5.7.35") + + // Without channel name -- no FOR CHANNEL clause + stopSQL := qsp.StopReplicaForChannel("") + test.S(t).ExpectTrue(len(stopSQL) > 0) + test.S(t).ExpectEquals(stopSQL, qsp.stop_slave()) + + startSQL := qsp.StartReplicaForChannel("") + test.S(t).ExpectEquals(startSQL, qsp.start_slave()) + + // With channel name -- should include FOR CHANNEL clause + stopSQLCh := qsp.StopReplicaForChannel("my_channel") + test.S(t).ExpectTrue(len(stopSQLCh) > len(stopSQL)) + expectedStop := qsp.stop_slave() + " FOR CHANNEL 'my_channel'" + test.S(t).ExpectEquals(stopSQLCh, expectedStop) + + startSQLCh := qsp.StartReplicaForChannel("my_channel") + expectedStart := qsp.start_slave() + " FOR CHANNEL 'my_channel'" + test.S(t).ExpectEquals(startSQLCh, expectedStart) + + resetSQLCh := qsp.ResetReplicaForChannel("my_channel") + expectedReset := qsp.reset_slave() + " FOR CHANNEL 'my_channel'" + test.S(t).ExpectEquals(resetSQLCh, expectedReset) +} + +func TestChannelAwareSQLGeneration84(t *testing.T) { + // Test with MySQL 8.4+ which uses "stop replica" / "start replica" syntax + qsp := GetQueryStringProvider("8.4.0") + + stopSQL := qsp.StopReplicaForChannel("my_channel") + test.S(t).ExpectEquals(stopSQL, "stop replica FOR CHANNEL 'my_channel'") + + startSQL := qsp.StartReplicaForChannel("my_channel") + test.S(t).ExpectEquals(startSQL, "start replica FOR CHANNEL 'my_channel'") + + resetSQL := qsp.ResetReplicaForChannel("my_channel") + test.S(t).ExpectEquals(resetSQL, "reset replica FOR CHANNEL 'my_channel'") +} + +func TestChannelAwareIOSQLThreadOperations(t *testing.T) { + qsp := GetQueryStringProvider("5.7.35") + + // IO thread operations + stopIO := qsp.StopReplicaIOThreadForChannel("ch1") + test.S(t).ExpectEquals(stopIO, "stop slave io_thread FOR CHANNEL 'ch1'") + + startIO := qsp.StartReplicaIOThreadForChannel("ch1") + 
test.S(t).ExpectEquals(startIO, "start slave io_thread FOR CHANNEL 'ch1'") + + // SQL thread operations + stopSQLThread := qsp.StopReplicaSQLThreadForChannel("ch1") + test.S(t).ExpectEquals(stopSQLThread, "stop slave sql_thread FOR CHANNEL 'ch1'") + + startSQLThread := qsp.StartReplicaSQLThreadForChannel("ch1") + test.S(t).ExpectEquals(startSQLThread, "start slave sql_thread FOR CHANNEL 'ch1'") + + // Without channel name -- no FOR CHANNEL + stopIODefault := qsp.StopReplicaIOThreadForChannel("") + test.S(t).ExpectEquals(stopIODefault, "stop slave io_thread") +} + +func TestSelectCanonicalChannelWithDefaultAndGR(t *testing.T) { + // When default channel exists alongside GR channels, prefer default + channels := []ChannelStatus{ + {ChannelName: "group_replication_applier"}, + {ChannelName: ""}, + {ChannelName: "group_replication_recovery"}, + } + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, 1) +} + +func TestSelectCanonicalChannelWithNamedAndGR(t *testing.T) { + // When no default channel but named + GR channels exist, prefer the named one + channels := []ChannelStatus{ + {ChannelName: "group_replication_applier"}, + {ChannelName: "group_replication_recovery"}, + {ChannelName: "custom_repl"}, + {ChannelName: "another_repl"}, + } + idx := selectCanonicalChannelIndex(channels) + test.S(t).ExpectEquals(idx, 2) +} diff --git a/go/logic/topology_recovery.go b/go/logic/topology_recovery.go index bf6ba1aa..6f375672 100644 --- a/go/logic/topology_recovery.go +++ b/go/logic/topology_recovery.go @@ -2228,7 +2228,9 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, if topologyRecovery.RecoveryType == MasterRecoveryGTID { gtidHint = inst.GTIDHintForce } - clusterMaster, err = inst.ChangeMasterTo(&clusterMaster.Key, &designatedInstance.Key, promotedMasterCoordinates, false, gtidHint) + // Use channel-aware operations to preserve other replication channels on multi-source replicas + managedChannel := 
clusterMaster.ManagedChannelName + clusterMaster, err = inst.ChangeMasterToForChannel(&clusterMaster.Key, &designatedInstance.Key, promotedMasterCoordinates, false, gtidHint, managedChannel) if !clusterMaster.SelfBinlogCoordinates.Equals(demotedMasterSelfBinlogCoordinates) { log.Errorf("GracefulMasterTakeover: sanity problem. Demoted master's coordinates changed from %+v to %+v while supposed to have been frozen", *demotedMasterSelfBinlogCoordinates, clusterMaster.SelfBinlogCoordinates) } @@ -2250,7 +2252,7 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey, } } if auto { - _, startReplicationErr := inst.StartReplication(&clusterMaster.Key) + _, startReplicationErr := inst.StartReplicationForChannel(&clusterMaster.Key, managedChannel) if err == nil { err = startReplicationErr }