1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use base64::Engine;
20use base64::engine::general_purpose;
21use bytes::{BufMut, Bytes, BytesMut};
22use risingwave_common::array::{Op, StreamChunk};
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde::Deserialize;
26use serde_derive::Serialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use thiserror_ext::AsReport;
30use with_options::WithOptions;
31
32use super::doris_starrocks_connector::{
33 DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, HeaderBuilder, InserterInner, InserterInnerBuilder,
34 POOL_IDLE_TIMEOUT,
35};
36use super::{
37 Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkWriterMetrics,
38};
39use crate::enforce_secret::EnforceSecret;
40use crate::sink::encoder::{JsonEncoder, RowEncoder};
41use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
42use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
43
/// Connector name for the Doris sink; used as `Sink::SINK_NAME` for `DorisSink`.
pub const DORIS_SINK: &str = "doris";
45
// Connection and target-table settings shared by the Doris sink config and the schema client.
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DorisCommon {
    // Base HTTP URL; `/api/{db}/{table}/_schema` is appended when fetching the schema.
    #[serde(rename = "doris.url")]
    pub url: String,
    // Login user; treated as a secret property (see `EnforceSecret for DorisCommon`).
    #[serde(rename = "doris.user")]
    pub user: String,
    // Login password; treated as a secret property.
    #[serde(rename = "doris.password")]
    pub password: String,
    #[serde(rename = "doris.database")]
    pub database: String,
    #[serde(rename = "doris.table")]
    pub table: String,
    // Optional; forwarded to `HeaderBuilder::set_partial_columns` when the writer builds
    // its stream-load headers.
    #[serde(rename = "doris.partial_update")]
    pub partial_update: Option<String>,
}
61
// Both the Doris user name and the password are treated as secret properties.
impl EnforceSecret for DorisCommon {
    const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {
        "doris.password", "doris.user"
    };
}
67
68impl DorisCommon {
69 pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
70 DorisSchemaClient::new(
71 self.url.clone(),
72 self.table.clone(),
73 self.database.clone(),
74 self.user.clone(),
75 self.password.clone(),
76 )
77 }
78}
79
// Full Doris sink configuration parsed from the sink properties (see `DorisConfig::from_btreemap`).
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DorisConfig {
    #[serde(flatten)]
    pub common: DorisCommon,

    // Sink type; `from_btreemap` accepts only `SINK_TYPE_APPEND_ONLY` or `SINK_TYPE_UPSERT`.
    pub r#type: String,
}
88
89impl EnforceSecret for DorisConfig {
90 fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
91 DorisCommon::enforce_one(prop)
92 }
93}
94
95impl DorisConfig {
96 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
97 let config =
98 serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
99 .map_err(|e| SinkError::Config(anyhow!(e)))?;
100 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
101 return Err(SinkError::Config(anyhow!(
102 "`{}` must be {}, or {}",
103 SINK_TYPE_OPTION,
104 SINK_TYPE_APPEND_ONLY,
105 SINK_TYPE_UPSERT
106 )));
107 }
108 Ok(config)
109 }
110}
111
/// Doris sink: its configuration plus the RisingWave schema and downstream key it writes.
#[derive(Debug)]
pub struct DorisSink {
    pub config: DorisConfig,
    schema: Schema,
    // Downstream primary-key column indices (`SinkParam::downstream_pk`); `validate`
    // requires them to be non-empty for upsert sinks.
    pk_indices: Vec<usize>,
    is_append_only: bool,
}
119
120impl EnforceSecret for DorisSink {
121 fn enforce_secret<'a>(
122 prop_iter: impl Iterator<Item = &'a str>,
123 ) -> crate::error::ConnectorResult<()> {
124 for prop in prop_iter {
125 DorisConfig::enforce_one(prop)?;
126 }
127 Ok(())
128 }
129}
130
131impl DorisSink {
132 pub fn new(
133 config: DorisConfig,
134 schema: Schema,
135 pk_indices: Vec<usize>,
136 is_append_only: bool,
137 ) -> Result<Self> {
138 Ok(Self {
139 config,
140 schema,
141 pk_indices,
142 is_append_only,
143 })
144 }
145}
146
147impl DorisSink {
148 fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
149 let doris_columns_desc: HashMap<String, String> = doris_column_fields
150 .iter()
151 .map(|s| (s.name.clone(), s.r#type.clone()))
152 .collect();
153
154 let rw_fields_name = self.schema.fields();
155 if rw_fields_name.len() > doris_columns_desc.len() {
156 return Err(SinkError::Doris(
157 "The columns of the sink must be equal to or a superset of the target table's columns.".to_owned(),
158 ));
159 }
160
161 for i in rw_fields_name {
162 let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
163 SinkError::Doris(format!(
164 "Column name don't find in doris, risingwave is {:?} ",
165 i.name
166 ))
167 })?;
168 if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? {
169 return Err(SinkError::Doris(format!(
170 "Column type don't match, column name is {:?}. doris type is {:?} risingwave type is {:?} ",
171 i.name, value, i.data_type
172 )));
173 }
174 }
175 Ok(())
176 }
177
178 fn check_and_correct_column_type(
179 rw_data_type: &DataType,
180 doris_data_type: String,
181 ) -> Result<bool> {
182 match rw_data_type {
183 risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
184 risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
185 risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
186 risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
187 risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
188 risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
189 risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
190 risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
191 risingwave_common::types::DataType::Varchar => {
192 Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
193 }
194 risingwave_common::types::DataType::Time => {
195 Err(SinkError::Doris("doris can not support Time".to_owned()))
196 }
197 risingwave_common::types::DataType::Timestamp => {
198 Ok(doris_data_type.contains("DATETIME"))
199 }
200 risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
201 "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_owned(),
202 )),
203 risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
204 "doris can not support Interval".to_owned(),
205 )),
206 risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
207 risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
208 risingwave_common::types::DataType::Bytea => {
209 Err(SinkError::Doris("doris can not support Bytea".to_owned()))
210 }
211 risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")),
212 risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
213 risingwave_common::types::DataType::Int256 => {
214 Err(SinkError::Doris("doris can not support Int256".to_owned()))
215 }
216 risingwave_common::types::DataType::Map(_) => {
217 Err(SinkError::Doris("doris can not support Map".to_owned()))
218 }
219 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
220 }
221 }
222}
223
224impl Sink for DorisSink {
225 type Coordinator = DummySinkCommitCoordinator;
226 type LogSinker = LogSinkerOf<DorisSinkWriter>;
227
228 const SINK_NAME: &'static str = DORIS_SINK;
229
230 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
231 Ok(DorisSinkWriter::new(
232 self.config.clone(),
233 self.schema.clone(),
234 self.pk_indices.clone(),
235 self.is_append_only,
236 )
237 .await?
238 .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
239 }
240
241 async fn validate(&self) -> Result<()> {
242 if !self.is_append_only && self.pk_indices.is_empty() {
243 return Err(SinkError::Config(anyhow!(
244 "Primary key not defined for upsert doris sink (please define in `primary_key` field)"
245 )));
246 }
247 let client = self.config.common.build_get_client();
249 let doris_schema = client.get_schema_from_doris().await?;
250
251 if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
252 return Err(SinkError::Config(anyhow!(
253 "If you want to use upsert, please set the keysType of doris to UNIQUE_KEYS"
254 )));
255 }
256 self.check_column_name_and_type(doris_schema.properties)?;
257 Ok(())
258 }
259}
260
/// Writer that streams RisingWave chunks into Doris as newline-delimited JSON stream loads.
pub struct DorisSinkWriter {
    pub config: DorisConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    // Builds a new stream-load request on the first `write_batch` after construction or
    // after a barrier.
    inserter_inner_builder: InserterInnerBuilder,
    is_append_only: bool,
    // `None` until a `write_batch` starts a stream load; taken and finished on `barrier`.
    client: Option<DorisClient>,
    // JSON row encoder configured with the DECIMAL scales read from the Doris schema.
    row_encoder: JsonEncoder,
}
272
273impl TryFrom<SinkParam> for DorisSink {
274 type Error = SinkError;
275
276 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
277 let schema = param.schema();
278 let config = DorisConfig::from_btreemap(param.properties)?;
279 DorisSink::new(
280 config,
281 schema,
282 param.downstream_pk,
283 param.sink_type.is_append_only(),
284 )
285 }
286}
287
288impl DorisSinkWriter {
289 pub async fn new(
290 config: DorisConfig,
291 schema: Schema,
292 pk_indices: Vec<usize>,
293 is_append_only: bool,
294 ) -> Result<Self> {
295 let mut decimal_map = HashMap::default();
296 let doris_schema = config
297 .common
298 .build_get_client()
299 .get_schema_from_doris()
300 .await?;
301 for s in &doris_schema.properties {
302 if let Some(v) = s.get_decimal_pre_scale()? {
303 decimal_map.insert(s.name.clone(), v);
304 }
305 }
306
307 let header_builder = HeaderBuilder::new()
308 .add_common_header()
309 .set_user_password(config.common.user.clone(), config.common.password.clone())
310 .add_json_format()
311 .set_partial_columns(config.common.partial_update.clone())
312 .add_read_json_by_line();
313 let header = if !is_append_only {
314 header_builder.add_hidden_column().build()
315 } else {
316 header_builder.build()
317 };
318
319 let doris_insert_builder = InserterInnerBuilder::new(
320 config.common.url.clone(),
321 config.common.database.clone(),
322 config.common.table.clone(),
323 header,
324 )?;
325 Ok(Self {
326 config,
327 schema: schema.clone(),
328 pk_indices,
329 inserter_inner_builder: doris_insert_builder,
330 is_append_only,
331 client: None,
332 row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
333 })
334 }
335
336 async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
337 for (op, row) in chunk.rows() {
338 if op != Op::Insert {
339 continue;
340 }
341 let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
342 self.client
343 .as_mut()
344 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
345 .write(row_json_string.into())
346 .await?;
347 }
348 Ok(())
349 }
350
351 async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
352 for (op, row) in chunk.rows() {
353 match op {
354 Op::Insert => {
355 let mut row_json_value = self.row_encoder.encode(row)?;
356 row_json_value
357 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
358 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
359 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
360 })?;
361 self.client
362 .as_mut()
363 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
364 .write(row_json_string.into())
365 .await?;
366 }
367 Op::Delete => {
368 let mut row_json_value = self.row_encoder.encode(row)?;
369 row_json_value
370 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("1".to_owned()));
371 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
372 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
373 })?;
374 self.client
375 .as_mut()
376 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
377 .write(row_json_string.into())
378 .await?;
379 }
380 Op::UpdateDelete => {}
381 Op::UpdateInsert => {
382 let mut row_json_value = self.row_encoder.encode(row)?;
383 row_json_value
384 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
385 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
386 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
387 })?;
388 self.client
389 .as_mut()
390 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
391 .write(row_json_string.into())
392 .await?;
393 }
394 }
395 }
396 Ok(())
397 }
398}
399
400#[async_trait]
401impl SinkWriter for DorisSinkWriter {
402 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
403 if self.client.is_none() {
404 self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
405 }
406 if self.is_append_only {
407 self.append_only(chunk).await
408 } else {
409 self.upsert(chunk).await
410 }
411 }
412
413 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
414 Ok(())
415 }
416
417 async fn abort(&mut self) -> Result<()> {
418 Ok(())
419 }
420
421 async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
422 if self.client.is_some() {
423 let client = self
424 .client
425 .take()
426 .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_owned()))?;
427 client.finish().await?;
428 }
429 Ok(())
430 }
431}
432
/// HTTP client settings for reading a table's schema from Doris' `_schema` API.
pub struct DorisSchemaClient {
    // Base HTTP URL of the Doris endpoint.
    url: String,
    table: String,
    db: String,
    // Credentials sent as an HTTP Basic `Authorization` header.
    user: String,
    password: String,
}
440impl DorisSchemaClient {
441 pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
442 Self {
443 url,
444 table,
445 db,
446 user,
447 password,
448 }
449 }
450
451 pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
452 let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
453
454 let client = reqwest::Client::builder()
455 .pool_idle_timeout(POOL_IDLE_TIMEOUT)
456 .build()
457 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
458
459 let response = client
460 .get(uri)
461 .header(
462 "Authorization",
463 format!(
464 "Basic {}",
465 general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
466 ),
467 )
468 .send()
469 .await
470 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
471
472 let json: Value = response
473 .json()
474 .await
475 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
476 let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
477 json.get("data")
478 .ok_or_else(|| {
479 SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
480 })?
481 .clone()
482 } else {
483 json
484 };
485 let schema: DorisSchema = serde_json::from_value(json_data)
486 .context("Can't get schema from json")
487 .map_err(SinkError::DorisStarrocksConnect)?;
488 Ok(schema)
489 }
490}
/// Table schema as returned by Doris' `_schema` HTTP API (fetched by `DorisSchemaClient`).
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisSchema {
    status: i32,
    // Doris key model; upsert sinks require "UNIQUE_KEYS".
    #[serde(rename = "keysType")]
    pub keys_type: String,
    // Column descriptions; used as the table's column list when validating the sink schema.
    pub properties: Vec<DorisField>,
}
/// One column entry of a `DorisSchema`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisField {
    pub name: String,
    // Doris type name; compared by substring (e.g. "DECIMAL", "DATETIME") elsewhere in this file.
    pub r#type: String,
    comment: String,
    // Deserialized as strings; `scale` is parsed to u8 for DECIMAL columns.
    pub precision: Option<String>,
    pub scale: Option<String>,
    aggregation_type: String,
}
507impl DorisField {
508 pub fn get_decimal_pre_scale(&self) -> Result<Option<u8>> {
509 if self.r#type.contains("DECIMAL") {
510 let scale = self
511 .scale
512 .as_ref()
513 .ok_or_else(|| {
514 SinkError::Doris(format!(
515 "In doris, the type of {} is DECIMAL, but `scale` is not found",
516 self.name
517 ))
518 })?
519 .parse::<u8>()
520 .map_err(|err| {
521 SinkError::Doris(format!(
522 "Unable to convert decimal's scale to u8. error: {:?}",
523 err.kind()
524 ))
525 })?;
526 Ok(Some(scale))
527 } else {
528 Ok(None)
529 }
530 }
531}
532
/// JSON response body of a Doris stream-load request, parsed by `DorisClient::finish`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DorisInsertResultResponse {
    #[serde(rename = "TxnId")]
    txn_id: i64,
    #[serde(rename = "Label")]
    label: String,
    // Compared against `DORIS_SUCCESS_STATUS` to decide whether the load succeeded.
    #[serde(rename = "Status")]
    status: String,
    #[serde(rename = "TwoPhaseCommit")]
    two_phase_commit: String,
    // Included in the returned error when the load fails.
    #[serde(rename = "Message")]
    message: String,
    #[serde(rename = "NumberTotalRows")]
    number_total_rows: i64,
    #[serde(rename = "NumberLoadedRows")]
    number_loaded_rows: i64,
    #[serde(rename = "NumberFilteredRows")]
    number_filtered_rows: i32,
    #[serde(rename = "NumberUnselectedRows")]
    number_unselected_rows: i32,
    #[serde(rename = "LoadBytes")]
    load_bytes: i64,
    #[serde(rename = "LoadTimeMs")]
    load_time_ms: i32,
    #[serde(rename = "BeginTxnTimeMs")]
    begin_txn_time_ms: i32,
    #[serde(rename = "StreamLoadPutTimeMs")]
    stream_load_put_time_ms: i32,
    #[serde(rename = "ReadDataTimeMs")]
    read_data_time_ms: i32,
    #[serde(rename = "WriteDataTimeMs")]
    write_data_time_ms: i32,
    #[serde(rename = "CommitAndPublishTimeMs")]
    commit_and_publish_time_ms: i32,
    // Optional link to error details; included in the returned error when the load fails.
    #[serde(rename = "ErrorURL")]
    err_url: Option<String>,
}
570
/// One in-flight stream-load request; rows written to it are separated by '\n'.
pub struct DorisClient {
    insert: InserterInner,
    // True until the first row is written; every later row is prefixed with '\n'.
    is_first_record: bool,
}
575impl DorisClient {
576 pub fn new(insert: InserterInner) -> Self {
577 Self {
578 insert,
579 is_first_record: true,
580 }
581 }
582
583 pub async fn write(&mut self, data: Bytes) -> Result<()> {
584 let mut data_build = BytesMut::new();
585 if self.is_first_record {
586 self.is_first_record = false;
587 } else {
588 data_build.put_slice("\n".as_bytes());
589 }
590 data_build.put_slice(&data);
591 self.insert.write(data_build.into()).await?;
592 Ok(())
593 }
594
595 pub async fn finish(self) -> Result<DorisInsertResultResponse> {
596 let raw = self.insert.finish().await?;
597 let res: DorisInsertResultResponse = serde_json::from_slice(&raw)
598 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
599
600 if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
601 return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
602 "Insert error: {:?}, error url: {:?}",
603 res.message,
604 res.err_url
605 )));
606 };
607 Ok(res)
608 }
609}