1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
33use crate::cmd_impl::throttle::apply_throttle;
34use crate::common::CtlContext;
35
36pub mod cmd_impl;
37pub mod common;
38
// Top-level command-line options of this tool, parsed by clap through
// `#[derive(Parser)]`.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    // The selected subcommand; `start_impl` matches on it to dispatch.
    #[clap(subcommand)]
    command: Commands,
}
52
// Top-level subcommands. Every variant wraps a nested subcommand enum
// (`#[clap(subcommand)]`), so an invocation has the shape `<group> <action> ...`.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand)]
#[clap(infer_subcommands = true)]
enum Commands {
    #[clap(subcommand)]
    Compute(ComputeCommands),
    #[clap(subcommand)]
    Hummock(HummockCommands),
    #[clap(subcommand)]
    Table(TableCommands),
    #[clap(subcommand)]
    Meta(MetaCommands),
    #[clap(subcommand)]
    Scale(ScaleCommands),
    #[clap(subcommand)]
    Bench(BenchCommands),
    // Also reachable through the visible alias `trace`.
    #[clap(subcommand)]
    #[clap(visible_alias("trace"))]
    AwaitTree(AwaitTreeCommands),
    #[clap(subcommand)]
    Profile(ProfileCommands),
    #[clap(subcommand)]
    Throttle(ThrottleCommands),
    // `hide = true`: not listed in the generated help.
    #[clap(subcommand, hide = true)]
    Test(TestCommands),
}
88
// Subcommands of the `compute` group.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand)]
enum ComputeCommands {
    // `host` carries no `long`/`short` attribute, so clap parses it as a
    // positional argument. `start_impl` passes it to
    // `cmd_impl::compute::show_config`.
    ShowConfig { host: String },
}
94
95#[allow(clippy::large_enum_variant)]
96#[derive(Subcommand)]
97enum HummockCommands {
98 ListVersion {
100 #[clap(short, long = "verbose", default_value_t = false)]
101 verbose: bool,
102
103 #[clap(long = "verbose_key_range", default_value_t = false)]
104 verbose_key_range: bool,
105 },
106
107 ListVersionDeltas {
109 #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
110 start_id: u64,
111
112 #[clap(short, long = "num-epochs", default_value_t = 100)]
113 num_epochs: u32,
114 },
115 DisableCommitEpoch,
117 ListKv {
119 #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
120 epoch: u64,
121
122 #[clap(short, long = "table-id")]
123 table_id: u32,
124
125 data_dir: Option<String>,
127
128 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
129 use_new_object_prefix_strategy: bool,
130 },
131 SstDump(SstDumpArgs),
132 TriggerManualCompaction {
134 #[clap(short, long = "compaction-group-id", default_value_t = 2)]
135 compaction_group_id: u64,
136
137 #[clap(short, long = "table-id", default_value_t = 0)]
138 table_id: u32,
139
140 #[clap(short, long = "level", default_value_t = 1)]
141 level: u32,
142
143 #[clap(short, long = "sst-ids", value_delimiter = ',')]
144 sst_ids: Vec<u64>,
145 },
146 TriggerFullGc {
149 #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
150 sst_retention_time_sec: u64,
151 #[clap(short, long = "prefix", required = false)]
152 prefix: Option<String>,
153 },
154 ListPinnedVersions {},
156 ListCompactionGroup,
158 UpdateCompactionConfig {
160 #[clap(long, value_delimiter = ',')]
161 compaction_group_ids: Vec<u64>,
162 #[clap(long)]
163 max_bytes_for_level_base: Option<u64>,
164 #[clap(long)]
165 max_bytes_for_level_multiplier: Option<u64>,
166 #[clap(long)]
167 max_compaction_bytes: Option<u64>,
168 #[clap(long)]
169 sub_level_max_compaction_bytes: Option<u64>,
170 #[clap(long)]
171 level0_tier_compact_file_number: Option<u64>,
172 #[clap(long)]
173 target_file_size_base: Option<u64>,
174 #[clap(long)]
175 compaction_filter_mask: Option<u32>,
176 #[clap(long)]
177 max_sub_compaction: Option<u32>,
178 #[clap(long)]
179 level0_stop_write_threshold_sub_level_number: Option<u64>,
180 #[clap(long)]
181 level0_sub_level_compact_level_count: Option<u32>,
182 #[clap(long)]
183 max_space_reclaim_bytes: Option<u64>,
184 #[clap(long)]
185 level0_max_compact_file_number: Option<u64>,
186 #[clap(long)]
187 level0_overlapping_sub_level_compact_level_count: Option<u32>,
188 #[clap(long)]
189 enable_emergency_picker: Option<bool>,
190 #[clap(long)]
191 tombstone_reclaim_ratio: Option<u32>,
192 #[clap(long)]
193 compression_level: Option<u32>,
194 #[clap(long)]
195 compression_algorithm: Option<String>,
196 #[clap(long)]
197 max_l0_compact_level: Option<u32>,
198 #[clap(long)]
199 sst_allowed_trivial_move_min_size: Option<u64>,
200 #[clap(long)]
201 disable_auto_group_scheduling: Option<bool>,
202 #[clap(long)]
203 max_overlapping_level_size: Option<u64>,
204 #[clap(long)]
205 sst_allowed_trivial_move_max_count: Option<u32>,
206 #[clap(long)]
207 emergency_level0_sst_file_count: Option<u32>,
208 #[clap(long)]
209 emergency_level0_sub_level_partition: Option<u32>,
210 #[clap(long)]
211 level0_stop_write_threshold_max_sst_count: Option<u32>,
212 #[clap(long)]
213 level0_stop_write_threshold_max_size: Option<u64>,
214 #[clap(long)]
215 enable_optimize_l0_interval_selection: Option<bool>,
216 #[clap(long)]
217 vnode_aligned_level_size_threshold: Option<u64>,
218 },
219 SplitCompactionGroup {
221 #[clap(long)]
222 compaction_group_id: u64,
223 #[clap(long, value_delimiter = ',')]
224 table_ids: Vec<u32>,
225 #[clap(long, default_value_t = 0)]
226 partition_vnode_count: u32,
227 },
228 PauseVersionCheckpoint,
230 ResumeVersionCheckpoint,
232 ReplayVersion,
234 ListCompactionStatus {
236 #[clap(short, long = "verbose", default_value_t = false)]
237 verbose: bool,
238 },
239 GetCompactionScore {
240 #[clap(long)]
241 compaction_group_id: u64,
242 },
243 ValidateVersion,
245 RebuildTableStats,
247 CancelCompactTask {
248 #[clap(short, long)]
249 task_id: u64,
250 },
251 PrintUserKeyInArchive {
252 #[clap(long, value_delimiter = ',')]
254 archive_ids: Vec<u64>,
255 #[clap(long)]
257 data_dir: String,
258 #[clap(long)]
260 user_key: String,
261 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
262 use_new_object_prefix_strategy: bool,
263 },
264 PrintVersionDeltaInArchive {
265 #[clap(long, value_delimiter = ',')]
267 archive_ids: Vec<u64>,
268 #[clap(long)]
270 data_dir: String,
271 #[clap(long)]
273 sst_id: u64,
274 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
275 use_new_object_prefix_strategy: bool,
276 },
277 TieredCacheTracing {
278 #[clap(long)]
279 enable: bool,
280 #[clap(long)]
281 record_hybrid_insert_threshold_ms: Option<u32>,
282 #[clap(long)]
283 record_hybrid_get_threshold_ms: Option<u32>,
284 #[clap(long)]
285 record_hybrid_obtain_threshold_ms: Option<u32>,
286 #[clap(long)]
287 record_hybrid_remove_threshold_ms: Option<u32>,
288 #[clap(long)]
289 record_hybrid_fetch_threshold_ms: Option<u32>,
290 },
291 MergeCompactionGroup {
292 #[clap(long)]
293 left_group_id: u64,
294 #[clap(long)]
295 right_group_id: u64,
296 },
297 MigrateLegacyObject {
298 url: String,
299 source_dir: String,
300 target_dir: String,
301 #[clap(long, default_value = "100")]
302 concurrency: u32,
303 },
304 ResizeCache {
305 #[clap(long)]
306 meta_cache_capacity_mb: Option<u64>,
307 #[clap(long)]
308 data_cache_capacity_mb: Option<u64>,
309 },
310}
311
312#[derive(Subcommand)]
313enum TableCommands {
314 Scan {
316 mv_name: String,
318 data_dir: Option<String>,
320
321 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
322 use_new_object_prefix_strategy: bool,
323 },
324 ScanById {
326 table_id: u32,
328 data_dir: Option<String>,
330 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
331 use_new_object_prefix_strategy: bool,
332 },
333 List,
335}
336
// Subcommands of the `scale` group. Both variants end up in
// `cmd_impl::scale::update_schedulability`; they differ only in the
// `Schedulability` value `start_impl` passes along.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Debug)]
enum ScaleCommands {
    // Dispatched with `Schedulability::Unschedulable`.
    #[clap(verbatim_doc_comment)]
    Cordon {
        // Required, comma-separated list.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
    // Dispatched with `Schedulability::Schedulable`.
    Uncordon {
        // Required, comma-separated list.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
}
363
// Subcommands of the `meta` group. Except for `RestoreMeta` and
// `SetCdcTableBackfillParallelism`, `start_impl` forwards each variant to a
// function in `cmd_impl::meta`.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    Pause,
    Resume,
    ClusterInfo,
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    // The required arg group `input_group` contains `plan` and `from`, so the
    // input has to come from one of them. `plan` additionally requires
    // `revision`, while `from` conflicts with `revision`.
    #[clap(verbatim_doc_comment)]
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        #[clap(long)]
        revision: Option<u64>,
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        #[clap(long, default_value = "false")]
        dry_run: bool,
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    // `start_impl` passes `opts` straight to
    // `risingwave_meta::backup_restore::restore`.
    RestoreMeta {
        // Flattened: the fields of `RestoreOpts` become arguments of this
        // subcommand.
        #[command(flatten)]
        opts: RestoreOpts,
    },
    DeleteMetaSnapshots {
        // Comma-separated list.
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    ListConnections,

    ListServingFragmentMapping,

    UnregisterWorkers {
        // Required, comma-separated list.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        // Available as `-y` and `--yes`.
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    ValidateSource {
        #[clap(long)]
        props: String,
    },

    // `start_impl` forwards this to
    // `cmd_impl::scale::set_cdc_table_backfill_parallelism`.
    SetCdcTableBackfillParallelism {
        #[clap(long, required = true)]
        table_id: u32,
        #[clap(long, required = true)]
        parallelism: u32,
    },
}
474
// Subcommands of the `await-tree` group (visible alias `trace`).
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    // Forwarded to `cmd_impl::await_tree::dump`.
    Dump {
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    // Forwarded to `cmd_impl::await_tree::bottleneck_detect`; `path` is optional.
    Analyze {
        #[clap(long = "path")]
        path: Option<String>,
    },
    // Forwarded to `rw_diagnose_tools::await_tree::transcribe`; `path` is
    // required here (not an `Option`).
    Transcribe {
        #[clap(long = "path")]
        path: String,
    },
}
498
// Subcommands of the `test` group, which `Commands` marks with `hide = true`.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    // Forwarded to `cmd_impl::test::test_jvm`.
    Jvm,
}
504
// Subcommands of the `throttle` group. Both variants take the same arguments
// and are forwarded to `apply_throttle`; the variant only selects the
// `PbThrottleTarget` that `start_impl` passes along.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    // Dispatched with `PbThrottleTarget::Source`.
    Source(ThrottleCommandArgs),
    // Dispatched with `PbThrottleTarget::Mv`.
    Mv(ThrottleCommandArgs),
}
510
// Arguments shared by the `throttle` subcommands. Neither field carries a
// `long`/`short` attribute, so clap parses both as positional arguments.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    // Required.
    id: u32,
    // Optional. NOTE(review): what an omitted rate means is decided inside
    // `apply_throttle`, which is not visible here; confirm there.
    rate: Option<u32>,
}
516
// Subcommands of the `profile` group.
//
// NOTE: plain `//` comments are used on purpose. clap surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    // Forwarded to `cmd_impl::profile::cpu_profile`.
    Cpu {
        // Required (no default, not an `Option`).
        // NOTE(review): presumably a duration in seconds; confirm in `cpu_profile`.
        #[clap(short, long = "sleep")]
        sleep: u64,
    },
    // Forwarded to `cmd_impl::profile::heap_profile`.
    Heap {
        #[clap(long = "dir")]
        dir: Option<String>,
    },
}
532
533pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
540 let context = CtlContext::default();
541
542 tokio::select! {
543 _ = shutdown.cancelled() => {
544 context.try_close().await;
546 }
547
548 result = start_fallible(opts, &context) => {
549 if let Err(e) = result {
550 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
552 }
553 }
554 }
555}
556
557pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
560 let result = start_impl(opts, context).await;
561 context.try_close().await;
562 result
563}
564
565#[expect(
566 clippy::large_stack_frames,
567 reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
568 post-layout generator stores only one arm at a time (~13–16 KiB)."
569)]
570async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
571 match opts.command {
572 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
573 cmd_impl::compute::show_config(&host).await?
574 }
575 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
576 cmd_impl::hummock::disable_commit_epoch(context).await?
577 }
578 Commands::Hummock(HummockCommands::ListVersion {
579 verbose,
580 verbose_key_range,
581 }) => {
582 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
583 }
584 Commands::Hummock(HummockCommands::ListVersionDeltas {
585 start_id,
586 num_epochs,
587 }) => {
588 cmd_impl::hummock::list_version_deltas(
589 context,
590 HummockVersionId::new(start_id),
591 num_epochs,
592 )
593 .await?;
594 }
595 Commands::Hummock(HummockCommands::ListKv {
596 epoch,
597 table_id,
598 data_dir,
599 use_new_object_prefix_strategy,
600 }) => {
601 cmd_impl::hummock::list_kv(
602 context,
603 epoch,
604 table_id,
605 data_dir,
606 use_new_object_prefix_strategy,
607 )
608 .await?;
609 }
610 Commands::Hummock(HummockCommands::SstDump(args)) => {
611 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
612 }
613 Commands::Hummock(HummockCommands::TriggerManualCompaction {
614 compaction_group_id,
615 table_id,
616 level,
617 sst_ids,
618 }) => {
619 cmd_impl::hummock::trigger_manual_compaction(
620 context,
621 compaction_group_id,
622 table_id.into(),
623 level,
624 sst_ids,
625 )
626 .await?
627 }
628 Commands::Hummock(HummockCommands::TriggerFullGc {
629 sst_retention_time_sec,
630 prefix,
631 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
632 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
633 list_pinned_versions(context).await?
634 }
635 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
636 cmd_impl::hummock::list_compaction_group(context).await?
637 }
638 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
639 compaction_group_ids,
640 max_bytes_for_level_base,
641 max_bytes_for_level_multiplier,
642 max_compaction_bytes,
643 sub_level_max_compaction_bytes,
644 level0_tier_compact_file_number,
645 target_file_size_base,
646 compaction_filter_mask,
647 max_sub_compaction,
648 level0_stop_write_threshold_sub_level_number,
649 level0_sub_level_compact_level_count,
650 max_space_reclaim_bytes,
651 level0_max_compact_file_number,
652 level0_overlapping_sub_level_compact_level_count,
653 enable_emergency_picker,
654 tombstone_reclaim_ratio,
655 compression_level,
656 compression_algorithm,
657 max_l0_compact_level,
658 sst_allowed_trivial_move_min_size,
659 disable_auto_group_scheduling,
660 max_overlapping_level_size,
661 sst_allowed_trivial_move_max_count,
662 emergency_level0_sst_file_count,
663 emergency_level0_sub_level_partition,
664 level0_stop_write_threshold_max_sst_count,
665 level0_stop_write_threshold_max_size,
666 enable_optimize_l0_interval_selection,
667 vnode_aligned_level_size_threshold,
668 }) => {
669 cmd_impl::hummock::update_compaction_config(
670 context,
671 compaction_group_ids,
672 build_compaction_config_vec(
673 max_bytes_for_level_base,
674 max_bytes_for_level_multiplier,
675 max_compaction_bytes,
676 sub_level_max_compaction_bytes,
677 level0_tier_compact_file_number,
678 target_file_size_base,
679 compaction_filter_mask,
680 max_sub_compaction,
681 level0_stop_write_threshold_sub_level_number,
682 level0_sub_level_compact_level_count,
683 max_space_reclaim_bytes,
684 level0_max_compact_file_number,
685 level0_overlapping_sub_level_compact_level_count,
686 enable_emergency_picker,
687 tombstone_reclaim_ratio,
688 if let Some(level) = compression_level {
689 assert!(compression_algorithm.is_some());
690 Some(CompressionAlgorithm {
691 level,
692 compression_algorithm: compression_algorithm.unwrap(),
693 })
694 } else {
695 None
696 },
697 max_l0_compact_level,
698 sst_allowed_trivial_move_min_size,
699 disable_auto_group_scheduling,
700 max_overlapping_level_size,
701 sst_allowed_trivial_move_max_count,
702 emergency_level0_sst_file_count,
703 emergency_level0_sub_level_partition,
704 level0_stop_write_threshold_max_sst_count,
705 level0_stop_write_threshold_max_size,
706 enable_optimize_l0_interval_selection,
707 vnode_aligned_level_size_threshold,
708 ),
709 )
710 .await?
711 }
712 Commands::Hummock(HummockCommands::SplitCompactionGroup {
713 compaction_group_id,
714 table_ids,
715 partition_vnode_count,
716 }) => {
717 cmd_impl::hummock::split_compaction_group(
718 context,
719 compaction_group_id,
720 &table_ids.into_iter().map_into().collect_vec(),
721 partition_vnode_count,
722 )
723 .await?;
724 }
725 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
726 cmd_impl::hummock::pause_version_checkpoint(context).await?;
727 }
728 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
729 cmd_impl::hummock::resume_version_checkpoint(context).await?;
730 }
731 Commands::Hummock(HummockCommands::ReplayVersion) => {
732 cmd_impl::hummock::replay_version(context).await?;
733 }
734 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
735 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
736 }
737 Commands::Hummock(HummockCommands::GetCompactionScore {
738 compaction_group_id,
739 }) => {
740 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
741 }
742 Commands::Hummock(HummockCommands::ValidateVersion) => {
743 cmd_impl::hummock::validate_version(context).await?;
744 }
745 Commands::Hummock(HummockCommands::RebuildTableStats) => {
746 cmd_impl::hummock::rebuild_table_stats(context).await?;
747 }
748 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
749 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
750 }
751 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
752 archive_ids,
753 data_dir,
754 sst_id,
755 use_new_object_prefix_strategy,
756 }) => {
757 cmd_impl::hummock::print_version_delta_in_archive(
758 context,
759 archive_ids.into_iter().map(HummockVersionId::new),
760 data_dir,
761 sst_id,
762 use_new_object_prefix_strategy,
763 )
764 .await?;
765 }
766 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
767 archive_ids,
768 data_dir,
769 user_key,
770 use_new_object_prefix_strategy,
771 }) => {
772 cmd_impl::hummock::print_user_key_in_archive(
773 context,
774 archive_ids.into_iter().map(HummockVersionId::new),
775 data_dir,
776 user_key,
777 use_new_object_prefix_strategy,
778 )
779 .await?;
780 }
781 Commands::Hummock(HummockCommands::TieredCacheTracing {
782 enable,
783 record_hybrid_insert_threshold_ms,
784 record_hybrid_get_threshold_ms,
785 record_hybrid_obtain_threshold_ms,
786 record_hybrid_remove_threshold_ms,
787 record_hybrid_fetch_threshold_ms,
788 }) => {
789 cmd_impl::hummock::tiered_cache_tracing(
790 context,
791 enable,
792 record_hybrid_insert_threshold_ms,
793 record_hybrid_get_threshold_ms,
794 record_hybrid_obtain_threshold_ms,
795 record_hybrid_remove_threshold_ms,
796 record_hybrid_fetch_threshold_ms,
797 )
798 .await?
799 }
800 Commands::Hummock(HummockCommands::MergeCompactionGroup {
801 left_group_id,
802 right_group_id,
803 }) => {
804 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
805 .await?
806 }
807
808 Commands::Hummock(HummockCommands::MigrateLegacyObject {
809 url,
810 source_dir,
811 target_dir,
812 concurrency,
813 }) => {
814 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
815 }
816 Commands::Hummock(HummockCommands::ResizeCache {
817 meta_cache_capacity_mb,
818 data_cache_capacity_mb,
819 }) => {
820 const MIB: u64 = 1024 * 1024;
821 cmd_impl::hummock::resize_cache(
822 context,
823 meta_cache_capacity_mb.map(|v| v * MIB),
824 data_cache_capacity_mb.map(|v| v * MIB),
825 )
826 .await?
827 }
828 Commands::Table(TableCommands::Scan {
829 mv_name,
830 data_dir,
831 use_new_object_prefix_strategy,
832 }) => {
833 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
834 .await?
835 }
836 Commands::Table(TableCommands::ScanById {
837 table_id,
838 data_dir,
839 use_new_object_prefix_strategy,
840 }) => {
841 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
842 .await?
843 }
844 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
845 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
846 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
847 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
848 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
849 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
850 cmd_impl::meta::source_split_info(context, ignore_id).await?
851 }
852 Commands::Meta(MetaCommands::Reschedule {
853 from,
854 dry_run,
855 plan,
856 revision,
857 resolve_no_shuffle,
858 }) => {
859 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
860 .await?
861 }
862 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
863 cmd_impl::meta::backup_meta(context, remarks).await?
864 }
865 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
866 risingwave_meta::backup_restore::restore(opts).await?
867 }
868 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
869 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
870 }
871 Commands::Meta(MetaCommands::ListConnections) => {
872 cmd_impl::meta::list_connections(context).await?
873 }
874 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
875 cmd_impl::meta::list_serving_fragment_mappings(context).await?
876 }
877 Commands::Meta(MetaCommands::UnregisterWorkers {
878 workers,
879 yes,
880 ignore_not_found,
881 check_fragment_occupied,
882 }) => {
883 cmd_impl::meta::unregister_workers(
884 context,
885 workers,
886 yes,
887 ignore_not_found,
888 check_fragment_occupied,
889 )
890 .await?
891 }
892 Commands::Meta(MetaCommands::ValidateSource { props }) => {
893 cmd_impl::meta::validate_source(context, props).await?
894 }
895 Commands::AwaitTree(AwaitTreeCommands::Dump {
896 actor_traces_format,
897 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
898 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
899 cmd_impl::await_tree::bottleneck_detect(context, path).await?
900 }
901 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
902 rw_diagnose_tools::await_tree::transcribe(path)?
903 }
904 Commands::Profile(ProfileCommands::Cpu { sleep }) => {
905 cmd_impl::profile::cpu_profile(context, sleep).await?
906 }
907 Commands::Profile(ProfileCommands::Heap { dir }) => {
908 cmd_impl::profile::heap_profile(context, dir).await?
909 }
910 Commands::Scale(ScaleCommands::Cordon { workers }) => {
911 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
912 .await?
913 }
914 Commands::Scale(ScaleCommands::Uncordon { workers }) => {
915 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
916 .await?
917 }
918 Commands::Throttle(ThrottleCommands::Source(args)) => {
919 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
920 }
921 Commands::Throttle(ThrottleCommands::Mv(args)) => {
922 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
923 }
924 Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
925 table_id,
926 parallelism,
927 }) => {
928 set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
929 }
930 Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
931 }
932 Ok(())
933}