risingwave_ctl/cmd_impl/meta/
connection.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_pb::catalog::connection::Info;
19use risingwave_pb::cloud_service::SourceType;
20use serde_json::Value;
21
22use crate::common::CtlContext;
23
24pub async fn list_connections(context: &CtlContext) -> anyhow::Result<()> {
25    let meta_client = context.meta_client().await?;
26    let connections = meta_client.list_connections(None).await?;
27
28    for conn in connections {
29        println!(
30            "Connection#{}, connection_name: {}, {}",
31            conn.id,
32            conn.name,
33            match conn.info {
34                Some(Info::PrivateLinkService(svc)) => format!(
35                    "PrivateLink: service_name: {}, endpoint_id: {}, dns_entries: {:?}",
36                    svc.service_name, svc.endpoint_id, svc.dns_entries,
37                ),
38                Some(Info::ConnectionParams(params)) => {
39                    format!(
40                        "CONNECTION_PARAMS_{}: {}",
41                        params.get_connection_type().unwrap().as_str_name(),
42                        serde_json::to_string(&params.get_properties()).unwrap()
43                    )
44                }
45                None => "None".to_owned(),
46            }
47        );
48    }
49    Ok(())
50}
51
52pub async fn validate_source(context: &CtlContext, props: String) -> anyhow::Result<()> {
53    let with_props: HashMap<String, String> =
54        serde_json::from_str::<HashMap<String, Value>>(props.as_str())
55            .expect("error parsing with props json")
56            .into_iter()
57            .map(|(key, val)| match val {
58                Value::String(s) => (key, s),
59                _ => (key, val.to_string()),
60            })
61            .collect();
62    let source_type = match with_props
63        .get("connector")
64        .expect("missing 'connector' in with clause")
65        .as_str()
66    {
67        "kafka" => Ok(SourceType::Kafka),
68        _ => Err(anyhow!(
69            "unsupported source type, only kafka sources are supported"
70        )),
71    }?;
72    let meta_client = context.meta_client().await?;
73    let resp = meta_client
74        .rw_cloud_validate_source(source_type, with_props)
75        .await?;
76    if !resp.ok {
77        eprintln!("{}", serde_json::to_string(&resp).unwrap());
78        std::process::exit(1);
79    }
80    Ok(())
81}