use core::fmt::Debug;
use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::anyhow;
use clickhouse::insert::Insert;
use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow};
use itertools::Itertools;
use phf::{Set, phf_set};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
use serde::Serialize;
use serde::ser::{SerializeSeq, SerializeStruct};
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use thiserror_ext::AsReport;
use tonic::async_trait;
use tracing::warn;
use with_options::WithOptions;

use super::decouple_checkpoint_log_sink::{
    DecoupleCheckpointLogSinkerOf, default_commit_checkpoint_interval,
};
use super::writer::SinkWriter;
use super::{SinkWriterMetrics, SinkWriterParam};
use crate::enforce_secret::EnforceSecret;
use crate::error::ConnectorResult;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, Sink, SinkError, SinkParam,
};

const QUERY_ENGINE: &str =
    "select distinct ?fields from system.tables where database = ? and name = ?";
const QUERY_COLUMN: &str =
    "select distinct ?fields from system.columns where database = ? and table = ? order by ?";
pub const CLICKHOUSE_SINK: &str = "clickhouse";

const ALLOW_EXPERIMENTAL_JSON_TYPE: &str = "allow_experimental_json_type";
const INPUT_FORMAT_BINARY_READ_JSON_AS_STRING: &str = "input_format_binary_read_json_as_string";
const OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING: &str = "output_format_binary_write_json_as_string";

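/// Connection and table options for the ClickHouse sink, populated from the `clickhouse.*`
/// WITH options of the sink definition. Illustrative usage (all names and values below are
/// placeholders, not taken from any real deployment):
///
/// ```sql
/// CREATE SINK my_sink FROM my_mv WITH (
///     connector = 'clickhouse',
///     type = 'append-only',
///     clickhouse.url = 'http://127.0.0.1:8123',
///     clickhouse.user = 'default',
///     clickhouse.password = '',
///     clickhouse.database = 'default',
///     clickhouse.table = 'demo_table'
/// );
/// ```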
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
    #[serde(rename = "clickhouse.url")]
    pub url: String,
    #[serde(rename = "clickhouse.user")]
    pub user: String,
    #[serde(rename = "clickhouse.password")]
    pub password: String,
    #[serde(rename = "clickhouse.database")]
    pub database: String,
    #[serde(rename = "clickhouse.table")]
    pub table: String,
    #[serde(rename = "clickhouse.delete.column")]
    pub delete_column: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for ClickHouseCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "clickhouse.password", "clickhouse.user"
    };
}

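/// The table engine reported by ClickHouse for the target table. The engine decides how
/// deletes are represented: collapsing engines carry a sign column, `ReplacingMergeTree`
/// variants may carry an `is_deleted` column (configured via `clickhouse.delete.column`),
/// and the remaining engines only support append-only writes.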
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum ClickHouseEngine {
    MergeTree,
    ReplacingMergeTree(Option<String>),
    SummingMergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree(String),
    VersionedCollapsingMergeTree(String),
    GraphiteMergeTree,
    ReplicatedMergeTree,
    ReplicatedReplacingMergeTree(Option<String>),
    ReplicatedSummingMergeTree,
    ReplicatedAggregatingMergeTree,
    ReplicatedCollapsingMergeTree(String),
    ReplicatedVersionedCollapsingMergeTree(String),
    ReplicatedGraphiteMergeTree,
    SharedMergeTree,
    SharedReplacingMergeTree(Option<String>),
    SharedSummingMergeTree,
    SharedAggregatingMergeTree,
    SharedCollapsingMergeTree(String),
    SharedVersionedCollapsingMergeTree(String),
    SharedGraphiteMergeTree,
    Null,
}

impl ClickHouseEngine {
    pub fn is_collapsing_engine(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::CollapsingMergeTree(_)
                | ClickHouseEngine::VersionedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedCollapsingMergeTree(_)
                | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
        )
    }

    pub fn is_delete_replacing_engine(&self) -> bool {
        match self {
            ClickHouseEngine::ReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::ReplicatedReplacingMergeTree(delete_col) => delete_col.is_some(),
            ClickHouseEngine::SharedReplacingMergeTree(delete_col) => delete_col.is_some(),
            _ => false,
        }
    }

    pub fn get_delete_col(&self) -> Option<String> {
        match self {
            ClickHouseEngine::ReplacingMergeTree(Some(delete_col)) => Some(delete_col.clone()),
            ClickHouseEngine::ReplicatedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            ClickHouseEngine::SharedReplacingMergeTree(Some(delete_col)) => {
                Some(delete_col.clone())
            }
            _ => None,
        }
    }

    pub fn get_sign_name(&self) -> Option<String> {
        match self {
            ClickHouseEngine::CollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            ClickHouseEngine::SharedCollapsingMergeTree(sign_name) => Some(sign_name.clone()),
            ClickHouseEngine::SharedVersionedCollapsingMergeTree(sign_name) => {
                Some(sign_name.clone())
            }
            _ => None,
        }
    }

    pub fn is_shared_tree(&self) -> bool {
        matches!(
            self,
            ClickHouseEngine::SharedMergeTree
                | ClickHouseEngine::SharedReplacingMergeTree(_)
                | ClickHouseEngine::SharedSummingMergeTree
                | ClickHouseEngine::SharedAggregatingMergeTree
                | ClickHouseEngine::SharedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedVersionedCollapsingMergeTree(_)
                | ClickHouseEngine::SharedGraphiteMergeTree
        )
    }

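    /// Resolve the engine from the `system.tables` row. For collapsing engines the sign
    /// column name is recovered by string-splitting `create_table_query`; for replacing
    /// engines the optional delete column comes from `clickhouse.delete.column` in the
    /// sink config rather than from the DDL.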
    pub fn from_query_engine(
        engine_name: &ClickhouseQueryEngine,
        config: &ClickHouseConfig,
    ) -> Result<Self> {
        match engine_name.engine.as_str() {
            "MergeTree" => Ok(ClickHouseEngine::MergeTree),
            "Null" => Ok(ClickHouseEngine::Null),
            "ReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplacingMergeTree(delete_column))
            }
            "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree),
            "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree),
            "VersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("VersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name))
            }
            "CollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("CollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::CollapsingMergeTree(sign_name))
            }
            "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree),
            "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree),
            "ReplicatedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::ReplicatedReplacingMergeTree(
                    delete_column,
                ))
            }
            "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree),
            "ReplicatedAggregatingMergeTree" => {
                Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree)
            }
            "ReplicatedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "ReplicatedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("ReplicatedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name))
            }
            "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree),
            "SharedMergeTree" => Ok(ClickHouseEngine::SharedMergeTree),
            "SharedReplacingMergeTree" => {
                let delete_column = config.common.delete_column.clone();
                Ok(ClickHouseEngine::SharedReplacingMergeTree(delete_column))
            }
            "SharedSummingMergeTree" => Ok(ClickHouseEngine::SharedSummingMergeTree),
            "SharedAggregatingMergeTree" => Ok(ClickHouseEngine::SharedAggregatingMergeTree),
            "SharedVersionedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedVersionedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(',')
                    .rev()
                    .nth(1)
                    .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedVersionedCollapsingMergeTree(
                    sign_name,
                ))
            }
            "SharedCollapsingMergeTree" => {
                let sign_name = engine_name
                    .create_table_query
                    .split("SharedCollapsingMergeTree(")
                    .last()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .split(')')
                    .next()
                    .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                    .split(',')
                    .next_back()
                    .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                    .trim()
                    .to_owned();
                Ok(ClickHouseEngine::SharedCollapsingMergeTree(sign_name))
            }
            "SharedGraphiteMergeTree" => Ok(ClickHouseEngine::SharedGraphiteMergeTree),
            _ => Err(SinkError::ClickHouse(format!(
                "Cannot find clickhouse engine {:?}",
                engine_name.engine
            ))),
        }
    }
}

impl ClickHouseCommon {
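    /// Build a ClickHouse client for this sink. The three JSON-related options are set so
    /// that JSON columns are read and written as plain strings over the binary protocol,
    /// matching how JSONB values are serialized below.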
    pub(crate) fn build_client(&self) -> ConnectorResult<ClickHouseClient> {
        let client = ClickHouseClient::default()
            .with_url(&self.url)
            .with_user(&self.user)
            .with_password(&self.password)
            .with_database(&self.database)
            .with_option(ALLOW_EXPERIMENTAL_JSON_TYPE, "1")
            .with_option(INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1")
            .with_option(OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING, "1");
        Ok(client)
    }
}

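/// Full sink configuration: the shared `clickhouse.*` options plus the required `type`
/// option (`append-only` or `upsert`).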
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct ClickHouseConfig {
    #[serde(flatten)]
    pub common: ClickHouseCommon,

    pub r#type: String,
}

impl EnforceSecret for ClickHouseConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        ClickHouseCommon::enforce_one(prop)
    }

    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

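/// The ClickHouse sink. `validate` checks the remote table (engine, columns, primary key)
/// up front; the actual writing is done by [`ClickHouseSinkWriter`] wrapped in a
/// checkpoint-decoupled log sinker.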
#[derive(Clone, Debug)]
pub struct ClickHouseSink {
    pub config: ClickHouseConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for ClickHouseSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            ClickHouseConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl ClickHouseConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<ClickHouseConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

impl TryFrom<SinkParam> for ClickHouseSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = ClickHouseConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
            is_append_only: param.sink_type.is_append_only(),
        })
    }
}

impl ClickHouseSink {
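    /// Check that every sink column exists in the ClickHouse table and that its type is
    /// compatible (see `check_and_correct_column_type`). Extra columns on the ClickHouse
    /// side are allowed; extra columns on the sink side are not.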
    fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
        let clickhouse_columns_desc: HashMap<String, SystemColumn> = clickhouse_columns_desc
            .iter()
            .map(|s| (s.name.clone(), s.clone()))
            .collect();

        if rw_fields_name.len() > clickhouse_columns_desc.len() {
            return Err(SinkError::ClickHouse(
                "The ClickHouse table's columns must be equal to or a superset of the sink's columns.".to_owned(),
            ));
        }

        for i in rw_fields_name {
            let value = clickhouse_columns_desc.get(&i.0).ok_or_else(|| {
                SinkError::ClickHouse(format!(
                    "Column {:?} was not found in the ClickHouse table",
                    i.0
                ))
            })?;

            Self::check_and_correct_column_type(&i.1, value)?;
        }
        Ok(())
    }

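    /// For upsert sinks, require the sink's primary key columns to match the ClickHouse
    /// table's primary key exactly (same set of column names, nothing missing or extra).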
    fn check_pk_match(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
        let mut clickhouse_pks: HashSet<String> = clickhouse_columns_desc
            .iter()
            .filter(|s| s.is_in_primary_key == 1)
            .map(|s| s.name.clone())
            .collect();

        for (_, field) in self
            .schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(index, _)| self.pk_indices.contains(index))
        {
            if !clickhouse_pks.remove(&field.name) {
                return Err(SinkError::ClickHouse(
                    "ClickHouse and RisingWave primary keys do not match".to_owned(),
                ));
            }
        }

        if !clickhouse_pks.is_empty() {
            return Err(SinkError::ClickHouse(
                "ClickHouse and RisingWave primary keys do not match".to_owned(),
            ));
        }
        Ok(())
    }

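    /// Map a RisingWave type to the ClickHouse column types it may be written to. The check
    /// is substring-based on the reported ClickHouse type (so `Nullable(Int64)` still
    /// matches `Int64`); unsupported RisingWave types are rejected outright.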
    fn check_and_correct_column_type(
        fields_type: &DataType,
        ck_column: &SystemColumn,
    ) -> Result<()> {
        let is_match = match fields_type {
            risingwave_common::types::DataType::Boolean => Ok(ck_column.r#type.contains("Bool")),
            risingwave_common::types::DataType::Int16 => Ok(ck_column.r#type.contains("UInt16")
                | ck_column.r#type.contains("Int16")
                | ck_column.r#type.contains("Enum16")),
            risingwave_common::types::DataType::Int32 => {
                Ok(ck_column.r#type.contains("UInt32") | ck_column.r#type.contains("Int32"))
            }
            risingwave_common::types::DataType::Int64 => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Float32 => Ok(ck_column.r#type.contains("Float32")),
            risingwave_common::types::DataType::Float64 => Ok(ck_column.r#type.contains("Float64")),
            risingwave_common::types::DataType::Decimal => Ok(ck_column.r#type.contains("Decimal")),
            risingwave_common::types::DataType::Date => Ok(ck_column.r#type.contains("Date32")),
            risingwave_common::types::DataType::Varchar => Ok(ck_column.r#type.contains("String")),
            risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
                "TIME is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
                "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
            )),
            risingwave_common::types::DataType::Timestamptz => {
                Ok(ck_column.r#type.contains("DateTime64"))
            }
            risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
                "INTERVAL is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Struct(_) => Err(SinkError::ClickHouse(
                "struct needs to be converted into a list".to_owned(),
            )),
            risingwave_common::types::DataType::List(list) => {
                Self::check_and_correct_column_type(list.as_ref(), ck_column)?;
                Ok(ck_column.r#type.contains("Array"))
            }
            risingwave_common::types::DataType::Bytea => Err(SinkError::ClickHouse(
                "BYTEA is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
            )),
            risingwave_common::types::DataType::Jsonb => Ok(ck_column.r#type.contains("JSON")),
            risingwave_common::types::DataType::Serial => {
                Ok(ck_column.r#type.contains("UInt64") | ck_column.r#type.contains("Int64"))
            }
            risingwave_common::types::DataType::Int256 => Err(SinkError::ClickHouse(
                "INT256 is not supported for ClickHouse sink.".to_owned(),
            )),
            risingwave_common::types::DataType::Map(_) => Err(SinkError::ClickHouse(
                "MAP is not supported for ClickHouse sink.".to_owned(),
            )),
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        if !is_match? {
            return Err(SinkError::ClickHouse(format!(
                "Column type mismatch for column {:?}: RisingWave type is {:?}, ClickHouse type is {:?}",
                ck_column.name, fields_type, ck_column.r#type
            )));
        }

        Ok(())
    }
}

impl Sink for ClickHouseSink {
    type LogSinker = DecoupleCheckpointLogSinkerOf<ClickHouseSinkWriter>;

    const SINK_NAME: &'static str = CLICKHOUSE_SINK;

    async fn validate(&self) -> Result<()> {
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert ClickHouse sink (please define in `primary_key` field)"
            )));
        }

        let client = self.config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client, &self.config).await?;
        if clickhouse_engine.is_shared_tree() {
            risingwave_common::license::Feature::ClickHouseSharedEngine
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        if !self.is_append_only
            && !clickhouse_engine.is_collapsing_engine()
            && !clickhouse_engine.is_delete_replacing_engine()
        {
            return match clickhouse_engine {
                ClickHouseEngine::ReplicatedReplacingMergeTree(None)
                | ClickHouseEngine::ReplacingMergeTree(None)
                | ClickHouseEngine::SharedReplacingMergeTree(None) => {
                    Err(SinkError::ClickHouse("To enable upsert with a `ReplacingMergeTree`, you must set `clickhouse.delete.column` to the UInt8 column in ClickHouse used to signify deletes. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree#is_deleted for more information".to_owned()))
                }
                _ => Err(SinkError::ClickHouse("If you want to use upsert, please use `VersionedCollapsingMergeTree`, `CollapsingMergeTree` or `ReplacingMergeTree` in ClickHouse".to_owned())),
            };
        }

        self.check_column_name_and_type(&clickhouse_column)?;
        if !self.is_append_only {
            self.check_pk_match(&clickhouse_column)?;
        }

        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        ClickHouseConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let writer = ClickHouseSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        Ok(DecoupleCheckpointLogSinkerOf::new(
            writer,
            SinkWriterMetrics::new(&writer_param),
            commit_checkpoint_interval,
        ))
    }
}
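
/// Streaming writer for the ClickHouse sink. Rows are buffered in a lazily created
/// `Insert` and flushed (`inserter.end()`) on checkpoint barriers, so commits follow
/// `commit_checkpoint_interval`.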
pub struct ClickHouseSinkWriter {
    pub config: ClickHouseConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: ClickHouseClient,
    #[expect(dead_code)]
    is_append_only: bool,
    column_correct_vec: Vec<ClickHouseSchemaFeature>,
    rw_fields_name_after_calibration: Vec<String>,
    clickhouse_engine: ClickHouseEngine,
    inserter: Option<Insert<ClickHouseColumn>>,
}
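
/// Per-column features extracted from the ClickHouse schema that affect value conversion:
/// whether the column is `Nullable`, the `DateTime64` precision, and the `Decimal`
/// (precision, scale) pair.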
#[derive(Debug)]
struct ClickHouseSchemaFeature {
    can_null: bool,
    accuracy_time: u8,
    accuracy_decimal: (u8, u8),
}

impl ClickHouseSinkWriter {
    pub async fn new(
        config: ClickHouseConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        let client = config.common.build_client()?;

        let (clickhouse_column, clickhouse_engine) =
            query_column_engine_from_ck(client.clone(), &config).await?;

        let column_correct_vec: Result<Vec<ClickHouseSchemaFeature>> = clickhouse_column
            .iter()
            .map(Self::build_column_correct_vec)
            .collect();
        let mut rw_fields_name_after_calibration = build_fields_name_type_from_schema(&schema)?
            .iter()
            .map(|(a, _)| a.clone())
            .collect_vec();

        if let Some(sign) = clickhouse_engine.get_sign_name() {
            rw_fields_name_after_calibration.push(sign);
        }
        if let Some(delete_col) = clickhouse_engine.get_delete_col() {
            rw_fields_name_after_calibration.push(delete_col);
        }
        Ok(Self {
            config,
            schema,
            pk_indices,
            client,
            is_append_only,
            column_correct_vec: column_correct_vec?,
            rw_fields_name_after_calibration,
            clickhouse_engine,
            inserter: None,
        })
    }

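    /// Parse a ClickHouse column type string into a [`ClickHouseSchemaFeature`]:
    /// `Nullable(...)` sets `can_null`, `DateTime64(p)` yields the time precision, and
    /// `Decimal(p, s)` yields `(precision, scale)`. Decimal256 (precision > 38) is rejected.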
    fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
        let can_null = ck_column.r#type.contains("Nullable");
        let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
            ck_column
                .r#type
                .split("DateTime64(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(',')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?
        } else {
            0_u8
        };
        let accuracy_decimal = if ck_column.r#type.contains("Decimal(") {
            let decimal_all = ck_column
                .r#type
                .split("Decimal(")
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have last".to_owned()))?
                .split(')')
                .next()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .split(", ")
                .collect_vec();
            let length = decimal_all
                .first()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;

            if length > 38 {
                return Err(SinkError::ClickHouse(
                    "RisingWave does not support Decimal256".to_owned(),
                ));
            }

            let scale = decimal_all
                .last()
                .ok_or_else(|| SinkError::ClickHouse("must have next".to_owned()))?
                .parse::<u8>()
                .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?;
            (length, scale)
        } else {
            (0_u8, 0_u8)
        };
        Ok(ClickHouseSchemaFeature {
            can_null,
            accuracy_time,
            accuracy_decimal,
        })
    }

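    /// Append one stream chunk to the (lazily created) inserter. For collapsing engines a
    /// sign column (+1 insert / -1 delete) is appended to each row; for delete-replacing
    /// engines an `is_deleted` flag (0 insert / 1 delete) is appended instead.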
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.inserter.is_none() {
            self.inserter = Some(self.client.insert_with_fields_name(
                &self.config.common.table,
                self.rw_fields_name_after_calibration.clone(),
            )?);
        }
        for (op, row) in chunk.rows() {
            let mut clickhouse_field_vec = vec![];
            for (index, data) in row.iter().enumerate() {
                clickhouse_field_vec.extend(ClickHouseFieldWithNull::from_scalar_ref(
                    data,
                    &self.column_correct_vec,
                    index,
                )?);
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(0),
                        ))
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if !self.clickhouse_engine.is_collapsing_engine()
                        && !self.clickhouse_engine.is_delete_replacing_engine()
                    {
                        return Err(SinkError::ClickHouse(
                            "This ClickHouse engine does not support upsert".to_owned(),
                        ));
                    }
                    if self.clickhouse_engine.is_collapsing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(-1),
                        ));
                    }
                    if self.clickhouse_engine.is_delete_replacing_engine() {
                        clickhouse_field_vec.push(ClickHouseFieldWithNull::WithoutSome(
                            ClickHouseField::Int8(1),
                        ))
                    }
                }
            }
            let clickhouse_column = ClickHouseColumn {
                row: clickhouse_field_vec,
            };
            self.inserter
                .as_mut()
                .unwrap()
                .write(&clickhouse_column)
                .await?;
        }
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for ClickHouseSinkWriter {
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint && let Some(inserter) = self.inserter.take() {
            inserter.end().await?;
        }
        Ok(())
    }
}

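/// One row of the `system.columns` query: column name, ClickHouse type string, and whether
/// the column participates in the table's primary key.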
#[derive(ClickHouseRow, Deserialize, Clone)]
struct SystemColumn {
    name: String,
    r#type: String,
    is_in_primary_key: u8,
}

#[derive(ClickHouseRow, Deserialize)]
struct ClickhouseQueryEngine {
    #[expect(dead_code)]
    name: String,
    engine: String,
    create_table_query: String,
}

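/// Query `system.tables` and `system.columns` for the target table, returning its column
/// descriptions and resolved engine. Sign and delete columns managed by the sink itself are
/// filtered out of the column list, since they do not appear in the RisingWave schema.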
async fn query_column_engine_from_ck(
    client: ClickHouseClient,
    config: &ClickHouseConfig,
) -> Result<(Vec<SystemColumn>, ClickHouseEngine)> {
    let query_engine = QUERY_ENGINE;
    let query_column = QUERY_COLUMN;

    let clickhouse_engine = client
        .query(query_engine)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .fetch_all::<ClickhouseQueryEngine>()
        .await?;
    let mut clickhouse_column = client
        .query(query_column)
        .bind(config.common.database.clone())
        .bind(config.common.table.clone())
        .bind("position")
        .fetch_all::<SystemColumn>()
        .await?;
    if clickhouse_engine.is_empty() || clickhouse_column.is_empty() {
        return Err(SinkError::ClickHouse(format!(
            "table {:?}.{:?} was not found in ClickHouse",
            config.common.database, config.common.table
        )));
    }

    let clickhouse_engine =
        ClickHouseEngine::from_query_engine(clickhouse_engine.first().unwrap(), config)?;

    if let Some(sign) = &clickhouse_engine.get_sign_name() {
        clickhouse_column.retain(|a| sign.ne(&a.name))
    }

    if let Some(delete_col) = &clickhouse_engine.get_delete_col() {
        clickhouse_column.retain(|a| delete_col.ne(&a.name))
    }

    Ok((clickhouse_column, clickhouse_engine))
}

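/// A single row buffered for insertion. The field order must match
/// `rw_fields_name_after_calibration`; serialization goes through the custom `Serialize`
/// impls below rather than derived ones.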
#[derive(ClickHouseRow, Debug)]
struct ClickHouseColumn {
    row: Vec<ClickHouseFieldWithNull>,
}

#[derive(Debug)]
enum ClickHouseField {
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Serial(Serial),
    Float32(f32),
    Float64(f64),
    String(String),
    Bool(bool),
    List(Vec<ClickHouseFieldWithNull>),
    Int8(i8),
    Decimal(ClickHouseDecimal),
}

#[derive(Debug)]
enum ClickHouseDecimal {
    Decimal32(i32),
    Decimal64(i64),
    Decimal128(i128),
}

impl Serialize for ClickHouseDecimal {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseDecimal::Decimal32(v) => serializer.serialize_i32(*v),
            ClickHouseDecimal::Decimal64(v) => serializer.serialize_i64(*v),
            ClickHouseDecimal::Decimal128(v) => serializer.serialize_i128(*v),
        }
    }
}

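/// A field value plus its nullability treatment: `WithSome` serializes as `Some(v)` for a
/// `Nullable` ClickHouse column, `WithoutSome` serializes the bare value, and `None`
/// serializes as a ClickHouse NULL.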
#[derive(Debug)]
enum ClickHouseFieldWithNull {
    WithSome(ClickHouseField),
    WithoutSome(ClickHouseField),
    None,
}

impl ClickHouseFieldWithNull {
    pub fn from_scalar_ref(
        data: Option<ScalarRefImpl<'_>>,
        clickhouse_schema_feature_vec: &Vec<ClickHouseSchemaFeature>,
        clickhouse_schema_feature_index: usize,
    ) -> Result<Vec<ClickHouseFieldWithNull>> {
        let clickhouse_schema_feature = clickhouse_schema_feature_vec
            .get(clickhouse_schema_feature_index)
            .ok_or_else(|| SinkError::ClickHouse(format!("No column found in the ClickHouse table schema, index is {clickhouse_schema_feature_index}")))?;
        if data.is_none() {
            if !clickhouse_schema_feature.can_null {
                return Err(SinkError::ClickHouse(
                    "Cannot insert null value into non-nullable ClickHouse column".to_owned(),
                ));
            } else {
                return Ok(vec![ClickHouseFieldWithNull::None]);
            }
        }
        let data = match data.unwrap() {
            ScalarRefImpl::Int16(v) => ClickHouseField::Int16(v),
            ScalarRefImpl::Int32(v) => ClickHouseField::Int32(v),
            ScalarRefImpl::Int64(v) => ClickHouseField::Int64(v),
            ScalarRefImpl::Int256(_) => {
                return Err(SinkError::ClickHouse(
                    "INT256 is not supported for ClickHouse sink.".to_owned(),
                ));
            }
            ScalarRefImpl::Serial(v) => ClickHouseField::Serial(v),
            ScalarRefImpl::Float32(v) => ClickHouseField::Float32(v.into_inner()),
            ScalarRefImpl::Float64(v) => ClickHouseField::Float64(v.into_inner()),
            ScalarRefImpl::Utf8(v) => ClickHouseField::String(v.to_owned()),
            ScalarRefImpl::Bool(v) => ClickHouseField::Bool(v),
            ScalarRefImpl::Decimal(d) => {
                // Rescale the decimal mantissa to the scale of the target Decimal column.
                let d = if let Decimal::Normalized(d) = d {
                    let scale =
                        clickhouse_schema_feature.accuracy_decimal.1 as i32 - d.scale() as i32;
                    if scale < 0 {
                        d.mantissa() / 10_i128.pow(scale.unsigned_abs())
                    } else {
                        d.mantissa() * 10_i128.pow(scale as u32)
                    }
                } else if clickhouse_schema_feature.can_null {
                    warn!("Inf, -Inf and NaN decimals are converted to NULL in ClickHouse!");
                    return Ok(vec![ClickHouseFieldWithNull::None]);
                } else {
                    warn!("Inf, -Inf and NaN decimals are converted to 0 in ClickHouse!");
                    0_i128
                };
                if clickhouse_schema_feature.accuracy_decimal.0 <= 9 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal32(d as i32))
                } else if clickhouse_schema_feature.accuracy_decimal.0 <= 18 {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal64(d as i64))
                } else {
                    ClickHouseField::Decimal(ClickHouseDecimal::Decimal128(d))
                }
            }
            ScalarRefImpl::Interval(_) => {
                return Err(SinkError::ClickHouse(
                    "INTERVAL is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Date(v) => {
                let days = v.get_nums_days_unix_epoch();
                ClickHouseField::Int32(days)
            }
            ScalarRefImpl::Time(_) => {
                return Err(SinkError::ClickHouse(
                    "TIME is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamp(_) => {
                return Err(SinkError::ClickHouse(
                    "clickhouse does not have a type corresponding to naive timestamp".to_owned(),
                ));
            }
            ScalarRefImpl::Timestamptz(v) => {
                // Convert microseconds to ticks at the DateTime64 precision of the column.
                let micros = v.timestamp_micros();
                let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
                    true => {
                        micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
                    }
                    false => micros
                        .checked_mul(
                            10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
                        )
                        .ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_owned()))?,
                };
                ClickHouseField::Int64(ticks)
            }
            ScalarRefImpl::Jsonb(v) => {
                let json_str = v.to_string();
                ClickHouseField::String(json_str)
            }
            ScalarRefImpl::Struct(v) => {
                let mut struct_vec = vec![];
                for (index, field) in v.iter_fields_ref().enumerate() {
                    let a = Self::from_scalar_ref(
                        field,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index + index,
                    )?;
                    struct_vec.push(ClickHouseFieldWithNull::WithoutSome(ClickHouseField::List(
                        a,
                    )));
                }
                return Ok(struct_vec);
            }
            ScalarRefImpl::List(v) => {
                let mut vec = vec![];
                for i in v.iter() {
                    vec.extend(Self::from_scalar_ref(
                        i,
                        clickhouse_schema_feature_vec,
                        clickhouse_schema_feature_index,
                    )?)
                }
                return Ok(vec![ClickHouseFieldWithNull::WithoutSome(
                    ClickHouseField::List(vec),
                )]);
            }
            ScalarRefImpl::Bytea(_) => {
                return Err(SinkError::ClickHouse(
                    "BYTEA is not supported for ClickHouse sink. Please convert to VARCHAR or other supported types.".to_owned(),
                ));
            }
            ScalarRefImpl::Map(_) => {
                return Err(SinkError::ClickHouse(
                    "MAP is not supported for ClickHouse sink.".to_owned(),
                ));
            }
            ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        let data = if clickhouse_schema_feature.can_null {
            vec![ClickHouseFieldWithNull::WithSome(data)]
        } else {
            vec![ClickHouseFieldWithNull::WithoutSome(data)]
        };
        Ok(data)
    }
}

impl Serialize for ClickHouseField {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseField::Int16(v) => serializer.serialize_i16(*v),
            ClickHouseField::Int32(v) => serializer.serialize_i32(*v),
            ClickHouseField::Int64(v) => serializer.serialize_i64(*v),
            ClickHouseField::Serial(v) => v.serialize(serializer),
            ClickHouseField::Float32(v) => serializer.serialize_f32(*v),
            ClickHouseField::Float64(v) => serializer.serialize_f64(*v),
            ClickHouseField::String(v) => serializer.serialize_str(v),
            ClickHouseField::Bool(v) => serializer.serialize_bool(*v),
            ClickHouseField::List(v) => {
                let mut s = serializer.serialize_seq(Some(v.len()))?;
                for i in v {
                    s.serialize_element(i)?;
                }
                s.end()
            }
            ClickHouseField::Decimal(v) => v.serialize(serializer),
            ClickHouseField::Int8(v) => serializer.serialize_i8(*v),
        }
    }
}

impl Serialize for ClickHouseFieldWithNull {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            ClickHouseFieldWithNull::WithSome(v) => serializer.serialize_some(v),
            ClickHouseFieldWithNull::WithoutSome(v) => v.serialize(serializer),
            ClickHouseFieldWithNull::None => serializer.serialize_none(),
        }
    }
}

impl Serialize for ClickHouseColumn {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // The row serializer does not use struct or field names, hence the placeholder name.
        let mut s = serializer.serialize_struct("useless", self.row.len())?;
        for data in &self.row {
            s.serialize_field("useless", &data)?
        }
        s.end()
    }
}

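/// Flatten the sink schema into `(column_name, data_type)` pairs. A top-level struct field
/// is expanded into one `parent.child` column per subfield, each typed as a list, which
/// matches how ClickHouse exposes `Nested` columns; deeper nesting is rejected.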
pub fn build_fields_name_type_from_schema(schema: &Schema) -> Result<Vec<(String, DataType)>> {
    let mut vec = vec![];
    for field in schema.fields() {
        if let DataType::Struct(st) = &field.data_type {
            for (name, data_type) in st.iter() {
                if matches!(data_type, DataType::Struct(_)) {
                    return Err(SinkError::ClickHouse(
                        "Only one level of nesting is supported for struct".to_owned(),
                    ));
                } else {
                    vec.push((
                        format!("{}.{}", field.name, name),
                        DataType::List(Box::new(data_type.clone())),
                    ))
                }
            }
        } else {
            vec.push((field.name.clone(), field.data_type()));
        }
    }
    Ok(vec)
}