1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
33use crate::cmd_impl::throttle::apply_throttle;
34use crate::common::CtlContext;
35
36pub mod cmd_impl;
37pub mod common;
38
39#[derive(Parser)]
45#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
46#[clap(propagate_version = true)]
47#[clap(infer_subcommands = true)]
48pub struct CliOpts {
49 #[clap(subcommand)]
50 command: Commands,
51}
52
53#[derive(Subcommand)]
54#[clap(infer_subcommands = true)]
55enum Commands {
56 #[clap(subcommand)]
58 Compute(ComputeCommands),
59 #[clap(subcommand)]
61 Hummock(HummockCommands),
62 #[clap(subcommand)]
64 Table(TableCommands),
65 #[clap(subcommand)]
67 Meta(MetaCommands),
68 #[clap(subcommand)]
70 Scale(ScaleCommands),
71 #[clap(subcommand)]
73 Bench(BenchCommands),
74 #[clap(subcommand)]
76 #[clap(visible_alias("trace"))]
77 AwaitTree(AwaitTreeCommands),
78 #[clap(subcommand)]
81 Profile(ProfileCommands),
82 #[clap(subcommand)]
83 Throttle(ThrottleCommands),
84 #[clap(subcommand, hide = true)]
86 Test(TestCommands),
87}
88
89#[derive(Subcommand)]
90enum ComputeCommands {
91 ShowConfig { host: String },
93}
94
95#[allow(clippy::large_enum_variant)]
96#[derive(Subcommand)]
97enum HummockCommands {
98 ListVersion {
100 #[clap(short, long = "verbose", default_value_t = false)]
101 verbose: bool,
102
103 #[clap(long = "verbose_key_range", default_value_t = false)]
104 verbose_key_range: bool,
105 },
106
107 ListVersionDeltas {
109 #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
110 start_id: u64,
111
112 #[clap(short, long = "num-epochs", default_value_t = 100)]
113 num_epochs: u32,
114 },
115 DisableCommitEpoch,
117 ListKv {
119 #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
120 epoch: u64,
121
122 #[clap(short, long = "table-id")]
123 table_id: u32,
124
125 data_dir: Option<String>,
127
128 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
129 use_new_object_prefix_strategy: bool,
130 },
131 SstDump(SstDumpArgs),
132 TriggerManualCompaction {
134 #[clap(short, long = "compaction-group-id", default_value_t = 2)]
135 compaction_group_id: u64,
136
137 #[clap(short, long = "table-id", default_value_t = 0)]
138 table_id: u32,
139
140 #[clap(short, long = "level", default_value_t = 1)]
141 level: u32,
142
143 #[clap(short, long = "sst-ids", value_delimiter = ',')]
144 sst_ids: Vec<u64>,
145 },
146 TriggerFullGc {
149 #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
150 sst_retention_time_sec: u64,
151 #[clap(short, long = "prefix", required = false)]
152 prefix: Option<String>,
153 },
154 ListPinnedVersions {},
156 ListCompactionGroup,
158 UpdateCompactionConfig {
160 #[clap(long, value_delimiter = ',')]
161 compaction_group_ids: Vec<u64>,
162 #[clap(long)]
163 max_bytes_for_level_base: Option<u64>,
164 #[clap(long)]
165 max_bytes_for_level_multiplier: Option<u64>,
166 #[clap(long)]
167 max_compaction_bytes: Option<u64>,
168 #[clap(long)]
169 sub_level_max_compaction_bytes: Option<u64>,
170 #[clap(long)]
171 level0_tier_compact_file_number: Option<u64>,
172 #[clap(long)]
173 target_file_size_base: Option<u64>,
174 #[clap(long)]
175 compaction_filter_mask: Option<u32>,
176 #[clap(long)]
177 max_sub_compaction: Option<u32>,
178 #[clap(long)]
179 level0_stop_write_threshold_sub_level_number: Option<u64>,
180 #[clap(long)]
181 level0_sub_level_compact_level_count: Option<u32>,
182 #[clap(long)]
183 max_space_reclaim_bytes: Option<u64>,
184 #[clap(long)]
185 level0_max_compact_file_number: Option<u64>,
186 #[clap(long)]
187 level0_overlapping_sub_level_compact_level_count: Option<u32>,
188 #[clap(long)]
189 enable_emergency_picker: Option<bool>,
190 #[clap(long)]
191 tombstone_reclaim_ratio: Option<u32>,
192 #[clap(long)]
193 compression_level: Option<u32>,
194 #[clap(long)]
195 compression_algorithm: Option<String>,
196 #[clap(long)]
197 max_l0_compact_level: Option<u32>,
198 #[clap(long)]
199 sst_allowed_trivial_move_min_size: Option<u64>,
200 #[clap(long)]
201 disable_auto_group_scheduling: Option<bool>,
202 #[clap(long)]
203 max_overlapping_level_size: Option<u64>,
204 #[clap(long)]
205 sst_allowed_trivial_move_max_count: Option<u32>,
206 #[clap(long)]
207 emergency_level0_sst_file_count: Option<u32>,
208 #[clap(long)]
209 emergency_level0_sub_level_partition: Option<u32>,
210 #[clap(long)]
211 level0_stop_write_threshold_max_sst_count: Option<u32>,
212 #[clap(long)]
213 level0_stop_write_threshold_max_size: Option<u64>,
214 #[clap(long)]
215 enable_optimize_l0_interval_selection: Option<bool>,
216 },
217 SplitCompactionGroup {
219 #[clap(long)]
220 compaction_group_id: u64,
221 #[clap(long, value_delimiter = ',')]
222 table_ids: Vec<u32>,
223 #[clap(long, default_value_t = 0)]
224 partition_vnode_count: u32,
225 },
226 PauseVersionCheckpoint,
228 ResumeVersionCheckpoint,
230 ReplayVersion,
232 ListCompactionStatus {
234 #[clap(short, long = "verbose", default_value_t = false)]
235 verbose: bool,
236 },
237 GetCompactionScore {
238 #[clap(long)]
239 compaction_group_id: u64,
240 },
241 ValidateVersion,
243 RebuildTableStats,
245 CancelCompactTask {
246 #[clap(short, long)]
247 task_id: u64,
248 },
249 PrintUserKeyInArchive {
250 #[clap(long, value_delimiter = ',')]
252 archive_ids: Vec<u64>,
253 #[clap(long)]
255 data_dir: String,
256 #[clap(long)]
258 user_key: String,
259 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
260 use_new_object_prefix_strategy: bool,
261 },
262 PrintVersionDeltaInArchive {
263 #[clap(long, value_delimiter = ',')]
265 archive_ids: Vec<u64>,
266 #[clap(long)]
268 data_dir: String,
269 #[clap(long)]
271 sst_id: u64,
272 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
273 use_new_object_prefix_strategy: bool,
274 },
275 TieredCacheTracing {
276 #[clap(long)]
277 enable: bool,
278 #[clap(long)]
279 record_hybrid_insert_threshold_ms: Option<u32>,
280 #[clap(long)]
281 record_hybrid_get_threshold_ms: Option<u32>,
282 #[clap(long)]
283 record_hybrid_obtain_threshold_ms: Option<u32>,
284 #[clap(long)]
285 record_hybrid_remove_threshold_ms: Option<u32>,
286 #[clap(long)]
287 record_hybrid_fetch_threshold_ms: Option<u32>,
288 },
289 MergeCompactionGroup {
290 #[clap(long)]
291 left_group_id: u64,
292 #[clap(long)]
293 right_group_id: u64,
294 },
295 MigrateLegacyObject {
296 url: String,
297 source_dir: String,
298 target_dir: String,
299 #[clap(long, default_value = "100")]
300 concurrency: u32,
301 },
302 ResizeCache {
303 #[clap(long)]
304 meta_cache_capacity_mb: Option<u64>,
305 #[clap(long)]
306 data_cache_capacity_mb: Option<u64>,
307 },
308}
309
310#[derive(Subcommand)]
311enum TableCommands {
312 Scan {
314 mv_name: String,
316 data_dir: Option<String>,
318
319 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
320 use_new_object_prefix_strategy: bool,
321 },
322 ScanById {
324 table_id: u32,
326 data_dir: Option<String>,
328 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
329 use_new_object_prefix_strategy: bool,
330 },
331 List,
333}
334
335#[derive(Subcommand, Debug)]
336enum ScaleCommands {
337 #[clap(verbatim_doc_comment)]
339 Cordon {
340 #[clap(
342 long,
343 required = true,
344 value_delimiter = ',',
345 value_name = "id or host,..."
346 )]
347 workers: Vec<String>,
348 },
349 Uncordon {
351 #[clap(
353 long,
354 required = true,
355 value_delimiter = ',',
356 value_name = "id or host,..."
357 )]
358 workers: Vec<String>,
359 },
360}
361
362#[derive(Subcommand)]
363#[allow(clippy::large_enum_variant)]
364enum MetaCommands {
365 Pause,
367 Resume,
369 ClusterInfo,
371 SourceSplitInfo {
373 #[clap(long)]
374 ignore_id: bool,
375 },
376 #[clap(verbatim_doc_comment)]
393 #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
394 Reschedule {
395 #[clap(long, requires = "revision")]
397 plan: Option<String>,
398 #[clap(long)]
400 revision: Option<u64>,
401 #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
403 from: Option<String>,
404 #[clap(long, default_value = "false")]
406 dry_run: bool,
407 #[clap(long, default_value = "false")]
409 resolve_no_shuffle: bool,
410 },
411 BackupMeta {
413 #[clap(long)]
414 remarks: Option<String>,
415 },
416 RestoreMeta {
418 #[command(flatten)]
419 opts: RestoreOpts,
420 },
421 DeleteMetaSnapshots {
423 #[clap(long, value_delimiter = ',')]
424 snapshot_ids: Vec<u64>,
425 },
426
427 ListConnections,
429
430 ListServingFragmentMapping,
432
433 UnregisterWorkers {
435 #[clap(
437 long,
438 required = true,
439 value_delimiter = ',',
440 value_name = "worker_id or worker_host:worker_port, ..."
441 )]
442 workers: Vec<String>,
443
444 #[clap(short = 'y', long, default_value_t = false)]
446 yes: bool,
447
448 #[clap(long, default_value_t = false)]
450 ignore_not_found: bool,
451
452 #[clap(long, default_value_t = false)]
454 check_fragment_occupied: bool,
455 },
456
457 ValidateSource {
459 #[clap(long)]
462 props: String,
463 },
464
465 SetCdcTableBackfillParallelism {
466 #[clap(long, required = true)]
467 table_id: u32,
468 #[clap(long, required = true)]
469 parallelism: u32,
470 },
471}
472
473#[derive(Subcommand, Clone, Debug)]
474pub enum AwaitTreeCommands {
475 Dump {
477 #[clap(short, long = "actor-traces-format")]
479 actor_traces_format: Option<String>,
480 },
481 Analyze {
483 #[clap(long = "path")]
487 path: Option<String>,
488 },
489 Transcribe {
491 #[clap(long = "path")]
493 path: String,
494 },
495}
496
497#[derive(Subcommand, Clone, Debug)]
498enum TestCommands {
499 Jvm,
501}
502
503#[derive(Subcommand, Clone, Debug)]
504enum ThrottleCommands {
505 Source(ThrottleCommandArgs),
506 Mv(ThrottleCommandArgs),
507}
508
509#[derive(Clone, Debug, Args)]
510pub struct ThrottleCommandArgs {
511 id: u32,
512 rate: Option<u32>,
513}
514
515#[derive(Subcommand, Clone, Debug)]
516pub enum ProfileCommands {
517 Cpu {
519 #[clap(short, long = "sleep")]
521 sleep: u64,
522 },
523 Heap {
525 #[clap(long = "dir")]
527 dir: Option<String>,
528 },
529}
530
531pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
538 let context = CtlContext::default();
539
540 tokio::select! {
541 _ = shutdown.cancelled() => {
542 context.try_close().await;
544 }
545
546 result = start_fallible(opts, &context) => {
547 if let Err(e) = result {
548 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
550 }
551 }
552 }
553}
554
555pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
558 let result = start_impl(opts, context).await;
559 context.try_close().await;
560 result
561}
562
563#[expect(
564 clippy::large_stack_frames,
565 reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
566 post-layout generator stores only one arm at a time (~13–16 KiB)."
567)]
568async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
569 match opts.command {
570 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
571 cmd_impl::compute::show_config(&host).await?
572 }
573 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
574 cmd_impl::hummock::disable_commit_epoch(context).await?
575 }
576 Commands::Hummock(HummockCommands::ListVersion {
577 verbose,
578 verbose_key_range,
579 }) => {
580 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
581 }
582 Commands::Hummock(HummockCommands::ListVersionDeltas {
583 start_id,
584 num_epochs,
585 }) => {
586 cmd_impl::hummock::list_version_deltas(
587 context,
588 HummockVersionId::new(start_id),
589 num_epochs,
590 )
591 .await?;
592 }
593 Commands::Hummock(HummockCommands::ListKv {
594 epoch,
595 table_id,
596 data_dir,
597 use_new_object_prefix_strategy,
598 }) => {
599 cmd_impl::hummock::list_kv(
600 context,
601 epoch,
602 table_id,
603 data_dir,
604 use_new_object_prefix_strategy,
605 )
606 .await?;
607 }
608 Commands::Hummock(HummockCommands::SstDump(args)) => {
609 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
610 }
611 Commands::Hummock(HummockCommands::TriggerManualCompaction {
612 compaction_group_id,
613 table_id,
614 level,
615 sst_ids,
616 }) => {
617 cmd_impl::hummock::trigger_manual_compaction(
618 context,
619 compaction_group_id,
620 table_id.into(),
621 level,
622 sst_ids,
623 )
624 .await?
625 }
626 Commands::Hummock(HummockCommands::TriggerFullGc {
627 sst_retention_time_sec,
628 prefix,
629 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
630 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
631 list_pinned_versions(context).await?
632 }
633 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
634 cmd_impl::hummock::list_compaction_group(context).await?
635 }
636 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
637 compaction_group_ids,
638 max_bytes_for_level_base,
639 max_bytes_for_level_multiplier,
640 max_compaction_bytes,
641 sub_level_max_compaction_bytes,
642 level0_tier_compact_file_number,
643 target_file_size_base,
644 compaction_filter_mask,
645 max_sub_compaction,
646 level0_stop_write_threshold_sub_level_number,
647 level0_sub_level_compact_level_count,
648 max_space_reclaim_bytes,
649 level0_max_compact_file_number,
650 level0_overlapping_sub_level_compact_level_count,
651 enable_emergency_picker,
652 tombstone_reclaim_ratio,
653 compression_level,
654 compression_algorithm,
655 max_l0_compact_level,
656 sst_allowed_trivial_move_min_size,
657 disable_auto_group_scheduling,
658 max_overlapping_level_size,
659 sst_allowed_trivial_move_max_count,
660 emergency_level0_sst_file_count,
661 emergency_level0_sub_level_partition,
662 level0_stop_write_threshold_max_sst_count,
663 level0_stop_write_threshold_max_size,
664 enable_optimize_l0_interval_selection,
665 }) => {
666 cmd_impl::hummock::update_compaction_config(
667 context,
668 compaction_group_ids,
669 build_compaction_config_vec(
670 max_bytes_for_level_base,
671 max_bytes_for_level_multiplier,
672 max_compaction_bytes,
673 sub_level_max_compaction_bytes,
674 level0_tier_compact_file_number,
675 target_file_size_base,
676 compaction_filter_mask,
677 max_sub_compaction,
678 level0_stop_write_threshold_sub_level_number,
679 level0_sub_level_compact_level_count,
680 max_space_reclaim_bytes,
681 level0_max_compact_file_number,
682 level0_overlapping_sub_level_compact_level_count,
683 enable_emergency_picker,
684 tombstone_reclaim_ratio,
685 if let Some(level) = compression_level {
686 assert!(compression_algorithm.is_some());
687 Some(CompressionAlgorithm {
688 level,
689 compression_algorithm: compression_algorithm.unwrap(),
690 })
691 } else {
692 None
693 },
694 max_l0_compact_level,
695 sst_allowed_trivial_move_min_size,
696 disable_auto_group_scheduling,
697 max_overlapping_level_size,
698 sst_allowed_trivial_move_max_count,
699 emergency_level0_sst_file_count,
700 emergency_level0_sub_level_partition,
701 level0_stop_write_threshold_max_sst_count,
702 level0_stop_write_threshold_max_size,
703 enable_optimize_l0_interval_selection,
704 ),
705 )
706 .await?
707 }
708 Commands::Hummock(HummockCommands::SplitCompactionGroup {
709 compaction_group_id,
710 table_ids,
711 partition_vnode_count,
712 }) => {
713 cmd_impl::hummock::split_compaction_group(
714 context,
715 compaction_group_id,
716 &table_ids.into_iter().map_into().collect_vec(),
717 partition_vnode_count,
718 )
719 .await?;
720 }
721 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
722 cmd_impl::hummock::pause_version_checkpoint(context).await?;
723 }
724 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
725 cmd_impl::hummock::resume_version_checkpoint(context).await?;
726 }
727 Commands::Hummock(HummockCommands::ReplayVersion) => {
728 cmd_impl::hummock::replay_version(context).await?;
729 }
730 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
731 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
732 }
733 Commands::Hummock(HummockCommands::GetCompactionScore {
734 compaction_group_id,
735 }) => {
736 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
737 }
738 Commands::Hummock(HummockCommands::ValidateVersion) => {
739 cmd_impl::hummock::validate_version(context).await?;
740 }
741 Commands::Hummock(HummockCommands::RebuildTableStats) => {
742 cmd_impl::hummock::rebuild_table_stats(context).await?;
743 }
744 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
745 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
746 }
747 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
748 archive_ids,
749 data_dir,
750 sst_id,
751 use_new_object_prefix_strategy,
752 }) => {
753 cmd_impl::hummock::print_version_delta_in_archive(
754 context,
755 archive_ids.into_iter().map(HummockVersionId::new),
756 data_dir,
757 sst_id,
758 use_new_object_prefix_strategy,
759 )
760 .await?;
761 }
762 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
763 archive_ids,
764 data_dir,
765 user_key,
766 use_new_object_prefix_strategy,
767 }) => {
768 cmd_impl::hummock::print_user_key_in_archive(
769 context,
770 archive_ids.into_iter().map(HummockVersionId::new),
771 data_dir,
772 user_key,
773 use_new_object_prefix_strategy,
774 )
775 .await?;
776 }
777 Commands::Hummock(HummockCommands::TieredCacheTracing {
778 enable,
779 record_hybrid_insert_threshold_ms,
780 record_hybrid_get_threshold_ms,
781 record_hybrid_obtain_threshold_ms,
782 record_hybrid_remove_threshold_ms,
783 record_hybrid_fetch_threshold_ms,
784 }) => {
785 cmd_impl::hummock::tiered_cache_tracing(
786 context,
787 enable,
788 record_hybrid_insert_threshold_ms,
789 record_hybrid_get_threshold_ms,
790 record_hybrid_obtain_threshold_ms,
791 record_hybrid_remove_threshold_ms,
792 record_hybrid_fetch_threshold_ms,
793 )
794 .await?
795 }
796 Commands::Hummock(HummockCommands::MergeCompactionGroup {
797 left_group_id,
798 right_group_id,
799 }) => {
800 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
801 .await?
802 }
803
804 Commands::Hummock(HummockCommands::MigrateLegacyObject {
805 url,
806 source_dir,
807 target_dir,
808 concurrency,
809 }) => {
810 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
811 }
812 Commands::Hummock(HummockCommands::ResizeCache {
813 meta_cache_capacity_mb,
814 data_cache_capacity_mb,
815 }) => {
816 const MIB: u64 = 1024 * 1024;
817 cmd_impl::hummock::resize_cache(
818 context,
819 meta_cache_capacity_mb.map(|v| v * MIB),
820 data_cache_capacity_mb.map(|v| v * MIB),
821 )
822 .await?
823 }
824 Commands::Table(TableCommands::Scan {
825 mv_name,
826 data_dir,
827 use_new_object_prefix_strategy,
828 }) => {
829 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
830 .await?
831 }
832 Commands::Table(TableCommands::ScanById {
833 table_id,
834 data_dir,
835 use_new_object_prefix_strategy,
836 }) => {
837 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
838 .await?
839 }
840 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
841 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
842 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
843 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
844 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
845 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
846 cmd_impl::meta::source_split_info(context, ignore_id).await?
847 }
848 Commands::Meta(MetaCommands::Reschedule {
849 from,
850 dry_run,
851 plan,
852 revision,
853 resolve_no_shuffle,
854 }) => {
855 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
856 .await?
857 }
858 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
859 cmd_impl::meta::backup_meta(context, remarks).await?
860 }
861 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
862 risingwave_meta::backup_restore::restore(opts).await?
863 }
864 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
865 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
866 }
867 Commands::Meta(MetaCommands::ListConnections) => {
868 cmd_impl::meta::list_connections(context).await?
869 }
870 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
871 cmd_impl::meta::list_serving_fragment_mappings(context).await?
872 }
873 Commands::Meta(MetaCommands::UnregisterWorkers {
874 workers,
875 yes,
876 ignore_not_found,
877 check_fragment_occupied,
878 }) => {
879 cmd_impl::meta::unregister_workers(
880 context,
881 workers,
882 yes,
883 ignore_not_found,
884 check_fragment_occupied,
885 )
886 .await?
887 }
888 Commands::Meta(MetaCommands::ValidateSource { props }) => {
889 cmd_impl::meta::validate_source(context, props).await?
890 }
891 Commands::AwaitTree(AwaitTreeCommands::Dump {
892 actor_traces_format,
893 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
894 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
895 cmd_impl::await_tree::bottleneck_detect(context, path).await?
896 }
897 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
898 rw_diagnose_tools::await_tree::transcribe(path)?
899 }
900 Commands::Profile(ProfileCommands::Cpu { sleep }) => {
901 cmd_impl::profile::cpu_profile(context, sleep).await?
902 }
903 Commands::Profile(ProfileCommands::Heap { dir }) => {
904 cmd_impl::profile::heap_profile(context, dir).await?
905 }
906 Commands::Scale(ScaleCommands::Cordon { workers }) => {
907 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
908 .await?
909 }
910 Commands::Scale(ScaleCommands::Uncordon { workers }) => {
911 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
912 .await?
913 }
914 Commands::Throttle(ThrottleCommands::Source(args)) => {
915 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
916 }
917 Commands::Throttle(ThrottleCommands::Mv(args)) => {
918 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
919 }
920 Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
921 table_id,
922 parallelism,
923 }) => {
924 set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
925 }
926 Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
927 }
928 Ok(())
929}