risingwave_connector/source/mod.rs

pub mod prelude {
    pub use crate::source::datagen::DatagenSplitEnumerator;
    pub use crate::source::filesystem::LegacyS3SplitEnumerator;
    pub use crate::source::filesystem::opendal_source::OpendalEnumerator;
    pub use crate::source::google_pubsub::PubsubSplitEnumerator as GooglePubsubSplitEnumerator;
    pub use crate::source::iceberg::IcebergSplitEnumerator;
    pub use crate::source::kafka::KafkaSplitEnumerator;
    pub use crate::source::kinesis::KinesisSplitEnumerator;
    pub use crate::source::mqtt::MqttSplitEnumerator;
    pub use crate::source::nats::NatsSplitEnumerator;
    pub use crate::source::nexmark::NexmarkSplitEnumerator;
    pub use crate::source::pulsar::PulsarSplitEnumerator;
    pub use crate::source::test_source::TestSourceSplitEnumerator as TestSplitEnumerator;
    pub type AzblobSplitEnumerator =
        OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalAzblob>;
    pub type GcsSplitEnumerator =
        OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalGcs>;
    pub type OpendalS3SplitEnumerator =
        OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalS3>;
    pub type PosixFsSplitEnumerator =
        OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalPosixFs>;
    pub use crate::source::cdc::enumerator::DebeziumSplitEnumerator;
    pub use crate::source::filesystem::opendal_source::BatchPosixFsEnumerator as BatchPosixFsSplitEnumerator;
    pub type CitusCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Citus>;
    pub type MongodbCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mongodb>;
    pub type PostgresCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Postgres>;
    pub type MysqlCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mysql>;
    pub type SqlServerCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::SqlServer>;
}

pub mod base;
pub mod batch;
pub mod cdc;
pub mod data_gen_util;
pub mod datagen;
pub mod filesystem;
pub mod google_pubsub;
pub mod kafka;
pub mod kinesis;
pub mod monitor;
pub mod mqtt;
pub mod nats;
pub mod nexmark;
pub mod pulsar;
pub mod utils;

mod util;
use std::future::IntoFuture;

pub use base::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR, *};
pub use batch::BatchSourceSplitImpl;
pub(crate) use common::*;
use google_cloud_pubsub::subscription::Subscription;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
pub use kinesis::KINESIS_CONNECTOR;
pub use mqtt::MQTT_CONNECTOR;
pub use nats::NATS_CONNECTOR;
use utils::feature_gated_source_mod;

pub use self::adbc_snowflake::ADBC_SNOWFLAKE_CONNECTOR;
mod common;
pub mod iceberg;
mod manager;
pub mod reader;
pub mod test_source;
feature_gated_source_mod!(adbc_snowflake, "adbc_snowflake");

use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy;
use async_nats::jetstream::context::Context as JetStreamContext;
pub use manager::{SourceColumnDesc, SourceColumnType};
use risingwave_common::array::{Array, ArrayRef};
use risingwave_common::row::OwnedRow;
use risingwave_pb::id::SourceId;
use thiserror_ext::AsReport;
pub use util::fill_adaptive_split;

pub use crate::source::filesystem::LEGACY_S3_CONNECTOR;
pub use crate::source::filesystem::opendal_source::{
    AZBLOB_CONNECTOR, BATCH_POSIX_FS_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR,
    POSIX_FS_CONNECTOR,
};
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
pub use crate::source::pulsar::PULSAR_CONNECTOR;
use crate::source::pulsar::source::reader::PULSAR_ACK_CHANNEL;

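/// Decides whether a `WITH` option should also be copied into the
/// `FORMAT ... ENCODE` options, based on the option key and the connector in
/// use. Keys matching one of the well-known prefixes below (schema registry,
/// schema location, AWS credentials, and so on) are always copied; `endpoint`
/// is copied for every connector except Kinesis, whose own connector
/// properties include an `endpoint` key. See the test module at the bottom of
/// this file for a sketch of the expected behavior.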
pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
    const PREFIXES: &[&str] = &[
        "schema.registry",
        "schema.location",
        "message",
        "key.message",
        "without_header",
        "delimiter",
        "region",
        "endpoint_url",
        "access_key",
        "secret_key",
        "session_token",
        "arn",
        "external_id",
        "profile",
    ];
    PREFIXES.iter().any(|prefix| key.starts_with(prefix))
        || (key == "endpoint" && !connector.eq_ignore_ascii_case(KINESIS_CONNECTOR))
}

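/// A task that acknowledges consumed data back to the upstream system once a
/// checkpoint has completed, for connectors whose progress must be committed
/// on the consumer side (CDC offsets, Google Pub/Sub, NATS JetStream, and
/// Pulsar).
///
/// A minimal usage sketch (the construction of `split_id` and `offset` is
/// illustrative):
///
/// ```ignore
/// let task = WaitCheckpointTask::CommitCdcOffset(Some((split_id, offset)));
/// task.run().await;
/// ```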
pub enum WaitCheckpointTask {
    /// Commit the latest consumed offset of a CDC source via
    /// `cdc::jni_source`.
    CommitCdcOffset(Option<(SplitId, String)>),
    /// Ack Google Pub/Sub messages by the ack ids collected in the arrays.
    AckPubsubMessage(Subscription, Vec<ArrayRef>),
    /// Ack NATS JetStream messages by publishing `+ACK` to their reply
    /// subjects, honoring the consumer's ack policy.
    AckNatsJetStream(JetStreamContext, Vec<ArrayRef>, JetStreamAckPolicy),
    /// Cumulatively ack Pulsar messages through the per-split ack channel.
    AckPulsarMessage(Vec<(String, ArrayRef)>),
}

impl WaitCheckpointTask {
    /// Runs the task, discarding commit-success notifications.
    pub async fn run(self) {
        self.run_with_on_commit_success(|_source_id, _offset| {
            // No-op: this caller does not observe successful commits.
        })
        .await;
    }

    /// Runs the task, invoking `on_commit_success(source_id, offset)` each
    /// time a CDC offset commit succeeds.
    pub async fn run_with_on_commit_success<F>(self, mut on_commit_success: F)
    where
        F: FnMut(u64, &str),
    {
        use std::str::FromStr;
        match self {
            WaitCheckpointTask::CommitCdcOffset(updated_offset) => {
                if let Some((split_id, offset)) = updated_offset {
                    // The split id of a CDC source encodes its source id.
                    let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap();
                    match cdc::jni_source::commit_cdc_offset(source_id, offset.clone()) {
                        Ok(()) => {
                            on_commit_success(source_id, &offset);
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "source#{source_id}: failed to commit cdc offset: {offset}.",
                            )
                        }
                    }
                }
            }
            WaitCheckpointTask::AckPulsarMessage(ack_array) => {
                if let Some((ack_channel_id, to_cumulative_ack)) = ack_array.last() {
                    // Cumulative ack: only the latest message id in the chunk
                    // needs to be sent; it acknowledges everything before it.
                    let encode_message_id_data = to_cumulative_ack
                        .as_bytea()
                        .iter()
                        .last()
                        .flatten()
                        .map(|x| x.to_owned())
                        .unwrap();
                    if let Some(ack_tx) = PULSAR_ACK_CHANNEL.get(ack_channel_id).await {
                        let _ = ack_tx.send(encode_message_id_data);
                    }
                }
            }
            WaitCheckpointTask::AckPubsubMessage(subscription, ack_id_arrs) => {
                async fn ack(subscription: &Subscription, ack_ids: Vec<String>) {
                    tracing::trace!("acking pubsub messages {:?}", ack_ids);
                    match subscription.ack(ack_ids).await {
                        Ok(()) => {}
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to ack pubsub messages",
                            )
                        }
                    }
                }
                // Ack in batches to bound the size of each ack request.
                const MAX_ACK_BATCH_SIZE: usize = 1000;
                let mut ack_ids: Vec<String> = vec![];
                for arr in ack_id_arrs {
                    for ack_id in arr.as_utf8().iter().flatten() {
                        ack_ids.push(ack_id.to_owned());
                        if ack_ids.len() >= MAX_ACK_BATCH_SIZE {
                            ack(&subscription, std::mem::take(&mut ack_ids)).await;
                        }
                    }
                }
                // Flush whatever remains after the last full batch.
                ack(&subscription, ack_ids).await;
            }
            WaitCheckpointTask::AckNatsJetStream(
                ref context,
                reply_subjects_arrs,
                ref ack_policy,
            ) => {
                async fn ack(context: &JetStreamContext, reply_subject: String) {
                    match context.publish(reply_subject.clone(), "+ACK".into()).await {
                        Err(e) => {
                            tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
                        }
                        Ok(ack_future) => {
                            // Wait for the server to confirm the ack.
                            let _ = ack_future.into_future().await;
                        }
                    }
                }

                let reply_subjects = reply_subjects_arrs
                    .iter()
                    .flat_map(|arr| {
                        arr.as_utf8()
                            .iter()
                            .flatten()
                            .map(|s| s.to_owned())
                            .collect::<Vec<String>>()
                    })
                    .collect::<Vec<String>>();

                match ack_policy {
                    JetStreamAckPolicy::None => (),
                    JetStreamAckPolicy::Explicit => {
                        for reply_subject in reply_subjects {
                            // Empty reply subjects cannot be acked; skip them.
                            if reply_subject.is_empty() {
                                continue;
                            }
                            ack(context, reply_subject).await;
                        }
                    }
                    JetStreamAckPolicy::All => {
                        // `AckPolicy::All` acknowledges every message up to and
                        // including the acked one, so acking the last reply
                        // subject suffices.
                        if let Some(reply_subject) = reply_subjects.last() {
                            ack(context, reply_subject.clone()).await;
                        }
                    }
                }
            }
        }
    }
}

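/// A snapshot split of a CDC table: the key range
/// `[left_bound_inclusive, right_bound_exclusive)` assigned to split
/// `split_id`.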
#[derive(Clone, Debug, PartialEq)]
pub struct CdcTableSnapshotSplitCommon<T: Clone> {
    pub split_id: i64,
    pub left_bound_inclusive: T,
    pub right_bound_exclusive: T,
}

pub type CdcTableSnapshotSplit = CdcTableSnapshotSplitCommon<OwnedRow>;
pub type CdcTableSnapshotSplitRaw = CdcTableSnapshotSplitCommon<Vec<u8>>;

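/// Builds the key used to register and look up a Pulsar split's ack channel
/// in `PULSAR_ACK_CHANNEL`, in the form `{source_id}-{split_id}`.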
#[inline]
pub fn build_pulsar_ack_channel_id(source_id: SourceId, split_id: &SplitId) -> String {
    format!("{}-{}", source_id, split_id)
}
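
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of the copy predicate's behavior; the connector names
    // below are illustrative, not an exhaustive list.
    #[test]
    fn test_should_copy_to_format_encode_options() {
        // Keys matching a well-known prefix are always copied.
        assert!(should_copy_to_format_encode_options(
            "schema.registry.url",
            "kafka"
        ));
        assert!(should_copy_to_format_encode_options("region", "kafka"));
        // `endpoint` is copied unless the connector is Kinesis
        // (compared case-insensitively).
        assert!(should_copy_to_format_encode_options("endpoint", "kafka"));
        assert!(!should_copy_to_format_encode_options("endpoint", "KINESIS"));
        // Unrelated keys are never copied.
        assert!(!should_copy_to_format_encode_options("topic", "kafka"));
    }
}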