risingwave_ctl/cmd_impl/meta/
connection.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_pb::catalog::connection::Info;
19use risingwave_pb::cloud_service::SourceType;
20use serde_json::Value;
21
22use crate::common::CtlContext;
23
24pub async fn list_connections(context: &CtlContext) -> anyhow::Result<()> {
25    let meta_client = context.meta_client().await?;
26    let connections = meta_client.list_connections(None).await?;
27
28    for conn in connections {
29        println!(
30            "Connection#{}, connection_name: {}, {}",
31            conn.id,
32            conn.name,
33            match conn.info {
34                #[expect(deprecated)]
35                Some(Info::PrivateLinkService(svc)) => format!(
36                    "PrivateLink: service_name: {}, endpoint_id: {}, dns_entries: {:?}",
37                    svc.service_name, svc.endpoint_id, svc.dns_entries,
38                ),
39                Some(Info::ConnectionParams(params)) => {
40                    format!(
41                        "CONNECTION_PARAMS_{}: {}",
42                        params.get_connection_type().unwrap().as_str_name(),
43                        serde_json::to_string(&params.get_properties()).unwrap()
44                    )
45                }
46                None => "None".to_owned(),
47            }
48        );
49    }
50    Ok(())
51}
52
53pub async fn validate_source(context: &CtlContext, props: String) -> anyhow::Result<()> {
54    let with_props: HashMap<String, String> =
55        serde_json::from_str::<HashMap<String, Value>>(props.as_str())
56            .expect("error parsing with props json")
57            .into_iter()
58            .map(|(key, val)| match val {
59                Value::String(s) => (key, s),
60                _ => (key, val.to_string()),
61            })
62            .collect();
63    let source_type = match with_props
64        .get("connector")
65        .expect("missing 'connector' in with clause")
66        .as_str()
67    {
68        "kafka" => Ok(SourceType::Kafka),
69        _ => Err(anyhow!(
70            "unsupported source type, only kafka sources are supported"
71        )),
72    }?;
73    let meta_client = context.meta_client().await?;
74    let resp = meta_client
75        .rw_cloud_validate_source(source_type, with_props)
76        .await?;
77    if !resp.ok {
78        eprintln!("{}", serde_json::to_string(&resp).unwrap());
79        std::process::exit(1);
80    }
81    Ok(())
82}