diff --git a/go/db/generate_patches.go b/go/db/generate_patches.go index 7d1945b5..51f05963 100644 --- a/go/db/generate_patches.go +++ b/go/db/generate_patches.go @@ -622,4 +622,22 @@ var generateSQLPatches = []string{ database_instance ADD COLUMN provider_type varchar(20) CHARACTER SET ascii NOT NULL DEFAULT 'mysql' AFTER replication_group_primary_port `, + // Multi-source replication (named channels) support + ` + CREATE TABLE IF NOT EXISTS database_instance_channels ( + hostname varchar(128) NOT NULL, + port smallint(5) unsigned NOT NULL, + channel_name varchar(128) NOT NULL, + master_host varchar(128) NOT NULL, + master_port smallint(5) unsigned NOT NULL, + master_uuid varchar(64) NOT NULL DEFAULT '', + replication_io_running tinyint NOT NULL DEFAULT 0, + replication_sql_running tinyint NOT NULL DEFAULT 0, + seconds_behind_master bigint DEFAULT NULL, + last_io_error text NOT NULL, + last_sql_error text NOT NULL, + last_seen timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (hostname, port, channel_name) + ) + `, } diff --git a/go/inst/instance.go b/go/inst/instance.go index 8fc5d1a9..1b19d8f6 100644 --- a/go/inst/instance.go +++ b/go/inst/instance.go @@ -31,6 +31,35 @@ import ( const ReasonableDiscoveryLatency = 500 * time.Millisecond +// ChannelStatus represents the replication status of a single named channel. +// In multi-source replication (MySQL 5.7+), each channel has its own IO/SQL thread, +// coordinates, and lag information. +type ChannelStatus struct { + ChannelName string + MasterKey InstanceKey + MasterUUID string + ReplicationIOThreadRunning bool + ReplicationSQLThreadRunning bool + ReplicationIOThreadState ReplicationThreadState + ReplicationSQLThreadState ReplicationThreadState + ReadBinlogCoordinates BinlogCoordinates + ExecBinlogCoordinates BinlogCoordinates + RelaylogCoordinates BinlogCoordinates + SecondsBehindMaster sql.NullInt64 + SQLDelay uint + LastSQLError string + LastIOError string + UsingOracleGTID bool + UsingMariaDBGTID bool + HasReplicationFilters bool + HasReplicationCredentials bool +} + +// IsGRInternalChannel returns true if this channel is a Group Replication internal channel. +func (cs *ChannelStatus) IsGRInternalChannel() bool { + return cs.ChannelName == "group_replication_applier" || cs.ChannelName == "group_replication_recovery" +} + // Instance represents a database instance, including its current configuration & status. // It presents important replication configuration and detailed replication status. type Instance struct { @@ -131,6 +160,10 @@ type Instance struct { LastDiscoveryLatency time.Duration + // Multi-source replication (named channels) + ReplicationChannels []ChannelStatus // All replication channels (empty for single-source) + ManagedChannelName string // The channel orchestrator manages for this instance (empty = default) + seed bool // Means we force this instance to be written to backend, even if it's invalid, empty or forgotten /* All things Group Replication below */ diff --git a/go/inst/instance_dao.go b/go/inst/instance_dao.go index 9e5c836c..7a63ceb1 100644 --- a/go/inst/instance_dao.go +++ b/go/inst/instance_dao.go @@ -425,6 +425,7 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, masterHostnameTmp := "" var masterPortTmp int var masterKey *InstanceKey + var replicationChannels []ChannelStatus if !instanceKey.IsValid() { latency.Start("backend") @@ -525,43 +526,74 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, return err } - instance.HasReplicationCredentials = (user != "") - instance.ReplicationIOThreadState = ReplicationThreadStateFromStatus(m.GetString(instance.QSP.slave_io_running())) - instance.ReplicationSQLThreadState = ReplicationThreadStateFromStatus(m.GetString(instance.QSP.slave_sql_running())) - instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadState.IsRunning() + // Extract the channel name. MySQL 5.7+ includes Channel_Name in SHOW SLAVE STATUS. + // For older versions or single-source replication, this will be empty string. + channelName := m.GetStringD("Channel_Name", "") + + // Build a ChannelStatus for this row + channelStatus := ChannelStatus{ + ChannelName: channelName, + MasterUUID: m.GetStringD(instance.QSP.master_uuid(), "No"), + HasReplicationCredentials: (user != ""), + ReplicationIOThreadState: ReplicationThreadStateFromStatus(m.GetString(instance.QSP.slave_io_running())), + ReplicationSQLThreadState: ReplicationThreadStateFromStatus(m.GetString(instance.QSP.slave_sql_running())), + SQLDelay: m.GetUintD("SQL_Delay", 0), + UsingOracleGTID: (m.GetIntD("Auto_Position", 0) == 1), + UsingMariaDBGTID: (m.GetStringD("Using_Gtid", "No") != "No"), + HasReplicationFilters: ((m.GetStringD("Replicate_Do_DB", "") != "") || (m.GetStringD("Replicate_Ignore_DB", "") != "") || (m.GetStringD("Replicate_Do_Table", "") != "") || (m.GetStringD("Replicate_Ignore_Table", "") != "") || (m.GetStringD("Replicate_Wild_Do_Table", "") != "") || (m.GetStringD("Replicate_Wild_Ignore_Table", "") != "")), + LastSQLError: emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_SQL_Error")), ""), + LastIOError: emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_IO_Error")), ""), + SecondsBehindMaster: m.GetNullInt64(instance.QSP.seconds_behind_master()), + } + channelStatus.ReplicationIOThreadRunning = channelStatus.ReplicationIOThreadState.IsRunning() + channelStatus.ReplicationSQLThreadRunning = channelStatus.ReplicationSQLThreadState.IsRunning() + channelStatus.ReadBinlogCoordinates.LogFile = m.GetString(instance.QSP.master_log_file()) + channelStatus.ReadBinlogCoordinates.LogPos = m.GetInt64(instance.QSP.read_master_log_pos()) + channelStatus.ExecBinlogCoordinates.LogFile = m.GetString(instance.QSP.relay_master_log_file()) + channelStatus.ExecBinlogCoordinates.LogPos = m.GetInt64(instance.QSP.relay_master_log_position()) + channelStatus.RelaylogCoordinates.LogFile = m.GetString("Relay_Log_File") + channelStatus.RelaylogCoordinates.LogPos = m.GetInt64("Relay_Log_Pos") + channelStatus.RelaylogCoordinates.Type = RelayLog + + chMasterHost := m.GetString(instance.QSP.master_host()) + if isMaxScale110 { + chMasterHost = maxScaleMasterHostname + } + channelStatus.MasterKey = InstanceKey{Hostname: chMasterHost, Port: m.GetInt(instance.QSP.master_port())} + + replicationChannels = append(replicationChannels, channelStatus) + + // Populate the canonical Instance fields from this row (same as before). + // For single-source, this is the only row. For multi-source, each row overwrites; + // we select the canonical channel below after the loop. + instance.HasReplicationCredentials = channelStatus.HasReplicationCredentials + instance.ReplicationIOThreadState = channelStatus.ReplicationIOThreadState + instance.ReplicationSQLThreadState = channelStatus.ReplicationSQLThreadState + instance.ReplicationIOThreadRuning = channelStatus.ReplicationIOThreadRunning if isMaxScale110 { // Covering buggy MaxScale 1.1.0 instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadRuning && (m.GetString(instance.QSP.slave_io_state()) == "Binlog Dump") } - instance.ReplicationSQLThreadRuning = instance.ReplicationSQLThreadState.IsRunning() - instance.ReadBinlogCoordinates.LogFile = m.GetString(instance.QSP.master_log_file()) - instance.ReadBinlogCoordinates.LogPos = m.GetInt64(instance.QSP.read_master_log_pos()) - instance.ExecBinlogCoordinates.LogFile = m.GetString(instance.QSP.relay_master_log_file()) - instance.ExecBinlogCoordinates.LogPos = m.GetInt64(instance.QSP.relay_master_log_position()) + instance.ReplicationSQLThreadRuning = channelStatus.ReplicationSQLThreadRunning + instance.ReadBinlogCoordinates = channelStatus.ReadBinlogCoordinates + instance.ExecBinlogCoordinates = channelStatus.ExecBinlogCoordinates instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() - instance.RelaylogCoordinates.LogFile = m.GetString("Relay_Log_File") - instance.RelaylogCoordinates.LogPos = m.GetInt64("Relay_Log_Pos") - instance.RelaylogCoordinates.Type = RelayLog - instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_SQL_Error")), "") - instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(m.GetString("Last_IO_Error")), "") - instance.SQLDelay = m.GetUintD("SQL_Delay", 0) - instance.UsingOracleGTID = (m.GetIntD("Auto_Position", 0) == 1) - instance.UsingMariaDBGTID = (m.GetStringD("Using_Gtid", "No") != "No") - instance.MasterUUID = m.GetStringD(instance.QSP.master_uuid(), "No") - instance.HasReplicationFilters = ((m.GetStringD("Replicate_Do_DB", "") != "") || (m.GetStringD("Replicate_Ignore_DB", "") != "") || (m.GetStringD("Replicate_Do_Table", "") != "") || (m.GetStringD("Replicate_Ignore_Table", "") != "") || (m.GetStringD("Replicate_Wild_Do_Table", "") != "") || (m.GetStringD("Replicate_Wild_Ignore_Table", "") != "")) + instance.RelaylogCoordinates = channelStatus.RelaylogCoordinates + instance.LastSQLError = channelStatus.LastSQLError + instance.LastIOError = channelStatus.LastIOError + instance.SQLDelay = channelStatus.SQLDelay + instance.UsingOracleGTID = channelStatus.UsingOracleGTID + instance.UsingMariaDBGTID = channelStatus.UsingMariaDBGTID + instance.MasterUUID = channelStatus.MasterUUID + instance.HasReplicationFilters = channelStatus.HasReplicationFilters // Remember master hostname:port. Once we update resolve cache below // we will use it to set instance's members - masterHostnameTmp = m.GetString(instance.QSP.master_host()) - if isMaxScale110 { - // Buggy buggy maxscale 1.1.0. Reported Master_Host can be corrupted. - // Therefore we (currently) take @@hostname (which is masquarading as master host anyhow) - masterHostnameTmp = maxScaleMasterHostname - } - masterPortTmp = m.GetInt(instance.QSP.master_port()) + masterHostnameTmp = channelStatus.MasterKey.Hostname + masterPortTmp = channelStatus.MasterKey.Port instance.IsDetachedMaster = instance.MasterKey.IsDetached() - instance.SecondsBehindMaster = m.GetNullInt64(instance.QSP.seconds_behind_master()) + instance.SecondsBehindMaster = channelStatus.SecondsBehindMaster if instance.SecondsBehindMaster.Valid && instance.SecondsBehindMaster.Int64 < 0 { _ = log.Warningf("Host: %+v, instance.SecondsBehindMaster < 0 [%+v], correcting to 0", instanceKey, instance.SecondsBehindMaster.Int64) instance.SecondsBehindMaster.Int64 = 0 @@ -578,6 +610,43 @@ func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, goto Cleanup } + // Store all discovered channels on the instance + instance.ReplicationChannels = replicationChannels + + // For multi-source replication, select the canonical channel to represent + // the Instance's master. This ensures backward compatibility: for single-source + // instances, the behavior is identical to before. + if len(replicationChannels) > 1 { + canonicalIdx := selectCanonicalChannelIndex(replicationChannels) + if canonicalIdx >= 0 { + ch := replicationChannels[canonicalIdx] + instance.HasReplicationCredentials = ch.HasReplicationCredentials + instance.ReplicationIOThreadState = ch.ReplicationIOThreadState + instance.ReplicationSQLThreadState = ch.ReplicationSQLThreadState + instance.ReplicationIOThreadRuning = ch.ReplicationIOThreadRunning + instance.ReplicationSQLThreadRuning = ch.ReplicationSQLThreadRunning + instance.ReadBinlogCoordinates = ch.ReadBinlogCoordinates + instance.ExecBinlogCoordinates = ch.ExecBinlogCoordinates + instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() + instance.RelaylogCoordinates = ch.RelaylogCoordinates + instance.LastSQLError = ch.LastSQLError + instance.LastIOError = ch.LastIOError + instance.SQLDelay = ch.SQLDelay + instance.UsingOracleGTID = ch.UsingOracleGTID + instance.UsingMariaDBGTID = ch.UsingMariaDBGTID + instance.MasterUUID = ch.MasterUUID + instance.HasReplicationFilters = ch.HasReplicationFilters + instance.SecondsBehindMaster = ch.SecondsBehindMaster + if instance.SecondsBehindMaster.Valid && instance.SecondsBehindMaster.Int64 < 0 { + instance.SecondsBehindMaster.Int64 = 0 + } + instance.ReplicationLagSeconds = instance.SecondsBehindMaster + masterHostnameTmp = ch.MasterKey.Hostname + masterPortTmp = ch.MasterKey.Port + instance.ManagedChannelName = ch.ChannelName + } + } + if !isMaxScale { // We begin with a few operations we can run concurrently, and which do not depend on anything { @@ -1172,6 +1241,10 @@ Cleanup: } else { WriteInstance(instance, instanceFound, err) } + // Persist multi-source replication channel data + if len(instance.ReplicationChannels) > 0 { + _ = writeInstanceChannels(instance) + } lastAttemptedCheckTimer.Stop() latency.Stop("backend") return instance, instanceDiscoverySkipped, nil @@ -1514,6 +1587,9 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.QSP = GetQueryStringProvider(instance.Version) + // Load multi-source replication channels from the backend + _ = readInstanceChannels(instance) + return instance } @@ -3615,3 +3691,96 @@ func isInjectedPseudoGTID(clusterName string) (injected bool, err error) { clusterInjectedPseudoGTIDCache.Set(clusterName, injected, cache.DefaultExpiration) return injected, log.Errore(err) } + +// selectCanonicalChannelIndex picks the canonical replication channel from a list of channels. +// For multi-source replication, this determines which channel orchestrator considers the +// "primary" channel for topology purposes. +// Selection priority: +// 1. The default channel (empty name "") +// 2. The first non-GR-internal channel +// 3. Index 0 as fallback +func selectCanonicalChannelIndex(channels []ChannelStatus) int { + if len(channels) == 0 { + return -1 + } + // Prefer the default channel (empty string name) + for i, ch := range channels { + if ch.ChannelName == "" { + return i + } + } + // Skip GR internal channels, pick first non-GR channel + for i, ch := range channels { + if !ch.IsGRInternalChannel() { + return i + } + } + // Fallback + return 0 +} + +// writeInstanceChannels persists all replication channel data for an instance. +// It deletes existing rows and inserts current channel data. +func writeInstanceChannels(instance *Instance) error { + if len(instance.ReplicationChannels) == 0 { + return nil + } + writeFunc := func() error { + // Delete existing channel rows for this instance + _, err := db.ExecOrchestrator(` + delete from database_instance_channels + where hostname = ? and port = ?`, + instance.Key.Hostname, instance.Key.Port, + ) + if err != nil { + return log.Errore(err) + } + // Insert one row per channel + for _, ch := range instance.ReplicationChannels { + _, err := db.ExecOrchestrator(` + insert into database_instance_channels ( + hostname, port, channel_name, master_host, master_port, + master_uuid, replication_io_running, replication_sql_running, + seconds_behind_master, last_io_error, last_sql_error, last_seen + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())`, + instance.Key.Hostname, instance.Key.Port, ch.ChannelName, + ch.MasterKey.Hostname, ch.MasterKey.Port, ch.MasterUUID, + ch.ReplicationIOThreadRunning, ch.ReplicationSQLThreadRunning, + ch.SecondsBehindMaster, ch.LastIOError, ch.LastSQLError, + ) + if err != nil { + return log.Errore(err) + } + } + return nil + } + return ExecDBWriteFunc(writeFunc) +} + +// readInstanceChannels populates instance.ReplicationChannels from the backend database. +func readInstanceChannels(instance *Instance) error { + query := ` + select + channel_name, master_host, master_port, master_uuid, + replication_io_running, replication_sql_running, + seconds_behind_master, last_io_error, last_sql_error + from database_instance_channels + where hostname = ? and port = ? + order by channel_name` + err := db.QueryOrchestrator(query, sqlutils.Args(instance.Key.Hostname, instance.Key.Port), func(m sqlutils.RowMap) error { + ch := ChannelStatus{ + ChannelName: m.GetString("channel_name"), + MasterUUID: m.GetString("master_uuid"), + ReplicationIOThreadRunning: m.GetBool("replication_io_running"), + ReplicationSQLThreadRunning: m.GetBool("replication_sql_running"), + SecondsBehindMaster: m.GetNullInt64("seconds_behind_master"), + LastIOError: m.GetString("last_io_error"), + LastSQLError: m.GetString("last_sql_error"), + } + ch.MasterKey.Hostname = m.GetString("master_host") + ch.MasterKey.Port = m.GetInt("master_port") + instance.ReplicationChannels = append(instance.ReplicationChannels, ch) + return nil + }) + return err +} diff --git a/go/inst/instance_topology_dao.go b/go/inst/instance_topology_dao.go index 5572e636..178f79ec 100644 --- a/go/inst/instance_topology_dao.go +++ b/go/inst/instance_topology_dao.go @@ -398,6 +398,13 @@ func StopReplicas(replicas [](*Instance), stopReplicationMethod StopReplicationM // StopReplication stops replication on a given instance func StopReplication(instanceKey *InstanceKey) (*Instance, error) { + return StopReplicationForChannel(instanceKey, "") +} + +// StopReplicationForChannel stops replication on a given instance, optionally for a specific channel. +// When channelName is empty, it stops all replication (backward compatible behavior). +// When channelName is specified, it stops only that channel. +func StopReplicationForChannel(instanceKey *InstanceKey, channelName string) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) if err != nil { return instance, log.Errore(err) @@ -407,7 +414,13 @@ func StopReplication(instanceKey *InstanceKey) (*Instance, error) { return instance, fmt.Errorf("instance is not a replica: %+v", instanceKey) } - _, err = ExecInstance(instanceKey, instance.QSP.stop_slave()) + // For multi-source instances with no explicit channel, use the managed channel + effectiveChannel := channelName + if effectiveChannel == "" && len(instance.ReplicationChannels) > 1 { + effectiveChannel = instance.ManagedChannelName + } + + _, err = ExecInstance(instanceKey, instance.QSP.StopReplicaForChannel(effectiveChannel)) if err != nil { // Patch; current MaxScale behavior for STOP SLAVE is to throw an error if replica already stopped. if instance.isMaxScale() && err.Error() == "Error 1199: Slave connection is not running" { @@ -419,7 +432,7 @@ func StopReplication(instanceKey *InstanceKey) (*Instance, error) { } instance, err = ReadTopologyInstance(instanceKey) - log.Infof("Stopped replication on %+v, Self:%+v, Exec:%+v", *instanceKey, instance.SelfBinlogCoordinates, instance.ExecBinlogCoordinates) + log.Infof("Stopped replication on %+v (channel %q), Self:%+v, Exec:%+v", *instanceKey, effectiveChannel, instance.SelfBinlogCoordinates, instance.ExecBinlogCoordinates) return instance, err } @@ -448,6 +461,13 @@ func waitForReplicationState(instance *Instance, instanceKey *InstanceKey, expec // StartReplication starts replication on a given instance. func StartReplication(instanceKey *InstanceKey) (*Instance, error) { + return StartReplicationForChannel(instanceKey, "") +} + +// StartReplicationForChannel starts replication on a given instance, optionally for a specific channel. +// When channelName is empty, it starts all replication (backward compatible behavior). +// When channelName is specified, it starts only that channel. +func StartReplicationForChannel(instanceKey *InstanceKey, channelName string) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) if err != nil { return instance, log.Errore(err) @@ -472,11 +492,17 @@ func StartReplication(instanceKey *InstanceKey) (*Instance, error) { return instance, log.Errore(err) } - _, err = ExecInstance(instanceKey, instance.QSP.start_slave()) + // For multi-source instances with no explicit channel, use the managed channel + effectiveChannel := channelName + if effectiveChannel == "" && len(instance.ReplicationChannels) > 1 { + effectiveChannel = instance.ManagedChannelName + } + + _, err = ExecInstance(instanceKey, instance.QSP.StartReplicaForChannel(effectiveChannel)) if err != nil { return instance, log.Errore(err) } - log.Infof("Started replication on %+v", instanceKey) + log.Infof("Started replication on %+v (channel %q)", instanceKey, effectiveChannel) waitForReplicationState(instance, instanceKey, ReplicationThreadStateRunning) @@ -942,6 +968,13 @@ func workaroundBug83713(instance *Instance, instanceKey *InstanceKey) { // ChangeMasterTo changes the given instance's master according to given input. func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinlogCoordinates *BinlogCoordinates, skipUnresolve bool, gtidHint OperationGTIDHint) (*Instance, error) { + return ChangeMasterToForChannel(instanceKey, masterKey, masterBinlogCoordinates, skipUnresolve, gtidHint, "") +} + +// ChangeMasterToForChannel changes the given instance's master for a specific replication channel. +// When channelName is empty, this behaves identically to the original ChangeMasterTo for +// backward compatibility with single-source replication. +func ChangeMasterToForChannel(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinlogCoordinates *BinlogCoordinates, skipUnresolve bool, gtidHint OperationGTIDHint, channelName string) (*Instance, error) { instance, err := ReadTopologyInstance(instanceKey) if err != nil { return instance, log.Errore(err) @@ -950,7 +983,7 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl if instance.ReplicationThreadsExist() && !instance.ReplicationThreadsStopped() { return instance, fmt.Errorf("ChangeMasterTo: Cannot change master on: %+v because replication threads are not stopped", *instanceKey) } - log.Debugf("ChangeMasterTo: will attempt changing master on %+v to %+v, %+v", *instanceKey, *masterKey, *masterBinlogCoordinates) + log.Debugf("ChangeMasterTo: will attempt changing master on %+v to %+v, %+v (channel %q)", *instanceKey, *masterKey, *masterBinlogCoordinates, channelName) changeToMasterKey := masterKey if !skipUnresolve { unresolvedMasterKey, nameUnresolved, err := UnresolveHostname(masterKey) @@ -971,12 +1004,15 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl originalMasterKey := instance.MasterKey originalExecBinlogCoordinates := instance.ExecBinlogCoordinates + // Build the FOR CHANNEL suffix for multi-source replication + channelClause := forChannelClause(channelName) + var changeMasterFunc func() error changedViaGTID := false if instance.UsingMariaDBGTID && gtidHint != GTIDHintDeny { // Keep on using GTID changeMasterFunc = func() error { - _, err := ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port(), + _, err := ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port()+channelClause, changeToMasterKey.Hostname, changeToMasterKey.Port) return err } @@ -984,7 +1020,7 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl } else if instance.UsingMariaDBGTID && gtidHint == GTIDHintDeny { // Make sure to not use GTID changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log_gtid_no(), + _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log_gtid_no()+channelClause, changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) return err } @@ -1001,7 +1037,7 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl mariadbGTIDHint = "current_pos" } changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, fmt.Sprintf("change master to master_host=?, master_port=?, master_use_gtid=%s", mariadbGTIDHint), + _, err = ExecInstance(instanceKey, fmt.Sprintf("change master to master_host=?, master_port=?, master_use_gtid=%s", mariadbGTIDHint)+channelClause, changeToMasterKey.Hostname, changeToMasterKey.Port) return err } @@ -1009,7 +1045,7 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl } else if instance.UsingOracleGTID && gtidHint != GTIDHintDeny { // Is Oracle; already uses GTID; keep using it. changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port(), + _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port()+channelClause, changeToMasterKey.Hostname, changeToMasterKey.Port) return err } @@ -1017,14 +1053,14 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl } else if instance.UsingOracleGTID && gtidHint == GTIDHintDeny { // Is Oracle; already uses GTID changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log_autoposition_no(), + _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log_autoposition_no()+channelClause, changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) return err } } else if instance.SupportsOracleGTID && gtidHint == GTIDHintForce { // Is Oracle; not using GTID right now; turn into GTID changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_autoposition_yes(), + _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_autoposition_yes()+channelClause, changeToMasterKey.Hostname, changeToMasterKey.Port) return err } @@ -1032,7 +1068,7 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl } else { // Normal binlog file:pos changeMasterFunc = func() error { - _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log(), + _, err = ExecInstance(instanceKey, instance.QSP.change_master_to_master_host_port_log()+channelClause, changeToMasterKey.Hostname, changeToMasterKey.Port, masterBinlogCoordinates.LogFile, masterBinlogCoordinates.LogPos) return err } @@ -1049,7 +1085,7 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl WriteMasterPositionEquivalence(&originalMasterKey, &originalExecBinlogCoordinates, changeToMasterKey, masterBinlogCoordinates) ResetInstanceRelaylogCoordinatesHistory(instanceKey) - log.Infof("ChangeMasterTo: Changed master on %+v to: %+v, %+v. GTID: %+v", *instanceKey, masterKey, masterBinlogCoordinates, changedViaGTID) + log.Infof("ChangeMasterTo: Changed master on %+v to: %+v, %+v. GTID: %+v, Channel: %q", *instanceKey, masterKey, masterBinlogCoordinates, changedViaGTID, channelName) instance, err = ReadTopologyInstance(instanceKey) return instance, err diff --git a/go/inst/query_string_provider.go b/go/inst/query_string_provider.go index 4bc9386d..c23b1ff8 100644 --- a/go/inst/query_string_provider.go +++ b/go/inst/query_string_provider.go @@ -391,3 +391,53 @@ func GetQueryStringProvider(version string) QueryStringProvider { return queryStringProvider80 } + +// forChannelClause returns the "FOR CHANNEL 'name'" SQL suffix for channel-aware commands. +// When channelName is empty (the default channel), it returns an empty string for +// backward compatibility with single-source replication. +func forChannelClause(channelName string) string { + if channelName == "" { + return "" + } + return " FOR CHANNEL '" + channelName + "'" +} + +// StopReplicaForChannel returns the stop slave/replica SQL with an optional channel clause. +func (qps *QueryStringProvider) StopReplicaForChannel(channelName string) string { + return qps.stop_slave() + forChannelClause(channelName) +} + +// StartReplicaForChannel returns the start slave/replica SQL with an optional channel clause. +func (qps *QueryStringProvider) StartReplicaForChannel(channelName string) string { + return qps.start_slave() + forChannelClause(channelName) +} + +// ResetReplicaForChannel returns the reset slave/replica SQL with an optional channel clause. +func (qps *QueryStringProvider) ResetReplicaForChannel(channelName string) string { + return qps.reset_slave() + forChannelClause(channelName) +} + +// ResetReplicaAllForChannel returns the reset slave all SQL with an optional channel clause. +func (qps *QueryStringProvider) ResetReplicaAllForChannel(channelName string) string { + return qps.reset_slave_50603_all() + forChannelClause(channelName) +} + +// StopReplicaIOThreadForChannel returns the stop IO thread SQL with an optional channel clause. +func (qps *QueryStringProvider) StopReplicaIOThreadForChannel(channelName string) string { + return qps.stop_slave_io_thread() + forChannelClause(channelName) +} + +// StopReplicaSQLThreadForChannel returns the stop SQL thread SQL with an optional channel clause. +func (qps *QueryStringProvider) StopReplicaSQLThreadForChannel(channelName string) string { + return qps.stop_slave_sql_thread() + forChannelClause(channelName) +} + +// StartReplicaSQLThreadForChannel returns the start SQL thread SQL with an optional channel clause. +func (qps *QueryStringProvider) StartReplicaSQLThreadForChannel(channelName string) string { + return qps.start_slave_sql_thread() + forChannelClause(channelName) +} + +// StartReplicaIOThreadForChannel returns the start IO thread SQL with an optional channel clause. +func (qps *QueryStringProvider) StartReplicaIOThreadForChannel(channelName string) string { + return qps.start_slave_io_thread() + forChannelClause(channelName) +}