risingwave_stream/from_proto/source/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod trad_source;
16
17use std::collections::BTreeMap;
18
19pub use trad_source::{SourceExecutorBuilder, create_source_desc_builder};
20mod fs_fetch;
21pub use fs_fetch::FsFetchExecutorBuilder;
22use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
23use risingwave_pb::catalog::PbStreamSourceInfo;
24use risingwave_pb::plan_common::SourceRefreshMode;
25use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
26
27use super::*;
28
29fn get_connector_name(with_props: &BTreeMap<String, String>) -> String {
30    with_props
31        .get(UPSTREAM_SOURCE_KEY)
32        .map(|s| s.to_lowercase())
33        .unwrap_or_default()
34}
35
36fn is_full_reload_refresh(refresh_mode: &Option<SourceRefreshMode>) -> bool {
37    refresh_mode
38        .as_ref()
39        .map(|refresh_mode| matches!(refresh_mode.refresh_mode, Some(RefreshMode::FullReload(_))))
40        .unwrap_or(false)
41}