risingwave_connector/sink/
doris.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use base64::Engine;
20use base64::engine::general_purpose;
21use bytes::{BufMut, Bytes, BytesMut};
22use risingwave_common::array::{Op, StreamChunk};
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27use serde_with::{DisplayFromStr, serde_as};
28use thiserror_ext::AsReport;
29use with_options::WithOptions;
30
31use super::doris_starrocks_connector::{
32    DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, HeaderBuilder, InserterInner, InserterInnerBuilder,
33    POOL_IDLE_TIMEOUT,
34};
35use super::{
36    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkWriterMetrics,
37};
38use crate::enforce_secret::EnforceSecret;
39use crate::sink::encoder::{JsonEncoder, RowEncoder};
40use crate::sink::starrocks::_default_stream_load_http_timeout_ms;
41use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
42use crate::sink::{Sink, SinkParam, SinkWriter, SinkWriterParam};
43
/// Connector name of the Doris sink; used as `Sink::SINK_NAME` by [`DorisSink`].
pub const DORIS_SINK: &str = "doris";
45
46#[derive(Deserialize, Debug, Clone, WithOptions)]
47pub struct DorisCommon {
48    #[serde(rename = "doris.url")]
49    pub url: String,
50    #[serde(rename = "doris.user")]
51    pub user: String,
52    #[serde(rename = "doris.password")]
53    pub password: String,
54    #[serde(rename = "doris.database")]
55    pub database: String,
56    #[serde(rename = "doris.table")]
57    pub table: String,
58    #[serde(rename = "doris.partial_update")]
59    pub partial_update: Option<String>,
60}
61
62impl EnforceSecret for DorisCommon {
63    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
64        "doris.password", "doris.user"
65    };
66}
67
68impl DorisCommon {
69    pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
70        DorisSchemaClient::new(
71            self.url.clone(),
72            self.table.clone(),
73            self.database.clone(),
74            self.user.clone(),
75            self.password.clone(),
76        )
77    }
78}
79
80#[serde_as]
81#[derive(Clone, Debug, Deserialize, WithOptions)]
82pub struct DorisConfig {
83    #[serde(flatten)]
84    pub common: DorisCommon,
85
86    pub r#type: String, // accept "append-only" or "upsert"
87
88    /// The timeout in milliseconds for stream load http request, defaults to 10 seconds.
89    #[serde(
90        rename = "doris.stream_load.http.timeout.ms",
91        default = "_default_stream_load_http_timeout_ms"
92    )]
93    #[serde_as(as = "DisplayFromStr")]
94    #[with_option(allow_alter_on_fly)]
95    pub stream_load_http_timeout_ms: u64,
96}
97
98impl EnforceSecret for DorisConfig {
99    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
100        DorisCommon::enforce_one(prop)
101    }
102}
103
104impl DorisConfig {
105    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
106        let config =
107            serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
108                .map_err(|e| SinkError::Config(anyhow!(e)))?;
109        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
110            return Err(SinkError::Config(anyhow!(
111                "`{}` must be {}, or {}",
112                SINK_TYPE_OPTION,
113                SINK_TYPE_APPEND_ONLY,
114                SINK_TYPE_UPSERT
115            )));
116        }
117        Ok(config)
118    }
119}
120
121#[derive(Debug)]
122pub struct DorisSink {
123    pub config: DorisConfig,
124    schema: Schema,
125    pk_indices: Vec<usize>,
126    is_append_only: bool,
127}
128
129impl EnforceSecret for DorisSink {
130    fn enforce_secret<'a>(
131        prop_iter: impl Iterator<Item = &'a str>,
132    ) -> crate::error::ConnectorResult<()> {
133        for prop in prop_iter {
134            DorisConfig::enforce_one(prop)?;
135        }
136        Ok(())
137    }
138}
139
140impl DorisSink {
141    pub fn new(
142        config: DorisConfig,
143        schema: Schema,
144        pk_indices: Vec<usize>,
145        is_append_only: bool,
146    ) -> Result<Self> {
147        Ok(Self {
148            config,
149            schema,
150            pk_indices,
151            is_append_only,
152        })
153    }
154}
155
156impl DorisSink {
157    fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
158        let doris_columns_desc: HashMap<String, String> = doris_column_fields
159            .iter()
160            .map(|s| (s.name.clone(), s.r#type.clone()))
161            .collect();
162
163        let rw_fields_name = self.schema.fields();
164        if rw_fields_name.len() > doris_columns_desc.len() {
165            return Err(SinkError::Doris(
166                "The columns of the sink must be equal to or a superset of the target table's columns.".to_owned(),
167            ));
168        }
169
170        for i in rw_fields_name {
171            let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
172                SinkError::Doris(format!(
173                    "Column name don't find in doris, risingwave is {:?} ",
174                    i.name
175                ))
176            })?;
177            if !Self::check_and_correct_column_type(&i.data_type, value.clone())? {
178                return Err(SinkError::Doris(format!(
179                    "Column type don't match, column name is {:?}. doris type is {:?} risingwave type is {:?} ",
180                    i.name, value, i.data_type
181                )));
182            }
183        }
184        Ok(())
185    }
186
187    fn check_and_correct_column_type(
188        rw_data_type: &DataType,
189        doris_data_type: String,
190    ) -> Result<bool> {
191        match rw_data_type {
192            risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
193            risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
194            risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
195            risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
196            risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
197            risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
198            risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
199            risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
200            risingwave_common::types::DataType::Varchar => {
201                Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
202            }
203            risingwave_common::types::DataType::Time => {
204                Err(SinkError::Doris("TIME is not supported for Doris sink. Please convert to VARCHAR or other supported types.".to_owned()))
205            }
206            risingwave_common::types::DataType::Timestamp => {
207                Ok(doris_data_type.contains("DATETIME"))
208            }
209            risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
210                "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_owned(),
211            )),
212            risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
213                "INTERVAL is not supported for Doris sink. Please convert to VARCHAR or other supported types.".to_owned(),
214            )),
215            risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
216            risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
217            risingwave_common::types::DataType::Bytea => {
218                Err(SinkError::Doris("BYTEA is not supported for Doris sink. Please convert to VARCHAR or other supported types.".to_owned()))
219            }
220            risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")),
221            risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
222            risingwave_common::types::DataType::Int256 => {
223                Err(SinkError::Doris("INT256 is not supported for Doris sink.".to_owned()))
224            }
225            risingwave_common::types::DataType::Map(_) => {
226                Err(SinkError::Doris("MAP is not supported for Doris sink.".to_owned()))
227            }
228            DataType::Vector(_) => {
229                Err(SinkError::Doris("VECTOR is not supported for Doris sink.".to_owned()))
230            },
231        }
232    }
233}
234
235impl Sink for DorisSink {
236    type LogSinker = LogSinkerOf<DorisSinkWriter>;
237
238    const SINK_NAME: &'static str = DORIS_SINK;
239
240    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
241        Ok(DorisSinkWriter::new(
242            self.config.clone(),
243            self.schema.clone(),
244            self.pk_indices.clone(),
245            self.is_append_only,
246        )
247        .await?
248        .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
249    }
250
251    async fn validate(&self) -> Result<()> {
252        if !self.is_append_only && self.pk_indices.is_empty() {
253            return Err(SinkError::Config(anyhow!(
254                "Primary key not defined for upsert doris sink (please define in `primary_key` field)"
255            )));
256        }
257        // check reachability
258        let client = self.config.common.build_get_client();
259        let doris_schema = client.get_schema_from_doris().await?;
260
261        if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
262            return Err(SinkError::Config(anyhow!(
263                "If you want to use upsert, please set the keysType of doris to UNIQUE_KEYS"
264            )));
265        }
266        self.check_column_name_and_type(doris_schema.properties)?;
267        Ok(())
268    }
269}
270
271pub struct DorisSinkWriter {
272    pub config: DorisConfig,
273    #[expect(dead_code)]
274    schema: Schema,
275    #[expect(dead_code)]
276    pk_indices: Vec<usize>,
277    inserter_inner_builder: InserterInnerBuilder,
278    is_append_only: bool,
279    client: Option<DorisClient>,
280    row_encoder: JsonEncoder,
281}
282
283impl TryFrom<SinkParam> for DorisSink {
284    type Error = SinkError;
285
286    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
287        let schema = param.schema();
288        let pk_indices = param.downstream_pk_or_empty();
289        let config = DorisConfig::from_btreemap(param.properties)?;
290        DorisSink::new(config, schema, pk_indices, param.sink_type.is_append_only())
291    }
292}
293
294impl DorisSinkWriter {
295    pub async fn new(
296        config: DorisConfig,
297        schema: Schema,
298        pk_indices: Vec<usize>,
299        is_append_only: bool,
300    ) -> Result<Self> {
301        let mut decimal_map = HashMap::default();
302        let doris_schema = config
303            .common
304            .build_get_client()
305            .get_schema_from_doris()
306            .await?;
307        for s in &doris_schema.properties {
308            if let Some(v) = s.get_decimal_pre_scale()? {
309                decimal_map.insert(s.name.clone(), v);
310            }
311        }
312
313        let header_builder = HeaderBuilder::new()
314            .add_common_header()
315            .set_user_password(config.common.user.clone(), config.common.password.clone())
316            .add_json_format()
317            .set_partial_columns(config.common.partial_update.clone())
318            .add_read_json_by_line();
319        let header = if !is_append_only {
320            header_builder.add_hidden_column().build()
321        } else {
322            header_builder.build()
323        };
324
325        let doris_insert_builder = InserterInnerBuilder::new(
326            config.common.url.clone(),
327            config.common.database.clone(),
328            config.common.table.clone(),
329            header,
330            config.stream_load_http_timeout_ms,
331        )?;
332        Ok(Self {
333            config,
334            schema: schema.clone(),
335            pk_indices,
336            inserter_inner_builder: doris_insert_builder,
337            is_append_only,
338            client: None,
339            row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
340        })
341    }
342
343    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
344        for (op, row) in chunk.rows() {
345            if op != Op::Insert {
346                continue;
347            }
348            let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
349            self.client
350                .as_mut()
351                .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
352                .write(row_json_string.into())
353                .await?;
354        }
355        Ok(())
356    }
357
358    async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
359        for (op, row) in chunk.rows() {
360            match op {
361                Op::Insert => {
362                    let mut row_json_value = self.row_encoder.encode(row)?;
363                    row_json_value
364                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
365                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
366                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
367                    })?;
368                    self.client
369                        .as_mut()
370                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
371                        .write(row_json_string.into())
372                        .await?;
373                }
374                Op::Delete => {
375                    let mut row_json_value = self.row_encoder.encode(row)?;
376                    row_json_value
377                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("1".to_owned()));
378                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
379                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
380                    })?;
381                    self.client
382                        .as_mut()
383                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
384                        .write(row_json_string.into())
385                        .await?;
386                }
387                Op::UpdateDelete => {}
388                Op::UpdateInsert => {
389                    let mut row_json_value = self.row_encoder.encode(row)?;
390                    row_json_value
391                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
392                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
393                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
394                    })?;
395                    self.client
396                        .as_mut()
397                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
398                        .write(row_json_string.into())
399                        .await?;
400                }
401            }
402        }
403        Ok(())
404    }
405}
406
407#[async_trait]
408impl SinkWriter for DorisSinkWriter {
409    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
410        if self.client.is_none() {
411            self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
412        }
413        if self.is_append_only {
414            self.append_only(chunk).await
415        } else {
416            self.upsert(chunk).await
417        }
418    }
419
420    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
421        Ok(())
422    }
423
424    async fn abort(&mut self) -> Result<()> {
425        Ok(())
426    }
427
428    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
429        if self.client.is_some() {
430            let client = self
431                .client
432                .take()
433                .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_owned()))?;
434            client.finish().await?;
435        }
436        Ok(())
437    }
438}
439
440pub struct DorisSchemaClient {
441    url: String,
442    table: String,
443    db: String,
444    user: String,
445    password: String,
446}
447impl DorisSchemaClient {
448    pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
449        Self {
450            url,
451            table,
452            db,
453            user,
454            password,
455        }
456    }
457
458    pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
459        let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
460
461        let client = reqwest::Client::builder()
462            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
463            .build()
464            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
465
466        let response = client
467            .get(uri)
468            .header(
469                "Authorization",
470                format!(
471                    "Basic {}",
472                    general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
473                ),
474            )
475            .send()
476            .await
477            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
478
479        let json: Value = response
480            .json()
481            .await
482            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
483        let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
484            json.get("data")
485                .ok_or_else(|| {
486                    SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
487                })?
488                .clone()
489        } else {
490            json
491        };
492        let schema: DorisSchema = serde_json::from_value(json_data)
493            .context("Can't get schema from json")
494            .map_err(SinkError::DorisStarrocksConnect)?;
495        Ok(schema)
496    }
497}
498#[derive(Debug, Serialize, Deserialize)]
499pub struct DorisSchema {
500    status: i32,
501    #[serde(rename = "keysType")]
502    pub keys_type: String,
503    pub properties: Vec<DorisField>,
504}
505#[derive(Debug, Serialize, Deserialize)]
506pub struct DorisField {
507    pub name: String,
508    pub r#type: String,
509    comment: String,
510    pub precision: Option<String>,
511    pub scale: Option<String>,
512    aggregation_type: String,
513}
514impl DorisField {
515    pub fn get_decimal_pre_scale(&self) -> Result<Option<u8>> {
516        if self.r#type.contains("DECIMAL") {
517            let scale = self
518                .scale
519                .as_ref()
520                .ok_or_else(|| {
521                    SinkError::Doris(format!(
522                        "In doris, the type of {} is DECIMAL, but `scale` is not found",
523                        self.name
524                    ))
525                })?
526                .parse::<u8>()
527                .map_err(|err| {
528                    SinkError::Doris(format!(
529                        "Unable to convert decimal's scale to u8. error: {:?}",
530                        err.kind()
531                    ))
532                })?;
533            Ok(Some(scale))
534        } else {
535            Ok(None)
536        }
537    }
538}
539
540#[derive(Debug, Serialize, Deserialize)]
541pub struct DorisInsertResultResponse {
542    #[serde(rename = "TxnId")]
543    txn_id: i64,
544    #[serde(rename = "Label")]
545    label: String,
546    #[serde(rename = "Status")]
547    status: String,
548    #[serde(rename = "TwoPhaseCommit")]
549    two_phase_commit: String,
550    #[serde(rename = "Message")]
551    message: String,
552    #[serde(rename = "NumberTotalRows")]
553    number_total_rows: i64,
554    #[serde(rename = "NumberLoadedRows")]
555    number_loaded_rows: i64,
556    #[serde(rename = "NumberFilteredRows")]
557    number_filtered_rows: i32,
558    #[serde(rename = "NumberUnselectedRows")]
559    number_unselected_rows: i32,
560    #[serde(rename = "LoadBytes")]
561    load_bytes: i64,
562    #[serde(rename = "LoadTimeMs")]
563    load_time_ms: i32,
564    #[serde(rename = "BeginTxnTimeMs")]
565    begin_txn_time_ms: i32,
566    #[serde(rename = "StreamLoadPutTimeMs")]
567    stream_load_put_time_ms: i32,
568    #[serde(rename = "ReadDataTimeMs")]
569    read_data_time_ms: i32,
570    #[serde(rename = "WriteDataTimeMs")]
571    write_data_time_ms: i32,
572    #[serde(rename = "CommitAndPublishTimeMs")]
573    commit_and_publish_time_ms: i32,
574    #[serde(rename = "ErrorURL")]
575    err_url: Option<String>,
576}
577
578pub struct DorisClient {
579    insert: InserterInner,
580    is_first_record: bool,
581}
582impl DorisClient {
583    pub fn new(insert: InserterInner) -> Self {
584        Self {
585            insert,
586            is_first_record: true,
587        }
588    }
589
590    pub async fn write(&mut self, data: Bytes) -> Result<()> {
591        let mut data_build = BytesMut::new();
592        if self.is_first_record {
593            self.is_first_record = false;
594        } else {
595            data_build.put_slice("\n".as_bytes());
596        }
597        data_build.put_slice(&data);
598        self.insert.write(data_build.into()).await?;
599        Ok(())
600    }
601
602    pub async fn finish(self) -> Result<DorisInsertResultResponse> {
603        let raw = self.insert.finish().await?;
604        let res: DorisInsertResultResponse = serde_json::from_slice(&raw)
605            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
606
607        if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
608            return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
609                "Insert error: {:?}, error url: {:?}",
610                res.message,
611                res.err_url
612            )));
613        };
614        Ok(res)
615    }
616}