risingwave_meta_service/
cloud_service.rs1use std::collections::BTreeMap;
16use std::sync::LazyLock;
17
18use async_trait::async_trait;
19use regex::Regex;
20use risingwave_connector::WithOptionsSecResolved;
21use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
22use risingwave_pb::cloud_service::cloud_service_server::CloudService;
23use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType};
24use risingwave_pb::cloud_service::{
25 RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType,
26};
27use thiserror_ext::AsReport;
28use tonic::{Request, Response, Status};
/// Stateless implementation of the [`CloudService`] gRPC service.
///
/// The type has no fields, so every instance is interchangeable; `new()` and
/// `default()` produce the same value.
#[derive(Debug, Default)]
pub struct CloudServiceImpl {}

impl CloudServiceImpl {
    /// Creates a new service instance.
    pub fn new() -> Self {
        Self {}
    }
}
36
37#[inline(always)]
38fn new_rwc_validate_fail_response(
39 error_type: ErrorType,
40 error_message: String,
41) -> Response<RwCloudValidateSourceResponse> {
42 Response::new(RwCloudValidateSourceResponse {
43 ok: false,
44 error: Some(Error {
45 error_type: error_type.into(),
46 error_message,
47 }),
48 })
49}
50
51#[async_trait]
52impl CloudService for CloudServiceImpl {
53 async fn rw_cloud_validate_source(
54 &self,
55 request: Request<RwCloudValidateSourceRequest>,
56 ) -> Result<Response<RwCloudValidateSourceResponse>, Status> {
57 let req = request.into_inner();
58 if req.source_type() != SourceType::Kafka {
59 return Err(Status::invalid_argument(
60 "unexpected source type, only kafka source is supported",
61 ));
62 }
63 let source_cfg: BTreeMap<String, String> = req.source_config.into_iter().collect();
64
65 let source_cfg = WithOptionsSecResolved::without_secrets(source_cfg);
67
68 let props = ConnectorProperties::extract(source_cfg, false);
70 if let Err(e) = props {
71 return Ok(new_rwc_validate_fail_response(
72 ErrorType::KafkaInvalidProperties,
73 e.to_report_string(),
74 ));
75 };
76 let props = props.unwrap();
77
78 let enumerator = props
79 .create_split_enumerator(SourceEnumeratorContext::dummy().into())
80 .await;
81 if let Err(e) = enumerator {
82 return Ok(new_rwc_validate_fail_response(
83 ErrorType::KafkaInvalidProperties,
84 e.to_report_string(),
85 ));
86 }
87 if let Err(e) = enumerator.unwrap().list_splits().await {
88 let error_message = e.to_report_string();
89 if error_message.contains("BrokerTransportFailure") {
90 return Ok(new_rwc_validate_fail_response(
91 ErrorType::KafkaBrokerUnreachable,
92 e.to_report_string(),
93 ));
94 }
95 static TOPIC_NOT_FOUND: LazyLock<Regex> =
96 LazyLock::new(|| Regex::new(r"topic .* not found").unwrap());
97 if TOPIC_NOT_FOUND.is_match(error_message.as_str()) {
98 return Ok(new_rwc_validate_fail_response(
99 ErrorType::KafkaTopicNotFound,
100 e.to_report_string(),
101 ));
102 }
103 return Ok(new_rwc_validate_fail_response(
104 ErrorType::KafkaOther,
105 e.to_report_string(),
106 ));
107 }
108
109 Ok(Response::new(RwCloudValidateSourceResponse {
110 ok: true,
111 error: None,
112 }))
113 }
114}