risingwave_connector/sink/
mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

feature_gated_sink_mod!(big_query, "bigquery");
pub mod boxed;
pub mod catalog;
feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
feature_gated_sink_mod!(doris, "doris");
#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
use risingwave_common::bail;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
pub mod jdbc_jni_client;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake_redshift;
pub mod sqlserver;
feature_gated_sink_mod!(starrocks, "starrocks");
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;

pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use futures::future::BoxFuture;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_pb::id::ExecutorId;
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::clickhouse::CLICKHOUSE_SINK;
use self::deltalake::DELTALAKE_SINK;
use self::iceberg::ICEBERG_SINK;
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
use crate::sink::utils::feature_gated_sink_mod;

const BOUNDED_CHANNEL_SIZE: usize = 16;
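
/// Enumerates every sink connector as a `{ VariantName, SinkType, ConfigType }`
/// list and feeds that list to `$macro`, so downstream macros can generate code
/// over all sinks without repeating the registry. A minimal sketch of a consumer
/// (the `count_sinks!` macro below is hypothetical, for illustration only):
///
/// ```ignore
/// macro_rules! count_sinks {
///     ({ $({ $variant:ident, $sink:ty, $config:ty }),* }) => {
///         pub const SINK_COUNT: usize = [$(stringify!($variant)),*].len();
///     };
/// }
/// for_all_sinks! { count_sinks }
/// ```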
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}

#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}

#[macro_export]
macro_rules! generate_config_use_single {
    // Skip () config types
    (()) => {};

    // Generate use clause for actual config types
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}

/// Convenience macro that expands `for_all_sinks!` into `use` clauses for every
/// sink config type.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}

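/// Dispatches over a `SinkImpl` value: binds the inner concrete sink to `$sink`
/// and evaluates `$body` for whichever variant is active. For example,
/// `SinkImpl::is_coordinated_sink` later in this module is implemented as:
///
/// ```ignore
/// dispatch_sink!(self, sink, sink.is_coordinated_sink())
/// ```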
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}

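/// Resolves a connector name string to the corresponding sink type: inside
/// `$body`, `$type_name` is aliased to the matching sink type, and
/// `$on_other_closure` handles unknown names. See `SinkImpl::new` and
/// `enforce_secret_sink` later in this module for real call sites; a minimal
/// sketch:
///
/// ```ignore
/// match_sink_name_str!(
///     "kafka",
///     SinkType,
///     Ok(SinkType::SINK_NAME),
///     |other| Err(SinkError::Config(anyhow!("unsupported sink {}", other)))
/// )
/// ```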
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
/// Whether to drop DELETE and convert UPDATE to INSERT in the sink executor.
pub const SINK_USER_IGNORE_DELETE_OPTION: &str = "ignore_delete";
/// Alias for [`SINK_USER_IGNORE_DELETE_OPTION`], kept for backward compatibility.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    /// User-defined primary key indices for upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    /// Whether to drop DELETE and convert UPDATE to INSERT in the sink executor.
    pub ignore_delete: bool,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let ignore_delete = pb_param.ignore_delete();
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: if table_schema.pk_indices.is_empty() {
                None
            } else {
                Some(
                    (table_schema.pk_indices.iter())
                        .map(|i| *i as usize)
                        .collect(),
                )
            },
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            ignore_delete,
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: (self.downstream_pk.as_ref())
                    .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            raw_ignore_delete: self.ignore_delete,
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    /// Get the downstream primary key indices specified by the user. If not specified, return
    /// an empty vector.
    ///
    /// Prefer directly accessing the `downstream_pk` field, as it uses `None` to represent
    /// unspecified values, making it clearer.
    pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
        self.downstream_pk.clone().unwrap_or_default()
    }

    // `SinkParam` should only be used when there is a secret context.
    // FIXME: Use a new type for `SinkFormatDesc` whose properties contain filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam` and fill the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            ignore_delete: sink_catalog.ignore_delete,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_snapshot_num",
            "The snapshot number of iceberg table",
            &["sink_name", "catalog_name", "table_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
            iceberg_snapshot_num,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: ExecutorId,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // The value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It is the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: SinkId,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1.into(),
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
    )
}
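
/// The core trait implemented by every sink connector. A sink is constructed
/// from a `SinkParam` via `TryFrom`, checked with `validate`, and then run by
/// the `LogSinker` returned from `new_log_sinker`. A minimal sketch of an
/// implementation (all names below are illustrative, not a real connector;
/// `MySink` would also need `impl TryFrom<SinkParam>`):
///
/// ```ignore
/// struct MySink;
/// struct MyLogSinker;
///
/// #[async_trait]
/// impl LogSinker for MyLogSinker {
///     async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!> {
///         /* read from `log_reader` and write downstream forever */
///     }
/// }
///
/// impl Sink for MySink {
///     const SINK_NAME: &'static str = "my_sink";
///     type LogSinker = MyLogSinker;
///
///     async fn validate(&self) -> Result<()> {
///         Ok(()) // e.g. check connectivity and schema compatibility
///     }
///
///     async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
///         Ok(MyLogSinker)
///     }
/// }
/// ```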
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;

    type LogSinker: LogSinker;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        if matches!(Self::SINK_NAME, ICEBERG_SINK) {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
                            );
                        } else {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                            );
                        }
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of the `sink_decouple` session config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn support_schema_change() -> bool {
        false
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}

pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

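/// A `LogSinker` is the run loop of a sink: it consumes items from the log
/// reader and writes them downstream, returning only on error (hence
/// `Result<!>`). A sketch of a typical consume loop, assuming the usual
/// chunk/barrier item shapes (illustrative, not taken from a real connector):
///
/// ```ignore
/// async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
///     log_reader.start_from(None).await?;
///     loop {
///         let (epoch, item) = log_reader.next_item().await?;
///         match item {
///             LogStoreReadItem::StreamChunk { chunk, .. } => {
///                 // write the chunk to the external system
///             }
///             LogStoreReadItem::Barrier { .. } => {
///                 // flush/commit, then let the log store reclaim consumed items
///                 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
///             }
///         }
///     }
/// }
/// ```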
#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: Please rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}

pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

pub enum SinkCommitCoordinator {
    SinglePhase(BoxSinglePhaseCoordinator),
    TwoPhase(BoxTwoPhaseCoordinator),
}

#[async_trait]
pub trait SinglePhaseCommitCoordinator {
    /// Initialize the sink commit coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Commit data directly using the single-phase strategy.
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;

    /// Idempotent implementation is required, because `commit_schema_change` in the same epoch could be called multiple
    /// times.
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        _schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        Err(SinkError::Coordinator(anyhow!(
            "Schema change is not implemented for single-phase commit coordinator {}",
            std::any::type_name::<Self>()
        )))
    }
}

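/// Coordinates commits in two phases: `pre_commit` produces serialized commit
/// metadata, `commit_data` (idempotent) applies it, and `abort` (idempotent)
/// rolls it back. A sketch of the per-epoch call sequence implied by the
/// method contracts (the driver loop shown is illustrative):
///
/// ```ignore
/// if let Some(commit_metadata) = coordinator.pre_commit(epoch, metadata, None).await? {
///     if coordinator.commit_data(epoch, commit_metadata.clone()).await.is_err() {
///         coordinator.abort(epoch, commit_metadata).await;
///     }
/// }
/// ```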
#[async_trait]
pub trait TwoPhaseCommitCoordinator {
    /// Initialize the sink commit coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Return serialized commit metadata to be passed to `commit_data`.
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>>;

    /// Idempotent implementation is required, because `commit_data` in the same epoch could be called multiple times.
    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;

    /// Idempotent implementation is required, because `commit_schema_change` in the same epoch could be called multiple
    /// times.
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        _schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        Err(SinkError::Coordinator(anyhow!(
            "Schema change is not implemented for two-phase commit coordinator {}",
            std::any::type_name::<Self>()
        )))
    }

    /// Idempotent implementation is required, because `abort` in the same epoch could be called multiple times.
    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // Remove privatelink-related properties, if any.
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

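/// Builds a `SinkImpl` from a `SinkParam`, dispatching on the `connector`
/// property. A minimal sketch of a call site (the construction of `param` is
/// elided and illustrative):
///
/// ```ignore
/// // `param.properties` must contain `connector`, e.g. "blackhole".
/// let sink = build_sink(param)?;
/// assert!(sink.is_blackhole());
/// ```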
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}