risingwave_connector/source/reader/
reader.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use async_nats::jetstream::consumer::AckPolicy;
19use futures::StreamExt;
20use futures::stream::pending;
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnId;
25use thiserror_ext::AsReport as _;
26
27use crate::WithOptionsSecResolved;
28use crate::error::ConnectorResult;
29use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
30use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
31use crate::source::filesystem::opendal_source::{
32    DEFAULT_REFRESH_INTERVAL_SEC, OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
33    OpendalSource,
34};
35use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
36use crate::source::{
37    BoxSourceChunkStream, BoxTryStream, Column, ConnectorProperties, ConnectorState,
38    CreateSplitReaderOpt, CreateSplitReaderResult, SourceColumnDesc, SourceContext,
39    WaitCheckpointTask,
40};
41
42#[derive(Clone, Debug)]
43pub struct SourceReader {
44    pub config: ConnectorProperties,
45    pub columns: Vec<SourceColumnDesc>,
46    pub parser_config: SpecificParserConfig,
47    pub connector_message_buffer_size: usize,
48}
49
50impl SourceReader {
51    pub fn new(
52        properties: WithOptionsSecResolved,
53        columns: Vec<SourceColumnDesc>,
54        connector_message_buffer_size: usize,
55        parser_config: SpecificParserConfig,
56    ) -> ConnectorResult<Self> {
57        let config = ConnectorProperties::extract(properties, false)?;
58
59        Ok(Self {
60            config,
61            columns,
62            parser_config,
63            connector_message_buffer_size,
64        })
65    }
66
67    fn get_target_columns(
68        &self,
69        column_ids: Vec<ColumnId>,
70    ) -> ConnectorResult<Vec<SourceColumnDesc>> {
71        column_ids
72            .iter()
73            .map(|id| {
74                self.columns
75                    .iter()
76                    .find(|c| c.column_id == *id)
77                    .with_context(|| {
78                        format!("Failed to find column id: {} in source: {:?}", id, self)
79                    })
80                    .cloned()
81            })
82            .try_collect()
83            .map_err(Into::into)
84    }
85
86    pub fn get_source_list(&self) -> ConnectorResult<BoxTryStream<FsPageItem>> {
87        let config = self.config.clone();
88        let list_interval_sec: u64;
89        let get_list_interval_sec =
90            |interval: Option<u64>| -> u64 { interval.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC) };
91        match config {
92            ConnectorProperties::Gcs(prop) => {
93                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
94                let lister: OpendalEnumerator<OpendalGcs> =
95                    OpendalEnumerator::new_gcs_source(*prop)?;
96                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
97            }
98            ConnectorProperties::OpendalS3(prop) => {
99                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
100                let lister: OpendalEnumerator<OpendalS3> = OpendalEnumerator::new_s3_source(
101                    &prop.s3_properties,
102                    prop.assume_role,
103                    prop.fs_common.compression_format,
104                )?;
105                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
106            }
107            ConnectorProperties::Azblob(prop) => {
108                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
109                let lister: OpendalEnumerator<OpendalAzblob> =
110                    OpendalEnumerator::new_azblob_source(*prop)?;
111                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
112            }
113            ConnectorProperties::PosixFs(prop) => {
114                list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
115                let lister: OpendalEnumerator<OpendalPosixFs> =
116                    OpendalEnumerator::new_posix_fs_source(*prop)?;
117                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
118            }
119            other => bail!("Unsupported source: {:?}", other),
120        }
121    }
122
123    /// Refer to `WaitCheckpointWorker` for more details.
124    pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult<Option<WaitCheckpointTask>> {
125        Ok(match &self.config {
126            ConnectorProperties::PostgresCdc(_) => Some(WaitCheckpointTask::CommitCdcOffset(None)),
127            ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage(
128                prop.subscription_client().await?,
129                vec![],
130            )),
131            ConnectorProperties::Nats(prop) => {
132                match prop.nats_properties_consumer.get_ack_policy()? {
133                    a @ AckPolicy::Explicit | a @ AckPolicy::All => {
134                        Some(WaitCheckpointTask::AckNatsJetStream(
135                            prop.common.build_context().await?,
136                            vec![],
137                            a,
138                        ))
139                    }
140                    AckPolicy::None => None,
141                }
142            }
143            _ => None,
144        })
145    }
146
147    /// Build `SplitReader`s and then `BoxSourceChunkStream` from the given `ConnectorState` (`SplitImpl`s).
148    ///
149    /// If `seek_to_latest` is true, will also return the latest splits after seek.
150    pub async fn build_stream(
151        &self,
152        state: ConnectorState,
153        column_ids: Vec<ColumnId>,
154        source_ctx: Arc<SourceContext>,
155        seek_to_latest: bool,
156    ) -> ConnectorResult<(BoxSourceChunkStream, CreateSplitReaderResult)> {
157        let Some(splits) = state else {
158            return Ok((pending().boxed(), Default::default()));
159        };
160        let config = self.config.clone();
161        let columns = self.get_target_columns(column_ids)?;
162
163        let data_gen_columns = Some(
164            columns
165                .iter()
166                .map(|col| Column {
167                    name: col.name.clone(),
168                    data_type: col.data_type.clone(),
169                    is_visible: col.is_visible(),
170                })
171                .collect_vec(),
172        );
173
174        let parser_config = ParserConfig {
175            specific: self.parser_config.clone(),
176            common: CommonParserConfig {
177                rw_columns: columns,
178            },
179        };
180
181        config
182            .create_split_reader(
183                splits,
184                parser_config,
185                source_ctx,
186                data_gen_columns,
187                CreateSplitReaderOpt {
188                    seek_to_latest,
189                    ..Default::default()
190                },
191            )
192            .await
193    }
194}
195
196#[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)]
197async fn build_opendal_fs_list_stream<Src: OpendalSource>(
198    lister: OpendalEnumerator<Src>,
199    list_interval_sec: u64,
200) {
201    loop {
202        let matcher = lister.get_matcher();
203        let mut object_metadata_iter = lister.list().await?;
204        while let Some(list_res) = object_metadata_iter.next().await {
205            match list_res {
206                Ok(res) => {
207                    if matcher
208                        .as_ref()
209                        .map(|m| m.matches(&res.name) || m.to_string() == res.name)
210                        .unwrap_or(true)
211                    {
212                        yield res
213                    } else {
214                        continue;
215                    }
216                }
217                Err(err) => {
218                    tracing::error!(error = %err.as_report(), "list object fail");
219                    return Err(err);
220                }
221            }
222        }
223        tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;
224    }
225}
226
227#[try_stream(boxed, ok = OpendalFsSplit<Src>,  error = crate::error::ConnectorError)]
228pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
229    let matcher = lister.get_matcher();
230    let mut object_metadata_iter = lister.list().await?;
231
232    while let Some(list_res) = object_metadata_iter.next().await {
233        match list_res {
234            Ok(res) => {
235                if matcher
236                    .as_ref()
237                    .map(|m| m.matches(&res.name))
238                    .unwrap_or(true)
239                {
240                    let split = OpendalFsSplit::new(res.name, 0, res.size as usize);
241                    yield split
242                } else {
243                    continue;
244                }
245            }
246            Err(err) => {
247                tracing::error!(error = %err.as_report(), "list object fail");
248                return Err(err);
249            }
250        }
251    }
252}