risingwave_connector/source/reader/reader.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use async_nats::jetstream::consumer::AckPolicy;
19use futures::StreamExt;
20use futures::stream::pending;
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnId;
25use thiserror_ext::AsReport as _;
26
27use crate::WithOptionsSecResolved;
28use crate::error::ConnectorResult;
29use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
30use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
31use crate::source::filesystem::opendal_source::{
32    DEFAULT_REFRESH_INTERVAL_SEC, OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
33    OpendalSource,
34};
35use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
36use crate::source::{
37    BoxSourceChunkStream, BoxTryStream, Column, ConnectorProperties, ConnectorState,
38    CreateSplitReaderOpt, CreateSplitReaderResult, SourceColumnDesc, SourceContext,
39    WaitCheckpointTask,
40};
41
42#[derive(Clone, Debug)]
43pub struct SourceReader {
44    pub config: ConnectorProperties,
45    pub columns: Vec<SourceColumnDesc>,
46    pub parser_config: SpecificParserConfig,
47    pub connector_message_buffer_size: usize,
48}
49
50impl SourceReader {
51    pub fn new(
52        properties: WithOptionsSecResolved,
53        columns: Vec<SourceColumnDesc>,
54        connector_message_buffer_size: usize,
55        parser_config: SpecificParserConfig,
56    ) -> ConnectorResult<Self> {
57        let config = ConnectorProperties::extract(properties, false)?;
58
59        Ok(Self {
60            config,
61            columns,
62            parser_config,
63            connector_message_buffer_size,
64        })
65    }
66
67    fn get_target_columns(
68        &self,
69        column_ids: Vec<ColumnId>,
70    ) -> ConnectorResult<Vec<SourceColumnDesc>> {
71        column_ids
72            .iter()
73            .map(|id| {
74                self.columns
75                    .iter()
76                    .find(|c| c.column_id == *id)
77                    .with_context(|| {
78                        format!("Failed to find column id: {} in source: {:?}", id, self)
79                    })
80                    .cloned()
81            })
82            .try_collect()
83            .map_err(Into::into)
84    }
85
86    pub fn get_source_list(&self) -> ConnectorResult<BoxTryStream<FsPageItem>> {
87        let config = self.config.clone();
88        let list_interval_sec: u64;
89        let get_list_interval_sec =
90            |interval: Option<u64>| -> u64 { interval.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC) };
91        match config {
92            ConnectorProperties::Gcs(prop) => {
93                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
94                let lister: OpendalEnumerator<OpendalGcs> =
95                    OpendalEnumerator::new_gcs_source(*prop)?;
96                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
97            }
98            ConnectorProperties::OpendalS3(prop) => {
99                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
100                let lister: OpendalEnumerator<OpendalS3> =
101                    OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
102                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
103            }
104            ConnectorProperties::Azblob(prop) => {
105                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
106                let lister: OpendalEnumerator<OpendalAzblob> =
107                    OpendalEnumerator::new_azblob_source(*prop)?;
108                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
109            }
110            ConnectorProperties::PosixFs(prop) => {
111                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
112                let lister: OpendalEnumerator<OpendalPosixFs> =
113                    OpendalEnumerator::new_posix_fs_source(*prop)?;
114                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
115            }
116            other => bail!("Unsupported source: {:?}", other),
117        }
118    }
119
120    /// Refer to `WaitCheckpointWorker` for more details.
121    pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult<Option<WaitCheckpointTask>> {
122        Ok(match &self.config {
123            ConnectorProperties::PostgresCdc(_) => Some(WaitCheckpointTask::CommitCdcOffset(None)),
124            ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage(
125                prop.subscription_client().await?,
126                vec![],
127            )),
128            ConnectorProperties::Nats(prop) => {
129                match prop.nats_properties_consumer.get_ack_policy()? {
130                    a @ AckPolicy::Explicit | a @ AckPolicy::All => {
131                        Some(WaitCheckpointTask::AckNatsJetStream(
132                            prop.common.build_context().await?,
133                            vec![],
134                            a,
135                        ))
136                    }
137                    AckPolicy::None => None,
138                }
139            }
140            _ => None,
141        })
142    }
143
144    /// Build `SplitReader`s and then `BoxSourceChunkStream` from the given `ConnectorState` (`SplitImpl`s).
145    ///
146    /// If `seek_to_latest` is true, will also return the latest splits after seek.
147    pub async fn build_stream(
148        &self,
149        state: ConnectorState,
150        column_ids: Vec<ColumnId>,
151        source_ctx: Arc<SourceContext>,
152        seek_to_latest: bool,
153    ) -> ConnectorResult<(BoxSourceChunkStream, CreateSplitReaderResult)> {
154        let Some(splits) = state else {
155            return Ok((pending().boxed(), Default::default()));
156        };
157        let config = self.config.clone();
158        let columns = self.get_target_columns(column_ids)?;
159
160        let data_gen_columns = Some(
161            columns
162                .iter()
163                .map(|col| Column {
164                    name: col.name.clone(),
165                    data_type: col.data_type.clone(),
166                    is_visible: col.is_visible(),
167                })
168                .collect_vec(),
169        );
170
171        let parser_config = ParserConfig {
172            specific: self.parser_config.clone(),
173            common: CommonParserConfig {
174                rw_columns: columns,
175            },
176        };
177
178        config
179            .create_split_reader(
180                splits,
181                parser_config,
182                source_ctx,
183                data_gen_columns,
184                CreateSplitReaderOpt {
185                    seek_to_latest,
186                    ..Default::default()
187                },
188            )
189            .await
190    }
191}
192
193#[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)]
194async fn build_opendal_fs_list_stream<Src: OpendalSource>(
195    lister: OpendalEnumerator<Src>,
196    list_interval_sec: u64,
197) {
198    loop {
199        let matcher = lister.get_matcher();
200        let mut object_metadata_iter = lister.list().await?;
201        while let Some(list_res) = object_metadata_iter.next().await {
202            match list_res {
203                Ok(res) => {
204                    if matcher
205                        .as_ref()
206                        .map(|m| m.matches(&res.name) || m.to_string() == res.name)
207                        .unwrap_or(true)
208                    {
209                        yield res
210                    } else {
211                        continue;
212                    }
213                }
214                Err(err) => {
215                    tracing::error!(error = %err.as_report(), "list object fail");
216                    return Err(err);
217                }
218            }
219        }
220        tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;
221    }
222}
223
224#[try_stream(boxed, ok = OpendalFsSplit<Src>,  error = crate::error::ConnectorError)]
225pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
226    let matcher = lister.get_matcher();
227    let mut object_metadata_iter = lister.list().await?;
228
229    while let Some(list_res) = object_metadata_iter.next().await {
230        match list_res {
231            Ok(res) => {
232                if matcher
233                    .as_ref()
234                    .map(|m| m.matches(&res.name))
235                    .unwrap_or(true)
236                {
237                    let split = OpendalFsSplit::new(res.name, 0, res.size as usize);
238                    yield split
239                } else {
240                    continue;
241                }
242            }
243            Err(err) => {
244                tracing::error!(error = %err.as_report(), "list object fail");
245                return Err(err);
246            }
247        }
248    }
249}