Skip to main content

risingwave_stream/from_proto/source/
trad_source.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{
16    KAFKA_TIMESTAMP_COLUMN_NAME, default_key_column_name_version_mapping,
17};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::should_copy_to_format_encode_options;
20use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
21use risingwave_expr::bail;
22use risingwave_pb::data::data_type::TypeName as PbTypeName;
23use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
24use risingwave_pb::plan_common::{
25    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp,
26    AdditionalColumnType as LegacyAdditionalColumnType, ColumnDescVersion, FormatType,
27    PbColumnCatalog, PbEncodeType,
28};
29use risingwave_pb::stream_plan::SourceNode;
30
31use super::*;
32use crate::executor::TroublemakerExecutor;
33use crate::executor::source::{
34    BatchAdbcSnowflakeListExecutor, BatchIcebergListExecutor, BatchPosixFsListExecutor,
35    DummySourceExecutor, FsListExecutor, IcebergListExecutor, SourceExecutor,
36    SourceStateTableHandler, StreamSourceCore,
37};
38use crate::from_proto::source::is_full_reload_refresh;
39
/// Builder for the executor of `Source` stream nodes; the construction logic lives in its
/// `ExecutorBuilder` implementation (`new_boxed_executor`).
pub struct SourceExecutorBuilder;
41
42pub fn create_source_desc_builder(
43    mut source_columns: Vec<PbColumnCatalog>,
44    params: &ExecutorParams,
45    source_info: PbStreamSourceInfo,
46    row_id_index: Option<u32>,
47    with_properties: WithOptionsSecResolved,
48) -> SourceDescBuilder {
49    {
50        // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707
51        // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key
52        if source_info.format() == FormatType::Upsert
53            && (source_info.row_encode() == PbEncodeType::Avro
54                || source_info.row_encode() == PbEncodeType::Protobuf
55                || source_info.row_encode() == PbEncodeType::Json)
56        {
57            for c in &mut source_columns {
58                if let Some(desc) = c.column_desc.as_mut() {
59                    let is_bytea = desc
60                        .get_column_type()
61                        .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32)
62                        .unwrap();
63                    if desc.name == default_key_column_name_version_mapping(
64                        &desc.version()
65                    )
66                        && is_bytea
67                        // the column is from a legacy version (before v1.5.x)
68                        && desc.version == ColumnDescVersion::Unspecified as i32
69                    {
70                        desc.additional_column = Some(AdditionalColumn {
71                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
72                        });
73                    }
74
75                    // the column is from a legacy version (v1.6.x)
76                    // introduced in https://github.com/risingwavelabs/risingwave/pull/15226
77                    if desc.additional_column_type == LegacyAdditionalColumnType::Key as i32 {
78                        desc.additional_column = Some(AdditionalColumn {
79                            column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
80                        });
81                    }
82                }
83            }
84        }
85    }
86
87    {
88        // compatible code: handle legacy column `_rw_kafka_timestamp`
89        // the column is auto added for all kafka source to empower batch query on source
90        // solution: rewrite the column `additional_column` to Timestamp
91
92        let _ = source_columns.iter_mut().map(|c| {
93            let _ = c.column_desc.as_mut().map(|desc| {
94                let is_timestamp = desc
95                    .get_column_type()
96                    .map(|col_type| col_type.type_name == PbTypeName::Timestamptz as i32)
97                    .unwrap();
98                if desc.name == KAFKA_TIMESTAMP_COLUMN_NAME
99                    && is_timestamp
100                    // the column is from a legacy version
101                    && desc.version == ColumnDescVersion::Unspecified as i32
102                {
103                    desc.additional_column = Some(AdditionalColumn {
104                        column_type: Some(AdditionalColumnType::Timestamp(
105                            AdditionalColumnTimestamp {},
106                        )),
107                    });
108                }
109            });
110        });
111    }
112
113    SourceDescBuilder::new(
114        source_columns.clone(),
115        params.env.source_metrics(),
116        row_id_index.map(|x| x as _),
117        with_properties,
118        source_info,
119        params.config.developer.connector_message_buffer_size,
120        // `pk_indices` is used to ensure that a message will be skipped instead of parsed
121        // with null pk when the pk column is missing.
122        //
123        // Currently pk_indices for source is always empty since pk information is not
124        // passed via `StreamSource` so null pk may be emitted to downstream.
125        //
126        // TODO: use the correct information to fill in pk_dicies.
127        // We should consider add back the "pk_column_ids" field removed by #8841 in
128        // StreamSource
129        params.info.stream_key.clone(),
130    )
131}
132
// Associates the `Source` node-body variant (payload `SourceNode`) with `SourceExecutorBuilder`.
// NOTE(review): the items this expands to are defined by `impl_stream_node_body!` outside this file.
impl_stream_node_body!(Source(SourceNode) => SourceExecutorBuilder);
134
135impl ExecutorBuilder for SourceExecutorBuilder {
136    type Node = SourceNode;
137
138    async fn new_boxed_executor(
139        params: ExecutorParams,
140        node: &Self::Node,
141        store: impl StateStore,
142    ) -> StreamResult<Executor> {
143        let barrier_receiver = params
144            .local_barrier_manager
145            .subscribe_barrier(params.actor_context.id);
146        let system_params = params.env.system_params_manager_ref().get_params();
147
148        if let Some(source) = &node.source_inner {
149            let is_full_reload_refresh = is_full_reload_refresh(&source.refresh_mode);
150            let exec = {
151                let source_id = source.source_id;
152                let source_name = source.source_name.clone();
153                let mut source_info = source.get_info()?.clone();
154                let associated_table_id = source.associated_table_id;
155
156                if source_info.format_encode_options.is_empty() {
157                    // compatible code: quick fix for <https://github.com/risingwavelabs/risingwave/issues/14755>,
158                    // will move the logic to FragmentManager::init in release 1.7.
159                    let connector = get_connector_name(&source.with_properties);
160                    source_info.format_encode_options.extend(
161                        source.with_properties.iter().filter_map(|(k, v)| {
162                            should_copy_to_format_encode_options(k, &connector)
163                                .then_some((k.to_owned(), v.to_owned()))
164                        }),
165                    );
166                }
167
168                let with_properties = WithOptionsSecResolved::new(
169                    source.with_properties.clone(),
170                    source.secret_refs.clone(),
171                );
172
173                let source_desc_builder = create_source_desc_builder(
174                    source.columns.clone(),
175                    &params,
176                    source_info,
177                    source.row_id_index,
178                    with_properties,
179                );
180
181                let source_column_ids: Vec<_> = source_desc_builder
182                    .column_catalogs_to_source_column_descs()
183                    .iter()
184                    .map(|column| column.column_id)
185                    .collect();
186
187                let state_table_handler = SourceStateTableHandler::from_table_catalog(
188                    source.state_table.as_ref().unwrap(),
189                    store.clone(),
190                )
191                .await;
192                let stream_source_core = StreamSourceCore::new(
193                    source_id,
194                    source_name,
195                    source_column_ids,
196                    source_desc_builder,
197                    state_table_handler,
198                );
199
200                let is_legacy_fs_connector = source.with_properties.is_legacy_fs_connector();
201                let is_fs_v2_connector = source.with_properties.is_new_fs_connector();
202
203                if is_legacy_fs_connector {
204                    // Changed to default since v2.0 https://github.com/risingwavelabs/risingwave/pull/17963
205                    bail!(
206                        "legacy s3 connector is fully deprecated since v2.4.0, please DROP and recreate the s3 source.\nexecutor: {:?}",
207                        params
208                    );
209                } else if is_fs_v2_connector {
210                    FsListExecutor::new(
211                        params.actor_context.clone(),
212                        stream_source_core,
213                        params.executor_stats.clone(),
214                        barrier_receiver,
215                        system_params,
216                        source.rate_limit,
217                    )
218                    .boxed()
219                } else if source.with_properties.is_iceberg_connector() {
220                    if is_full_reload_refresh {
221                        BatchIcebergListExecutor::new(
222                            params.actor_context.clone(),
223                            stream_source_core,
224                            source
225                                .downstream_columns
226                                .as_ref()
227                                .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
228                            params.executor_stats.clone(),
229                            barrier_receiver,
230                            params.local_barrier_manager.clone(),
231                            associated_table_id,
232                        )
233                        .boxed()
234                    } else {
235                        IcebergListExecutor::new(
236                            params.actor_context.clone(),
237                            stream_source_core,
238                            source
239                                .downstream_columns
240                                .as_ref()
241                                .map(|x| x.columns.clone().into_iter().map(|c| c.into()).collect()),
242                            params.executor_stats.clone(),
243                            barrier_receiver,
244                            system_params,
245                            source.rate_limit,
246                            params.config.clone(),
247                        )
248                        .boxed()
249                    }
250                } else if source.with_properties.is_batch_connector() {
251                    if source
252                        .with_properties
253                        .get_connector()
254                        .map(|c| {
255                            c.eq_ignore_ascii_case(
256                                risingwave_connector::source::BATCH_POSIX_FS_CONNECTOR,
257                            )
258                        })
259                        .unwrap_or(false)
260                    {
261                        BatchPosixFsListExecutor::new(
262                            params.actor_context.clone(),
263                            stream_source_core,
264                            params.executor_stats.clone(),
265                            barrier_receiver,
266                            system_params,
267                            source.rate_limit,
268                            params.local_barrier_manager.clone(),
269                            associated_table_id,
270                        )
271                        .boxed()
272                    } else if source
273                        .with_properties
274                        .get_connector()
275                        .map(|c| {
276                            c.eq_ignore_ascii_case(
277                                risingwave_connector::source::ADBC_SNOWFLAKE_CONNECTOR,
278                            )
279                        })
280                        .unwrap_or(false)
281                    {
282                        BatchAdbcSnowflakeListExecutor::new(
283                            params.actor_context.clone(),
284                            stream_source_core,
285                            params.executor_stats.clone(),
286                            barrier_receiver,
287                            params.local_barrier_manager.clone(),
288                            associated_table_id,
289                        )
290                        .boxed()
291                    } else {
292                        unreachable!("unknown batch connector");
293                    }
294                } else {
295                    let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
296                    SourceExecutor::new(
297                        params.actor_context.clone(),
298                        stream_source_core,
299                        params.executor_stats.clone(),
300                        barrier_receiver,
301                        system_params,
302                        source.rate_limit,
303                        is_shared && !source.with_properties.is_cdc_connector(),
304                        params.local_barrier_manager.clone(),
305                    )
306                    .boxed()
307                }
308            };
309
310            if crate::consistency::insane() {
311                let mut info = params.info.clone();
312                info.identity = format!("{} (troubled)", info.identity);
313                Ok((
314                    params.info,
315                    TroublemakerExecutor::new(
316                        (info, exec).into(),
317                        params.config.developer.chunk_size,
318                    ),
319                )
320                    .into())
321            } else {
322                Ok((params.info, exec).into())
323            }
324        } else {
325            // If there is no external stream source, then no data should be persisted.
326            // Use DummySourceExecutor which only forwards barrier messages.
327            let exec = DummySourceExecutor::new(params.actor_context, barrier_receiver);
328            Ok((params.info, exec).into())
329        }
330    }
331}