use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::Serialize;
use serde::ser::{SerializeSeq, SerializeStruct};
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
};

const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

const ALLOW_EXPERIMENTAL_JSON_TYPE: &str = "allow_experimental_json_type";
const INPUT_FORMAT_BINARY_READ_JSON_AS_STRING: &str = "input_format_binary_read_json_as_string";
const OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING: &str = "output_format_binary_write_json_as_string";

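/// Connection and destination options for the ClickHouse sink: server URL and credentials,
/// target database/table, an optional delete-flag column for `ReplacingMergeTree`, and the
/// checkpoint interval at which inserts are committed.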
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}

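/// ClickHouse table engines recognized by the sink. The `CollapsingMergeTree` and
/// `VersionedCollapsingMergeTree` families carry the name of the sign column; the
/// `ReplacingMergeTree` families carry the optional delete column configured via
/// `clickhouse.delete.column`.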
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}

impl ClickHouseEngine {
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.to_string()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.to_string())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.to_string())
            }
            _ => None,
        }
    }

    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.to_string()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.to_string())
            }
            _ => None,
        }
    }

    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

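    /// Map the engine name reported by `system.tables` to a `ClickHouseEngine`.
    /// For the collapsing engine families the sign column name is parsed out of
    /// `create_table_query`; for the replacing engine families the delete column
    /// is taken from the sink config.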
    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            "VersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            "CollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            "ReplicatedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "ReplicatedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            "SharedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}

impl ClickHouseCommon {
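    /// Build a `clickhouse::Client` from the connection options, enabling the JSON-related
    /// settings the sink relies on (the experimental JSON type and binary JSON-as-string I/O).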
    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
        let client = ClickHouseClient::default()
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database)
            .with_option(ALLOW_EXPERIMENTAL_JSON_TYPE, "1")
            .with_option(INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
            .with_option(OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
        Ok(client)
    }
}

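/// Full sink configuration: the flattened `ClickHouseCommon` options plus the sink `type`
/// (`append-only` or `upsert`).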
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String,
}

impl EnforceSecret for ClickHouseConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        ClickHouseCommon::enforce_one(prop)
    }

    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for ClickHouseSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl ClickHouseConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

impl TryFrom<SinkParam> for ClickHouseSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = ClickHouseConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            is_append_only: param.sink_type.is_append_only(),
        })
    }
}

impl ClickHouseSink {
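    /// Check that every sink column exists in the ClickHouse table and that its type is
    /// compatible with the corresponding ClickHouse column type.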
    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
            .iter()
            .map(|s| (s.name.clone(), s.clone()))
            .collect();

        if rw_fields_name.len() > clickhouse_columns_desc.len() {
            return Err(SinkError::ClickHouse(
                "The columns of the ClickHouse table must be equal to or a superset of the sink's columns.".to_owned(),
            ));
        }

        for i in rw_fields_name {
            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "Column {:?} not found in the ClickHouse table",
                    i.0
                ))
            })?;

            Self::check_and_correct_column_type(&i.1, value)?;
        }
        Ok(())
    }

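    /// Check that the sink's primary key columns exactly match the primary key columns of the
    /// ClickHouse table.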
    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
            .iter()
            .filter(|s| s.is_in_primary_key == 1)
            .map(|s| s.name.clone())
            .collect();

        for (_, field) in self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
        {
            if !clickhouse_pks.remove(&field.name) {
                return Err(SinkError::ClickHouse(
                    "ClickHouse and RisingWave primary keys do not match".to_owned(),
                ));
            }
        }

        if !clickhouse_pks.is_empty() {
            return Err(SinkError::ClickHouse(
                "ClickHouse and RisingWave primary keys do not match".to_owned(),
            ));
        }
        Ok(())
    }

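    /// Check whether a RisingWave data type is compatible with the ClickHouse column type
    /// reported by `system.columns`, matching on the ClickHouse type name substring.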
    fn check_and_correct_column_type(
        fields_type: &DataType,
        ck_column: &SystemColumn,
    ) -> Result<()> {
        let is_match = match fields_type {
            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
                || ck_column.r#type.contains("Int16")
                || ck_column.r#type.contains("Enum16")),
            risingwave_common::types::DataType::Int32 => {
                Ok(ck_column.r#type.contains("UInt32") || ck_column.r#type.contains("Int32"))
            }
            risingwave_common::types::DataType::Int64 => {
                Ok(ck_column.r#type.contains("UInt64") || ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
                "clickhouse does not support Time".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamptz => {
                Ok(ck_column.r#type.contains("DateTime64"))
            }
            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
                "clickhouse does not support Interval".to_owned(),
            )),
            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
                "struct needs to be converted into a list".to_owned(),
            )),
            risingwave_common::types::DataType::List(list) => {
                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
                Ok(ck_column.r#type.contains("Array"))
            }
            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
                "clickhouse does not support Bytea".to_owned(),
            )),
            risingwave_common::types::DataType::Jsonb => Ok(ck_column.r#type.contains("JSON")),
            risingwave_common::types::DataType::Serial => {
                Ok(ck_column.r#type.contains("UInt64") || ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
                "clickhouse does not support Int256".to_owned(),
            )),
            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
                "clickhouse does not support Map".to_owned(),
            )),
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        if !is_match? {
            return Err(SinkError::ClickHouse(format!(
                "Column type mismatch for column {:?}: RisingWave type is {:?}, ClickHouse type is {:?}",
                ck_column.name, fields_type, ck_column.r#type
            )));
        }

        Ok(())
    }
}

impl Sink for ClickHouseSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

    async fn validate(&self) -> Result<()> {
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert clickhouse sink (please define in `primary_key` field)"
            )));
        }

        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplicatedReplacingMergeTree(None)
                | ClickHouseEngine::ReplacingMergeTree(None)
                | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set a `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use either `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or the `ReplacingMergeTree` in ClickHouse".to_owned())),
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        ClickHouseConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}

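/// Streaming writer for the ClickHouse sink: rows are appended to an in-flight
/// `clickhouse::insert::Insert`, which is finished when a checkpoint barrier arrives.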
pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    inserter: Option<Insert<ClickHouseColumn>>,
}

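/// Per-column features derived from the ClickHouse schema: nullability, `DateTime64`
/// precision (`accuracy_time`), and `Decimal(P, S)` precision and scale (`accuracy_decimal`).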
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    can_null: bool,
    accuracy_time: u8,
    accuracy_decimal: (u8, u8),
}

impl ClickHouseSinkWriter {
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

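    /// Derive a `ClickHouseSchemaFeature` for one column: whether it is `Nullable`, the
    /// `DateTime64(p)` precision, and the `Decimal(P, S)` precision and scale. Decimal256
    /// (precision > 38) is rejected.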
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have first".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RW doesn't support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }

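    /// Convert a `StreamChunk` into ClickHouse rows and feed them into the current inserter,
    /// appending the sign column (+1 for inserts, -1 for deletes) on collapsing engines and the
    /// delete flag (0/1) on `ReplacingMergeTree` engines configured with a delete column.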
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_field_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_field_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "ClickHouse engine doesn't support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_field_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}


#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint && let Some(inserter) = self.inserter.take() {
            inserter.end().await?;
        }
        Ok(())
    }
}

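/// Row type for the `system.columns` query: column name, ClickHouse type string, and whether
/// the column is part of the primary key.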
#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    r#type: String,
    is_in_primary_key: u8,
}

#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    engine: String,
    create_table_query: String,
}

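/// Query `system.tables` and `system.columns` for the target table, returning its column
/// descriptions and parsed engine. The engine's sign and delete helper columns are filtered out
/// so they are not matched against the RisingWave schema.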
async fn query_column_engine_from_ck(
    client: ClickHouseClient,
    config: &ClickHouseConfig,
) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
    let query_engine = QUERY_ENGINE;
    let query_column = QUERY_COLUMN;

    let clickhouse_engine = client
        .query(query_engine)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .fetch_all::<ClickhouseQueryEngine>()
        .await?;
    let mut clickhouse_column = client
        .query(query_column)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .bind("position")
        .fetch_all::<SystemColumn>()
        .await?;
    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
        return Err(SinkError::ClickHouse(format!(
            "table {:?}.{:?} is not found in clickhouse",
            config.common.database, config.common.table
        )));
    }

    let clickhouse_engine =
        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

    if let Some(sign) = &clickhouse_engine.get_sign_name() {
        clickhouse_column.retain(|a| sign.ne(&a.name))
    }

    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
        clickhouse_column.retain(|a| delete_col.ne(&a.name))
    }

    Ok((clickhouse_column, clickhouse_engine))
}

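/// One row to be written to ClickHouse; its fields are serialized positionally in the same
/// order as `rw_fields_name_after_calibration`.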
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}

#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}

#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}

impl Serialize for ClickHouseDecimal {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
        }
    }
}

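/// A field value together with its nullability encoding: `WithSome` serializes as `Some(v)`
/// for `Nullable` ClickHouse columns, `WithoutSome` serializes the bare value, and `None`
/// serializes as null.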
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}

impl ClickHouseFieldWithNull {
    pub fn from_scalar_ref(
        data: Option<ScalarRefImpl<'_>>,
        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
        clickhouse_schema_feature_index: usize,
    ) -> Result<Vec<ClickHouseFieldWithNull>> {
        let clickhouse_schema_feature = clickhouse_schema_feature_vec
            .get(clickhouse_schema_feature_index)
            .ok_or_else(|| SinkError::ClickHouse(format!("No column found from clickhouse table schema, index is {clickhouse_schema_feature_index}")))?;
        if data.is_none() {
            if !clickhouse_schema_feature.can_null {
                return Err(SinkError::ClickHouse(
                    "cannot insert null into a non-nullable clickhouse column".to_owned(),
                ));
            } else {
                return Ok(vec![ClickHouseFieldWithNull::None]);
            }
        }
        let data = match data.unwrap() {
            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
            ScalarRefImpl::Int256(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Int256".to_owned(),
                ));
            }
            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
            ScalarRefImpl::Decimal(d) => {
                let d = if let Decimal::Normalized(d) = d {
                    let scale =
                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
                    if scale < 0 {
                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                    } else {
                        d.mantissa() * 10_i128.pow(scale as u32)
                    }
                } else if clickhouse_schema_feature.can_null {
                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse null!");
                    return Ok(vec![ClickHouseFieldWithNull::None]);
                } else {
                    warn!("Inf, -Inf, Nan in RW decimal is converted into clickhouse 0!");
                    0_i128
                };
                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
                } else {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                }
            }
            ScalarRefImpl::Interval(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Interval".to_owned(),
                ));
            }
            ScalarRefImpl::Date(v) => {
                let days = v.get_nums_days_unix_epoch();
                ClickHouseField::Int32(days)
            }
            ScalarRefImpl::Time(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Time".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamp(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamptz(v) => {
                let micros = v.timestamp_micros();
                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
                    true => {
                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
                    }
                    false => micros
                        .checked_mul(
                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
                        )
                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
                };
                ClickHouseField::Int64(ticks)
            }
            ScalarRefImpl::Jsonb(v) => {
                let json_str = v.to_string();
                ClickHouseField::String(json_str)
            }
            ScalarRefImpl::Struct(v) => {
                let mut struct_vec = vec![];
                for (index, field) in v.iter_fields_ref().enumerate() {
                    let a = Self::from_scalar_ref(
                        field,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index + index,
                    )?;
                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
                        a,
                    )));
                }
                return Ok(struct_vec);
            }
            ScalarRefImpl::List(v) => {
                let mut vec = vec![];
                for i in v.iter() {
                    vec.extend(Self::from_scalar_ref(
                        i,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index,
                    )?)
                }
                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
                    ClickHouseField::List(vec),
                )]);
            }
            ScalarRefImpl::Bytea(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Bytea".to_owned(),
                ));
            }
            ScalarRefImpl::Map(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not support Map".to_owned(),
                ));
            }
            ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        let data = if clickhouse_schema_feature.can_null {
            vec![ClickHouseFieldWithNull::WithSome(data)]
        } else {
            vec![ClickHouseFieldWithNull::WithoutSome(data)]
        };
        Ok(data)
    }
}

impl Serialize for ClickHouseField {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
            ClickHouseField::Serial(v) => v.serialize(serializer),
            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
            ClickHouseField::String(v) => serializer.serialize_str(v),
            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
            ClickHouseField::List(v) => {
                let mut s = serializer.serialize_seq(Some(v.len()))?;
                for i in v {
                    s.serialize_element(i)?;
                }
                s.end()
            }
            ClickHouseField::Decimal(v) => v.serialize(serializer),
            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
        }
    }
}

impl Serialize for ClickHouseFieldWithNull {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
            ClickHouseFieldWithNull::None => serializer.serialize_none(),
        }
    }
}

impl Serialize for ClickHouseColumn {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_struct("useless", self.row.len())?;
        for data in &self.row {
            s.serialize_field("useless", &data)?
        }
        s.end()
    }
}

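/// Flatten the RisingWave schema into `(name, type)` pairs for ClickHouse. A top-level struct
/// field is expanded into one `parent.child` column per subfield, each typed as a list so it
/// lines up with ClickHouse nested (array) subcolumns; deeper nesting is rejected.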
pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
    let mut vec = vec![];
    for field in schema.fields() {
        if let DataType::Struct(st) = &field.data_type {
            for (name, data_type) in st.iter() {
                if matches!(data_type, DataType::Struct(_)) {
                    return Err(SinkError::ClickHouse(
                        "Only one level of nesting is supported for struct".to_owned(),
                    ));
                } else {
                    vec.push((
                        format!("{}.{}", field.name, name),
                        DataType::List(Box::new(data_type.clone())),
                    ))
                }
            }
        } else {
            vec.push((field.name.clone(), field.data_type()));
        }
    }
    Ok(vec)
}