risingwave_connector/source/
mod.rs1pub mod prelude {
16 pub use crate::source::datagen::DatagenSplitEnumerator;
18 pub use crate::source::filesystem::LegacyS3SplitEnumerator;
19 pub use crate::source::filesystem::opendal_source::OpendalEnumerator;
20 pub use crate::source::google_pubsub::PubsubSplitEnumerator as GooglePubsubSplitEnumerator;
21 pub use crate::source::iceberg::IcebergSplitEnumerator;
22 pub use crate::source::kafka::KafkaSplitEnumerator;
23 pub use crate::source::kinesis::KinesisSplitEnumerator;
24 pub use crate::source::mqtt::MqttSplitEnumerator;
25 pub use crate::source::nats::NatsSplitEnumerator;
26 pub use crate::source::nexmark::NexmarkSplitEnumerator;
27 pub use crate::source::pulsar::PulsarSplitEnumerator;
28 pub use crate::source::test_source::TestSourceSplitEnumerator as TestSplitEnumerator;
29 pub type AzblobSplitEnumerator =
30 OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalAzblob>;
31 pub type GcsSplitEnumerator =
32 OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalGcs>;
33 pub type OpendalS3SplitEnumerator =
34 OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalS3>;
35 pub type PosixFsSplitEnumerator =
36 OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalPosixFs>;
37 pub use crate::source::cdc::enumerator::DebeziumSplitEnumerator;
38 pub use crate::source::filesystem::opendal_source::BatchPosixFsEnumerator as BatchPosixFsSplitEnumerator;
39 pub type CitusCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Citus>;
40 pub type MongodbCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mongodb>;
41 pub type PostgresCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Postgres>;
42 pub type MysqlCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mysql>;
43 pub type SqlServerCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::SqlServer>;
44}
45
46pub mod base;
47pub mod batch;
48pub mod cdc;
49pub mod data_gen_util;
50pub mod datagen;
51pub mod filesystem;
52pub mod google_pubsub;
53pub mod kafka;
54pub mod kinesis;
55pub mod monitor;
56pub mod mqtt;
57pub mod nats;
58pub mod nexmark;
59pub mod pulsar;
60
61mod util;
62use std::future::IntoFuture;
63
64pub use base::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR, *};
65pub use batch::BatchSourceSplitImpl;
66pub(crate) use common::*;
67use google_cloud_pubsub::subscription::Subscription;
68pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
69pub use kafka::KAFKA_CONNECTOR;
70pub use kinesis::KINESIS_CONNECTOR;
71pub use mqtt::MQTT_CONNECTOR;
72pub use nats::NATS_CONNECTOR;
73mod common;
74pub mod iceberg;
75mod manager;
76pub mod reader;
77pub mod test_source;
78
79use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy;
80use async_nats::jetstream::context::Context as JetStreamContext;
81pub use manager::{SourceColumnDesc, SourceColumnType};
82use risingwave_common::array::{Array, ArrayRef};
83use risingwave_common::row::OwnedRow;
84use thiserror_ext::AsReport;
85pub use util::fill_adaptive_split;
86
87pub use crate::source::filesystem::LEGACY_S3_CONNECTOR;
88pub use crate::source::filesystem::opendal_source::{
89 AZBLOB_CONNECTOR, BATCH_POSIX_FS_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR,
90 POSIX_FS_CONNECTOR,
91};
92pub use crate::source::nexmark::NEXMARK_CONNECTOR;
93pub use crate::source::pulsar::PULSAR_CONNECTOR;
94
95pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
96 const PREFIXES: &[&str] = &[
97 "schema.registry",
98 "schema.location",
99 "message",
100 "key.message",
101 "without_header",
102 "delimiter",
103 "region",
105 "endpoint_url",
106 "access_key",
107 "secret_key",
108 "session_token",
109 "arn",
110 "external_id",
111 "profile",
112 ];
113 PREFIXES.iter().any(|prefix| key.starts_with(prefix))
114 || (key == "endpoint" && !connector.eq_ignore_ascii_case(KINESIS_CONNECTOR))
115}
116
/// A deferred connector-side action that is carried out by [`WaitCheckpointTask::run`].
///
/// NOTE(review): the name suggests these tasks are executed once a checkpoint has completed —
/// confirm against the code that schedules them.
pub enum WaitCheckpointTask {
    /// Commit a CDC offset. The payload is an optional `(split id, offset)` pair; `run` does
    /// nothing for `None` and otherwise parses the split id as a `u64` source id.
    CommitCdcOffset(Option<(SplitId, String)>),
    /// Acknowledge Pub/Sub messages on the given subscription. The arrays are read as UTF-8
    /// arrays whose non-null values are the ack ids.
    AckPubsubMessage(Subscription, Vec<ArrayRef>),
    /// Acknowledge NATS JetStream messages through the given context. The arrays are read as
    /// UTF-8 arrays of reply subjects; which of them get acknowledged depends on the ack policy.
    AckNatsJetStream(JetStreamContext, Vec<ArrayRef>, JetStreamAckPolicy),
}
123
124impl WaitCheckpointTask {
125 pub async fn run(self) {
126 use std::str::FromStr;
127 match self {
128 WaitCheckpointTask::CommitCdcOffset(updated_offset) => {
129 if let Some((split_id, offset)) = updated_offset {
130 let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap();
131 match cdc::jni_source::commit_cdc_offset(source_id, offset.clone()) {
133 Ok(()) => {}
134 Err(e) => {
135 tracing::error!(
136 error = %e.as_report(),
137 "source#{source_id}: failed to commit cdc offset: {offset}.",
138 )
139 }
140 }
141 }
142 }
143 WaitCheckpointTask::AckPubsubMessage(subscription, ack_id_arrs) => {
144 async fn ack(subscription: &Subscription, ack_ids: Vec<String>) {
145 tracing::trace!("acking pubsub messages {:?}", ack_ids);
146 match subscription.ack(ack_ids).await {
147 Ok(()) => {}
148 Err(e) => {
149 tracing::error!(
150 error = %e.as_report(),
151 "failed to ack pubsub messages",
152 )
153 }
154 }
155 }
156 const MAX_ACK_BATCH_SIZE: usize = 1000;
157 let mut ack_ids: Vec<String> = vec![];
158 for arr in ack_id_arrs {
159 for ack_id in arr.as_utf8().iter().flatten() {
160 ack_ids.push(ack_id.to_owned());
161 if ack_ids.len() >= MAX_ACK_BATCH_SIZE {
162 ack(&subscription, std::mem::take(&mut ack_ids)).await;
163 }
164 }
165 }
166 ack(&subscription, ack_ids).await;
167 }
168 WaitCheckpointTask::AckNatsJetStream(
169 ref context,
170 reply_subjects_arrs,
171 ref ack_policy,
172 ) => {
173 async fn ack(context: &JetStreamContext, reply_subject: String) {
174 match context.publish(reply_subject.clone(), "+ACK".into()).await {
175 Err(e) => {
176 tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
177 }
178 Ok(ack_future) => {
179 let _ = ack_future.into_future().await;
180 }
181 }
182 }
183
184 let reply_subjects = reply_subjects_arrs
185 .iter()
186 .flat_map(|arr| {
187 arr.as_utf8()
188 .iter()
189 .flatten()
190 .map(|s| s.to_owned())
191 .collect::<Vec<String>>()
192 })
193 .collect::<Vec<String>>();
194
195 match ack_policy {
196 JetStreamAckPolicy::None => (),
197 JetStreamAckPolicy::Explicit => {
198 for reply_subject in reply_subjects {
199 if reply_subject.is_empty() {
200 continue;
201 }
202 ack(context, reply_subject).await;
203 }
204 }
205 JetStreamAckPolicy::All => {
206 if let Some(reply_subject) = reply_subjects.last() {
207 ack(context, reply_subject.clone()).await;
208 }
209 }
210 }
211 }
212 }
213 }
214}
215
/// A split of a CDC table snapshot, generic over the representation `T` of its two bounds.
///
/// NOTE(review): going by the field names, a split covers the half-open range
/// `[left_bound_inclusive, right_bound_exclusive)` — confirm against the code that produces
/// and consumes these splits.
#[derive(Clone, Debug, PartialEq)]
pub struct CdcTableSnapshotSplitCommon<T: Clone> {
    /// Identifier of this split.
    pub split_id: i64,
    /// Left bound of the split (inclusive, per the field name).
    pub left_bound_inclusive: T,
    /// Right bound of the split (exclusive, per the field name).
    pub right_bound_exclusive: T,
}
222
/// [`CdcTableSnapshotSplitCommon`] whose bounds are [`OwnedRow`]s.
pub type CdcTableSnapshotSplit = CdcTableSnapshotSplitCommon<OwnedRow>;
/// [`CdcTableSnapshotSplitCommon`] whose bounds are raw bytes (`Vec<u8>`).
/// NOTE(review): presumably an encoded form of the row bounds — confirm.
pub type CdcTableSnapshotSplitRaw = CdcTableSnapshotSplitCommon<Vec<u8>>;