risingwave_stream/from_proto/source/
trad_source.rs1use risingwave_common::catalog::{
16 KAFKA_TIMESTAMP_COLUMN_NAME, default_key_column_name_version_mapping,
17};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::should_copy_to_format_encode_options;
20use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
21use risingwave_expr::bail;
22use risingwave_pb::data::data_type::TypeName as PbTypeName;
23use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
24use risingwave_pb::plan_common::{
25 AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
26 AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
27 PbColumnCatalog, PbEncodeType,
28};
29use risingwave_pb::stream_plan::SourceNode;
30use risingwave_storage::panic_store::PanicStateStore;
31
32use super::*;
33use crate::executor::TroublemakerExecutor;
34use crate::executor::source::{
35 FsListExecutor, IcebergListExecutor, SourceExecutor, SourceStateTableHandler, StreamSourceCore,
36};
37
/// Stateless marker type whose `ExecutorBuilder` implementation turns a `SourceNode` plan node
/// into a stream executor.
pub struct SourceExecutorBuilder;
39
40pub fn create_source_desc_builder(
41 mut source_columns: Vec<PbColumnCatalog>,
42 params: &ExecutorParams,
43 source_info: PbStreamSourceInfo,
44 row_id_index: Option<u32>,
45 with_properties: WithOptionsSecResolved,
46) -> SourceDescBuilder {
47 {
48 if source_info.format() == FormatType::Upsert
51 && (source_info.row_encode() == PbEncodeType::Avro
52 || source_info.row_encode() == PbEncodeType::Protobuf
53 || source_info.row_encode() == PbEncodeType::Json)
54 {
55 for c in &mut source_columns {
56 if let Some(desc) = c.column_desc.as_mut() {
57 let is_bytea = desc
58 .get_column_type()
59 .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
60 .unwrap();
61 if desc.name == default_key_column_name_version_mapping(
62 &desc.version()
63 )
64 && is_bytea
65 && desc.version == ColumnDescVersion::Unspecified as i32
67 {
68 desc.additional_column = Some(AdditionalColumn {
69 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
70 });
71 }
72
73 if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
76 desc.additional_column = Some(AdditionalColumn {
77 column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
78 });
79 }
80 }
81 }
82 }
83 }
84
85 {
86 let _ = source_columns.iter_mut().map(|c| {
91 let _ = c.column_desc.as_mut().map(|desc| {
92 let is_timestamp = desc
93 .get_column_type()
94 .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
95 .unwrap();
96 if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
97 && is_timestamp
98 && desc.version == ColumnDescVersion::Unspecified as i32
100 {
101 desc.additional_column = Some(AdditionalColumn {
102 column_type: Some(AdditionalColumnType::Timestamp(
103 AdditionalColumnTimestamp {},
104 )),
105 });
106 }
107 });
108 });
109 }
110
111 SourceDescBuilder::new(
112 source_columns.clone(),
113 params.env.source_metrics(),
114 row_id_index.map(|x| x as _),
115 with_properties,
116 source_info,
117 params.env.config().developer.connector_message_buffer_size,
118 params.info.pk_indices.clone(),
128 )
129}
130
131impl ExecutorBuilder for SourceExecutorBuilder {
132 type Node = SourceNode;
133
134 async fn new_boxed_executor(
135 params: ExecutorParams,
136 node: &Self::Node,
137 store: impl StateStore,
138 ) -> StreamResult<Executor> {
139 let barrier_receiver = params
140 .local_barrier_manager
141 .subscribe_barrier(params.actor_context.id);
142 let system_params = params.env.system_params_manager_ref().get_params();
143
144 if let Some(source) = &node.source_inner {
145 let exec = {
146 let source_id = TableId::new(source.source_id);
147 let source_name = source.source_name.clone();
148 let mut source_info = source.get_info()?.clone();
149
150 if source_info.format_encode_options.is_empty() {
151 let connector = get_connector_name(&source.with_properties);
154 source_info.format_encode_options.extend(
155 source.with_properties.iter().filter_map(|(k, v)| {
156 should_copy_to_format_encode_options(k, &connector)
157 .then_some((k.to_owned(), v.to_owned()))
158 }),
159 );
160 }
161
162 let with_properties = WithOptionsSecResolved::new(
163 source.with_properties.clone(),
164 source.secret_refs.clone(),
165 );
166
167 let source_desc_builder = create_source_desc_builder(
168 source.columns.clone(),
169 ¶ms,
170 source_info,
171 source.row_id_index,
172 with_properties,
173 );
174
175 let source_column_ids: Vec<_> = source_desc_builder
176 .column_catalogs_to_source_column_descs()
177 .iter()
178 .map(|column| column.column_id)
179 .collect();
180
181 let state_table_handler = SourceStateTableHandler::from_table_catalog(
182 source.state_table.as_ref().unwrap(),
183 store.clone(),
184 )
185 .await;
186 let stream_source_core = StreamSourceCore::new(
187 source_id,
188 source_name,
189 source_column_ids,
190 source_desc_builder,
191 state_table_handler,
192 );
193
194 let is_legacy_fs_connector = source.with_properties.is_legacy_fs_connector();
195 let is_fs_v2_connector = source.with_properties.is_new_fs_connector();
196
197 if is_legacy_fs_connector {
198 bail!(
200 "legacy s3 connector is fully deprecated since v2.4.0, please DROP and recreate the s3 source.\nexecutor: {:?}",
201 params
202 );
203 } else if is_fs_v2_connector {
204 FsListExecutor::new(
205 params.actor_context.clone(),
206 Some(stream_source_core),
207 params.executor_stats.clone(),
208 barrier_receiver,
209 system_params,
210 source.rate_limit,
211 )
212 .boxed()
213 } else if source.with_properties.is_iceberg_connector() {
214 IcebergListExecutor::new(
215 params.actor_context.clone(),
216 stream_source_core,
217 params.executor_stats.clone(),
218 barrier_receiver,
219 system_params,
220 source.rate_limit,
221 params.env.config().clone(),
222 )
223 .boxed()
224 } else {
225 let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
226 SourceExecutor::new(
227 params.actor_context.clone(),
228 Some(stream_source_core),
229 params.executor_stats.clone(),
230 barrier_receiver,
231 system_params,
232 source.rate_limit,
233 is_shared && !source.with_properties.is_cdc_connector(),
234 )
235 .boxed()
236 }
237 };
238
239 if crate::consistency::insane() {
240 let mut info = params.info.clone();
241 info.identity = format!("{} (troubled)", info.identity);
242 Ok((
243 params.info,
244 TroublemakerExecutor::new(
245 (info, exec).into(),
246 params.env.config().developer.chunk_size,
247 ),
248 )
249 .into())
250 } else {
251 Ok((params.info, exec).into())
252 }
253 } else {
254 let exec = SourceExecutor::<PanicStateStore>::new(
257 params.actor_context,
258 None,
259 params.executor_stats,
260 barrier_receiver,
261 system_params,
262 None,
263 false,
264 );
265 Ok((params.info, exec).into())
266 }
267 }
268}