1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use risingwave_common::util::tokio_util::sync::CancellationToken;
22use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
23use risingwave_meta::backup_restore::RestoreOpts;
24use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
25use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
26use thiserror_ext::AsReport;
27
28use crate::cmd_impl::hummock::{
29    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
30};
31use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
32use crate::cmd_impl::throttle::apply_throttle;
33use crate::common::CtlContext;
34
35pub mod cmd_impl;
36pub mod common;
37
38#[derive(Parser)]
44#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
45#[clap(propagate_version = true)]
46#[clap(infer_subcommands = true)]
47pub struct CliOpts {
48    #[clap(subcommand)]
49    command: Commands,
50}
51
52#[derive(Subcommand)]
53#[clap(infer_subcommands = true)]
54enum Commands {
55    #[clap(subcommand)]
57    Compute(ComputeCommands),
58    #[clap(subcommand)]
60    Hummock(HummockCommands),
61    #[clap(subcommand)]
63    Table(TableCommands),
64    #[clap(subcommand)]
66    Meta(MetaCommands),
67    #[clap(subcommand)]
69    Scale(ScaleCommands),
70    #[clap(subcommand)]
72    Bench(BenchCommands),
73    #[clap(subcommand)]
75    #[clap(visible_alias("trace"))]
76    AwaitTree(AwaitTreeCommands),
77    #[clap(subcommand)]
80    Profile(ProfileCommands),
81    #[clap(subcommand)]
82    Throttle(ThrottleCommands),
83    #[clap(subcommand, hide = true)]
85    Test(TestCommands),
86}
87
88#[derive(Subcommand)]
89enum ComputeCommands {
90    ShowConfig { host: String },
92}
93
94#[allow(clippy::large_enum_variant)]
95#[derive(Subcommand)]
96enum HummockCommands {
97    ListVersion {
99        #[clap(short, long = "verbose", default_value_t = false)]
100        verbose: bool,
101
102        #[clap(long = "verbose_key_range", default_value_t = false)]
103        verbose_key_range: bool,
104    },
105
106    ListVersionDeltas {
108        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
109        start_id: u64,
110
111        #[clap(short, long = "num-epochs", default_value_t = 100)]
112        num_epochs: u32,
113    },
114    DisableCommitEpoch,
116    ListKv {
118        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
119        epoch: u64,
120
121        #[clap(short, long = "table-id")]
122        table_id: u32,
123
124        data_dir: Option<String>,
126
127        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
128        use_new_object_prefix_strategy: bool,
129    },
130    SstDump(SstDumpArgs),
131    TriggerManualCompaction {
133        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
134        compaction_group_id: u64,
135
136        #[clap(short, long = "table-id", default_value_t = 0)]
137        table_id: u32,
138
139        #[clap(short, long = "level", default_value_t = 1)]
140        level: u32,
141
142        #[clap(short, long = "sst-ids", value_delimiter = ',')]
143        sst_ids: Vec<u64>,
144    },
145    TriggerFullGc {
148        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
149        sst_retention_time_sec: u64,
150        #[clap(short, long = "prefix", required = false)]
151        prefix: Option<String>,
152    },
153    ListPinnedVersions {},
155    ListCompactionGroup,
157    UpdateCompactionConfig {
159        #[clap(long, value_delimiter = ',')]
160        compaction_group_ids: Vec<u64>,
161        #[clap(long)]
162        max_bytes_for_level_base: Option<u64>,
163        #[clap(long)]
164        max_bytes_for_level_multiplier: Option<u64>,
165        #[clap(long)]
166        max_compaction_bytes: Option<u64>,
167        #[clap(long)]
168        sub_level_max_compaction_bytes: Option<u64>,
169        #[clap(long)]
170        level0_tier_compact_file_number: Option<u64>,
171        #[clap(long)]
172        target_file_size_base: Option<u64>,
173        #[clap(long)]
174        compaction_filter_mask: Option<u32>,
175        #[clap(long)]
176        max_sub_compaction: Option<u32>,
177        #[clap(long)]
178        level0_stop_write_threshold_sub_level_number: Option<u64>,
179        #[clap(long)]
180        level0_sub_level_compact_level_count: Option<u32>,
181        #[clap(long)]
182        max_space_reclaim_bytes: Option<u64>,
183        #[clap(long)]
184        level0_max_compact_file_number: Option<u64>,
185        #[clap(long)]
186        level0_overlapping_sub_level_compact_level_count: Option<u32>,
187        #[clap(long)]
188        enable_emergency_picker: Option<bool>,
189        #[clap(long)]
190        tombstone_reclaim_ratio: Option<u32>,
191        #[clap(long)]
192        compression_level: Option<u32>,
193        #[clap(long)]
194        compression_algorithm: Option<String>,
195        #[clap(long)]
196        max_l0_compact_level: Option<u32>,
197        #[clap(long)]
198        sst_allowed_trivial_move_min_size: Option<u64>,
199        #[clap(long)]
200        disable_auto_group_scheduling: Option<bool>,
201        #[clap(long)]
202        max_overlapping_level_size: Option<u64>,
203        #[clap(long)]
204        sst_allowed_trivial_move_max_count: Option<u32>,
205        #[clap(long)]
206        emergency_level0_sst_file_count: Option<u32>,
207        #[clap(long)]
208        emergency_level0_sub_level_partition: Option<u32>,
209        #[clap(long)]
210        level0_stop_write_threshold_max_sst_count: Option<u32>,
211        #[clap(long)]
212        level0_stop_write_threshold_max_size: Option<u64>,
213        #[clap(long)]
214        enable_optimize_l0_interval_selection: Option<bool>,
215    },
216    SplitCompactionGroup {
218        #[clap(long)]
219        compaction_group_id: u64,
220        #[clap(long, value_delimiter = ',')]
221        table_ids: Vec<u32>,
222        #[clap(long, default_value_t = 0)]
223        partition_vnode_count: u32,
224    },
225    PauseVersionCheckpoint,
227    ResumeVersionCheckpoint,
229    ReplayVersion,
231    ListCompactionStatus {
233        #[clap(short, long = "verbose", default_value_t = false)]
234        verbose: bool,
235    },
236    GetCompactionScore {
237        #[clap(long)]
238        compaction_group_id: u64,
239    },
240    ValidateVersion,
242    RebuildTableStats,
244    CancelCompactTask {
245        #[clap(short, long)]
246        task_id: u64,
247    },
248    PrintUserKeyInArchive {
249        #[clap(long, value_delimiter = ',')]
251        archive_ids: Vec<u64>,
252        #[clap(long)]
254        data_dir: String,
255        #[clap(long)]
257        user_key: String,
258        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
259        use_new_object_prefix_strategy: bool,
260    },
261    PrintVersionDeltaInArchive {
262        #[clap(long, value_delimiter = ',')]
264        archive_ids: Vec<u64>,
265        #[clap(long)]
267        data_dir: String,
268        #[clap(long)]
270        sst_id: u64,
271        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
272        use_new_object_prefix_strategy: bool,
273    },
274    TieredCacheTracing {
275        #[clap(long)]
276        enable: bool,
277        #[clap(long)]
278        record_hybrid_insert_threshold_ms: Option<u32>,
279        #[clap(long)]
280        record_hybrid_get_threshold_ms: Option<u32>,
281        #[clap(long)]
282        record_hybrid_obtain_threshold_ms: Option<u32>,
283        #[clap(long)]
284        record_hybrid_remove_threshold_ms: Option<u32>,
285        #[clap(long)]
286        record_hybrid_fetch_threshold_ms: Option<u32>,
287    },
288    MergeCompactionGroup {
289        #[clap(long)]
290        left_group_id: u64,
291        #[clap(long)]
292        right_group_id: u64,
293    },
294    MigrateLegacyObject {
295        url: String,
296        source_dir: String,
297        target_dir: String,
298        #[clap(long, default_value = "100")]
299        concurrency: u32,
300    },
301    ResizeCache {
302        #[clap(long)]
303        meta_cache_capacity_mb: Option<u64>,
304        #[clap(long)]
305        data_cache_capacity_mb: Option<u64>,
306    },
307}
308
309#[derive(Subcommand)]
310enum TableCommands {
311    Scan {
313        mv_name: String,
315        data_dir: Option<String>,
317
318        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
319        use_new_object_prefix_strategy: bool,
320    },
321    ScanById {
323        table_id: u32,
325        data_dir: Option<String>,
327        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
328        use_new_object_prefix_strategy: bool,
329    },
330    List,
332}
333
334#[derive(Subcommand, Debug)]
335enum ScaleCommands {
336    #[clap(verbatim_doc_comment)]
338    Cordon {
339        #[clap(
341            long,
342            required = true,
343            value_delimiter = ',',
344            value_name = "id or host,..."
345        )]
346        workers: Vec<String>,
347    },
348    Uncordon {
350        #[clap(
352            long,
353            required = true,
354            value_delimiter = ',',
355            value_name = "id or host,..."
356        )]
357        workers: Vec<String>,
358    },
359}
360
361#[derive(Subcommand)]
362#[allow(clippy::large_enum_variant)]
363enum MetaCommands {
364    Pause,
366    Resume,
368    ClusterInfo,
370    SourceSplitInfo {
372        #[clap(long)]
373        ignore_id: bool,
374    },
375    #[clap(verbatim_doc_comment)]
392    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
393    Reschedule {
394        #[clap(long, requires = "revision")]
396        plan: Option<String>,
397        #[clap(long)]
399        revision: Option<u64>,
400        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
402        from: Option<String>,
403        #[clap(long, default_value = "false")]
405        dry_run: bool,
406        #[clap(long, default_value = "false")]
408        resolve_no_shuffle: bool,
409    },
410    BackupMeta {
412        #[clap(long)]
413        remarks: Option<String>,
414    },
415    RestoreMeta {
417        #[command(flatten)]
418        opts: RestoreOpts,
419    },
420    DeleteMetaSnapshots {
422        #[clap(long, value_delimiter = ',')]
423        snapshot_ids: Vec<u64>,
424    },
425
426    ListConnections,
428
429    ListServingFragmentMapping,
431
432    UnregisterWorkers {
434        #[clap(
436            long,
437            required = true,
438            value_delimiter = ',',
439            value_name = "worker_id or worker_host:worker_port, ..."
440        )]
441        workers: Vec<String>,
442
443        #[clap(short = 'y', long, default_value_t = false)]
445        yes: bool,
446
447        #[clap(long, default_value_t = false)]
449        ignore_not_found: bool,
450
451        #[clap(long, default_value_t = false)]
453        check_fragment_occupied: bool,
454    },
455
456    ValidateSource {
458        #[clap(long)]
461        props: String,
462    },
463
464    SetCdcTableBackfillParallelism {
465        #[clap(long, required = true)]
466        table_id: u32,
467        #[clap(long, required = true)]
468        parallelism: u32,
469    },
470}
471
472#[derive(Subcommand, Clone, Debug)]
473pub enum AwaitTreeCommands {
474    Dump {
476        #[clap(short, long = "actor-traces-format")]
478        actor_traces_format: Option<String>,
479    },
480    Analyze {
482        #[clap(long = "path")]
486        path: Option<String>,
487    },
488    Transcribe {
490        #[clap(long = "path")]
492        path: String,
493    },
494}
495
496#[derive(Subcommand, Clone, Debug)]
497enum TestCommands {
498    Jvm,
500}
501
502#[derive(Subcommand, Clone, Debug)]
503enum ThrottleCommands {
504    Source(ThrottleCommandArgs),
505    Mv(ThrottleCommandArgs),
506}
507
508#[derive(Clone, Debug, Args)]
509pub struct ThrottleCommandArgs {
510    id: u32,
511    rate: Option<u32>,
512}
513
514#[derive(Subcommand, Clone, Debug)]
515pub enum ProfileCommands {
516    Cpu {
518        #[clap(short, long = "sleep")]
520        sleep: u64,
521    },
522    Heap {
524        #[clap(long = "dir")]
526        dir: Option<String>,
527    },
528}
529
530pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
537    let context = CtlContext::default();
538
539    tokio::select! {
540        _ = shutdown.cancelled() => {
541            context.try_close().await;
543        }
544
545        result = start_fallible(opts, &context) => {
546            if let Err(e) = result {
547                eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
549            }
550        }
551    }
552}
553
554pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
557    let result = start_impl(opts, context).await;
558    context.try_close().await;
559    result
560}
561
562#[expect(
563    clippy::large_stack_frames,
564    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
565              post-layout generator stores only one arm at a time (~13–16 KiB)."
566)]
567async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
568    match opts.command {
569        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
570            cmd_impl::compute::show_config(&host).await?
571        }
572        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
573            cmd_impl::hummock::disable_commit_epoch(context).await?
574        }
575        Commands::Hummock(HummockCommands::ListVersion {
576            verbose,
577            verbose_key_range,
578        }) => {
579            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
580        }
581        Commands::Hummock(HummockCommands::ListVersionDeltas {
582            start_id,
583            num_epochs,
584        }) => {
585            cmd_impl::hummock::list_version_deltas(
586                context,
587                HummockVersionId::new(start_id),
588                num_epochs,
589            )
590            .await?;
591        }
592        Commands::Hummock(HummockCommands::ListKv {
593            epoch,
594            table_id,
595            data_dir,
596            use_new_object_prefix_strategy,
597        }) => {
598            cmd_impl::hummock::list_kv(
599                context,
600                epoch,
601                table_id,
602                data_dir,
603                use_new_object_prefix_strategy,
604            )
605            .await?;
606        }
607        Commands::Hummock(HummockCommands::SstDump(args)) => {
608            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
609        }
610        Commands::Hummock(HummockCommands::TriggerManualCompaction {
611            compaction_group_id,
612            table_id,
613            level,
614            sst_ids,
615        }) => {
616            cmd_impl::hummock::trigger_manual_compaction(
617                context,
618                compaction_group_id,
619                table_id,
620                level,
621                sst_ids,
622            )
623            .await?
624        }
625        Commands::Hummock(HummockCommands::TriggerFullGc {
626            sst_retention_time_sec,
627            prefix,
628        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
629        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
630            list_pinned_versions(context).await?
631        }
632        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
633            cmd_impl::hummock::list_compaction_group(context).await?
634        }
635        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
636            compaction_group_ids,
637            max_bytes_for_level_base,
638            max_bytes_for_level_multiplier,
639            max_compaction_bytes,
640            sub_level_max_compaction_bytes,
641            level0_tier_compact_file_number,
642            target_file_size_base,
643            compaction_filter_mask,
644            max_sub_compaction,
645            level0_stop_write_threshold_sub_level_number,
646            level0_sub_level_compact_level_count,
647            max_space_reclaim_bytes,
648            level0_max_compact_file_number,
649            level0_overlapping_sub_level_compact_level_count,
650            enable_emergency_picker,
651            tombstone_reclaim_ratio,
652            compression_level,
653            compression_algorithm,
654            max_l0_compact_level,
655            sst_allowed_trivial_move_min_size,
656            disable_auto_group_scheduling,
657            max_overlapping_level_size,
658            sst_allowed_trivial_move_max_count,
659            emergency_level0_sst_file_count,
660            emergency_level0_sub_level_partition,
661            level0_stop_write_threshold_max_sst_count,
662            level0_stop_write_threshold_max_size,
663            enable_optimize_l0_interval_selection,
664        }) => {
665            cmd_impl::hummock::update_compaction_config(
666                context,
667                compaction_group_ids,
668                build_compaction_config_vec(
669                    max_bytes_for_level_base,
670                    max_bytes_for_level_multiplier,
671                    max_compaction_bytes,
672                    sub_level_max_compaction_bytes,
673                    level0_tier_compact_file_number,
674                    target_file_size_base,
675                    compaction_filter_mask,
676                    max_sub_compaction,
677                    level0_stop_write_threshold_sub_level_number,
678                    level0_sub_level_compact_level_count,
679                    max_space_reclaim_bytes,
680                    level0_max_compact_file_number,
681                    level0_overlapping_sub_level_compact_level_count,
682                    enable_emergency_picker,
683                    tombstone_reclaim_ratio,
684                    if let Some(level) = compression_level {
685                        assert!(compression_algorithm.is_some());
686                        Some(CompressionAlgorithm {
687                            level,
688                            compression_algorithm: compression_algorithm.unwrap(),
689                        })
690                    } else {
691                        None
692                    },
693                    max_l0_compact_level,
694                    sst_allowed_trivial_move_min_size,
695                    disable_auto_group_scheduling,
696                    max_overlapping_level_size,
697                    sst_allowed_trivial_move_max_count,
698                    emergency_level0_sst_file_count,
699                    emergency_level0_sub_level_partition,
700                    level0_stop_write_threshold_max_sst_count,
701                    level0_stop_write_threshold_max_size,
702                    enable_optimize_l0_interval_selection,
703                ),
704            )
705            .await?
706        }
707        Commands::Hummock(HummockCommands::SplitCompactionGroup {
708            compaction_group_id,
709            table_ids,
710            partition_vnode_count,
711        }) => {
712            cmd_impl::hummock::split_compaction_group(
713                context,
714                compaction_group_id,
715                &table_ids,
716                partition_vnode_count,
717            )
718            .await?;
719        }
720        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
721            cmd_impl::hummock::pause_version_checkpoint(context).await?;
722        }
723        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
724            cmd_impl::hummock::resume_version_checkpoint(context).await?;
725        }
726        Commands::Hummock(HummockCommands::ReplayVersion) => {
727            cmd_impl::hummock::replay_version(context).await?;
728        }
729        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
730            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
731        }
732        Commands::Hummock(HummockCommands::GetCompactionScore {
733            compaction_group_id,
734        }) => {
735            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
736        }
737        Commands::Hummock(HummockCommands::ValidateVersion) => {
738            cmd_impl::hummock::validate_version(context).await?;
739        }
740        Commands::Hummock(HummockCommands::RebuildTableStats) => {
741            cmd_impl::hummock::rebuild_table_stats(context).await?;
742        }
743        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
744            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
745        }
746        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
747            archive_ids,
748            data_dir,
749            sst_id,
750            use_new_object_prefix_strategy,
751        }) => {
752            cmd_impl::hummock::print_version_delta_in_archive(
753                context,
754                archive_ids.into_iter().map(HummockVersionId::new),
755                data_dir,
756                sst_id,
757                use_new_object_prefix_strategy,
758            )
759            .await?;
760        }
761        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
762            archive_ids,
763            data_dir,
764            user_key,
765            use_new_object_prefix_strategy,
766        }) => {
767            cmd_impl::hummock::print_user_key_in_archive(
768                context,
769                archive_ids.into_iter().map(HummockVersionId::new),
770                data_dir,
771                user_key,
772                use_new_object_prefix_strategy,
773            )
774            .await?;
775        }
776        Commands::Hummock(HummockCommands::TieredCacheTracing {
777            enable,
778            record_hybrid_insert_threshold_ms,
779            record_hybrid_get_threshold_ms,
780            record_hybrid_obtain_threshold_ms,
781            record_hybrid_remove_threshold_ms,
782            record_hybrid_fetch_threshold_ms,
783        }) => {
784            cmd_impl::hummock::tiered_cache_tracing(
785                context,
786                enable,
787                record_hybrid_insert_threshold_ms,
788                record_hybrid_get_threshold_ms,
789                record_hybrid_obtain_threshold_ms,
790                record_hybrid_remove_threshold_ms,
791                record_hybrid_fetch_threshold_ms,
792            )
793            .await?
794        }
795        Commands::Hummock(HummockCommands::MergeCompactionGroup {
796            left_group_id,
797            right_group_id,
798        }) => {
799            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
800                .await?
801        }
802
803        Commands::Hummock(HummockCommands::MigrateLegacyObject {
804            url,
805            source_dir,
806            target_dir,
807            concurrency,
808        }) => {
809            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
810        }
811        Commands::Hummock(HummockCommands::ResizeCache {
812            meta_cache_capacity_mb,
813            data_cache_capacity_mb,
814        }) => {
815            const MIB: u64 = 1024 * 1024;
816            cmd_impl::hummock::resize_cache(
817                context,
818                meta_cache_capacity_mb.map(|v| v * MIB),
819                data_cache_capacity_mb.map(|v| v * MIB),
820            )
821            .await?
822        }
823        Commands::Table(TableCommands::Scan {
824            mv_name,
825            data_dir,
826            use_new_object_prefix_strategy,
827        }) => {
828            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
829                .await?
830        }
831        Commands::Table(TableCommands::ScanById {
832            table_id,
833            data_dir,
834            use_new_object_prefix_strategy,
835        }) => {
836            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
837                .await?
838        }
839        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
840        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
841        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
842        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
843        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
844        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
845            cmd_impl::meta::source_split_info(context, ignore_id).await?
846        }
847        Commands::Meta(MetaCommands::Reschedule {
848            from,
849            dry_run,
850            plan,
851            revision,
852            resolve_no_shuffle,
853        }) => {
854            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
855                .await?
856        }
857        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
858            cmd_impl::meta::backup_meta(context, remarks).await?
859        }
860        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
861            risingwave_meta::backup_restore::restore(opts).await?
862        }
863        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
864            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
865        }
866        Commands::Meta(MetaCommands::ListConnections) => {
867            cmd_impl::meta::list_connections(context).await?
868        }
869        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
870            cmd_impl::meta::list_serving_fragment_mappings(context).await?
871        }
872        Commands::Meta(MetaCommands::UnregisterWorkers {
873            workers,
874            yes,
875            ignore_not_found,
876            check_fragment_occupied,
877        }) => {
878            cmd_impl::meta::unregister_workers(
879                context,
880                workers,
881                yes,
882                ignore_not_found,
883                check_fragment_occupied,
884            )
885            .await?
886        }
887        Commands::Meta(MetaCommands::ValidateSource { props }) => {
888            cmd_impl::meta::validate_source(context, props).await?
889        }
890        Commands::AwaitTree(AwaitTreeCommands::Dump {
891            actor_traces_format,
892        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
893        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
894            cmd_impl::await_tree::bottleneck_detect(context, path).await?
895        }
896        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
897            rw_diagnose_tools::await_tree::transcribe(path)?
898        }
899        Commands::Profile(ProfileCommands::Cpu { sleep }) => {
900            cmd_impl::profile::cpu_profile(context, sleep).await?
901        }
902        Commands::Profile(ProfileCommands::Heap { dir }) => {
903            cmd_impl::profile::heap_profile(context, dir).await?
904        }
905        Commands::Scale(ScaleCommands::Cordon { workers }) => {
906            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
907                .await?
908        }
909        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
910            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
911                .await?
912        }
913        Commands::Throttle(ThrottleCommands::Source(args)) => {
914            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
915        }
916        Commands::Throttle(ThrottleCommands::Mv(args)) => {
917            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
918        }
919        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
920            table_id,
921            parallelism,
922        }) => {
923            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
924        }
925        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
926    }
927    Ok(())
928}