risingwave_connector/sink/
clickhouse.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Debug;
16use core::num::NonZeroU64;
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use anyhow::anyhow;
20use clickhouse::insert::Insert;
21use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
22use itertools::Itertools;
23use risingwave_common::array::{Op, StreamChunk};
24use risingwave_common::catalog::Schema;
25use risingwave_common::row::Row;
26use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
27use serde::Serialize;
28use serde::ser::{SerializeSeq, SerializeStruct};
29use serde_derive::Deserialize;
30use serde_with::{DisplayFromStr, serde_as};
31use thiserror_ext::AsReport;
32use tonic::async_trait;
33use tracing::warn;
34use with_options::WithOptions;
35
36use super::decouple_checkpoint_log_sink::{
37    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
38};
39use super::writer::SinkWriter;
40use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
41use crate::error::ConnectorResult;
42use crate::sink::{
43    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
44};
45
/// Query template that reads from `system.tables`, filtered by database and table name.
/// The `?fields` and `?` placeholders are substituted by the code that runs the query.
const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
/// Query template that reads from `system.columns`, filtered by database and table name; the
/// trailing `?` placeholder is the ordering expression.
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
/// Name under which the ClickHouse sink is registered (used as `Sink::SINK_NAME`).
pub const CLICKHOUSE_SINK: &str = "clickhouse";
51
/// Options of the ClickHouse sink that describe the connection, the target table and the commit
/// cadence. Each field is deserialized from the key given in its `serde(rename = ...)`
/// attribute (or from the field name when no rename is present).
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    // Handed to the client via `with_url` in `build_client`.
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    // Handed to the client via `with_user` in `build_client`.
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    // Handed to the client via `with_password` in `build_client`.
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    // Handed to the client via `with_database`; also the database looked up in the system tables.
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    // Name of the table that rows are inserted into.
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    // Optional delete-marker column; it is attached to the `*ReplacingMergeTree` engine variants
    // and is what makes such an engine usable for upsert sinks.
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    /// Commit every n(>0) checkpoints, default is 10.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}
72
/// Table engines the sink recognises, built by `ClickHouseEngine::from_query_engine` from the
/// engine name reported by ClickHouse.
///
/// Payloads:
/// * `*ReplacingMergeTree(Option<String>)` carries the configured `clickhouse.delete.column`,
///   if any.
/// * `*CollapsingMergeTree(String)` and `*VersionedCollapsingMergeTree(String)` carry the name
///   of the sign column parsed from the table's `CREATE TABLE` statement.
// Almost every variant ends in `MergeTree` because the names mirror ClickHouse's engine names,
// hence the lint exemption.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    // `Replicated*` counterparts of the engines above.
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    // `Shared*` counterparts; `validate` gates these behind a license feature check.
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}
99impl ClickHouseEngine {
100    pub fn is_collapsing_engine(&self) -> bool {
101        matches!(
102            self,
103            ClickHouseEngine::CollapsingMergeTree(_)
104                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
105                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
106                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
107                | ClickHouseEngine::SharedCollapsingMergeTree(_)
108                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
109        )
110    }
111
112    pub fn is_delete_replacing_engine(&self) -> bool {
113        match self {
114            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
115            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
116            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
117            _ => false,
118        }
119    }
120
121    pub fn get_delete_col(&self) -> Option<String> {
122        match self {
123            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
124            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
125                Some(delete_col.to_string())
126            }
127            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
128                Some(delete_col.to_string())
129            }
130            _ => None,
131        }
132    }
133
134    pub fn get_sign_name(&self) -> Option<String> {
135        match self {
136            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
137            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => {
138                Some(sign_name.to_string())
139            }
140            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => {
141                Some(sign_name.to_string())
142            }
143            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
144                Some(sign_name.to_string())
145            }
146            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
147            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
148                Some(sign_name.to_string())
149            }
150            _ => None,
151        }
152    }
153
154    pub fn is_shared_tree(&self) -> bool {
155        matches!(
156            self,
157            ClickHouseEngine::SharedMergeTree
158                | ClickHouseEngine::SharedReplacingMergeTree(_)
159                | ClickHouseEngine::SharedSummingMergeTree
160                | ClickHouseEngine::SharedAggregatingMergeTree
161                | ClickHouseEngine::SharedCollapsingMergeTree(_)
162                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
163                | ClickHouseEngine::SharedGraphiteMergeTree
164        )
165    }
166
167    pub fn from_query_engine(
168        engine_name: &ClickhouseQueryEngine,
169        config: &ClickHouseConfig,
170    ) -> Result<Self> {
171        match engine_name.engine.as_str() {
172            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
173            "Null" => Ok(ClickHouseEngine::Null),
174            "ReplacingMergeTree" => {
175                let delete_column = config.common.delete_column.clone();
176                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
177            }
178            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
179            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
180            // VersionedCollapsingMergeTree(sign_name,"a")
181            "VersionedCollapsingMergeTree" => {
182                let sign_name = engine_name
183                    .create_table_query
184                    .split("VersionedCollapsingMergeTree(")
185                    .last()
186                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
187                    .split(',')
188                    .next()
189                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
190                    .trim()
191                    .to_owned();
192                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
193            }
194            // CollapsingMergeTree(sign_name)
195            "CollapsingMergeTree" => {
196                let sign_name = engine_name
197                    .create_table_query
198                    .split("CollapsingMergeTree(")
199                    .last()
200                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
201                    .split(')')
202                    .next()
203                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
204                    .trim()
205                    .to_owned();
206                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
207            }
208            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
209            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
210            "ReplicatedReplacingMergeTree" => {
211                let delete_column = config.common.delete_column.clone();
212                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
213                    delete_column,
214                ))
215            }
216            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
217            "ReplicatedAggregatingMergeTree" => {
218                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
219            }
220            // ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c")
221            "ReplicatedVersionedCollapsingMergeTree" => {
222                let sign_name = engine_name
223                    .create_table_query
224                    .split("ReplicatedVersionedCollapsingMergeTree(")
225                    .last()
226                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
227                    .split(',')
228                    .rev()
229                    .nth(1)
230                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
231                    .trim()
232                    .to_owned();
233                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
234                    sign_name,
235                ))
236            }
237            // ReplicatedCollapsingMergeTree("a","b",sign_name)
238            "ReplicatedCollapsingMergeTree" => {
239                let sign_name = engine_name
240                    .create_table_query
241                    .split("ReplicatedCollapsingMergeTree(")
242                    .last()
243                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
244                    .split(')')
245                    .next()
246                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
247                    .split(',')
248                    .next_back()
249                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
250                    .trim()
251                    .to_owned();
252                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
253            }
254            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
255            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
256            "SharedReplacingMergeTree" => {
257                let delete_column = config.common.delete_column.clone();
258                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
259            }
260            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
261            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
262            // SharedVersionedCollapsingMergeTree("a","b",sign_name,"c")
263            "SharedVersionedCollapsingMergeTree" => {
264                let sign_name = engine_name
265                    .create_table_query
266                    .split("SharedVersionedCollapsingMergeTree(")
267                    .last()
268                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
269                    .split(',')
270                    .rev()
271                    .nth(1)
272                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
273                    .trim()
274                    .to_owned();
275                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
276                    sign_name,
277                ))
278            }
279            // SharedCollapsingMergeTree("a","b",sign_name)
280            "SharedCollapsingMergeTree" => {
281                let sign_name = engine_name
282                    .create_table_query
283                    .split("SharedCollapsingMergeTree(")
284                    .last()
285                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
286                    .split(')')
287                    .next()
288                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
289                    .split(',')
290                    .next_back()
291                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
292                    .trim()
293                    .to_owned();
294                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
295            }
296            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
297            _ => Err(SinkError::ClickHouse(format!(
298                "Cannot find clickhouse engine {:?}",
299                engine_name.engine
300            ))),
301        }
302    }
303}
304
305impl ClickHouseCommon {
306    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
307        let client = ClickHouseClient::default() // hyper(0.14) client inside
308            .with_url(&self.url)
309            .with_user(&self.user)
310            .with_password(&self.password)
311            .with_database(&self.database);
312        Ok(client)
313    }
314}
315
/// Full configuration of a ClickHouse sink: the flattened [`ClickHouseCommon`] options plus the
/// sink type. `ClickHouseConfig::from_btreemap` rejects any `type` other than the append-only
/// and upsert constants.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String, // accept "append-only" or "upsert"
}
324
/// ClickHouse sink, built from a `SinkParam` through `TryFrom`.
#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    // Schema of the rows to be written, taken from `SinkParam::schema()`.
    schema: Schema,
    // Indices into `schema` of the downstream primary key columns.
    pk_indices: Vec<usize>,
    // `true` when the sink type is append-only; upsert-specific validation is skipped then.
    is_append_only: bool,
}
332
333impl ClickHouseConfig {
334    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
335        let config =
336            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
337                .map_err(|e| SinkError::Config(anyhow!(e)))?;
338        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
339            return Err(SinkError::Config(anyhow!(
340                "`{}` must be {}, or {}",
341                SINK_TYPE_OPTION,
342                SINK_TYPE_APPEND_ONLY,
343                SINK_TYPE_UPSERT
344            )));
345        }
346        Ok(config)
347    }
348}
349
350impl TryFrom<SinkParam> for ClickHouseSink {
351    type Error = SinkError;
352
353    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
354        let schema = param.schema();
355        let config = ClickHouseConfig::from_btreemap(param.properties)?;
356        Ok(Self {
357            config,
358            schema,
359            pk_indices: param.downstream_pk,
360            is_append_only: param.sink_type.is_append_only(),
361        })
362    }
363}
364
365impl ClickHouseSink {
366    /// Check that the column names and types of risingwave and clickhouse are identical
367    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
368        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
369        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
370            .iter()
371            .map(|s| (s.name.clone(), s.clone()))
372            .collect();
373
374        if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
375            return Err(SinkError::ClickHouse("The columns of the sink must be equal to or a superset of the target table's columns.".to_owned()));
376        }
377
378        for i in rw_fields_name {
379            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
380                SinkError::ClickHouse(format!(
381                    "Column name don't find in clickhouse, risingwave is {:?} ",
382                    i.0
383                ))
384            })?;
385
386            Self::check_and_correct_column_type(&i.1, value)?;
387        }
388        Ok(())
389    }
390
391    /// Check that the column names and types of risingwave and clickhouse are identical
392    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
393        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
394            .iter()
395            .filter(|s| s.is_in_primary_key == 1)
396            .map(|s| s.name.clone())
397            .collect();
398
399        for (_, field) in self
400            .schema
401            .fields()
402            .iter()
403            .enumerate()
404            .filter(|(index, _)| self.pk_indices.contains(index))
405        {
406            if !clickhouse_pks.remove(&field.name) {
407                return Err(SinkError::ClickHouse(
408                    "Clicklhouse and RisingWave pk is not match".to_owned(),
409                ));
410            }
411        }
412
413        if !clickhouse_pks.is_empty() {
414            return Err(SinkError::ClickHouse(
415                "Clicklhouse and RisingWave pk is not match".to_owned(),
416            ));
417        }
418        Ok(())
419    }
420
421    /// Check that the column types of risingwave and clickhouse are identical
422    fn check_and_correct_column_type(
423        fields_type: &DataType,
424        ck_column: &SystemColumn,
425    ) -> Result<()> {
426        // FIXME: the "contains" based implementation is wrong
427        let is_match = match fields_type {
428            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
429            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
430                | ck_column.r#type.contains("Int16")
431                // Allow Int16 to be pushed to Enum16, they share an encoding and value range
432                // No special care is taken to ensure values are valid.
433                | ck_column.r#type.contains("Enum16")),
434            risingwave_common::types::DataType::Int32 => {
435                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
436            }
437            risingwave_common::types::DataType::Int64 => {
438                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
439            }
440            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
441            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
442            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
443            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
444            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
445            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
446                "clickhouse can not support Time".to_owned(),
447            )),
448            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
449                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
450            )),
451            risingwave_common::types::DataType::Timestamptz => {
452                Ok(ck_column.r#type.contains("DateTime64"))
453            }
454            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
455                "clickhouse can not support Interval".to_owned(),
456            )),
457            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
458                "struct needs to be converted into a list".to_owned(),
459            )),
460            risingwave_common::types::DataType::List(list) => {
461                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
462                Ok(ck_column.r#type.contains("Array"))
463            }
464            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
465                "clickhouse can not support Bytea".to_owned(),
466            )),
467            risingwave_common::types::DataType::Jsonb => Err(SinkError::ClickHouse(
468                "clickhouse rust can not support Json".to_owned(),
469            )),
470            risingwave_common::types::DataType::Serial => {
471                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
472            }
473            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
474                "clickhouse can not support Int256".to_owned(),
475            )),
476            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
477                "clickhouse can not support Map".to_owned(),
478            )),
479        };
480        if !is_match? {
481            return Err(SinkError::ClickHouse(format!(
482                "Column type can not match name is {:?}, risingwave is {:?} and clickhouse is {:?}",
483                ck_column.name, fields_type, ck_column.r#type
484            )));
485        }
486
487        Ok(())
488    }
489}
490impl Sink for ClickHouseSink {
491    type Coordinator = DummySinkCommitCoordinator;
492    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;
493
494    const SINK_NAME: &'static str = CLICKHOUSE_SINK;
495
496    async fn validate(&self) -> Result<()> {
497        // For upsert clickhouse sink, the primary key must be defined.
498        if !self.is_append_only && self.pk_indices.is_empty() {
499            return Err(SinkError::Config(anyhow!(
500                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
501            )));
502        }
503
504        // check reachability
505        let client = self.config.common.build_client()?;
506
507        let (clickhouse_column, clickhouse_engine) =
508            query_column_engine_from_ck(client, &self.config).await?;
509        if clickhouse_engine.is_shared_tree() {
510            risingwave_common::license::Feature::ClickHouseSharedEngine
511                .check_available()
512                .map_err(|e| anyhow::anyhow!(e))?;
513        }
514
515        if !self.is_append_only
516            && !clickhouse_engine.is_collapsing_engine()
517            && !clickhouse_engine.is_delete_replacing_engine()
518        {
519            return match clickhouse_engine {
520                ClickHouseEngine::ReplicatedReplacingMergeTree(None) | ClickHouseEngine::ReplacingMergeTree(None) | ClickHouseEngine::SharedReplacingMergeTree(None) =>  {
521                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
522                }
523                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or the `ReplacingMergeTree` in ClickHouse".to_owned()))
524            };
525        }
526
527        self.check_column_name_and_type(&clickhouse_column)?;
528        if !self.is_append_only {
529            self.check_pk_match(&clickhouse_column)?;
530        }
531
532        if self.config.common.commit_checkpoint_interval == 0 {
533            return Err(SinkError::Config(anyhow!(
534                "`commit_checkpoint_interval` must be greater than 0"
535            )));
536        }
537        Ok(())
538    }
539
540    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
541        let writer = ClickHouseSinkWriter::new(
542            self.config.clone(),
543            self.schema.clone(),
544            self.pk_indices.clone(),
545            self.is_append_only,
546        )
547        .await?;
548        let commit_checkpoint_interval =
549    NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
550        "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
551    );
552
553        Ok(DecoupleCheckpointLogSinkerOf::new(
554            writer,
555            SinkWriterMetrics::new(&writer_param),
556            commit_checkpoint_interval,
557        ))
558    }
559}
/// Writer that converts stream chunks into ClickHouse rows and sends them through an
/// [`Insert`] that is opened lazily on the first write and closed on checkpoint barriers.
pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    // Save some features of the clickhouse column type
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    // Column names used for the insert: the RisingWave field names, followed by the engine's
    // sign column and/or delete column when it has one.
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    // `None` until the first write after creation or after the last checkpoint barrier.
    inserter: Option<Insert<ClickHouseColumn>>,
}
/// Type details of one ClickHouse column, parsed from its type name by
/// `ClickHouseSinkWriter::build_column_correct_vec`.
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    // `true` when the type name contains `Nullable`.
    can_null: bool,
    // Time accuracy in clickhouse for rw and ck conversions
    // (the precision of `DateTime64(p)`, 0 when the column is not a `DateTime64`).
    accuracy_time: u8,

    // The two parameters of `Decimal(precision, scale)`, `(0, 0)` when the column is not a
    // `Decimal(...)`.
    accuracy_decimal: (u8, u8),
}
583
584impl ClickHouseSinkWriter {
585    pub async fn new(
586        config: ClickHouseConfig,
587        schema: Schema,
588        pk_indices: Vec<usize>,
589        is_append_only: bool,
590    ) -> Result<Self> {
591        let client = config.common.build_client()?;
592
593        let (clickhouse_column, clickhouse_engine) =
594            query_column_engine_from_ck(client.clone(), &config).await?;
595
596        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
597            .iter()
598            .map(Self::build_column_correct_vec)
599            .collect();
600        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
601            .iter()
602            .map(|(a, _)| a.clone())
603            .collect_vec();
604
605        if let Some(sign) = clickhouse_engine.get_sign_name() {
606            rw_fields_name_after_calibration.push(sign);
607        }
608        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
609            rw_fields_name_after_calibration.push(delete_col);
610        }
611        Ok(Self {
612            config,
613            schema,
614            pk_indices,
615            client,
616            is_append_only,
617            column_correct_vec: column_correct_vec?,
618            rw_fields_name_after_calibration,
619            clickhouse_engine,
620            inserter: None,
621        })
622    }
623
624    /// Check if clickhouse's column is 'Nullable', valid bits of `DateTime64`. And save it in
625    /// `column_correct_vec`
626    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
627        let can_null = ck_column.r#type.contains("Nullable");
628        // `DateTime64` without precision is already displayed as `DateTime(3)` in `system.columns`.
629        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
630            ck_column
631                .r#type
632                .split("DateTime64(")
633                .last()
634                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
635                .split(')')
636                .next()
637                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
638                .split(',')
639                .next()
640                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
641                .parse::<u8>()
642                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
643        } else {
644            0_u8
645        };
646        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
647            let decimal_all = ck_column
648                .r#type
649                .split("Decimal(")
650                .last()
651                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
652                .split(')')
653                .next()
654                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
655                .split(", ")
656                .collect_vec();
657            let length = decimal_all
658                .first()
659                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
660                .parse::<u8>()
661                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
662
663            if length > 38 {
664                return Err(SinkError::ClickHouse(
665                    "RW don't support Decimal256".to_owned(),
666                ));
667            }
668
669            let scale = decimal_all
670                .last()
671                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
672                .parse::<u8>()
673                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
674            (length, scale)
675        } else {
676            (0_u8, 0_u8)
677        };
678        Ok(ClickHouseSchemaFeature {
679            can_null,
680            accuracy_time,
681            accuracy_decimal,
682        })
683    }
684
685    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
686        if self.inserter.is_none() {
687            self.inserter = Some(self.client.insert_with_fields_name(
688                &self.config.common.table,
689                self.rw_fields_name_after_calibration.clone(),
690            )?);
691        }
692        for (op, row) in chunk.rows() {
693            let mut clickhouse_filed_vec = vec![];
694            for (index, data) in row.iter().enumerate() {
695                clickhouse_filed_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
696                    data,
697                    &self.column_correct_vec,
698                    index,
699                )?);
700            }
701            match op {
702                Op::Insert | Op::UpdateInsert => {
703                    if self.clickhouse_engine.is_collapsing_engine() {
704                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
705                            ClickHouseField::Int8(1),
706                        ));
707                    }
708                    if self.clickhouse_engine.is_delete_replacing_engine() {
709                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
710                            ClickHouseField::Int8(0),
711                        ))
712                    }
713                }
714                Op::Delete | Op::UpdateDelete => {
715                    if !self.clickhouse_engine.is_collapsing_engine()
716                        && !self.clickhouse_engine.is_delete_replacing_engine()
717                    {
718                        return Err(SinkError::ClickHouse(
719                            "Clickhouse engine don't support upsert".to_owned(),
720                        ));
721                    }
722                    if self.clickhouse_engine.is_collapsing_engine() {
723                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
724                            ClickHouseField::Int8(-1),
725                        ));
726                    }
727                    if self.clickhouse_engine.is_delete_replacing_engine() {
728                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
729                            ClickHouseField::Int8(1),
730                        ))
731                    }
732                }
733            }
734            let clickhouse_column = ClickHouseColumn {
735                row: clickhouse_filed_vec,
736            };
737            self.inserter
738                .as_mut()
739                .unwrap()
740                .write(&clickhouse_column)
741                .await?;
742        }
743        Ok(())
744    }
745}
746
747#[async_trait]
748impl SinkWriter for ClickHouseSinkWriter {
749    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
750        self.write(chunk).await
751    }
752
753    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
754        Ok(())
755    }
756
757    async fn abort(&mut self) -> Result<()> {
758        Ok(())
759    }
760
761    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
762        if is_checkpoint && let Some(inserter) = self.inserter.take() {
763            inserter.end().await?;
764        }
765        Ok(())
766    }
767}
768
/// One row of ClickHouse's `system.columns`, as fetched with `QUERY_COLUMN`.
#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    /// Column name.
    name: String,
    /// Column type, kept as the string ClickHouse reports.
    r#type: String,
    /// Primary-key flag as reported by ClickHouse (presumably 0 or 1 — TODO confirm).
    is_in_primary_key: u8,
}
775
/// One row of ClickHouse's `system.tables`, as fetched with `QUERY_ENGINE`.
#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    /// Table name; deserialized with the row but not read afterwards.
    #[expect(dead_code)]
    name: String,
    /// Name of the table engine.
    engine: String,
    /// The `CREATE TABLE` statement ClickHouse reports for the table.
    create_table_query: String,
}
783
784async fn query_column_engine_from_ck(
785    client: ClickHouseClient,
786    config: &ClickHouseConfig,
787) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
788    let query_engine = QUERY_ENGINE;
789    let query_column = QUERY_COLUMN;
790
791    let clickhouse_engine = client
792        .query(query_engine)
793        .bind(config.common.database.clone())
794        .bind(config.common.table.clone())
795        .fetch_all::<ClickhouseQueryEngine>()
796        .await?;
797    let mut clickhouse_column = client
798        .query(query_column)
799        .bind(config.common.database.clone())
800        .bind(config.common.table.clone())
801        .bind("position")
802        .fetch_all::<SystemColumn>()
803        .await?;
804    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
805        return Err(SinkError::ClickHouse(format!(
806            "table {:?}.{:?} is not find in clickhouse",
807            config.common.database, config.common.table
808        )));
809    }
810
811    let clickhouse_engine =
812        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;
813
814    if let Some(sign) = &clickhouse_engine.get_sign_name() {
815        clickhouse_column.retain(|a| sign.ne(&a.name))
816    }
817
818    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
819        clickhouse_column.retain(|a| delete_col.ne(&a.name))
820    }
821
822    Ok((clickhouse_column, clickhouse_engine))
823}
824
/// A single row handed to the ClickHouse inserter.
///
/// Its hand-written `Serialize` impl emits the fields as a struct so that the row looks like
/// an ordinary `struct` to the clickhouse interface.
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    /// The row's field values, in the order they are serialized.
    row: Vec<ClickHouseFieldWithNull>,
}
830
/// Basic data types for use with the clickhouse interface.
///
/// Each variant is serialized as the primitive it wraps (see the `Serialize` impl).
#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    /// Also carries dates, as the number of days since the Unix epoch.
    Int32(i32),
    /// Also carries `Timestamptz` values, scaled to the column's time precision.
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    /// A sequence of fields; used for list values and for the fields of a struct.
    List(Vec<ClickHouseFieldWithNull>),
    /// Used for the sign / delete marker columns appended by the writer.
    Int8(i8),
    Decimal(ClickHouseDecimal),
}
/// A decimal carried as its integer value, in the width chosen from the ClickHouse column's
/// precision (up to 9 digits: 32 bit, up to 18 digits: 64 bit, otherwise 128 bit).
#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}
852impl Serialize for ClickHouseDecimal {
853    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
854    where
855        S: serde::Serializer,
856    {
857        match self {
858            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
859            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
860            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
861        }
862    }
863}
864
/// A field value together with how its nullability is serialized for ClickHouse.
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    /// A present value of a nullable column; serialized through `serialize_some`.
    WithSome(ClickHouseField),
    /// A value of a non-nullable column; serialized as the bare field.
    WithoutSome(ClickHouseField),
    /// A null value; serialized through `serialize_none`.
    None,
}
872
873impl ClickHouseFieldWithNull {
874    pub fn from_scalar_ref(
875        data: Option<ScalarRefImpl<'_>>,
876        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
877        clickhouse_schema_feature_index: usize,
878    ) -> Result<Vec<ClickHouseFieldWithNull>> {
879        let clickhouse_schema_feature = clickhouse_schema_feature_vec
880            .get(clickhouse_schema_feature_index)
881            .ok_or_else(|| SinkError::ClickHouse(format!("No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}")))?;
882        if data.is_none() {
883            if !clickhouse_schema_feature.can_null {
884                return Err(SinkError::ClickHouse(
885                    "clickhouse column can not insert null".to_owned(),
886                ));
887            } else {
888                return Ok(vec![ClickHouseFieldWithNull::None]);
889            }
890        }
891        let data = match data.unwrap() {
892            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
893            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
894            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
895            ScalarRefImpl::Int256(_) => {
896                return Err(SinkError::ClickHouse(
897                    "clickhouse can not support Int256".to_owned(),
898                ));
899            }
900            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
901            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
902            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
903            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
904            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
905            ScalarRefImpl::Decimal(d) => {
906                let d = if let Decimal::Normalized(d) = d {
907                    let scale =
908                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
909                    if scale < 0 {
910                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
911                    } else {
912                        d.mantissa() * 10_i128.pow(scale as u32)
913                    }
914                } else if clickhouse_schema_feature.can_null {
915                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
916                    return Ok(vec![ClickHouseFieldWithNull::None]);
917                } else {
918                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
919                    0_i128
920                };
921                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
922                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
923                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
924                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
925                } else {
926                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
927                }
928            }
929            ScalarRefImpl::Interval(_) => {
930                return Err(SinkError::ClickHouse(
931                    "clickhouse can not support Interval".to_owned(),
932                ));
933            }
934            ScalarRefImpl::Date(v) => {
935                let days = v.get_nums_days_unix_epoch();
936                ClickHouseField::Int32(days)
937            }
938            ScalarRefImpl::Time(_) => {
939                return Err(SinkError::ClickHouse(
940                    "clickhouse can not support Time".to_owned(),
941                ));
942            }
943            ScalarRefImpl::Timestamp(_) => {
944                return Err(SinkError::ClickHouse(
945                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
946                ));
947            }
948            ScalarRefImpl::Timestamptz(v) => {
949                let micros = v.timestamp_micros();
950                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
951                    true => {
952                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
953                    }
954                    false => micros
955                        .checked_mul(
956                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
957                        )
958                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
959                };
960                ClickHouseField::Int64(ticks)
961            }
962            ScalarRefImpl::Jsonb(_) => {
963                return Err(SinkError::ClickHouse(
964                    "clickhouse rust interface can not support Json".to_owned(),
965                ));
966            }
967            ScalarRefImpl::Struct(v) => {
968                let mut struct_vec = vec![];
969                for (index, field) in v.iter_fields_ref().enumerate() {
970                    let a = Self::from_scalar_ref(
971                        field,
972                        clickhouse_schema_feature_vec,
973                        clickhouse_schema_feature_index + index,
974                    )?;
975                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
976                        a,
977                    )));
978                }
979                return Ok(struct_vec);
980            }
981            ScalarRefImpl::List(v) => {
982                let mut vec = vec![];
983                for i in v.iter() {
984                    vec.extend(Self::from_scalar_ref(
985                        i,
986                        clickhouse_schema_feature_vec,
987                        clickhouse_schema_feature_index,
988                    )?)
989                }
990                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
991                    ClickHouseField::List(vec),
992                )]);
993            }
994            ScalarRefImpl::Bytea(_) => {
995                return Err(SinkError::ClickHouse(
996                    "clickhouse can not support Bytea".to_owned(),
997                ));
998            }
999            ScalarRefImpl::Map(_) => {
1000                return Err(SinkError::ClickHouse(
1001                    "clickhouse can not support Map".to_owned(),
1002                ));
1003            }
1004        };
1005        let data = if clickhouse_schema_feature.can_null {
1006            vec![ClickHouseFieldWithNull::WithSome(data)]
1007        } else {
1008            vec![ClickHouseFieldWithNull::WithoutSome(data)]
1009        };
1010        Ok(data)
1011    }
1012}
1013
1014impl Serialize for ClickHouseField {
1015    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1016    where
1017        S: serde::Serializer,
1018    {
1019        match self {
1020            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
1021            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
1022            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
1023            ClickHouseField::Serial(v) => v.serialize(serializer),
1024            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
1025            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
1026            ClickHouseField::String(v) => serializer.serialize_str(v),
1027            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
1028            ClickHouseField::List(v) => {
1029                let mut s = serializer.serialize_seq(Some(v.len()))?;
1030                for i in v {
1031                    s.serialize_element(i)?;
1032                }
1033                s.end()
1034            }
1035            ClickHouseField::Decimal(v) => v.serialize(serializer),
1036            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
1037        }
1038    }
1039}
1040impl Serialize for ClickHouseFieldWithNull {
1041    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1042    where
1043        S: serde::Serializer,
1044    {
1045        match self {
1046            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
1047            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
1048            ClickHouseFieldWithNull::None => serializer.serialize_none(),
1049        }
1050    }
1051}
1052impl Serialize for ClickHouseColumn {
1053    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1054    where
1055        S: serde::Serializer,
1056    {
1057        let mut s = serializer.serialize_struct("useless", self.row.len())?;
1058        for data in &self.row {
1059            s.serialize_field("useless", &data)?
1060        }
1061        s.end()
1062    }
1063}
1064
1065/// 'Struct'(clickhouse type name is nested) will be converted into some arrays by clickhouse. So we
1066/// need to make some conversions
1067pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
1068    let mut vec = vec![];
1069    for field in schema.fields() {
1070        if let DataType::Struct(st) = &field.data_type {
1071            for (name, data_type) in st.iter() {
1072                if matches!(data_type, DataType::Struct(_)) {
1073                    return Err(SinkError::ClickHouse(
1074                        "Only one level of nesting is supported for struct".to_owned(),
1075                    ));
1076                } else {
1077                    vec.push((
1078                        format!("{}.{}", field.name, name),
1079                        DataType::List(Box::new(data_type.clone())),
1080                    ))
1081                }
1082            }
1083        } else {
1084            vec.push((field.name.clone(), field.data_type()));
1085        }
1086    }
1087    Ok(vec)
1088}