risingwave_connector/sink/
mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

feature_gated_sink_mod!(big_query, "bigquery");
pub mod boxed;
pub mod catalog;
feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
feature_gated_sink_mod!(doris, "doris");
#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
pub mod http;
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
use risingwave_common::bail;
use risingwave_pb::stream_plan::PbSinkSchemaChange;
pub mod jdbc_jni_client;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake_redshift;
pub mod sqlserver;
feature_gated_sink_mod!(starrocks, "starrocks");
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use futures::future::BoxFuture;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_pb::id::ExecutorId;
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::clickhouse::CLICKHOUSE_SINK;
use self::deltalake::DELTALAKE_SINK;
use self::iceberg::ICEBERG_SINK;
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
use crate::sink::utils::feature_gated_sink_mod;

const BOUNDED_CHANNEL_SIZE: usize = 16;
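
/// Registry of every supported sink, as `{ VariantName, SinkType, ConfigType }` triples.
/// The whole list is passed to the given callback macro, along with any extra arguments.
/// A minimal sketch of a consumer macro (`count_sinks` is hypothetical, shown only to
/// illustrate the shape of the callback):
///
/// ```ignore
/// macro_rules! count_sinks {
///     ({ $({ $variant:ident, $sink:ty, $config:ty }),* }) => {
///         // One array element per sink variant registered in `for_all_sinks!`.
///         const SINK_COUNT: usize = [$(stringify!($variant)),*].len();
///     };
/// }
/// for_all_sinks! { count_sinks }
/// ```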
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Http, $crate::sink::http::HttpSink, $crate::sink::http::HttpConfig },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}

#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}

#[macro_export]
macro_rules! generate_config_use_single {
    // Skip () config types
    (()) => {};

    // Generate use clause for actual config types
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}

// Convenience macro that uses for_all_sinks
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}

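/// Expands to a `match` over all `SinkImpl` variants, binding `$sink` to the boxed
/// concrete sink inside `$body`. A usage sketch (`sink_impl` is a hypothetical
/// `SinkImpl` value; this mirrors `SinkImpl::is_coordinated_sink` below):
///
/// ```ignore
/// let coordinated = dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink());
/// ```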
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}

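/// Resolves a connector name string to its concrete sink type: inside `$body`, the
/// identifier `$type_name` becomes a type alias for the matched sink. A usage sketch
/// (mirrors `enforce_secret_sink` below; `keys` is a hypothetical key iterator):
///
/// ```ignore
/// match_sink_name_str!(
///     "kafka",
///     PropType,
///     PropType::enforce_secret(keys),
///     |other| bail!("connector '{}' is not supported", other)
/// )
/// ```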
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
/// Whether to drop DELETE and convert UPDATE to INSERT in the sink executor.
pub const SINK_USER_IGNORE_DELETE_OPTION: &str = "ignore_delete";
/// Alias for [`SINK_USER_IGNORE_DELETE_OPTION`], kept for backward compatibility.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";

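/// Runtime parameters of a sink, carried over from the catalog / protobuf. A hedged
/// round-trip sketch (`pb_param` is a hypothetical `PbSinkParam`):
///
/// ```ignore
/// let param = SinkParam::from_proto(pb_param);
/// let fields = param.schema().fields; // `Schema` derived from `param.columns`
/// let pb = param.to_proto();          // inverse of `from_proto`
/// ```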
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    /// User-defined primary key indices for upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    /// Whether to drop DELETE and convert UPDATE to INSERT in the sink executor.
    pub ignore_delete: bool,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let ignore_delete = pb_param.ignore_delete();
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: if table_schema.pk_indices.is_empty() {
                None
            } else {
                Some(
                    (table_schema.pk_indices.iter())
                        .map(|i| *i as usize)
                        .collect(),
                )
            },
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            ignore_delete,
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: (self.downstream_pk.as_ref())
                    .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            raw_ignore_delete: self.ignore_delete,
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    /// Get the downstream primary key indices specified by the user. If not specified, return
    /// an empty vector.
    ///
    /// Prefer directly accessing the `downstream_pk` field, as it uses `None` to represent
    /// unspecified values, making it clearer.
    pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
        self.downstream_pk.clone().unwrap_or_default()
    }

    // `SinkParam` should only be used when there is a secret context.
    // FIXME: use a new type for `SinkFormatDesc` whose properties contain the filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam`, filling the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            ignore_delete: sink_catalog.ignore_delete,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_snapshot_num",
            "The snapshot number of iceberg table",
            &["sink_name", "catalog_name", "table_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
            iceberg_snapshot_num,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: ExecutorId,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It is the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: SinkId,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1.into(),
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
    )
}
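
/// The central trait that every sink connector implements. A minimal, hedged sketch of
/// an implementor (`MySink` and `MyLogSinker` are hypothetical types, shown only to
/// illustrate the required pieces):
///
/// ```ignore
/// struct MySink;
///
/// impl TryFrom<SinkParam> for MySink {
///     type Error = SinkError;
///     fn try_from(_param: SinkParam) -> Result<Self> {
///         Ok(MySink)
///     }
/// }
///
/// impl Sink for MySink {
///     const SINK_NAME: &'static str = "my-sink";
///     type LogSinker = MyLogSinker;
///
///     async fn validate(&self) -> Result<()> {
///         // Check connectivity / schema compatibility here.
///         Ok(())
///     }
///
///     async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
///         Ok(MyLogSinker)
///     }
/// }
/// ```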
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;

    type LogSinker: LogSinker;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        if matches!(Self::SINK_NAME, ICEBERG_SINK) {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
                            );
                        } else {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                            );
                        }
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of the `sink_decouple` session config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn support_schema_change() -> bool {
        false
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}

pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

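/// Drives a sink by consuming items from the log store until failure (hence the `!`
/// return type). A hedged sketch of an implementor (`MyLogSinker` is hypothetical; the
/// `LogStoreReadItem` variants are matched loosely with `..` since only the offsets
/// matter here):
///
/// ```ignore
/// #[async_trait]
/// impl LogSinker for MyLogSinker {
///     async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
///         log_reader.start_from(None).await?;
///         loop {
///             let (epoch, item) = log_reader.next_item().await?;
///             match item {
///                 LogStoreReadItem::StreamChunk { chunk_id, .. } => {
///                     // Write the chunk to the external system, then allow truncation.
///                     log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
///                 }
///                 LogStoreReadItem::Barrier { .. } => {
///                     log_reader.truncate(TruncateOffset::Barrier { epoch })?;
///                 }
///             }
///         }
///     }
/// }
/// ```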
#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: please rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

pub enum SinkCommitCoordinator {
    SinglePhase(BoxSinglePhaseCoordinator),
    TwoPhase(BoxTwoPhaseCoordinator),
}

#[async_trait]
pub trait SinglePhaseCommitCoordinator {
    /// Initialize the sink commit coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Commit data directly using the single-phase strategy.
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;

    /// An idempotent implementation is required, because `commit_schema_change` in the same epoch could be
    /// called multiple times.
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        _schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        Err(SinkError::Coordinator(anyhow!(
            "Schema change is not implemented for single-phase commit coordinator {}",
            std::any::type_name::<Self>()
        )))
    }
}

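/// Coordinates commits in two phases: `pre_commit` stages an epoch's data and returns
/// durable commit metadata; `commit_data` finalizes it and may be retried after
/// recovery. A hedged sketch of the expected call sequence (`coordinator`, `epoch`, and
/// `metadata` are hypothetical):
///
/// ```ignore
/// coordinator.init().await?;
/// // Phase 1: stage the epoch's data; the returned bytes are persisted by the caller.
/// if let Some(commit_metadata) = coordinator.pre_commit(epoch, metadata, None).await? {
///     // Phase 2: finalize. This may be re-invoked with the same metadata after a
///     // failure, so the implementation must be idempotent.
///     coordinator.commit_data(epoch, commit_metadata).await?;
/// }
/// ```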
#[async_trait]
pub trait TwoPhaseCommitCoordinator {
    /// Initialize the sink commit coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Return serialized commit metadata to be passed to `commit_data`.
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>>;

    /// An idempotent implementation is required, because `commit_data` in the same epoch could be called
    /// multiple times.
    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;

    /// An idempotent implementation is required, because `commit_schema_change` in the same epoch could be
    /// called multiple times.
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        _schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        Err(SinkError::Coordinator(anyhow!(
            "Schema change is not implemented for two-phase commit coordinator {}",
            std::any::type_name::<Self>()
        )))
    }

    /// An idempotent implementation is required, because `abort` in the same epoch could be called multiple times.
    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // Remove PrivateLink-related properties, if any.
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

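/// Builds a concrete sink from a `SinkParam`, dispatching on the `connector` property.
/// A hedged usage sketch (`param` is a hypothetical `SinkParam` whose properties set
/// `connector = 'blackhole'`):
///
/// ```ignore
/// let sink = build_sink(param)?;
/// let coordinated = dispatch_sink!(sink, sink, sink.is_coordinated_sink());
/// ```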
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Http error: {0}")]
    Http(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Redshift error: {0}")]
    Redshift(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}