risingwave_ctl/cmd_impl/meta/
connection.rs1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_pb::catalog::connection::Info;
19use risingwave_pb::cloud_service::SourceType;
20use serde_json::Value;
21
22use crate::common::CtlContext;
23
24pub async fn list_connections(context: &CtlContext) -> anyhow::Result<()> {
25 let meta_client = context.meta_client().await?;
26 let connections = meta_client.list_connections(None).await?;
27
28 for conn in connections {
29 println!(
30 "Connection#{}, connection_name: {}, {}",
31 conn.id,
32 conn.name,
33 match conn.info {
34 #[expect(deprecated)]
35 Some(Info::PrivateLinkService(svc)) => format!(
36 "PrivateLink: service_name: {}, endpoint_id: {}, dns_entries: {:?}",
37 svc.service_name, svc.endpoint_id, svc.dns_entries,
38 ),
39 Some(Info::ConnectionParams(params)) => {
40 format!(
41 "CONNECTION_PARAMS_{}: {}",
42 params.get_connection_type().unwrap().as_str_name(),
43 serde_json::to_string(¶ms.get_properties()).unwrap()
44 )
45 }
46 None => "None".to_owned(),
47 }
48 );
49 }
50 Ok(())
51}
52
53pub async fn validate_source(context: &CtlContext, props: String) -> anyhow::Result<()> {
54 let with_props: HashMap<String, String> =
55 serde_json::from_str::<HashMap<String, Value>>(props.as_str())
56 .expect("error parsing with props json")
57 .into_iter()
58 .map(|(key, val)| match val {
59 Value::String(s) => (key, s),
60 _ => (key, val.to_string()),
61 })
62 .collect();
63 let source_type = match with_props
64 .get("connector")
65 .expect("missing 'connector' in with clause")
66 .as_str()
67 {
68 "kafka" => Ok(SourceType::Kafka),
69 _ => Err(anyhow!(
70 "unsupported source type, only kafka sources are supported"
71 )),
72 }?;
73 let meta_client = context.meta_client().await?;
74 let resp = meta_client
75 .rw_cloud_validate_source(source_type, with_props)
76 .await?;
77 if !resp.ok {
78 eprintln!("{}", serde_json::to_string(&resp).unwrap());
79 std::process::exit(1);
80 }
81 Ok(())
82}