risingwave_stream/from_proto/source/trad_source.rs

use risingwave_common::catalog::{
    KAFKA_TIMESTAMP_COLUMN_NAME, default_key_column_name_version_mapping,
};
use risingwave_connector::source::reader::desc::SourceDescBuilder;
use risingwave_connector::source::should_copy_to_format_encode_options;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
use risingwave_expr::bail;
use risingwave_pb::data::data_type::TypeName as PbTypeName;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{
    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
    AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
    PbColumnCatalog, PbEncodeType,
};
use risingwave_pb::stream_plan::SourceNode;

use super::*;
use crate::executor::TroublemakerExecutor;
use crate::executor::source::{
    BatchIcebergListExecutor, BatchPosixFsListExecutor, DummySourceExecutor, FsListExecutor,
    IcebergListExecutor, SourceExecutor, SourceStateTableHandler, StreamSourceCore,
};
use crate::from_proto::source::is_full_reload_refresh;

pub struct SourceExecutorBuilder;

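/// Builds a [`SourceDescBuilder`] from the proto source catalog, applying
/// backward-compatibility rewrites to legacy additional columns first.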
pub fn create_source_desc_builder(
    mut source_columns: Vec<PbColumnCatalog>,
    params: &ExecutorParams,
    source_info: PbStreamSourceInfo,
    row_id_index: Option<u32>,
    with_properties: WithOptionsSecResolved,
) -> SourceDescBuilder {
    {
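        // Compatibility: for upsert sources encoded as Avro, Protobuf, or JSON, mark the
        // legacy bytea key column as an additional `Key` column so that newer code paths
        // can recognize it.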
        if source_info.format() == FormatType::Upsert
            && (source_info.row_encode() == PbEncodeType::Avro
                || source_info.row_encode() == PbEncodeType::Protobuf
                || source_info.row_encode() == PbEncodeType::Json)
        {
            for c in &mut source_columns {
                if let Some(desc) = c.column_desc.as_mut() {
                    let is_bytea = desc
                        .get_column_type()
                        .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
                        .unwrap();
                    if desc.name == default_key_column_name_version_mapping(&desc.version())
                        && is_bytea
                        // the column was persisted before `ColumnDescVersion` was introduced
                        && desc.version == ColumnDescVersion::Unspecified as i32
                    {
                        desc.additional_column = Some(AdditionalColumn {
                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
                        });
                    }

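                    // Columns persisted with the since-deprecated `additional_column_type`
                    // field are translated to the new `additional_column` representation.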
                    if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
                        desc.additional_column = Some(AdditionalColumn {
                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
                        });
                    }
                }
            }
        }
    }

    {
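        // Compatibility: `_rw_kafka_timestamp` used to be auto-added to every Kafka source
        // to support batch queries on the source; rewrite such legacy columns so their
        // `additional_column` is `Timestamp`.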
        for c in &mut source_columns {
            if let Some(desc) = c.column_desc.as_mut() {
                let is_timestamp = desc
                    .get_column_type()
                    .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
                    .unwrap();
                if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
                    && is_timestamp
                    // the column was persisted before `ColumnDescVersion` was introduced
                    && desc.version == ColumnDescVersion::Unspecified as i32
                {
                    desc.additional_column = Some(AdditionalColumn {
                        column_type: Some(AdditionalColumnType::Timestamp(
                            AdditionalColumnTimestamp {},
                        )),
                    });
                }
            }
        }
    }

    SourceDescBuilder::new(
        source_columns.clone(),
        params.env.source_metrics(),
        row_id_index.map(|x| x as _),
        with_properties,
        source_info,
        params.config.developer.connector_message_buffer_size,
        // Stream key of this executor, so that messages missing key columns can be
        // skipped instead of being parsed with a null key.
        params.info.stream_key.clone(),
    )
}

impl ExecutorBuilder for SourceExecutorBuilder {
    type Node = SourceNode;

    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        store: impl StateStore,
    ) -> StreamResult<Executor> {
        let barrier_receiver = params
            .local_barrier_manager
            .subscribe_barrier(params.actor_context.id);
        let system_params = params.env.system_params_manager_ref().get_params();

        if let Some(source) = &node.source_inner {
            let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
            let exec = {
                let source_id = source.source_id;
                let source_name = source.source_name.clone();
                let mut source_info = source.get_info()?.clone();
                let associated_table_id = source.associated_table_id;

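                // Compatibility: if `format_encode_options` was not persisted with the
                // source, rebuild it from the WITH properties relevant to this connector.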
                if source_info.format_encode_options.is_empty() {
                    let connector = get_connector_name(&source.with_properties);
                    source_info.format_encode_options.extend(
                        source.with_properties.iter().filter_map(|(k, v)| {
                            should_copy_to_format_encode_options(k, &connector)
                                .then_some((k.to_owned(), v.to_owned()))
                        }),
                    );
                }

                let with_properties = WithOptionsSecResolved::new(
                    source.with_properties.clone(),
                    source.secret_refs.clone(),
                );

                let source_desc_builder = create_source_desc_builder(
                    source.columns.clone(),
                    &params,
                    source_info,
                    source.row_id_index,
                    with_properties,
                );

                let source_column_ids: Vec<_> = source_desc_builder
                    .column_catalogs_to_source_column_descs()
                    .iter()
                    .map(|column| column.column_id)
                    .collect();

                let state_table_handler = SourceStateTableHandler::from_table_catalog(
                    source.state_table.as_ref().unwrap(),
                    store.clone(),
                )
                .await;
                let stream_source_core = StreamSourceCore::new(
                    source_id,
                    source_name,
                    source_column_ids,
                    source_desc_builder,
                    state_table_handler,
                );

                let is_legacy_fs_connector = source.with_properties.is_legacy_fs_connector();
                let is_fs_v2_connector = source.with_properties.is_new_fs_connector();

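                // Choose the executor variant based on the connector kind.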
                if is_legacy_fs_connector {
                    bail!(
                        "legacy s3 connector is fully deprecated since v2.4.0, please DROP and recreate the s3 source.\nexecutor: {:?}",
                        params
                    );
                } else if is_fs_v2_connector {
                    FsListExecutor::new(
                        params.actor_context.clone(),
                        stream_source_core,
                        params.executor_stats.clone(),
                        barrier_receiver,
                        system_params,
                        source.rate_limit,
                    )
                    .boxed()
                } else if source.with_properties.is_iceberg_connector() {
                    if is_full_reload_refresh {
                        BatchIcebergListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            source
                                .downstream_columns
                                .as_ref()
                                .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
                            params.executor_stats.clone(),
                            barrier_receiver,
                            params.local_barrier_manager.clone(),
                            associated_table_id,
                        )
                        .boxed()
                    } else {
                        IcebergListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            source
                                .downstream_columns
                                .as_ref()
                                .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
                            params.executor_stats.clone(),
                            barrier_receiver,
                            system_params,
                            source.rate_limit,
                            params.config.clone(),
                        )
                        .boxed()
                    }
                } else if source.with_properties.is_batch_connector() {
                    if source
                        .with_properties
                        .get_connector()
                        .map(|c| {
                            c.eq_ignore_ascii_case(
                                risingwave_connector::source::BATCH_POSIX_FS_CONNECTOR,
                            )
                        })
                        .unwrap_or(false)
                    {
                        BatchPosixFsListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            params.executor_stats.clone(),
                            barrier_receiver,
                            system_params,
                            source.rate_limit,
                            params.local_barrier_manager.clone(),
                            associated_table_id,
                        )
                        .boxed()
                    } else {
                        unreachable!("unknown batch connector");
                    }
                } else {
                    let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
                    SourceExecutor::new(
                        params.actor_context.clone(),
                        stream_source_core,
                        params.executor_stats.clone(),
                        barrier_receiver,
                        system_params,
                        source.rate_limit,
                        // whether this is a shared source that is not a CDC source
                        is_shared && !source.with_properties.is_cdc_connector(),
                        params.local_barrier_manager.clone(),
                    )
                    .boxed()
                }
            };

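            // Under "insane" consistency mode, wrap the executor in a
            // `TroublemakerExecutor` that deliberately corrupts the stream to exercise
            // downstream robustness.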
            if crate::consistency::insane() {
                let mut info = params.info.clone();
                info.identity = format!("{} (troubled)", info.identity);
                Ok((
                    params.info,
                    TroublemakerExecutor::new(
                        (info, exec).into(),
                        params.config.developer.chunk_size,
                    ),
                )
                .into())
            } else {
                Ok((params.info, exec).into())
            }
        } else {
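            // No inner source to read from: fall back to a `DummySourceExecutor`, which
            // handles barriers but produces no data.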
            let exec = DummySourceExecutor::new(params.actor_context, barrier_receiver);
            Ok((params.info, exec).into())
        }
    }
}