risingwave_connector/source/reader/
reader.rs1use std::sync::Arc;
16
17use anyhow::Context;
18use async_nats::jetstream::consumer::AckPolicy;
19use futures::StreamExt;
20use futures::stream::pending;
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnId;
25use thiserror_ext::AsReport as _;
26
27use crate::WithOptionsSecResolved;
28use crate::error::ConnectorResult;
29use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
30use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
31use crate::source::filesystem::opendal_source::{
32 DEFAULT_REFRESH_INTERVAL_SEC, OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
33 OpendalSource,
34};
35use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
36use crate::source::{
37 BoxSourceChunkStream, BoxTryStream, Column, ConnectorProperties, ConnectorState,
38 CreateSplitReaderOpt, CreateSplitReaderResult, SourceColumnDesc, SourceContext,
39 WaitCheckpointTask,
40};
41
42#[derive(Clone, Debug)]
43pub struct SourceReader {
44 pub config: ConnectorProperties,
45 pub columns: Vec<SourceColumnDesc>,
46 pub parser_config: SpecificParserConfig,
47 pub connector_message_buffer_size: usize,
48}
49
50impl SourceReader {
51 pub fn new(
52 properties: WithOptionsSecResolved,
53 columns: Vec<SourceColumnDesc>,
54 connector_message_buffer_size: usize,
55 parser_config: SpecificParserConfig,
56 ) -> ConnectorResult<Self> {
57 let config = ConnectorProperties::extract(properties, false)?;
58
59 Ok(Self {
60 config,
61 columns,
62 parser_config,
63 connector_message_buffer_size,
64 })
65 }
66
67 fn get_target_columns(
68 &self,
69 column_ids: Vec<ColumnId>,
70 ) -> ConnectorResult<Vec<SourceColumnDesc>> {
71 column_ids
72 .iter()
73 .map(|id| {
74 self.columns
75 .iter()
76 .find(|c| c.column_id == *id)
77 .with_context(|| {
78 format!("Failed to find column id: {} in source: {:?}", id, self)
79 })
80 .cloned()
81 })
82 .try_collect()
83 .map_err(Into::into)
84 }
85
86 pub fn get_source_list(&self) -> ConnectorResult<BoxTryStream<FsPageItem>> {
87 let config = self.config.clone();
88 let list_interval_sec: u64;
89 let get_list_interval_sec =
90 |interval: Option<u64>| -> u64 { interval.unwrap_or(DEFAULT_REFRESH_INTERVAL_SEC) };
91 match config {
92 ConnectorProperties::Gcs(prop) => {
93 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
94 let lister: OpendalEnumerator<OpendalGcs> =
95 OpendalEnumerator::new_gcs_source(*prop)?;
96 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
97 }
98 ConnectorProperties::OpendalS3(prop) => {
99 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
100 let lister: OpendalEnumerator<OpendalS3> = OpendalEnumerator::new_s3_source(
101 &prop.s3_properties,
102 prop.assume_role,
103 prop.fs_common.compression_format,
104 )?;
105 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
106 }
107 ConnectorProperties::Azblob(prop) => {
108 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
109 let lister: OpendalEnumerator<OpendalAzblob> =
110 OpendalEnumerator::new_azblob_source(*prop)?;
111 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
112 }
113 ConnectorProperties::PosixFs(prop) => {
114 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
115 let lister: OpendalEnumerator<OpendalPosixFs> =
116 OpendalEnumerator::new_posix_fs_source(*prop)?;
117 Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
118 }
119 other => bail!("Unsupported source: {:?}", other),
120 }
121 }
122
123 pub async fn create_wait_checkpoint_task(&self) -> ConnectorResult<Option<WaitCheckpointTask>> {
125 Ok(match &self.config {
126 ConnectorProperties::PostgresCdc(_) => Some(WaitCheckpointTask::CommitCdcOffset(None)),
127 ConnectorProperties::GooglePubsub(prop) => Some(WaitCheckpointTask::AckPubsubMessage(
128 prop.subscription_client().await?,
129 vec![],
130 )),
131 ConnectorProperties::Nats(prop) => {
132 match prop.nats_properties_consumer.get_ack_policy()? {
133 a @ AckPolicy::Explicit | a @ AckPolicy::All => {
134 Some(WaitCheckpointTask::AckNatsJetStream(
135 prop.common.build_context().await?,
136 vec![],
137 a,
138 ))
139 }
140 AckPolicy::None => None,
141 }
142 }
143 ConnectorProperties::Pulsar(_) => Some(WaitCheckpointTask::AckPulsarMessage(vec![])),
144 _ => None,
145 })
146 }
147
148 pub async fn build_stream(
152 &self,
153 state: ConnectorState,
154 column_ids: Vec<ColumnId>,
155 source_ctx: Arc<SourceContext>,
156 seek_to_latest: bool,
157 ) -> ConnectorResult<(BoxSourceChunkStream, CreateSplitReaderResult)> {
158 let Some(splits) = state else {
159 return Ok((pending().boxed(), Default::default()));
160 };
161 let config = self.config.clone();
162 let columns = self.get_target_columns(column_ids)?;
163
164 let data_gen_columns = Some(
165 columns
166 .iter()
167 .map(|col| Column {
168 name: col.name.clone(),
169 data_type: col.data_type.clone(),
170 is_visible: col.is_visible(),
171 })
172 .collect_vec(),
173 );
174
175 let parser_config = ParserConfig {
176 specific: self.parser_config.clone(),
177 common: CommonParserConfig {
178 rw_columns: columns,
179 },
180 };
181
182 config
183 .create_split_reader(
184 splits,
185 parser_config,
186 source_ctx,
187 data_gen_columns,
188 CreateSplitReaderOpt {
189 seek_to_latest,
190 ..Default::default()
191 },
192 )
193 .await
194 }
195}
196
197#[try_stream(boxed, ok = FsPageItem, error = crate::error::ConnectorError)]
198async fn build_opendal_fs_list_stream<Src: OpendalSource>(
199 lister: OpendalEnumerator<Src>,
200 list_interval_sec: u64,
201) {
202 loop {
203 let matcher = lister.get_matcher();
204 let mut object_metadata_iter = lister.list().await?;
205 while let Some(list_res) = object_metadata_iter.next().await {
206 match list_res {
207 Ok(res) => {
208 if matcher
209 .as_ref()
210 .map(|m| m.matches(&res.name) || m.to_string() == res.name)
211 .unwrap_or(true)
212 {
213 yield res
214 } else {
215 continue;
216 }
217 }
218 Err(err) => {
219 tracing::error!(error = %err.as_report(), "list object fail");
220 return Err(err);
221 }
222 }
223 }
224 tokio::time::sleep(std::time::Duration::from_secs(list_interval_sec)).await;
225 }
226}
227
228#[try_stream(boxed, ok = OpendalFsSplit<Src>, error = crate::error::ConnectorError)]
229pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
230 let matcher = lister.get_matcher();
231 let mut object_metadata_iter = lister.list().await?;
232
233 while let Some(list_res) = object_metadata_iter.next().await {
234 match list_res {
235 Ok(res) => {
236 if matcher
237 .as_ref()
238 .map(|m| m.matches(&res.name))
239 .unwrap_or(true)
240 {
241 let split = OpendalFsSplit::new(res.name, 0, res.size as usize);
242 yield split
243 } else {
244 continue;
245 }
246 }
247 Err(err) => {
248 tracing::error!(error = %err.as_report(), "list object fail");
249 return Err(err);
250 }
251 }
252 }
253}