// risingwave_stream/from_proto/source/trad_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{
16    KAFKA_TIMESTAMP_COLUMN_NAME, default_key_column_name_version_mapping,
17};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::should_copy_to_format_encode_options;
20use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
21use risingwave_expr::bail;
22use risingwave_pb::data::data_type::TypeName as PbTypeName;
23use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
24use risingwave_pb::plan_common::{
25    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
26    AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
27    PbColumnCatalog, PbEncodeType,
28};
29use risingwave_pb::stream_plan::SourceNode;
30
31use super::*;
32use crate::executor::TroublemakerExecutor;
33use crate::executor::source::{
34    BatchAdbcSnowflakeListExecutor, BatchIcebergListExecutor, BatchPosixFsListExecutor,
35    DummySourceExecutor, FsListExecutor, IcebergListExecutor, SourceExecutor,
36    SourceStateTableHandler, StreamSourceCore,
37};
38use crate::from_proto::source::is_full_reload_refresh;
39
/// Builder that constructs stream executors for [`SourceNode`] plan nodes.
/// Stateless marker type; all logic lives in its [`ExecutorBuilder`] impl.
pub struct SourceExecutorBuilder;
41
42pub fn create_source_desc_builder(
43    mut source_columns: Vec<PbColumnCatalog>,
44    params: &ExecutorParams,
45    source_info: PbStreamSourceInfo,
46    row_id_index: Option<u32>,
47    with_properties: WithOptionsSecResolved,
48) -> SourceDescBuilder {
49    {
50        // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
51        // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key
52        if source_info.format() == FormatType::Upsert
53            && (source_info.row_encode() == PbEncodeType::Avro
54                || source_info.row_encode() == PbEncodeType::Protobuf
55                || source_info.row_encode() == PbEncodeType::Json)
56        {
57            for c in &mut source_columns {
58                if let Some(desc) = c.column_desc.as_mut() {
59                    let is_bytea = desc
60                        .get_column_type()
61                        .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
62                        .unwrap();
63                    if desc.name == default_key_column_name_version_mapping(
64                        &desc.version()
65                    )
66                        && is_bytea
67                        // the column is from a legacy version (before v1.5.x)
68                        && desc.version == ColumnDescVersion::Unspecified as i32
69                    {
70                        desc.additional_column = Some(AdditionalColumn {
71                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
72                        });
73                    }
74
75                    // the column is from a legacy version (v1.6.x)
76                    // introduced in https://github.com/risingwavelabs/risingwave/pull/15226
77                    if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
78                        desc.additional_column = Some(AdditionalColumn {
79                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
80                        });
81                    }
82                }
83            }
84        }
85    }
86
87    {
88        // compatible code: handle legacy column `_rw_kafka_timestamp`
89        // the column is auto added for all kafka source to empower batch query on source
90        // solution: rewrite the column `additional_column` to Timestamp
91
92        let _ = source_columns.iter_mut().map(|c| {
93            let _ = c.column_desc.as_mut().map(|desc| {
94                let is_timestamp = desc
95                    .get_column_type()
96                    .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
97                    .unwrap();
98                if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
99                    && is_timestamp
100                    // the column is from a legacy version
101                    && desc.version == ColumnDescVersion::Unspecified as i32
102                {
103                    desc.additional_column = Some(AdditionalColumn {
104                        column_type: Some(AdditionalColumnType::Timestamp(
105                            AdditionalColumnTimestamp {},
106                        )),
107                    });
108                }
109            });
110        });
111    }
112
113    SourceDescBuilder::new(
114        source_columns.clone(),
115        params.env.source_metrics(),
116        row_id_index.map(|x| x as _),
117        with_properties,
118        source_info,
119        params.config.developer.connector_message_buffer_size,
120        // `pk_indices` is used to ensure that a message will be skipped instead of parsed
121        // with null pk when the pk column is missing.
122        //
123        // Currently pk_indices for source is always empty since pk information is not
124        // passed via `StreamSource` so null pk may be emitted to downstream.
125        //
126        // TODO: use the correct information to fill in pk_dicies.
127        // We should consider add back the "pk_column_ids" field removed by #8841 in
128        // StreamSource
129        params.info.stream_key.clone(),
130    )
131}
132
impl ExecutorBuilder for SourceExecutorBuilder {
    type Node = SourceNode;

    /// Builds the boxed executor for a source node.
    ///
    /// Dispatches on the source's connector properties to pick a concrete executor:
    /// legacy fs (rejected with an error), fs-v2 list, iceberg list (batch or
    /// streaming depending on the refresh mode), batch connectors (posix-fs or
    /// ADBC snowflake), or the general [`SourceExecutor`]. When the node carries no
    /// inner source, a [`DummySourceExecutor`] that only forwards barriers is used.
    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        store: impl StateStore,
    ) -> StreamResult<Executor> {
        // Subscribe this actor to barrier messages; every executor variant below
        // consumes this receiver.
        let barrier_receiver = params
            .local_barrier_manager
            .subscribe_barrier(params.actor_context.id);
        let system_params = params.env.system_params_manager_ref().get_params();

        if let Some(source) = &node.source_inner {
            let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
            let exec = {
                let source_id = source.source_id;
                let source_name = source.source_name.clone();
                let mut source_info = source.get_info()?.clone();
                let associated_table_id = source.associated_table_id;

                if source_info.format_encode_options.is_empty() {
                    // compatible code: quick fix for <https://github.com/risingwavelabs/risingwave/issues/14755>,
                    // will move the logic to FragmentManager::init in release 1.7.
                    // Backfills format/encode options from the WITH properties for
                    // sources created before the options were persisted separately.
                    let connector = get_connector_name(&source.with_properties);
                    source_info.format_encode_options.extend(
                        source.with_properties.iter().filter_map(|(k, v)| {
                            should_copy_to_format_encode_options(k, &connector)
                                .then_some((k.to_owned(), v.to_owned()))
                        }),
                    );
                }

                // Re-attach resolved secret references to the plain WITH properties.
                let with_properties = WithOptionsSecResolved::new(
                    source.with_properties.clone(),
                    source.secret_refs.clone(),
                );

                let source_desc_builder = create_source_desc_builder(
                    source.columns.clone(),
                    &params,
                    source_info,
                    source.row_id_index,
                    with_properties,
                );

                let source_column_ids: Vec<_> = source_desc_builder
                    .column_catalogs_to_source_column_descs()
                    .iter()
                    .map(|column| column.column_id)
                    .collect();

                // NOTE(review): `unwrap` assumes the planner always attaches a state
                // table to a non-dummy source node — confirm against the frontend.
                let state_table_handler = SourceStateTableHandler::from_table_catalog(
                    source.state_table.as_ref().unwrap(),
                    store.clone(),
                )
                .await;
                let stream_source_core = StreamSourceCore::new(
                    source_id,
                    source_name,
                    source_column_ids,
                    source_desc_builder,
                    state_table_handler,
                );

                let is_legacy_fs_connector = source.with_properties.is_legacy_fs_connector();
                let is_fs_v2_connector = source.with_properties.is_new_fs_connector();

                if is_legacy_fs_connector {
                    // Changed to default since v2.0 https://github.com/risingwavelabs/risingwave/pull/17963
                    bail!(
                        "legacy s3 connector is fully deprecated since v2.4.0, please DROP and recreate the s3 source.\nexecutor: {:?}",
                        params
                    );
                } else if is_fs_v2_connector {
                    FsListExecutor::new(
                        params.actor_context.clone(),
                        stream_source_core,
                        params.executor_stats.clone(),
                        barrier_receiver,
                        system_params,
                        source.rate_limit,
                    )
                    .boxed()
                } else if source.with_properties.is_iceberg_connector() {
                    // Full-reload refresh uses the batch list executor; otherwise the
                    // streaming list executor.
                    if is_full_reload_refresh {
                        BatchIcebergListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            source
                                .downstream_columns
                                .as_ref()
                                .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
                            params.executor_stats.clone(),
                            barrier_receiver,
                            params.local_barrier_manager.clone(),
                            associated_table_id,
                        )
                        .boxed()
                    } else {
                        IcebergListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            source
                                .downstream_columns
                                .as_ref()
                                .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
                            params.executor_stats.clone(),
                            barrier_receiver,
                            system_params,
                            source.rate_limit,
                            params.config.clone(),
                        )
                        .boxed()
                    }
                } else if source.with_properties.is_batch_connector() {
                    // Batch connectors are dispatched by exact connector name
                    // (case-insensitive).
                    if source
                        .with_properties
                        .get_connector()
                        .map(|c| {
                            c.eq_ignore_ascii_case(
                                risingwave_connector::source::BATCH_POSIX_FS_CONNECTOR,
                            )
                        })
                        .unwrap_or(false)
                    {
                        BatchPosixFsListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            params.executor_stats.clone(),
                            barrier_receiver,
                            system_params,
                            source.rate_limit,
                            params.local_barrier_manager.clone(),
                            associated_table_id,
                        )
                        .boxed()
                    } else if source
                        .with_properties
                        .get_connector()
                        .map(|c| {
                            c.eq_ignore_ascii_case(
                                risingwave_connector::source::ADBC_SNOWFLAKE_CONNECTOR,
                            )
                        })
                        .unwrap_or(false)
                    {
                        BatchAdbcSnowflakeListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            params.executor_stats.clone(),
                            barrier_receiver,
                            params.local_barrier_manager.clone(),
                            associated_table_id,
                        )
                        .boxed()
                    } else {
                        // `is_batch_connector()` returned true but no known batch
                        // connector name matched — an invariant violation.
                        unreachable!("unknown batch connector");
                    }
                } else {
                    let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
                    SourceExecutor::new(
                        params.actor_context.clone(),
                        stream_source_core,
                        params.executor_stats.clone(),
                        barrier_receiver,
                        system_params,
                        source.rate_limit,
                        is_shared && !source.with_properties.is_cdc_connector(),
                        params.local_barrier_manager.clone(),
                    )
                    .boxed()
                }
            };

            // Under "insane" consistency mode, wrap the executor in a Troublemaker
            // that deliberately perturbs the stream for testing.
            if crate::consistency::insane() {
                let mut info = params.info.clone();
                info.identity = format!("{} (troubled)", info.identity);
                Ok((
                    params.info,
                    TroublemakerExecutor::new(
                        (info, exec).into(),
                        params.config.developer.chunk_size,
                    ),
                )
                    .into())
            } else {
                Ok((params.info, exec).into())
            }
        } else {
            // If there is no external stream source, then no data should be persisted.
            // Use DummySourceExecutor which only forwards barrier messages.
            let exec = DummySourceExecutor::new(params.actor_context, barrier_receiver);
            Ok((params.info, exec).into())
        }
    }
}