risingwave_connector/sink/doris.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use base64::Engine;
20use base64::engine::general_purpose;
21use bytes::{BufMut, Bytes, BytesMut};
22use risingwave_common::array::{Op, StreamChunk};
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde::Deserialize;
26use serde_derive::Serialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use thiserror_ext::AsReport;
30use with_options::WithOptions;
31
32use super::doris_starrocks_connector::{
33    DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, HeaderBuilder, InserterInner, InserterInnerBuilder,
34    POOL_IDLE_TIMEOUT,
35};
36use super::{
37    Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkWriterMetrics,
38};
39use crate::sink::encoder::{JsonEncoder, RowEncoder};
40use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
41use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
42
/// Name of the Doris sink connector; used as `Sink::SINK_NAME` for [`DorisSink`].
pub const DORIS_SINK: &str = "doris";
44
45#[derive(Deserialize, Debug, Clone, WithOptions)]
46pub struct DorisCommon {
47    #[serde(rename = "doris.url")]
48    pub url: String,
49    #[serde(rename = "doris.user")]
50    pub user: String,
51    #[serde(rename = "doris.password")]
52    pub password: String,
53    #[serde(rename = "doris.database")]
54    pub database: String,
55    #[serde(rename = "doris.table")]
56    pub table: String,
57    #[serde(rename = "doris.partial_update")]
58    pub partial_update: Option<String>,
59}
60
61impl DorisCommon {
62    pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
63        DorisSchemaClient::new(
64            self.url.clone(),
65            self.table.clone(),
66            self.database.clone(),
67            self.user.clone(),
68            self.password.clone(),
69        )
70    }
71}
72
73#[serde_as]
74#[derive(Clone, Debug, Deserialize, WithOptions)]
75pub struct DorisConfig {
76    #[serde(flatten)]
77    pub common: DorisCommon,
78
79    pub r#type: String, // accept "append-only" or "upsert"
80}
81impl DorisConfig {
82    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
83        let config =
84            serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
85                .map_err(|e| SinkError::Config(anyhow!(e)))?;
86        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
87            return Err(SinkError::Config(anyhow!(
88                "`{}` must be {}, or {}",
89                SINK_TYPE_OPTION,
90                SINK_TYPE_APPEND_ONLY,
91                SINK_TYPE_UPSERT
92            )));
93        }
94        Ok(config)
95    }
96}
97
98#[derive(Debug)]
99pub struct DorisSink {
100    pub config: DorisConfig,
101    schema: Schema,
102    pk_indices: Vec<usize>,
103    is_append_only: bool,
104}
105
106impl DorisSink {
107    pub fn new(
108        config: DorisConfig,
109        schema: Schema,
110        pk_indices: Vec<usize>,
111        is_append_only: bool,
112    ) -> Result<Self> {
113        Ok(Self {
114            config,
115            schema,
116            pk_indices,
117            is_append_only,
118        })
119    }
120}
121
122impl DorisSink {
123    fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
124        let doris_columns_desc: HashMap<String, String> = doris_column_fields
125            .iter()
126            .map(|s| (s.name.clone(), s.r#type.clone()))
127            .collect();
128
129        let rw_fields_name = self.schema.fields();
130        if rw_fields_name.len() > doris_columns_desc.len() {
131            return Err(SinkError::Doris(
132                "The columns of the sink must be equal to or a superset of the target table's columns.".to_owned(),
133            ));
134        }
135
136        for i in rw_fields_name {
137            let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
138                SinkError::Doris(format!(
139                    "Column name don't find in doris, risingwave is {:?} ",
140                    i.name
141                ))
142            })?;
143            if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? {
144                return Err(SinkError::Doris(format!(
145                    "Column type don't match, column name is {:?}. doris type is {:?} risingwave type is {:?} ",
146                    i.name, value, i.data_type
147                )));
148            }
149        }
150        Ok(())
151    }
152
153    fn check_and_correct_column_type(
154        rw_data_type: &DataType,
155        doris_data_type: String,
156    ) -> Result<bool> {
157        match rw_data_type {
158            risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
159            risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
160            risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
161            risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
162            risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
163            risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
164            risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
165            risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
166            risingwave_common::types::DataType::Varchar => {
167                Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
168            }
169            risingwave_common::types::DataType::Time => {
170                Err(SinkError::Doris("doris can not support Time".to_owned()))
171            }
172            risingwave_common::types::DataType::Timestamp => {
173                Ok(doris_data_type.contains("DATETIME"))
174            }
175            risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
176                "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_owned(),
177            )),
178            risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
179                "doris can not support Interval".to_owned(),
180            )),
181            risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
182            risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
183            risingwave_common::types::DataType::Bytea => {
184                Err(SinkError::Doris("doris can not support Bytea".to_owned()))
185            }
186            risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")),
187            risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
188            risingwave_common::types::DataType::Int256 => {
189                Err(SinkError::Doris("doris can not support Int256".to_owned()))
190            }
191            risingwave_common::types::DataType::Map(_) => {
192                Err(SinkError::Doris("doris can not support Map".to_owned()))
193            }
194        }
195    }
196}
197
198impl Sink for DorisSink {
199    type Coordinator = DummySinkCommitCoordinator;
200    type LogSinker = LogSinkerOf<DorisSinkWriter>;
201
202    const SINK_NAME: &'static str = DORIS_SINK;
203
204    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
205        Ok(DorisSinkWriter::new(
206            self.config.clone(),
207            self.schema.clone(),
208            self.pk_indices.clone(),
209            self.is_append_only,
210        )
211        .await?
212        .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
213    }
214
215    async fn validate(&self) -> Result<()> {
216        if !self.is_append_only && self.pk_indices.is_empty() {
217            return Err(SinkError::Config(anyhow!(
218                "Primary key not defined for upsert doris sink (please define in `primary_key` field)"
219            )));
220        }
221        // check reachability
222        let client = self.config.common.build_get_client();
223        let doris_schema = client.get_schema_from_doris().await?;
224
225        if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
226            return Err(SinkError::Config(anyhow!(
227                "If you want to use upsert, please set the keysType of doris to UNIQUE_KEYS"
228            )));
229        }
230        self.check_column_name_and_type(doris_schema.properties)?;
231        Ok(())
232    }
233}
234
235pub struct DorisSinkWriter {
236    pub config: DorisConfig,
237    #[expect(dead_code)]
238    schema: Schema,
239    #[expect(dead_code)]
240    pk_indices: Vec<usize>,
241    inserter_inner_builder: InserterInnerBuilder,
242    is_append_only: bool,
243    client: Option<DorisClient>,
244    row_encoder: JsonEncoder,
245}
246
247impl TryFrom<SinkParam> for DorisSink {
248    type Error = SinkError;
249
250    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
251        let schema = param.schema();
252        let config = DorisConfig::from_btreemap(param.properties)?;
253        DorisSink::new(
254            config,
255            schema,
256            param.downstream_pk,
257            param.sink_type.is_append_only(),
258        )
259    }
260}
261
262impl DorisSinkWriter {
263    pub async fn new(
264        config: DorisConfig,
265        schema: Schema,
266        pk_indices: Vec<usize>,
267        is_append_only: bool,
268    ) -> Result<Self> {
269        let mut decimal_map = HashMap::default();
270        let doris_schema = config
271            .common
272            .build_get_client()
273            .get_schema_from_doris()
274            .await?;
275        for s in &doris_schema.properties {
276            if let Some(v) = s.get_decimal_pre_scale()? {
277                decimal_map.insert(s.name.clone(), v);
278            }
279        }
280
281        let header_builder = HeaderBuilder::new()
282            .add_common_header()
283            .set_user_password(config.common.user.clone(), config.common.password.clone())
284            .add_json_format()
285            .set_partial_columns(config.common.partial_update.clone())
286            .add_read_json_by_line();
287        let header = if !is_append_only {
288            header_builder.add_hidden_column().build()
289        } else {
290            header_builder.build()
291        };
292
293        let doris_insert_builder = InserterInnerBuilder::new(
294            config.common.url.clone(),
295            config.common.database.clone(),
296            config.common.table.clone(),
297            header,
298        )?;
299        Ok(Self {
300            config,
301            schema: schema.clone(),
302            pk_indices,
303            inserter_inner_builder: doris_insert_builder,
304            is_append_only,
305            client: None,
306            row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
307        })
308    }
309
310    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
311        for (op, row) in chunk.rows() {
312            if op != Op::Insert {
313                continue;
314            }
315            let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
316            self.client
317                .as_mut()
318                .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
319                .write(row_json_string.into())
320                .await?;
321        }
322        Ok(())
323    }
324
325    async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
326        for (op, row) in chunk.rows() {
327            match op {
328                Op::Insert => {
329                    let mut row_json_value = self.row_encoder.encode(row)?;
330                    row_json_value
331                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
332                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
333                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
334                    })?;
335                    self.client
336                        .as_mut()
337                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
338                        .write(row_json_string.into())
339                        .await?;
340                }
341                Op::Delete => {
342                    let mut row_json_value = self.row_encoder.encode(row)?;
343                    row_json_value
344                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("1".to_owned()));
345                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
346                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
347                    })?;
348                    self.client
349                        .as_mut()
350                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
351                        .write(row_json_string.into())
352                        .await?;
353                }
354                Op::UpdateDelete => {}
355                Op::UpdateInsert => {
356                    let mut row_json_value = self.row_encoder.encode(row)?;
357                    row_json_value
358                        .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
359                    let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
360                        SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
361                    })?;
362                    self.client
363                        .as_mut()
364                        .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
365                        .write(row_json_string.into())
366                        .await?;
367                }
368            }
369        }
370        Ok(())
371    }
372}
373
374#[async_trait]
375impl SinkWriter for DorisSinkWriter {
376    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
377        if self.client.is_none() {
378            self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
379        }
380        if self.is_append_only {
381            self.append_only(chunk).await
382        } else {
383            self.upsert(chunk).await
384        }
385    }
386
387    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
388        Ok(())
389    }
390
391    async fn abort(&mut self) -> Result<()> {
392        Ok(())
393    }
394
395    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
396        if self.client.is_some() {
397            let client = self
398                .client
399                .take()
400                .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_owned()))?;
401            client.finish().await?;
402        }
403        Ok(())
404    }
405}
406
/// HTTP client that fetches a Doris table's schema through the `_schema` API.
pub struct DorisSchemaClient {
    /// Base URL of the Doris HTTP endpoint.
    url: String,
    /// Table whose schema is fetched.
    table: String,
    /// Database containing `table`.
    db: String,
    /// User name for HTTP basic authentication.
    user: String,
    /// Password for HTTP basic authentication.
    password: String,
}
414impl DorisSchemaClient {
415    pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
416        Self {
417            url,
418            table,
419            db,
420            user,
421            password,
422        }
423    }
424
425    pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
426        let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
427
428        let client = reqwest::Client::builder()
429            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
430            .build()
431            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
432
433        let response = client
434            .get(uri)
435            .header(
436                "Authorization",
437                format!(
438                    "Basic {}",
439                    general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
440                ),
441            )
442            .send()
443            .await
444            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
445
446        let json: Value = response
447            .json()
448            .await
449            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
450        let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
451            json.get("data")
452                .ok_or_else(|| {
453                    SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
454                })?
455                .clone()
456        } else {
457            json
458        };
459        let schema: DorisSchema = serde_json::from_value(json_data)
460            .context("Can't get schema from json")
461            .map_err(SinkError::DorisStarrocksConnect)?;
462        Ok(schema)
463    }
464}
465#[derive(Debug, Serialize, Deserialize)]
466pub struct DorisSchema {
467    status: i32,
468    #[serde(rename = "keysType")]
469    pub keys_type: String,
470    pub properties: Vec<DorisField>,
471}
472#[derive(Debug, Serialize, Deserialize)]
473pub struct DorisField {
474    pub name: String,
475    pub r#type: String,
476    comment: String,
477    pub precision: Option<String>,
478    pub scale: Option<String>,
479    aggregation_type: String,
480}
481impl DorisField {
482    pub fn get_decimal_pre_scale(&self) -> Result<Option<u8>> {
483        if self.r#type.contains("DECIMAL") {
484            let scale = self
485                .scale
486                .as_ref()
487                .ok_or_else(|| {
488                    SinkError::Doris(format!(
489                        "In doris, the type of {} is DECIMAL, but `scale` is not found",
490                        self.name
491                    ))
492                })?
493                .parse::<u8>()
494                .map_err(|err| {
495                    SinkError::Doris(format!(
496                        "Unable to convert decimal's scale to u8. error: {:?}",
497                        err.kind()
498                    ))
499                })?;
500            Ok(Some(scale))
501        } else {
502            Ok(None)
503        }
504    }
505}
506
507#[derive(Debug, Serialize, Deserialize)]
508pub struct DorisInsertResultResponse {
509    #[serde(rename = "TxnId")]
510    txn_id: i64,
511    #[serde(rename = "Label")]
512    label: String,
513    #[serde(rename = "Status")]
514    status: String,
515    #[serde(rename = "TwoPhaseCommit")]
516    two_phase_commit: String,
517    #[serde(rename = "Message")]
518    message: String,
519    #[serde(rename = "NumberTotalRows")]
520    number_total_rows: i64,
521    #[serde(rename = "NumberLoadedRows")]
522    number_loaded_rows: i64,
523    #[serde(rename = "NumberFilteredRows")]
524    number_filtered_rows: i32,
525    #[serde(rename = "NumberUnselectedRows")]
526    number_unselected_rows: i32,
527    #[serde(rename = "LoadBytes")]
528    load_bytes: i64,
529    #[serde(rename = "LoadTimeMs")]
530    load_time_ms: i32,
531    #[serde(rename = "BeginTxnTimeMs")]
532    begin_txn_time_ms: i32,
533    #[serde(rename = "StreamLoadPutTimeMs")]
534    stream_load_put_time_ms: i32,
535    #[serde(rename = "ReadDataTimeMs")]
536    read_data_time_ms: i32,
537    #[serde(rename = "WriteDataTimeMs")]
538    write_data_time_ms: i32,
539    #[serde(rename = "CommitAndPublishTimeMs")]
540    commit_and_publish_time_ms: i32,
541    #[serde(rename = "ErrorURL")]
542    err_url: Option<String>,
543}
544
545pub struct DorisClient {
546    insert: InserterInner,
547    is_first_record: bool,
548}
549impl DorisClient {
550    pub fn new(insert: InserterInner) -> Self {
551        Self {
552            insert,
553            is_first_record: true,
554        }
555    }
556
557    pub async fn write(&mut self, data: Bytes) -> Result<()> {
558        let mut data_build = BytesMut::new();
559        if self.is_first_record {
560            self.is_first_record = false;
561        } else {
562            data_build.put_slice("\n".as_bytes());
563        }
564        data_build.put_slice(&data);
565        self.insert.write(data_build.into()).await?;
566        Ok(())
567    }
568
569    pub async fn finish(self) -> Result<DorisInsertResultResponse> {
570        let raw = self.insert.finish().await?;
571        let res: DorisInsertResultResponse = serde_json::from_slice(&raw)
572            .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
573
574        if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
575            return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
576                "Insert error: {:?}, error url: {:?}",
577                res.message,
578                res.err_url
579            )));
580        };
581        Ok(res)
582    }
583}