1#![feature(let_chains)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use risingwave_common::util::tokio_util::sync::CancellationToken;
22use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
23use risingwave_meta::backup_restore::RestoreOpts;
24use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
25use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
26use thiserror_ext::AsReport;
27
28use crate::cmd_impl::hummock::{
29 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
30};
31use crate::cmd_impl::throttle::apply_throttle;
32use crate::common::CtlContext;
33
34pub mod cmd_impl;
35pub mod common;
36
// Top-level command-line options of the ctl tool, parsed through clap's derive API.
// The struct carries nothing but the selected subcommand, which `start_impl`
// destructures and dispatches on.
//
// NOTE: these are plain `//` comments on purpose: clap's derive surfaces `///`
// doc comments as `--help` text, so adding them would change the CLI output.
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    // The subcommand tree selected on the command line (see `Commands`).
    #[clap(subcommand)]
    command: Commands,
}
50
// First-level subcommand groups. Every variant wraps a nested subcommand enum
// (`#[clap(subcommand)]`); `start_impl` matches on the pair of outer and inner
// variant to pick a handler in `cmd_impl`.
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand)]
#[clap(infer_subcommands = true)]
enum Commands {
    // Commands handled by `cmd_impl::compute`.
    #[clap(subcommand)]
    Compute(ComputeCommands),
    // Commands handled by `cmd_impl::hummock`.
    #[clap(subcommand)]
    Hummock(HummockCommands),
    // Commands handled by `cmd_impl::table`.
    #[clap(subcommand)]
    Table(TableCommands),
    // Commands handled by `cmd_impl::meta` (plus meta backup restore).
    #[clap(subcommand)]
    Meta(MetaCommands),
    // Commands handled by `cmd_impl::scale` (cordon / uncordon workers).
    #[clap(subcommand)]
    Scale(ScaleCommands),
    // Commands handled by `cmd_impl::bench::do_bench`.
    #[clap(subcommand)]
    Bench(BenchCommands),
    // Await-tree commands; also reachable under the visible alias `trace`.
    #[clap(subcommand)]
    #[clap(visible_alias("trace"))]
    AwaitTree(AwaitTreeCommands),
    // Commands handled by `cmd_impl::profile` (CPU / heap profiling).
    #[clap(subcommand)]
    Profile(ProfileCommands),
    // Throttle commands, handled by `cmd_impl::throttle::apply_throttle`.
    #[clap(subcommand)]
    Throttle(ThrottleCommands),
}
83
// Subcommands of `compute`.
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand)]
enum ComputeCommands {
    // Calls `cmd_impl::compute::show_config(&host)`. `host` is a positional
    // argument (it has no `short`/`long` attribute).
    // Presumably `host` is the address of the compute node to query; confirm
    // against `show_config`.
    ShowConfig { host: String },
}
89
// Subcommands of `hummock`. Each variant is forwarded by `start_impl` to the
// function of (roughly) the same snake_case name in `cmd_impl::hummock`.
//
// Fields without a `short`/`long` attribute are positional arguments; fields
// with `value_delimiter = ','` accept a comma-separated list.
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    // -> `list_version(context, verbose, verbose_key_range)`.
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        // The long flag is spelled with underscores: `--verbose_key_range`.
        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    // -> `list_version_deltas`. `start_id` is wrapped in `HummockVersionId::new`
    // before being passed on.
    ListVersionDeltas {
        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
        start_id: u64,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    // -> `disable_commit_epoch(context)`.
    DisableCommitEpoch,
    // -> `list_kv(context, epoch, table_id, data_dir, use_new_object_prefix_strategy)`.
    ListKv {
        // Defaults to `HummockEpoch::MAX` when not given.
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        // Required: no default and not an `Option`.
        #[clap(short, long = "table-id")]
        table_id: u32,

        // Optional positional argument.
        data_dir: Option<String>,

        // NOTE(review): a `bool` field is a value-less flag under clap's derive
        // defaults; combined with a default of "true" it looks like this can
        // never be turned off from the command line. Confirm.
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // -> `sst_dump(context, args)`; the arguments are defined by `SstDumpArgs`.
    SstDump(SstDumpArgs),
    // -> `trigger_manual_compaction(context, compaction_group_id, table_id, level, sst_ids)`.
    TriggerManualCompaction {
        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
        compaction_group_id: u64,

        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        #[clap(short, long = "level", default_value_t = 1)]
        level: u32,

        // Comma-separated list of ids.
        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<u64>,
    },
    // -> `trigger_full_gc(context, sst_retention_time_sec, prefix)`.
    TriggerFullGc {
        // The long flag is spelled with underscores. The default 259200 equals
        // 3 * 24 * 60 * 60 (three days, if the unit is seconds as the name says).
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    // -> `list_pinned_versions(context)`.
    ListPinnedVersions {},
    // -> `list_compaction_group(context)`.
    ListCompactionGroup,
    // -> `update_compaction_config`. Every `Option` field below is passed to
    // `build_compaction_config_vec` in declaration order; `None` means the flag
    // was not given. The meaning of the individual knobs is defined by the
    // compaction config on the server side and is not visible from this file.
    UpdateCompactionConfig {
        // Comma-separated list of compaction group ids the update applies to.
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<u64>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        // `compression_level` and `compression_algorithm` are combined into one
        // `CompressionAlgorithm` message by `start_impl`, and only when the
        // level is given; the level requires the algorithm to be given too.
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
        #[clap(long)]
        enable_optimize_l0_interval_selection: Option<bool>,
    },
    // -> `split_compaction_group(context, compaction_group_id, &table_ids, partition_vnode_count)`.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: u64,
        // Comma-separated list of table ids.
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<u32>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    // -> `pause_version_checkpoint(context)`.
    PauseVersionCheckpoint,
    // -> `resume_version_checkpoint(context)`.
    ResumeVersionCheckpoint,
    // -> `replay_version(context)`.
    ReplayVersion,
    // -> `list_compaction_status(context, verbose)`.
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
    // -> `get_compaction_score(context, compaction_group_id)`.
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: u64,
    },
    // -> `validate_version(context)`.
    ValidateVersion,
    // -> `rebuild_table_stats(context)`.
    RebuildTableStats,
    // -> `cancel_compact_task(context, task_id)`.
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
    // -> `print_user_key_in_archive`. Each archive id is converted with
    // `HummockVersionId::new` before being passed on.
    PrintUserKeyInArchive {
        // Comma-separated list of archive ids.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        #[clap(long)]
        data_dir: String,
        #[clap(long)]
        user_key: String,
        // NOTE(review): same always-true concern as on `ListKv`. Confirm.
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // -> `print_version_delta_in_archive`. Each archive id is converted with
    // `HummockVersionId::new` before being passed on.
    PrintVersionDeltaInArchive {
        // Comma-separated list of archive ids.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        #[clap(long)]
        data_dir: String,
        #[clap(long)]
        sst_id: u64,
        // NOTE(review): same always-true concern as on `ListKv`. Confirm.
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // -> `tiered_cache_tracing(context, enable, <five thresholds in declaration order>)`.
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        // The `_ms` suffix suggests milliseconds; the unit is not checked here.
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
    // -> `merge_compaction_group(context, left_group_id, right_group_id)`.
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: u64,
        #[clap(long)]
        right_group_id: u64,
    },
    // -> `migrate_legacy_object(url, source_dir, target_dir, concurrency)`.
    // This handler is called without the `CtlContext`. The first three fields
    // are positional, in this order.
    MigrateLegacyObject {
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    // -> `resize_cache`. Both capacities are given in MiB; `start_impl`
    // multiplies them by 1024 * 1024 before passing them on.
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}
304
// Subcommands of `table`, handled by `cmd_impl::table`.
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand)]
enum TableCommands {
    // -> `scan(context, mv_name, data_dir, use_new_object_prefix_strategy)`.
    Scan {
        // Positional, required.
        mv_name: String,
        // Positional, optional.
        data_dir: Option<String>,

        // NOTE(review): a `bool` field is a value-less flag under clap's derive
        // defaults; combined with a default of "true" it looks like this can
        // never be turned off from the command line. Confirm.
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // -> `scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)`.
    ScanById {
        // Positional, required.
        table_id: u32,
        // Positional, optional.
        data_dir: Option<String>,
        // NOTE(review): same always-true concern as on `Scan`. Confirm.
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // -> `list(context)`.
    List,
}
329
// Subcommands of `scale`. Both variants call
// `cmd_impl::scale::update_schedulability` with the same worker list and differ
// only in the `Schedulability` value they pass.
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Debug)]
enum ScaleCommands {
    // Passes `Schedulability::Unschedulable` for the given workers.
    #[clap(verbatim_doc_comment)]
    Cordon {
        // Required, comma-separated; shown in help as "id or host,...".
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
    // Passes `Schedulability::Schedulable` for the given workers.
    Uncordon {
        // Required, comma-separated; shown in help as "id or host,...".
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
}
356
// Subcommands of `meta`. Unless noted otherwise, each variant is forwarded by
// `start_impl` to the matching function in `cmd_impl::meta`.
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    // -> `pause(context)`.
    Pause,
    // -> `resume(context)`.
    Resume,
    // -> `cluster_info(context)`.
    ClusterInfo,
    // -> `source_split_info(context, ignore_id)`.
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    // -> `reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)`.
    // The required argument group "input_group" makes clap insist on one of
    // `--plan` / `--from`. `--plan` additionally requires `--revision`, while
    // `--from` conflicts with `--revision`.
    #[clap(verbatim_doc_comment)]
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        #[clap(long)]
        revision: Option<u64>,
        // Shell completion hint: any filesystem path.
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        #[clap(long, default_value = "false")]
        dry_run: bool,
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    // -> `backup_meta(context, remarks)`.
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    // -> `risingwave_meta::backup_restore::restore(opts)`; called without the
    // `CtlContext`. The flags come from the flattened `RestoreOpts`.
    RestoreMeta {
        #[command(flatten)]
        opts: RestoreOpts,
    },
    // -> `delete_meta_snapshots(context, &snapshot_ids)`.
    DeleteMetaSnapshots {
        // Comma-separated list of snapshot ids.
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    // -> `list_connections(context)`.
    ListConnections,

    // -> `list_serving_fragment_mappings(context)`.
    ListServingFragmentMapping,

    // -> `unregister_workers(context, workers, yes, ignore_not_found, check_fragment_occupied)`.
    UnregisterWorkers {
        // Required, comma-separated list of workers.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        // `-y` / `--yes`; presumably skips an interactive confirmation. Confirm
        // against `unregister_workers`.
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    // -> `validate_source(context, props)`.
    ValidateSource {
        // Passed through as an opaque string; its format is defined by
        // `validate_source`.
        #[clap(long)]
        props: String,
    },

    // -> `graph_check(endpoint)`; called without the `CtlContext`.
    #[clap(verbatim_doc_comment)]
    GraphCheck {
        #[clap(long, required = true)]
        endpoint: String,
    },
}
468
// Subcommands of `await-tree` (alias `trace`, see `Commands`).
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    // -> `cmd_impl::await_tree::dump(context, actor_traces_format)`.
    Dump {
        // Optional; accepted values are defined by `dump`, not validated here.
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    // -> `cmd_impl::await_tree::bottleneck_detect(context, path)`.
    Analyze {
        // Optional; what happens when it is omitted is decided by
        // `bottleneck_detect`.
        #[clap(long = "path")]
        path: Option<String>,
    },
    // -> `rw_diagnose_tools::await_tree::transcribe(path)`; called without the
    // `CtlContext`.
    Transcribe {
        // Required.
        #[clap(long = "path")]
        path: String,
    },
}
492
// Subcommands of `throttle`. Both variants take the same arguments and are
// forwarded to `apply_throttle`, differing only in the `PbThrottleTarget`
// (`Source` or `Mv`) that `start_impl` passes along.
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    // Uses `PbThrottleTarget::Source`.
    Source(ThrottleCommandArgs),
    // Uses `PbThrottleTarget::Mv`.
    Mv(ThrottleCommandArgs),
}
498
// Arguments shared by the `throttle source` and `throttle mv` subcommands.
// Both fields are positional (neither has a `short`/`long` attribute).
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    // Required id of the object to throttle (a source or an MV, depending on
    // the subcommand).
    id: u32,
    // Optional rate; how a missing value is interpreted is decided by
    // `apply_throttle`.
    rate: Option<u32>,
}
504
// Subcommands of `profile`, handled by `cmd_impl::profile`.
//
// NOTE: plain `//` comments on purpose: clap's derive surfaces `///` doc
// comments as `--help` text, so adding them would change the CLI output.
#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    // -> `cpu_profile(context, sleep)`.
    Cpu {
        // Required (`-s` / `--sleep`). Presumably the profiling duration;
        // the unit is defined by `cpu_profile`. Confirm.
        #[clap(short, long = "sleep")]
        sleep: u64,
    },
    // -> `heap_profile(context, dir)`.
    Heap {
        // Optional directory; the fallback when omitted is decided by
        // `heap_profile`.
        #[clap(long = "dir")]
        dir: Option<String>,
    },
}
520
521pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
528 let context = CtlContext::default();
529
530 tokio::select! {
531 _ = shutdown.cancelled() => {
532 context.try_close().await;
534 }
535
536 result = start_fallible(opts, &context) => {
537 if let Err(e) = result {
538 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
540 }
541 }
542 }
543}
544
545pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
548 let result = start_impl(opts, context).await;
549 context.try_close().await;
550 result
551}
552
553async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
554 match opts.command {
555 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
556 cmd_impl::compute::show_config(&host).await?
557 }
558 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
559 cmd_impl::hummock::disable_commit_epoch(context).await?
560 }
561 Commands::Hummock(HummockCommands::ListVersion {
562 verbose,
563 verbose_key_range,
564 }) => {
565 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
566 }
567 Commands::Hummock(HummockCommands::ListVersionDeltas {
568 start_id,
569 num_epochs,
570 }) => {
571 cmd_impl::hummock::list_version_deltas(
572 context,
573 HummockVersionId::new(start_id),
574 num_epochs,
575 )
576 .await?;
577 }
578 Commands::Hummock(HummockCommands::ListKv {
579 epoch,
580 table_id,
581 data_dir,
582 use_new_object_prefix_strategy,
583 }) => {
584 cmd_impl::hummock::list_kv(
585 context,
586 epoch,
587 table_id,
588 data_dir,
589 use_new_object_prefix_strategy,
590 )
591 .await?;
592 }
593 Commands::Hummock(HummockCommands::SstDump(args)) => {
594 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
595 }
596 Commands::Hummock(HummockCommands::TriggerManualCompaction {
597 compaction_group_id,
598 table_id,
599 level,
600 sst_ids,
601 }) => {
602 cmd_impl::hummock::trigger_manual_compaction(
603 context,
604 compaction_group_id,
605 table_id,
606 level,
607 sst_ids,
608 )
609 .await?
610 }
611 Commands::Hummock(HummockCommands::TriggerFullGc {
612 sst_retention_time_sec,
613 prefix,
614 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
615 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
616 list_pinned_versions(context).await?
617 }
618 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
619 cmd_impl::hummock::list_compaction_group(context).await?
620 }
621 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
622 compaction_group_ids,
623 max_bytes_for_level_base,
624 max_bytes_for_level_multiplier,
625 max_compaction_bytes,
626 sub_level_max_compaction_bytes,
627 level0_tier_compact_file_number,
628 target_file_size_base,
629 compaction_filter_mask,
630 max_sub_compaction,
631 level0_stop_write_threshold_sub_level_number,
632 level0_sub_level_compact_level_count,
633 max_space_reclaim_bytes,
634 level0_max_compact_file_number,
635 level0_overlapping_sub_level_compact_level_count,
636 enable_emergency_picker,
637 tombstone_reclaim_ratio,
638 compression_level,
639 compression_algorithm,
640 max_l0_compact_level,
641 sst_allowed_trivial_move_min_size,
642 disable_auto_group_scheduling,
643 max_overlapping_level_size,
644 sst_allowed_trivial_move_max_count,
645 emergency_level0_sst_file_count,
646 emergency_level0_sub_level_partition,
647 level0_stop_write_threshold_max_sst_count,
648 level0_stop_write_threshold_max_size,
649 enable_optimize_l0_interval_selection,
650 }) => {
651 cmd_impl::hummock::update_compaction_config(
652 context,
653 compaction_group_ids,
654 build_compaction_config_vec(
655 max_bytes_for_level_base,
656 max_bytes_for_level_multiplier,
657 max_compaction_bytes,
658 sub_level_max_compaction_bytes,
659 level0_tier_compact_file_number,
660 target_file_size_base,
661 compaction_filter_mask,
662 max_sub_compaction,
663 level0_stop_write_threshold_sub_level_number,
664 level0_sub_level_compact_level_count,
665 max_space_reclaim_bytes,
666 level0_max_compact_file_number,
667 level0_overlapping_sub_level_compact_level_count,
668 enable_emergency_picker,
669 tombstone_reclaim_ratio,
670 if let Some(level) = compression_level {
671 assert!(compression_algorithm.is_some());
672 Some(CompressionAlgorithm {
673 level,
674 compression_algorithm: compression_algorithm.unwrap(),
675 })
676 } else {
677 None
678 },
679 max_l0_compact_level,
680 sst_allowed_trivial_move_min_size,
681 disable_auto_group_scheduling,
682 max_overlapping_level_size,
683 sst_allowed_trivial_move_max_count,
684 emergency_level0_sst_file_count,
685 emergency_level0_sub_level_partition,
686 level0_stop_write_threshold_max_sst_count,
687 level0_stop_write_threshold_max_size,
688 enable_optimize_l0_interval_selection,
689 ),
690 )
691 .await?
692 }
693 Commands::Hummock(HummockCommands::SplitCompactionGroup {
694 compaction_group_id,
695 table_ids,
696 partition_vnode_count,
697 }) => {
698 cmd_impl::hummock::split_compaction_group(
699 context,
700 compaction_group_id,
701 &table_ids,
702 partition_vnode_count,
703 )
704 .await?;
705 }
706 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
707 cmd_impl::hummock::pause_version_checkpoint(context).await?;
708 }
709 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
710 cmd_impl::hummock::resume_version_checkpoint(context).await?;
711 }
712 Commands::Hummock(HummockCommands::ReplayVersion) => {
713 cmd_impl::hummock::replay_version(context).await?;
714 }
715 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
716 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
717 }
718 Commands::Hummock(HummockCommands::GetCompactionScore {
719 compaction_group_id,
720 }) => {
721 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
722 }
723 Commands::Hummock(HummockCommands::ValidateVersion) => {
724 cmd_impl::hummock::validate_version(context).await?;
725 }
726 Commands::Hummock(HummockCommands::RebuildTableStats) => {
727 cmd_impl::hummock::rebuild_table_stats(context).await?;
728 }
729 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
730 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
731 }
732 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
733 archive_ids,
734 data_dir,
735 sst_id,
736 use_new_object_prefix_strategy,
737 }) => {
738 cmd_impl::hummock::print_version_delta_in_archive(
739 context,
740 archive_ids.into_iter().map(HummockVersionId::new),
741 data_dir,
742 sst_id,
743 use_new_object_prefix_strategy,
744 )
745 .await?;
746 }
747 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
748 archive_ids,
749 data_dir,
750 user_key,
751 use_new_object_prefix_strategy,
752 }) => {
753 cmd_impl::hummock::print_user_key_in_archive(
754 context,
755 archive_ids.into_iter().map(HummockVersionId::new),
756 data_dir,
757 user_key,
758 use_new_object_prefix_strategy,
759 )
760 .await?;
761 }
762 Commands::Hummock(HummockCommands::TieredCacheTracing {
763 enable,
764 record_hybrid_insert_threshold_ms,
765 record_hybrid_get_threshold_ms,
766 record_hybrid_obtain_threshold_ms,
767 record_hybrid_remove_threshold_ms,
768 record_hybrid_fetch_threshold_ms,
769 }) => {
770 cmd_impl::hummock::tiered_cache_tracing(
771 context,
772 enable,
773 record_hybrid_insert_threshold_ms,
774 record_hybrid_get_threshold_ms,
775 record_hybrid_obtain_threshold_ms,
776 record_hybrid_remove_threshold_ms,
777 record_hybrid_fetch_threshold_ms,
778 )
779 .await?
780 }
781 Commands::Hummock(HummockCommands::MergeCompactionGroup {
782 left_group_id,
783 right_group_id,
784 }) => {
785 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
786 .await?
787 }
788
789 Commands::Hummock(HummockCommands::MigrateLegacyObject {
790 url,
791 source_dir,
792 target_dir,
793 concurrency,
794 }) => {
795 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
796 }
797 Commands::Hummock(HummockCommands::ResizeCache {
798 meta_cache_capacity_mb,
799 data_cache_capacity_mb,
800 }) => {
801 const MIB: u64 = 1024 * 1024;
802 cmd_impl::hummock::resize_cache(
803 context,
804 meta_cache_capacity_mb.map(|v| v * MIB),
805 data_cache_capacity_mb.map(|v| v * MIB),
806 )
807 .await?
808 }
809 Commands::Table(TableCommands::Scan {
810 mv_name,
811 data_dir,
812 use_new_object_prefix_strategy,
813 }) => {
814 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
815 .await?
816 }
817 Commands::Table(TableCommands::ScanById {
818 table_id,
819 data_dir,
820 use_new_object_prefix_strategy,
821 }) => {
822 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
823 .await?
824 }
825 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
826 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
827 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
828 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
829 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
830 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
831 cmd_impl::meta::source_split_info(context, ignore_id).await?
832 }
833 Commands::Meta(MetaCommands::Reschedule {
834 from,
835 dry_run,
836 plan,
837 revision,
838 resolve_no_shuffle,
839 }) => {
840 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
841 .await?
842 }
843 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
844 cmd_impl::meta::backup_meta(context, remarks).await?
845 }
846 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
847 risingwave_meta::backup_restore::restore(opts).await?
848 }
849 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
850 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
851 }
852 Commands::Meta(MetaCommands::ListConnections) => {
853 cmd_impl::meta::list_connections(context).await?
854 }
855 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
856 cmd_impl::meta::list_serving_fragment_mappings(context).await?
857 }
858 Commands::Meta(MetaCommands::UnregisterWorkers {
859 workers,
860 yes,
861 ignore_not_found,
862 check_fragment_occupied,
863 }) => {
864 cmd_impl::meta::unregister_workers(
865 context,
866 workers,
867 yes,
868 ignore_not_found,
869 check_fragment_occupied,
870 )
871 .await?
872 }
873 Commands::Meta(MetaCommands::ValidateSource { props }) => {
874 cmd_impl::meta::validate_source(context, props).await?
875 }
876 Commands::Meta(MetaCommands::GraphCheck { endpoint }) => {
877 cmd_impl::meta::graph_check(endpoint).await?
878 }
879 Commands::AwaitTree(AwaitTreeCommands::Dump {
880 actor_traces_format,
881 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
882 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
883 cmd_impl::await_tree::bottleneck_detect(context, path).await?
884 }
885 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
886 rw_diagnose_tools::await_tree::transcribe(path)?
887 }
888 Commands::Profile(ProfileCommands::Cpu { sleep }) => {
889 cmd_impl::profile::cpu_profile(context, sleep).await?
890 }
891 Commands::Profile(ProfileCommands::Heap { dir }) => {
892 cmd_impl::profile::heap_profile(context, dir).await?
893 }
894 Commands::Scale(ScaleCommands::Cordon { workers }) => {
895 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
896 .await?
897 }
898 Commands::Scale(ScaleCommands::Uncordon { workers }) => {
899 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
900 .await?
901 }
902 Commands::Throttle(ThrottleCommands::Source(args)) => {
903 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
904 }
905 Commands::Throttle(ThrottleCommands::Mv(args)) => {
906 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
907 }
908 }
909 Ok(())
910}