risingwave_meta_service/
cloud_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::LazyLock;
17
18use async_trait::async_trait;
19use regex::Regex;
20use risingwave_connector::WithOptionsSecResolved;
21use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext};
22use risingwave_pb::cloud_service::cloud_service_server::CloudService;
23use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType};
24use risingwave_pb::cloud_service::{
25    RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType,
26};
27use thiserror_ext::AsReport;
28use tonic::{Request, Response, Status};
/// Implementation of the [`CloudService`] server trait.
///
/// The type carries no fields, so every instance is interchangeable.
#[derive(Debug, Default)]
pub struct CloudServiceImpl {}

impl CloudServiceImpl {
    /// Creates a new service instance.
    ///
    /// Equivalent to [`CloudServiceImpl::default`]; kept so existing callers of `new()` continue
    /// to work.
    pub fn new() -> Self {
        Self {}
    }
}
36
37#[inline(always)]
38fn new_rwc_validate_fail_response(
39    error_type: ErrorType,
40    error_message: String,
41) -> Response<RwCloudValidateSourceResponse> {
42    Response::new(RwCloudValidateSourceResponse {
43        ok: false,
44        error: Some(Error {
45            error_type: error_type.into(),
46            error_message,
47        }),
48    })
49}
50
51#[async_trait]
52impl CloudService for CloudServiceImpl {
53    async fn rw_cloud_validate_source(
54        &self,
55        request: Request<RwCloudValidateSourceRequest>,
56    ) -> Result<Response<RwCloudValidateSourceResponse>, Status> {
57        let req = request.into_inner();
58        if req.source_type() != SourceType::Kafka {
59            return Err(Status::invalid_argument(
60                "unexpected source type, only kafka source is supported",
61            ));
62        }
63        let source_cfg: BTreeMap<String, String> = req.source_config.into_iter().collect();
64
65        // XXX: We can't use secret in cloud validate source.
66        let source_cfg = WithOptionsSecResolved::without_secrets(source_cfg);
67
68        // try fetch kafka metadata, return error message on failure
69        let props = ConnectorProperties::extract(source_cfg, false);
70        if let Err(e) = props {
71            return Ok(new_rwc_validate_fail_response(
72                ErrorType::KafkaInvalidProperties,
73                e.to_report_string(),
74            ));
75        };
76        let props = props.unwrap();
77
78        let enumerator = props
79            .create_split_enumerator(SourceEnumeratorContext::dummy().into())
80            .await;
81        if let Err(e) = enumerator {
82            return Ok(new_rwc_validate_fail_response(
83                ErrorType::KafkaInvalidProperties,
84                e.to_report_string(),
85            ));
86        }
87        if let Err(e) = enumerator.unwrap().list_splits().await {
88            let error_message = e.to_report_string();
89            if error_message.contains("BrokerTransportFailure") {
90                return Ok(new_rwc_validate_fail_response(
91                    ErrorType::KafkaBrokerUnreachable,
92                    e.to_report_string(),
93                ));
94            }
95            static TOPIC_NOT_FOUND: LazyLock<Regex> =
96                LazyLock::new(|| Regex::new(r"topic .* not found").unwrap());
97            if TOPIC_NOT_FOUND.is_match(error_message.as_str()) {
98                return Ok(new_rwc_validate_fail_response(
99                    ErrorType::KafkaTopicNotFound,
100                    e.to_report_string(),
101                ));
102            }
103            return Ok(new_rwc_validate_fail_response(
104                ErrorType::KafkaOther,
105                e.to_report_string(),
106            ));
107        }
108
109        Ok(Response::new(RwCloudValidateSourceResponse {
110            ok: true,
111            error: None,
112        }))
113    }
114}