1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use base64::Engine;
20use base64::engine::general_purpose;
21use bytes::{BufMut, Bytes, BytesMut};
22use risingwave_common::array::{Op, StreamChunk};
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27use serde_with::{DisplayFromStr, serde_as};
28use thiserror_ext::AsReport;
29use with_options::WithOptions;
30
31use super::doris_starrocks_connector::{
32 DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, HeaderBuilder, InserterInner, InserterInnerBuilder,
33 POOL_IDLE_TIMEOUT,
34};
35use super::{
36 Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkWriterMetrics,
37};
38use crate::enforce_secret::EnforceSecret;
39use crate::sink::encoder::{JsonEncoder, RowEncoder};
40use crate::sink::starrocks::_default_stream_load_http_timeout_ms;
41use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
42use crate::sink::{Sink, SinkParam, SinkWriter, SinkWriterParam};
43
/// Name of the Doris sink; exposed as `Sink::SINK_NAME` for [`DorisSink`].
pub const DORIS_SINK: &str = "doris";
45
/// Connection settings shared by the Doris sink config, the stream-load writer and the
/// schema lookup client.
///
/// NOTE(review): `Debug` is derived, so `{:?}` output of this struct (and of anything that
/// embeds it) contains `password` in plain text.
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DorisCommon {
    // Base HTTP URL of Doris; the schema request is `{url}/api/{db}/{table}/_schema` and the
    // stream-load inserter is built from it as well.
    #[serde(rename = "doris.url")]
    pub url: String,
    // Credentials sent as HTTP Basic auth (schema lookup) and via the stream-load headers.
    #[serde(rename = "doris.user")]
    pub user: String,
    #[serde(rename = "doris.password")]
    pub password: String,
    #[serde(rename = "doris.database")]
    pub database: String,
    #[serde(rename = "doris.table")]
    pub table: String,
    // Forwarded verbatim to `HeaderBuilder::set_partial_columns`; presumably toggles Doris
    // partial-column updates — confirm accepted values against the Doris docs.
    #[serde(rename = "doris.partial_update")]
    pub partial_update: Option<String>,
}
61
62impl EnforceSecret for DorisCommon {
63 const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
64 "doris.password", "doris.user"
65 };
66}
67
68impl DorisCommon {
69 pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
70 DorisSchemaClient::new(
71 self.url.clone(),
72 self.table.clone(),
73 self.database.clone(),
74 self.user.clone(),
75 self.password.clone(),
76 )
77 }
78}
79
/// Full configuration of the Doris sink, deserialized from the sink's string properties.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DorisConfig {
    // Connection settings (`doris.*`), flattened into the same property namespace.
    #[serde(flatten)]
    pub common: DorisCommon,

    // Sink type; `from_btreemap` only accepts append-only or upsert.
    pub r#type: String,

    // Stream-load HTTP timeout in milliseconds. The property value is a string parsed via
    // `DisplayFromStr`; the default is shared with the StarRocks sink and the option may be
    // altered on the fly.
    #[serde(
        rename = "doris.stream_load.http.timeout.ms",
        default = "_default_stream_load_http_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub stream_load_http_timeout_ms: u64,
}
97
98impl EnforceSecret for DorisConfig {
99 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
100 DorisCommon::enforce_one(prop)
101 }
102}
103
104impl DorisConfig {
105 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
106 let config =
107 serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
108 .map_err(|e| SinkError::Config(anyhow!(e)))?;
109 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
110 return Err(SinkError::Config(anyhow!(
111 "`{}` must be {}, or {}",
112 SINK_TYPE_OPTION,
113 SINK_TYPE_APPEND_ONLY,
114 SINK_TYPE_UPSERT
115 )));
116 }
117 Ok(config)
118 }
119}
120
/// The Doris sink: parsed config plus the sink's schema and key layout.
#[derive(Debug)]
pub struct DorisSink {
    pub config: DorisConfig,
    // Sink output schema; every column must exist in the Doris table (checked in `validate`).
    schema: Schema,
    // Downstream primary-key column indices; must be non-empty for upsert sinks.
    pk_indices: Vec<usize>,
    // `true` for append-only sinks, `false` for upsert.
    is_append_only: bool,
}
128
129impl EnforceSecret for DorisSink {
130 fn enforce_secret<'a>(
131 prop_iter: impl Iterator<Item = &'a str>,
132 ) -> crate::error::ConnectorResult<()> {
133 for prop in prop_iter {
134 DorisConfig::enforce_one(prop)?;
135 }
136 Ok(())
137 }
138}
139
140impl DorisSink {
141 pub fn new(
142 config: DorisConfig,
143 schema: Schema,
144 pk_indices: Vec<usize>,
145 is_append_only: bool,
146 ) -> Result<Self> {
147 Ok(Self {
148 config,
149 schema,
150 pk_indices,
151 is_append_only,
152 })
153 }
154}
155
156impl DorisSink {
157 fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
158 let doris_columns_desc: HashMap<String, String> = doris_column_fields
159 .iter()
160 .map(|s| (s.name.clone(), s.r#type.clone()))
161 .collect();
162
163 let rw_fields_name = self.schema.fields();
164 if rw_fields_name.len() > doris_columns_desc.len() {
165 return Err(SinkError::Doris(
166 "The columns of the sink must be equal to or a superset of the target table's columns.".to_owned(),
167 ));
168 }
169
170 for i in rw_fields_name {
171 let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
172 SinkError::Doris(format!(
173 "Column name don't find in doris, risingwave is {:?} ",
174 i.name
175 ))
176 })?;
177 if !Self::check_and_correct_column_type(&i.data_type, value.clone())? {
178 return Err(SinkError::Doris(format!(
179 "Column type don't match, column name is {:?}. doris type is {:?} risingwave type is {:?} ",
180 i.name, value, i.data_type
181 )));
182 }
183 }
184 Ok(())
185 }
186
187 fn check_and_correct_column_type(
188 rw_data_type: &DataType,
189 doris_data_type: String,
190 ) -> Result<bool> {
191 match rw_data_type {
192 risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
193 risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
194 risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
195 risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
196 risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
197 risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
198 risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
199 risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
200 risingwave_common::types::DataType::Varchar => {
201 Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
202 }
203 risingwave_common::types::DataType::Time => {
204 Err(SinkError::Doris("TIME is not supported for Doris sink. Please convert to VARCHAR or other supported types.".to_owned()))
205 }
206 risingwave_common::types::DataType::Timestamp => {
207 Ok(doris_data_type.contains("DATETIME"))
208 }
209 risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
210 "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_owned(),
211 )),
212 risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
213 "INTERVAL is not supported for Doris sink. Please convert to VARCHAR or other supported types.".to_owned(),
214 )),
215 risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
216 risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
217 risingwave_common::types::DataType::Bytea => {
218 Err(SinkError::Doris("BYTEA is not supported for Doris sink. Please convert to VARCHAR or other supported types.".to_owned()))
219 }
220 risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")),
221 risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
222 risingwave_common::types::DataType::Int256 => {
223 Err(SinkError::Doris("INT256 is not supported for Doris sink.".to_owned()))
224 }
225 risingwave_common::types::DataType::Map(_) => {
226 Err(SinkError::Doris("MAP is not supported for Doris sink.".to_owned()))
227 }
228 DataType::Vector(_) => {
229 Err(SinkError::Doris("VECTOR is not supported for Doris sink.".to_owned()))
230 },
231 }
232 }
233}
234
235impl Sink for DorisSink {
236 type LogSinker = LogSinkerOf<DorisSinkWriter>;
237
238 const SINK_NAME: &'static str = DORIS_SINK;
239
240 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
241 Ok(DorisSinkWriter::new(
242 self.config.clone(),
243 self.schema.clone(),
244 self.pk_indices.clone(),
245 self.is_append_only,
246 )
247 .await?
248 .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
249 }
250
251 async fn validate(&self) -> Result<()> {
252 if !self.is_append_only && self.pk_indices.is_empty() {
253 return Err(SinkError::Config(anyhow!(
254 "Primary key not defined for upsert doris sink (please define in `primary_key` field)"
255 )));
256 }
257 let client = self.config.common.build_get_client();
259 let doris_schema = client.get_schema_from_doris().await?;
260
261 if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
262 return Err(SinkError::Config(anyhow!(
263 "If you want to use upsert, please set the keysType of doris to UNIQUE_KEYS"
264 )));
265 }
266 self.check_column_name_and_type(doris_schema.properties)?;
267 Ok(())
268 }
269}
270
/// Writes stream chunks into a Doris table through stream load; the active load is
/// finished on every barrier and a new one is opened lazily by the next `write_batch`.
pub struct DorisSinkWriter {
    pub config: DorisConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    // Builds a fresh stream-load inserter whenever `client` is `None`.
    inserter_inner_builder: InserterInnerBuilder,
    // `true` for append-only sinks, `false` for upsert.
    is_append_only: bool,
    // Active stream-load client; `None` before the first write and after each barrier.
    client: Option<DorisClient>,
    // Encodes rows as JSON objects, using DECIMAL scales read from the Doris schema.
    row_encoder: JsonEncoder,
}
282
283impl TryFrom<SinkParam> for DorisSink {
284 type Error = SinkError;
285
286 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
287 let schema = param.schema();
288 let pk_indices = param.downstream_pk_or_empty();
289 let config = DorisConfig::from_btreemap(param.properties)?;
290 DorisSink::new(config, schema, pk_indices, param.sink_type.is_append_only())
291 }
292}
293
294impl DorisSinkWriter {
295 pub async fn new(
296 config: DorisConfig,
297 schema: Schema,
298 pk_indices: Vec<usize>,
299 is_append_only: bool,
300 ) -> Result<Self> {
301 let mut decimal_map = HashMap::default();
302 let doris_schema = config
303 .common
304 .build_get_client()
305 .get_schema_from_doris()
306 .await?;
307 for s in &doris_schema.properties {
308 if let Some(v) = s.get_decimal_pre_scale()? {
309 decimal_map.insert(s.name.clone(), v);
310 }
311 }
312
313 let header_builder = HeaderBuilder::new()
314 .add_common_header()
315 .set_user_password(config.common.user.clone(), config.common.password.clone())
316 .add_json_format()
317 .set_partial_columns(config.common.partial_update.clone())
318 .add_read_json_by_line();
319 let header = if !is_append_only {
320 header_builder.add_hidden_column().build()
321 } else {
322 header_builder.build()
323 };
324
325 let doris_insert_builder = InserterInnerBuilder::new(
326 config.common.url.clone(),
327 config.common.database.clone(),
328 config.common.table.clone(),
329 header,
330 config.stream_load_http_timeout_ms,
331 )?;
332 Ok(Self {
333 config,
334 schema: schema.clone(),
335 pk_indices,
336 inserter_inner_builder: doris_insert_builder,
337 is_append_only,
338 client: None,
339 row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
340 })
341 }
342
343 async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
344 for (op, row) in chunk.rows() {
345 if op != Op::Insert {
346 continue;
347 }
348 let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
349 self.client
350 .as_mut()
351 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
352 .write(row_json_string.into())
353 .await?;
354 }
355 Ok(())
356 }
357
358 async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
359 for (op, row) in chunk.rows() {
360 match op {
361 Op::Insert => {
362 let mut row_json_value = self.row_encoder.encode(row)?;
363 row_json_value
364 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
365 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
366 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
367 })?;
368 self.client
369 .as_mut()
370 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
371 .write(row_json_string.into())
372 .await?;
373 }
374 Op::Delete => {
375 let mut row_json_value = self.row_encoder.encode(row)?;
376 row_json_value
377 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("1".to_owned()));
378 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
379 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
380 })?;
381 self.client
382 .as_mut()
383 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
384 .write(row_json_string.into())
385 .await?;
386 }
387 Op::UpdateDelete => {}
388 Op::UpdateInsert => {
389 let mut row_json_value = self.row_encoder.encode(row)?;
390 row_json_value
391 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
392 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
393 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
394 })?;
395 self.client
396 .as_mut()
397 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
398 .write(row_json_string.into())
399 .await?;
400 }
401 }
402 }
403 Ok(())
404 }
405}
406
407#[async_trait]
408impl SinkWriter for DorisSinkWriter {
409 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
410 if self.client.is_none() {
411 self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
412 }
413 if self.is_append_only {
414 self.append_only(chunk).await
415 } else {
416 self.upsert(chunk).await
417 }
418 }
419
420 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
421 Ok(())
422 }
423
424 async fn abort(&mut self) -> Result<()> {
425 Ok(())
426 }
427
428 async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
429 if self.client.is_some() {
430 let client = self
431 .client
432 .take()
433 .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_owned()))?;
434 client.finish().await?;
435 }
436 Ok(())
437 }
438}
439
/// Minimal HTTP client that reads a table's schema from Doris.
pub struct DorisSchemaClient {
    // Base URL; the request goes to `{url}/api/{db}/{table}/_schema`.
    url: String,
    table: String,
    db: String,
    // Sent as HTTP Basic auth credentials.
    user: String,
    password: String,
}
447impl DorisSchemaClient {
448 pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
449 Self {
450 url,
451 table,
452 db,
453 user,
454 password,
455 }
456 }
457
458 pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
459 let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
460
461 let client = reqwest::Client::builder()
462 .pool_idle_timeout(POOL_IDLE_TIMEOUT)
463 .build()
464 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
465
466 let response = client
467 .get(uri)
468 .header(
469 "Authorization",
470 format!(
471 "Basic {}",
472 general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
473 ),
474 )
475 .send()
476 .await
477 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
478
479 let json: Value = response
480 .json()
481 .await
482 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
483 let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
484 json.get("data")
485 .ok_or_else(|| {
486 SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
487 })?
488 .clone()
489 } else {
490 json
491 };
492 let schema: DorisSchema = serde_json::from_value(json_data)
493 .context("Can't get schema from json")
494 .map_err(SinkError::DorisStarrocksConnect)?;
495 Ok(schema)
496 }
497}
/// Table schema payload returned by the Doris `_schema` HTTP API.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisSchema {
    status: i32,
    // Table key model; upsert sinks require `UNIQUE_KEYS` (checked in `validate`).
    #[serde(rename = "keysType")]
    pub keys_type: String,
    // Column descriptions; matched by name against the sink schema.
    pub properties: Vec<DorisField>,
}
/// One column description from the Doris `_schema` response.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisField {
    pub name: String,
    // Doris type name (e.g. `VARCHAR`, `DECIMAL...`); compared by substring in this module.
    pub r#type: String,
    comment: String,
    // Decimal precision/scale, deserialized as strings; only `scale` is read
    // (in `get_decimal_pre_scale`).
    pub precision: Option<String>,
    pub scale: Option<String>,
    aggregation_type: String,
}
514impl DorisField {
515 pub fn get_decimal_pre_scale(&self) -> Result<Option<u8>> {
516 if self.r#type.contains("DECIMAL") {
517 let scale = self
518 .scale
519 .as_ref()
520 .ok_or_else(|| {
521 SinkError::Doris(format!(
522 "In doris, the type of {} is DECIMAL, but `scale` is not found",
523 self.name
524 ))
525 })?
526 .parse::<u8>()
527 .map_err(|err| {
528 SinkError::Doris(format!(
529 "Unable to convert decimal's scale to u8. error: {:?}",
530 err.kind()
531 ))
532 })?;
533 Ok(Some(scale))
534 } else {
535 Ok(None)
536 }
537 }
538}
539
/// Result body of a finished Doris stream load, as parsed by `DorisClient::finish`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisInsertResultResponse {
    #[serde(rename = "TxnId")]
    txn_id: i64,
    #[serde(rename = "Label")]
    label: String,
    // Checked against `DORIS_SUCCESS_STATUS` to decide whether the load succeeded.
    #[serde(rename = "Status")]
    status: String,
    #[serde(rename = "TwoPhaseCommit")]
    two_phase_commit: String,
    // Included in the error when the load did not succeed.
    #[serde(rename = "Message")]
    message: String,
    #[serde(rename = "NumberTotalRows")]
    number_total_rows: i64,
    #[serde(rename = "NumberLoadedRows")]
    number_loaded_rows: i64,
    #[serde(rename = "NumberFilteredRows")]
    number_filtered_rows: i32,
    #[serde(rename = "NumberUnselectedRows")]
    number_unselected_rows: i32,
    #[serde(rename = "LoadBytes")]
    load_bytes: i64,
    #[serde(rename = "LoadTimeMs")]
    load_time_ms: i32,
    #[serde(rename = "BeginTxnTimeMs")]
    begin_txn_time_ms: i32,
    #[serde(rename = "StreamLoadPutTimeMs")]
    stream_load_put_time_ms: i32,
    #[serde(rename = "ReadDataTimeMs")]
    read_data_time_ms: i32,
    #[serde(rename = "WriteDataTimeMs")]
    write_data_time_ms: i32,
    #[serde(rename = "CommitAndPublishTimeMs")]
    commit_and_publish_time_ms: i32,
    // Optional; included in the error when the load did not succeed.
    #[serde(rename = "ErrorURL")]
    err_url: Option<String>,
}
577
/// One in-progress stream load to which rows are appended, separated by newlines.
pub struct DorisClient {
    insert: InserterInner,
    // `true` until the first row is written; later rows are prefixed with `\n`.
    is_first_record: bool,
}
582impl DorisClient {
583 pub fn new(insert: InserterInner) -> Self {
584 Self {
585 insert,
586 is_first_record: true,
587 }
588 }
589
590 pub async fn write(&mut self, data: Bytes) -> Result<()> {
591 let mut data_build = BytesMut::new();
592 if self.is_first_record {
593 self.is_first_record = false;
594 } else {
595 data_build.put_slice("\n".as_bytes());
596 }
597 data_build.put_slice(&data);
598 self.insert.write(data_build.into()).await?;
599 Ok(())
600 }
601
602 pub async fn finish(self) -> Result<DorisInsertResultResponse> {
603 let raw = self.insert.finish().await?;
604 let res: DorisInsertResultResponse = serde_json::from_slice(&raw)
605 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
606
607 if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
608 return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
609 "Insert error: {:?}, error url: {:?}",
610 res.message,
611 res.err_url
612 )));
613 };
614 Ok(res)
615 }
616}