risingwave_connector/source/reader/
reader.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use async_nats::jetstream::consumer::AckPolicy;
19use futures::StreamExt;
20use futures::stream::pending;
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnId;
25use thiserror_ext::AsReport as _;
26
27use crate::WithOptionsSecResolved;
28use crate::error::ConnectorResult;
29use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
30use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
31use crate::source::filesystem::opendal_source::{
32    DEFAULT_REFRESH_INTERVAL_SEC, OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
33    OpendalSource,
34};
35use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
36use crate::source::{
37    BoxSourceChunkStream, BoxTryStream, Column, ConnectorProperties, ConnectorState,
38    CreateSplitReaderOpt, CreateSplitReaderResult, SourceColumnDesc, SourceContext,
39    WaitCheckpointTask,
40};
41
/// Bundles everything needed to build split readers and listing streams for one source:
/// the connector properties, the source columns and the parser configuration.
#[derive(Clone, Debug)]
pub struct SourceReader {
    /// Connector properties; `SourceReader::new` obtains them from
    /// `ConnectorProperties::extract`.
    pub config: ConnectorProperties,
    /// Column descriptors of the source; `build_stream` selects a subset of them by
    /// column id.
    pub columns: Vec<SourceColumnDesc>,
    /// Parser configuration, cloned into every `ParserConfig` created by `build_stream`.
    pub parser_config: SpecificParserConfig,
    /// Stored as passed to `SourceReader::new`; not read anywhere in this file.
    /// NOTE(review): presumably the capacity of a connector message buffer — confirm at
    /// the use sites.
    pub connector_message_buffer_size: usize,
}
49
50impl SourceReader {
51    pub fn new(
52        properties: WithOptionsSecResolved,
53        columns: Vec<SourceColumnDesc>,
54        connector_message_buffer_size: usize,
55        parser_config: SpecificParserConfig,
56    ) -> ConnectorResult<Self> {
57        let config = ConnectorProperties::extract(properties, false)?;
58
59        Ok(Self {
60            config,
61            columns,
62            parser_config,
63            connector_message_buffer_size,
64        })
65    }
66
67    fn get_target_columns(
68        &self,
69        column_ids: Vec<ColumnId>,
70    ) -> ConnectorResult<Vec<SourceColumnDesc>> {
71        column_ids
72            .iter()
73            .map(|id| {
74                self.columns
75                    .iter()
76                    .find(|c| c.column_id == *id)
77                    .with_context(|| {
78                        format!("Failed to find column id: {} in source: {:?}", id, self)
79                    })
80                    .cloned()
81            })
82            .try_collect()
83            .map_err(Into::into)
84    }
85
86    pub fn get_source_list(&self) -> ConnectorResult<BoxTryStream<FsPageItem>> {
87        let config = self.config.clone();
88        let list_interval_sec: u64;
89        let get_list_interval_sec =
90            |interval: Option<u64>| -> u64 { interval.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC) };
91        match config {
92            ConnectorProperties::Gcs(prop) => {
93                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
94                let lister: OpendalEnumerator<OpendalGcs> =
95                    OpendalEnumerator::new_gcs_source(*prop)?;
96                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
97            }
98            ConnectorProperties::OpendalS3(prop) => {
99                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
100                let lister: OpendalEnumerator<OpendalS3> = OpendalEnumerator::new_s3_source(
101                    &prop.s3_properties,
102                    prop.assume_role,
103                    prop.fs_common.compression_format,
104                )?;
105                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
106            }
107            ConnectorProperties::Azblob(prop) => {
108                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
109                let lister: OpendalEnumerator<OpendalAzblob> =
110                    OpendalEnumerator::new_azblob_source(*prop)?;
111                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
112            }
113            ConnectorProperties::PosixFs(prop) => {
114                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
115                let lister: OpendalEnumerator<OpendalPosixFs> =
116                    OpendalEnumerator::new_posix_fs_source(*prop)?;
117                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
118            }
119            other => bail!("Unsupported source: {:?}", other),
120        }
121    }
122
123    /// Refer to `WaitCheckpointWorker` for more details.
124    pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult<Option<WaitCheckpointTask>> {
125        Ok(match &self.config {
126            ConnectorProperties::PostgresCdc(_) => Some(WaitCheckpointTask::CommitCdcOffset(None)),
127            ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage(
128                prop.subscription_client().await?,
129                vec![],
130            )),
131            ConnectorProperties::Nats(prop) => {
132                match prop.nats_properties_consumer.get_ack_policy()? {
133                    a @ AckPolicy::Explicit | a @ AckPolicy::All => {
134                        Some(WaitCheckpointTask::AckNatsJetStream(
135                            prop.common.build_context().await?,
136                            vec![],
137                            a,
138                        ))
139                    }
140                    AckPolicy::None => None,
141                }
142            }
143            ConnectorProperties::Pulsar(_) => Some(WaitCheckpointTask::AckPulsarMessage(vec![])),
144            _ => None,
145        })
146    }
147
148    /// Build `SplitReader`s and then `BoxSourceChunkStream` from the given `ConnectorState` (`SplitImpl`s).
149    ///
150    /// If `seek_to_latest` is true, will also return the latest splits after seek.
151    pub async fn build_stream(
152        &self,
153        state: ConnectorState,
154        column_ids: Vec<ColumnId>,
155        source_ctx: Arc<SourceContext>,
156        seek_to_latest: bool,
157    ) -> ConnectorResult<(BoxSourceChunkStream, CreateSplitReaderResult)> {
158        let Some(splits) = state else {
159            return Ok((pending().boxed(), Default::default()));
160        };
161        let config = self.config.clone();
162        let columns = self.get_target_columns(column_ids)?;
163
164        let data_gen_columns = Some(
165            columns
166                .iter()
167                .map(|col| Column {
168                    name: col.name.clone(),
169                    data_type: col.data_type.clone(),
170                    is_visible: col.is_visible(),
171                })
172                .collect_vec(),
173        );
174
175        let parser_config = ParserConfig {
176            specific: self.parser_config.clone(),
177            common: CommonParserConfig {
178                rw_columns: columns,
179            },
180        };
181
182        config
183            .create_split_reader(
184                splits,
185                parser_config,
186                source_ctx,
187                data_gen_columns,
188                CreateSplitReaderOpt {
189                    seek_to_latest,
190                    ..Default::default()
191                },
192            )
193            .await
194    }
195}
196
197#[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)]
198async fn build_opendal_fs_list_stream<Src: OpendalSource>(
199    lister: OpendalEnumerator<Src>,
200    list_interval_sec: u64,
201) {
202    loop {
203        let matcher = lister.get_matcher();
204        let mut object_metadata_iter = lister.list().await?;
205        while let Some(list_res) = object_metadata_iter.next().await {
206            match list_res {
207                Ok(res) => {
208                    if matcher
209                        .as_ref()
210                        .map(|m| m.matches(&res.name) || m.to_string() == res.name)
211                        .unwrap_or(true)
212                    {
213                        yield res
214                    } else {
215                        continue;
216                    }
217                }
218                Err(err) => {
219                    tracing::error!(error = %err.as_report(), "list object fail");
220                    return Err(err);
221                }
222            }
223        }
224        tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;
225    }
226}
227
228#[try_stream(boxed, ok = OpendalFsSplit<Src>,  error = crate::error::ConnectorError)]
229pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
230    let matcher = lister.get_matcher();
231    let mut object_metadata_iter = lister.list().await?;
232
233    while let Some(list_res) = object_metadata_iter.next().await {
234        match list_res {
235            Ok(res) => {
236                if matcher
237                    .as_ref()
238                    .map(|m| m.matches(&res.name))
239                    .unwrap_or(true)
240                {
241                    let split = OpendalFsSplit::new(res.name, 0, res.size as usize);
242                    yield split
243                } else {
244                    continue;
245                }
246            }
247            Err(err) => {
248                tracing::error!(error = %err.as_report(), "list object fail");
249                return Err(err);
250            }
251        }
252    }
253}