risingwave_stream/from_proto/source/
trad_source.rs1use risingwave_common::catalog::{
16 KAFKA_TIMESTAMP_COLUMN_NAME, default_key_column_name_version_mapping,
17};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::should_copy_to_format_encode_options;
20use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
21use risingwave_expr::bail;
22use risingwave_pb::data::data_type::TypeName as PbTypeName;
23use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
24use risingwave_pb::plan_common::{
25 AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
26 AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
27 PbColumnCatalog, PbEncodeType,
28};
29use risingwave_pb::stream_plan::SourceNode;
30
31use super::*;
32use crate::executor::TroublemakerExecutor;
33use crate::executor::source::{
34 BatchAdbcSnowflakeListExecutor, BatchIcebergListExecutor, BatchPosixFsListExecutor,
35 DummySourceExecutor, FsListExecutor, IcebergListExecutor, SourceExecutor,
36 SourceStateTableHandler, StreamSourceCore,
37};
38use crate::from_proto::source::is_full_reload_refresh;
39
/// Executor builder for [`SourceNode`] stream plan nodes.
///
/// Stateless marker type: all construction logic lives in its `ExecutorBuilder` implementation.
pub struct SourceExecutorBuilder;
41
42pub fn create_source_desc_builder(
43 mut source_columns: Vec<PbColumnCatalog>,
44 params: &ExecutorParams,
45 source_info: PbStreamSourceInfo,
46 row_id_index: Option<u32>,
47 with_properties: WithOptionsSecResolved,
48) -> SourceDescBuilder {
49 {
50 if source_info.format() == FormatType::Upsert
53 && (source_info.row_encode() == PbEncodeType::Avro
54 || source_info.row_encode() == PbEncodeType::Protobuf
55 || source_info.row_encode() == PbEncodeType::Json)
56 {
57 for c in &mut source_columns {
58 if let Some(desc) = c.column_desc.as_mut() {
59 let is_bytea = desc
60 .get_column_type()
61 .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
62 .unwrap();
63 if desc.name == default_key_column_name_version_mapping(
64 &desc.version()
65 )
66 && is_bytea
67 && desc.version == ColumnDescVersion::Unspecified as i32
69 {
70 desc.additional_column = Some(AdditionalColumn {
71 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
72 });
73 }
74
75 if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
78 desc.additional_column = Some(AdditionalColumn {
79 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
80 });
81 }
82 }
83 }
84 }
85 }
86
87 {
88 let _ = source_columns.iter_mut().map(|c| {
93 let _ = c.column_desc.as_mut().map(|desc| {
94 let is_timestamp = desc
95 .get_column_type()
96 .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
97 .unwrap();
98 if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
99 && is_timestamp
100 && desc.version == ColumnDescVersion::Unspecified as i32
102 {
103 desc.additional_column = Some(AdditionalColumn {
104 column_type: Some(AdditionalColumnType::Timestamp(
105 AdditionalColumnTimestamp {},
106 )),
107 });
108 }
109 });
110 });
111 }
112
113 SourceDescBuilder::new(
114 source_columns.clone(),
115 params.env.source_metrics(),
116 row_id_index.map(|x| x as _),
117 with_properties,
118 source_info,
119 params.config.developer.connector_message_buffer_size,
120 params.info.stream_key.clone(),
130 )
131}
132
// Associates the `Source(SourceNode)` stream-node body with `SourceExecutorBuilder`.
// NOTE(review): `impl_stream_node_body!` is defined outside this file — confirm what it expands to.
impl_stream_node_body!(Source(SourceNode) => SourceExecutorBuilder);
134
135impl ExecutorBuilder for SourceExecutorBuilder {
136 type Node = SourceNode;
137
138 async fn new_boxed_executor(
139 params: ExecutorParams,
140 node: &Self::Node,
141 store: impl StateStore,
142 ) -> StreamResult<Executor> {
143 let barrier_receiver = params
144 .local_barrier_manager
145 .subscribe_barrier(params.actor_context.id);
146 let system_params = params.env.system_params_manager_ref().get_params();
147
148 if let Some(source) = &node.source_inner {
149 let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
150 let exec = {
151 let source_id = source.source_id;
152 let source_name = source.source_name.clone();
153 let mut source_info = source.get_info()?.clone();
154 let associated_table_id = source.associated_table_id;
155
156 if source_info.format_encode_options.is_empty() {
157 let connector = get_connector_name(&source.with_properties);
160 source_info.format_encode_options.extend(
161 source.with_properties.iter().filter_map(|(k, v)| {
162 should_copy_to_format_encode_options(k, &connector)
163 .then_some((k.to_owned(), v.to_owned()))
164 }),
165 );
166 }
167
168 let with_properties = WithOptionsSecResolved::new(
169 source.with_properties.clone(),
170 source.secret_refs.clone(),
171 );
172
173 let source_desc_builder = create_source_desc_builder(
174 source.columns.clone(),
175 ¶ms,
176 source_info,
177 source.row_id_index,
178 with_properties,
179 );
180
181 let source_column_ids: Vec<_> = source_desc_builder
182 .column_catalogs_to_source_column_descs()
183 .iter()
184 .map(|column| column.column_id)
185 .collect();
186
187 let state_table_handler = SourceStateTableHandler::from_table_catalog(
188 source.state_table.as_ref().unwrap(),
189 store.clone(),
190 )
191 .await;
192 let stream_source_core = StreamSourceCore::new(
193 source_id,
194 source_name,
195 source_column_ids,
196 source_desc_builder,
197 state_table_handler,
198 );
199
200 let is_legacy_fs_connector = source.with_properties.is_legacy_fs_connector();
201 let is_fs_v2_connector = source.with_properties.is_new_fs_connector();
202
203 if is_legacy_fs_connector {
204 bail!(
206 "legacy s3 connector is fully deprecated since v2.4.0, please DROP and recreate the s3 source.\nexecutor: {:?}",
207 params
208 );
209 } else if is_fs_v2_connector {
210 FsListExecutor::new(
211 params.actor_context.clone(),
212 stream_source_core,
213 params.executor_stats.clone(),
214 barrier_receiver,
215 system_params,
216 source.rate_limit,
217 )
218 .boxed()
219 } else if source.with_properties.is_iceberg_connector() {
220 if is_full_reload_refresh {
221 BatchIcebergListExecutor::new(
222 params.actor_context.clone(),
223 stream_source_core,
224 source
225 .downstream_columns
226 .as_ref()
227 .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
228 params.executor_stats.clone(),
229 barrier_receiver,
230 params.local_barrier_manager.clone(),
231 associated_table_id,
232 )
233 .boxed()
234 } else {
235 IcebergListExecutor::new(
236 params.actor_context.clone(),
237 stream_source_core,
238 source
239 .downstream_columns
240 .as_ref()
241 .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
242 params.executor_stats.clone(),
243 barrier_receiver,
244 system_params,
245 source.rate_limit,
246 params.config.clone(),
247 )
248 .boxed()
249 }
250 } else if source.with_properties.is_batch_connector() {
251 if source
252 .with_properties
253 .get_connector()
254 .map(|c| {
255 c.eq_ignore_ascii_case(
256 risingwave_connector::source::BATCH_POSIX_FS_CONNECTOR,
257 )
258 })
259 .unwrap_or(false)
260 {
261 BatchPosixFsListExecutor::new(
262 params.actor_context.clone(),
263 stream_source_core,
264 params.executor_stats.clone(),
265 barrier_receiver,
266 system_params,
267 source.rate_limit,
268 params.local_barrier_manager.clone(),
269 associated_table_id,
270 )
271 .boxed()
272 } else if source
273 .with_properties
274 .get_connector()
275 .map(|c| {
276 c.eq_ignore_ascii_case(
277 risingwave_connector::source::ADBC_SNOWFLAKE_CONNECTOR,
278 )
279 })
280 .unwrap_or(false)
281 {
282 BatchAdbcSnowflakeListExecutor::new(
283 params.actor_context.clone(),
284 stream_source_core,
285 params.executor_stats.clone(),
286 barrier_receiver,
287 params.local_barrier_manager.clone(),
288 associated_table_id,
289 )
290 .boxed()
291 } else {
292 unreachable!("unknown batch connector");
293 }
294 } else {
295 let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
296 SourceExecutor::new(
297 params.actor_context.clone(),
298 stream_source_core,
299 params.executor_stats.clone(),
300 barrier_receiver,
301 system_params,
302 source.rate_limit,
303 is_shared && !source.with_properties.is_cdc_connector(),
304 params.local_barrier_manager.clone(),
305 )
306 .boxed()
307 }
308 };
309
310 if crate::consistency::insane() {
311 let mut info = params.info.clone();
312 info.identity = format!("{} (troubled)", info.identity);
313 Ok((
314 params.info,
315 TroublemakerExecutor::new(
316 (info, exec).into(),
317 params.config.developer.chunk_size,
318 ),
319 )
320 .into())
321 } else {
322 Ok((params.info, exec).into())
323 }
324 } else {
325 let exec = DummySourceExecutor::new(params.actor_context, barrier_receiver);
328 Ok((params.info, exec).into())
329 }
330 }
331}