risingwave_ctl/cmd_impl/meta/
connection.rs1use std::collections::HashMap;
16
17use anyhow::anyhow;
18use risingwave_pb::catalog::connection::Info;
19use risingwave_pb::cloud_service::SourceType;
20use serde_json::Value;
21
22use crate::common::CtlContext;
23
24pub async fn list_connections(context: &CtlContext) -> anyhow::Result<()> {
25 let meta_client = context.meta_client().await?;
26 let connections = meta_client.list_connections(None).await?;
27
28 for conn in connections {
29 println!(
30 "Connection#{}, connection_name: {}, {}",
31 conn.id,
32 conn.name,
33 match conn.info {
34 Some(Info::PrivateLinkService(svc)) => format!(
35 "PrivateLink: service_name: {}, endpoint_id: {}, dns_entries: {:?}",
36 svc.service_name, svc.endpoint_id, svc.dns_entries,
37 ),
38 Some(Info::ConnectionParams(params)) => {
39 format!(
40 "CONNECTION_PARAMS_{}: {}",
41 params.get_connection_type().unwrap().as_str_name(),
42 serde_json::to_string(¶ms.get_properties()).unwrap()
43 )
44 }
45 None => "None".to_owned(),
46 }
47 );
48 }
49 Ok(())
50}
51
52pub async fn validate_source(context: &CtlContext, props: String) -> anyhow::Result<()> {
53 let with_props: HashMap<String, String> =
54 serde_json::from_str::<HashMap<String, Value>>(props.as_str())
55 .expect("error parsing with props json")
56 .into_iter()
57 .map(|(key, val)| match val {
58 Value::String(s) => (key, s),
59 _ => (key, val.to_string()),
60 })
61 .collect();
62 let source_type = match with_props
63 .get("connector")
64 .expect("missing 'connector' in with clause")
65 .as_str()
66 {
67 "kafka" => Ok(SourceType::Kafka),
68 _ => Err(anyhow!(
69 "unsupported source type, only kafka sources are supported"
70 )),
71 }?;
72 let meta_client = context.meta_client().await?;
73 let resp = meta_client
74 .rw_cloud_validate_source(source_type, with_props)
75 .await?;
76 if !resp.ok {
77 eprintln!("{}", serde_json::to_string(&resp).unwrap());
78 std::process::exit(1);
79 }
80 Ok(())
81}