risingwave_connector/source/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

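/// Split enumerators for every supported connector, re-exported in one place,
/// together with type aliases for the OpenDAL-based filesystem enumerators and
/// the Debezium-based CDC enumerators.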
pub mod prelude {
    // import all split enumerators
    pub use crate::source::datagen::DatagenSplitEnumerator;
    pub use crate::source::filesystem::LegacyS3SplitEnumerator;
    pub use crate::source::filesystem::opendal_source::OpendalEnumerator;
    pub use crate::source::google_pubsub::PubsubSplitEnumerator as GooglePubsubSplitEnumerator;
    pub use crate::source::iceberg::IcebergSplitEnumerator;
    pub use crate::source::kafka::KafkaSplitEnumerator;
    pub use crate::source::kinesis::KinesisSplitEnumerator;
    pub use crate::source::mqtt::MqttSplitEnumerator;
    pub use crate::source::nats::NatsSplitEnumerator;
    pub use crate::source::nexmark::NexmarkSplitEnumerator;
    pub use crate::source::pulsar::PulsarSplitEnumerator;
    pub use crate::source::test_source::TestSourceSplitEnumerator as TestSplitEnumerator;
    pub type AzblobSplitEnumerator =
        OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalAzblob>;
    pub type GcsSplitEnumerator =
        OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalGcs>;
    pub type OpendalS3SplitEnumerator =
        OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalS3>;
    pub type PosixFsSplitEnumerator =
        OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalPosixFs>;
    pub use crate::source::cdc::enumerator::DebeziumSplitEnumerator;
    pub use crate::source::filesystem::opendal_source::BatchPosixFsEnumerator as BatchPosixFsSplitEnumerator;
    pub type CitusCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Citus>;
    pub type MongodbCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mongodb>;
    pub type PostgresCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Postgres>;
    pub type MysqlCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mysql>;
    pub type SqlServerCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::SqlServer>;
}

pub mod base;
pub mod batch;
pub mod cdc;
pub mod data_gen_util;
pub mod datagen;
pub mod filesystem;
pub mod google_pubsub;
pub mod kafka;
pub mod kinesis;
pub mod monitor;
pub mod mqtt;
pub mod nats;
pub mod nexmark;
pub mod pulsar;

mod util;
use std::future::IntoFuture;

pub use base::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR, *};
pub use batch::BatchSourceSplitImpl;
pub(crate) use common::*;
use google_cloud_pubsub::subscription::Subscription;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
pub use kinesis::KINESIS_CONNECTOR;
pub use mqtt::MQTT_CONNECTOR;
pub use nats::NATS_CONNECTOR;
use risingwave_common::catalog::TableId;
mod common;
pub mod iceberg;
mod manager;
pub mod reader;
pub mod test_source;

use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy;
use async_nats::jetstream::context::Context as JetStreamContext;
pub use manager::{SourceColumnDesc, SourceColumnType};
use risingwave_common::array::{Array, ArrayRef};
use risingwave_common::row::OwnedRow;
use thiserror_ext::AsReport;
pub use util::fill_adaptive_split;

pub use crate::source::filesystem::LEGACY_S3_CONNECTOR;
pub use crate::source::filesystem::opendal_source::{
    AZBLOB_CONNECTOR, BATCH_POSIX_FS_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR,
    POSIX_FS_CONNECTOR,
};
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
pub use crate::source::pulsar::PULSAR_CONNECTOR;
use crate::source::pulsar::source::reader::PULSAR_ACK_CHANNEL;

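/// Returns whether a `WITH` option identified by `key` should also be copied
/// into the format/encode options for the given `connector`.
///
/// Keys are copied when they start with one of the prefixes below (schema
/// registry/location, message names, header/delimiter options, and
/// `AwsAuthProps` fields). `endpoint` is copied for every connector except
/// Kinesis, where it refers to the connector's own endpoint. An illustrative
/// check (assuming the Kinesis connector name is `"kinesis"`):
///
/// ```ignore
/// assert!(should_copy_to_format_encode_options("schema.registry.url", "kafka"));
/// assert!(should_copy_to_format_encode_options("endpoint", "kafka"));
/// assert!(!should_copy_to_format_encode_options("endpoint", "kinesis"));
/// ```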
pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
    const PREFIXES: &[&str] = &[
        "schema.registry",
        "schema.location",
        "message",
        "key.message",
        "without_header",
        "delimiter",
        // AwsAuthProps
        "region",
        "endpoint_url",
        "access_key",
        "secret_key",
        "session_token",
        "arn",
        "external_id",
        "profile",
    ];
    PREFIXES.iter().any(|prefix| key.starts_with(prefix))
        || (key == "endpoint" && !connector.eq_ignore_ascii_case(KINESIS_CONNECTOR))
}

/// Tasks executed by `WaitCheckpointWorker`.
pub enum WaitCheckpointTask {
    /// Notify the CDC connector to commit the offset for the given split: `(split_id, offset)`.
    CommitCdcOffset(Option<(SplitId, String)>),
    /// Ack Google Pub/Sub messages whose ack ids are stored in the given arrays.
    AckPubsubMessage(Subscription, Vec<ArrayRef>),
    /// Ack NATS JetStream messages via their reply subjects, honoring the ack policy.
    AckNatsJetStream(JetStreamContext, Vec<ArrayRef>, JetStreamAckPolicy),
    /// Cumulatively ack Pulsar messages: `(ack channel id, encoded message ids)` pairs.
    AckPulsarMessage(Vec<(String, ArrayRef)>),
}

impl WaitCheckpointTask {
    /// Runs the task with a no-op `on_commit_success` callback.
    pub async fn run(self) {
        self.run_with_on_commit_success(|_source_id, _offset| {
            // Default implementation: no action on commit success
        })
        .await;
    }

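    /// Runs the task. For `CommitCdcOffset`, `on_commit_success` is invoked
    /// with `(source_id, offset)` after the offset has been committed
    /// successfully; the other (ack) variants never call it.
    ///
    /// A minimal sketch of wiring a logging callback (the split id and offset
    /// values here are illustrative):
    ///
    /// ```ignore
    /// let task = WaitCheckpointTask::CommitCdcOffset(Some(("1".into(), offset_json)));
    /// task.run_with_on_commit_success(|source_id, offset| {
    ///     tracing::info!(source_id, offset, "committed cdc offset");
    /// })
    /// .await;
    /// ```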
    pub async fn run_with_on_commit_success<F>(self, mut on_commit_success: F)
    where
        F: FnMut(u64, &str),
    {
        use std::str::FromStr;
        match self {
            WaitCheckpointTask::CommitCdcOffset(updated_offset) => {
                if let Some((split_id, offset)) = updated_offset {
                    let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap();
                    // notify cdc connector to commit offset
                    match cdc::jni_source::commit_cdc_offset(source_id, offset.clone()) {
                        Ok(()) => {
                            // Execute callback after successful commit
                            on_commit_success(source_id, &offset);
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "source#{source_id}: failed to commit cdc offset: {offset}.",
                            )
                        }
                    }
                }
            }
            WaitCheckpointTask::AckPulsarMessage(ack_array) => {
                // A cumulative ack covers all earlier message ids, so only the
                // last encoded message id needs to be forwarded.
                if let Some((ack_channel_id, to_cumulative_ack)) = ack_array.last() {
                    let encode_message_id_data = to_cumulative_ack
                        .as_bytea()
                        .iter()
                        .last()
                        .flatten()
                        .map(|x| x.to_owned())
                        .unwrap();
                    if let Some(ack_tx) = PULSAR_ACK_CHANNEL.get(ack_channel_id).await {
                        let _ = ack_tx.send(encode_message_id_data);
                    }
                }
            }
            WaitCheckpointTask::AckPubsubMessage(subscription, ack_id_arrs) => {
                async fn ack(subscription: &Subscription, ack_ids: Vec<String>) {
                    tracing::trace!("acking pubsub messages {:?}", ack_ids);
                    match subscription.ack(ack_ids).await {
                        Ok(()) => {}
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to ack pubsub messages",
                            )
                        }
                    }
                }
                // Ack in batches to keep each request reasonably sized.
                const MAX_ACK_BATCH_SIZE: usize = 1000;
                let mut ack_ids: Vec<String> = vec![];
                for arr in ack_id_arrs {
                    for ack_id in arr.as_utf8().iter().flatten() {
                        ack_ids.push(ack_id.to_owned());
                        if ack_ids.len() >= MAX_ACK_BATCH_SIZE {
                            ack(&subscription, std::mem::take(&mut ack_ids)).await;
                        }
                    }
                }
                ack(&subscription, ack_ids).await;
            }
            WaitCheckpointTask::AckNatsJetStream(
                ref context,
                reply_subjects_arrs,
                ref ack_policy,
            ) => {
                async fn ack(context: &JetStreamContext, reply_subject: String) {
                    match context.publish(reply_subject.clone(), "+ACK".into()).await {
                        Err(e) => {
                            tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
                        }
                        Ok(ack_future) => {
                            let _ = ack_future.into_future().await;
                        }
                    }
                }

                let reply_subjects = reply_subjects_arrs
                    .iter()
                    .flat_map(|arr| {
                        arr.as_utf8()
                            .iter()
                            .flatten()
                            .map(|s| s.to_owned())
                            .collect::<Vec<String>>()
                    })
                    .collect::<Vec<String>>();

                match ack_policy {
                    JetStreamAckPolicy::None => (),
                    JetStreamAckPolicy::Explicit => {
                        for reply_subject in reply_subjects {
                            if reply_subject.is_empty() {
                                continue;
                            }
                            ack(context, reply_subject).await;
                        }
                    }
                    JetStreamAckPolicy::All => {
                        // With `AckPolicy::All`, acking the last message
                        // acknowledges every message before it as well.
                        if let Some(reply_subject) = reply_subjects.last() {
                            ack(context, reply_subject.clone()).await;
                        }
                    }
                }
            }
        }
    }
}

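/// A snapshot split of a CDC table, covering the key range
/// `[left_bound_inclusive, right_bound_exclusive)`.
///
/// `CdcTableSnapshotSplit` carries the bounds as decoded rows (`OwnedRow`),
/// while `CdcTableSnapshotSplitRaw` carries their encoded bytes.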
#[derive(Clone, Debug, PartialEq)]
pub struct CdcTableSnapshotSplitCommon<T: Clone> {
    pub split_id: i64,
    pub left_bound_inclusive: T,
    pub right_bound_exclusive: T,
}

pub type CdcTableSnapshotSplit = CdcTableSnapshotSplitCommon<OwnedRow>;
pub type CdcTableSnapshotSplitRaw = CdcTableSnapshotSplitCommon<Vec<u8>>;

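/// Builds the id of the Pulsar ack channel (see `PULSAR_ACK_CHANNEL`) for a
/// given source and split, formatted as `"{source_id}-{split_id}"`.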
#[inline]
pub fn build_pulsar_ack_channel_id(source_id: &TableId, split_id: &SplitId) -> String {
    format!("{}-{}", source_id, split_id)
}