Skip to main content

risingwave_connector/sink/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15feature_gated_sink_mod!(big_query, "bigquery");
16pub mod boxed;
17pub mod catalog;
18feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
19pub mod coordinate;
20pub mod decouple_checkpoint_log_sink;
21feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
22feature_gated_sink_mod!(doris, "doris");
23#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
24pub mod doris_starrocks_connector;
25pub mod dynamodb;
26pub mod elasticsearch_opensearch;
27pub mod encoder;
28pub mod file_sink;
29pub mod formatter;
30feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
31pub mod http;
32pub mod iceberg;
33pub mod kafka;
34pub mod kinesis;
35use risingwave_common::bail;
36use risingwave_pb::stream_plan::PbSinkSchemaChange;
37pub mod jdbc_jni_client;
38pub mod log_store;
39pub mod mock_coordination_client;
40pub mod mongodb;
41pub mod mqtt;
42pub mod nats;
43pub mod postgres;
44pub mod pulsar;
45pub mod redis;
46pub mod remote;
47pub mod snowflake_redshift;
48pub mod sqlserver;
49feature_gated_sink_mod!(starrocks, "starrocks");
50pub mod test_sink;
51pub mod trivial;
52pub mod turbopuffer;
53pub mod utils;
54pub mod writer;
/// Re-exports a selection of items from `crate::sink` under a single module path.
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}
61
62use std::collections::BTreeMap;
63use std::future::Future;
64use std::sync::{Arc, LazyLock};
65
66use ::redis::RedisError;
67use anyhow::anyhow;
68use async_trait::async_trait;
69use chrono_tz::{Tz, UTC};
70use decouple_checkpoint_log_sink::{
71    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
72    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
73};
74use futures::future::BoxFuture;
75use opendal::Error as OpendalError;
76use prometheus::Registry;
77use risingwave_common::array::ArrayError;
78use risingwave_common::bitmap::Bitmap;
79use risingwave_common::catalog::{ColumnDesc, Field, Schema};
80use risingwave_common::config::StreamingConfig;
81use risingwave_common::hash::ActorId;
82use risingwave_common::metrics::{
83    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
84    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
85};
86use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
87use risingwave_common::secret::{LocalSecretManager, SecretError};
88use risingwave_common::session_config::sink_decouple::SinkDecouple;
89use risingwave_common::{
90    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
91    register_guarded_int_gauge_vec_with_registry,
92};
93use risingwave_pb::catalog::PbSinkType;
94use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
95use risingwave_pb::id::ExecutorId;
96use risingwave_rpc_client::MetaClient;
97use risingwave_rpc_client::error::RpcError;
98use starrocks::STARROCKS_SINK;
99use thiserror::Error;
100use thiserror_ext::AsReport;
101use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
102pub use tracing;
103
104use self::catalog::{SinkFormatDesc, SinkType};
105use self::clickhouse::CLICKHOUSE_SINK;
106use self::deltalake::DELTALAKE_SINK;
107use self::iceberg::ICEBERG_SINK;
108use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
109use crate::WithPropertiesExt;
110use crate::connector_common::IcebergSinkCompactionUpdate;
111use crate::error::{ConnectorError, ConnectorResult};
112use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
113use crate::sink::catalog::desc::SinkDesc;
114use crate::sink::catalog::{SinkCatalog, SinkId};
115use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
116use crate::sink::file_sink::fs::FsSink;
117use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
118use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
119use crate::sink::utils::feature_gated_sink_mod;
120
/// Size constant for bounded channels.
// NOTE(review): no consumer of this constant is visible in this part of the file — confirm
// where it is used before changing the value.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes `$macro` with the full list of known sinks.
///
/// The callback macro receives, as its first argument, a braced list of
/// `{ Variant, SinkType, ConfigType }` entries (one per sink), followed by any extra `$arg`
/// token trees passed to `for_all_sinks!`. Entries that list `()` as the config type have no
/// config type named here.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Http, $crate::sink::http::HttpSink, $crate::sink::http::HttpConfig },
                { Turbopuffer, $crate::sink::turbopuffer::TurbopufferSink, $crate::sink::turbopuffer::TurbopufferConfig },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
169
/// Callback for `for_all_sinks!`: for every `{ Variant, SinkType, ConfigType }` entry, forwards
/// the config-type tokens to `generate_config_use_single!`.
///
/// The config type is captured as raw token trees (`tt`) so that `()` can be told apart from a
/// real path by the callee.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
178
/// Emits a `pub(super) use` item for a single sink config type, or nothing when the config
/// type is `()`.
#[macro_export]
macro_rules! generate_config_use_single {
    // Skip () config types
    (()) => {};

    // Generate use clause for actual config types
    ($config_type:path) => {
        // The import may go unused at the expansion site, hence the `allow`.
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
190
// Convenience macro that uses for_all_sinks
/// Expands to one `pub(super) use` item per sink config type listed in `for_all_sinks!`
/// (entries whose config type is `()` produce nothing).
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
198
/// Runs `$body` against the concrete sink held inside a `SinkImpl` value.
///
/// Call it as `dispatch_sink!(impl_expr, sink_ident, body_expr)` (second arm). That arm
/// re-enters the macro through `for_all_sinks!`, which supplies the sink list consumed by the
/// first arm; the first arm expands to a `match` with one `SinkImpl::Variant($sink) => $body`
/// arm per sink.
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
214
/// Selects a sink type by its `Sink::SINK_NAME` string and evaluates `$body` with that type
/// aliased as `$type_name`.
///
/// Call it as `match_sink_name_str!(name_expr, TypeAlias, body_expr, on_other_closure)` (second
/// arm). That arm re-enters the macro through `for_all_sinks!`; the first arm expands to a
/// `match` on the name with one arm per sink. A name that matches no sink is passed to
/// `$on_other_closure`.
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
235
/// Property key under which the connector name is stored.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key under which the (legacy) sink type is stored.
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
// NOTE(review): the four `SINK_TYPE_*` strings below are presumably the accepted values of the
// `type` option — confirm against the code that parses it.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
/// Whether to drop DELETE and convert UPDATE to INSERT in the sink executor.
pub const SINK_USER_IGNORE_DELETE_OPTION: &str = "ignore_delete";
/// Alias for [`SINK_USER_IGNORE_DELETE_OPTION`], kept for backward compatibility.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
// NOTE(review): presumably a user-facing option name like the two above — its semantics are
// not visible in this part of the file.
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";
249
/// Parameters describing a sink. Convertible to and from the protobuf `PbSinkParam`, and
/// constructible from a `SinkCatalog` (see `SinkParam::try_from_sink_catalog`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    /// Identifier of the sink.
    pub sink_id: SinkId,
    /// Name of the sink.
    pub sink_name: String,
    /// Sink properties as key/value strings. When built via `try_from_sink_catalog`, secrets
    /// have already been filled in.
    pub properties: BTreeMap<String, String>,
    /// Descriptions of the sink's columns.
    pub columns: Vec<ColumnDesc>,
    /// User-defined primary key indices for upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,
    /// Type of the sink.
    pub sink_type: SinkType,
    /// Whether to drop DELETE and convert UPDATE to INSERT in the sink executor.
    pub ignore_delete: bool,
    /// Format description of the sink, if any.
    pub format_desc: Option<SinkFormatDesc>,
    /// Database name.
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}
271
272impl SinkParam {
273    pub fn from_proto(pb_param: PbSinkParam) -> Self {
274        let ignore_delete = pb_param.ignore_delete();
275        let table_schema = pb_param.table_schema.expect("should contain table schema");
276        let format_desc = match pb_param.format_desc {
277            Some(f) => f.try_into().ok(),
278            None => {
279                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
280                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
281                match (connector, r#type) {
282                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
283                    _ => None,
284                }
285            }
286        };
287        Self {
288            sink_id: SinkId::from(pb_param.sink_id),
289            sink_name: pb_param.sink_name,
290            properties: pb_param.properties,
291            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
292            downstream_pk: if table_schema.pk_indices.is_empty() {
293                None
294            } else {
295                Some(
296                    (table_schema.pk_indices.iter())
297                        .map(|i| *i as usize)
298                        .collect(),
299                )
300            },
301            sink_type: SinkType::from_proto(
302                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
303            ),
304            ignore_delete,
305            format_desc,
306            db_name: pb_param.db_name,
307            sink_from_name: pb_param.sink_from_name,
308        }
309    }
310
311    pub fn to_proto(&self) -> PbSinkParam {
312        PbSinkParam {
313            sink_id: self.sink_id,
314            sink_name: self.sink_name.clone(),
315            properties: self.properties.clone(),
316            table_schema: Some(TableSchema {
317                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
318                pk_indices: (self.downstream_pk.as_ref())
319                    .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
320            }),
321            sink_type: self.sink_type.to_proto().into(),
322            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
323            db_name: self.db_name.clone(),
324            sink_from_name: self.sink_from_name.clone(),
325            raw_ignore_delete: self.ignore_delete,
326        }
327    }
328
329    pub fn schema(&self) -> Schema {
330        Schema {
331            fields: self.columns.iter().map(Field::from).collect(),
332        }
333    }
334
335    /// Get the downstream primary key indices specified by the user. If not specified, return
336    /// an empty vector.
337    ///
338    /// Prefer directly accessing the `downstream_pk` field, as it uses `None` to represent
339    /// unspecified values, making it clearer.
340    pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
341        self.downstream_pk.clone().unwrap_or_default()
342    }
343
344    // `SinkParams` should only be used when there is a secret context.
345    // FIXME: Use a new type for `SinkFormatDesc` with properties contain filled secrets.
346    pub fn fill_secret_for_format_desc(
347        format_desc: Option<SinkFormatDesc>,
348    ) -> Result<Option<SinkFormatDesc>> {
349        match format_desc {
350            Some(mut format_desc) => {
351                format_desc.options = LocalSecretManager::global()
352                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
353                Ok(Some(format_desc))
354            }
355            None => Ok(None),
356        }
357    }
358
359    /// Try to convert a `SinkCatalog` to a `SinkParam` and fill the secrets to properties.
360    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
361        let columns = sink_catalog
362            .visible_columns()
363            .map(|col| col.column_desc.clone())
364            .collect();
365        let properties_with_secret = LocalSecretManager::global()
366            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
367        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
368        Ok(Self {
369            sink_id: sink_catalog.id,
370            sink_name: sink_catalog.name,
371            properties: properties_with_secret,
372            columns,
373            downstream_pk: sink_catalog.downstream_pk,
374            sink_type: sink_catalog.sink_type,
375            ignore_delete: sink_catalog.ignore_delete,
376            format_desc: format_desc_with_secret,
377            db_name: sink_catalog.db_name,
378            sink_from_name: sink_catalog.sink_from_name,
379        })
380    }
381}
382
/// Runs the connector-specific `enforce_secret` check over the keys of `props`.
///
/// The connector name is read from `props`, resolved to the matching sink type via
/// `match_sink_name_str!`, and that type's `EnforceSecret::enforce_secret` is called with the
/// property key iterator.
///
/// # Errors
///
/// Returns an error if no connector is specified or if the connector name matches no known
/// sink; otherwise returns whatever `enforce_secret` returns.
// NOTE(review): the exact rules applied by `enforce_secret` live in each sink's
// `EnforceSecret` impl and are not visible here.
pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}
397
/// Process-wide [`SinkMetrics`], lazily created on first access and registered in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
400
/// Label-guarded metric vectors for sinks, created and registered by `SinkMetrics::new`.
#[derive(Clone)]
pub struct SinkMetrics {
    // Sink writer metrics
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
426
427impl SinkMetrics {
428    pub fn new(registry: &Registry) -> Self {
429        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
430            "sink_commit_duration",
431            "Duration of commit op in sink",
432            &["actor_id", "connector", "sink_id", "sink_name"],
433            registry
434        )
435        .unwrap();
436
437        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
438            "connector_sink_rows_received",
439            "Number of rows received by sink",
440            &["actor_id", "connector_type", "sink_id", "sink_name"],
441            registry
442        )
443        .unwrap();
444
445        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
446            "log_store_first_write_epoch",
447            "The first write epoch of log store",
448            &["actor_id", "sink_id", "sink_name"],
449            registry
450        )
451        .unwrap();
452
453        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
454            "log_store_latest_write_epoch",
455            "The latest write epoch of log store",
456            &["actor_id", "sink_id", "sink_name"],
457            registry
458        )
459        .unwrap();
460
461        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
462            "log_store_write_rows",
463            "The write rate of rows",
464            &["actor_id", "sink_id", "sink_name"],
465            registry
466        )
467        .unwrap();
468
469        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
470            "log_store_latest_read_epoch",
471            "The latest read epoch of log store",
472            &["actor_id", "connector", "sink_id", "sink_name"],
473            registry
474        )
475        .unwrap();
476
477        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
478            "log_store_read_rows",
479            "The read rate of rows",
480            &["actor_id", "connector", "sink_id", "sink_name"],
481            registry
482        )
483        .unwrap();
484
485        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
486            "log_store_read_bytes",
487            "Total size of chunks read by log reader",
488            &["actor_id", "connector", "sink_id", "sink_name"],
489            registry
490        )
491        .unwrap();
492
493        let log_store_reader_wait_new_future_duration_ns =
494            register_guarded_int_counter_vec_with_registry!(
495                "log_store_reader_wait_new_future_duration_ns",
496                "Accumulated duration of LogReader to wait for next call to create future",
497                &["actor_id", "connector", "sink_id", "sink_name"],
498                registry
499            )
500            .unwrap();
501
502        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
503            "iceberg_write_qps",
504            "The qps of iceberg writer",
505            &["actor_id", "sink_id", "sink_name"],
506            registry
507        )
508        .unwrap();
509
510        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
511            "iceberg_write_latency",
512            "The latency of iceberg writer",
513            &["actor_id", "sink_id", "sink_name"],
514            registry
515        )
516        .unwrap();
517
518        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
519            "iceberg_rolling_unflushed_data_file",
520            "The unflushed data file count of iceberg rolling writer",
521            &["actor_id", "sink_id", "sink_name"],
522            registry
523        )
524        .unwrap();
525
526        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
527            "iceberg_position_delete_cache_num",
528            "The delete cache num of iceberg position delete writer",
529            &["actor_id", "sink_id", "sink_name"],
530            registry
531        )
532        .unwrap();
533
534        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
535            "iceberg_partition_num",
536            "The partition num of iceberg partition writer",
537            &["actor_id", "sink_id", "sink_name"],
538            registry
539        )
540        .unwrap();
541
542        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
543            "iceberg_write_bytes",
544            "The write bytes of iceberg writer",
545            &["actor_id", "sink_id", "sink_name"],
546            registry
547        )
548        .unwrap();
549
550        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
551            "iceberg_snapshot_num",
552            "The snapshot number of iceberg table",
553            &["sink_name", "catalog_name", "table_name"],
554            registry
555        )
556        .unwrap();
557
558        Self {
559            sink_commit_duration,
560            connector_sink_rows_received,
561            log_store_first_write_epoch,
562            log_store_latest_write_epoch,
563            log_store_write_rows,
564            log_store_latest_read_epoch,
565            log_store_read_rows,
566            log_store_read_bytes,
567            log_store_reader_wait_new_future_duration_ns,
568            iceberg_write_qps,
569            iceberg_write_latency,
570            iceberg_rolling_unflushed_data_file,
571            iceberg_position_delete_cache_num,
572            iceberg_partition_num,
573            iceberg_write_bytes,
574            iceberg_snapshot_num,
575        }
576    }
577}
578
/// Parameters handed to a sink when creating its log sinker / writer
/// (see `Sink::new_log_sinker`).
#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: ExecutorId,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value serves two purposes:
    // 1. Indicates that the sink will accept data chunks with an extra partition value column.
    // 2. Gives the index of the extra partition value column.
    // For more details on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    // `actor_id`, `connector`, `sink_id` and `sink_name` are used as metric label values by
    // `SinkWriterMetrics::new`.
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
    pub time_zone: Tz,
}
598
/// Per-writer metric handles: the `sink_commit_duration` and `connector_sink_rows_received`
/// metrics bound to one writer's label values.
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
604
605impl SinkWriterMetrics {
606    pub fn new(writer_param: &SinkWriterParam) -> Self {
607        let labels = [
608            &writer_param.actor_id.to_string(),
609            writer_param.connector.as_str(),
610            &writer_param.sink_id.to_string(),
611            writer_param.sink_name.as_str(),
612        ];
613        let sink_commit_duration = GLOBAL_SINK_METRICS
614            .sink_commit_duration
615            .with_guarded_label_values(&labels);
616        let connector_sink_rows_received = GLOBAL_SINK_METRICS
617            .connector_sink_rows_received
618            .with_guarded_label_values(&labels);
619        Self {
620            sink_commit_duration,
621            connector_sink_rows_received,
622        }
623    }
624
625    #[cfg(test)]
626    pub fn for_test() -> Self {
627        Self {
628            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
629            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
630        }
631    }
632}
633
/// Meta client handle used by sinks: either a real [`MetaClient`] or a [`MockMetaClient`].
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
639
640impl SinkMetaClient {
641    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
642        match self {
643            SinkMetaClient::MetaClient(meta_client) => {
644                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
645                    meta_client.sink_coordinate_client().await,
646                )
647            }
648            SinkMetaClient::MockMetaClient(mock_meta_client) => {
649                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
650                    mock_meta_client.sink_coordinate_client(),
651                )
652            }
653        }
654    }
655
656    pub async fn add_sink_fail_evet_log(
657        &self,
658        sink_id: SinkId,
659        sink_name: String,
660        connector: String,
661        error: String,
662    ) {
663        match self {
664            SinkMetaClient::MetaClient(meta_client) => {
665                match meta_client
666                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
667                    .await
668                {
669                    Ok(_) => {}
670                    Err(e) => {
671                        tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
672                    }
673                }
674            }
675            SinkMetaClient::MockMetaClient(_) => {}
676        }
677    }
678}
679
680impl SinkWriterParam {
681    pub fn for_test() -> Self {
682        SinkWriterParam {
683            executor_id: Default::default(),
684            vnode_bitmap: Default::default(),
685            meta_client: Default::default(),
686            extra_partition_col_idx: Default::default(),
687
688            actor_id: 1.into(),
689            sink_id: SinkId::new(1),
690            sink_name: "test_sink".to_owned(),
691            connector: "test_connector".to_owned(),
692            streaming_config: StreamingConfig::default(),
693            time_zone: UTC,
694        }
695    }
696}
697
698fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
699    matches!(
700        sink_name,
701        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
702    )
703}
704pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
705    const SINK_NAME: &'static str;
706
707    type LogSinker: LogSinker;
708
709    fn set_default_commit_checkpoint_interval(
710        desc: &mut SinkDesc,
711        user_specified: &SinkDecouple,
712    ) -> Result<()> {
713        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
714            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
715                Some(commit_checkpoint_interval) => {
716                    let commit_checkpoint_interval = commit_checkpoint_interval
717                        .parse::<u64>()
718                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
719                    if matches!(user_specified, SinkDecouple::Disable)
720                        && commit_checkpoint_interval > 1
721                    {
722                        return Err(SinkError::Config(anyhow!(
723                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
724                        )));
725                    }
726                }
727                None => match user_specified {
728                    SinkDecouple::Default | SinkDecouple::Enable => {
729                        if matches!(Self::SINK_NAME, ICEBERG_SINK) {
730                            desc.properties.insert(
731                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
732                                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
733                            );
734                        } else {
735                            desc.properties.insert(
736                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
737                                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
738                            );
739                        }
740                    }
741                    SinkDecouple::Disable => {
742                        desc.properties.insert(
743                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
744                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
745                        );
746                    }
747                },
748            }
749        }
750        Ok(())
751    }
752
753    /// `user_specified` is the value of `sink_decouple` config.
754    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
755        match user_specified {
756            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
757            SinkDecouple::Disable => Ok(false),
758        }
759    }
760
761    fn support_schema_change() -> bool {
762        false
763    }
764
765    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
766        Ok(())
767    }
768
769    async fn validate(&self) -> Result<()>;
770    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
771
772    fn is_coordinated_sink(&self) -> bool {
773        false
774    }
775
776    async fn new_coordinator(
777        &self,
778        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
779    ) -> Result<SinkCommitCoordinator> {
780        Err(SinkError::Coordinator(anyhow!("no coordinator")))
781    }
782}
783
784pub trait SinkLogReader: Send {
785    fn start_from(
786        &mut self,
787        start_offset: Option<u64>,
788    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
789    /// Emit the next item.
790    ///
791    /// The implementation should ensure that the future is cancellation safe.
792    fn next_item(
793        &mut self,
794    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;
795
796    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
797    /// from the current offset.
798    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
799}
800
801impl<R: LogReader> SinkLogReader for &mut R {
802    fn next_item(
803        &mut self,
804    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
805        <R as LogReader>::next_item(*self)
806    }
807
808    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
809        <R as LogReader>::truncate(*self, offset)
810    }
811
812    fn start_from(
813        &mut self,
814        start_offset: Option<u64>,
815    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
816        <R as LogReader>::start_from(*self, start_offset)
817    }
818}
819
820#[async_trait]
821pub trait LogSinker: 'static + Send {
822    // Note: Please rebuild the log reader's read stream before consuming the log store,
823    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
824}
825pub type SinkCommittedEpochSubscriber = Arc<
826    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
827        + Send
828        + Sync
829        + 'static,
830>;
831
832pub enum SinkCommitCoordinator {
833    SinglePhase(BoxSinglePhaseCoordinator),
834    TwoPhase(BoxTwoPhaseCoordinator),
835}
836
837#[async_trait]
838pub trait SinglePhaseCommitCoordinator {
839    /// Initialize the sink committer coordinator.
840    async fn init(&mut self) -> Result<()>;
841
842    /// Commit data directly using single-phase strategy.
843    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
844
845    /// Idempotent implementation is required, because `commit_schema_change` in the same epoch could be called multiple
846    /// times.
847    async fn commit_schema_change(
848        &mut self,
849        _epoch: u64,
850        _schema_change: PbSinkSchemaChange,
851    ) -> Result<()> {
852        Err(SinkError::Coordinator(anyhow!(
853            "Schema change is not implemented for single-phase commit coordinator {}",
854            std::any::type_name::<Self>()
855        )))
856    }
857}
858
859#[async_trait]
860pub trait TwoPhaseCommitCoordinator {
861    /// Initialize the sink committer coordinator.
862    async fn init(&mut self) -> Result<()>;
863
864    /// Return serialized commit metadata to be passed to `commit`.
865    async fn pre_commit(
866        &mut self,
867        epoch: u64,
868        metadata: Vec<SinkMetadata>,
869        schema_change: Option<PbSinkSchemaChange>,
870    ) -> Result<Option<Vec<u8>>>;
871
872    /// Idempotent implementation is required, because `commit_data` in the same epoch could be called multiple times.
873    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;
874
875    /// Idempotent implementation is required, because `commit_schema_change` in the same epoch could be called multiple
876    /// times.
877    async fn commit_schema_change(
878        &mut self,
879        _epoch: u64,
880        _schema_change: PbSinkSchemaChange,
881    ) -> Result<()> {
882        Err(SinkError::Coordinator(anyhow!(
883            "Schema change is not implemented for two-phase commit coordinator {}",
884            std::any::type_name::<Self>()
885        )))
886    }
887
888    /// Idempotent implementation is required, because `abort` in the same epoch could be called multiple times.
889    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
890}
891
892impl SinkImpl {
893    pub fn new(mut param: SinkParam) -> Result<Self> {
894        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
895
896        // remove privatelink related properties if any
897        param.properties.remove(PRIVATE_LINK_TARGET_KEY);
898
899        let sink_type = param
900            .properties
901            .get(CONNECTOR_TYPE_KEY)
902            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
903
904        let sink_type = sink_type.to_lowercase();
905        match_sink_name_str!(
906            sink_type.as_str(),
907            SinkType,
908            Ok(SinkType::try_from(param)?.into()),
909            |other| {
910                Err(SinkError::Config(anyhow!(
911                    "unsupported sink connector {}",
912                    other
913                )))
914            }
915        )
916    }
917
918    pub fn is_sink_into_table(&self) -> bool {
919        matches!(self, SinkImpl::Table(_))
920    }
921
922    pub fn is_blackhole(&self) -> bool {
923        matches!(self, SinkImpl::BlackHole(_))
924    }
925
926    pub fn is_coordinated_sink(&self) -> bool {
927        dispatch_sink!(self, sink, sink.is_coordinated_sink())
928    }
929}
930
931pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
932    SinkImpl::new(param)
933}
934
935macro_rules! def_sink_impl {
936    () => {
937        $crate::for_all_sinks! { def_sink_impl }
938    };
939    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
940        #[derive(Debug)]
941        pub enum SinkImpl {
942            $(
943                $variant_name(Box<$sink_type>),
944            )*
945        }
946
947        $(
948            impl From<$sink_type> for SinkImpl {
949                fn from(sink: $sink_type) -> SinkImpl {
950                    SinkImpl::$variant_name(Box::new(sink))
951                }
952            }
953        )*
954    };
955}
956
957def_sink_impl!();
958
959pub type Result<T> = std::result::Result<T, SinkError>;
960
961#[derive(Error, Debug)]
962pub enum SinkError {
963    #[error("Kafka error: {0}")]
964    Kafka(#[from] rdkafka::error::KafkaError),
965    #[error("Kinesis error: {0}")]
966    Kinesis(
967        #[source]
968        #[backtrace]
969        anyhow::Error,
970    ),
971    #[error("Remote sink error: {0}")]
972    Remote(
973        #[source]
974        #[backtrace]
975        anyhow::Error,
976    ),
977    #[error("Encode error: {0}")]
978    Encode(String),
979    #[error("Avro error: {0}")]
980    Avro(#[from] apache_avro::Error),
981    #[error("Iceberg error: {0}")]
982    Iceberg(
983        #[source]
984        #[backtrace]
985        anyhow::Error,
986    ),
987    #[error("config error: {0}")]
988    Config(
989        #[source]
990        #[backtrace]
991        anyhow::Error,
992    ),
993    #[error("coordinator error: {0}")]
994    Coordinator(
995        #[source]
996        #[backtrace]
997        anyhow::Error,
998    ),
999    #[error("ClickHouse error: {0}")]
1000    ClickHouse(String),
1001    #[error("Redis error: {0}")]
1002    Redis(String),
1003    #[error("Http error: {0}")]
1004    Http(
1005        #[source]
1006        #[backtrace]
1007        anyhow::Error,
1008    ),
1009    #[error("Mqtt error: {0}")]
1010    Mqtt(
1011        #[source]
1012        #[backtrace]
1013        anyhow::Error,
1014    ),
1015    #[error("Nats error: {0}")]
1016    Nats(
1017        #[source]
1018        #[backtrace]
1019        anyhow::Error,
1020    ),
1021    #[error("Google Pub/Sub error: {0}")]
1022    GooglePubSub(
1023        #[source]
1024        #[backtrace]
1025        anyhow::Error,
1026    ),
1027    #[error("Doris/Starrocks connect error: {0}")]
1028    DorisStarrocksConnect(
1029        #[source]
1030        #[backtrace]
1031        anyhow::Error,
1032    ),
1033    #[error("Doris error: {0}")]
1034    Doris(String),
1035    #[error("DeltaLake error: {0}")]
1036    DeltaLake(
1037        #[source]
1038        #[backtrace]
1039        anyhow::Error,
1040    ),
1041    #[error("ElasticSearch/OpenSearch error: {0}")]
1042    ElasticSearchOpenSearch(
1043        #[source]
1044        #[backtrace]
1045        anyhow::Error,
1046    ),
1047    #[error("Starrocks error: {0}")]
1048    Starrocks(String),
1049    #[error("File error: {0}")]
1050    File(String),
1051    #[error("Pulsar error: {0}")]
1052    Pulsar(
1053        #[source]
1054        #[backtrace]
1055        anyhow::Error,
1056    ),
1057    #[error(transparent)]
1058    Internal(
1059        #[from]
1060        #[backtrace]
1061        anyhow::Error,
1062    ),
1063    #[error("BigQuery error: {0}")]
1064    BigQuery(
1065        #[source]
1066        #[backtrace]
1067        anyhow::Error,
1068    ),
1069    #[error("DynamoDB error: {0}")]
1070    DynamoDb(
1071        #[source]
1072        #[backtrace]
1073        anyhow::Error,
1074    ),
1075    #[error("SQL Server error: {0}")]
1076    SqlServer(
1077        #[source]
1078        #[backtrace]
1079        anyhow::Error,
1080    ),
1081    #[error("Postgres error: {0}")]
1082    Postgres(
1083        #[source]
1084        #[backtrace]
1085        anyhow::Error,
1086    ),
1087    #[error(transparent)]
1088    Connector(
1089        #[from]
1090        #[backtrace]
1091        ConnectorError,
1092    ),
1093    #[error("Secret error: {0}")]
1094    Secret(
1095        #[from]
1096        #[backtrace]
1097        SecretError,
1098    ),
1099    #[error("Mongodb error: {0}")]
1100    Mongodb(
1101        #[source]
1102        #[backtrace]
1103        anyhow::Error,
1104    ),
1105    #[error("Redshift error: {0}")]
1106    Redshift(
1107        #[source]
1108        #[backtrace]
1109        anyhow::Error,
1110    ),
1111}
1112
1113#[allow(clippy::disallowed_types)]
1114impl From<::iceberg::Error> for SinkError {
1115    fn from(err: ::iceberg::Error) -> Self {
1116        SinkError::Iceberg(anyhow!(err))
1117    }
1118}
1119
1120impl From<sea_orm::DbErr> for SinkError {
1121    fn from(err: sea_orm::DbErr) -> Self {
1122        SinkError::Iceberg(anyhow!(err))
1123    }
1124}
1125
1126impl From<OpendalError> for SinkError {
1127    fn from(error: OpendalError) -> Self {
1128        SinkError::File(error.to_report_string())
1129    }
1130}
1131
1132impl From<parquet::errors::ParquetError> for SinkError {
1133    fn from(error: parquet::errors::ParquetError) -> Self {
1134        SinkError::File(error.to_report_string())
1135    }
1136}
1137
1138impl From<ArrayError> for SinkError {
1139    fn from(error: ArrayError) -> Self {
1140        SinkError::File(error.to_report_string())
1141    }
1142}
1143
1144impl From<RpcError> for SinkError {
1145    fn from(value: RpcError) -> Self {
1146        SinkError::Remote(anyhow!(value))
1147    }
1148}
1149
1150impl From<RedisError> for SinkError {
1151    fn from(value: RedisError) -> Self {
1152        SinkError::Redis(value.to_report_string())
1153    }
1154}
1155
1156impl From<tiberius::error::Error> for SinkError {
1157    fn from(err: tiberius::error::Error) -> Self {
1158        SinkError::SqlServer(anyhow!(err))
1159    }
1160}
1161
1162impl From<::elasticsearch::Error> for SinkError {
1163    fn from(err: ::elasticsearch::Error) -> Self {
1164        SinkError::ElasticSearchOpenSearch(anyhow!(err))
1165    }
1166}
1167
1168impl From<::opensearch::Error> for SinkError {
1169    fn from(err: ::opensearch::Error) -> Self {
1170        SinkError::ElasticSearchOpenSearch(anyhow!(err))
1171    }
1172}
1173
1174impl From<tokio_postgres::Error> for SinkError {
1175    fn from(err: tokio_postgres::Error) -> Self {
1176        SinkError::Postgres(anyhow!(err))
1177    }
1178}