risingwave_connector/source/reader/
reader.rs1use std::sync::Arc;
16
17use anyhow::Context;
18use async_nats::jetstream::consumer::AckPolicy;
19use futures::StreamExt;
20use futures::stream::pending;
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnId;
25use thiserror_ext::AsReport as _;
26
27use crate::WithOptionsSecResolved;
28use crate::error::ConnectorResult;
29use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
30use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
31use crate::source::filesystem::opendal_source::{
32 DEFAULT_REFRESH_INTERVAL_SEC, OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
33 OpendalSource,
34};
35use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
36use crate::source::{
37 BoxSourceChunkStream, BoxTryStream, Column, ConnectorProperties, ConnectorState,
38 CreateSplitReaderOpt, CreateSplitReaderResult, SourceColumnDesc, SourceContext,
39 WaitCheckpointTask,
40};
41
42#[derive(Clone, Debug)]
43pub struct SourceReader {
44 pub config: ConnectorProperties,
45 pub columns: Vec<SourceColumnDesc>,
46 pub parser_config: SpecificParserConfig,
47 pub connector_message_buffer_size: usize,
48}
49
50impl SourceReader {
51 pub fn new(
52 properties: WithOptionsSecResolved,
53 columns: Vec<SourceColumnDesc>,
54 connector_message_buffer_size: usize,
55 parser_config: SpecificParserConfig,
56 ) -> ConnectorResult<Self> {
57 let config = ConnectorProperties::extract(properties, false)?;
58
59 Ok(Self {
60 config,
61 columns,
62 parser_config,
63 connector_message_buffer_size,
64 })
65 }
66
67 fn get_target_columns(
68 &self,
69 column_ids: Vec<ColumnId>,
70 ) -> ConnectorResult<Vec<SourceColumnDesc>> {
71 column_ids
72 .iter()
73 .map(|id| {
74 self.columns
75 .iter()
76 .find(|c| c.column_id == *id)
77 .with_context(|| {
78 format!("Failed to find column id: {} in source: {:?}", id, self)
79 })
80 .cloned()
81 })
82 .try_collect()
83 .map_err(Into::into)
84 }
85
86 pub fn get_source_list(&self) -> ConnectorResult<BoxTryStream<FsPageItem>> {
87 let config = self.config.clone();
88 let list_interval_sec: u64;
89 let get_list_interval_sec =
90 |interval: Option<u64>| -> u64 { interval.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC) };
91 match config {
92 ConnectorProperties::Gcs(prop) => {
93 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
94 let lister: OpendalEnumerator<OpendalGcs> =
95 OpendalEnumerator::new_gcs_source(*prop)?;
96 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
97 }
98 ConnectorProperties::OpendalS3(prop) => {
99 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
100 let lister: OpendalEnumerator<OpendalS3> =
101 OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
102 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
103 }
104 ConnectorProperties::Azblob(prop) => {
105 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
106 let lister: OpendalEnumerator<OpendalAzblob> =
107 OpendalEnumerator::new_azblob_source(*prop)?;
108 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
109 }
110 ConnectorProperties::PosixFs(prop) => {
111 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
112 let lister: OpendalEnumerator<OpendalPosixFs> =
113 OpendalEnumerator::new_posix_fs_source(*prop)?;
114 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
115 }
116 other => bail!("Unsupported source: {:?}", other),
117 }
118 }
119
120 pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult<Option<WaitCheckpointTask>> {
122 Ok(match &self.config {
123 ConnectorProperties::PostgresCdc(_) => Some(WaitCheckpointTask::CommitCdcOffset(None)),
124 ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage(
125 prop.subscription_client().await?,
126 vec![],
127 )),
128 ConnectorProperties::Nats(prop) => {
129 match prop.nats_properties_consumer.get_ack_policy()? {
130 a @ AckPolicy::Explicit | a @ AckPolicy::All => {
131 Some(WaitCheckpointTask::AckNatsJetStream(
132 prop.common.build_context().await?,
133 vec![],
134 a,
135 ))
136 }
137 AckPolicy::None => None,
138 }
139 }
140 _ => None,
141 })
142 }
143
144 pub async fn build_stream(
148 &self,
149 state: ConnectorState,
150 column_ids: Vec<ColumnId>,
151 source_ctx: Arc<SourceContext>,
152 seek_to_latest: bool,
153 ) -> ConnectorResult<(BoxSourceChunkStream, CreateSplitReaderResult)> {
154 let Some(splits) = state else {
155 return Ok((pending().boxed(), Default::default()));
156 };
157 let config = self.config.clone();
158 let columns = self.get_target_columns(column_ids)?;
159
160 let data_gen_columns = Some(
161 columns
162 .iter()
163 .map(|col| Column {
164 name: col.name.clone(),
165 data_type: col.data_type.clone(),
166 is_visible: col.is_visible(),
167 })
168 .collect_vec(),
169 );
170
171 let parser_config = ParserConfig {
172 specific: self.parser_config.clone(),
173 common: CommonParserConfig {
174 rw_columns: columns,
175 },
176 };
177
178 config
179 .create_split_reader(
180 splits,
181 parser_config,
182 source_ctx,
183 data_gen_columns,
184 CreateSplitReaderOpt {
185 seek_to_latest,
186 ..Default::default()
187 },
188 )
189 .await
190 }
191}
192
193#[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)]
194async fn build_opendal_fs_list_stream<Src: OpendalSource>(
195 lister: OpendalEnumerator<Src>,
196 list_interval_sec: u64,
197) {
198 loop {
199 let matcher = lister.get_matcher();
200 let mut object_metadata_iter = lister.list().await?;
201 while let Some(list_res) = object_metadata_iter.next().await {
202 match list_res {
203 Ok(res) => {
204 if matcher
205 .as_ref()
206 .map(|m| m.matches(&res.name) || m.to_string() == res.name)
207 .unwrap_or(true)
208 {
209 yield res
210 } else {
211 continue;
212 }
213 }
214 Err(err) => {
215 tracing::error!(error = %err.as_report(), "list object fail");
216 return Err(err);
217 }
218 }
219 }
220 tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;
221 }
222}
223
224#[try_stream(boxed, ok = OpendalFsSplit<Src>, error = crate::error::ConnectorError)]
225pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
226 let matcher = lister.get_matcher();
227 let mut object_metadata_iter = lister.list().await?;
228
229 while let Some(list_res) = object_metadata_iter.next().await {
230 match list_res {
231 Ok(res) => {
232 if matcher
233 .as_ref()
234 .map(|m| m.matches(&res.name))
235 .unwrap_or(true)
236 {
237 let split = OpendalFsSplit::new(res.name, 0, res.size as usize);
238 yield split
239 } else {
240 continue;
241 }
242 }
243 Err(err) => {
244 tracing::error!(error = %err.as_report(), "list object fail");
245 return Err(err);
246 }
247 }
248 }
249}