risingwave_connector/sink/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

feature_gated_sink_mod!(big_query, "bigquery");
pub mod boxed;
pub mod catalog;
feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
feature_gated_sink_mod!(doris, "doris");
#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
use risingwave_common::bail;
pub mod jdbc_jni_client;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake_redshift;
pub mod sqlserver;
feature_gated_sink_mod!(starrocks, "starrocks");
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use futures::future::BoxFuture;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use sea_orm::DatabaseConnection;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::clickhouse::CLICKHOUSE_SINK;
use self::deltalake::DELTALAKE_SINK;
use self::iceberg::ICEBERG_SINK;
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use self::starrocks::STARROCKS_SINK;
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
use crate::sink::utils::feature_gated_sink_mod;

const BOUNDED_CHANNEL_SIZE: usize = 16;
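
/// Invokes `$macro` with the full registry of sink variants, forwarding any extra
/// arguments. Each entry is a `{ VariantName, SinkType, ConfigType }` triple, with `()`
/// marking sinks that have no standalone config type.
///
/// A minimal usage sketch (the `count_sinks` macro below is hypothetical, shown only
/// to illustrate the calling convention):
///
/// ```ignore
/// macro_rules! count_sinks {
///     ({$({$variant:ident, $sink:ty, $config:ty}),*}) => {
///         const SINK_COUNT: usize = [$(stringify!($variant)),*].len();
///     };
/// }
/// for_all_sinks! { count_sinks }
/// ```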
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}

#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}

#[macro_export]
macro_rules! generate_config_use_single {
    // Skip () config types
    (()) => {};

    // Generate use clause for actual config types
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}

// Convenience macro that uses for_all_sinks
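/// Expands to one `pub(super) use` per concrete config type in the registry, skipping
/// `()` entries. A sketch of the intended effect when invoked from a submodule:
///
/// ```ignore
/// use_all_sink_configs!();
/// // roughly equivalent to:
/// // pub(super) use crate::sink::redis::RedisConfig;
/// // pub(super) use crate::sink::kafka::KafkaConfig;
/// // ... one `use` per non-`()` config type ...
/// ```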
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}

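/// Dispatches over a `SinkImpl` value, binding the inner sink to the given identifier
/// and evaluating the body for whichever variant is active. A minimal sketch of the
/// short form (`sink_impl` is an assumed `SinkImpl` value):
///
/// ```ignore
/// let coordinated = dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink());
/// ```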
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}

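/// Resolves a connector name string to its sink type: within `$body`, `$type_name` is a
/// type alias for the matching `Sink` implementation. A sketch mirroring how
/// `SinkImpl::new` uses it (assumes `anyhow!` and `SinkError` are in scope):
///
/// ```ignore
/// match_sink_name_str!(
///     "kafka",
///     SinkType,
///     Ok(SinkType::SINK_NAME),
///     |other| Err(SinkError::Config(anyhow!("unsupported sink connector {}", other)))
/// )
/// ```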
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";

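/// Runtime parameters of a sink, deserializable from [`PbSinkParam`] and convertible
/// from a [`SinkCatalog`]. A proto round-trip sketch (assumes `param: SinkParam`):
///
/// ```ignore
/// let pb: PbSinkParam = param.to_proto();
/// let restored = SinkParam::from_proto(pb);
/// assert_eq!(param, restored);
/// ```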
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    /// User-defined primary key indices for upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: if table_schema.pk_indices.is_empty() {
                None
            } else {
                Some(
                    (table_schema.pk_indices.iter())
                        .map(|i| *i as usize)
                        .collect(),
                )
            },
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: (self.downstream_pk.as_ref())
                    .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    /// Get the downstream primary key indices specified by the user. If not specified, return
    /// an empty vector.
    ///
    /// Prefer directly accessing the `downstream_pk` field, as it uses `None` to represent
    /// unspecified values, making it clearer.
    pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
        self.downstream_pk.clone().unwrap_or_default()
    }

    // `SinkParam` should only be used when there is a secret context.
    // FIXME: Use a new type for `SinkFormatDesc` whose properties contain filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` into a `SinkParam` and fill the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_snapshot_num",
            "The snapshot number of iceberg table",
            &["sink_name", "catalog_name", "table_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
            iceberg_snapshot_num,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It is the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: SinkId,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1.into(),
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
    )
}
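
/// The central abstraction implemented by every sink connector. An illustrative
/// skeleton (`MySink` and `MyLogSinker` are hypothetical, not part of this crate):
///
/// ```ignore
/// // `MySink` must also implement `TryFrom<SinkParam, Error = SinkError>`.
/// impl Sink for MySink {
///     const SINK_NAME: &'static str = "my_sink";
///     type LogSinker = MyLogSinker;
///     // `Coordinator` is left at its default for sinks without a coordinator.
///
///     async fn validate(&self) -> Result<()> {
///         // Check connectivity and schema compatibility here.
///         Ok(())
///     }
///
///     async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
///         MyLogSinker::new(self.config.clone(), writer_param)
///     }
/// }
/// ```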
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;

    type LogSinker: LogSinker;
    #[expect(deprecated)]
    type Coordinator: SinkCommitCoordinator = NoSinkCommitCoordinator;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        if matches!(Self::SINK_NAME, ICEBERG_SINK) {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
                            );
                        } else {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                            );
                        }
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of `sink_decouple` config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn support_schema_change() -> bool {
        false
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}

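/// The read side consumed by a [`LogSinker`]: start from an offset, pull items, and
/// acknowledge durable progress via `truncate`. A sketch of the expected call pattern:
///
/// ```ignore
/// log_reader.start_from(None).await?;
/// loop {
///     let (epoch, item) = log_reader.next_item().await?;
///     // ... write the item to the external system ...
///     if let LogStoreReadItem::Barrier { .. } = item {
///         log_reader.truncate(TruncateOffset::Barrier { epoch })?;
///     }
/// }
/// ```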
pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

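/// Consumes a [`SinkLogReader`] forever; the `!` return type means it only exits by
/// returning an error. A hedged skeleton (`MyLogSinker` is hypothetical):
///
/// ```ignore
/// #[async_trait]
/// impl LogSinker for MyLogSinker {
///     async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
///         log_reader.start_from(None).await?;
///         loop {
///             let (epoch, item) = log_reader.next_item().await?;
///             // Write `item` out, then `truncate` once it is durable.
///         }
///     }
/// }
/// ```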
#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

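/// Coordinates commits across parallel sink writers for coordinated sinks. A sketch of
/// the driver-side lifecycle (the calls are illustrative; the real driver lives in the
/// coordinator worker):
///
/// ```ignore
/// let rewind_offset = coordinator.init(subscriber).await?;
/// // For each checkpointed epoch, collect one `SinkMetadata` per writer, then:
/// coordinator.commit(epoch, metadata, None).await?;
/// ```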
#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initialize the sink commit coordinator and return the log store rewind start offset.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// After collecting the metadata from each sink writer, the coordinator calls `commit` with
    /// the set of metadata. The metadata is expected to be passed between different gRPC nodes,
    /// so in this general trait it is kept as serialized bytes.
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()>;
}

#[deprecated]
/// A placeholder struct of `SinkCommitCoordinator` for sinks without a coordinator.
///
/// It can never be constructed because it holds a never type, and therefore it's safe to
/// mark all its methods as unreachable.
///
/// This struct is explicitly marked as `deprecated` so that when developers accidentally declare
/// it as the associated type `Coordinator` when implementing the `Sink` trait, they will be
/// warned and can remove the explicit declaration.
///
/// Note:
///     When implementing a sink without a coordinator, don't explicitly write `type Coordinator = NoSinkCommitCoordinator`.
///     Just remove the explicit declaration and use the default associated type; likewise, don't explicitly implement
///     `fn new_coordinator(...)`, and use the default implementation instead.
pub struct NoSinkCommitCoordinator(!);

#[expect(deprecated)]
#[async_trait]
impl SinkCommitCoordinator for NoSinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        unreachable!()
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        _metadata: Vec<SinkMetadata>,
        _add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        unreachable!()
    }
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // remove privatelink related properties if any
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

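/// A construction sketch (assumes `param` is an existing `SinkParam`, e.g. from
/// `SinkParam::try_from_sink_catalog`; the connector value is illustrative):
///
/// ```ignore
/// param
///     .properties
///     .insert("connector".to_owned(), "blackhole".to_owned());
/// let sink: SinkImpl = build_sink(param)?;
/// assert!(sink.is_blackhole());
/// ```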
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}