1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{ArgGroup, Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::profile::ProfileWorkerType;
33use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
34use crate::cmd_impl::throttle::apply_throttle;
35use crate::common::CtlContext;
36
37pub mod cmd_impl;
38pub mod common;
39
40#[derive(Parser)]
46#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
47#[clap(propagate_version = true)]
48#[clap(infer_subcommands = true)]
49pub struct CliOpts {
50 #[clap(subcommand)]
51 command: Commands,
52}
53
54#[derive(Subcommand)]
55#[clap(infer_subcommands = true)]
56enum Commands {
57 #[clap(subcommand)]
59 Compute(ComputeCommands),
60 #[clap(subcommand)]
62 Hummock(HummockCommands),
63 #[clap(subcommand)]
65 Table(TableCommands),
66 #[clap(subcommand)]
68 Meta(MetaCommands),
69 #[clap(subcommand)]
71 Scale(ScaleCommands),
72 #[clap(subcommand)]
74 Bench(BenchCommands),
75 #[clap(subcommand)]
77 #[clap(visible_alias("trace"))]
78 AwaitTree(AwaitTreeCommands),
79 #[clap(subcommand)]
81 Profile(ProfileCommands),
82 #[clap(subcommand)]
83 Throttle(ThrottleCommands),
84 #[clap(subcommand, hide = true)]
86 Test(TestCommands),
87}
88
89#[derive(Subcommand)]
90enum ComputeCommands {
91 ShowConfig { host: String },
93}
94
95#[allow(clippy::large_enum_variant)]
96#[derive(Subcommand)]
97enum HummockCommands {
98 ListVersion {
100 #[clap(short, long = "verbose", default_value_t = false)]
101 verbose: bool,
102
103 #[clap(long = "verbose_key_range", default_value_t = false)]
104 verbose_key_range: bool,
105 },
106
107 ListVersionDeltas {
109 #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
110 start_id: u64,
111
112 #[clap(short, long = "num-epochs", default_value_t = 100)]
113 num_epochs: u32,
114 },
115 DisableCommitEpoch,
117 ListKv {
119 #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
120 epoch: u64,
121
122 #[clap(short, long = "table-id")]
123 table_id: u32,
124
125 data_dir: Option<String>,
127
128 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
129 use_new_object_prefix_strategy: bool,
130 },
131 SstDump(SstDumpArgs),
132 TriggerManualCompaction {
134 #[clap(short, long = "compaction-group-id", default_value_t = 2)]
135 compaction_group_id: u64,
136
137 #[clap(short, long = "table-id", default_value_t = 0)]
138 table_id: u32,
139
140 #[clap(short, long = "level", default_value_t = 1)]
141 level: u32,
142
143 #[clap(short, long = "sst-ids", value_delimiter = ',')]
144 sst_ids: Vec<u64>,
145 },
146 TriggerFullGc {
149 #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
150 sst_retention_time_sec: u64,
151 #[clap(short, long = "prefix", required = false)]
152 prefix: Option<String>,
153 },
154 ListPinnedVersions {},
156 ListCompactionGroup,
158 UpdateCompactionConfig {
160 #[clap(long, value_delimiter = ',')]
161 compaction_group_ids: Vec<u64>,
162 #[clap(long)]
163 max_bytes_for_level_base: Option<u64>,
164 #[clap(long)]
165 max_bytes_for_level_multiplier: Option<u64>,
166 #[clap(long)]
167 max_compaction_bytes: Option<u64>,
168 #[clap(long)]
169 sub_level_max_compaction_bytes: Option<u64>,
170 #[clap(long)]
171 level0_tier_compact_file_number: Option<u64>,
172 #[clap(long)]
173 target_file_size_base: Option<u64>,
174 #[clap(long)]
175 compaction_filter_mask: Option<u32>,
176 #[clap(long)]
177 max_sub_compaction: Option<u32>,
178 #[clap(long)]
179 level0_stop_write_threshold_sub_level_number: Option<u64>,
180 #[clap(long)]
181 level0_sub_level_compact_level_count: Option<u32>,
182 #[clap(long)]
183 max_space_reclaim_bytes: Option<u64>,
184 #[clap(long)]
185 level0_max_compact_file_number: Option<u64>,
186 #[clap(long)]
187 level0_overlapping_sub_level_compact_level_count: Option<u32>,
188 #[clap(long)]
189 enable_emergency_picker: Option<bool>,
190 #[clap(long)]
191 tombstone_reclaim_ratio: Option<u32>,
192 #[clap(long)]
193 compression_level: Option<u32>,
194 #[clap(long)]
195 compression_algorithm: Option<String>,
196 #[clap(long)]
197 max_l0_compact_level: Option<u32>,
198 #[clap(long)]
199 sst_allowed_trivial_move_min_size: Option<u64>,
200 #[clap(long)]
201 disable_auto_group_scheduling: Option<bool>,
202 #[clap(long)]
203 max_overlapping_level_size: Option<u64>,
204 #[clap(long)]
205 sst_allowed_trivial_move_max_count: Option<u32>,
206 #[clap(long)]
207 emergency_level0_sst_file_count: Option<u32>,
208 #[clap(long)]
209 emergency_level0_sub_level_partition: Option<u32>,
210 #[clap(long)]
211 level0_stop_write_threshold_max_sst_count: Option<u32>,
212 #[clap(long)]
213 level0_stop_write_threshold_max_size: Option<u64>,
214 #[clap(long)]
215 enable_optimize_l0_interval_selection: Option<bool>,
216 #[clap(long)]
217 vnode_aligned_level_size_threshold: Option<u64>,
218 #[clap(long)]
219 max_kv_count_for_xor16: Option<u64>,
220 },
221 SplitCompactionGroup {
223 #[clap(long)]
224 compaction_group_id: u64,
225 #[clap(long, value_delimiter = ',')]
226 table_ids: Vec<u32>,
227 #[clap(long, default_value_t = 0)]
228 partition_vnode_count: u32,
229 },
230 PauseVersionCheckpoint,
232 ResumeVersionCheckpoint,
234 ReplayVersion,
236 ListCompactionStatus {
238 #[clap(short, long = "verbose", default_value_t = false)]
239 verbose: bool,
240 },
241 GetCompactionScore {
242 #[clap(long)]
243 compaction_group_id: u64,
244 },
245 ValidateVersion,
247 RebuildTableStats,
249 CancelCompactTask {
250 #[clap(short, long)]
251 task_id: u64,
252 },
253 PrintUserKeyInArchive {
254 #[clap(long, value_delimiter = ',')]
256 archive_ids: Vec<u64>,
257 #[clap(long)]
259 data_dir: String,
260 #[clap(long)]
262 user_key: String,
263 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
264 use_new_object_prefix_strategy: bool,
265 },
266 PrintVersionDeltaInArchive {
267 #[clap(long, value_delimiter = ',')]
269 archive_ids: Vec<u64>,
270 #[clap(long)]
272 data_dir: String,
273 #[clap(long)]
275 sst_id: u64,
276 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
277 use_new_object_prefix_strategy: bool,
278 },
279 TieredCacheTracing {
280 #[clap(long)]
281 enable: bool,
282 #[clap(long)]
283 record_hybrid_insert_threshold_ms: Option<u32>,
284 #[clap(long)]
285 record_hybrid_get_threshold_ms: Option<u32>,
286 #[clap(long)]
287 record_hybrid_obtain_threshold_ms: Option<u32>,
288 #[clap(long)]
289 record_hybrid_remove_threshold_ms: Option<u32>,
290 #[clap(long)]
291 record_hybrid_fetch_threshold_ms: Option<u32>,
292 },
293 MergeCompactionGroup {
294 #[clap(long)]
295 left_group_id: u64,
296 #[clap(long)]
297 right_group_id: u64,
298 },
299 MigrateLegacyObject {
300 url: String,
301 source_dir: String,
302 target_dir: String,
303 #[clap(long, default_value = "100")]
304 concurrency: u32,
305 },
306 ResizeCache {
307 #[clap(long)]
308 meta_cache_capacity_mb: Option<u64>,
309 #[clap(long)]
310 data_cache_capacity_mb: Option<u64>,
311 },
312}
313
314#[derive(Subcommand)]
315enum TableCommands {
316 Scan {
318 mv_name: String,
320 data_dir: Option<String>,
322
323 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
324 use_new_object_prefix_strategy: bool,
325 },
326 ScanById {
328 table_id: u32,
330 data_dir: Option<String>,
332 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
333 use_new_object_prefix_strategy: bool,
334 },
335 List,
337}
338
339#[derive(Subcommand, Debug)]
340enum ScaleCommands {
341 #[clap(verbatim_doc_comment)]
343 Cordon {
344 #[clap(
346 long,
347 required = true,
348 value_delimiter = ',',
349 value_name = "id or host,..."
350 )]
351 workers: Vec<String>,
352 },
353 Uncordon {
355 #[clap(
357 long,
358 required = true,
359 value_delimiter = ',',
360 value_name = "id or host,..."
361 )]
362 workers: Vec<String>,
363 },
364}
365
366#[derive(Subcommand)]
367#[allow(clippy::large_enum_variant)]
368enum MetaCommands {
369 Pause,
371 Resume,
373 #[clap(
375 group(
376 ArgGroup::new("resume_backfill_target")
377 .required(true)
378 .args(&["job_id", "fragment_id"])
379 )
380 )]
381 ResumeBackfill {
382 #[clap(long)]
383 job_id: Option<u32>,
384 #[clap(long)]
385 fragment_id: Option<u32>,
386 },
387 ClusterInfo,
389 SourceSplitInfo {
391 #[clap(long)]
392 ignore_id: bool,
393 },
394 #[clap(verbatim_doc_comment)]
411 #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
412 Reschedule {
413 #[clap(long, requires = "revision")]
415 plan: Option<String>,
416 #[clap(long)]
418 revision: Option<u64>,
419 #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
421 from: Option<String>,
422 #[clap(long, default_value = "false")]
424 dry_run: bool,
425 #[clap(long, default_value = "false")]
427 resolve_no_shuffle: bool,
428 },
429 BackupMeta {
431 #[clap(long)]
432 remarks: Option<String>,
433 },
434 RestoreMeta {
436 #[command(flatten)]
437 opts: RestoreOpts,
438 },
439 DeleteMetaSnapshots {
441 #[clap(long, value_delimiter = ',')]
442 snapshot_ids: Vec<u64>,
443 },
444
445 ListConnections,
447
448 ListServingFragmentMapping,
450
451 UnregisterWorkers {
453 #[clap(
455 long,
456 required = true,
457 value_delimiter = ',',
458 value_name = "worker_id or worker_host:worker_port, ..."
459 )]
460 workers: Vec<String>,
461
462 #[clap(short = 'y', long, default_value_t = false)]
464 yes: bool,
465
466 #[clap(long, default_value_t = false)]
468 ignore_not_found: bool,
469
470 #[clap(long, default_value_t = false)]
472 check_fragment_occupied: bool,
473 },
474
475 ValidateSource {
477 #[clap(long)]
480 props: String,
481 },
482
483 SetCdcTableBackfillParallelism {
484 #[clap(long, required = true)]
485 table_id: u32,
486 #[clap(long, required = true)]
487 parallelism: u32,
488 },
489}
490
491#[derive(Subcommand, Clone, Debug)]
492pub enum AwaitTreeCommands {
493 Dump {
495 #[clap(short, long = "actor-traces-format")]
497 actor_traces_format: Option<String>,
498 },
499 Analyze {
501 #[clap(long = "path")]
505 path: Option<String>,
506 },
507 Transcribe {
509 #[clap(long = "path")]
511 path: String,
512 },
513}
514
515#[derive(Subcommand, Clone, Debug)]
516enum TestCommands {
517 Jvm,
519}
520
521#[derive(Subcommand, Clone, Debug)]
522enum ThrottleCommands {
523 Source(ThrottleCommandArgs),
524 Mv(ThrottleCommandArgs),
525 Sink(ThrottleCommandArgs),
526}
527
528#[derive(Clone, Debug, clap::ValueEnum)]
529pub enum ThrottleTypeArg {
530 Dml,
531 Backfill,
532 Source,
533 Sink,
534}
535
536#[derive(Clone, Debug, Args)]
537pub struct ThrottleCommandArgs {
538 #[clap(long, required = true)]
540 id: u32,
541 #[clap(long)]
543 rate: Option<u32>,
544 #[clap(long, value_enum, required = true)]
546 throttle_type: ThrottleTypeArg,
547}
548
549#[derive(Subcommand, Clone, Debug)]
550pub enum ProfileCommands {
551 Cpu {
553 #[clap(short, long = "sleep")]
555 sleep: u64,
556 #[clap(long = "worker-type", value_name = "TYPE")]
558 worker_types: Vec<ProfileWorkerType>,
559 },
560 Heap {
562 #[clap(long = "dir")]
564 dir: Option<String>,
565 #[clap(long = "worker-type", value_name = "TYPE")]
567 worker_types: Vec<ProfileWorkerType>,
568 },
569}
570
571pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
578 let context = CtlContext::default();
579
580 tokio::select! {
581 _ = shutdown.cancelled() => {
582 context.try_close().await;
584 }
585
586 result = start_fallible(opts, &context) => {
587 if let Err(e) = result {
588 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
590 }
591 }
592 }
593}
594
595pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
598 let result = start_impl(opts, context).await;
599 context.try_close().await;
600 result
601}
602
603#[expect(
604 clippy::large_stack_frames,
605 reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
606 post-layout generator stores only one arm at a time (~13–16 KiB)."
607)]
608async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
609 match opts.command {
610 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
611 cmd_impl::compute::show_config(&host).await?
612 }
613 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
614 cmd_impl::hummock::disable_commit_epoch(context).await?
615 }
616 Commands::Hummock(HummockCommands::ListVersion {
617 verbose,
618 verbose_key_range,
619 }) => {
620 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
621 }
622 Commands::Hummock(HummockCommands::ListVersionDeltas {
623 start_id,
624 num_epochs,
625 }) => {
626 cmd_impl::hummock::list_version_deltas(
627 context,
628 HummockVersionId::new(start_id),
629 num_epochs,
630 )
631 .await?;
632 }
633 Commands::Hummock(HummockCommands::ListKv {
634 epoch,
635 table_id,
636 data_dir,
637 use_new_object_prefix_strategy,
638 }) => {
639 cmd_impl::hummock::list_kv(
640 context,
641 epoch,
642 table_id,
643 data_dir,
644 use_new_object_prefix_strategy,
645 )
646 .await?;
647 }
648 Commands::Hummock(HummockCommands::SstDump(args)) => {
649 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
650 }
651 Commands::Hummock(HummockCommands::TriggerManualCompaction {
652 compaction_group_id,
653 table_id,
654 level,
655 sst_ids,
656 }) => {
657 cmd_impl::hummock::trigger_manual_compaction(
658 context,
659 compaction_group_id,
660 table_id.into(),
661 level,
662 sst_ids,
663 )
664 .await?
665 }
666 Commands::Hummock(HummockCommands::TriggerFullGc {
667 sst_retention_time_sec,
668 prefix,
669 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
670 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
671 list_pinned_versions(context).await?
672 }
673 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
674 cmd_impl::hummock::list_compaction_group(context).await?
675 }
676 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
677 compaction_group_ids,
678 max_bytes_for_level_base,
679 max_bytes_for_level_multiplier,
680 max_compaction_bytes,
681 sub_level_max_compaction_bytes,
682 level0_tier_compact_file_number,
683 target_file_size_base,
684 compaction_filter_mask,
685 max_sub_compaction,
686 level0_stop_write_threshold_sub_level_number,
687 level0_sub_level_compact_level_count,
688 max_space_reclaim_bytes,
689 level0_max_compact_file_number,
690 level0_overlapping_sub_level_compact_level_count,
691 enable_emergency_picker,
692 tombstone_reclaim_ratio,
693 compression_level,
694 compression_algorithm,
695 max_l0_compact_level,
696 sst_allowed_trivial_move_min_size,
697 disable_auto_group_scheduling,
698 max_overlapping_level_size,
699 sst_allowed_trivial_move_max_count,
700 emergency_level0_sst_file_count,
701 emergency_level0_sub_level_partition,
702 level0_stop_write_threshold_max_sst_count,
703 level0_stop_write_threshold_max_size,
704 enable_optimize_l0_interval_selection,
705 vnode_aligned_level_size_threshold,
706 max_kv_count_for_xor16,
707 }) => {
708 cmd_impl::hummock::update_compaction_config(
709 context,
710 compaction_group_ids,
711 build_compaction_config_vec(
712 max_bytes_for_level_base,
713 max_bytes_for_level_multiplier,
714 max_compaction_bytes,
715 sub_level_max_compaction_bytes,
716 level0_tier_compact_file_number,
717 target_file_size_base,
718 compaction_filter_mask,
719 max_sub_compaction,
720 level0_stop_write_threshold_sub_level_number,
721 level0_sub_level_compact_level_count,
722 max_space_reclaim_bytes,
723 level0_max_compact_file_number,
724 level0_overlapping_sub_level_compact_level_count,
725 enable_emergency_picker,
726 tombstone_reclaim_ratio,
727 if let Some(level) = compression_level {
728 assert!(compression_algorithm.is_some());
729 Some(CompressionAlgorithm {
730 level,
731 compression_algorithm: compression_algorithm.unwrap(),
732 })
733 } else {
734 None
735 },
736 max_l0_compact_level,
737 sst_allowed_trivial_move_min_size,
738 disable_auto_group_scheduling,
739 max_overlapping_level_size,
740 sst_allowed_trivial_move_max_count,
741 emergency_level0_sst_file_count,
742 emergency_level0_sub_level_partition,
743 level0_stop_write_threshold_max_sst_count,
744 level0_stop_write_threshold_max_size,
745 enable_optimize_l0_interval_selection,
746 vnode_aligned_level_size_threshold,
747 max_kv_count_for_xor16,
748 ),
749 )
750 .await?
751 }
752 Commands::Hummock(HummockCommands::SplitCompactionGroup {
753 compaction_group_id,
754 table_ids,
755 partition_vnode_count,
756 }) => {
757 cmd_impl::hummock::split_compaction_group(
758 context,
759 compaction_group_id,
760 &table_ids.into_iter().map_into().collect_vec(),
761 partition_vnode_count,
762 )
763 .await?;
764 }
765 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
766 cmd_impl::hummock::pause_version_checkpoint(context).await?;
767 }
768 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
769 cmd_impl::hummock::resume_version_checkpoint(context).await?;
770 }
771 Commands::Hummock(HummockCommands::ReplayVersion) => {
772 cmd_impl::hummock::replay_version(context).await?;
773 }
774 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
775 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
776 }
777 Commands::Hummock(HummockCommands::GetCompactionScore {
778 compaction_group_id,
779 }) => {
780 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
781 }
782 Commands::Hummock(HummockCommands::ValidateVersion) => {
783 cmd_impl::hummock::validate_version(context).await?;
784 }
785 Commands::Hummock(HummockCommands::RebuildTableStats) => {
786 cmd_impl::hummock::rebuild_table_stats(context).await?;
787 }
788 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
789 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
790 }
791 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
792 archive_ids,
793 data_dir,
794 sst_id,
795 use_new_object_prefix_strategy,
796 }) => {
797 cmd_impl::hummock::print_version_delta_in_archive(
798 context,
799 archive_ids.into_iter().map(HummockVersionId::new),
800 data_dir,
801 sst_id,
802 use_new_object_prefix_strategy,
803 )
804 .await?;
805 }
806 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
807 archive_ids,
808 data_dir,
809 user_key,
810 use_new_object_prefix_strategy,
811 }) => {
812 cmd_impl::hummock::print_user_key_in_archive(
813 context,
814 archive_ids.into_iter().map(HummockVersionId::new),
815 data_dir,
816 user_key,
817 use_new_object_prefix_strategy,
818 )
819 .await?;
820 }
821 Commands::Hummock(HummockCommands::TieredCacheTracing {
822 enable,
823 record_hybrid_insert_threshold_ms,
824 record_hybrid_get_threshold_ms,
825 record_hybrid_obtain_threshold_ms,
826 record_hybrid_remove_threshold_ms,
827 record_hybrid_fetch_threshold_ms,
828 }) => {
829 cmd_impl::hummock::tiered_cache_tracing(
830 context,
831 enable,
832 record_hybrid_insert_threshold_ms,
833 record_hybrid_get_threshold_ms,
834 record_hybrid_obtain_threshold_ms,
835 record_hybrid_remove_threshold_ms,
836 record_hybrid_fetch_threshold_ms,
837 )
838 .await?
839 }
840 Commands::Hummock(HummockCommands::MergeCompactionGroup {
841 left_group_id,
842 right_group_id,
843 }) => {
844 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
845 .await?
846 }
847
848 Commands::Hummock(HummockCommands::MigrateLegacyObject {
849 url,
850 source_dir,
851 target_dir,
852 concurrency,
853 }) => {
854 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
855 }
856 Commands::Hummock(HummockCommands::ResizeCache {
857 meta_cache_capacity_mb,
858 data_cache_capacity_mb,
859 }) => {
860 const MIB: u64 = 1024 * 1024;
861 cmd_impl::hummock::resize_cache(
862 context,
863 meta_cache_capacity_mb.map(|v| v * MIB),
864 data_cache_capacity_mb.map(|v| v * MIB),
865 )
866 .await?
867 }
868 Commands::Table(TableCommands::Scan {
869 mv_name,
870 data_dir,
871 use_new_object_prefix_strategy,
872 }) => {
873 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
874 .await?
875 }
876 Commands::Table(TableCommands::ScanById {
877 table_id,
878 data_dir,
879 use_new_object_prefix_strategy,
880 }) => {
881 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
882 .await?
883 }
884 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
885 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
886 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
887 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
888 Commands::Meta(MetaCommands::ResumeBackfill {
889 job_id,
890 fragment_id,
891 }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
892 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
893 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
894 cmd_impl::meta::source_split_info(context, ignore_id).await?
895 }
896 Commands::Meta(MetaCommands::Reschedule {
897 from,
898 dry_run,
899 plan,
900 revision,
901 resolve_no_shuffle,
902 }) => {
903 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
904 .await?
905 }
906 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
907 cmd_impl::meta::backup_meta(context, remarks).await?
908 }
909 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
910 risingwave_meta::backup_restore::restore(opts).await?
911 }
912 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
913 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
914 }
915 Commands::Meta(MetaCommands::ListConnections) => {
916 cmd_impl::meta::list_connections(context).await?
917 }
918 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
919 cmd_impl::meta::list_serving_fragment_mappings(context).await?
920 }
921 Commands::Meta(MetaCommands::UnregisterWorkers {
922 workers,
923 yes,
924 ignore_not_found,
925 check_fragment_occupied,
926 }) => {
927 cmd_impl::meta::unregister_workers(
928 context,
929 workers,
930 yes,
931 ignore_not_found,
932 check_fragment_occupied,
933 )
934 .await?
935 }
936 Commands::Meta(MetaCommands::ValidateSource { props }) => {
937 cmd_impl::meta::validate_source(context, props).await?
938 }
939 Commands::AwaitTree(AwaitTreeCommands::Dump {
940 actor_traces_format,
941 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
942 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
943 cmd_impl::await_tree::bottleneck_detect(context, path).await?
944 }
945 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
946 rw_diagnose_tools::await_tree::transcribe(path)?
947 }
948 Commands::Profile(ProfileCommands::Cpu {
949 sleep,
950 worker_types,
951 }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
952 Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
953 cmd_impl::profile::heap_profile(context, dir, worker_types).await?
954 }
955 Commands::Scale(ScaleCommands::Cordon { workers }) => {
956 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
957 .await?
958 }
959 Commands::Scale(ScaleCommands::Uncordon { workers }) => {
960 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
961 .await?
962 }
963 Commands::Throttle(ThrottleCommands::Source(args)) => {
964 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
965 }
966 Commands::Throttle(ThrottleCommands::Mv(args)) => {
967 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
968 }
969 Commands::Throttle(ThrottleCommands::Sink(args)) => {
970 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
971 }
972 Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
973 table_id,
974 parallelism,
975 }) => {
976 set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
977 }
978 Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
979 }
980 Ok(())
981}