risingwave_connector/source/
base.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use async_trait::async_trait;
20use bytes::Bytes;
21use enum_as_inner::EnumAsInner;
22use futures::future::try_join_all;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt, TryStreamExt};
25use itertools::Itertools;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::bail;
28use risingwave_common::id::{ActorId, FragmentId, SourceId};
29use risingwave_common::secret::LocalSecretManager;
30use risingwave_common::types::{JsonbVal, Scalar};
31use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
32use risingwave_pb::plan_common::ExternalTableDesc;
33use risingwave_pb::source::ConnectorSplit;
34use rw_futures_util::select_all;
35use serde::de::DeserializeOwned;
36use serde_json::json;
37use tokio::sync::mpsc;
38
39use super::cdc::DebeziumCdcMeta;
40use super::datagen::DatagenMeta;
41use super::google_pubsub::GooglePubsubMeta;
42use super::kafka::KafkaMeta;
43use super::kinesis::KinesisMeta;
44use super::monitor::SourceMetrics;
45use super::nats::source::NatsMeta;
46use super::nexmark::source::message::NexmarkMeta;
47use super::pulsar::source::PulsarMeta;
48use crate::enforce_secret::EnforceSecret;
49use crate::error::ConnectorResult as Result;
50use crate::parser::ParserConfig;
51use crate::parser::schema_change::SchemaChangeEnvelope;
52use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
53use crate::source::batch::BatchSourceSplitImpl;
54use crate::source::monitor::EnumeratorMetrics;
55use crate::with_options::WithOptions;
56use crate::{
57    WithOptionsSecResolved, WithPropertiesExt, dispatch_source_prop, dispatch_split_impl,
58    for_all_connections, for_all_sources, impl_connection, impl_connector_properties, impl_split,
59    match_source_name_str,
60};
61
/// JSON field under which the connector type is stored by [`SplitMetaData::encode_to_json`].
const SPLIT_TYPE_FIELD: &str = "split_type";
/// JSON field under which the connector-specific split payload is stored.
const SPLIT_INFO_FIELD: &str = "split_info";
/// The `WITH` option key that selects the connector, e.g. `connector = 'kafka'`.
pub const UPSTREAM_SOURCE_KEY: &str = "connector";

/// Connector name used by webhook sources.
pub const WEBHOOK_CONNECTOR: &str = "webhook";
67
/// Callback wrapper for reporting CDC auto schema change fail events
/// Parameters: (`source_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
#[derive(Clone)]
pub struct CdcAutoSchemaChangeFailCallback(
    // Shared closure; `Send + Sync` so the callback can be cloned into source contexts
    // across threads/actors.
    Arc<dyn Fn(SourceId, String, String, String, String) + Send + Sync>,
);
74
75impl CdcAutoSchemaChangeFailCallback {
76    pub fn new<F>(f: F) -> Self
77    where
78        F: Fn(SourceId, String, String, String, String) + Send + Sync + 'static,
79    {
80        Self(Arc::new(f))
81    }
82
83    pub fn call(
84        &self,
85        source_id: SourceId,
86        table_name: String,
87        cdc_table_id: String,
88        upstream_ddl: String,
89        fail_info: String,
90    ) {
91        self.0(source_id, table_name, cdc_table_id, upstream_ddl, fail_info);
92    }
93}
94
95impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.write_str("CdcAutoSchemaChangeFailCallback")
98    }
99}
/// Conversion from the raw untyped `WITH` options map into typed source properties.
pub trait TryFromBTreeMap: Sized + UnknownFields {
    /// Used to initialize the source properties from the raw untyped `WITH` options.
    ///
    /// When `deny_unknown_fields` is true, unrecognized keys are rejected with an error
    /// instead of being silently ignored.
    fn try_from_btreemap(
        props: BTreeMap<String, String>,
        deny_unknown_fields: bool,
    ) -> Result<Self>;
}
107
/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties:
    TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug + EnforceSecret
{
    /// Connector name used for dispatch, e.g. `"kafka"`.
    const SOURCE_NAME: &'static str;
    /// Connector-specific split type, convertible from/to the type-erased [`SplitImpl`].
    type Split: SplitMetaData
        + TryFrom<SplitImpl, Error = crate::error::ConnectorError>
        + Into<SplitImpl>;
    /// Enumerator that discovers this connector's splits (runs on the meta node).
    type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
    /// Reader that consumes this connector's splits.
    type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

    /// Load additional info from `PbSource`. Currently only used by CDC.
    fn init_from_pb_source(&mut self, _source: &PbSource) {}

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}
127
/// Access to `WITH` options that were not recognized during deserialization.
pub trait UnknownFields {
    /// Unrecognized fields in the `WITH` clause.
    fn unknown_fields(&self) -> HashMap<String, String>;
}
132
133impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
134    fn try_from_btreemap(
135        props: BTreeMap<String, String>,
136        deny_unknown_fields: bool,
137    ) -> Result<Self> {
138        let json_value = serde_json::to_value(props)?;
139        let res = serde_json::from_value::<P>(json_value)?;
140
141        if !deny_unknown_fields || res.unknown_fields().is_empty() {
142            Ok(res)
143        } else {
144            bail!(
145                "Unknown fields in the WITH clause: {:?}",
146                res.unknown_fields()
147            )
148        }
149    }
150}
151
/// Options controlling how split readers are created in [`create_split_readers`].
#[derive(Default)]
pub struct CreateSplitReaderOpt {
    /// Whether all splits are handed to a single reader instead of one reader per split.
    pub support_multiple_splits: bool,
    /// Whether the reader(s) should seek to the latest offsets right after creation.
    pub seek_to_latest: bool,
}
157
/// Outputs of [`create_split_readers`] besides the event stream itself.
#[derive(Default)]
pub struct CreateSplitReaderResult {
    /// Splits after seeking; only populated when `seek_to_latest` was requested.
    pub latest_splits: Option<Vec<SplitImpl>>,
    /// Per-split backfill information reported by the reader(s).
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}
163
/// Creates split reader(s) for the given `splits` and merges their event streams into one
/// boxed stream.
///
/// Depending on `opt.support_multiple_splits`, either a single reader handles all splits,
/// or one reader is created per split and their streams are merged with `select_all`.
/// Fails if any split cannot be converted to the connector-specific split type.
pub async fn create_split_readers<P: SourceProperties>(
    prop: P,
    splits: Vec<SplitImpl>,
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    columns: Option<Vec<Column>>,
    opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceReaderEventStream, CreateSplitReaderResult)> {
    // Convert the type-erased splits into the connector-specific split type.
    let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
    let mut res = CreateSplitReaderResult {
        backfill_info: HashMap::new(),
        latest_splits: None,
    };
    if opt.support_multiple_splits {
        // One reader consumes all splits.
        let mut reader = P::SplitReader::new(
            prop.clone(),
            splits,
            parser_config.clone(),
            source_ctx.clone(),
            columns.clone(),
        )
        .await?;
        if opt.seek_to_latest {
            res.latest_splits = Some(reader.seek_to_latest().await?);
        }
        res.backfill_info = reader.backfill_info();
        Ok((reader.into_event_stream().boxed(), res))
    } else {
        // One reader per split, created concurrently.
        let mut readers = try_join_all(splits.into_iter().map(|split| {
            // TODO: is this reader split across multiple threads...? Realistically, we want
            // source_ctx to live in a single actor.
            P::SplitReader::new(
                prop.clone(),
                vec![split],
                parser_config.clone(),
                source_ctx.clone(),
                columns.clone(),
            )
        }))
        .await?;
        if opt.seek_to_latest {
            let mut latest_splits = vec![];
            for reader in &mut readers {
                latest_splits.extend(reader.seek_to_latest().await?);
            }
            res.latest_splits = Some(latest_splits);
        }
        // Union of per-reader backfill info, keyed by split id.
        res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
        Ok((
            select_all(readers.into_iter().map(|r| r.into_event_stream())).boxed(),
            res,
        ))
    }
}
218
/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized + Send {
    type Split: SplitMetaData + Send;
    type Properties;

    /// Creates the enumerator from typed connector properties.
    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
    -> Result<Self>;
    /// Lists all currently available splits of the external source.
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
    /// Do some cleanup work when a fragment is dropped, e.g., drop Kafka consumer group.
    async fn on_drop_fragments(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
        Ok(())
    }
    /// Do some cleanup work when a backfill fragment is finished, e.g., drop Kafka consumer group.
    async fn on_finish_backfill(&mut self, _fragment_ids: Vec<FragmentId>) -> Result<()> {
        Ok(())
    }
    /// Called after `worker.tick()` execution to perform periodic operations,
    /// such as monitoring upstream PostgreSQL `confirmed_flush_lsn`, etc.
    /// This can be extended to support more periodic operations in the future.
    async fn on_tick(&mut self) -> Result<()> {
        Ok(())
    }
}
244
/// Shared reference to a [`SourceContext`].
pub type SourceContextRef = Arc<SourceContext>;
/// Shared reference to a [`SourceEnumeratorContext`].
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
247
/// Dyn-compatible [`SplitEnumerator`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    /// Lists splits, type-erased into [`SplitImpl`].
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
    /// See [`SplitEnumerator::on_drop_fragments`].
    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()>;
    /// See [`SplitEnumerator::on_finish_backfill`].
    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()>;
    /// See [`SplitEnumerator::on_tick`].
    async fn on_tick(&mut self) -> Result<()>;
}
256
257#[async_trait]
258impl<T: SplitEnumerator<Split: Into<SplitImpl>> + 'static> AnySplitEnumerator for T {
259    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
260        SplitEnumerator::list_splits(self)
261            .await
262            .map(|s| s.into_iter().map(|s| s.into()).collect())
263    }
264
265    async fn on_drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()> {
266        SplitEnumerator::on_drop_fragments(self, fragment_ids).await
267    }
268
269    async fn on_finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> Result<()> {
270        SplitEnumerator::on_finish_backfill(self, fragment_ids).await
271    }
272
273    async fn on_tick(&mut self) -> Result<()> {
274        SplitEnumerator::on_tick(self).await
275    }
276}
277
/// The max size of a chunk yielded by source stream.
/// Used as the default `chunk_size` by [`SourceContext::dummy`].
pub const MAX_CHUNK_SIZE: usize = 1024;
280
/// Runtime options controlling how the source stream yields chunks.
#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by source stream.
    pub chunk_size: usize,
    /// Whether to allow splitting a transaction into multiple chunks to meet the `max_chunk_size`.
    pub split_txn: bool,
}
288
// The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it,
// so that we can prevent any unintentional use of the default value.
// (Negative impl — requires the `negative_impls` nightly feature.)
impl !Default for SourceCtrlOpts {}
292
293impl SourceCtrlOpts {
294    #[cfg(test)]
295    pub fn for_test() -> Self {
296        SourceCtrlOpts {
297            chunk_size: 256,
298            split_txn: false,
299        }
300    }
301}
302
/// Context handed to a [`SplitEnumerator`] (on the meta node).
#[derive(Debug)]
pub struct SourceEnumeratorContext {
    pub info: SourceEnumeratorInfo,
    pub metrics: Arc<EnumeratorMetrics>,
}
308
309impl SourceEnumeratorContext {
310    /// Create a dummy `SourceEnumeratorContext` for testing purpose, or for the situation
311    /// where the real context doesn't matter.
312    pub fn dummy() -> SourceEnumeratorContext {
313        SourceEnumeratorContext {
314            info: SourceEnumeratorInfo {
315                source_id: 0.into(),
316            },
317            metrics: Arc::new(EnumeratorMetrics::default()),
318        }
319    }
320}
321
/// Static information about the source whose splits are being enumerated.
#[derive(Clone, Debug)]
pub struct SourceEnumeratorInfo {
    pub source_id: SourceId,
}
326
/// Per-actor context passed to [`SplitReader`]s.
#[derive(Clone, Debug)]
pub struct SourceContext {
    pub actor_id: ActorId,
    pub source_id: SourceId,
    pub fragment_id: FragmentId,
    pub source_name: String,
    pub metrics: Arc<SourceMetrics>,
    pub source_ctrl_opts: SourceCtrlOpts,
    pub connector_props: ConnectorProperties,
    // source parser put schema change event into this channel
    // NOTE(review): the oneshot sender presumably acks that the event was handled —
    // confirm against the consumer side.
    pub schema_change_tx:
        Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
    // callback function to report CDC auto schema change fail events
    pub on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
}
342
343impl SourceContext {
344    pub fn new(
345        actor_id: ActorId,
346        source_id: SourceId,
347        fragment_id: FragmentId,
348        source_name: String,
349        metrics: Arc<SourceMetrics>,
350        source_ctrl_opts: SourceCtrlOpts,
351        connector_props: ConnectorProperties,
352        schema_change_channel: Option<
353            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
354        >,
355    ) -> Self {
356        Self::new_with_auto_schema_change_callback(
357            actor_id,
358            source_id,
359            fragment_id,
360            source_name,
361            metrics,
362            source_ctrl_opts,
363            connector_props,
364            schema_change_channel,
365            None,
366        )
367    }
368
369    pub fn new_with_auto_schema_change_callback(
370        actor_id: ActorId,
371        source_id: SourceId,
372        fragment_id: FragmentId,
373        source_name: String,
374        metrics: Arc<SourceMetrics>,
375        source_ctrl_opts: SourceCtrlOpts,
376        connector_props: ConnectorProperties,
377        schema_change_channel: Option<
378            mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
379        >,
380        on_cdc_auto_schema_change_failure: Option<CdcAutoSchemaChangeFailCallback>,
381    ) -> Self {
382        Self {
383            actor_id,
384            source_id,
385            fragment_id,
386            source_name,
387            metrics,
388            source_ctrl_opts,
389            connector_props,
390            schema_change_tx: schema_change_channel,
391            on_cdc_auto_schema_change_failure,
392        }
393    }
394
395    /// Create a dummy `SourceContext` for testing purpose, or for the situation
396    /// where the real context doesn't matter.
397    pub fn dummy() -> Self {
398        Self::new(
399            0.into(),
400            SourceId::new(0),
401            0.into(),
402            "dummy".to_owned(),
403            Arc::new(SourceMetrics::default()),
404            SourceCtrlOpts {
405                chunk_size: MAX_CHUNK_SIZE,
406                split_txn: false,
407            },
408            ConnectorProperties::default(),
409            None,
410        )
411    }
412
413    /// Report CDC auto schema change fail event
414    /// Parameters: (`source_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
415    pub fn on_cdc_auto_schema_change_failure(
416        &self,
417        source_id: SourceId,
418        table_name: String,
419        cdc_table_id: String,
420        upstream_ddl: String,
421        fail_info: String,
422    ) {
423        if let Some(ref cdc_auto_schema_change_fail_callback) =
424            self.on_cdc_auto_schema_change_failure
425        {
426            cdc_auto_schema_change_fail_callback.call(
427                source_id,
428                table_name,
429                cdc_table_id,
430                upstream_ddl,
431                fail_info,
432            );
433        }
434    }
435}
436
/// The `FORMAT` part of `FORMAT ... ENCODE ...`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceFormat {
    #[default]
    Invalid,
    Native,
    None,
    Debezium,
    DebeziumMongo,
    Maxwell,
    Canal,
    Upsert,
    Plain,
}
450
/// The `ENCODE` part of `FORMAT ... ENCODE ...`.
///
/// Refer to [`crate::parser::EncodingProperties`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SourceEncode {
    #[default]
    Invalid,
    Native,
    None,
    Avro,
    Csv,
    Protobuf,
    Json,
    Bytes,
    Parquet,
}
465
/// A (format, encode) pair extracted from `PbStreamSourceInfo`; see [`extract_source_struct`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SourceStruct {
    pub format: SourceFormat,
    pub encode: SourceEncode,
}
471
472impl SourceStruct {
473    pub fn new(format: SourceFormat, encode: SourceEncode) -> Self {
474        Self { format, encode }
475    }
476}
477
/// Extracts a valid (format, encode) pair from `PbStreamSourceInfo`.
///
/// Handles both the legacy `row_format` field (old-version meta) and the newer
/// `format` + `row_encode` fields; unsupported combinations yield an error.
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};

    // old version meta.
    if let Ok(format) = info.get_row_format() {
        let (format, encode) = match format {
            RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
            RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
            RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
            RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
            RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
            RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
            RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
            RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
            RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
            RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
            RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
            RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
            RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
            RowFormatType::RowUnspecified => unreachable!(),
        };
        return Ok(SourceStruct::new(format, encode));
    }
    // New-version meta: format and encode are stored separately.
    let source_format = info.get_format()?;
    let source_encode = info.get_row_encode()?;
    let (format, encode) = match (source_format, source_encode) {
        (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Protobuf) => {
            (SourceFormat::Plain, SourceEncode::Protobuf)
        }
        (PbFormatType::Debezium, PbEncodeType::Json) => {
            (SourceFormat::Debezium, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
        (PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
        (PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
        (PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
        (PbFormatType::Plain, PbEncodeType::Parquet) => {
            (SourceFormat::Plain, SourceEncode::Parquet)
        }
        (PbFormatType::Native, PbEncodeType::Native) => {
            (SourceFormat::Native, SourceEncode::Native)
        }
        (PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
        (PbFormatType::Debezium, PbEncodeType::Avro) => {
            (SourceFormat::Debezium, SourceEncode::Avro)
        }
        (PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
        (PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
        (PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
            (SourceFormat::DebeziumMongo, SourceEncode::Json)
        }
        (PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
        (PbFormatType::Upsert, PbEncodeType::Protobuf) => {
            (SourceFormat::Upsert, SourceEncode::Protobuf)
        }
        (format, encode) => {
            bail!(
                "Unsupported combination of format {:?} and encode {:?}",
                format,
                encode
            );
        }
    };
    Ok(SourceStruct::new(format, encode))
}
545
/// Stream of [`SourceMessage`]. Messages flow through the stream in the unit of a batch.
pub type BoxSourceMessageStream =
    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
/// Stream of source message events. See [`SourceMessageEvent`].
pub type BoxSourceMessageEventStream =
    BoxStream<'static, crate::error::ConnectorResult<SourceMessageEvent>>;
/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
/// Stream of source reader events. See [`SourceReaderEvent`].
pub type BoxSourceReaderEventStream =
    BoxStream<'static, crate::error::ConnectorResult<SourceReaderEvent>>;
/// `StreamChunk` with the latest split state.
/// The state is constructed in `StreamReaderBuilder::into_retry_stream`
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
/// See [`StreamChunkWithState`].
pub type BoxSourceChunkWithStateStream =
    BoxStream<'static, crate::error::ConnectorResult<StreamChunkWithState>>;
563
/// Event yielded by a message-level source stream.
#[derive(Debug)]
pub enum SourceMessageEvent {
    /// A batch of raw messages from the external source.
    Data(Vec<SourceMessage>),
    /// Per-split progress string, keyed by split id.
    SplitProgress(HashMap<SplitId, String>),
}
569
/// Event yielded by a chunk-level source reader stream.
#[derive(Debug)]
pub enum SourceReaderEvent {
    /// A parsed chunk of rows.
    DataChunk(StreamChunk),
    /// Per-split progress string, keyed by split id.
    SplitProgress(HashMap<SplitId, String>),
}
575
/// Stream of [`Option<StreamChunk>`]s parsed from the messages from the external source.
pub type BoxStreamingFileSourceChunkStream =
    BoxStream<'static, crate::error::ConnectorResult<Option<StreamChunk>>>;

// Manually expand the trait alias to improve IDE experience.
pub trait SourceChunkStream:
    Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}
// Blanket impl so every qualifying stream automatically implements the alias.
impl<T> SourceChunkStream for T where
    T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
{
}

/// Boxed fallible stream of `M` items.
pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;
591
/// [`SplitReader`] is a new abstraction of the external connector read interface which is
/// responsible for parsing, it is used to read messages from the outside and transform them into a
/// stream of parsed [`StreamChunk`]
#[async_trait]
pub trait SplitReader: Sized + Send {
    type Properties;
    type Split: SplitMetaData;

    /// Creates a reader for the given splits.
    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> crate::error::ConnectorResult<Self>;

    /// Consumes the reader, yielding parsed chunks.
    fn into_stream(self) -> BoxSourceChunkStream;

    /// Consumes the reader, wrapping each chunk into a [`SourceReaderEvent::DataChunk`].
    fn into_event_stream(self) -> BoxSourceReaderEventStream {
        self.into_stream()
            .map_ok(SourceReaderEvent::DataChunk)
            .boxed()
    }

    /// Per-split backfill information; empty by default for connectors without backfill support.
    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        HashMap::new()
    }

    /// Seeks to the latest offsets and returns the resulting splits.
    /// Default: unsupported; connectors opt in by overriding.
    async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
        Err(anyhow!("seek_to_latest is not supported for this connector").into())
    }
}
624
/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    /// The split has messages that need to be backfilled.
    HasDataToBackfill {
        /// The last available offsets for each split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no message from the backfill stream too.
    /// If we started backfill, we cannot finish it until new messages come.
    /// So we mark this a special case for optimization.
    NoDataToBackfill,
}
647
648for_all_sources!(impl_connector_properties);
649
650impl Default for ConnectorProperties {
651    fn default() -> Self {
652        ConnectorProperties::Test(Box::default())
653    }
654}
655
impl ConnectorProperties {
    /// Creates typed source properties from the raw `WITH` properties.
    ///
    /// It checks the `connector` field, and then dispatches to the corresponding type's `try_from_btreemap` method.
    ///
    /// `deny_unknown_fields`: Since `WITH` options are persisted in meta, we do not deny unknown fields when restoring from
    /// existing data to avoid breaking backwards compatibility. We only deny unknown fields when creating new sources.
    pub fn extract(
        with_properties: WithOptionsSecResolved,
        deny_unknown_fields: bool,
    ) -> Result<Self> {
        let (options, secret_refs) = with_properties.into_parts();
        // Resolve secret references into concrete values before parsing.
        let mut options_with_secret =
            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
        let connector = options_with_secret
            .remove(UPSTREAM_SOURCE_KEY)
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::try_from_btreemap(options_with_secret, deny_unknown_fields)
                .map(ConnectorProperties::from),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    /// Checks that secret-requiring options of the connector are not given as plain text.
    pub fn enforce_secret_source(
        with_properties: &impl WithPropertiesExt,
    ) -> crate::error::ConnectorResult<()> {
        let connector = with_properties
            .get_connector()
            .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?
            .to_lowercase();
        let key_iter = with_properties.key_iter();
        match_source_name_str!(
            connector.as_str(),
            PropType,
            PropType::enforce_secret(key_iter),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    pub fn enable_drop_split(&self) -> bool {
        // Split scale-in (dropping splits) is enabled for Kinesis, NATS and Google Pub/Sub.
        matches!(
            self,
            ConnectorProperties::Kinesis(_)
                | ConnectorProperties::Nats(_)
                | ConnectorProperties::GooglePubsub(_)
        )
    }

    /// For most connectors, this should be false. When enabled, RisingWave should not track any progress.
    pub fn enable_adaptive_splits(&self) -> bool {
        matches!(
            self,
            ConnectorProperties::Nats(_) | ConnectorProperties::GooglePubsub(_)
        )
    }

    /// Load additional info from `PbSource`. Currently only used by CDC.
    pub fn init_from_pb_source(&mut self, source: &PbSource) {
        dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
    }

    /// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
    pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
        dispatch_source_prop!(self, |prop| prop
            .init_from_pb_cdc_table_desc(cdc_table_desc))
    }

    /// Whether a single reader can consume multiple splits (see [`CreateSplitReaderOpt`]).
    pub fn support_multiple_splits(&self) -> bool {
        matches!(self, ConnectorProperties::Kafka(_))
            || matches!(self, ConnectorProperties::OpendalS3(_))
            || matches!(self, ConnectorProperties::Gcs(_))
            || matches!(self, ConnectorProperties::Azblob(_))
    }

    /// Creates the connector's split enumerator, type-erased behind [`AnySplitEnumerator`].
    pub async fn create_split_enumerator(
        self,
        context: crate::source::base::SourceEnumeratorContextRef,
    ) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
        let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
            <PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
        ));
        Ok(enumerator)
    }

    /// Creates split reader(s) for the given splits; see [`create_split_readers`].
    /// `opt.support_multiple_splits` is overwritten based on the connector type.
    pub async fn create_split_reader(
        self,
        splits: Vec<SplitImpl>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
        mut opt: crate::source::CreateSplitReaderOpt,
    ) -> Result<(
        BoxSourceReaderEventStream,
        crate::source::CreateSplitReaderResult,
    )> {
        opt.support_multiple_splits = self.support_multiple_splits();
        tracing::debug!(
            ?splits,
            support_multiple_splits = opt.support_multiple_splits,
            "spawning connector split reader",
        );

        dispatch_source_prop!(self, |prop| create_split_readers(
            *prop,
            splits,
            parser_config,
            source_ctx,
            columns,
            opt
        )
        .await)
    }
}
774
// Generates the `SplitImpl` enum and the connection enum with their helper impls.
for_all_sources!(impl_split);
for_all_connections!(impl_connection);
777
/// Encodes a split into its protobuf form, tagged with the connector name so that
/// `TryFrom<&ConnectorSplit>` can decode it back.
impl From<&SplitImpl> for ConnectorSplit {
    fn from(split: &SplitImpl) -> Self {
        dispatch_split_impl!(split, |inner| {
            ConnectorSplit {
                split_type: String::from(PropType::SOURCE_NAME),
                encoded_split: inner.encode_to_bytes().to_vec(),
            }
        })
    }
}
788
/// Decodes a protobuf split, dispatching on the lowercased connector name stored in
/// `split_type` (written by `From<&SplitImpl>`).
impl TryFrom<&ConnectorSplit> for SplitImpl {
    type Error = crate::error::ConnectorError;

    fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
        let split_type = split.split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            {
                <PropType as SourceProperties>::Split::restore_from_bytes(
                    split.encoded_split.as_ref(),
                )
                .map(Into::into)
            },
            |other| bail!("connector '{}' is not supported", other)
        )
    }
}
807
impl SplitImpl {
    /// Decodes a split from its JSON payload, dispatching on the lowercased connector name.
    fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
        let split_type = split_type.to_lowercase();
        match_source_name_str!(
            split_type.as_str(),
            PropType,
            <PropType as SourceProperties>::Split::restore_from_json(value).map(Into::into),
            |other| bail!("connector '{}' is not supported", other)
        )
    }

    /// Whether this split belongs to one of the CDC connectors.
    pub fn is_cdc_split(&self) -> bool {
        matches!(
            self,
            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) | SqlServerCdc(_)
        )
    }

    /// Get the current split offset.
    ///
    /// # Panics
    /// Panics when called on a non-CDC split; check [`Self::is_cdc_split`] first.
    pub fn get_cdc_split_offset(&self) -> String {
        match self {
            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
            SqlServerCdc(split) => split.start_offset().clone().unwrap_or_default(),
            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
        }
    }

    /// Converts into a batch-source split, or `None` for connectors without batch support.
    pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
        match self {
            SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
                Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
            }
            _ => None,
        }
    }
}
847
848impl SplitMetaData for SplitImpl {
849    fn id(&self) -> SplitId {
850        dispatch_split_impl!(self, |inner| inner.id())
851    }
852
853    fn encode_to_json(&self) -> JsonbVal {
854        use serde_json::json;
855        let inner = self.encode_to_json_inner().take();
856        json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
857    }
858
859    fn restore_from_json(value: JsonbVal) -> Result<Self> {
860        let mut value = value.take();
861        let json_obj = value.as_object_mut().unwrap();
862        let split_type = json_obj
863            .remove(SPLIT_TYPE_FIELD)
864            .unwrap()
865            .as_str()
866            .unwrap()
867            .to_owned();
868        let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
869        Self::restore_from_json_inner(&split_type, inner_value.into())
870    }
871
872    fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
873        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
874    }
875}
876
877impl SplitImpl {
878    pub fn get_type(&self) -> String {
879        dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
880    }
881
882    pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
883        dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
884        Ok(())
885    }
886
887    pub fn encode_to_json_inner(&self) -> JsonbVal {
888        dispatch_split_impl!(self, |inner| inner.encode_to_json())
889    }
890}
891
892use risingwave_common::types::DataType;
893
/// A column of an external source, as seen by the connector layer.
#[derive(Clone, Debug)]
pub struct Column {
    /// Column name.
    pub name: String,
    /// Column data type.
    pub data_type: DataType,
    /// This field is only used by datagen.
    pub is_visible: bool,
}
901
/// Split id resides in every source message, use `Arc` to avoid copying.
/// Cloning a `SplitId` only bumps the refcount; no string data is copied.
pub type SplitId = Arc<str>;
904
/// The message pumped from the external source service.
/// The third-party message structs will eventually be transformed into this struct.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    /// Optional message key (connector-specific).
    pub key: Option<Vec<u8>>,
    /// Message body. `None` together with a `None` key marks a CDC heartbeat
    /// (see `SourceMessage::is_cdc_heartbeat`).
    pub payload: Option<Vec<u8>>,
    pub offset: String, // TODO: use `Arc<str>`
    /// Id of the split this message was read from.
    pub split_id: SplitId,
    /// Connector-specific metadata attached to the message.
    pub meta: SourceMeta,
}
915
916impl SourceMessage {
917    /// Create a dummy `SourceMessage` with all fields unset for testing purposes.
918    pub fn dummy() -> Self {
919        Self {
920            key: None,
921            payload: None,
922            offset: "".to_owned(),
923            split_id: "".into(),
924            meta: SourceMeta::Empty,
925        }
926    }
927
928    /// Check whether the source message is a CDC heartbeat message.
929    pub fn is_cdc_heartbeat(&self) -> bool {
930        self.key.is_none() && self.payload.is_none()
931    }
932}
933
/// Connector-specific per-message metadata carried by a `SourceMessage`.
#[derive(Debug, Clone)]
pub enum SourceMeta {
    Kafka(KafkaMeta),
    Kinesis(KinesisMeta),
    Pulsar(PulsarMeta),
    Nexmark(NexmarkMeta),
    GooglePubsub(GooglePubsubMeta),
    Datagen(DatagenMeta),
    DebeziumCdc(DebeziumCdcMeta),
    Nats(NatsMeta),
    // For the source that doesn't have meta data.
    Empty,
}
947
948/// Implement Eq manually to ignore the `meta` field.
949impl PartialEq for SourceMessage {
950    fn eq(&self, other: &Self) -> bool {
951        self.offset == other.offset
952            && self.split_id == other.split_id
953            && self.payload == other.payload
954    }
955}
956impl Eq for SourceMessage {}
957
958/// The metadata of a split.
959pub trait SplitMetaData: Sized {
960    fn id(&self) -> SplitId;
961    fn encode_to_bytes(&self) -> Bytes {
962        self.encode_to_json()
963            .as_scalar_ref()
964            .value_serialize()
965            .into()
966    }
967    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
968        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
969    }
970
971    /// Encode the whole split metadata to a JSON object
972    fn encode_to_json(&self) -> JsonbVal;
973    fn restore_from_json(value: JsonbVal) -> Result<Self>;
974    fn update_offset(&mut self, last_seen_offset: String) -> crate::error::ConnectorResult<()>;
975}
976
/// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
/// `ConnectorState` cannot be [`None`] and contains one(for mq split readers) or many(for fs
/// split readers) [`SplitImpl`]. If no split is assigned to source executor, `ConnectorState` is
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;
982
#[cfg(test)]
mod tests {
    use maplit::*;
    use nexmark::event::EventType;

    use super::*;
    use crate::source::cdc::{DebeziumCdcSplit, Mysql};
    use crate::source::kafka::KafkaSplit;

    /// The `into_*` accessor must hand back the wrapped split unchanged,
    /// with identical serialized forms.
    #[test]
    fn test_split_impl_get_fn() -> Result<()> {
        let split = KafkaSplit::new(0, Some(0), Some(0), "demo".to_owned());
        let split_impl = SplitImpl::Kafka(split.clone());
        let get_value = split_impl.into_kafka().unwrap();
        assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
        assert_eq!(split.encode_to_json(), get_value.encode_to_json());

        Ok(())
    }

    /// Round-trips a MySQL CDC split through both the bytes codec and the
    /// json codec, checking each restore reproduces the original encoding.
    #[test]
    fn test_cdc_split_state() -> Result<()> {
        let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
        let split = DebeziumCdcSplit::<Mysql>::new(1001, Some(offset_str.to_owned()), None);
        let split_impl = SplitImpl::MysqlCdc(split);

        // bytes round-trip
        let encoded_split = split_impl.encode_to_bytes();
        let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );

        // json round-trip
        let encoded_split = split_impl.encode_to_json();
        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
        assert_eq!(
            split_impl.encode_to_bytes(),
            restored_split_impl.encode_to_bytes()
        );
        assert_eq!(
            split_impl.encode_to_json(),
            restored_split_impl.encode_to_json()
        );
        Ok(())
    }

    /// Extraction of nexmark-specific properties from WITH options.
    #[test]
    fn test_extract_nexmark_config() {
        let props = convert_args!(btreemap!(
            "connector" => "nexmark",
            "nexmark.table.type" => "Person",
            "nexmark.split.num" => "1",
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();

        if let ConnectorProperties::Nexmark(props) = props {
            assert_eq!(props.table_type, Some(EventType::Person));
            assert_eq!(props.split_num, 1);
        } else {
            panic!("extract nexmark config failed");
        }
    }

    /// The broker rewrite map (a JSON string in the options) must be parsed
    /// into a `BTreeMap` on the extracted Kafka properties.
    #[test]
    fn test_extract_kafka_config() {
        let props = convert_args!(btreemap!(
            "connector" => "kafka",
            "properties.bootstrap.server" => "b1,b2",
            "topic" => "test",
            "scan.startup.mode" => "earliest",
            "broker.rewrite.endpoints" => r#"{"b-1:9092":"dns-1", "b-2:9092":"dns-2"}"#,
        ));

        let props =
            ConnectorProperties::extract(WithOptionsSecResolved::without_secrets(props), true)
                .unwrap();
        if let ConnectorProperties::Kafka(k) = props {
            let btreemap = btreemap! {
                "b-1:9092".to_owned() => "dns-1".to_owned(),
                "b-2:9092".to_owned() => "dns-2".to_owned(),
            };
            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(btreemap));
        } else {
            panic!("extract kafka config failed");
        }
    }

    /// CDC connectors keep user options verbatim in `properties`; check both
    /// the MySQL and Postgres variants.
    #[test]
    fn test_extract_cdc_properties() {
        let user_props_mysql = convert_args!(btreemap!(
            "connector" => "mysql-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "3306",
            "database.user" => "root",
            "database.password" => "123456",
            "database.name" => "mydb",
            "table.name" => "products",
        ));

        let user_props_postgres = convert_args!(btreemap!(
            "connector" => "postgres-cdc",
            "database.hostname" => "127.0.0.1",
            "database.port" => "5432",
            "database.user" => "root",
            "database.password" => "654321",
            "schema.name" => "public",
            "database.name" => "mypgdb",
            "table.name" => "orders",
        ));

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_mysql),
            true,
        )
        .unwrap();
        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "3306");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "123456");
            assert_eq!(c.properties.get("database.name").unwrap(), "mydb");
            assert_eq!(c.properties.get("table.name").unwrap(), "products");
        } else {
            panic!("extract cdc config failed");
        }

        let conn_props = ConnectorProperties::extract(
            WithOptionsSecResolved::without_secrets(user_props_postgres),
            true,
        )
        .unwrap();
        if let ConnectorProperties::PostgresCdc(c) = conn_props {
            assert_eq!(c.properties.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.properties.get("database.port").unwrap(), "5432");
            assert_eq!(c.properties.get("database.user").unwrap(), "root");
            assert_eq!(c.properties.get("database.password").unwrap(), "654321");
            assert_eq!(c.properties.get("schema.name").unwrap(), "public");
            assert_eq!(c.properties.get("database.name").unwrap(), "mypgdb");
            assert_eq!(c.properties.get("table.name").unwrap(), "orders");
        } else {
            panic!("extract cdc config failed");
        }
    }
}