1use core::fmt::Debug;
16use core::num::NonZeroU64;
17use std::collections::{BTreeMap, HashMap, HashSet};
18
19use anyhow::anyhow;
20use clickhouse::insert::Insert;
21use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
22use itertools::Itertools;
23use risingwave_common::array::{Op, StreamChunk};
24use risingwave_common::catalog::Schema;
25use risingwave_common::row::Row;
26use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
27use serde::Serialize;
28use serde::ser::{SerializeSeq, SerializeStruct};
29use serde_derive::Deserialize;
30use serde_with::{DisplayFromStr, serde_as};
31use thiserror_ext::AsReport;
32use tonic::async_trait;
33use tracing::warn;
34use with_options::WithOptions;
35
36use super::decouple_checkpoint_log_sink::{
37 DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
38};
39use super::writer::SinkWriter;
40use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
41use crate::error::ConnectorResult;
42use crate::sink::{
43 Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
44};
45
/// Query for a table's engine metadata from `system.tables`; `?fields` is
/// expanded by the client to the target row struct's fields, and the two `?`
/// placeholders bind the database and table name.
const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
/// Query for a table's column descriptions from `system.columns`; the final
/// `?` binds the ordering column (callers pass "position" to keep DDL order).
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
/// Connector name under which this sink is registered.
pub const CLICKHOUSE_SINK: &str = "clickhouse";
51
/// Connection and table options for the ClickHouse sink, deserialized from
/// the sink's `WITH` options (note the `clickhouse.`-prefixed keys).
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    /// ClickHouse HTTP endpoint, e.g. `http://host:8123`.
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    /// User name for authentication.
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    /// Password for authentication.
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    /// Target database name.
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    /// Target table name.
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    /// For `ReplacingMergeTree`-family engines: name of the column used as the
    /// delete marker; required to run this sink in upsert mode on such engines.
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    /// Commit to ClickHouse every N checkpoints (checkpoint decoupling);
    /// validated to be > 0 in `validate`.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}
72
/// ClickHouse table engine, as reported by `system.tables`.
///
/// Collapsing variants carry the sign column name parsed from the table DDL;
/// Replacing variants carry the user-configured delete-marker column, if any.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}
impl ClickHouseEngine {
    /// Whether the engine is from the collapsing family, which models deletes
    /// via a sign column (+1 state row, -1 cancel row).
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    /// Whether the engine is a `ReplacingMergeTree` variant with a configured
    /// delete-marker column (enables deletes via that column).
    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    /// The configured delete-marker column for `ReplacingMergeTree` variants,
    /// or `None` for all other engines.
    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.to_string())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.to_string())
            }
            _ => None,
        }
    }

    /// The sign column name for collapsing engines, or `None` otherwise.
    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            _ => None,
        }
    }

    /// Whether this is a `Shared*MergeTree` engine (license-gated in
    /// `ClickHouseSink::validate`).
    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

    /// Resolve the engine from `system.tables` metadata.
    ///
    /// For collapsing engines the sign column name is recovered by string-
    /// slicing the engine arguments out of the `CREATE TABLE` DDL. The
    /// argument position differs per variant: plain engines take the sign
    /// first, while `Replicated*` variants may take leading replication
    /// arguments, hence the back-to-front splits below.
    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                // Delete marker comes from user config, not from the DDL.
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            "VersionedCollapsingMergeTree" => {
                // DDL form: VersionedCollapsingMergeTree(sign, version) —
                // sign is the first argument.
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            "CollapsingMergeTree" => {
                // DDL form: CollapsingMergeTree(sign) — single argument.
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            "ReplicatedVersionedCollapsingMergeTree" => {
                // Sign is the second-to-last comma-separated argument
                // (version is last; any replication args come first).
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "ReplicatedCollapsingMergeTree" => {
                // Sign is the last argument before the closing parenthesis.
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            "SharedVersionedCollapsingMergeTree" => {
                // Same argument layout as the Replicated variant.
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}
304
305impl ClickHouseCommon {
306 pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
307 let client = ClickHouseClient::default() .with_url(&self.url)
309 .with_user(&self.user)
310 .with_password(&self.password)
311 .with_database(&self.database);
312 Ok(client)
313 }
314}
315
/// Full ClickHouse sink configuration: connection options plus the sink type.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    /// Sink type; must be `append-only` or `upsert` (see `from_btreemap`).
    pub r#type: String, }
324
/// The ClickHouse sink: validated configuration plus the upstream schema and
/// primary-key layout used for validation and writer construction.
#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    /// Indices of the downstream primary-key columns within `schema`.
    pk_indices: Vec<usize>,
    /// `true` for append-only sinks; `false` enables upsert handling.
    is_append_only: bool,
}
332
333impl ClickHouseConfig {
334 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
335 let config =
336 serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
337 .map_err(|e| SinkError::Config(anyhow!(e)))?;
338 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
339 return Err(SinkError::Config(anyhow!(
340 "`{}` must be {}, or {}",
341 SINK_TYPE_OPTION,
342 SINK_TYPE_APPEND_ONLY,
343 SINK_TYPE_UPSERT
344 )));
345 }
346 Ok(config)
347 }
348}
349
350impl TryFrom<SinkParam> for ClickHouseSink {
351 type Error = SinkError;
352
353 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
354 let schema = param.schema();
355 let config = ClickHouseConfig::from_btreemap(param.properties)?;
356 Ok(Self {
357 config,
358 schema,
359 pk_indices: param.downstream_pk,
360 is_append_only: param.sink_type.is_append_only(),
361 })
362 }
363}
364
365impl ClickHouseSink {
366 fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
368 let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
369 let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
370 .iter()
371 .map(|s| (s.name.clone(), s.clone()))
372 .collect();
373
374 if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
375 return Err(SinkError::ClickHouse("The columns of the sink must be equal to or a superset of the target table's columns.".to_owned()));
376 }
377
378 for i in rw_fields_name {
379 let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
380 SinkError::ClickHouse(format!(
381 "Column name don't find in clickhouse, risingwave is {:?} ",
382 i.0
383 ))
384 })?;
385
386 Self::check_and_correct_column_type(&i.1, value)?;
387 }
388 Ok(())
389 }
390
391 fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
393 let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
394 .iter()
395 .filter(|s| s.is_in_primary_key == 1)
396 .map(|s| s.name.clone())
397 .collect();
398
399 for (_, field) in self
400 .schema
401 .fields()
402 .iter()
403 .enumerate()
404 .filter(|(index, _)| self.pk_indices.contains(index))
405 {
406 if !clickhouse_pks.remove(&field.name) {
407 return Err(SinkError::ClickHouse(
408 "Clicklhouse and RisingWave pk is not match".to_owned(),
409 ));
410 }
411 }
412
413 if !clickhouse_pks.is_empty() {
414 return Err(SinkError::ClickHouse(
415 "Clicklhouse and RisingWave pk is not match".to_owned(),
416 ));
417 }
418 Ok(())
419 }
420
421 fn check_and_correct_column_type(
423 fields_type: &DataType,
424 ck_column: &SystemColumn,
425 ) -> Result<()> {
426 let is_match = match fields_type {
428 risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
429 risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
430 | ck_column.r#type.contains("Int16")
431 | ck_column.r#type.contains("Enum16")),
434 risingwave_common::types::DataType::Int32 => {
435 Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
436 }
437 risingwave_common::types::DataType::Int64 => {
438 Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
439 }
440 risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
441 risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
442 risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
443 risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
444 risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
445 risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
446 "clickhouse can not support Time".to_owned(),
447 )),
448 risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
449 "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
450 )),
451 risingwave_common::types::DataType::Timestamptz => {
452 Ok(ck_column.r#type.contains("DateTime64"))
453 }
454 risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
455 "clickhouse can not support Interval".to_owned(),
456 )),
457 risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
458 "struct needs to be converted into a list".to_owned(),
459 )),
460 risingwave_common::types::DataType::List(list) => {
461 Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
462 Ok(ck_column.r#type.contains("Array"))
463 }
464 risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
465 "clickhouse can not support Bytea".to_owned(),
466 )),
467 risingwave_common::types::DataType::Jsonb => Err(SinkError::ClickHouse(
468 "clickhouse rust can not support Json".to_owned(),
469 )),
470 risingwave_common::types::DataType::Serial => {
471 Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
472 }
473 risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
474 "clickhouse can not support Int256".to_owned(),
475 )),
476 risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
477 "clickhouse can not support Map".to_owned(),
478 )),
479 };
480 if !is_match? {
481 return Err(SinkError::ClickHouse(format!(
482 "Column type can not match name is {:?}, risingwave is {:?} and clickhouse is {:?}",
483 ck_column.name, fields_type, ck_column.r#type
484 )));
485 }
486
487 Ok(())
488 }
489}
impl Sink for ClickHouseSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

    /// Validate the sink against the live ClickHouse table: pk presence for
    /// upsert, engine capabilities, license gating for shared engines, column
    /// name/type compatibility, and the checkpoint interval.
    async fn validate(&self) -> Result<()> {
        // Upsert needs a primary key to identify rows.
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
            )));
        }

        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        // Shared*MergeTree engines are gated behind a license feature.
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        // Upsert requires an engine that can express deletes: a collapsing
        // engine or a ReplacingMergeTree with a configured delete column.
        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplicatedReplacingMergeTree(None) | ClickHouseEngine::ReplacingMergeTree(None) | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or the `ReplacingMergeTree` in ClickHouse".to_owned()))
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        // pk matching only matters when rows must be identified for deletes.
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    /// Build the log sinker: a checkpoint-decoupled sinker wrapping a fresh
    /// `ClickHouseSinkWriter`.
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}
/// Streaming writer for the ClickHouse sink. Rows are buffered into an
/// `Insert` that is lazily created on the first write and committed when a
/// checkpoint barrier arrives.
pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    /// Client used to create inserters.
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    /// Per-column conversion features, aligned with the insert column order.
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    /// Column names to insert: sink columns plus any engine sign/delete column.
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    /// In-flight insert; `None` until the first write after a checkpoint.
    inserter: Option<Insert<ClickHouseColumn>>,
}
/// Features of one ClickHouse column that affect value conversion.
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    // Whether the column type contains `Nullable`.
    can_null: bool,
    // `DateTime64(p)` precision digits; 0 for non-DateTime64 columns.
    accuracy_time: u8,

    // `(precision, scale)` parsed from `Decimal(p, s)`; (0, 0) otherwise.
    accuracy_decimal: (u8, u8),
}
583
impl ClickHouseSinkWriter {
    /// Create a writer: connect, fetch the table's column/engine metadata, and
    /// precompute per-column conversion features plus the final insert column
    /// list (sink columns followed by the engine's sign/delete column).
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        // `write` appends sign/delete values at the end of each row, so the
        // corresponding column names must be appended here in the same order.
        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

    /// Parse conversion features out of a column's declared type string:
    /// nullability, `DateTime64` precision, and `Decimal(p, s)` precision and
    /// scale. Decimal256 (p > 38) is rejected since RW decimals cannot
    /// represent it.
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        // Extract `p` from `DateTime64(p[, tz])`.
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        // Extract `(p, s)` from `Decimal(p, s)`.
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            // p > 38 would be Decimal256, which RW decimals cannot hold.
            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RW don't support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }

    /// Convert a stream chunk into rows and buffer them into the current
    /// inserter (creating one lazily). Collapsing engines get a per-row sign
    /// value (+1 insert / -1 delete); delete-replacing engines get an
    /// is-deleted flag (0 insert / 1 delete).
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_filed_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_filed_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    // Deletes are only expressible on these engine families;
                    // validation should have rejected other engines already.
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "Clickhouse engine don't support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_filed_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_filed_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}
746
747#[async_trait]
748impl SinkWriter for ClickHouseSinkWriter {
749 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
750 self.write(chunk).await
751 }
752
753 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
754 Ok(())
755 }
756
757 async fn abort(&mut self) -> Result<()> {
758 Ok(())
759 }
760
761 async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
762 if is_checkpoint && let Some(inserter) = self.inserter.take() {
763 inserter.end().await?;
764 }
765 Ok(())
766 }
767}
768
/// Row shape of the `system.columns` query.
#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    // Declared type string, e.g. `Nullable(Decimal(18, 4))`.
    r#type: String,
    // 1 when the column participates in the table's primary key.
    is_in_primary_key: u8,
}
775
/// Row shape of the `system.tables` query.
#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    // Engine name, e.g. `ReplacingMergeTree`.
    engine: String,
    // Full `CREATE TABLE` DDL; parsed to recover engine arguments.
    create_table_query: String,
}
783
784async fn query_column_engine_from_ck(
785 client: ClickHouseClient,
786 config: &ClickHouseConfig,
787) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
788 let query_engine = QUERY_ENGINE;
789 let query_column = QUERY_COLUMN;
790
791 let clickhouse_engine = client
792 .query(query_engine)
793 .bind(config.common.database.clone())
794 .bind(config.common.table.clone())
795 .fetch_all::<ClickhouseQueryEngine>()
796 .await?;
797 let mut clickhouse_column = client
798 .query(query_column)
799 .bind(config.common.database.clone())
800 .bind(config.common.table.clone())
801 .bind("position")
802 .fetch_all::<SystemColumn>()
803 .await?;
804 if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
805 return Err(SinkError::ClickHouse(format!(
806 "table {:?}.{:?} is not find in clickhouse",
807 config.common.database, config.common.table
808 )));
809 }
810
811 let clickhouse_engine =
812 ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;
813
814 if let Some(sign) = &clickhouse_engine.get_sign_name() {
815 clickhouse_column.retain(|a| sign.ne(&a.name))
816 }
817
818 if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
819 clickhouse_column.retain(|a| delete_col.ne(&a.name))
820 }
821
822 Ok((clickhouse_column, clickhouse_engine))
823}
824
/// One row to insert, already converted into ClickHouse field values in
/// insert-column order (including any trailing sign/delete values).
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}
830
/// A single scalar value in the representation the ClickHouse client
/// serializes onto the wire.
#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}
/// Decimal payloads, stored as the scaled integer mantissa sized to the
/// column's declared precision (Decimal32/64/128).
#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}
852impl Serialize for ClickHouseDecimal {
853 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
854 where
855 S: serde::Serializer,
856 {
857 match self {
858 ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
859 ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
860 ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
861 }
862 }
863}
864
/// A field value together with its nullability wrapping: `WithSome` for values
/// destined for `Nullable(...)` columns, `WithoutSome` for plain columns, and
/// `None` for a null value.
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}
872
873impl ClickHouseFieldWithNull {
874 pub fn from_scalar_ref(
875 data: Option<ScalarRefImpl<'_>>,
876 clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
877 clickhouse_schema_feature_index: usize,
878 ) -> Result<Vec<ClickHouseFieldWithNull>> {
879 let clickhouse_schema_feature = clickhouse_schema_feature_vec
880 .get(clickhouse_schema_feature_index)
881 .ok_or_else(|| SinkError::ClickHouse(format!("No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}")))?;
882 if data.is_none() {
883 if !clickhouse_schema_feature.can_null {
884 return Err(SinkError::ClickHouse(
885 "clickhouse column can not insert null".to_owned(),
886 ));
887 } else {
888 return Ok(vec![ClickHouseFieldWithNull::None]);
889 }
890 }
891 let data = match data.unwrap() {
892 ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
893 ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
894 ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
895 ScalarRefImpl::Int256(_) => {
896 return Err(SinkError::ClickHouse(
897 "clickhouse can not support Int256".to_owned(),
898 ));
899 }
900 ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
901 ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
902 ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
903 ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
904 ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
905 ScalarRefImpl::Decimal(d) => {
906 let d = if let Decimal::Normalized(d) = d {
907 let scale =
908 clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
909 if scale < 0 {
910 d.mantissa() / 10_i128.pow(scale.unsigned_abs())
911 } else {
912 d.mantissa() * 10_i128.pow(scale as u32)
913 }
914 } else if clickhouse_schema_feature.can_null {
915 warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
916 return Ok(vec![ClickHouseFieldWithNull::None]);
917 } else {
918 warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
919 0_i128
920 };
921 if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
922 ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
923 } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
924 ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
925 } else {
926 ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
927 }
928 }
929 ScalarRefImpl::Interval(_) => {
930 return Err(SinkError::ClickHouse(
931 "clickhouse can not support Interval".to_owned(),
932 ));
933 }
934 ScalarRefImpl::Date(v) => {
935 let days = v.get_nums_days_unix_epoch();
936 ClickHouseField::Int32(days)
937 }
938 ScalarRefImpl::Time(_) => {
939 return Err(SinkError::ClickHouse(
940 "clickhouse can not support Time".to_owned(),
941 ));
942 }
943 ScalarRefImpl::Timestamp(_) => {
944 return Err(SinkError::ClickHouse(
945 "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
946 ));
947 }
948 ScalarRefImpl::Timestamptz(v) => {
949 let micros = v.timestamp_micros();
950 let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
951 true => {
952 micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
953 }
954 false => micros
955 .checked_mul(
956 10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
957 )
958 .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
959 };
960 ClickHouseField::Int64(ticks)
961 }
962 ScalarRefImpl::Jsonb(_) => {
963 return Err(SinkError::ClickHouse(
964 "clickhouse rust interface can not support Json".to_owned(),
965 ));
966 }
967 ScalarRefImpl::Struct(v) => {
968 let mut struct_vec = vec![];
969 for (index, field) in v.iter_fields_ref().enumerate() {
970 let a = Self::from_scalar_ref(
971 field,
972 clickhouse_schema_feature_vec,
973 clickhouse_schema_feature_index + index,
974 )?;
975 struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
976 a,
977 )));
978 }
979 return Ok(struct_vec);
980 }
981 ScalarRefImpl::List(v) => {
982 let mut vec = vec![];
983 for i in v.iter() {
984 vec.extend(Self::from_scalar_ref(
985 i,
986 clickhouse_schema_feature_vec,
987 clickhouse_schema_feature_index,
988 )?)
989 }
990 return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
991 ClickHouseField::List(vec),
992 )]);
993 }
994 ScalarRefImpl::Bytea(_) => {
995 return Err(SinkError::ClickHouse(
996 "clickhouse can not support Bytea".to_owned(),
997 ));
998 }
999 ScalarRefImpl::Map(_) => {
1000 return Err(SinkError::ClickHouse(
1001 "clickhouse can not support Map".to_owned(),
1002 ));
1003 }
1004 };
1005 let data = if clickhouse_schema_feature.can_null {
1006 vec![ClickHouseFieldWithNull::WithSome(data)]
1007 } else {
1008 vec![ClickHouseFieldWithNull::WithoutSome(data)]
1009 };
1010 Ok(data)
1011 }
1012}
1013
1014impl Serialize for ClickHouseField {
1015 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1016 where
1017 S: serde::Serializer,
1018 {
1019 match self {
1020 ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
1021 ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
1022 ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
1023 ClickHouseField::Serial(v) => v.serialize(serializer),
1024 ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
1025 ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
1026 ClickHouseField::String(v) => serializer.serialize_str(v),
1027 ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
1028 ClickHouseField::List(v) => {
1029 let mut s = serializer.serialize_seq(Some(v.len()))?;
1030 for i in v {
1031 s.serialize_element(i)?;
1032 }
1033 s.end()
1034 }
1035 ClickHouseField::Decimal(v) => v.serialize(serializer),
1036 ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
1037 }
1038 }
1039}
1040impl Serialize for ClickHouseFieldWithNull {
1041 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1042 where
1043 S: serde::Serializer,
1044 {
1045 match self {
1046 ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
1047 ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
1048 ClickHouseFieldWithNull::None => serializer.serialize_none(),
1049 }
1050 }
1051}
1052impl Serialize for ClickHouseColumn {
1053 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1054 where
1055 S: serde::Serializer,
1056 {
1057 let mut s = serializer.serialize_struct("useless", self.row.len())?;
1058 for data in &self.row {
1059 s.serialize_field("useless", &data)?
1060 }
1061 s.end()
1062 }
1063}
1064
1065pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
1068 let mut vec = vec![];
1069 for field in schema.fields() {
1070 if let DataType::Struct(st) = &field.data_type {
1071 for (name, data_type) in st.iter() {
1072 if matches!(data_type, DataType::Struct(_)) {
1073 return Err(SinkError::ClickHouse(
1074 "Only one level of nesting is supported for struct".to_owned(),
1075 ));
1076 } else {
1077 vec.push((
1078 format!("{}.{}", field.name, name),
1079 DataType::List(Box::new(data_type.clone())),
1080 ))
1081 }
1082 }
1083 } else {
1084 vec.push((field.name.clone(), field.data_type()));
1085 }
1086 }
1087 Ok(vec)
1088}