risingwave_stream/from_proto/source/trad_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{
16    KAFKA_TIMESTAMP_COLUMN_NAME, default_key_column_name_version_mapping,
17};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::should_copy_to_format_encode_options;
20use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
21use risingwave_expr::bail;
22use risingwave_pb::data::data_type::TypeName as PbTypeName;
23use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
24use risingwave_pb::plan_common::{
25    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
26    AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
27    PbColumnCatalog, PbEncodeType,
28};
29use risingwave_pb::stream_plan::SourceNode;
30
31use super::*;
32use crate::executor::TroublemakerExecutor;
33use crate::executor::source::{
34    BatchIcebergListExecutor, BatchPosixFsListExecutor, DummySourceExecutor, FsListExecutor,
35    IcebergListExecutor, SourceExecutor, SourceStateTableHandler, StreamSourceCore,
36};
37use crate::from_proto::source::is_full_reload_refresh;
38
/// Builder that turns a [`SourceNode`] plan fragment into the matching boxed
/// source executor (see the [`ExecutorBuilder`] impl below).
pub struct SourceExecutorBuilder;
40
41pub fn create_source_desc_builder(
42    mut source_columns: Vec<PbColumnCatalog>,
43    params: &ExecutorParams,
44    source_info: PbStreamSourceInfo,
45    row_id_index: Option<u32>,
46    with_properties: WithOptionsSecResolved,
47) -> SourceDescBuilder {
48    {
49        // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
50        // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key
51        if source_info.format() == FormatType::Upsert
52            && (source_info.row_encode() == PbEncodeType::Avro
53                || source_info.row_encode() == PbEncodeType::Protobuf
54                || source_info.row_encode() == PbEncodeType::Json)
55        {
56            for c in &mut source_columns {
57                if let Some(desc) = c.column_desc.as_mut() {
58                    let is_bytea = desc
59                        .get_column_type()
60                        .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
61                        .unwrap();
62                    if desc.name == default_key_column_name_version_mapping(
63                        &desc.version()
64                    )
65                        && is_bytea
66                        // the column is from a legacy version (before v1.5.x)
67                        && desc.version == ColumnDescVersion::Unspecified as i32
68                    {
69                        desc.additional_column = Some(AdditionalColumn {
70                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
71                        });
72                    }
73
74                    // the column is from a legacy version (v1.6.x)
75                    // introduced in https://github.com/risingwavelabs/risingwave/pull/15226
76                    if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
77                        desc.additional_column = Some(AdditionalColumn {
78                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
79                        });
80                    }
81                }
82            }
83        }
84    }
85
86    {
87        // compatible code: handle legacy column `_rw_kafka_timestamp`
88        // the column is auto added for all kafka source to empower batch query on source
89        // solution: rewrite the column `additional_column` to Timestamp
90
91        let _ = source_columns.iter_mut().map(|c| {
92            let _ = c.column_desc.as_mut().map(|desc| {
93                let is_timestamp = desc
94                    .get_column_type()
95                    .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
96                    .unwrap();
97                if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
98                    && is_timestamp
99                    // the column is from a legacy version
100                    && desc.version == ColumnDescVersion::Unspecified as i32
101                {
102                    desc.additional_column = Some(AdditionalColumn {
103                        column_type: Some(AdditionalColumnType::Timestamp(
104                            AdditionalColumnTimestamp {},
105                        )),
106                    });
107                }
108            });
109        });
110    }
111
112    SourceDescBuilder::new(
113        source_columns.clone(),
114        params.env.source_metrics(),
115        row_id_index.map(|x| x as _),
116        with_properties,
117        source_info,
118        params.config.developer.connector_message_buffer_size,
119        // `pk_indices` is used to ensure that a message will be skipped instead of parsed
120        // with null pk when the pk column is missing.
121        //
122        // Currently pk_indices for source is always empty since pk information is not
123        // passed via `StreamSource` so null pk may be emitted to downstream.
124        //
125        // TODO: use the correct information to fill in pk_dicies.
126        // We should consider add back the "pk_column_ids" field removed by #8841 in
127        // StreamSource
128        params.info.stream_key.clone(),
129    )
130}
131
impl ExecutorBuilder for SourceExecutorBuilder {
    type Node = SourceNode;

    /// Constructs the boxed source executor for a `SourceNode`, dispatching on
    /// the connector kind: legacy fs (rejected), fs-v2 list, iceberg list
    /// (batch or streaming), batch posix-fs list, or the generic
    /// `SourceExecutor`. A node without `source_inner` yields a
    /// `DummySourceExecutor` that only forwards barriers.
    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        store: impl StateStore,
    ) -> StreamResult<Executor> {
        // Subscribe to barriers up front; the receiver is consumed by exactly
        // one of the executor constructors below.
        let barrier_receiver = params
            .local_barrier_manager
            .subscribe_barrier(params.actor_context.id);
        let system_params = params.env.system_params_manager_ref().get_params();

        if let Some(source) = &node.source_inner {
            let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
            let exec = {
                let source_id = source.source_id;
                let source_name = source.source_name.clone();
                let mut source_info = source.get_info()?.clone();
                let associated_table_id = source.associated_table_id;

                if source_info.format_encode_options.is_empty() {
                    // compatible code: quick fix for <https://github.com/risingwavelabs/risingwave/issues/14755>,
                    // will move the logic to FragmentManager::init in release 1.7.
                    let connector = get_connector_name(&source.with_properties);
                    source_info.format_encode_options.extend(
                        source.with_properties.iter().filter_map(|(k, v)| {
                            should_copy_to_format_encode_options(k, &connector)
                                .then_some((k.to_owned(), v.to_owned()))
                        }),
                    );
                }

                // Re-attach secret references to the plain WITH options.
                let with_properties = WithOptionsSecResolved::new(
                    source.with_properties.clone(),
                    source.secret_refs.clone(),
                );

                let source_desc_builder = create_source_desc_builder(
                    source.columns.clone(),
                    &params,
                    source_info,
                    source.row_id_index,
                    with_properties,
                );

                let source_column_ids: Vec<_> = source_desc_builder
                    .column_catalogs_to_source_column_descs()
                    .iter()
                    .map(|column| column.column_id)
                    .collect();

                // NOTE(review): `unwrap` assumes the planner always attaches a
                // state table to a non-dummy source node — confirm upstream.
                let state_table_handler = SourceStateTableHandler::from_table_catalog(
                    source.state_table.as_ref().unwrap(),
                    store.clone(),
                )
                .await;
                let stream_source_core = StreamSourceCore::new(
                    source_id,
                    source_name,
                    source_column_ids,
                    source_desc_builder,
                    state_table_handler,
                );

                let is_legacy_fs_connector = source.with_properties.is_legacy_fs_connector();
                let is_fs_v2_connector = source.with_properties.is_new_fs_connector();

                if is_legacy_fs_connector {
                    // Changed to default since v2.0 https://github.com/risingwavelabs/risingwave/pull/17963
                    bail!(
                        "legacy s3 connector is fully deprecated since v2.4.0, please DROP and recreate the s3 source.\nexecutor: {:?}",
                        params
                    );
                } else if is_fs_v2_connector {
                    FsListExecutor::new(
                        params.actor_context.clone(),
                        stream_source_core,
                        params.executor_stats.clone(),
                        barrier_receiver,
                        system_params,
                        source.rate_limit,
                    )
                    .boxed()
                } else if source.with_properties.is_iceberg_connector() {
                    // Full-reload refresh uses the batch list executor;
                    // otherwise the streaming list executor is used.
                    if is_full_reload_refresh {
                        BatchIcebergListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            source
                                .downstream_columns
                                .as_ref()
                                .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
                            params.executor_stats.clone(),
                            barrier_receiver,
                            params.local_barrier_manager.clone(),
                            associated_table_id,
                        )
                        .boxed()
                    } else {
                        IcebergListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            source
                                .downstream_columns
                                .as_ref()
                                .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
                            params.executor_stats.clone(),
                            barrier_receiver,
                            system_params,
                            source.rate_limit,
                            params.config.clone(),
                        )
                        .boxed()
                    }
                } else if source.with_properties.is_batch_connector() {
                    // Currently posix-fs is the only supported batch connector.
                    if source
                        .with_properties
                        .get_connector()
                        .map(|c| {
                            c.eq_ignore_ascii_case(
                                risingwave_connector::source::BATCH_POSIX_FS_CONNECTOR,
                            )
                        })
                        .unwrap_or(false)
                    {
                        BatchPosixFsListExecutor::new(
                            params.actor_context.clone(),
                            stream_source_core,
                            params.executor_stats.clone(),
                            barrier_receiver,
                            system_params,
                            source.rate_limit,
                            params.local_barrier_manager.clone(),
                            associated_table_id,
                        )
                        .boxed()
                    } else {
                        unreachable!("unknown batch connector");
                    }
                } else {
                    // Generic path: plain SourceExecutor. `is_shared` is only
                    // honored for non-CDC connectors.
                    let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
                    SourceExecutor::new(
                        params.actor_context.clone(),
                        stream_source_core,
                        params.executor_stats.clone(),
                        barrier_receiver,
                        system_params,
                        source.rate_limit,
                        is_shared && !source.with_properties.is_cdc_connector(),
                        params.local_barrier_manager.clone(),
                    )
                    .boxed()
                }
            };

            // In "insane" consistency-test mode, wrap the executor in a
            // TroublemakerExecutor that injects inconsistencies downstream.
            if crate::consistency::insane() {
                let mut info = params.info.clone();
                info.identity = format!("{} (troubled)", info.identity);
                Ok((
                    params.info,
                    TroublemakerExecutor::new(
                        (info, exec).into(),
                        params.config.developer.chunk_size,
                    ),
                )
                    .into())
            } else {
                Ok((params.info, exec).into())
            }
        } else {
            // If there is no external stream source, then no data should be persisted.
            // Use DummySourceExecutor which only forwards barrier messages.
            let exec = DummySourceExecutor::new(params.actor_context, barrier_receiver);
            Ok((params.info, exec).into())
        }
    }
}