1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use base64::Engine;
20use base64::engine::general_purpose;
21use bytes::{BufMut, Bytes, BytesMut};
22use risingwave_common::array::{Op, StreamChunk};
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde::Deserialize;
26use serde_derive::Serialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use thiserror_ext::AsReport;
30use with_options::WithOptions;
31
32use super::doris_starrocks_connector::{
33 DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, HeaderBuilder, InserterInner, InserterInnerBuilder,
34 POOL_IDLE_TIMEOUT,
35};
36use super::{
37 Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkWriterMetrics,
38};
39use crate::enforce_secret::EnforceSecret;
40use crate::sink::encoder::{JsonEncoder, RowEncoder};
41use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
42use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
43
/// Connector name of the Doris sink; exposed as `SINK_NAME` in the `Sink` impl of `DorisSink`.
pub const DORIS_SINK: &str = "doris";
45
/// Connection and target-table options of the Doris sink, deserialized from the
/// `doris.*` sink properties.
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DorisCommon {
    // Base URL of Doris; the schema endpoint is built as `{url}/api/{database}/{table}/_schema`.
    #[serde(rename = "doris.url")]
    pub url: String,
    // User name sent in the HTTP basic-auth header.
    #[serde(rename = "doris.user")]
    pub user: String,
    // Password sent in the HTTP basic-auth header.
    #[serde(rename = "doris.password")]
    pub password: String,
    // Database that contains the target table.
    #[serde(rename = "doris.database")]
    pub database: String,
    // Target table the sink writes into.
    #[serde(rename = "doris.table")]
    pub table: String,
    // Optional value forwarded to `HeaderBuilder::set_partial_columns` when the writer is built.
    #[serde(rename = "doris.partial_update")]
    pub partial_update: Option<String>,
}
61
impl EnforceSecret for DorisCommon {
    // Property keys covered by secret enforcement.
    // NOTE(review): presumably these must be supplied as secret references rather than
    // plain text — confirm against the `EnforceSecret` trait definition.
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
        "doris.password", "doris.user"
    };
}
67
impl DorisCommon {
    /// Creates a [`DorisSchemaClient`] for fetching the schema of the configured table.
    ///
    /// All connection settings are cloned into the client, so it does not borrow `self`.
    pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
        // Argument order follows `DorisSchemaClient::new`: url, table, db, user, password.
        DorisSchemaClient::new(
            self.url.clone(),
            self.table.clone(),
            self.database.clone(),
            self.user.clone(),
            self.password.clone(),
        )
    }
}
79
/// Full configuration of a Doris sink: the shared `doris.*` options plus the sink type.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DorisConfig {
    // Flattened so the `doris.*` keys are read from the same property map as `type`.
    #[serde(flatten)]
    pub common: DorisCommon,

    // Sink type; `DorisConfig::from_btreemap` only accepts `SINK_TYPE_APPEND_ONLY` or
    // `SINK_TYPE_UPSERT`.
    pub r#type: String,
}
88
impl EnforceSecret for DorisConfig {
    /// Checks a single property key by delegating to [`DorisCommon`], which owns the
    /// list of secret-enforced properties.
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DorisCommon::enforce_one(prop)
    }
}
94
95impl DorisConfig {
96 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
97 let config =
98 serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
99 .map_err(|e| SinkError::Config(anyhow!(e)))?;
100 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
101 return Err(SinkError::Config(anyhow!(
102 "`{}` must be {}, or {}",
103 SINK_TYPE_OPTION,
104 SINK_TYPE_APPEND_ONLY,
105 SINK_TYPE_UPSERT
106 )));
107 }
108 Ok(config)
109 }
110}
111
/// Sink that writes a RisingWave stream into a Doris table.
#[derive(Debug)]
pub struct DorisSink {
    /// Parsed sink configuration.
    pub config: DorisConfig,
    /// Schema of the rows handed to the sink.
    schema: Schema,
    /// Primary-key column indices; `validate` requires them to be non-empty for upsert sinks.
    pk_indices: Vec<usize>,
    /// `true` for append-only sinks, `false` for upsert sinks.
    is_append_only: bool,
}
119
120impl EnforceSecret for DorisSink {
121 fn enforce_secret<'a>(
122 prop_iter: impl Iterator<Item = &'a str>,
123 ) -> crate::error::ConnectorResult<()> {
124 for prop in prop_iter {
125 DorisConfig::enforce_one(prop)?;
126 }
127 Ok(())
128 }
129}
130
impl DorisSink {
    /// Creates a sink from already-parsed parts.
    ///
    /// No validation happens here and the current body always returns `Ok`; the checks
    /// against the Doris table are done in `validate`.
    pub fn new(
        config: DorisConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}
146
147impl DorisSink {
148 fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
149 let doris_columns_desc: HashMap<String, String> = doris_column_fields
150 .iter()
151 .map(|s| (s.name.clone(), s.r#type.clone()))
152 .collect();
153
154 let rw_fields_name = self.schema.fields();
155 if rw_fields_name.len() > doris_columns_desc.len() {
156 return Err(SinkError::Doris(
157 "The columns of the sink must be equal to or a superset of the target table's columns.".to_owned(),
158 ));
159 }
160
161 for i in rw_fields_name {
162 let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
163 SinkError::Doris(format!(
164 "Column name don't find in doris, risingwave is {:?} ",
165 i.name
166 ))
167 })?;
168 if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? {
169 return Err(SinkError::Doris(format!(
170 "Column type don't match, column name is {:?}. doris type is {:?} risingwave type is {:?} ",
171 i.name, value, i.data_type
172 )));
173 }
174 }
175 Ok(())
176 }
177
178 fn check_and_correct_column_type(
179 rw_data_type: &DataType,
180 doris_data_type: String,
181 ) -> Result<bool> {
182 match rw_data_type {
183 risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
184 risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
185 risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
186 risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
187 risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
188 risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
189 risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
190 risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
191 risingwave_common::types::DataType::Varchar => {
192 Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
193 }
194 risingwave_common::types::DataType::Time => {
195 Err(SinkError::Doris("doris can not support Time".to_owned()))
196 }
197 risingwave_common::types::DataType::Timestamp => {
198 Ok(doris_data_type.contains("DATETIME"))
199 }
200 risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
201 "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_owned(),
202 )),
203 risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
204 "doris can not support Interval".to_owned(),
205 )),
206 risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
207 risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
208 risingwave_common::types::DataType::Bytea => {
209 Err(SinkError::Doris("doris can not support Bytea".to_owned()))
210 }
211 risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")),
212 risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
213 risingwave_common::types::DataType::Int256 => {
214 Err(SinkError::Doris("doris can not support Int256".to_owned()))
215 }
216 risingwave_common::types::DataType::Map(_) => {
217 Err(SinkError::Doris("doris can not support Map".to_owned()))
218 }
219 }
220 }
221}
222
223impl Sink for DorisSink {
224 type Coordinator = DummySinkCommitCoordinator;
225 type LogSinker = LogSinkerOf<DorisSinkWriter>;
226
227 const SINK_NAME: &'static str = DORIS_SINK;
228
229 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
230 Ok(DorisSinkWriter::new(
231 self.config.clone(),
232 self.schema.clone(),
233 self.pk_indices.clone(),
234 self.is_append_only,
235 )
236 .await?
237 .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
238 }
239
240 async fn validate(&self) -> Result<()> {
241 if !self.is_append_only && self.pk_indices.is_empty() {
242 return Err(SinkError::Config(anyhow!(
243 "Primary key not defined for upsert doris sink (please define in `primary_key` field)"
244 )));
245 }
246 let client = self.config.common.build_get_client();
248 let doris_schema = client.get_schema_from_doris().await?;
249
250 if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
251 return Err(SinkError::Config(anyhow!(
252 "If you want to use upsert, please set the keysType of doris to UNIQUE_KEYS"
253 )));
254 }
255 self.check_column_name_and_type(doris_schema.properties)?;
256 Ok(())
257 }
258}
259
/// Writer that encodes stream chunks as JSON lines and sends them to Doris through a
/// [`DorisClient`].
pub struct DorisSinkWriter {
    /// Sink configuration the writer was created with.
    pub config: DorisConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    /// Builder used by `write_batch` to create a new inserter whenever `client` is `None`.
    inserter_inner_builder: InserterInnerBuilder,
    /// Selects between the append-only and the upsert write path.
    is_append_only: bool,
    /// Client of the load in progress; created lazily in `write_batch` and taken (reset to
    /// `None`) in `barrier`.
    client: Option<DorisClient>,
    /// Encoder that turns a row into a JSON object.
    row_encoder: JsonEncoder,
}
271
272impl TryFrom<SinkParam> for DorisSink {
273 type Error = SinkError;
274
275 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
276 let schema = param.schema();
277 let config = DorisConfig::from_btreemap(param.properties)?;
278 DorisSink::new(
279 config,
280 schema,
281 param.downstream_pk,
282 param.sink_type.is_append_only(),
283 )
284 }
285}
286
287impl DorisSinkWriter {
288 pub async fn new(
289 config: DorisConfig,
290 schema: Schema,
291 pk_indices: Vec<usize>,
292 is_append_only: bool,
293 ) -> Result<Self> {
294 let mut decimal_map = HashMap::default();
295 let doris_schema = config
296 .common
297 .build_get_client()
298 .get_schema_from_doris()
299 .await?;
300 for s in &doris_schema.properties {
301 if let Some(v) = s.get_decimal_pre_scale()? {
302 decimal_map.insert(s.name.clone(), v);
303 }
304 }
305
306 let header_builder = HeaderBuilder::new()
307 .add_common_header()
308 .set_user_password(config.common.user.clone(), config.common.password.clone())
309 .add_json_format()
310 .set_partial_columns(config.common.partial_update.clone())
311 .add_read_json_by_line();
312 let header = if !is_append_only {
313 header_builder.add_hidden_column().build()
314 } else {
315 header_builder.build()
316 };
317
318 let doris_insert_builder = InserterInnerBuilder::new(
319 config.common.url.clone(),
320 config.common.database.clone(),
321 config.common.table.clone(),
322 header,
323 )?;
324 Ok(Self {
325 config,
326 schema: schema.clone(),
327 pk_indices,
328 inserter_inner_builder: doris_insert_builder,
329 is_append_only,
330 client: None,
331 row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
332 })
333 }
334
335 async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
336 for (op, row) in chunk.rows() {
337 if op != Op::Insert {
338 continue;
339 }
340 let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
341 self.client
342 .as_mut()
343 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
344 .write(row_json_string.into())
345 .await?;
346 }
347 Ok(())
348 }
349
350 async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
351 for (op, row) in chunk.rows() {
352 match op {
353 Op::Insert => {
354 let mut row_json_value = self.row_encoder.encode(row)?;
355 row_json_value
356 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
357 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
358 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
359 })?;
360 self.client
361 .as_mut()
362 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
363 .write(row_json_string.into())
364 .await?;
365 }
366 Op::Delete => {
367 let mut row_json_value = self.row_encoder.encode(row)?;
368 row_json_value
369 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("1".to_owned()));
370 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
371 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
372 })?;
373 self.client
374 .as_mut()
375 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
376 .write(row_json_string.into())
377 .await?;
378 }
379 Op::UpdateDelete => {}
380 Op::UpdateInsert => {
381 let mut row_json_value = self.row_encoder.encode(row)?;
382 row_json_value
383 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
384 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
385 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
386 })?;
387 self.client
388 .as_mut()
389 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
390 .write(row_json_string.into())
391 .await?;
392 }
393 }
394 }
395 Ok(())
396 }
397}
398
399#[async_trait]
400impl SinkWriter for DorisSinkWriter {
401 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
402 if self.client.is_none() {
403 self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
404 }
405 if self.is_append_only {
406 self.append_only(chunk).await
407 } else {
408 self.upsert(chunk).await
409 }
410 }
411
412 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
413 Ok(())
414 }
415
416 async fn abort(&mut self) -> Result<()> {
417 Ok(())
418 }
419
420 async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
421 if self.client.is_some() {
422 let client = self
423 .client
424 .take()
425 .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_owned()))?;
426 client.finish().await?;
427 }
428 Ok(())
429 }
430}
431
/// HTTP client settings for fetching a table schema from Doris.
pub struct DorisSchemaClient {
    /// Base URL; the request goes to `{url}/api/{db}/{table}/_schema`.
    url: String,
    /// Table whose schema is requested.
    table: String,
    /// Database that contains the table.
    db: String,
    /// User name for HTTP basic authentication.
    user: String,
    /// Password for HTTP basic authentication.
    password: String,
}
439impl DorisSchemaClient {
440 pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
441 Self {
442 url,
443 table,
444 db,
445 user,
446 password,
447 }
448 }
449
450 pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
451 let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
452
453 let client = reqwest::Client::builder()
454 .pool_idle_timeout(POOL_IDLE_TIMEOUT)
455 .build()
456 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
457
458 let response = client
459 .get(uri)
460 .header(
461 "Authorization",
462 format!(
463 "Basic {}",
464 general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
465 ),
466 )
467 .send()
468 .await
469 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
470
471 let json: Value = response
472 .json()
473 .await
474 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
475 let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
476 json.get("data")
477 .ok_or_else(|| {
478 SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
479 })?
480 .clone()
481 } else {
482 json
483 };
484 let schema: DorisSchema = serde_json::from_value(json_data)
485 .context("Can't get schema from json")
486 .map_err(SinkError::DorisStarrocksConnect)?;
487 Ok(schema)
488 }
489}
/// Table schema as returned by the Doris `_schema` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisSchema {
    status: i32,
    /// Key model of the table (JSON field `keysType`); upsert sinks require `UNIQUE_KEYS`.
    #[serde(rename = "keysType")]
    pub keys_type: String,
    /// Columns of the table.
    pub properties: Vec<DorisField>,
}
/// One column of a Doris table as reported in [`DorisSchema`].
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisField {
    /// Column name.
    pub name: String,
    /// Doris type name; matched by substring against keywords such as `DECIMAL` or `BIGINT`.
    pub r#type: String,
    comment: String,
    pub precision: Option<String>,
    /// Scale as a string; parsed to `u8` by `get_decimal_pre_scale` for DECIMAL columns.
    pub scale: Option<String>,
    aggregation_type: String,
}
506impl DorisField {
507 pub fn get_decimal_pre_scale(&self) -> Result<Option<u8>> {
508 if self.r#type.contains("DECIMAL") {
509 let scale = self
510 .scale
511 .as_ref()
512 .ok_or_else(|| {
513 SinkError::Doris(format!(
514 "In doris, the type of {} is DECIMAL, but `scale` is not found",
515 self.name
516 ))
517 })?
518 .parse::<u8>()
519 .map_err(|err| {
520 SinkError::Doris(format!(
521 "Unable to convert decimal's scale to u8. error: {:?}",
522 err.kind()
523 ))
524 })?;
525 Ok(Some(scale))
526 } else {
527 Ok(None)
528 }
529 }
530}
531
/// Result of a finished load, deserialized in `DorisClient::finish` from the bytes
/// returned by the inserter. JSON field names are PascalCase, hence the renames.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisInsertResultResponse {
    #[serde(rename = "TxnId")]
    txn_id: i64,
    #[serde(rename = "Label")]
    label: String,
    // Compared against `DORIS_SUCCESS_STATUS` to decide whether the load succeeded.
    #[serde(rename = "Status")]
    status: String,
    #[serde(rename = "TwoPhaseCommit")]
    two_phase_commit: String,
    // Included in the error reported when the load did not succeed.
    #[serde(rename = "Message")]
    message: String,
    #[serde(rename = "NumberTotalRows")]
    number_total_rows: i64,
    #[serde(rename = "NumberLoadedRows")]
    number_loaded_rows: i64,
    #[serde(rename = "NumberFilteredRows")]
    number_filtered_rows: i32,
    #[serde(rename = "NumberUnselectedRows")]
    number_unselected_rows: i32,
    #[serde(rename = "LoadBytes")]
    load_bytes: i64,
    #[serde(rename = "LoadTimeMs")]
    load_time_ms: i32,
    #[serde(rename = "BeginTxnTimeMs")]
    begin_txn_time_ms: i32,
    #[serde(rename = "StreamLoadPutTimeMs")]
    stream_load_put_time_ms: i32,
    #[serde(rename = "ReadDataTimeMs")]
    read_data_time_ms: i32,
    #[serde(rename = "WriteDataTimeMs")]
    write_data_time_ms: i32,
    #[serde(rename = "CommitAndPublishTimeMs")]
    commit_and_publish_time_ms: i32,
    // Optional; included in the error reported when the load did not succeed.
    #[serde(rename = "ErrorURL")]
    err_url: Option<String>,
}
569
/// Wraps one inserter and frames the records written to it as newline-separated lines.
pub struct DorisClient {
    /// Underlying inserter that receives the bytes.
    insert: InserterInner,
    /// `true` until the first `write`; later records are prefixed with `\n`.
    is_first_record: bool,
}
574impl DorisClient {
575 pub fn new(insert: InserterInner) -> Self {
576 Self {
577 insert,
578 is_first_record: true,
579 }
580 }
581
582 pub async fn write(&mut self, data: Bytes) -> Result<()> {
583 let mut data_build = BytesMut::new();
584 if self.is_first_record {
585 self.is_first_record = false;
586 } else {
587 data_build.put_slice("\n".as_bytes());
588 }
589 data_build.put_slice(&data);
590 self.insert.write(data_build.into()).await?;
591 Ok(())
592 }
593
594 pub async fn finish(self) -> Result<DorisInsertResultResponse> {
595 let raw = self.insert.finish().await?;
596 let res: DorisInsertResultResponse = serde_json::from_slice(&raw)
597 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
598
599 if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
600 return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
601 "Insert error: {:?}, error url: {:?}",
602 res.message,
603 res.err_url
604 )));
605 };
606 Ok(res)
607 }
608}