From f09770ed22a18a9ff5deb9a0c862f23f487e04ed Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 13 Oct 2025 09:24:38 +0000 Subject: [PATCH 1/2] init --- pkg/cluster/manager/builder.go | 1 + pkg/cluster/manager/manager_test.go | 17 +++++++------- pkg/cluster/manager/upgrade.go | 33 +++++++++++++++++----------- pkg/cluster/spec/alertmanager.go | 4 +++- pkg/cluster/spec/cdc.go | 6 ++++- pkg/cluster/spec/dashboard.go | 4 +++- pkg/cluster/spec/drainer.go | 4 +++- pkg/cluster/spec/grafana.go | 4 +++- pkg/cluster/spec/grafana_test.go | 6 ++--- pkg/cluster/spec/instance.go | 11 +++++++--- pkg/cluster/spec/monitoring.go | 4 +++- pkg/cluster/spec/pd.go | 4 +++- pkg/cluster/spec/pump.go | 4 +++- pkg/cluster/spec/scheduling.go | 4 +++- pkg/cluster/spec/tidb.go | 4 +++- pkg/cluster/spec/tiflash.go | 4 +++- pkg/cluster/spec/tikv.go | 4 +++- pkg/cluster/spec/tikv_cdc.go | 4 +++- pkg/cluster/spec/tiproxy.go | 4 +++- pkg/cluster/spec/tispark.go | 8 +++++-- pkg/cluster/spec/tso.go | 4 +++- pkg/cluster/task/builder.go | 3 ++- pkg/cluster/task/init_config.go | 3 ++- pkg/cluster/task/init_config_test.go | 1 + pkg/cluster/task/scale_config.go | 2 +- 25 files changed, 99 insertions(+), 48 deletions(-) diff --git a/pkg/cluster/manager/builder.go b/pkg/cluster/manager/builder.go index c6c0454b01..7b5ca9645e 100644 --- a/pkg/cluster/manager/builder.go +++ b/pkg/cluster/manager/builder.go @@ -689,6 +689,7 @@ func buildInitConfigTasks( Log: logDir, Cache: m.specManager.Path(name, spec.TempConfigPath), }, + spec.InstanceOpt{}, ). BuildAsStep(fmt.Sprintf(" - Generate config %s -> %s", compName, instance.ID())) tasks = append(tasks, t) diff --git a/pkg/cluster/manager/manager_test.go b/pkg/cluster/manager/manager_test.go index bef3756f09..b3789a7c00 100644 --- a/pkg/cluster/manager/manager_test.go +++ b/pkg/cluster/manager/manager_test.go @@ -23,19 +23,18 @@ import ( ) func TestVersionCompare(t *testing.T) { - var err error - err = versionCompare("v4.0.0", "v4.0.1") - assert.Nil(t, err) + res := versionCompare("v4.0.0", "v4.0.1") + assert.Equal(t, res, -1) - err = versionCompare("v4.0.1", "v4.0.0") - assert.NotNil(t, err) + res = versionCompare("v4.0.1", "v4.0.0") + assert.Equal(t, res, 1) - err = versionCompare("v4.0.0", "nightly") - assert.Nil(t, err) + res = versionCompare("v4.0.0", "nightly") + assert.Equal(t, res, -1) - err = versionCompare("nightly", "nightly") - assert.Nil(t, err) + res = versionCompare("nightly", "nightly") + assert.Equal(t, res, 0) } func TestValidateNewTopo(t *testing.T) { diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go index 100dfa9442..d37732be51 100644 --- a/pkg/cluster/manager/upgrade.go +++ b/pkg/cluster/manager/upgrade.go @@ -38,6 +38,10 @@ import ( "golang.org/x/mod/semver" ) +const ( + CDCNewArchVersion = "v9.0.0" +) + func (m *Manager) upgradePrecheck(name string, componentVersions map[string]string, opt operator.Options, skipConfirm bool) error { if !skipConfirm && strings.ToLower(opt.DisplayMode) != "json" { for _, v := range componentVersions { @@ -86,9 +90,9 @@ func (m *Manager) Upgrade(name string, clusterVersion string, componentVersions uniqueComps = map[string]struct{}{} ) - if err := versionCompare(base.Version, clusterVersion); err != nil { + if versionCompare(base.Version, clusterVersion) == 1 { if !ignoreVersionCheck { - return err + return perrs.Errorf("please specify a higher or equle version than %s", base.Version) } m.logger.Warnf("%s", color.RedString("There is no guarantee that the cluster can be downgraded. Be careful before you continue.")) } @@ -143,8 +147,8 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s hasImported := false for _, comp := range components { + oldver := comp.CalculateVersion(base.Version) version := comp.CalculateVersion(clusterVersion) - for _, inst := range comp.Instances() { // Download component from repository key := fmt.Sprintf("%s-%s-%s-%s", inst.ComponentSource(), version, inst.OS(), inst.Arch()) @@ -217,6 +221,10 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s } } + checkCDCNewArch := false + if versionCompare(oldver, CDCNewArchVersion) == -1 && versionCompare(CDCNewArchVersion, clusterVersion) > -1 { + checkCDCNewArch = true + } tb.InitConfig( name, clusterVersion, @@ -230,6 +238,7 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s Log: logDir, Cache: m.specManager.Path(name, spec.TempConfigPath), }, + spec.InstanceOpt{CheckCDCNewArch: checkCDCNewArch}, ) copyCompTasks = append(copyCompTasks, tb.Build()) } @@ -372,18 +381,16 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s return nil } -func versionCompare(curVersion, newVersion string) error { +// versionCompare returns an integer comparing two versions according to semantic version precedence. +// The result will be 0 if curVersion == newVersion, -1 if curVersion < newVersion, or +1 if curVersion > newVersion. +func versionCompare(curVersion, newVersion string) int { + if newVersion == curVersion { + return 0 + } // Can always upgrade to 'nightly' event the current version is 'nightly' if newVersion == utils.NightlyVersionAlias { - return nil + return -1 } - switch semver.Compare(curVersion, newVersion) { - case -1, 0: - return nil - case 1: - return perrs.Errorf("please specify a higher or equle version than %s", curVersion) - default: - return perrs.Errorf("unreachable") - } + return semver.Compare(curVersion, newVersion) } diff --git a/pkg/cluster/spec/alertmanager.go b/pkg/cluster/spec/alertmanager.go index 7886310b36..64d2eea388 100644 --- a/pkg/cluster/spec/alertmanager.go +++ b/pkg/cluster/spec/alertmanager.go @@ -175,6 +175,7 @@ func (i *AlertManagerInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { gOpts := *i.topo.BaseTopo().GlobalOptions if err := i.BaseInstance.InitConfig(ctx, e, gOpts, deployUser, paths); err != nil { @@ -247,11 +248,12 @@ func (i *AlertManagerInstance) ScaleConfig( clusterVersion string, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = topo - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } // setTLSConfig set TLS Config to support enable/disable TLS diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index 35ff673a61..1514d786e3 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -182,6 +182,7 @@ func (i *CDCInstance) ScaleConfig( clusterVersion, user string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { @@ -189,7 +190,7 @@ func (i *CDCInstance) ScaleConfig( }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths, opt) } // InitConfig implements Instance interface. @@ -200,6 +201,7 @@ func (i *CDCInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { @@ -209,6 +211,8 @@ func (i *CDCInstance) InitConfig( spec := i.InstanceSpec.(*CDCSpec) globalConfig := topo.ServerConfigs.CDC instanceConfig := spec.Config + // clusterVersion v9.0 + // version v9.0.0.beta.1 version := i.CalculateVersion(clusterVersion) if !tidbver.TiCDCSupportConfigFile(version) { diff --git a/pkg/cluster/spec/dashboard.go b/pkg/cluster/spec/dashboard.go index d7afee4f76..b6ce8bb8fe 100644 --- a/pkg/cluster/spec/dashboard.go +++ b/pkg/cluster/spec/dashboard.go @@ -178,6 +178,7 @@ func (i *DashboardInstance) ScaleConfig( clusterVersion, user string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { @@ -185,7 +186,7 @@ func (i *DashboardInstance) ScaleConfig( }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths, opt) } // InitConfig implements Instance interface. @@ -196,6 +197,7 @@ func (i *DashboardInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { diff --git a/pkg/cluster/spec/drainer.go b/pkg/cluster/spec/drainer.go index dd4181f594..a2bc667543 100644 --- a/pkg/cluster/spec/drainer.go +++ b/pkg/cluster/spec/drainer.go @@ -197,6 +197,7 @@ func (i *DrainerInstance) ScaleConfig( clusterVersion, user string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { @@ -204,7 +205,7 @@ func (i *DrainerInstance) ScaleConfig( }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths, opt) } // InitConfig implements Instance interface. @@ -215,6 +216,7 @@ func (i *DrainerInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { diff --git a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go index db15fc8045..5d093ed921 100644 --- a/pkg/cluster/spec/grafana.go +++ b/pkg/cluster/spec/grafana.go @@ -185,6 +185,7 @@ func (i *GrafanaInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { gOpts := *i.topo.BaseTopo().GlobalOptions if err := i.BaseInstance.InitConfig(ctx, e, gOpts, deployUser, paths); err != nil { @@ -453,11 +454,12 @@ func (i *GrafanaInstance) ScaleConfig( clusterVersion string, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = topo.Merge(i.topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } func mergeAdditionalGrafanaConf(source string, addition map[string]string) error { diff --git a/pkg/cluster/spec/grafana_test.go b/pkg/cluster/spec/grafana_test.go index 901f7cefbd..6a98a1dde0 100644 --- a/pkg/cluster/spec/grafana_test.go +++ b/pkg/cluster/spec/grafana_test.go @@ -225,7 +225,7 @@ func TestGrafanaDatasourceConfig(t *testing.T) { // Test datasource configuration clusterName := "test-cluster" - err := grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), mockExec, clusterName, "v5.4.0", "tidb", paths) + err := grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), mockExec, clusterName, "v5.4.0", "tidb", paths, InstanceOpt{}) require.NoError(t, err) // Verify the datasource configuration file @@ -247,7 +247,7 @@ func TestGrafanaDatasourceConfig(t *testing.T) { // Test without VM remote write enabled topo.Monitors[0].PromRemoteWriteToVM = false - err = grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), mockExec, clusterName, "v5.4.0", "tidb", paths) + err = grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), mockExec, clusterName, "v5.4.0", "tidb", paths, InstanceOpt{}) require.NoError(t, err) // Verify the datasource configuration file again @@ -400,7 +400,7 @@ func TestVictoriaMetricsDefaultDatasource(t *testing.T) { grafanaInstance := comp.Instances()[0].(*GrafanaInstance) // Run InitConfig which will process dashboards - err = grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), origExecutor, "test-cluster", "v5.4.0", "tidb", paths) + err = grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), origExecutor, "test-cluster", "v5.4.0", "tidb", paths, InstanceOpt{}) require.NoError(t, err) // Check if the dashboard file was created and datasource references were updated diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 4ed6542a17..ab9e5d6499 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -97,8 +97,8 @@ type Instance interface { InstanceSpec ID() string Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error - InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error - ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error + InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths, opt InstanceOpt) error + ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths, opt InstanceOpt) error PrepareStart(ctx context.Context, tlsCfg *tls.Config) error ComponentName() string ComponentSource() string @@ -149,6 +149,10 @@ func PortStopped(ctx context.Context, e ctxt.Executor, port int, timeout uint64) return w.Execute(ctx, e) } +type InstanceOpt struct { + CheckCDCNewArch bool +} + // BaseInstance implements some method of Instance interface.. type BaseInstance struct { InstanceSpec @@ -168,7 +172,8 @@ type BaseInstance struct { StatusFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdHosts ...string) string UptimeFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration - Component Component + Component Component + InstanceOpt InstanceOpt } // Ready implements Instance interface diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index 731eea011a..09ec8e6890 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -263,6 +263,7 @@ func (i *MonitorInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { gOpts := *i.topo.BaseTopo().GlobalOptions if err := i.BaseInstance.InitConfig(ctx, e, gOpts, deployUser, paths); err != nil { @@ -651,11 +652,12 @@ func (i *MonitorInstance) ScaleConfig( clusterVersion string, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = topo - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } func mergeAdditionalScrapeConf(source string, addition map[string]any) error { diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go index b09ca0b336..0fe809cb39 100644 --- a/pkg/cluster/spec/pd.go +++ b/pkg/cluster/spec/pd.go @@ -230,6 +230,7 @@ func (i *PDInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { @@ -355,9 +356,10 @@ func (i *PDInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { // We need pd.toml here, but we don't need to check it - if err := i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths); err != nil && + if err := i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt); err != nil && errors.Cause(err) != ErrorCheckConfig { return err } diff --git a/pkg/cluster/spec/pump.go b/pkg/cluster/spec/pump.go index 5fb45db92e..20ab84410e 100644 --- a/pkg/cluster/spec/pump.go +++ b/pkg/cluster/spec/pump.go @@ -196,6 +196,7 @@ func (i *PumpInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { @@ -203,7 +204,7 @@ func (i *PumpInstance) ScaleConfig( }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } // InitConfig implements Instance interface. @@ -214,6 +215,7 @@ func (i *PumpInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { diff --git a/pkg/cluster/spec/scheduling.go b/pkg/cluster/spec/scheduling.go index 3593562f64..bf42455854 100644 --- a/pkg/cluster/spec/scheduling.go +++ b/pkg/cluster/spec/scheduling.go @@ -217,6 +217,7 @@ func (i *SchedulingInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { @@ -341,13 +342,14 @@ func (i *SchedulingInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } var _ RollingUpdateInstance = &SchedulingInstance{} diff --git a/pkg/cluster/spec/tidb.go b/pkg/cluster/spec/tidb.go index 39ee25f370..ee17ed05dc 100644 --- a/pkg/cluster/spec/tidb.go +++ b/pkg/cluster/spec/tidb.go @@ -174,6 +174,7 @@ func (i *TiDBInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { @@ -323,11 +324,12 @@ func (i *TiDBInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } func mustBeClusterTopo(topo Topology) *Specification { diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go index 679f117d26..1403eff12f 100644 --- a/pkg/cluster/spec/tiflash.go +++ b/pkg/cluster/spec/tiflash.go @@ -758,6 +758,7 @@ func (i *TiFlashInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { @@ -870,13 +871,14 @@ func (i *TiFlashInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } type replicateConfig struct { diff --git a/pkg/cluster/spec/tikv.go b/pkg/cluster/spec/tikv.go index 5b4bb43cfd..70d5921c42 100644 --- a/pkg/cluster/spec/tikv.go +++ b/pkg/cluster/spec/tikv.go @@ -249,6 +249,7 @@ func (i *TiKVInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { @@ -375,13 +376,14 @@ func (i *TiKVInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } var _ RollingUpdateInstance = &TiKVInstance{} diff --git a/pkg/cluster/spec/tikv_cdc.go b/pkg/cluster/spec/tikv_cdc.go index 66eecb2b53..4d4e99201b 100644 --- a/pkg/cluster/spec/tikv_cdc.go +++ b/pkg/cluster/spec/tikv_cdc.go @@ -180,6 +180,7 @@ func (i *TiKVCDCInstance) ScaleConfig( clusterVersion, user string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { @@ -187,7 +188,7 @@ func (i *TiKVCDCInstance) ScaleConfig( }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths, opt) } // InitConfig implements Instance interface. @@ -198,6 +199,7 @@ func (i *TiKVCDCInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { if !tidbver.TiKVCDCSupportDeploy(clusterVersion) { return errors.New("tikv-cdc only supports cluster version v6.2.0 or later") diff --git a/pkg/cluster/spec/tiproxy.go b/pkg/cluster/spec/tiproxy.go index d6c634fa79..c00ce85117 100644 --- a/pkg/cluster/spec/tiproxy.go +++ b/pkg/cluster/spec/tiproxy.go @@ -203,6 +203,7 @@ func (i *TiProxyInstance) ScaleConfig( clusterVersion, user string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { @@ -210,7 +211,7 @@ func (i *TiProxyInstance) ScaleConfig( }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths, opt) } func (i *TiProxyInstance) checkConfig( @@ -245,6 +246,7 @@ func (i *TiProxyInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { diff --git a/pkg/cluster/spec/tispark.go b/pkg/cluster/spec/tispark.go index 2f3447d703..4f46737912 100644 --- a/pkg/cluster/spec/tispark.go +++ b/pkg/cluster/spec/tispark.go @@ -229,6 +229,7 @@ func (i *TiSparkMasterInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) (err error) { // generate systemd service to invoke spark's start/stop scripts comp := i.Role() @@ -332,12 +333,13 @@ func (i *TiSparkMasterInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() cluster := mustBeClusterTopo(topo) i.topo = cluster.Merge(i.topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } // TiSparkWorkerComponent represents TiSpark slave component. @@ -423,6 +425,7 @@ func (i *TiSparkWorkerInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) (err error) { // generate systemd service to invoke spark's start/stop scripts comp := i.Role() @@ -544,9 +547,10 @@ func (i *TiSparkWorkerInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = topo.Merge(i.topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } diff --git a/pkg/cluster/spec/tso.go b/pkg/cluster/spec/tso.go index c3f6481b60..ebb642b346 100644 --- a/pkg/cluster/spec/tso.go +++ b/pkg/cluster/spec/tso.go @@ -217,6 +217,7 @@ func (i *TSOInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { topo := i.topo.(*Specification) if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { @@ -341,13 +342,14 @@ func (i *TSOInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = mustBeClusterTopo(topo) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } var _ RollingUpdateInstance = &TSOInstance{} diff --git a/pkg/cluster/task/builder.go b/pkg/cluster/task/builder.go index 4a276bc006..7f4b39fdaa 100644 --- a/pkg/cluster/task/builder.go +++ b/pkg/cluster/task/builder.go @@ -244,7 +244,7 @@ func (b *Builder) BackupComponent(component, fromVer string, host, deployDir str } // InitConfig appends a CopyComponent task to the current task collection -func (b *Builder) InitConfig(clusterName, version string, specManager *spec.SpecManager, inst spec.Instance, deployUser string, ignoreCheck bool, paths meta.DirPaths) *Builder { +func (b *Builder) InitConfig(clusterName, version string, specManager *spec.SpecManager, inst spec.Instance, deployUser string, ignoreCheck bool, paths meta.DirPaths, opt spec.InstanceOpt) *Builder { // get nightly version var componentVersion utils.Version meta := specManager.NewMetadata() @@ -274,6 +274,7 @@ func (b *Builder) InitConfig(clusterName, version string, specManager *spec.Spec deployUser: deployUser, ignoreCheck: ignoreCheck, paths: paths, + opt: opt, }) return b } diff --git a/pkg/cluster/task/init_config.go b/pkg/cluster/task/init_config.go index d87b504f42..7b450ec901 100644 --- a/pkg/cluster/task/init_config.go +++ b/pkg/cluster/task/init_config.go @@ -33,6 +33,7 @@ type InitConfig struct { deployUser string ignoreCheck bool paths meta.DirPaths + opt spec.InstanceOpt } // Execute implements the Task interface @@ -47,7 +48,7 @@ func (c *InitConfig) Execute(ctx context.Context) error { return errors.Annotatef(err, "create cache directory failed: %s", c.paths.Cache) } - err := c.instance.InitConfig(ctx, exec, c.clusterName, c.clusterVersion, c.deployUser, c.paths) + err := c.instance.InitConfig(ctx, exec, c.clusterName, c.clusterVersion, c.deployUser, c.paths, c.opt) if err != nil { if c.ignoreCheck && errors.Cause(err) == spec.ErrorCheckConfig { return nil diff --git a/pkg/cluster/task/init_config_test.go b/pkg/cluster/task/init_config_test.go index 23e4669c2d..1cc1e3a73a 100644 --- a/pkg/cluster/task/init_config_test.go +++ b/pkg/cluster/task/init_config_test.go @@ -50,6 +50,7 @@ func (i *fakeInstance) InitConfig( clusterVersion string, deployUser string, paths meta.DirPaths, + opt spec.InstanceOpt, ) error { if i.hasConfigError { return errors.Annotate(spec.ErrorCheckConfig, "test error") diff --git a/pkg/cluster/task/scale_config.go b/pkg/cluster/task/scale_config.go index f336d8071a..13ed667ed9 100644 --- a/pkg/cluster/task/scale_config.go +++ b/pkg/cluster/task/scale_config.go @@ -47,7 +47,7 @@ func (c *ScaleConfig) Execute(ctx context.Context) error { return err } - return c.instance.ScaleConfig(ctx, exec, c.base, c.clusterName, c.clusterVersion, c.deployUser, c.paths) + return c.instance.ScaleConfig(ctx, exec, c.base, c.clusterName, c.clusterVersion, c.deployUser, c.paths, spec.InstanceOpt{}) } // Rollback implements the Task interface From f6b5c73cce9ecb2d4c22740783f6c69db1b8e41a Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 13 Oct 2025 09:34:46 +0000 Subject: [PATCH 2/2] update --- components/dm/spec/logic.go | 8 ++++++-- pkg/cluster/manager/manager_test.go | 1 - pkg/cluster/manager/upgrade.go | 4 ++-- pkg/cluster/spec/cdc.go | 2 -- pkg/cluster/spec/instance.go | 1 + 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/components/dm/spec/logic.go b/components/dm/spec/logic.go index 0c3a7f1cb8..afb4b5f710 100644 --- a/components/dm/spec/logic.go +++ b/components/dm/spec/logic.go @@ -138,6 +138,7 @@ func (i *MasterInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt spec.InstanceOpt, ) error { if err := i.BaseInstance.InitConfig(ctx, e, i.topo.GlobalOptions, deployUser, paths); err != nil { return err @@ -234,8 +235,9 @@ func (i *MasterInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt spec.InstanceOpt, ) error { - if err := i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths); err != nil { + if err := i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt); err != nil { return err } @@ -361,6 +363,7 @@ func (i *WorkerInstance) InitConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt spec.InstanceOpt, ) error { if err := i.BaseInstance.InitConfig(ctx, e, i.topo.GlobalOptions, deployUser, paths); err != nil { return err @@ -455,13 +458,14 @@ func (i *WorkerInstance) ScaleConfig( clusterVersion, deployUser string, paths meta.DirPaths, + opt spec.InstanceOpt, ) error { s := i.topo defer func() { i.topo = s }() i.topo = topo.(*Specification) - return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt) } // GetGlobalOptions returns cluster topology diff --git a/pkg/cluster/manager/manager_test.go b/pkg/cluster/manager/manager_test.go index b3789a7c00..e46f2d98a0 100644 --- a/pkg/cluster/manager/manager_test.go +++ b/pkg/cluster/manager/manager_test.go @@ -23,7 +23,6 @@ import ( ) func TestVersionCompare(t *testing.T) { - res := versionCompare("v4.0.0", "v4.0.1") assert.Equal(t, res, -1) diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go index d37732be51..4bdc3b4d3e 100644 --- a/pkg/cluster/manager/upgrade.go +++ b/pkg/cluster/manager/upgrade.go @@ -39,7 +39,7 @@ import ( ) const ( - CDCNewArchVersion = "v9.0.0" + cdcNewArchVersion = "v9.0.0" ) func (m *Manager) upgradePrecheck(name string, componentVersions map[string]string, opt operator.Options, skipConfirm bool) error { @@ -222,7 +222,7 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s } checkCDCNewArch := false - if versionCompare(oldver, CDCNewArchVersion) == -1 && versionCompare(CDCNewArchVersion, clusterVersion) > -1 { + if versionCompare(oldver, cdcNewArchVersion) == -1 && versionCompare(cdcNewArchVersion, clusterVersion) > -1 { checkCDCNewArch = true } tb.InitConfig( diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index 1514d786e3..b13e5a6140 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -211,8 +211,6 @@ func (i *CDCInstance) InitConfig( spec := i.InstanceSpec.(*CDCSpec) globalConfig := topo.ServerConfigs.CDC instanceConfig := spec.Config - // clusterVersion v9.0 - // version v9.0.0.beta.1 version := i.CalculateVersion(clusterVersion) if !tidbver.TiCDCSupportConfigFile(version) { diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index ab9e5d6499..1dec23697a 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -149,6 +149,7 @@ func PortStopped(ctx context.Context, e ctxt.Executor, port int, timeout uint64) return w.Execute(ctx, e) } +// InstanceOpt can be used to store options when initializing the configuration. type InstanceOpt struct { CheckCDCNewArch bool }