// risingwave_stream/from_proto/source/trad_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{
16    KAFKA_TIMESTAMP_COLUMN_NAME, default_key_column_name_version_mapping,
17};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::should_copy_to_format_encode_options;
20use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
21use risingwave_expr::bail;
22use risingwave_pb::data::data_type::TypeName as PbTypeName;
23use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
24use risingwave_pb::plan_common::{
25    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
26    AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
27    PbColumnCatalog, PbEncodeType,
28};
29use risingwave_pb::stream_plan::SourceNode;
30use risingwave_storage::panic_store::PanicStateStore;
31
32use super::*;
33use crate::executor::TroublemakerExecutor;
34use crate::executor::source::{
35    FsListExecutor, IcebergListExecutor, SourceExecutor, SourceStateTableHandler, StreamSourceCore,
36};
37
/// Builder that constructs source executors from a [`SourceNode`] plan node;
/// see the [`ExecutorBuilder`] implementation below.
pub struct SourceExecutorBuilder;
39
40pub fn create_source_desc_builder(
41    mut source_columns: Vec<PbColumnCatalog>,
42    params: &ExecutorParams,
43    source_info: PbStreamSourceInfo,
44    row_id_index: Option<u32>,
45    with_properties: WithOptionsSecResolved,
46) -> SourceDescBuilder {
47    {
48        // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
49        // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key
50        if source_info.format() == FormatType::Upsert
51            && (source_info.row_encode() == PbEncodeType::Avro
52                || source_info.row_encode() == PbEncodeType::Protobuf
53                || source_info.row_encode() == PbEncodeType::Json)
54        {
55            for c in &mut source_columns {
56                if let Some(desc) = c.column_desc.as_mut() {
57                    let is_bytea = desc
58                        .get_column_type()
59                        .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
60                        .unwrap();
61                    if desc.name == default_key_column_name_version_mapping(
62                        &desc.version()
63                    )
64                        && is_bytea
65                        // the column is from a legacy version (before v1.5.x)
66                        && desc.version == ColumnDescVersion::Unspecified as i32
67                    {
68                        desc.additional_column = Some(AdditionalColumn {
69                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
70                        });
71                    }
72
73                    // the column is from a legacy version (v1.6.x)
74                    // introduced in https://github.com/risingwavelabs/risingwave/pull/15226
75                    if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
76                        desc.additional_column = Some(AdditionalColumn {
77                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
78                        });
79                    }
80                }
81            }
82        }
83    }
84
85    {
86        // compatible code: handle legacy column `_rw_kafka_timestamp`
87        // the column is auto added for all kafka source to empower batch query on source
88        // solution: rewrite the column `additional_column` to Timestamp
89
90        let _ = source_columns.iter_mut().map(|c| {
91            let _ = c.column_desc.as_mut().map(|desc| {
92                let is_timestamp = desc
93                    .get_column_type()
94                    .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
95                    .unwrap();
96                if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
97                    && is_timestamp
98                    // the column is from a legacy version
99                    && desc.version == ColumnDescVersion::Unspecified as i32
100                {
101                    desc.additional_column = Some(AdditionalColumn {
102                        column_type: Some(AdditionalColumnType::Timestamp(
103                            AdditionalColumnTimestamp {},
104                        )),
105                    });
106                }
107            });
108        });
109    }
110
111    SourceDescBuilder::new(
112        source_columns.clone(),
113        params.env.source_metrics(),
114        row_id_index.map(|x| x as _),
115        with_properties,
116        source_info,
117        params.env.config().developer.connector_message_buffer_size,
118        // `pk_indices` is used to ensure that a message will be skipped instead of parsed
119        // with null pk when the pk column is missing.
120        //
121        // Currently pk_indices for source is always empty since pk information is not
122        // passed via `StreamSource` so null pk may be emitted to downstream.
123        //
124        // TODO: use the correct information to fill in pk_dicies.
125        // We should consdier add back the "pk_column_ids" field removed by #8841 in
126        // StreamSource
127        params.info.pk_indices.clone(),
128    )
129}
130
131impl ExecutorBuilder for SourceExecutorBuilder {
132    type Node = SourceNode;
133
134    async fn new_boxed_executor(
135        params: ExecutorParams,
136        node: &Self::Node,
137        store: impl StateStore,
138    ) -> StreamResult<Executor> {
139        let barrier_receiver = params
140            .local_barrier_manager
141            .subscribe_barrier(params.actor_context.id);
142        let system_params = params.env.system_params_manager_ref().get_params();
143
144        if let Some(source) = &node.source_inner {
145            let exec = {
146                let source_id = TableId::new(source.source_id);
147                let source_name = source.source_name.clone();
148                let mut source_info = source.get_info()?.clone();
149
150                if source_info.format_encode_options.is_empty() {
151                    // compatible code: quick fix for <https://github.com/risingwavelabs/risingwave/issues/14755>,
152                    // will move the logic to FragmentManager::init in release 1.7.
153                    let connector = get_connector_name(&source.with_properties);
154                    source_info.format_encode_options.extend(
155                        source.with_properties.iter().filter_map(|(k, v)| {
156                            should_copy_to_format_encode_options(k, &connector)
157                                .then_some((k.to_owned(), v.to_owned()))
158                        }),
159                    );
160                }
161
162                let with_properties = WithOptionsSecResolved::new(
163                    source.with_properties.clone(),
164                    source.secret_refs.clone(),
165                );
166
167                let source_desc_builder = create_source_desc_builder(
168                    source.columns.clone(),
169                    &params,
170                    source_info,
171                    source.row_id_index,
172                    with_properties,
173                );
174
175                let source_column_ids: Vec<_> = source_desc_builder
176                    .column_catalogs_to_source_column_descs()
177                    .iter()
178                    .map(|column| column.column_id)
179                    .collect();
180
181                let state_table_handler = SourceStateTableHandler::from_table_catalog(
182                    source.state_table.as_ref().unwrap(),
183                    store.clone(),
184                )
185                .await;
186                let stream_source_core = StreamSourceCore::new(
187                    source_id,
188                    source_name,
189                    source_column_ids,
190                    source_desc_builder,
191                    state_table_handler,
192                );
193
194                let is_legacy_fs_connector = source.with_properties.is_legacy_fs_connector();
195                let is_fs_v2_connector = source.with_properties.is_new_fs_connector();
196
197                if is_legacy_fs_connector {
198                    // Changed to default since v2.0 https://github.com/risingwavelabs/risingwave/pull/17963
199                    bail!(
200                        "legacy s3 connector is fully deprecated since v2.4.0, please DROP and recreate the s3 source.\nexecutor: {:?}",
201                        params
202                    );
203                } else if is_fs_v2_connector {
204                    FsListExecutor::new(
205                        params.actor_context.clone(),
206                        Some(stream_source_core),
207                        params.executor_stats.clone(),
208                        barrier_receiver,
209                        system_params,
210                        source.rate_limit,
211                    )
212                    .boxed()
213                } else if source.with_properties.is_iceberg_connector() {
214                    IcebergListExecutor::new(
215                        params.actor_context.clone(),
216                        stream_source_core,
217                        params.executor_stats.clone(),
218                        barrier_receiver,
219                        system_params,
220                        source.rate_limit,
221                        params.env.config().clone(),
222                    )
223                    .boxed()
224                } else {
225                    let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
226                    SourceExecutor::new(
227                        params.actor_context.clone(),
228                        Some(stream_source_core),
229                        params.executor_stats.clone(),
230                        barrier_receiver,
231                        system_params,
232                        source.rate_limit,
233                        is_shared && !source.with_properties.is_cdc_connector(),
234                    )
235                    .boxed()
236                }
237            };
238
239            if crate::consistency::insane() {
240                let mut info = params.info.clone();
241                info.identity = format!("{} (troubled)", info.identity);
242                Ok((
243                    params.info,
244                    TroublemakerExecutor::new(
245                        (info, exec).into(),
246                        params.env.config().developer.chunk_size,
247                    ),
248                )
249                    .into())
250            } else {
251                Ok((params.info, exec).into())
252            }
253        } else {
254            // If there is no external stream source, then no data should be persisted. We pass a
255            // `PanicStateStore` type here for indication.
256            let exec = SourceExecutor::<PanicStateStore>::new(
257                params.actor_context,
258                None,
259                params.executor_stats,
260                barrier_receiver,
261                system_params,
262                None,
263                false,
264            );
265            Ok((params.info, exec).into())
266        }
267    }
268}