risingwave_ctl/cmd_impl/meta/
source_props.rs1use std::collections::HashMap;
16
17use anyhow::{Context, Result, anyhow};
18use risingwave_meta_model::SourceId;
19
20use crate::CtlContext;
21
22pub async fn alter_source_properties_safe(
30 context: &CtlContext,
31 source_id: u32,
32 props_json: String,
33 reset_splits: bool,
34) -> Result<()> {
35 let meta_client = context.meta_client().await?;
36
37 let props: HashMap<String, String> =
38 serde_json::from_str(&props_json).context("Failed to parse props as JSON object")?;
39
40 if props.is_empty() {
41 return Err(anyhow!("No properties provided to update"));
42 }
43
44 println!("=== ALTER SOURCE PROPERTIES (SAFE) ===");
45 println!("Source ID: {}", source_id);
46 println!("Properties to update: {:?}", props);
47 println!("Reset splits: {}", reset_splits);
48 println!();
49
50 println!("WARNING: This operation will pause the source during the update.");
51 if reset_splits {
52 println!("WARNING: Split reset requested - this may cause data duplication or loss!");
53 }
54 println!();
55
56 meta_client
57 .alter_source_properties_safe(
58 SourceId::from(source_id),
59 props.into_iter().collect(),
60 Default::default(), reset_splits,
62 )
63 .await?;
64
65 println!("Source properties updated successfully!");
66 Ok(())
67}
68
69pub async fn reset_source_splits(context: &CtlContext, source_id: u32) -> Result<()> {
74 let meta_client = context.meta_client().await?;
75
76 println!("=== RESET SOURCE SPLITS (UNSAFE) ===");
77 println!("Source ID: {}", source_id);
78 println!();
79 println!("WARNING: This is an UNSAFE operation!");
80 println!("It will clear all cached split state and trigger re-discovery from upstream.");
81 println!("This may cause data duplication or loss depending on the connector!");
82 println!();
83
84 meta_client
85 .reset_source_splits(SourceId::from(source_id))
86 .await?;
87
88 println!("Source splits reset successfully!");
89 println!("New splits will be assigned on the next tick.");
90 Ok(())
91}
92
93pub async fn inject_source_offsets(
98 context: &CtlContext,
99 source_id: u32,
100 offsets_json: String,
101) -> Result<()> {
102 let meta_client = context.meta_client().await?;
103
104 let offsets: HashMap<String, String> =
105 serde_json::from_str(&offsets_json).context("Failed to parse offsets as JSON object")?;
106
107 if offsets.is_empty() {
108 return Err(anyhow!("No offsets provided to inject"));
109 }
110
111 println!("=== INJECT SOURCE OFFSETS (UNSAFE) ===");
112 println!("Source ID: {}", source_id);
113 println!("Offsets to inject:");
114 for (split_id, offset) in &offsets {
115 println!(" {} -> {}", split_id, offset);
116 }
117 println!();
118 println!("WARNING: This is an UNSAFE operation!");
119 println!("Injecting incorrect offsets WILL cause data duplication or loss!");
120 println!("Make sure you know the exact offsets needed before proceeding.");
121 println!();
122
123 let applied_splits = meta_client
124 .inject_source_offsets(SourceId::from(source_id), offsets)
125 .await?;
126
127 println!("Source offsets injected successfully!");
128 println!("Applied to splits: {:?}", applied_splits);
129 Ok(())
130}