use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
};
use super::writer::SinkWriter;
use super::{SinkWriterMetrics, SinkWriterParam};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
};

const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

const ALLOW_EXPERIMENTAL_JSON_TYPE: &str = "allow_experimental_json_type";
const INPUT_FORMAT_BINARY_READ_JSON_AS_STRING: &str = "input_format_binary_read_json_as_string";
const OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING: &str = "output_format_binary_write_json_as_string";

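/// Connection and table options for the ClickHouse sink, taken from the `WITH` clause of
/// `CREATE SINK` (option names follow the `serde(rename)` attributes below). Illustrative
/// example only; values are placeholders:
///
/// ```sql
/// CREATE SINK demo_sink FROM demo_mv WITH (
///     connector = 'clickhouse',
///     type = 'append-only',
///     clickhouse.url = 'http://127.0.0.1:8123',
///     clickhouse.user = 'default',
///     clickhouse.password = '',
///     clickhouse.database = 'default',
///     clickhouse.table = 'demo'
/// );
/// ```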
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}

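/// The table engine of the target ClickHouse table, as reported by `system.tables`.
/// Upsert sinks additionally carry the sign column of `(Versioned)CollapsingMergeTree`
/// engines, or the user-provided delete column (`clickhouse.delete.column`) of
/// `ReplacingMergeTree` engines.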
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}

impl ClickHouseEngine {
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.clone()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            _ => None,
        }
    }

    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            _ => None,
        }
    }

    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

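    /// Detects the engine of the target table from `system.tables`. For collapsing engines the
    /// sign column name is parsed out of the `create_table_query` text; for `ReplacingMergeTree`
    /// variants the optional delete column is taken from `clickhouse.delete.column`.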
    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            "VersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            "CollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            "ReplicatedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "ReplicatedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            "SharedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}

impl ClickHouseCommon {
    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
        let client = ClickHouseClient::default()
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database)
            .with_option(ALLOW_EXPERIMENTAL_JSON_TYPE, "1")
            .with_option(INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
            .with_option(OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
        Ok(client)
    }
}

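/// Complete sink configuration: the flattened connection options plus the sink `type`
/// (`append-only` or `upsert`).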
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String,
}

impl EnforceSecret for ClickHouseConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        ClickHouseCommon::enforce_one(prop)
    }

    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

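/// The ClickHouse sink: validates the remote table schema and engine against the RisingWave
/// schema, then builds the checkpoint-decoupled log sinker that flushes on checkpoint barriers.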
#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for ClickHouseSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl ClickHouseConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

impl TryFrom<SinkParam> for ClickHouseSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let pk_indices = param.downstream_pk_or_empty();
        let config = ClickHouseConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only: param.sink_type.is_append_only(),
        })
    }
}

impl ClickHouseSink {
    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
            .iter()
            .map(|s| (s.name.clone(), s.clone()))
            .collect();

        if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
            return Err(SinkError::ClickHouse(
                "The columns of the sink must be equal to or a subset of the target table's columns."
                    .to_owned(),
            ));
        }

        for i in rw_fields_name {
            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "Column {:?} was not found in the ClickHouse table",
                    i.0
                ))
            })?;

            Self::check_and_correct_column_type(&i.1, value)?;
        }
        Ok(())
    }

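    /// Checks that the sink's primary key columns exactly match the columns marked
    /// `is_in_primary_key` in `system.columns` for the target table.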
    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
            .iter()
            .filter(|s| s.is_in_primary_key == 1)
            .map(|s| s.name.clone())
            .collect();

        for (_, field) in self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
        {
            if !clickhouse_pks.remove(&field.name) {
                return Err(SinkError::ClickHouse(
                    "ClickHouse and RisingWave primary keys do not match".to_owned(),
                ));
            }
        }

        if !clickhouse_pks.is_empty() {
            return Err(SinkError::ClickHouse(
                "ClickHouse and RisingWave primary keys do not match".to_owned(),
            ));
        }
        Ok(())
    }

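    /// Checks that a RisingWave type is compatible with the declared ClickHouse column type.
    /// The comparison is a substring match on the type name, so wrappers such as
    /// `Nullable(...)` are accepted as well.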
    fn check_and_correct_column_type(
        fields_type: &DataType,
        ck_column: &SystemColumn,
    ) -> Result<()> {
        let is_match = match fields_type {
            DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
                | ck_column.r#type.contains("Int16")
                | ck_column.r#type.contains("Enum16")),
            DataType::Int32 => {
                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
            }
            DataType::Int64 => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
            DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
            DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
            DataType::Date => Ok(ck_column.r#type.contains("Date32")),
            DataType::Varchar => Ok(ck_column.r#type.contains("String")),
            DataType::Time => Err(SinkError::ClickHouse(
                "TIME is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            DataType::Timestamp => Err(SinkError::ClickHouse(
                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
            )),
            DataType::Timestamptz => Ok(ck_column.r#type.contains("DateTime64")),
            DataType::Interval => Err(SinkError::ClickHouse(
                "INTERVAL is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            DataType::Struct(_) => Err(SinkError::ClickHouse(
                "struct needs to be converted into a list".to_owned(),
            )),
            DataType::List(list) => {
                Self::check_and_correct_column_type(list.elem(), ck_column)?;
                Ok(ck_column.r#type.contains("Array"))
            }
            DataType::Bytea => Err(SinkError::ClickHouse(
                "BYTEA is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            DataType::Jsonb => Ok(ck_column.r#type.contains("JSON")),
            DataType::Serial => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            DataType::Int256 => Err(SinkError::ClickHouse(
                "INT256 is not supported for ClickHouse sink.".to_owned(),
            )),
            DataType::Map(_) => Err(SinkError::ClickHouse(
                "MAP is not supported for ClickHouse sink.".to_owned(),
            )),
            DataType::Vector(_) => Err(SinkError::ClickHouse(
                "VECTOR is not supported for ClickHouse sink.".to_owned(),
            )),
        };
        if !is_match? {
            return Err(SinkError::ClickHouse(format!(
                "Column type mismatch for column {:?}: RisingWave type is {:?}, ClickHouse type is {:?}",
                ck_column.name, fields_type, ck_column.r#type
            )));
        }

        Ok(())
    }
}

impl Sink for ClickHouseSink {
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

    async fn validate(&self) -> Result<()> {
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
            )));
        }

        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplacingMergeTree(None)
                | ClickHouseEngine::ReplicatedReplacingMergeTree(None)
                | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or `ReplacingMergeTree` in ClickHouse".to_owned())),
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        ClickHouseConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}

pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    inserter: Option<Insert<ClickHouseColumn>>,
}

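/// Per-column features derived from the ClickHouse schema: whether the column is `Nullable`,
/// the `DateTime64` precision, and the `Decimal(P, S)` precision and scale.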
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    can_null: bool,
    accuracy_time: u8,

    accuracy_decimal: (u8, u8),
}

impl ClickHouseSinkWriter {
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

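    /// Parses one `system.columns` row into a `ClickHouseSchemaFeature`: nullability plus the
    /// precision arguments of `DateTime64(...)` and `Decimal(P, S)` columns (0 when absent).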
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have first".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RW does not support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }

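    /// Buffers one stream chunk into the lazily-created inserter. For collapsing engines a sign
    /// column (+1 for insert, -1 for delete) is appended to every row; for `ReplacingMergeTree`
    /// with a delete column, a delete flag (0 for insert, 1 for delete) is appended. Deletes on
    /// any other engine are rejected.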
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_field_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_field_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "This ClickHouse engine doesn't support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_field_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint && let Some(inserter) = self.inserter.take() {
            inserter.end().await?;
        }
        Ok(())
    }
}

#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    r#type: String,
    is_in_primary_key: u8,
}

#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    engine: String,
    create_table_query: String,
}

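/// Queries `system.tables` and `system.columns` for the target table, returning its column
/// descriptions (with the sign / delete columns filtered out) and the detected table engine.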
async fn query_column_engine_from_ck(
    client: ClickHouseClient,
    config: &ClickHouseConfig,
) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
    let query_engine = QUERY_ENGINE;
    let query_column = QUERY_COLUMN;

    let clickhouse_engine = client
        .query(query_engine)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .fetch_all::<ClickhouseQueryEngine>()
        .await?;
    let mut clickhouse_column = client
        .query(query_column)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .bind("position")
        .fetch_all::<SystemColumn>()
        .await?;
    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
        return Err(SinkError::ClickHouse(format!(
            "table {:?}.{:?} was not found in clickhouse",
            config.common.database, config.common.table
        )));
    }

    let clickhouse_engine =
        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

    if let Some(sign) = &clickhouse_engine.get_sign_name() {
        clickhouse_column.retain(|a| sign.ne(&a.name))
    }

    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
        clickhouse_column.retain(|a| delete_col.ne(&a.name))
    }

    Ok((clickhouse_column, clickhouse_engine))
}

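/// A single row to be written; its fields follow the calibrated column order
/// (`rw_fields_name_after_calibration`), including any trailing sign / delete flag.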
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}

#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}

#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}

impl Serialize for ClickHouseDecimal {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
        }
    }
}

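/// A field value plus its nullability behavior: `WithSome` serializes as `Some(v)` for
/// `Nullable` columns, `WithoutSome` serializes the bare value, and `None` serializes as null.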
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}

impl ClickHouseFieldWithNull {
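    /// Converts one RisingWave scalar into one or more ClickHouse field values, using the
    /// column's schema feature (nullability, `DateTime64` precision, decimal scale) to rescale
    /// decimals and timestamps. Structs expand into multiple fields; lists become `List`.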
    pub fn from_scalar_ref(
        data: Option<ScalarRefImpl<'_>>,
        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
        clickhouse_schema_feature_index: usize,
    ) -> Result<Vec<ClickHouseFieldWithNull>> {
        let clickhouse_schema_feature = clickhouse_schema_feature_vec
            .get(clickhouse_schema_feature_index)
            .ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}"
                ))
            })?;
        if data.is_none() {
            if !clickhouse_schema_feature.can_null {
                return Err(SinkError::ClickHouse(
                    "Cannot insert null value into non-nullable ClickHouse column".to_owned(),
                ));
            } else {
                return Ok(vec![ClickHouseFieldWithNull::None]);
            }
        }
        let data = match data.unwrap() {
            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
            ScalarRefImpl::Int256(_) => {
                return Err(SinkError::ClickHouse(
                    "INT256 is not supported for ClickHouse sink.".to_owned(),
                ));
            }
            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
            ScalarRefImpl::Decimal(d) => {
                let d = if let Decimal::Normalized(d) = d {
                    let scale =
                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
                    if scale < 0 {
                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                    } else {
                        d.mantissa() * 10_i128.pow(scale as u32)
                    }
                } else if clickhouse_schema_feature.can_null {
                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
                    return Ok(vec![ClickHouseFieldWithNull::None]);
                } else {
                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
                    0_i128
                };
                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
                } else {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                }
            }
            ScalarRefImpl::Interval(_) => {
                return Err(SinkError::ClickHouse(
                    "INTERVAL is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Date(v) => {
                let days = v.get_nums_days_unix_epoch();
                ClickHouseField::Int32(days)
            }
            ScalarRefImpl::Time(_) => {
                return Err(SinkError::ClickHouse(
                    "TIME is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamp(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamptz(v) => {
                let micros = v.timestamp_micros();
                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
                    true => {
                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
                    }
                    false => micros
                        .checked_mul(
                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
                        )
                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
                };
                ClickHouseField::Int64(ticks)
            }
            ScalarRefImpl::Jsonb(v) => {
                let json_str = v.to_string();
                ClickHouseField::String(json_str)
            }
            ScalarRefImpl::Struct(v) => {
                let mut struct_vec = vec![];
                for (index, field) in v.iter_fields_ref().enumerate() {
                    let a = Self::from_scalar_ref(
                        field,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index + index,
                    )?;
                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
                        a,
                    )));
                }
                return Ok(struct_vec);
            }
            ScalarRefImpl::List(v) => {
                let mut vec = vec![];
                for i in v.iter() {
                    vec.extend(Self::from_scalar_ref(
                        i,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index,
                    )?)
                }
                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
                    ClickHouseField::List(vec),
                )]);
            }
            ScalarRefImpl::Bytea(_) => {
                return Err(SinkError::ClickHouse(
                    "BYTEA is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Map(_) => {
                return Err(SinkError::ClickHouse(
                    "MAP is not supported for ClickHouse sink.".to_owned(),
                ));
            }
            ScalarRefImpl::Vector(_) => {
                return Err(SinkError::ClickHouse(
                    "VECTOR is not supported for ClickHouse sink.".to_owned(),
                ));
            }
        };
        let data = if clickhouse_schema_feature.can_null {
            vec![ClickHouseFieldWithNull::WithSome(data)]
        } else {
            vec![ClickHouseFieldWithNull::WithoutSome(data)]
        };
        Ok(data)
    }
}

impl Serialize for ClickHouseField {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
            ClickHouseField::Serial(v) => v.serialize(serializer),
            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
            ClickHouseField::String(v) => serializer.serialize_str(v),
            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
            ClickHouseField::List(v) => {
                let mut s = serializer.serialize_seq(Some(v.len()))?;
                for i in v {
                    s.serialize_element(i)?;
                }
                s.end()
            }
            ClickHouseField::Decimal(v) => v.serialize(serializer),
            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
        }
    }
}

impl Serialize for ClickHouseFieldWithNull {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
            ClickHouseFieldWithNull::None => serializer.serialize_none(),
        }
    }
}

impl Serialize for ClickHouseColumn {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_struct("useless", self.row.len())?;
        for data in &self.row {
            s.serialize_field("useless", &data)?
        }
        s.end()
    }
}

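/// Flattens the RisingWave schema into `(column_name, type)` pairs. A top-level struct field is
/// expanded into one `parent.child` list column per subfield; deeper nesting is rejected.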
pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
    let mut vec = vec![];
    for field in schema.fields() {
        if let DataType::Struct(st) = &field.data_type {
            for (name, data_type) in st.iter() {
                if matches!(data_type, DataType::Struct(_)) {
                    return Err(SinkError::ClickHouse(
                        "Only one level of nesting is supported for struct".to_owned(),
                    ));
                } else {
                    vec.push((
                        format!("{}.{}", field.name, name),
                        DataType::list(data_type.clone()),
                    ))
                }
            }
        } else {
            vec.push((field.name.clone(), field.data_type()));
        }
    }
    Ok(vec)
}