risingwave_stream/from_proto/source/
trad_source.rs1use risingwave_common::catalog::{
16 KAFKA_TIMESTAMP_COLUMN_NAME, default_key_column_name_version_mapping,
17};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::should_copy_to_format_encode_options;
20use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
21use risingwave_expr::bail;
22use risingwave_pb::data::data_type::TypeName as PbTypeName;
23use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
24use risingwave_pb::plan_common::{
25 AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
26 AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
27 PbColumnCatalog, PbEncodeType,
28};
29use risingwave_pb::stream_plan::SourceNode;
30
31use super::*;
32use crate::executor::TroublemakerExecutor;
33use crate::executor::source::{
34 BatchAdbcSnowflakeListExecutor, BatchIcebergListExecutor, BatchPosixFsListExecutor,
35 DummySourceExecutor, FsListExecutor, IcebergListExecutor, SourceExecutor,
36 SourceStateTableHandler, StreamSourceCore,
37};
38use crate::from_proto::source::is_full_reload_refresh;
39
/// Stateless marker type whose [`ExecutorBuilder`] implementation turns a
/// [`SourceNode`] plan node into a concrete source executor.
pub struct SourceExecutorBuilder;
41
42pub fn create_source_desc_builder(
43 mut source_columns: Vec<PbColumnCatalog>,
44 params: &ExecutorParams,
45 source_info: PbStreamSourceInfo,
46 row_id_index: Option<u32>,
47 with_properties: WithOptionsSecResolved,
48) -> SourceDescBuilder {
49 {
50 if source_info.format() == FormatType::Upsert
53 && (source_info.row_encode() == PbEncodeType::Avro
54 || source_info.row_encode() == PbEncodeType::Protobuf
55 || source_info.row_encode() == PbEncodeType::Json)
56 {
57 for c in &mut source_columns {
58 if let Some(desc) = c.column_desc.as_mut() {
59 let is_bytea = desc
60 .get_column_type()
61 .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
62 .unwrap();
63 if desc.name == default_key_column_name_version_mapping(
64 &desc.version()
65 )
66 && is_bytea
67 && desc.version == ColumnDescVersion::Unspecified as i32
69 {
70 desc.additional_column = Some(AdditionalColumn {
71 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
72 });
73 }
74
75 if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
78 desc.additional_column = Some(AdditionalColumn {
79 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
80 });
81 }
82 }
83 }
84 }
85 }
86
87 {
88 let _ = source_columns.iter_mut().map(|c| {
93 let _ = c.column_desc.as_mut().map(|desc| {
94 let is_timestamp = desc
95 .get_column_type()
96 .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
97 .unwrap();
98 if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
99 && is_timestamp
100 && desc.version == ColumnDescVersion::Unspecified as i32
102 {
103 desc.additional_column = Some(AdditionalColumn {
104 column_type: Some(AdditionalColumnType::Timestamp(
105 AdditionalColumnTimestamp {},
106 )),
107 });
108 }
109 });
110 });
111 }
112
113 SourceDescBuilder::new(
114 source_columns.clone(),
115 params.env.source_metrics(),
116 row_id_index.map(|x| x as _),
117 with_properties,
118 source_info,
119 params.config.developer.connector_message_buffer_size,
120 params.info.stream_key.clone(),
130 )
131}
132
133impl ExecutorBuilder for SourceExecutorBuilder {
134 type Node = SourceNode;
135
136 async fn new_boxed_executor(
137 params: ExecutorParams,
138 node: &Self::Node,
139 store: impl StateStore,
140 ) -> StreamResult<Executor> {
141 let barrier_receiver = params
142 .local_barrier_manager
143 .subscribe_barrier(params.actor_context.id);
144 let system_params = params.env.system_params_manager_ref().get_params();
145
146 if let Some(source) = &node.source_inner {
147 let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
148 let exec = {
149 let source_id = source.source_id;
150 let source_name = source.source_name.clone();
151 let mut source_info = source.get_info()?.clone();
152 let associated_table_id = source.associated_table_id;
153
154 if source_info.format_encode_options.is_empty() {
155 let connector = get_connector_name(&source.with_properties);
158 source_info.format_encode_options.extend(
159 source.with_properties.iter().filter_map(|(k, v)| {
160 should_copy_to_format_encode_options(k, &connector)
161 .then_some((k.to_owned(), v.to_owned()))
162 }),
163 );
164 }
165
166 let with_properties = WithOptionsSecResolved::new(
167 source.with_properties.clone(),
168 source.secret_refs.clone(),
169 );
170
171 let source_desc_builder = create_source_desc_builder(
172 source.columns.clone(),
173 ¶ms,
174 source_info,
175 source.row_id_index,
176 with_properties,
177 );
178
179 let source_column_ids: Vec<_> = source_desc_builder
180 .column_catalogs_to_source_column_descs()
181 .iter()
182 .map(|column| column.column_id)
183 .collect();
184
185 let state_table_handler = SourceStateTableHandler::from_table_catalog(
186 source.state_table.as_ref().unwrap(),
187 store.clone(),
188 )
189 .await;
190 let stream_source_core = StreamSourceCore::new(
191 source_id,
192 source_name,
193 source_column_ids,
194 source_desc_builder,
195 state_table_handler,
196 );
197
198 let is_legacy_fs_connector = source.with_properties.is_legacy_fs_connector();
199 let is_fs_v2_connector = source.with_properties.is_new_fs_connector();
200
201 if is_legacy_fs_connector {
202 bail!(
204 "legacy s3 connector is fully deprecated since v2.4.0, please DROP and recreate the s3 source.\nexecutor: {:?}",
205 params
206 );
207 } else if is_fs_v2_connector {
208 FsListExecutor::new(
209 params.actor_context.clone(),
210 stream_source_core,
211 params.executor_stats.clone(),
212 barrier_receiver,
213 system_params,
214 source.rate_limit,
215 )
216 .boxed()
217 } else if source.with_properties.is_iceberg_connector() {
218 if is_full_reload_refresh {
219 BatchIcebergListExecutor::new(
220 params.actor_context.clone(),
221 stream_source_core,
222 source
223 .downstream_columns
224 .as_ref()
225 .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
226 params.executor_stats.clone(),
227 barrier_receiver,
228 params.local_barrier_manager.clone(),
229 associated_table_id,
230 )
231 .boxed()
232 } else {
233 IcebergListExecutor::new(
234 params.actor_context.clone(),
235 stream_source_core,
236 source
237 .downstream_columns
238 .as_ref()
239 .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
240 params.executor_stats.clone(),
241 barrier_receiver,
242 system_params,
243 source.rate_limit,
244 params.config.clone(),
245 )
246 .boxed()
247 }
248 } else if source.with_properties.is_batch_connector() {
249 if source
250 .with_properties
251 .get_connector()
252 .map(|c| {
253 c.eq_ignore_ascii_case(
254 risingwave_connector::source::BATCH_POSIX_FS_CONNECTOR,
255 )
256 })
257 .unwrap_or(false)
258 {
259 BatchPosixFsListExecutor::new(
260 params.actor_context.clone(),
261 stream_source_core,
262 params.executor_stats.clone(),
263 barrier_receiver,
264 system_params,
265 source.rate_limit,
266 params.local_barrier_manager.clone(),
267 associated_table_id,
268 )
269 .boxed()
270 } else if source
271 .with_properties
272 .get_connector()
273 .map(|c| {
274 c.eq_ignore_ascii_case(
275 risingwave_connector::source::ADBC_SNOWFLAKE_CONNECTOR,
276 )
277 })
278 .unwrap_or(false)
279 {
280 BatchAdbcSnowflakeListExecutor::new(
281 params.actor_context.clone(),
282 stream_source_core,
283 params.executor_stats.clone(),
284 barrier_receiver,
285 params.local_barrier_manager.clone(),
286 associated_table_id,
287 )
288 .boxed()
289 } else {
290 unreachable!("unknown batch connector");
291 }
292 } else {
293 let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
294 SourceExecutor::new(
295 params.actor_context.clone(),
296 stream_source_core,
297 params.executor_stats.clone(),
298 barrier_receiver,
299 system_params,
300 source.rate_limit,
301 is_shared && !source.with_properties.is_cdc_connector(),
302 params.local_barrier_manager.clone(),
303 )
304 .boxed()
305 }
306 };
307
308 if crate::consistency::insane() {
309 let mut info = params.info.clone();
310 info.identity = format!("{} (troubled)", info.identity);
311 Ok((
312 params.info,
313 TroublemakerExecutor::new(
314 (info, exec).into(),
315 params.config.developer.chunk_size,
316 ),
317 )
318 .into())
319 } else {
320 Ok((params.info, exec).into())
321 }
322 } else {
323 let exec = DummySourceExecutor::new(params.actor_context, barrier_receiver);
326 Ok((params.info, exec).into())
327 }
328 }
329}