use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::Serialize;
use serde::ser::{SerializeSeq, SerializeStruct};
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
};

const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

const ALLOW_EXPERIMENTAL_JSON_TYPE: &str = "allow_experimental_json_type";
const INPUT_FORMAT_BINARY_READ_JSON_AS_STRING: &str = "input_format_binary_read_json_as_string";
const OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING: &str = "output_format_binary_write_json_as_string";

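/// Connection and table options for the ClickHouse sink, populated from the
/// `clickhouse.*` properties in the sink's `WITH` clause.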
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
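    /// Number of checkpoints between commits to ClickHouse; must be greater than zero
    /// (checked in `ClickHouseSink::validate`).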
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}

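/// ClickHouse table engines recognized by the sink. Collapsing engines carry the name of
/// their sign column; replacing engines carry the optional `is_deleted` column configured
/// via `clickhouse.delete.column`.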
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}
impl ClickHouseEngine {
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.clone()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            _ => None,
        }
    }

    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            _ => None,
        }
    }

    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

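    /// Map the engine name reported by `system.tables` to a `ClickHouseEngine`. For
    /// collapsing engines the sign column name is extracted from `create_table_query`;
    /// for replacing engines the delete column comes from `clickhouse.delete.column`.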
    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            "VersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            "CollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            "ReplicatedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "ReplicatedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            "SharedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}

impl ClickHouseCommon {
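    /// Build a ClickHouse client for the configured URL, user, and database. The extra
    /// options enable the experimental JSON type and (as the option names suggest) make
    /// JSON columns be read and written as strings over the binary protocol, which is how
    /// this sink sends JSONB values.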
    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
        let client = ClickHouseClient::default()
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database)
            .with_option(ALLOW_EXPERIMENTAL_JSON_TYPE, "1")
            .with_option(INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
            .with_option(OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
        Ok(client)
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String,
}

impl EnforceSecret for ClickHouseConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        ClickHouseCommon::enforce_one(prop)
    }

    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for ClickHouseSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl ClickHouseConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

impl TryFrom<SinkParam> for ClickHouseSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = ClickHouseConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            is_append_only: param.sink_type.is_append_only(),
        })
    }
}

impl ClickHouseSink {
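    /// Check that every sink column exists in the ClickHouse table with a compatible type.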
    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
            .iter()
            .map(|s| (s.name.clone(), s.clone()))
            .collect();

        if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
            return Err(SinkError::ClickHouse(
                "The sink's columns must be a subset of the target table's columns.".to_owned(),
            ));
        }

        for i in rw_fields_name {
            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "Column {:?} from RisingWave was not found in the ClickHouse table",
                    i.0
                ))
            })?;

            Self::check_and_correct_column_type(&i.1, value)?;
        }
        Ok(())
    }

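    /// Check that the sink's primary key columns exactly match the ClickHouse table's
    /// primary key (columns with `is_in_primary_key = 1` in `system.columns`).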
    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
            .iter()
            .filter(|s| s.is_in_primary_key == 1)
            .map(|s| s.name.clone())
            .collect();

        for (_, field) in self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
        {
            if !clickhouse_pks.remove(&field.name) {
                return Err(SinkError::ClickHouse(
                    "ClickHouse and RisingWave primary keys do not match".to_owned(),
                ));
            }
        }

        if !clickhouse_pks.is_empty() {
            return Err(SinkError::ClickHouse(
                "ClickHouse and RisingWave primary keys do not match".to_owned(),
            ));
        }
        Ok(())
    }

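    /// Check that a RisingWave data type is compatible with the declared ClickHouse column
    /// type (by substring match on the type name); lists are checked against `Array(...)`
    /// recursively on their element type.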
    fn check_and_correct_column_type(
        fields_type: &DataType,
        ck_column: &SystemColumn,
    ) -> Result<()> {
        let is_match = match fields_type {
            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
                | ck_column.r#type.contains("Int16")
                | ck_column.r#type.contains("Enum16")),
            risingwave_common::types::DataType::Int32 => {
                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
            }
            risingwave_common::types::DataType::Int64 => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
                "clickhouse can not support Time".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamptz => {
                Ok(ck_column.r#type.contains("DateTime64"))
            }
            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
                "clickhouse can not support Interval".to_owned(),
            )),
            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
                "struct needs to be converted into a list".to_owned(),
            )),
            risingwave_common::types::DataType::List(list) => {
                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
                Ok(ck_column.r#type.contains("Array"))
            }
            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
                "clickhouse can not support Bytea".to_owned(),
            )),
            risingwave_common::types::DataType::Jsonb => Ok(ck_column.r#type.contains("JSON")),
            risingwave_common::types::DataType::Serial => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
                "clickhouse can not support Int256".to_owned(),
            )),
            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
                "clickhouse can not support Map".to_owned(),
            )),
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        if !is_match? {
            return Err(SinkError::ClickHouse(format!(
                "Column type mismatch for {:?}: RisingWave type is {:?}, ClickHouse type is {:?}",
                ck_column.name, fields_type, ck_column.r#type
            )));
        }

        Ok(())
    }
}

impl Sink for ClickHouseSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

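    /// Validate the sink configuration against the live ClickHouse table: primary key
    /// requirements for upsert, engine capabilities (collapsing / delete-replacing),
    /// column names and types, and a non-zero commit interval.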
    async fn validate(&self) -> Result<()> {
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
            )));
        }

        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplicatedReplacingMergeTree(None)
                | ClickHouseEngine::ReplacingMergeTree(None)
                | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or the `ReplacingMergeTree` in ClickHouse".to_owned()))
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        ClickHouseConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}
pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    inserter: Option<Insert<ClickHouseColumn>>,
}
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    can_null: bool,
    accuracy_time: u8,

    accuracy_decimal: (u8, u8),
}

impl ClickHouseSinkWriter {
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

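    /// Derive per-column features from the ClickHouse type string: nullability, the
    /// DateTime64 precision (`accuracy_time`), and the Decimal (precision, scale)
    /// (`accuracy_decimal`); these drive value conversion in `from_scalar_ref`.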
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RW doesn't support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }

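    /// Convert a stream chunk into ClickHouse rows and feed them to the (lazily created)
    /// inserter. For collapsing engines a sign column (+1 insert / -1 delete) is appended;
    /// for delete-replacing engines an `is_deleted` flag (0 insert / 1 delete) is appended.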
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_field_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_field_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "This ClickHouse engine doesn't support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_field_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

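    /// On a checkpoint barrier, finish the in-flight insert so all buffered rows become
    /// visible in ClickHouse; how many checkpoints are batched per commit is decided by
    /// the log sinker via `commit_checkpoint_interval`.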
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint && let Some(inserter) = self.inserter.take() {
            inserter.end().await?;
        }
        Ok(())
    }
}

#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    r#type: String,
    is_in_primary_key: u8,
}

#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    engine: String,
    create_table_query: String,
}

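/// Fetch the table engine (from `system.tables`) and column metadata (from
/// `system.columns`) for the configured table, then drop the engine's sign/delete columns
/// from the column list since the writer appends those values itself.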
async fn query_column_engine_from_ck(
    client: ClickHouseClient,
    config: &ClickHouseConfig,
) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
    let query_engine = QUERY_ENGINE;
    let query_column = QUERY_COLUMN;

    let clickhouse_engine = client
        .query(query_engine)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .fetch_all::<ClickhouseQueryEngine>()
        .await?;
    let mut clickhouse_column = client
        .query(query_column)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .bind("position")
        .fetch_all::<SystemColumn>()
        .await?;
    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
        return Err(SinkError::ClickHouse(format!(
            "table {:?}.{:?} was not found in clickhouse",
            config.common.database, config.common.table
        )));
    }

    let clickhouse_engine =
        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

    if let Some(sign) = &clickhouse_engine.get_sign_name() {
        clickhouse_column.retain(|a| sign.ne(&a.name))
    }

    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
        clickhouse_column.retain(|a| delete_col.ne(&a.name))
    }

    Ok((clickhouse_column, clickhouse_engine))
}

#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}

#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}
#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}
impl Serialize for ClickHouseDecimal {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
        }
    }
}

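/// A single cell value as sent to ClickHouse: `WithSome` serializes as `Some(v)` for
/// `Nullable` columns, `WithoutSome` serializes the bare value, and `None` serializes a
/// ClickHouse NULL.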
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}

impl ClickHouseFieldWithNull {
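    /// Convert one RisingWave scalar into the ClickHouse value(s) for the column at
    /// `clickhouse_schema_feature_index`, applying the column's nullability, decimal
    /// scale, and DateTime64 precision. Structs expand to one `List` per flattened field.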
    pub fn from_scalar_ref(
        data: Option<ScalarRefImpl<'_>>,
        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
        clickhouse_schema_feature_index: usize,
    ) -> Result<Vec<ClickHouseFieldWithNull>> {
        let clickhouse_schema_feature = clickhouse_schema_feature_vec
            .get(clickhouse_schema_feature_index)
            .ok_or_else(|| SinkError::ClickHouse(format!("No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}")))?;
        if data.is_none() {
            if !clickhouse_schema_feature.can_null {
                return Err(SinkError::ClickHouse(
                    "cannot insert null into a non-nullable clickhouse column".to_owned(),
                ));
            } else {
                return Ok(vec![ClickHouseFieldWithNull::None]);
            }
        }
        let data = match data.unwrap() {
            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
            ScalarRefImpl::Int256(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse can not support Int256".to_owned(),
                ));
            }
            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
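            // Rescale the decimal mantissa to the column's declared scale, then pick
            // Decimal32/64/128 based on the column's precision.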
            ScalarRefImpl::Decimal(d) => {
                let d = if let Decimal::Normalized(d) = d {
                    let scale =
                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
                    if scale < 0 {
                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                    } else {
                        d.mantissa() * 10_i128.pow(scale as u32)
                    }
                } else if clickhouse_schema_feature.can_null {
                    warn!("Inf, -Inf, NaN in RW decimal is converted into clickhouse null!");
                    return Ok(vec![ClickHouseFieldWithNull::None]);
                } else {
                    warn!("Inf, -Inf, NaN in RW decimal is converted into clickhouse 0!");
                    0_i128
                };
                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
                } else {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                }
            }
            ScalarRefImpl::Interval(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse can not support Interval".to_owned(),
                ));
            }
            ScalarRefImpl::Date(v) => {
                let days = v.get_nums_days_unix_epoch();
                ClickHouseField::Int32(days)
            }
            ScalarRefImpl::Time(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse can not support Time".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamp(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
                ));
            }
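            // Convert microseconds since the epoch to the column's DateTime64 tick unit
            // (10^accuracy_time ticks per second), erroring on overflow when upscaling.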
            ScalarRefImpl::Timestamptz(v) => {
                let micros = v.timestamp_micros();
                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
                    true => {
                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
                    }
                    false => micros
                        .checked_mul(
                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
                        )
                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
                };
                ClickHouseField::Int64(ticks)
            }
            ScalarRefImpl::Jsonb(v) => {
                let json_str = v.to_string();
                ClickHouseField::String(json_str)
            }
            ScalarRefImpl::Struct(v) => {
                let mut struct_vec = vec![];
                for (index, field) in v.iter_fields_ref().enumerate() {
                    let a = Self::from_scalar_ref(
                        field,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index + index,
                    )?;
                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
                        a,
                    )));
                }
                return Ok(struct_vec);
            }
            ScalarRefImpl::List(v) => {
                let mut vec = vec![];
                for i in v.iter() {
                    vec.extend(Self::from_scalar_ref(
                        i,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index,
                    )?)
                }
                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
                    ClickHouseField::List(vec),
                )]);
            }
            ScalarRefImpl::Bytea(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse can not support Bytea".to_owned(),
                ));
            }
            ScalarRefImpl::Map(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse can not support Map".to_owned(),
                ));
            }
            ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        let data = if clickhouse_schema_feature.can_null {
            vec![ClickHouseFieldWithNull::WithSome(data)]
        } else {
            vec![ClickHouseFieldWithNull::WithoutSome(data)]
        };
        Ok(data)
    }
}

impl Serialize for ClickHouseField {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
            ClickHouseField::Serial(v) => v.serialize(serializer),
            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
            ClickHouseField::String(v) => serializer.serialize_str(v),
            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
            ClickHouseField::List(v) => {
                let mut s = serializer.serialize_seq(Some(v.len()))?;
                for i in v {
                    s.serialize_element(i)?;
                }
                s.end()
            }
            ClickHouseField::Decimal(v) => v.serialize(serializer),
            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
        }
    }
}
impl Serialize for ClickHouseFieldWithNull {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
            ClickHouseFieldWithNull::None => serializer.serialize_none(),
        }
    }
}
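// Note: the "useless" struct/field names below are placeholders; rows are encoded
// positionally, so the names are not expected to matter to the ClickHouse row serializer.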
impl Serialize for ClickHouseColumn {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_struct("useless", self.row.len())?;
        for data in &self.row {
            s.serialize_field("useless", &data)?
        }
        s.end()
    }
}

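/// Flatten the sink schema into `(column_name, type)` pairs for ClickHouse. A top-level
/// struct field is expanded into one `parent.child` column per member, each typed as a
/// list (ClickHouse nested columns are arrays); deeper nesting is rejected.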
pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
    let mut vec = vec![];
    for field in schema.fields() {
        if let DataType::Struct(st) = &field.data_type {
            for (name, data_type) in st.iter() {
                if matches!(data_type, DataType::Struct(_)) {
                    return Err(SinkError::ClickHouse(
                        "Only one level of nesting is supported for struct".to_owned(),
                    ));
                } else {
                    vec.push((
                        format!("{}.{}", field.name, name),
                        DataType::List(Box::new(data_type.clone())),
                    ))
                }
            }
        } else {
            vec.push((field.name.clone(), field.data_type()));
        }
    }
    Ok(vec)
}