risingwave_connector/source/reader/
reader.rs1use std::sync::Arc;
16
17use anyhow::Context;
18use async_nats::jetstream::consumer::AckPolicy;
19use futures::StreamExt;
20use futures::stream::pending;
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnId;
25use thiserror_ext::AsReport as _;
26
27use crate::WithOptionsSecResolved;
28use crate::error::ConnectorResult;
29use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
30use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
31use crate::source::filesystem::opendal_source::{
32 DEFAULT_REFRESH_INTERVAL_SEC, OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
33 OpendalSource,
34};
35use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
36use crate::source::{
37 BoxSourceChunkStream, BoxTryStream, Column, ConnectorProperties, ConnectorState,
38 CreateSplitReaderOpt, CreateSplitReaderResult, SourceColumnDesc, SourceContext,
39 WaitCheckpointTask,
40};
41
42#[derive(Clone, Debug)]
43pub struct SourceReader {
44 pub config: ConnectorProperties,
45 pub columns: Vec<SourceColumnDesc>,
46 pub parser_config: SpecificParserConfig,
47 pub connector_message_buffer_size: usize,
48}
49
50impl SourceReader {
51 pub fn new(
52 properties: WithOptionsSecResolved,
53 columns: Vec<SourceColumnDesc>,
54 connector_message_buffer_size: usize,
55 parser_config: SpecificParserConfig,
56 ) -> ConnectorResult<Self> {
57 let config = ConnectorProperties::extract(properties, false)?;
58
59 Ok(Self {
60 config,
61 columns,
62 parser_config,
63 connector_message_buffer_size,
64 })
65 }
66
67 fn get_target_columns(
68 &self,
69 column_ids: Vec<ColumnId>,
70 ) -> ConnectorResult<Vec<SourceColumnDesc>> {
71 column_ids
72 .iter()
73 .map(|id| {
74 self.columns
75 .iter()
76 .find(|c| c.column_id == *id)
77 .with_context(|| {
78 format!("Failed to find column id: {} in source: {:?}", id, self)
79 })
80 .cloned()
81 })
82 .try_collect()
83 .map_err(Into::into)
84 }
85
86 pub fn get_source_list(&self) -> ConnectorResult<BoxTryStream<FsPageItem>> {
87 let config = self.config.clone();
88 let list_interval_sec: u64;
89 let get_list_interval_sec =
90 |interval: Option<u64>| -> u64 { interval.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC) };
91 match config {
92 ConnectorProperties::Gcs(prop) => {
93 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
94 let lister: OpendalEnumerator<OpendalGcs> =
95 OpendalEnumerator::new_gcs_source(*prop)?;
96 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
97 }
98 ConnectorProperties::OpendalS3(prop) => {
99 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
100 let lister: OpendalEnumerator<OpendalS3> = OpendalEnumerator::new_s3_source(
101 &prop.s3_properties,
102 prop.assume_role,
103 prop.fs_common.compression_format,
104 )?;
105 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
106 }
107 ConnectorProperties::Azblob(prop) => {
108 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
109 let lister: OpendalEnumerator<OpendalAzblob> =
110 OpendalEnumerator::new_azblob_source(*prop)?;
111 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
112 }
113 ConnectorProperties::PosixFs(prop) => {
114 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
115 let lister: OpendalEnumerator<OpendalPosixFs> =
116 OpendalEnumerator::new_posix_fs_source(*prop)?;
117 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
118 }
119 other => bail!("Unsupported source: {:?}", other),
120 }
121 }
122
123 pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult<Option<WaitCheckpointTask>> {
125 Ok(match &self.config {
126 ConnectorProperties::PostgresCdc(_) => Some(WaitCheckpointTask::CommitCdcOffset(None)),
127 ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage(
128 prop.subscription_client().await?,
129 vec![],
130 )),
131 ConnectorProperties::Nats(prop) => {
132 match prop.nats_properties_consumer.get_ack_policy()? {
133 a @ AckPolicy::Explicit | a @ AckPolicy::All => {
134 Some(WaitCheckpointTask::AckNatsJetStream(
135 prop.common.build_context().await?,
136 vec![],
137 a,
138 ))
139 }
140 AckPolicy::None => None,
141 }
142 }
143 _ => None,
144 })
145 }
146
147 pub async fn build_stream(
151 &self,
152 state: ConnectorState,
153 column_ids: Vec<ColumnId>,
154 source_ctx: Arc<SourceContext>,
155 seek_to_latest: bool,
156 ) -> ConnectorResult<(BoxSourceChunkStream, CreateSplitReaderResult)> {
157 let Some(splits) = state else {
158 return Ok((pending().boxed(), Default::default()));
159 };
160 let config = self.config.clone();
161 let columns = self.get_target_columns(column_ids)?;
162
163 let data_gen_columns = Some(
164 columns
165 .iter()
166 .map(|col| Column {
167 name: col.name.clone(),
168 data_type: col.data_type.clone(),
169 is_visible: col.is_visible(),
170 })
171 .collect_vec(),
172 );
173
174 let parser_config = ParserConfig {
175 specific: self.parser_config.clone(),
176 common: CommonParserConfig {
177 rw_columns: columns,
178 },
179 };
180
181 config
182 .create_split_reader(
183 splits,
184 parser_config,
185 source_ctx,
186 data_gen_columns,
187 CreateSplitReaderOpt {
188 seek_to_latest,
189 ..Default::default()
190 },
191 )
192 .await
193 }
194}
195
196#[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)]
197async fn build_opendal_fs_list_stream<Src: OpendalSource>(
198 lister: OpendalEnumerator<Src>,
199 list_interval_sec: u64,
200) {
201 loop {
202 let matcher = lister.get_matcher();
203 let mut object_metadata_iter = lister.list().await?;
204 while let Some(list_res) = object_metadata_iter.next().await {
205 match list_res {
206 Ok(res) => {
207 if matcher
208 .as_ref()
209 .map(|m| m.matches(&res.name) || m.to_string() == res.name)
210 .unwrap_or(true)
211 {
212 yield res
213 } else {
214 continue;
215 }
216 }
217 Err(err) => {
218 tracing::error!(error = %err.as_report(), "list object fail");
219 return Err(err);
220 }
221 }
222 }
223 tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;
224 }
225}
226
227#[try_stream(boxed, ok = OpendalFsSplit<Src>, error = crate::error::ConnectorError)]
228pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
229 let matcher = lister.get_matcher();
230 let mut object_metadata_iter = lister.list().await?;
231
232 while let Some(list_res) = object_metadata_iter.next().await {
233 match list_res {
234 Ok(res) => {
235 if matcher
236 .as_ref()
237 .map(|m| m.matches(&res.name))
238 .unwrap_or(true)
239 {
240 let split = OpendalFsSplit::new(res.name, 0, res.size as usize);
241 yield split
242 } else {
243 continue;
244 }
245 }
246 Err(err) => {
247 tracing::error!(error = %err.as_report(), "list object fail");
248 return Err(err);
249 }
250 }
251 }
252}