1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::profile::ProfileWorkerType;
33use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
34use crate::cmd_impl::throttle::apply_throttle;
35use crate::common::CtlContext;
36
37pub mod cmd_impl;
38pub mod common;
39
40#[derive(Parser)]
46#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
47#[clap(propagate_version = true)]
48#[clap(infer_subcommands = true)]
49pub struct CliOpts {
50 #[clap(subcommand)]
51 command: Commands,
52}
53
54#[derive(Subcommand)]
55#[clap(infer_subcommands = true)]
56enum Commands {
57 #[clap(subcommand)]
59 Compute(ComputeCommands),
60 #[clap(subcommand)]
62 Hummock(HummockCommands),
63 #[clap(subcommand)]
65 Table(TableCommands),
66 #[clap(subcommand)]
68 Meta(MetaCommands),
69 #[clap(subcommand)]
71 Scale(ScaleCommands),
72 #[clap(subcommand)]
74 Bench(BenchCommands),
75 #[clap(subcommand)]
77 #[clap(visible_alias("trace"))]
78 AwaitTree(AwaitTreeCommands),
79 #[clap(subcommand)]
81 Profile(ProfileCommands),
82 #[clap(subcommand)]
83 Throttle(ThrottleCommands),
84 #[clap(subcommand, hide = true)]
86 Test(TestCommands),
87}
88
89#[derive(Subcommand)]
90enum ComputeCommands {
91 ShowConfig { host: String },
93}
94
95#[allow(clippy::large_enum_variant)]
96#[derive(Subcommand)]
97enum HummockCommands {
98 ListVersion {
100 #[clap(short, long = "verbose", default_value_t = false)]
101 verbose: bool,
102
103 #[clap(long = "verbose_key_range", default_value_t = false)]
104 verbose_key_range: bool,
105 },
106
107 ListVersionDeltas {
109 #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
110 start_id: u64,
111
112 #[clap(short, long = "num-epochs", default_value_t = 100)]
113 num_epochs: u32,
114 },
115 DisableCommitEpoch,
117 ListKv {
119 #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
120 epoch: u64,
121
122 #[clap(short, long = "table-id")]
123 table_id: u32,
124
125 data_dir: Option<String>,
127
128 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
129 use_new_object_prefix_strategy: bool,
130 },
131 SstDump(SstDumpArgs),
132 TriggerManualCompaction {
134 #[clap(short, long = "compaction-group-id", default_value_t = 2)]
135 compaction_group_id: u64,
136
137 #[clap(short, long = "table-id", default_value_t = 0)]
138 table_id: u32,
139
140 #[clap(short, long = "level", default_value_t = 1)]
141 level: u32,
142
143 #[clap(short, long = "sst-ids", value_delimiter = ',')]
144 sst_ids: Vec<u64>,
145 },
146 TriggerFullGc {
149 #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
150 sst_retention_time_sec: u64,
151 #[clap(short, long = "prefix", required = false)]
152 prefix: Option<String>,
153 },
154 ListPinnedVersions {},
156 ListCompactionGroup,
158 UpdateCompactionConfig {
160 #[clap(long, value_delimiter = ',')]
161 compaction_group_ids: Vec<u64>,
162 #[clap(long)]
163 max_bytes_for_level_base: Option<u64>,
164 #[clap(long)]
165 max_bytes_for_level_multiplier: Option<u64>,
166 #[clap(long)]
167 max_compaction_bytes: Option<u64>,
168 #[clap(long)]
169 sub_level_max_compaction_bytes: Option<u64>,
170 #[clap(long)]
171 level0_tier_compact_file_number: Option<u64>,
172 #[clap(long)]
173 target_file_size_base: Option<u64>,
174 #[clap(long)]
175 compaction_filter_mask: Option<u32>,
176 #[clap(long)]
177 max_sub_compaction: Option<u32>,
178 #[clap(long)]
179 level0_stop_write_threshold_sub_level_number: Option<u64>,
180 #[clap(long)]
181 level0_sub_level_compact_level_count: Option<u32>,
182 #[clap(long)]
183 max_space_reclaim_bytes: Option<u64>,
184 #[clap(long)]
185 level0_max_compact_file_number: Option<u64>,
186 #[clap(long)]
187 level0_overlapping_sub_level_compact_level_count: Option<u32>,
188 #[clap(long)]
189 enable_emergency_picker: Option<bool>,
190 #[clap(long)]
191 tombstone_reclaim_ratio: Option<u32>,
192 #[clap(long)]
193 compression_level: Option<u32>,
194 #[clap(long)]
195 compression_algorithm: Option<String>,
196 #[clap(long)]
197 max_l0_compact_level: Option<u32>,
198 #[clap(long)]
199 sst_allowed_trivial_move_min_size: Option<u64>,
200 #[clap(long)]
201 disable_auto_group_scheduling: Option<bool>,
202 #[clap(long)]
203 max_overlapping_level_size: Option<u64>,
204 #[clap(long)]
205 sst_allowed_trivial_move_max_count: Option<u32>,
206 #[clap(long)]
207 emergency_level0_sst_file_count: Option<u32>,
208 #[clap(long)]
209 emergency_level0_sub_level_partition: Option<u32>,
210 #[clap(long)]
211 level0_stop_write_threshold_max_sst_count: Option<u32>,
212 #[clap(long)]
213 level0_stop_write_threshold_max_size: Option<u64>,
214 #[clap(long)]
215 enable_optimize_l0_interval_selection: Option<bool>,
216 #[clap(long)]
217 vnode_aligned_level_size_threshold: Option<u64>,
218 #[clap(long)]
219 max_kv_count_for_xor16: Option<u64>,
220 },
221 SplitCompactionGroup {
223 #[clap(long)]
224 compaction_group_id: u64,
225 #[clap(long, value_delimiter = ',')]
226 table_ids: Vec<u32>,
227 #[clap(long, default_value_t = 0)]
228 partition_vnode_count: u32,
229 },
230 PauseVersionCheckpoint,
232 ResumeVersionCheckpoint,
234 ReplayVersion,
236 ListCompactionStatus {
238 #[clap(short, long = "verbose", default_value_t = false)]
239 verbose: bool,
240 },
241 GetCompactionScore {
242 #[clap(long)]
243 compaction_group_id: u64,
244 },
245 ValidateVersion,
247 RebuildTableStats,
249 CancelCompactTask {
250 #[clap(short, long)]
251 task_id: u64,
252 },
253 PrintUserKeyInArchive {
254 #[clap(long, value_delimiter = ',')]
256 archive_ids: Vec<u64>,
257 #[clap(long)]
259 data_dir: String,
260 #[clap(long)]
262 user_key: String,
263 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
264 use_new_object_prefix_strategy: bool,
265 },
266 PrintVersionDeltaInArchive {
267 #[clap(long, value_delimiter = ',')]
269 archive_ids: Vec<u64>,
270 #[clap(long)]
272 data_dir: String,
273 #[clap(long)]
275 sst_id: u64,
276 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
277 use_new_object_prefix_strategy: bool,
278 },
279 TieredCacheTracing {
280 #[clap(long)]
281 enable: bool,
282 #[clap(long)]
283 record_hybrid_insert_threshold_ms: Option<u32>,
284 #[clap(long)]
285 record_hybrid_get_threshold_ms: Option<u32>,
286 #[clap(long)]
287 record_hybrid_obtain_threshold_ms: Option<u32>,
288 #[clap(long)]
289 record_hybrid_remove_threshold_ms: Option<u32>,
290 #[clap(long)]
291 record_hybrid_fetch_threshold_ms: Option<u32>,
292 },
293 MergeCompactionGroup {
294 #[clap(long)]
295 left_group_id: u64,
296 #[clap(long)]
297 right_group_id: u64,
298 },
299 MigrateLegacyObject {
300 url: String,
301 source_dir: String,
302 target_dir: String,
303 #[clap(long, default_value = "100")]
304 concurrency: u32,
305 },
306 ResizeCache {
307 #[clap(long)]
308 meta_cache_capacity_mb: Option<u64>,
309 #[clap(long)]
310 data_cache_capacity_mb: Option<u64>,
311 },
312}
313
314#[derive(Subcommand)]
315enum TableCommands {
316 Scan {
318 mv_name: String,
320 data_dir: Option<String>,
322
323 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
324 use_new_object_prefix_strategy: bool,
325 },
326 ScanById {
328 table_id: u32,
330 data_dir: Option<String>,
332 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
333 use_new_object_prefix_strategy: bool,
334 },
335 List,
337}
338
339#[derive(Subcommand, Debug)]
340enum ScaleCommands {
341 #[clap(verbatim_doc_comment)]
343 Cordon {
344 #[clap(
346 long,
347 required = true,
348 value_delimiter = ',',
349 value_name = "id or host,..."
350 )]
351 workers: Vec<String>,
352 },
353 Uncordon {
355 #[clap(
357 long,
358 required = true,
359 value_delimiter = ',',
360 value_name = "id or host,..."
361 )]
362 workers: Vec<String>,
363 },
364}
365
366#[derive(Subcommand)]
367#[allow(clippy::large_enum_variant)]
368enum MetaCommands {
369 Pause,
371 Resume,
373 ClusterInfo,
375 SourceSplitInfo {
377 #[clap(long)]
378 ignore_id: bool,
379 },
380 #[clap(verbatim_doc_comment)]
397 #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
398 Reschedule {
399 #[clap(long, requires = "revision")]
401 plan: Option<String>,
402 #[clap(long)]
404 revision: Option<u64>,
405 #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
407 from: Option<String>,
408 #[clap(long, default_value = "false")]
410 dry_run: bool,
411 #[clap(long, default_value = "false")]
413 resolve_no_shuffle: bool,
414 },
415 BackupMeta {
417 #[clap(long)]
418 remarks: Option<String>,
419 },
420 RestoreMeta {
422 #[command(flatten)]
423 opts: RestoreOpts,
424 },
425 DeleteMetaSnapshots {
427 #[clap(long, value_delimiter = ',')]
428 snapshot_ids: Vec<u64>,
429 },
430
431 ListConnections,
433
434 ListServingFragmentMapping,
436
437 UnregisterWorkers {
439 #[clap(
441 long,
442 required = true,
443 value_delimiter = ',',
444 value_name = "worker_id or worker_host:worker_port, ..."
445 )]
446 workers: Vec<String>,
447
448 #[clap(short = 'y', long, default_value_t = false)]
450 yes: bool,
451
452 #[clap(long, default_value_t = false)]
454 ignore_not_found: bool,
455
456 #[clap(long, default_value_t = false)]
458 check_fragment_occupied: bool,
459 },
460
461 ValidateSource {
463 #[clap(long)]
466 props: String,
467 },
468
469 SetCdcTableBackfillParallelism {
470 #[clap(long, required = true)]
471 table_id: u32,
472 #[clap(long, required = true)]
473 parallelism: u32,
474 },
475}
476
477#[derive(Subcommand, Clone, Debug)]
478pub enum AwaitTreeCommands {
479 Dump {
481 #[clap(short, long = "actor-traces-format")]
483 actor_traces_format: Option<String>,
484 },
485 Analyze {
487 #[clap(long = "path")]
491 path: Option<String>,
492 },
493 Transcribe {
495 #[clap(long = "path")]
497 path: String,
498 },
499}
500
501#[derive(Subcommand, Clone, Debug)]
502enum TestCommands {
503 Jvm,
505}
506
507#[derive(Subcommand, Clone, Debug)]
508enum ThrottleCommands {
509 Source(ThrottleCommandArgs),
510 Mv(ThrottleCommandArgs),
511}
512
513#[derive(Clone, Debug, Args)]
514pub struct ThrottleCommandArgs {
515 id: u32,
516 rate: Option<u32>,
517}
518
519#[derive(Subcommand, Clone, Debug)]
520pub enum ProfileCommands {
521 Cpu {
523 #[clap(short, long = "sleep")]
525 sleep: u64,
526 #[clap(long = "worker-type", value_name = "TYPE")]
528 worker_types: Vec<ProfileWorkerType>,
529 },
530 Heap {
532 #[clap(long = "dir")]
534 dir: Option<String>,
535 #[clap(long = "worker-type", value_name = "TYPE")]
537 worker_types: Vec<ProfileWorkerType>,
538 },
539}
540
541pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
548 let context = CtlContext::default();
549
550 tokio::select! {
551 _ = shutdown.cancelled() => {
552 context.try_close().await;
554 }
555
556 result = start_fallible(opts, &context) => {
557 if let Err(e) = result {
558 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
560 }
561 }
562 }
563}
564
565pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
568 let result = start_impl(opts, context).await;
569 context.try_close().await;
570 result
571}
572
573#[expect(
574 clippy::large_stack_frames,
575 reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
576 post-layout generator stores only one arm at a time (~13–16 KiB)."
577)]
578async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
579 match opts.command {
580 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
581 cmd_impl::compute::show_config(&host).await?
582 }
583 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
584 cmd_impl::hummock::disable_commit_epoch(context).await?
585 }
586 Commands::Hummock(HummockCommands::ListVersion {
587 verbose,
588 verbose_key_range,
589 }) => {
590 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
591 }
592 Commands::Hummock(HummockCommands::ListVersionDeltas {
593 start_id,
594 num_epochs,
595 }) => {
596 cmd_impl::hummock::list_version_deltas(
597 context,
598 HummockVersionId::new(start_id),
599 num_epochs,
600 )
601 .await?;
602 }
603 Commands::Hummock(HummockCommands::ListKv {
604 epoch,
605 table_id,
606 data_dir,
607 use_new_object_prefix_strategy,
608 }) => {
609 cmd_impl::hummock::list_kv(
610 context,
611 epoch,
612 table_id,
613 data_dir,
614 use_new_object_prefix_strategy,
615 )
616 .await?;
617 }
618 Commands::Hummock(HummockCommands::SstDump(args)) => {
619 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
620 }
621 Commands::Hummock(HummockCommands::TriggerManualCompaction {
622 compaction_group_id,
623 table_id,
624 level,
625 sst_ids,
626 }) => {
627 cmd_impl::hummock::trigger_manual_compaction(
628 context,
629 compaction_group_id,
630 table_id.into(),
631 level,
632 sst_ids,
633 )
634 .await?
635 }
636 Commands::Hummock(HummockCommands::TriggerFullGc {
637 sst_retention_time_sec,
638 prefix,
639 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
640 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
641 list_pinned_versions(context).await?
642 }
643 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
644 cmd_impl::hummock::list_compaction_group(context).await?
645 }
646 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
647 compaction_group_ids,
648 max_bytes_for_level_base,
649 max_bytes_for_level_multiplier,
650 max_compaction_bytes,
651 sub_level_max_compaction_bytes,
652 level0_tier_compact_file_number,
653 target_file_size_base,
654 compaction_filter_mask,
655 max_sub_compaction,
656 level0_stop_write_threshold_sub_level_number,
657 level0_sub_level_compact_level_count,
658 max_space_reclaim_bytes,
659 level0_max_compact_file_number,
660 level0_overlapping_sub_level_compact_level_count,
661 enable_emergency_picker,
662 tombstone_reclaim_ratio,
663 compression_level,
664 compression_algorithm,
665 max_l0_compact_level,
666 sst_allowed_trivial_move_min_size,
667 disable_auto_group_scheduling,
668 max_overlapping_level_size,
669 sst_allowed_trivial_move_max_count,
670 emergency_level0_sst_file_count,
671 emergency_level0_sub_level_partition,
672 level0_stop_write_threshold_max_sst_count,
673 level0_stop_write_threshold_max_size,
674 enable_optimize_l0_interval_selection,
675 vnode_aligned_level_size_threshold,
676 max_kv_count_for_xor16,
677 }) => {
678 cmd_impl::hummock::update_compaction_config(
679 context,
680 compaction_group_ids,
681 build_compaction_config_vec(
682 max_bytes_for_level_base,
683 max_bytes_for_level_multiplier,
684 max_compaction_bytes,
685 sub_level_max_compaction_bytes,
686 level0_tier_compact_file_number,
687 target_file_size_base,
688 compaction_filter_mask,
689 max_sub_compaction,
690 level0_stop_write_threshold_sub_level_number,
691 level0_sub_level_compact_level_count,
692 max_space_reclaim_bytes,
693 level0_max_compact_file_number,
694 level0_overlapping_sub_level_compact_level_count,
695 enable_emergency_picker,
696 tombstone_reclaim_ratio,
697 if let Some(level) = compression_level {
698 assert!(compression_algorithm.is_some());
699 Some(CompressionAlgorithm {
700 level,
701 compression_algorithm: compression_algorithm.unwrap(),
702 })
703 } else {
704 None
705 },
706 max_l0_compact_level,
707 sst_allowed_trivial_move_min_size,
708 disable_auto_group_scheduling,
709 max_overlapping_level_size,
710 sst_allowed_trivial_move_max_count,
711 emergency_level0_sst_file_count,
712 emergency_level0_sub_level_partition,
713 level0_stop_write_threshold_max_sst_count,
714 level0_stop_write_threshold_max_size,
715 enable_optimize_l0_interval_selection,
716 vnode_aligned_level_size_threshold,
717 max_kv_count_for_xor16,
718 ),
719 )
720 .await?
721 }
722 Commands::Hummock(HummockCommands::SplitCompactionGroup {
723 compaction_group_id,
724 table_ids,
725 partition_vnode_count,
726 }) => {
727 cmd_impl::hummock::split_compaction_group(
728 context,
729 compaction_group_id,
730 &table_ids.into_iter().map_into().collect_vec(),
731 partition_vnode_count,
732 )
733 .await?;
734 }
735 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
736 cmd_impl::hummock::pause_version_checkpoint(context).await?;
737 }
738 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
739 cmd_impl::hummock::resume_version_checkpoint(context).await?;
740 }
741 Commands::Hummock(HummockCommands::ReplayVersion) => {
742 cmd_impl::hummock::replay_version(context).await?;
743 }
744 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
745 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
746 }
747 Commands::Hummock(HummockCommands::GetCompactionScore {
748 compaction_group_id,
749 }) => {
750 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
751 }
752 Commands::Hummock(HummockCommands::ValidateVersion) => {
753 cmd_impl::hummock::validate_version(context).await?;
754 }
755 Commands::Hummock(HummockCommands::RebuildTableStats) => {
756 cmd_impl::hummock::rebuild_table_stats(context).await?;
757 }
758 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
759 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
760 }
761 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
762 archive_ids,
763 data_dir,
764 sst_id,
765 use_new_object_prefix_strategy,
766 }) => {
767 cmd_impl::hummock::print_version_delta_in_archive(
768 context,
769 archive_ids.into_iter().map(HummockVersionId::new),
770 data_dir,
771 sst_id,
772 use_new_object_prefix_strategy,
773 )
774 .await?;
775 }
776 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
777 archive_ids,
778 data_dir,
779 user_key,
780 use_new_object_prefix_strategy,
781 }) => {
782 cmd_impl::hummock::print_user_key_in_archive(
783 context,
784 archive_ids.into_iter().map(HummockVersionId::new),
785 data_dir,
786 user_key,
787 use_new_object_prefix_strategy,
788 )
789 .await?;
790 }
791 Commands::Hummock(HummockCommands::TieredCacheTracing {
792 enable,
793 record_hybrid_insert_threshold_ms,
794 record_hybrid_get_threshold_ms,
795 record_hybrid_obtain_threshold_ms,
796 record_hybrid_remove_threshold_ms,
797 record_hybrid_fetch_threshold_ms,
798 }) => {
799 cmd_impl::hummock::tiered_cache_tracing(
800 context,
801 enable,
802 record_hybrid_insert_threshold_ms,
803 record_hybrid_get_threshold_ms,
804 record_hybrid_obtain_threshold_ms,
805 record_hybrid_remove_threshold_ms,
806 record_hybrid_fetch_threshold_ms,
807 )
808 .await?
809 }
810 Commands::Hummock(HummockCommands::MergeCompactionGroup {
811 left_group_id,
812 right_group_id,
813 }) => {
814 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
815 .await?
816 }
817
818 Commands::Hummock(HummockCommands::MigrateLegacyObject {
819 url,
820 source_dir,
821 target_dir,
822 concurrency,
823 }) => {
824 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
825 }
826 Commands::Hummock(HummockCommands::ResizeCache {
827 meta_cache_capacity_mb,
828 data_cache_capacity_mb,
829 }) => {
830 const MIB: u64 = 1024 * 1024;
831 cmd_impl::hummock::resize_cache(
832 context,
833 meta_cache_capacity_mb.map(|v| v * MIB),
834 data_cache_capacity_mb.map(|v| v * MIB),
835 )
836 .await?
837 }
838 Commands::Table(TableCommands::Scan {
839 mv_name,
840 data_dir,
841 use_new_object_prefix_strategy,
842 }) => {
843 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
844 .await?
845 }
846 Commands::Table(TableCommands::ScanById {
847 table_id,
848 data_dir,
849 use_new_object_prefix_strategy,
850 }) => {
851 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
852 .await?
853 }
854 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
855 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
856 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
857 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
858 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
859 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
860 cmd_impl::meta::source_split_info(context, ignore_id).await?
861 }
862 Commands::Meta(MetaCommands::Reschedule {
863 from,
864 dry_run,
865 plan,
866 revision,
867 resolve_no_shuffle,
868 }) => {
869 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
870 .await?
871 }
872 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
873 cmd_impl::meta::backup_meta(context, remarks).await?
874 }
875 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
876 risingwave_meta::backup_restore::restore(opts).await?
877 }
878 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
879 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
880 }
881 Commands::Meta(MetaCommands::ListConnections) => {
882 cmd_impl::meta::list_connections(context).await?
883 }
884 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
885 cmd_impl::meta::list_serving_fragment_mappings(context).await?
886 }
887 Commands::Meta(MetaCommands::UnregisterWorkers {
888 workers,
889 yes,
890 ignore_not_found,
891 check_fragment_occupied,
892 }) => {
893 cmd_impl::meta::unregister_workers(
894 context,
895 workers,
896 yes,
897 ignore_not_found,
898 check_fragment_occupied,
899 )
900 .await?
901 }
902 Commands::Meta(MetaCommands::ValidateSource { props }) => {
903 cmd_impl::meta::validate_source(context, props).await?
904 }
905 Commands::AwaitTree(AwaitTreeCommands::Dump {
906 actor_traces_format,
907 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
908 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
909 cmd_impl::await_tree::bottleneck_detect(context, path).await?
910 }
911 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
912 rw_diagnose_tools::await_tree::transcribe(path)?
913 }
914 Commands::Profile(ProfileCommands::Cpu {
915 sleep,
916 worker_types,
917 }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
918 Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
919 cmd_impl::profile::heap_profile(context, dir, worker_types).await?
920 }
921 Commands::Scale(ScaleCommands::Cordon { workers }) => {
922 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
923 .await?
924 }
925 Commands::Scale(ScaleCommands::Uncordon { workers }) => {
926 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
927 .await?
928 }
929 Commands::Throttle(ThrottleCommands::Source(args)) => {
930 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
931 }
932 Commands::Throttle(ThrottleCommands::Mv(args)) => {
933 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
934 }
935 Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
936 table_id,
937 parallelism,
938 }) => {
939 set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
940 }
941 Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
942 }
943 Ok(())
944}