Merged
80 changes: 80 additions & 0 deletions docs/named-channels.md
@@ -0,0 +1,80 @@
# Named Replication Channels

MySQL 5.7+ supports multi-source replication, where a single replica can replicate from multiple masters simultaneously. Each replication connection is identified by a **named channel**. Orchestrator supports discovery, management, and failover of instances that use named replication channels.

## Overview

In a traditional single-source topology, `SHOW SLAVE STATUS` returns exactly one row for a replica. With multi-source replication, it returns one row per channel, each with its own IO thread, SQL thread, binlog coordinates, and lag.

Orchestrator handles multi-source replicas by:

1. Discovering all replication channels on each instance.
2. Selecting one channel as the **managed channel** for topology purposes.
3. Using channel-aware SQL operations (`FOR CHANNEL`) during failover and topology changes.
4. Preserving non-managed channels during promotion and recovery.

## Discovery

When orchestrator reads an instance's replication status via `SHOW SLAVE STATUS`, it parses all rows. Each row becomes a `ChannelStatus` entry stored in the instance's `ReplicationChannels` slice.

For single-source instances (0 or 1 channels), behavior is identical to previous versions. For multi-source instances (2+ channels), orchestrator selects a **canonical channel** to represent the instance's primary replication relationship:

1. The default channel (empty name `""`) is preferred.
2. Otherwise, the first non-Group-Replication-internal channel is selected.
3. As a fallback, the first channel is used.

The selected channel's status (IO/SQL thread state, binlog coordinates, lag, master key, etc.) populates the instance's top-level replication fields. This ensures backward compatibility with all existing topology logic.
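
The selection order above can be sketched as follows. This is a minimal illustration inferred from the three rules in this section, not necessarily how orchestrator implements it: the `channel` type and the `selectCanonical` and `isGRInternal` functions are hypothetical names introduced for the example.

```go
package main

import "fmt"

// channel is a hypothetical, stripped-down stand-in for a replication channel entry.
type channel struct {
	Name string
}

// isGRInternal reports whether name is one of the Group Replication internal channels.
func isGRInternal(name string) bool {
	return name == "group_replication_applier" || name == "group_replication_recovery"
}

// selectCanonical returns the index of the channel to treat as canonical,
// or -1 when the instance has no channels at all.
func selectCanonical(channels []channel) int {
	if len(channels) == 0 {
		return -1
	}
	// 1. Prefer the default (unnamed) channel.
	for i, c := range channels {
		if c.Name == "" {
			return i
		}
	}
	// 2. Otherwise take the first channel that is not GR-internal.
	for i, c := range channels {
		if !isGRInternal(c.Name) {
			return i
		}
	}
	// 3. Fall back to the first channel.
	return 0
}

func main() {
	channels := []channel{
		{Name: "group_replication_applier"},
		{Name: "channel_a"},
		{Name: "channel_b"},
	}
	fmt.Println(selectCanonical(channels)) // prints 1
}
```

Returning an index rather than a copy of the entry lets the caller distinguish "no channels" (`-1`) from a real selection without an extra boolean.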

## Managed Channel Name

The `ManagedChannelName` field on an instance indicates which channel orchestrator manages for topology operations. When this field is empty, all operations use standard SQL without a `FOR CHANNEL` clause (single-source behavior).

When `ManagedChannelName` is set (multi-source instances), orchestrator appends `FOR CHANNEL '<name>'` to replication commands, ensuring only the managed channel is affected. Other channels remain untouched.

## Channel-Aware Operations

The following operations are channel-aware:

- `STOP SLAVE` / `STOP REPLICA` -- stops only the managed channel
- `START SLAVE` / `START REPLICA` -- starts only the managed channel
- `CHANGE MASTER TO` -- targets only the managed channel
- `RESET SLAVE` / `RESET REPLICA` -- resets only the managed channel

All of these append `FOR CHANNEL '<name>'` when a channel name is specified.
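
As a rough sketch of how such a suffix might be built, the hypothetical helper below returns an empty string for an empty channel name and a `FOR CHANNEL` clause otherwise. The name `channelSuffix` is an assumption made for illustration; the string shape follows what the tests in this change expect from `forChannelClause`. No escaping is performed, so the channel name is assumed to come from a trusted source.

```go
package main

import "fmt"

// channelSuffix is a hypothetical helper: it returns the text to append to a
// replication statement, or "" when no channel name is given.
// The name is not escaped; it is assumed to be trusted input.
func channelSuffix(name string) string {
	if name == "" {
		return ""
	}
	return fmt.Sprintf(" FOR CHANNEL '%s'", name)
}

func main() {
	// Single-source behavior: the statement is unchanged.
	fmt.Println("STOP SLAVE" + channelSuffix(""))
	// Multi-source behavior: the statement targets one channel only.
	fmt.Println("STOP SLAVE" + channelSuffix("channel_a"))
}
```

Keeping the leading space inside the suffix means callers can concatenate unconditionally, which is why the empty-name case yields exactly the original statement.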

## Failover with Multi-Source Replicas

### Dead Master Recovery

When a master dies and orchestrator initiates recovery (`recoverDeadMaster`), replicas are regrouped using GTID or Pseudo-GTID. The underlying `StopReplication`, `ChangeMasterTo`, and `StartReplication` calls all respect the `ManagedChannelName`, so only the dead master's channel is affected on multi-source replicas. Other channels (replicating from other masters) continue operating normally.

### Candidate Selection

A multi-source replica where the dead master is one of its replication channels is a valid promotion candidate. During promotion, only the managed channel is modified; all other channels are preserved.

### Graceful Master Takeover

`GracefulMasterTakeover` uses `ChangeMasterToForChannel` and `StartReplicationForChannel` with the managed channel name. This ensures that when the demoted master is reconfigured to replicate from the promoted instance, only the relevant channel is set up, and any other channels on the demoted master remain intact.

## API Endpoints

### V1 API

- `GET /api/instance-channels/{host}/{port}` -- Returns the `ReplicationChannels` slice as JSON for the given instance. Each entry includes channel name, master key, IO/SQL thread state, binlog coordinates, lag, and error information.

- `GET /api/instance/{host}/{port}` -- The standard instance endpoint now includes `ReplicationChannels` and `ManagedChannelName` in its JSON response.

### V2 API

- `GET /api/v2/instances/{host}/{port}/channels` -- Returns channels in the V2 response envelope (`{"status": "ok", "data": [...]}`).
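
A client might unwrap the V2 envelope along these lines. This is a sketch under assumptions: the `envelope` type and `decodeChannels` function are hypothetical, the per-channel JSON field names are not specified here so each entry is kept as raw JSON, and the sample payload uses empty objects as placeholders rather than real channel data.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the documented {"status": ..., "data": [...]} shape.
// Channel entries stay as raw JSON because their fields are not assumed here.
type envelope struct {
	Status string            `json:"status"`
	Data   []json.RawMessage `json:"data"`
}

// decodeChannels parses a V2 response body and returns the raw channel entries.
func decodeChannels(body []byte) ([]json.RawMessage, error) {
	var env envelope
	if err := json.Unmarshal(body, &env); err != nil {
		return nil, err
	}
	if env.Status != "ok" {
		return nil, fmt.Errorf("unexpected status %q", env.Status)
	}
	return env.Data, nil
}

func main() {
	// Placeholder payload: two empty objects standing in for channel entries.
	body := []byte(`{"status": "ok", "data": [{}, {}]}`)
	channels, err := decodeChannels(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(channels)) // prints 2
}
```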

## Group Replication Channels

Group Replication uses internal channels named `group_replication_applier` and `group_replication_recovery`. These are automatically detected and excluded from canonical channel selection. Orchestrator will not select a GR internal channel as the managed channel unless no other channels exist.

## Limitations

- Orchestrator manages exactly one channel per instance for topology purposes. All other channels must be managed manually.
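- Channel-aware operations require MySQL 5.7+, which introduced the `FOR CHANNEL` clause. MariaDB's multi-source replication has traditionally used named-connection syntax instead (for example `STOP SLAVE 'name'`), so confirm `FOR CHANNEL` support on the MariaDB version in use.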
- The backend database stores channel information in the `database_instance_channels` table. Ensure schema migrations have been applied.
- Multi-source replicas where multiple channels point to the same cluster may cause unexpected behavior in topology analysis.
18 changes: 18 additions & 0 deletions go/http/api.go
@@ -251,6 +251,23 @@
renderJSON(w, http.StatusOK, instance)
}

// InstanceChannels returns the replication channels for a given instance.
// For multi-source replicas, this returns all named replication channels and their status.
func (this *HttpAPI) InstanceChannels(w http.ResponseWriter, r *http.Request) {

Check failure on line 256 in go/http/api.go (GitHub Actions / lint):
ST1006: receiver name should be a reflection of its identity; don't use generic names such as "this" or "self" (staticcheck)
instanceKey, err := this.getInstanceKey(chi.URLParam(r, "host"), chi.URLParam(r, "port"))

if err != nil {
Respond(w, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, found, err := inst.ReadInstance(&instanceKey)
if (!found) || (err != nil) {
Respond(w, &APIResponse{Code: ERROR, Message: fmt.Sprintf("Cannot read instance: %+v", instanceKey)})
return
}
renderJSON(w, http.StatusOK, instance.ReplicationChannels)

medium

If the instance has no replication channels, instance.ReplicationChannels may be nil, causing the API to return null instead of an empty array []. For consistency with other collection-returning endpoints in the V1 API, it is better to ensure an empty slice is returned.

	channels := instance.ReplicationChannels
	if channels == nil {
		channels = []inst.ChannelStatus{}
	}
	renderJSON(w, http.StatusOK, channels)
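
The difference the comment describes can be reproduced with `encoding/json` alone. The sketch below is illustrative only: `channelStatus` is a hypothetical stand-in for `inst.ChannelStatus`, and `encode` is a helper introduced for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// channelStatus is a hypothetical stand-in for inst.ChannelStatus.
type channelStatus struct {
	ChannelName string
}

// encode marshals the slice and returns the JSON text ("" on error).
func encode(channels []channelStatus) string {
	b, err := json.Marshal(channels)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	var unset []channelStatus              // nil slice, as when no channels were read
	fmt.Println(encode(unset))             // prints null
	fmt.Println(encode([]channelStatus{})) // prints []
}
```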

}

// AsyncDiscover issues an asynchronous read on an instance. This is
// useful for bulk loads of a new set of instances and will not block
// if the instance is slow to respond or not reachable.
@@ -3926,6 +3943,7 @@
this.registerAPIRequest(router, "masters", this.Masters)
this.registerAPIRequest(router, "master/{clusterHint}", this.ClusterMaster)
this.registerAPIRequest(router, "instance-replicas/{host}/{port}", this.InstanceReplicas)
this.registerAPIRequest(router, "instance-channels/{host}/{port}", this.InstanceChannels)
this.registerAPIRequest(router, "all-instances", this.AllInstances)
this.registerAPIRequest(router, "downtimed", this.Downtimed)
this.registerAPIRequest(router, "downtimed/{clusterHint}", this.Downtimed)
29 changes: 29 additions & 0 deletions go/http/apiv2.go
@@ -80,6 +80,7 @@ func RegisterV2Routes(r chi.Router) {

// Instance endpoints
r.Get("/instances/{host}/{port}", V2Instance)
r.Get("/instances/{host}/{port}/channels", V2InstanceChannels)

// Recovery endpoints
r.Get("/recoveries", V2Recoveries)
@@ -219,6 +220,34 @@ func V2Status(w http.ResponseWriter, r *http.Request) {
respondOK(w, health)
}

// V2InstanceChannels returns the replication channels for a specific MySQL instance.
// For multi-source replicas, this returns all named replication channels and their status.
func V2InstanceChannels(w http.ResponseWriter, r *http.Request) {
host := chi.URLParam(r, "host")
port := chi.URLParam(r, "port")

instanceKey, err := inst.NewResolveInstanceKeyStrings(host, port)
if err != nil {
respondError(w, http.StatusBadRequest, "INVALID_INSTANCE", fmt.Sprintf("Invalid instance key: %v", err))
return
}
instanceKey, err = inst.FigureInstanceKey(instanceKey, nil)
if err != nil {
respondError(w, http.StatusBadRequest, "INVALID_INSTANCE", fmt.Sprintf("Cannot resolve instance: %v", err))
return
}
instance, found, err := inst.ReadInstance(instanceKey)
if err != nil {
respondError(w, http.StatusInternalServerError, "INSTANCE_READ_ERROR", fmt.Sprintf("Failed to read instance: %v", err))
return
}
if !found {
respondNotFound(w, fmt.Sprintf("Instance not found: %s:%s", host, port))
return
}
respondOK(w, instance.ReplicationChannels)

medium

Similar to the V1 API, instance.ReplicationChannels might be nil if no channels exist, resulting in a null value in the JSON response. Initializing it to an empty slice ensures a more predictable API contract for clients expecting an array.

Suggested change (replace `respondOK(w, instance.ReplicationChannels)` with):

	channels := instance.ReplicationChannels
	if channels == nil {
		channels = []inst.ChannelStatus{}
	}
	respondOK(w, channels)

}

// V2ProxySQLServers returns all servers from ProxySQL's runtime_mysql_servers table.
func V2ProxySQLServers(w http.ResponseWriter, r *http.Request) {
hook := proxysql.GetHook()
211 changes: 211 additions & 0 deletions go/inst/channel_test.go
@@ -0,0 +1,211 @@
package inst

import (
"database/sql"
"testing"

test "github.com/proxysql/golib/tests"
)

func TestSelectCanonicalChannelIndexEmpty(t *testing.T) {
channels := []ChannelStatus{}
idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, -1)
}

func TestSelectCanonicalChannelIndexSingleDefault(t *testing.T) {
channels := []ChannelStatus{
{ChannelName: "", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}},
}
idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, 0)
}

func TestSelectCanonicalChannelIndexPrefersDefault(t *testing.T) {
channels := []ChannelStatus{
{ChannelName: "channel_a", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}},
{ChannelName: "", MasterKey: InstanceKey{Hostname: "master2", Port: 3306}},
{ChannelName: "channel_b", MasterKey: InstanceKey{Hostname: "master3", Port: 3306}},
}
idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, 1)
}

func TestSelectCanonicalChannelIndexSkipsGRChannels(t *testing.T) {
channels := []ChannelStatus{
{ChannelName: "group_replication_applier", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}},
{ChannelName: "group_replication_recovery", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}},
{ChannelName: "my_channel", MasterKey: InstanceKey{Hostname: "master2", Port: 3306}},
}
idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, 2)
}

func TestSelectCanonicalChannelIndexAllGR(t *testing.T) {
channels := []ChannelStatus{
{ChannelName: "group_replication_applier", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}},
{ChannelName: "group_replication_recovery", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}},
}
// When only GR channels exist, falls back to index 0
idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, 0)
}

func TestSelectCanonicalChannelIndexMultipleNamed(t *testing.T) {
channels := []ChannelStatus{
{ChannelName: "channel_a", MasterKey: InstanceKey{Hostname: "master1", Port: 3306}},
{ChannelName: "channel_b", MasterKey: InstanceKey{Hostname: "master2", Port: 3306}},
}
// No default channel, no GR channels: picks the first one
idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, 0)
}

func TestIsGRInternalChannel(t *testing.T) {
cs := ChannelStatus{ChannelName: "group_replication_applier"}
test.S(t).ExpectTrue(cs.IsGRInternalChannel())

cs2 := ChannelStatus{ChannelName: "group_replication_recovery"}
test.S(t).ExpectTrue(cs2.IsGRInternalChannel())

cs3 := ChannelStatus{ChannelName: "my_channel"}
test.S(t).ExpectFalse(cs3.IsGRInternalChannel())

cs4 := ChannelStatus{ChannelName: ""}
test.S(t).ExpectFalse(cs4.IsGRInternalChannel())
}

func TestForChannelClause(t *testing.T) {
test.S(t).ExpectEquals(forChannelClause(""), "")
test.S(t).ExpectEquals(forChannelClause("my_channel"), " FOR CHANNEL 'my_channel'")
test.S(t).ExpectEquals(forChannelClause("group_replication_applier"), " FOR CHANNEL 'group_replication_applier'")
}

func TestSingleSourceBehaviorUnchanged(t *testing.T) {
// Verify that an instance with no replication channels behaves identically
// to pre-channel-support behavior
instance := NewInstance()
instance.Key = InstanceKey{Hostname: "replica1", Port: 3306}
instance.MasterKey = InstanceKey{Hostname: "master1", Port: 3306}
instance.ReadBinlogCoordinates = BinlogCoordinates{LogFile: "mysql-bin.000001", LogPos: 100}
instance.Version = "5.7.35"

// No channels set
test.S(t).ExpectEquals(len(instance.ReplicationChannels), 0)
test.S(t).ExpectEquals(instance.ManagedChannelName, "")
test.S(t).ExpectTrue(instance.IsReplica())
}

func TestMultiSourceInstanceManagedChannel(t *testing.T) {
// Verify that when ReplicationChannels has multiple entries and we pick canonical,
// the ManagedChannelName reflects the chosen channel
channels := []ChannelStatus{
{
ChannelName: "channel_a",
MasterKey: InstanceKey{Hostname: "master1", Port: 3306},
ReplicationIOThreadRunning: true,
ReplicationSQLThreadRunning: true,
SecondsBehindMaster: sql.NullInt64{Int64: 0, Valid: true},
},
{
ChannelName: "channel_b",
MasterKey: InstanceKey{Hostname: "master2", Port: 3306},
ReplicationIOThreadRunning: true,
ReplicationSQLThreadRunning: true,
SecondsBehindMaster: sql.NullInt64{Int64: 5, Valid: true},
},
}

idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, 0)

// The canonical channel should be channel_a (first non-GR channel)
ch := channels[idx]
test.S(t).ExpectEquals(ch.ChannelName, "channel_a")
test.S(t).ExpectEquals(ch.MasterKey.Hostname, "master1")
}

func TestChannelAwareSQLGeneration(t *testing.T) {
qsp := GetQueryStringProvider("5.7.35")

// Without channel name -- no FOR CHANNEL clause
stopSQL := qsp.StopReplicaForChannel("")
test.S(t).ExpectTrue(len(stopSQL) > 0)
test.S(t).ExpectEquals(stopSQL, qsp.stop_slave())

startSQL := qsp.StartReplicaForChannel("")
test.S(t).ExpectEquals(startSQL, qsp.start_slave())

// With channel name -- should include FOR CHANNEL clause
stopSQLCh := qsp.StopReplicaForChannel("my_channel")
test.S(t).ExpectTrue(len(stopSQLCh) > len(stopSQL))
expectedStop := qsp.stop_slave() + " FOR CHANNEL 'my_channel'"
test.S(t).ExpectEquals(stopSQLCh, expectedStop)

startSQLCh := qsp.StartReplicaForChannel("my_channel")
expectedStart := qsp.start_slave() + " FOR CHANNEL 'my_channel'"
test.S(t).ExpectEquals(startSQLCh, expectedStart)

resetSQLCh := qsp.ResetReplicaForChannel("my_channel")
expectedReset := qsp.reset_slave() + " FOR CHANNEL 'my_channel'"
test.S(t).ExpectEquals(resetSQLCh, expectedReset)
}

func TestChannelAwareSQLGeneration84(t *testing.T) {
// Test with MySQL 8.4+ which uses "stop replica" / "start replica" syntax
qsp := GetQueryStringProvider("8.4.0")

stopSQL := qsp.StopReplicaForChannel("my_channel")
test.S(t).ExpectEquals(stopSQL, "stop replica FOR CHANNEL 'my_channel'")

startSQL := qsp.StartReplicaForChannel("my_channel")
test.S(t).ExpectEquals(startSQL, "start replica FOR CHANNEL 'my_channel'")

resetSQL := qsp.ResetReplicaForChannel("my_channel")
test.S(t).ExpectEquals(resetSQL, "reset replica FOR CHANNEL 'my_channel'")
}

func TestChannelAwareIOSQLThreadOperations(t *testing.T) {
qsp := GetQueryStringProvider("5.7.35")

// IO thread operations
stopIO := qsp.StopReplicaIOThreadForChannel("ch1")
test.S(t).ExpectEquals(stopIO, "stop slave io_thread FOR CHANNEL 'ch1'")

startIO := qsp.StartReplicaIOThreadForChannel("ch1")
test.S(t).ExpectEquals(startIO, "start slave io_thread FOR CHANNEL 'ch1'")

// SQL thread operations
stopSQLThread := qsp.StopReplicaSQLThreadForChannel("ch1")
test.S(t).ExpectEquals(stopSQLThread, "stop slave sql_thread FOR CHANNEL 'ch1'")

startSQLThread := qsp.StartReplicaSQLThreadForChannel("ch1")
test.S(t).ExpectEquals(startSQLThread, "start slave sql_thread FOR CHANNEL 'ch1'")

// Without channel name -- no FOR CHANNEL
stopIODefault := qsp.StopReplicaIOThreadForChannel("")
test.S(t).ExpectEquals(stopIODefault, "stop slave io_thread")
}

func TestSelectCanonicalChannelWithDefaultAndGR(t *testing.T) {
// When default channel exists alongside GR channels, prefer default
channels := []ChannelStatus{
{ChannelName: "group_replication_applier"},
{ChannelName: ""},
{ChannelName: "group_replication_recovery"},
}
idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, 1)
}

func TestSelectCanonicalChannelWithNamedAndGR(t *testing.T) {
// When no default channel but named + GR channels exist, prefer the named one
channels := []ChannelStatus{
{ChannelName: "group_replication_applier"},
{ChannelName: "group_replication_recovery"},
{ChannelName: "custom_repl"},
{ChannelName: "another_repl"},
}
idx := selectCanonicalChannelIndex(channels)
test.S(t).ExpectEquals(idx, 2)
}
6 changes: 4 additions & 2 deletions go/logic/topology_recovery.go
@@ -2228,7 +2228,9 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey,
if topologyRecovery.RecoveryType == MasterRecoveryGTID {
gtidHint = inst.GTIDHintForce
}
- clusterMaster, err = inst.ChangeMasterTo(&clusterMaster.Key, &designatedInstance.Key, promotedMasterCoordinates, false, gtidHint)
+ // Use channel-aware operations to preserve other replication channels on multi-source replicas
+ managedChannel := clusterMaster.ManagedChannelName
+ clusterMaster, err = inst.ChangeMasterToForChannel(&clusterMaster.Key, &designatedInstance.Key, promotedMasterCoordinates, false, gtidHint, managedChannel)
Comment on lines +2232 to +2233

high

The assignment to clusterMaster at line 2233 can potentially result in a nil pointer if inst.ChangeMasterToForChannel fails to read the instance after the operation. This would cause a panic on the subsequent line when accessing clusterMaster.SelfBinlogCoordinates. It is safer to use a temporary variable and only update clusterMaster if the result is non-nil, ensuring the pointer remains valid for the sanity check and subsequent replication operations.

	managedChannel := clusterMaster.ManagedChannelName
	// Always propagate the error, but replace clusterMaster only when a non-nil instance is returned.
	updatedMaster, changeErr := inst.ChangeMasterToForChannel(&clusterMaster.Key, &designatedInstance.Key, promotedMasterCoordinates, false, gtidHint, managedChannel)
	err = changeErr
	if updatedMaster != nil {
		clusterMaster = updatedMaster
	}

if !clusterMaster.SelfBinlogCoordinates.Equals(demotedMasterSelfBinlogCoordinates) {
log.Errorf("GracefulMasterTakeover: sanity problem. Demoted master's coordinates changed from %+v to %+v while supposed to have been frozen", *demotedMasterSelfBinlogCoordinates, clusterMaster.SelfBinlogCoordinates)
}
@@ -2250,7 +2252,7 @@ func GracefulMasterTakeover(clusterName string, designatedKey *inst.InstanceKey,
}
}
if auto {
- _, startReplicationErr := inst.StartReplication(&clusterMaster.Key)
+ _, startReplicationErr := inst.StartReplicationForChannel(&clusterMaster.Key, managedChannel)
if err == nil {
err = startReplicationErr
}