risingwave_connector/sink/
doris.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use base64::Engine;
20use base64::engine::general_purpose;
21use bytes::{BufMut, Bytes, BytesMut};
22use risingwave_common::array::{Op, StreamChunk};
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde::Deserialize;
26use serde_derive::Serialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use thiserror_ext::AsReport;
30use with_options::WithOptions;
31
32use super::doris_starrocks_connector::{
33    DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, HeaderBuilder, InserterInner, InserterInnerBuilder,
34    POOL_IDLE_TIMEOUT,
35};
36use super::{
37    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkWriterMetrics,
38};
39use crate::enforce_secret::EnforceSecret;
40use crate::sink::encoder::{JsonEncoder, RowEncoder};
41use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
42use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
43
/// Connector name of this sink; exposed as `Sink::SINK_NAME` for `DorisSink`.
pub const DORIS_SINK: &str = "doris";
45
46#[derive(Deserialize, Debug, Clone, WithOptions)]
47pub struct DorisCommon {
48    #[serde(rename = "doris.url")]
49    pub url: String,
50    #[serde(rename = "doris.user")]
51    pub user: String,
52    #[serde(rename = "doris.password")]
53    pub password: String,
54    #[serde(rename = "doris.database")]
55    pub database: String,
56    #[serde(rename = "doris.table")]
57    pub table: String,
58    #[serde(rename = "doris.partial_update")]
59    pub partial_update: Option<String>,
60}
61
62impl EnforceSecret for DorisCommon {
63    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
64        "doris.password", "doris.user"
65    };
66}
67
68impl DorisCommon {
69    pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
70        DorisSchemaClient::new(
71            self.url.clone(),
72            self.table.clone(),
73            self.database.clone(),
74            self.user.clone(),
75            self.password.clone(),
76        )
77    }
78}
79
/// Full Doris sink configuration: the shared `doris.*` options plus the sink type.
// NOTE(review): no field uses a `#[serde_as(...)]` attribute, so `#[serde_as]` looks
// removable — confirm before deleting.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DorisConfig {
    /// Connection and table options; flattened, so the `doris.*` keys sit at the top level.
    #[serde(flatten)]
    pub common: DorisCommon,

    /// Sink type; `from_btreemap` rejects anything other than `SINK_TYPE_APPEND_ONLY`
    /// or `SINK_TYPE_UPSERT`.
    pub r#type: String, // accept "append-only" or "upsert"
}
88
89impl EnforceSecret for DorisConfig {
90    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
91        DorisCommon::enforce_one(prop)
92    }
93}
94
95impl DorisConfig {
96    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
97        let config =
98            serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
99                .map_err(|e| SinkError::Config(anyhow!(e)))?;
100        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
101            return Err(SinkError::Config(anyhow!(
102                "`{}` must be {}, or {}",
103                SINK_TYPE_OPTION,
104                SINK_TYPE_APPEND_ONLY,
105                SINK_TYPE_UPSERT
106            )));
107        }
108        Ok(config)
109    }
110}
111
/// Sink that writes RisingWave rows into a Doris table through `DorisSinkWriter`.
#[derive(Debug)]
pub struct DorisSink {
    /// Parsed sink configuration.
    pub config: DorisConfig,
    /// RisingWave schema of the rows being sunk.
    schema: Schema,
    /// Downstream primary-key column indices; must be non-empty for upsert sinks.
    pk_indices: Vec<usize>,
    /// `true` for append-only sinks; `false` selects the upsert path.
    is_append_only: bool,
}
119
120impl EnforceSecret for DorisSink {
121    fn enforce_secret<'a>(
122        prop_iter: impl Iterator<Item = &'a str>,
123    ) -> crate::error::ConnectorResult<()> {
124        for prop in prop_iter {
125            DorisConfig::enforce_one(prop)?;
126        }
127        Ok(())
128    }
129}
130
131impl DorisSink {
132    pub fn new(
133        config: DorisConfig,
134        schema: Schema,
135        pk_indices: Vec<usize>,
136        is_append_only: bool,
137    ) -> Result<Self> {
138        Ok(Self {
139            config,
140            schema,
141            pk_indices,
142            is_append_only,
143        })
144    }
145}
146
147impl DorisSink {
148    fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
149        let doris_columns_desc: HashMap<String, String> = doris_column_fields
150            .iter()
151            .map(|s| (s.name.clone(), s.r#type.clone()))
152            .collect();
153
154        let rw_fields_name = self.schema.fields();
155        if rw_fields_name.len() > doris_columns_desc.len() {
156            return Err(SinkError::Doris(
157                "The columns of the sink must be equal to or a superset of the target table's columns.".to_owned(),
158            ));
159        }
160
161        for i in rw_fields_name {
162            let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
163                SinkError::Doris(format!(
164                    "Column name don't find in doris, risingwave is {:?} ",
165                    i.name
166                ))
167            })?;
168            if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? {
169                return Err(SinkError::Doris(format!(
170                    "Column type don't match, column name is {:?}. doris type is {:?} risingwave type is {:?} ",
171                    i.name, value, i.data_type
172                )));
173            }
174        }
175        Ok(())
176    }
177
178    fn check_and_correct_column_type(
179        rw_data_type: &DataType,
180        doris_data_type: String,
181    ) -> Result<bool> {
182        match rw_data_type {
183            risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
184            risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
185            risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
186            risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
187            risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
188            risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
189            risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
190            risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
191            risingwave_common::types::DataType::Varchar => {
192                Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
193            }
194            risingwave_common::types::DataType::Time => {
195                Err(SinkError::Doris("doris can not support Time".to_owned()))
196            }
197            risingwave_common::types::DataType::Timestamp => {
198                Ok(doris_data_type.contains("DATETIME"))
199            }
200            risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
201                "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_owned(),
202            )),
203            risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
204                "doris can not support Interval".to_owned(),
205            )),
206            risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
207            risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
208            risingwave_common::types::DataType::Bytea => {
209                Err(SinkError::Doris("doris can not support Bytea".to_owned()))
210            }
211            risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")),
212            risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
213            risingwave_common::types::DataType::Int256 => {
214                Err(SinkError::Doris("doris can not support Int256".to_owned()))
215            }
216            risingwave_common::types::DataType::Map(_) => {
217                Err(SinkError::Doris("doris can not support Map".to_owned()))
218            }
219        }
220    }
221}
222
223impl Sink for DorisSink {
224    type Coordinator = DummySinkCommitCoordinator;
225    type LogSinker = LogSinkerOf<DorisSinkWriter>;
226
227    const SINK_NAME: &'static str = DORIS_SINK;
228
229    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
230        Ok(DorisSinkWriter::new(
231            self.config.clone(),
232            self.schema.clone(),
233            self.pk_indices.clone(),
234            self.is_append_only,
235        )
236        .await?
237        .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
238    }
239
240    async fn validate(&self) -> Result<()> {
241        if !self.is_append_only && self.pk_indices.is_empty() {
242            return Err(SinkError::Config(anyhow!(
243                "Primary key not defined for upsert doris sink (please define in `primary_key` field)"
244            )));
245        }
246        // check reachability
247        let client = self.config.common.build_get_client();
248        let doris_schema = client.get_schema_from_doris().await?;
249
250        if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
251            return Err(SinkError::Config(anyhow!(
252                "If you want to use upsert, please set the keysType of doris to UNIQUE_KEYS"
253            )));
254        }
255        self.check_column_name_and_type(doris_schema.properties)?;
256        Ok(())
257    }
258}
259
/// Writer that encodes rows as JSON and streams them to Doris.
///
/// A `DorisClient` is created lazily by `write_batch` and finished (then dropped)
/// in `barrier`.
pub struct DorisSinkWriter {
    /// Sink configuration this writer was built from.
    pub config: DorisConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    /// Produces a fresh `InserterInner` each time a new client is needed.
    inserter_inner_builder: InserterInnerBuilder,
    /// `true` selects `append_only`; otherwise `upsert` is used.
    is_append_only: bool,
    /// Load in progress; `None` after a barrier until the next batch arrives.
    client: Option<DorisClient>,
    /// Row-to-JSON encoder, configured with DECIMAL scales from the Doris schema.
    row_encoder: JsonEncoder,
}
271
272impl TryFrom<SinkParam> for DorisSink {
273    type Error = SinkError;
274
275    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
276        let schema = param.schema();
277        let config = DorisConfig::from_btreemap(param.properties)?;
278        DorisSink::new(
279            config,
280            schema,
281            param.downstream_pk,
282            param.sink_type.is_append_only(),
283        )
284    }
285}
286
287impl DorisSinkWriter {
288    pub async fn new(
289        config: DorisConfig,
290        schema: Schema,
291        pk_indices: Vec<usize>,
292        is_append_only: bool,
293    ) -> Result<Self> {
294        let mut decimal_map = HashMap::default();
295        let doris_schema = config
296            .common
297            .build_get_client()
298            .get_schema_from_doris()
299            .await?;
300        for s in &doris_schema.properties {
301            if let Some(v) = s.get_decimal_pre_scale()? {
302                decimal_map.insert(s.name.clone(), v);
303            }
304        }
305
306        let header_builder = HeaderBuilder::new()
307            .add_common_header()
308            .set_user_password(config.common.user.clone(), config.common.password.clone())
309            .add_json_format()
310            .set_partial_columns(config.common.partial_update.clone())
311            .add_read_json_by_line();
312        let header = if !is_append_only {
313            header_builder.add_hidden_column().build()
314        } else {
315            header_builder.build()
316        };
317
318        let doris_insert_builder = InserterInnerBuilder::new(
319            config.common.url.clone(),
320            config.common.database.clone(),
321            config.common.table.clone(),
322            header,
323        )?;
324        Ok(Self {
325            config,
326            schema: schema.clone(),
327            pk_indices,
328            inserter_inner_builder: doris_insert_builder,
329            is_append_only,
330            client: None,
331            row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
332        })
333    }
334
335    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
336        for (op, row) in chunk.rows() {
337            if op != Op::Insert {
338                continue;
339            }
340            let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
341            self.client
342                .as_mut()
343                .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
344                .write(row_json_string.into())
345                .await?;
346        }
347        Ok(())
348    }
349
350    async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
351        for (op, row) in chunk.rows() {
352            match op {
353                Op::Insert => {
354                    let mut row_json_value = self.row_encoder.encode(row)?;
355                    row_json_value
356                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
357                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
358                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
359                    })?;
360                    self.client
361                        .as_mut()
362                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
363                        .write(row_json_string.into())
364                        .await?;
365                }
366                Op::Delete => {
367                    let mut row_json_value = self.row_encoder.encode(row)?;
368                    row_json_value
369                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("1".to_owned()));
370                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
371                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
372                    })?;
373                    self.client
374                        .as_mut()
375                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
376                        .write(row_json_string.into())
377                        .await?;
378                }
379                Op::UpdateDelete => {}
380                Op::UpdateInsert => {
381                    let mut row_json_value = self.row_encoder.encode(row)?;
382                    row_json_value
383                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
384                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
385                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
386                    })?;
387                    self.client
388                        .as_mut()
389                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
390                        .write(row_json_string.into())
391                        .await?;
392                }
393            }
394        }
395        Ok(())
396    }
397}
398
399#[async_trait]
400impl SinkWriter for DorisSinkWriter {
401    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
402        if self.client.is_none() {
403            self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
404        }
405        if self.is_append_only {
406            self.append_only(chunk).await
407        } else {
408            self.upsert(chunk).await
409        }
410    }
411
412    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
413        Ok(())
414    }
415
416    async fn abort(&mut self) -> Result<()> {
417        Ok(())
418    }
419
420    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
421        if self.client.is_some() {
422            let client = self
423                .client
424                .take()
425                .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_owned()))?;
426            client.finish().await?;
427        }
428        Ok(())
429    }
430}
431
/// Fetches a table's schema from Doris's HTTP API using basic authentication.
pub struct DorisSchemaClient {
    /// Base URL of the Doris HTTP endpoint.
    url: String,
    /// Table name.
    table: String,
    /// Database name.
    db: String,
    /// Basic-auth user name.
    user: String,
    /// Basic-auth password.
    password: String,
}
439impl DorisSchemaClient {
440    pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
441        Self {
442            url,
443            table,
444            db,
445            user,
446            password,
447        }
448    }
449
450    pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
451        let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
452
453        let client = reqwest::Client::builder()
454            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
455            .build()
456            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
457
458        let response = client
459            .get(uri)
460            .header(
461                "Authorization",
462                format!(
463                    "Basic {}",
464                    general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
465                ),
466            )
467            .send()
468            .await
469            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
470
471        let json: Value = response
472            .json()
473            .await
474            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
475        let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
476            json.get("data")
477                .ok_or_else(|| {
478                    SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
479                })?
480                .clone()
481        } else {
482            json
483        };
484        let schema: DorisSchema = serde_json::from_value(json_data)
485            .context("Can't get schema from json")
486            .map_err(SinkError::DorisStarrocksConnect)?;
487        Ok(schema)
488    }
489}
/// Table schema as returned by `DorisSchemaClient::get_schema_from_doris`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisSchema {
    status: i32,
    /// Doris table data model; upsert sinks require `"UNIQUE_KEYS"`.
    #[serde(rename = "keysType")]
    pub keys_type: String,
    /// Column descriptions of the table.
    pub properties: Vec<DorisField>,
}
/// One column entry of a `DorisSchema`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisField {
    /// Column name.
    pub name: String,
    /// Doris type name; compared by substring in this file (e.g. contains `"DECIMAL"`).
    pub r#type: String,
    comment: String,
    /// Precision as reported by Doris, kept as a string.
    pub precision: Option<String>,
    /// Scale as reported by Doris, kept as a string; parsed by `get_decimal_pre_scale`.
    pub scale: Option<String>,
    aggregation_type: String,
}
506impl DorisField {
507    pub fn get_decimal_pre_scale(&self) -> Result<Option<u8>> {
508        if self.r#type.contains("DECIMAL") {
509            let scale = self
510                .scale
511                .as_ref()
512                .ok_or_else(|| {
513                    SinkError::Doris(format!(
514                        "In doris, the type of {} is DECIMAL, but `scale` is not found",
515                        self.name
516                    ))
517                })?
518                .parse::<u8>()
519                .map_err(|err| {
520                    SinkError::Doris(format!(
521                        "Unable to convert decimal's scale to u8. error: {:?}",
522                        err.kind()
523                    ))
524                })?;
525            Ok(Some(scale))
526        } else {
527            Ok(None)
528        }
529    }
530}
531
/// Response body returned when a Doris load is finished (see `DorisClient::finish`).
///
/// Only `status`, `message` and `err_url` are read in this file.
// NOTE(review): several counters are `i32`; a reported value above `i32::MAX` would
// fail deserialization — confirm the ranges Doris can return.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisInsertResultResponse {
    #[serde(rename = "TxnId")]
    txn_id: i64,
    #[serde(rename = "Label")]
    label: String,
    /// Load status; checked against `DORIS_SUCCESS_STATUS`.
    #[serde(rename = "Status")]
    status: String,
    #[serde(rename = "TwoPhaseCommit")]
    two_phase_commit: String,
    /// Human-readable message; included in the error when the status is not a success.
    #[serde(rename = "Message")]
    message: String,
    #[serde(rename = "NumberTotalRows")]
    number_total_rows: i64,
    #[serde(rename = "NumberLoadedRows")]
    number_loaded_rows: i64,
    #[serde(rename = "NumberFilteredRows")]
    number_filtered_rows: i32,
    #[serde(rename = "NumberUnselectedRows")]
    number_unselected_rows: i32,
    #[serde(rename = "LoadBytes")]
    load_bytes: i64,
    #[serde(rename = "LoadTimeMs")]
    load_time_ms: i32,
    #[serde(rename = "BeginTxnTimeMs")]
    begin_txn_time_ms: i32,
    #[serde(rename = "StreamLoadPutTimeMs")]
    stream_load_put_time_ms: i32,
    #[serde(rename = "ReadDataTimeMs")]
    read_data_time_ms: i32,
    #[serde(rename = "WriteDataTimeMs")]
    write_data_time_ms: i32,
    #[serde(rename = "CommitAndPublishTimeMs")]
    commit_and_publish_time_ms: i32,
    /// Error URL, if Doris supplied one; included in the failure message.
    #[serde(rename = "ErrorURL")]
    err_url: Option<String>,
}
569
/// A single in-flight Doris load: rows are appended with `write` and the load is
/// completed with `finish`.
pub struct DorisClient {
    /// Underlying inserter that receives the encoded bytes.
    insert: InserterInner,
    /// `true` until the first row is written; suppresses the leading `\n` separator.
    is_first_record: bool,
}
574impl DorisClient {
575    pub fn new(insert: InserterInner) -> Self {
576        Self {
577            insert,
578            is_first_record: true,
579        }
580    }
581
582    pub async fn write(&mut self, data: Bytes) -> Result<()> {
583        let mut data_build = BytesMut::new();
584        if self.is_first_record {
585            self.is_first_record = false;
586        } else {
587            data_build.put_slice("\n".as_bytes());
588        }
589        data_build.put_slice(&data);
590        self.insert.write(data_build.into()).await?;
591        Ok(())
592    }
593
594    pub async fn finish(self) -> Result<DorisInsertResultResponse> {
595        let raw = self.insert.finish().await?;
596        let res: DorisInsertResultResponse = serde_json::from_slice(&raw)
597            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
598
599        if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
600            return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
601                "Insert error: {:?}, error url: {:?}",
602                res.message,
603                res.err_url
604            )));
605        };
606        Ok(res)
607    }
608}