risingwave_connector/sink/
doris.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use base64::Engine;
20use base64::engine::general_purpose;
21use bytes::{BufMut, Bytes, BytesMut};
22use risingwave_common::array::{Op, StreamChunk};
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde::Deserialize;
26use serde_derive::Serialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use thiserror_ext::AsReport;
30use with_options::WithOptions;
31
32use super::doris_starrocks_connector::{
33    DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, HeaderBuilder, InserterInner, InserterInnerBuilder,
34    POOL_IDLE_TIMEOUT,
35};
36use super::{
37    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkWriterMetrics,
38};
39use crate::enforce_secret::EnforceSecret;
40use crate::sink::encoder::{JsonEncoder, RowEncoder};
41use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
42use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
43
/// Name of the Doris sink; exposed as `Sink::SINK_NAME` by [`DorisSink`].
pub const DORIS_SINK: &str = "doris";
45
46#[derive(Deserialize, Debug, Clone, WithOptions)]
47pub struct DorisCommon {
48    #[serde(rename = "doris.url")]
49    pub url: String,
50    #[serde(rename = "doris.user")]
51    pub user: String,
52    #[serde(rename = "doris.password")]
53    pub password: String,
54    #[serde(rename = "doris.database")]
55    pub database: String,
56    #[serde(rename = "doris.table")]
57    pub table: String,
58    #[serde(rename = "doris.partial_update")]
59    pub partial_update: Option<String>,
60}
61
62impl EnforceSecret for DorisCommon {
63    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
64        "doris.password", "doris.user"
65    };
66}
67
68impl DorisCommon {
69    pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
70        DorisSchemaClient::new(
71            self.url.clone(),
72            self.table.clone(),
73            self.database.clone(),
74            self.user.clone(),
75            self.password.clone(),
76        )
77    }
78}
79
80#[serde_as]
81#[derive(Clone, Debug, Deserialize, WithOptions)]
82pub struct DorisConfig {
83    #[serde(flatten)]
84    pub common: DorisCommon,
85
86    pub r#type: String, // accept "append-only" or "upsert"
87}
88
89impl EnforceSecret for DorisConfig {
90    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
91        DorisCommon::enforce_one(prop)
92    }
93}
94
95impl DorisConfig {
96    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
97        let config =
98            serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
99                .map_err(|e| SinkError::Config(anyhow!(e)))?;
100        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
101            return Err(SinkError::Config(anyhow!(
102                "`{}` must be {}, or {}",
103                SINK_TYPE_OPTION,
104                SINK_TYPE_APPEND_ONLY,
105                SINK_TYPE_UPSERT
106            )));
107        }
108        Ok(config)
109    }
110}
111
112#[derive(Debug)]
113pub struct DorisSink {
114    pub config: DorisConfig,
115    schema: Schema,
116    pk_indices: Vec<usize>,
117    is_append_only: bool,
118}
119
120impl EnforceSecret for DorisSink {
121    fn enforce_secret<'a>(
122        prop_iter: impl Iterator<Item = &'a str>,
123    ) -> crate::error::ConnectorResult<()> {
124        for prop in prop_iter {
125            DorisConfig::enforce_one(prop)?;
126        }
127        Ok(())
128    }
129}
130
131impl DorisSink {
132    pub fn new(
133        config: DorisConfig,
134        schema: Schema,
135        pk_indices: Vec<usize>,
136        is_append_only: bool,
137    ) -> Result<Self> {
138        Ok(Self {
139            config,
140            schema,
141            pk_indices,
142            is_append_only,
143        })
144    }
145}
146
147impl DorisSink {
148    fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
149        let doris_columns_desc: HashMap<String, String> = doris_column_fields
150            .iter()
151            .map(|s| (s.name.clone(), s.r#type.clone()))
152            .collect();
153
154        let rw_fields_name = self.schema.fields();
155        if rw_fields_name.len() > doris_columns_desc.len() {
156            return Err(SinkError::Doris(
157                "The columns of the sink must be equal to or a superset of the target table's columns.".to_owned(),
158            ));
159        }
160
161        for i in rw_fields_name {
162            let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
163                SinkError::Doris(format!(
164                    "Column name don't find in doris, risingwave is {:?} ",
165                    i.name
166                ))
167            })?;
168            if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? {
169                return Err(SinkError::Doris(format!(
170                    "Column type don't match, column name is {:?}. doris type is {:?} risingwave type is {:?} ",
171                    i.name, value, i.data_type
172                )));
173            }
174        }
175        Ok(())
176    }
177
178    fn check_and_correct_column_type(
179        rw_data_type: &DataType,
180        doris_data_type: String,
181    ) -> Result<bool> {
182        match rw_data_type {
183            risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
184            risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
185            risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
186            risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
187            risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
188            risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
189            risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
190            risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
191            risingwave_common::types::DataType::Varchar => {
192                Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
193            }
194            risingwave_common::types::DataType::Time => {
195                Err(SinkError::Doris("doris can not support Time".to_owned()))
196            }
197            risingwave_common::types::DataType::Timestamp => {
198                Ok(doris_data_type.contains("DATETIME"))
199            }
200            risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
201                "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_owned(),
202            )),
203            risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
204                "doris can not support Interval".to_owned(),
205            )),
206            risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
207            risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
208            risingwave_common::types::DataType::Bytea => {
209                Err(SinkError::Doris("doris can not support Bytea".to_owned()))
210            }
211            risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")),
212            risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
213            risingwave_common::types::DataType::Int256 => {
214                Err(SinkError::Doris("doris can not support Int256".to_owned()))
215            }
216            risingwave_common::types::DataType::Map(_) => {
217                Err(SinkError::Doris("doris can not support Map".to_owned()))
218            }
219            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
220        }
221    }
222}
223
224impl Sink for DorisSink {
225    type Coordinator = DummySinkCommitCoordinator;
226    type LogSinker = LogSinkerOf<DorisSinkWriter>;
227
228    const SINK_NAME: &'static str = DORIS_SINK;
229
230    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
231        Ok(DorisSinkWriter::new(
232            self.config.clone(),
233            self.schema.clone(),
234            self.pk_indices.clone(),
235            self.is_append_only,
236        )
237        .await?
238        .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
239    }
240
241    async fn validate(&self) -> Result<()> {
242        if !self.is_append_only && self.pk_indices.is_empty() {
243            return Err(SinkError::Config(anyhow!(
244                "Primary key not defined for upsert doris sink (please define in `primary_key` field)"
245            )));
246        }
247        // check reachability
248        let client = self.config.common.build_get_client();
249        let doris_schema = client.get_schema_from_doris().await?;
250
251        if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
252            return Err(SinkError::Config(anyhow!(
253                "If you want to use upsert, please set the keysType of doris to UNIQUE_KEYS"
254            )));
255        }
256        self.check_column_name_and_type(doris_schema.properties)?;
257        Ok(())
258    }
259}
260
261pub struct DorisSinkWriter {
262    pub config: DorisConfig,
263    #[expect(dead_code)]
264    schema: Schema,
265    #[expect(dead_code)]
266    pk_indices: Vec<usize>,
267    inserter_inner_builder: InserterInnerBuilder,
268    is_append_only: bool,
269    client: Option<DorisClient>,
270    row_encoder: JsonEncoder,
271}
272
273impl TryFrom<SinkParam> for DorisSink {
274    type Error = SinkError;
275
276    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
277        let schema = param.schema();
278        let config = DorisConfig::from_btreemap(param.properties)?;
279        DorisSink::new(
280            config,
281            schema,
282            param.downstream_pk,
283            param.sink_type.is_append_only(),
284        )
285    }
286}
287
288impl DorisSinkWriter {
289    pub async fn new(
290        config: DorisConfig,
291        schema: Schema,
292        pk_indices: Vec<usize>,
293        is_append_only: bool,
294    ) -> Result<Self> {
295        let mut decimal_map = HashMap::default();
296        let doris_schema = config
297            .common
298            .build_get_client()
299            .get_schema_from_doris()
300            .await?;
301        for s in &doris_schema.properties {
302            if let Some(v) = s.get_decimal_pre_scale()? {
303                decimal_map.insert(s.name.clone(), v);
304            }
305        }
306
307        let header_builder = HeaderBuilder::new()
308            .add_common_header()
309            .set_user_password(config.common.user.clone(), config.common.password.clone())
310            .add_json_format()
311            .set_partial_columns(config.common.partial_update.clone())
312            .add_read_json_by_line();
313        let header = if !is_append_only {
314            header_builder.add_hidden_column().build()
315        } else {
316            header_builder.build()
317        };
318
319        let doris_insert_builder = InserterInnerBuilder::new(
320            config.common.url.clone(),
321            config.common.database.clone(),
322            config.common.table.clone(),
323            header,
324        )?;
325        Ok(Self {
326            config,
327            schema: schema.clone(),
328            pk_indices,
329            inserter_inner_builder: doris_insert_builder,
330            is_append_only,
331            client: None,
332            row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
333        })
334    }
335
336    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
337        for (op, row) in chunk.rows() {
338            if op != Op::Insert {
339                continue;
340            }
341            let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
342            self.client
343                .as_mut()
344                .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
345                .write(row_json_string.into())
346                .await?;
347        }
348        Ok(())
349    }
350
351    async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
352        for (op, row) in chunk.rows() {
353            match op {
354                Op::Insert => {
355                    let mut row_json_value = self.row_encoder.encode(row)?;
356                    row_json_value
357                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
358                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
359                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
360                    })?;
361                    self.client
362                        .as_mut()
363                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
364                        .write(row_json_string.into())
365                        .await?;
366                }
367                Op::Delete => {
368                    let mut row_json_value = self.row_encoder.encode(row)?;
369                    row_json_value
370                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("1".to_owned()));
371                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
372                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
373                    })?;
374                    self.client
375                        .as_mut()
376                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
377                        .write(row_json_string.into())
378                        .await?;
379                }
380                Op::UpdateDelete => {}
381                Op::UpdateInsert => {
382                    let mut row_json_value = self.row_encoder.encode(row)?;
383                    row_json_value
384                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
385                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
386                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
387                    })?;
388                    self.client
389                        .as_mut()
390                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
391                        .write(row_json_string.into())
392                        .await?;
393                }
394            }
395        }
396        Ok(())
397    }
398}
399
400#[async_trait]
401impl SinkWriter for DorisSinkWriter {
402    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
403        if self.client.is_none() {
404            self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
405        }
406        if self.is_append_only {
407            self.append_only(chunk).await
408        } else {
409            self.upsert(chunk).await
410        }
411    }
412
413    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
414        Ok(())
415    }
416
417    async fn abort(&mut self) -> Result<()> {
418        Ok(())
419    }
420
421    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
422        if self.client.is_some() {
423            let client = self
424                .client
425                .take()
426                .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_owned()))?;
427            client.finish().await?;
428        }
429        Ok(())
430    }
431}
432
/// HTTP client for the Doris table-schema endpoint.
pub struct DorisSchemaClient {
    /// Base URL of the Doris HTTP endpoint.
    url: String,
    /// Table whose schema is requested.
    table: String,
    /// Database containing `table`.
    db: String,
    /// User name for HTTP basic authentication.
    user: String,
    /// Password for HTTP basic authentication.
    password: String,
}
440impl DorisSchemaClient {
441    pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
442        Self {
443            url,
444            table,
445            db,
446            user,
447            password,
448        }
449    }
450
451    pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
452        let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
453
454        let client = reqwest::Client::builder()
455            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
456            .build()
457            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
458
459        let response = client
460            .get(uri)
461            .header(
462                "Authorization",
463                format!(
464                    "Basic {}",
465                    general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
466                ),
467            )
468            .send()
469            .await
470            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
471
472        let json: Value = response
473            .json()
474            .await
475            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
476        let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
477            json.get("data")
478                .ok_or_else(|| {
479                    SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
480                })?
481                .clone()
482        } else {
483            json
484        };
485        let schema: DorisSchema = serde_json::from_value(json_data)
486            .context("Can't get schema from json")
487            .map_err(SinkError::DorisStarrocksConnect)?;
488        Ok(schema)
489    }
490}
491#[derive(Debug, Serialize, Deserialize)]
492pub struct DorisSchema {
493    status: i32,
494    #[serde(rename = "keysType")]
495    pub keys_type: String,
496    pub properties: Vec<DorisField>,
497}
498#[derive(Debug, Serialize, Deserialize)]
499pub struct DorisField {
500    pub name: String,
501    pub r#type: String,
502    comment: String,
503    pub precision: Option<String>,
504    pub scale: Option<String>,
505    aggregation_type: String,
506}
507impl DorisField {
508    pub fn get_decimal_pre_scale(&self) -> Result<Option<u8>> {
509        if self.r#type.contains("DECIMAL") {
510            let scale = self
511                .scale
512                .as_ref()
513                .ok_or_else(|| {
514                    SinkError::Doris(format!(
515                        "In doris, the type of {} is DECIMAL, but `scale` is not found",
516                        self.name
517                    ))
518                })?
519                .parse::<u8>()
520                .map_err(|err| {
521                    SinkError::Doris(format!(
522                        "Unable to convert decimal's scale to u8. error: {:?}",
523                        err.kind()
524                    ))
525                })?;
526            Ok(Some(scale))
527        } else {
528            Ok(None)
529        }
530    }
531}
532
533#[derive(Debug, Serialize, Deserialize)]
534pub struct DorisInsertResultResponse {
535    #[serde(rename = "TxnId")]
536    txn_id: i64,
537    #[serde(rename = "Label")]
538    label: String,
539    #[serde(rename = "Status")]
540    status: String,
541    #[serde(rename = "TwoPhaseCommit")]
542    two_phase_commit: String,
543    #[serde(rename = "Message")]
544    message: String,
545    #[serde(rename = "NumberTotalRows")]
546    number_total_rows: i64,
547    #[serde(rename = "NumberLoadedRows")]
548    number_loaded_rows: i64,
549    #[serde(rename = "NumberFilteredRows")]
550    number_filtered_rows: i32,
551    #[serde(rename = "NumberUnselectedRows")]
552    number_unselected_rows: i32,
553    #[serde(rename = "LoadBytes")]
554    load_bytes: i64,
555    #[serde(rename = "LoadTimeMs")]
556    load_time_ms: i32,
557    #[serde(rename = "BeginTxnTimeMs")]
558    begin_txn_time_ms: i32,
559    #[serde(rename = "StreamLoadPutTimeMs")]
560    stream_load_put_time_ms: i32,
561    #[serde(rename = "ReadDataTimeMs")]
562    read_data_time_ms: i32,
563    #[serde(rename = "WriteDataTimeMs")]
564    write_data_time_ms: i32,
565    #[serde(rename = "CommitAndPublishTimeMs")]
566    commit_and_publish_time_ms: i32,
567    #[serde(rename = "ErrorURL")]
568    err_url: Option<String>,
569}
570
571pub struct DorisClient {
572    insert: InserterInner,
573    is_first_record: bool,
574}
575impl DorisClient {
576    pub fn new(insert: InserterInner) -> Self {
577        Self {
578            insert,
579            is_first_record: true,
580        }
581    }
582
583    pub async fn write(&mut self, data: Bytes) -> Result<()> {
584        let mut data_build = BytesMut::new();
585        if self.is_first_record {
586            self.is_first_record = false;
587        } else {
588            data_build.put_slice("\n".as_bytes());
589        }
590        data_build.put_slice(&data);
591        self.insert.write(data_build.into()).await?;
592        Ok(())
593    }
594
595    pub async fn finish(self) -> Result<DorisInsertResultResponse> {
596        let raw = self.insert.finish().await?;
597        let res: DorisInsertResultResponse = serde_json::from_slice(&raw)
598            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
599
600        if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
601            return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
602                "Insert error: {:?}, error url: {:?}",
603                res.message,
604                res.err_url
605            )));
606        };
607        Ok(res)
608    }
609}