risingwave_ctl/cmd_impl/meta/
source_props.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::{Context, Result, anyhow};
18use risingwave_meta_model::SourceId;
19
20use crate::CtlContext;
21
22/// Alter source connector properties with pause/resume orchestration.
23///
24/// This is a safe way to update source properties that:
25/// 1. Pauses the source (already commits state)
26/// 2. Updates catalog and propagates changes
27/// 3. Optionally resets split assignments
28/// 4. Resumes the source
29pub async fn alter_source_properties_safe(
30    context: &CtlContext,
31    source_id: u32,
32    props_json: String,
33    reset_splits: bool,
34) -> Result<()> {
35    let meta_client = context.meta_client().await?;
36
37    let props: HashMap<String, String> =
38        serde_json::from_str(&props_json).context("Failed to parse props as JSON object")?;
39
40    if props.is_empty() {
41        return Err(anyhow!("No properties provided to update"));
42    }
43
44    println!("=== ALTER SOURCE PROPERTIES (SAFE) ===");
45    println!("Source ID: {}", source_id);
46    println!("Properties to update: {:?}", props);
47    println!("Reset splits: {}", reset_splits);
48    println!();
49
50    println!("WARNING: This operation will pause the source during the update.");
51    if reset_splits {
52        println!("WARNING: Split reset requested - this may cause data duplication or loss!");
53    }
54    println!();
55
56    meta_client
57        .alter_source_properties_safe(
58            SourceId::from(source_id),
59            props.into_iter().collect(),
60            Default::default(), // No secret refs for now
61            reset_splits,
62        )
63        .await?;
64
65    println!("Source properties updated successfully!");
66    Ok(())
67}
68
69/// Reset source split assignments.
70///
71/// WARNING: This is an UNSAFE operation that may cause data duplication or loss!
72/// It clears cached split state and triggers re-discovery from upstream.
73pub async fn reset_source_splits(context: &CtlContext, source_id: u32) -> Result<()> {
74    let meta_client = context.meta_client().await?;
75
76    println!("=== RESET SOURCE SPLITS (UNSAFE) ===");
77    println!("Source ID: {}", source_id);
78    println!();
79    println!("WARNING: This is an UNSAFE operation!");
80    println!("It will clear all cached split state and trigger re-discovery from upstream.");
81    println!("This may cause data duplication or loss depending on the connector!");
82    println!();
83
84    meta_client
85        .reset_source_splits(SourceId::from(source_id))
86        .await?;
87
88    println!("Source splits reset successfully!");
89    println!("New splits will be assigned on the next tick.");
90    Ok(())
91}
92
93/// Inject specific offsets into source splits.
94///
95/// WARNING: This is an UNSAFE operation that may cause data duplication or loss!
96/// Use with extreme caution and only when you know the exact offsets needed.
97pub async fn inject_source_offsets(
98    context: &CtlContext,
99    source_id: u32,
100    offsets_json: String,
101) -> Result<()> {
102    let meta_client = context.meta_client().await?;
103
104    let offsets: HashMap<String, String> =
105        serde_json::from_str(&offsets_json).context("Failed to parse offsets as JSON object")?;
106
107    if offsets.is_empty() {
108        return Err(anyhow!("No offsets provided to inject"));
109    }
110
111    println!("=== INJECT SOURCE OFFSETS (UNSAFE) ===");
112    println!("Source ID: {}", source_id);
113    println!("Offsets to inject:");
114    for (split_id, offset) in &offsets {
115        println!("  {} -> {}", split_id, offset);
116    }
117    println!();
118    println!("WARNING: This is an UNSAFE operation!");
119    println!("Injecting incorrect offsets WILL cause data duplication or loss!");
120    println!("Make sure you know the exact offsets needed before proceeding.");
121    println!();
122
123    let applied_splits = meta_client
124        .inject_source_offsets(SourceId::from(source_id), offsets)
125        .await?;
126
127    println!("Source offsets injected successfully!");
128    println!("Applied to splits: {:?}", applied_splits);
129    Ok(())
130}