// risingwave_connector/source/mod.rs
mod.rs1pub mod prelude {
16 pub use crate::source::datagen::DatagenSplitEnumerator;
18 pub use crate::source::filesystem::LegacyS3SplitEnumerator;
19 pub use crate::source::filesystem::opendal_source::OpendalEnumerator;
20 pub use crate::source::google_pubsub::PubsubSplitEnumerator as GooglePubsubSplitEnumerator;
21 pub use crate::source::iceberg::IcebergSplitEnumerator;
22 pub use crate::source::kafka::KafkaSplitEnumerator;
23 pub use crate::source::kinesis::KinesisSplitEnumerator;
24 pub use crate::source::mqtt::MqttSplitEnumerator;
25 pub use crate::source::nats::NatsSplitEnumerator;
26 pub use crate::source::nexmark::NexmarkSplitEnumerator;
27 pub use crate::source::pulsar::PulsarSplitEnumerator;
28 pub use crate::source::test_source::TestSourceSplitEnumerator as TestSplitEnumerator;
29 pub type AzblobSplitEnumerator =
30 OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalAzblob>;
31 pub type GcsSplitEnumerator =
32 OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalGcs>;
33 pub type OpendalS3SplitEnumerator =
34 OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalS3>;
35 pub type PosixFsSplitEnumerator =
36 OpendalEnumerator<crate::source::filesystem::opendal_source::OpendalPosixFs>;
37 pub use crate::source::cdc::enumerator::DebeziumSplitEnumerator;
38 pub type CitusCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Citus>;
39 pub type MongodbCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mongodb>;
40 pub type PostgresCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Postgres>;
41 pub type MysqlCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::Mysql>;
42 pub type SqlServerCdcSplitEnumerator = DebeziumSplitEnumerator<crate::source::cdc::SqlServer>;
43}
44
45pub mod base;
46pub mod cdc;
47pub mod data_gen_util;
48pub mod datagen;
49pub mod filesystem;
50pub mod google_pubsub;
51pub mod kafka;
52pub mod kinesis;
53pub mod monitor;
54pub mod mqtt;
55pub mod nats;
56pub mod nexmark;
57pub mod pulsar;
58mod util;
59
60use std::future::IntoFuture;
61
62pub use base::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR, *};
63pub(crate) use common::*;
64use google_cloud_pubsub::subscription::Subscription;
65pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
66pub use kafka::KAFKA_CONNECTOR;
67pub use kinesis::KINESIS_CONNECTOR;
68pub use mqtt::MQTT_CONNECTOR;
69pub use nats::NATS_CONNECTOR;
70mod common;
71pub mod iceberg;
72mod manager;
73pub mod reader;
74pub mod test_source;
75
76use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy;
77use async_nats::jetstream::context::Context as JetStreamContext;
78pub use manager::{SourceColumnDesc, SourceColumnType};
79use risingwave_common::array::{Array, ArrayRef};
80use thiserror_ext::AsReport;
81pub use util::fill_adaptive_split;
82
83pub use crate::source::filesystem::LEGACY_S3_CONNECTOR;
84pub use crate::source::filesystem::opendal_source::{
85 AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
86};
87pub use crate::source::nexmark::NEXMARK_CONNECTOR;
88pub use crate::source::pulsar::PULSAR_CONNECTOR;
89
90pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
91 const PREFIXES: &[&str] = &[
92 "schema.registry",
93 "schema.location",
94 "message",
95 "key.message",
96 "without_header",
97 "delimiter",
98 "region",
100 "endpoint_url",
101 "access_key",
102 "secret_key",
103 "session_token",
104 "arn",
105 "external_id",
106 "profile",
107 ];
108 PREFIXES.iter().any(|prefix| key.starts_with(prefix))
109 || (key == "endpoint" && !connector.eq_ignore_ascii_case(KINESIS_CONNECTOR))
110}
111
112pub enum WaitCheckpointTask {
114 CommitCdcOffset(Option<(SplitId, String)>),
115 AckPubsubMessage(Subscription, Vec<ArrayRef>),
116 AckNatsJetStream(JetStreamContext, Vec<ArrayRef>, JetStreamAckPolicy),
117}
118
119impl WaitCheckpointTask {
120 pub async fn run(self) {
121 use std::str::FromStr;
122 match self {
123 WaitCheckpointTask::CommitCdcOffset(updated_offset) => {
124 if let Some((split_id, offset)) = updated_offset {
125 let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap();
126 match cdc::jni_source::commit_cdc_offset(source_id, offset.clone()) {
128 Ok(()) => {}
129 Err(e) => {
130 tracing::error!(
131 error = %e.as_report(),
132 "source#{source_id}: failed to commit cdc offset: {offset}.",
133 )
134 }
135 }
136 }
137 }
138 WaitCheckpointTask::AckPubsubMessage(subscription, ack_id_arrs) => {
139 async fn ack(subscription: &Subscription, ack_ids: Vec<String>) {
140 tracing::trace!("acking pubsub messages {:?}", ack_ids);
141 match subscription.ack(ack_ids).await {
142 Ok(()) => {}
143 Err(e) => {
144 tracing::error!(
145 error = %e.as_report(),
146 "failed to ack pubsub messages",
147 )
148 }
149 }
150 }
151 const MAX_ACK_BATCH_SIZE: usize = 1000;
152 let mut ack_ids: Vec<String> = vec![];
153 for arr in ack_id_arrs {
154 for ack_id in arr.as_utf8().iter().flatten() {
155 ack_ids.push(ack_id.to_owned());
156 if ack_ids.len() >= MAX_ACK_BATCH_SIZE {
157 ack(&subscription, std::mem::take(&mut ack_ids)).await;
158 }
159 }
160 }
161 ack(&subscription, ack_ids).await;
162 }
163 WaitCheckpointTask::AckNatsJetStream(
164 ref context,
165 reply_subjects_arrs,
166 ref ack_policy,
167 ) => {
168 async fn ack(context: &JetStreamContext, reply_subject: String) {
169 match context.publish(reply_subject.clone(), "+ACK".into()).await {
170 Err(e) => {
171 tracing::error!(error = %e.as_report(), subject = ?reply_subject, "failed to ack NATS JetStream message");
172 }
173 Ok(ack_future) => {
174 let _ = ack_future.into_future().await;
175 }
176 }
177 }
178
179 let reply_subjects = reply_subjects_arrs
180 .iter()
181 .flat_map(|arr| {
182 arr.as_utf8()
183 .iter()
184 .flatten()
185 .map(|s| s.to_owned())
186 .collect::<Vec<String>>()
187 })
188 .collect::<Vec<String>>();
189
190 match ack_policy {
191 JetStreamAckPolicy::None => (),
192 JetStreamAckPolicy::Explicit => {
193 for reply_subject in reply_subjects {
194 if reply_subject.is_empty() {
195 continue;
196 }
197 ack(context, reply_subject).await;
198 }
199 }
200 JetStreamAckPolicy::All => {
201 if let Some(reply_subject) = reply_subjects.last() {
202 ack(context, reply_subject.clone()).await;
203 }
204 }
205 }
206 }
207 }
208 }
209}