1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use base64::Engine;
20use base64::engine::general_purpose;
21use bytes::{BufMut, Bytes, BytesMut};
22use risingwave_common::array::{Op, StreamChunk};
23use risingwave_common::catalog::Schema;
24use risingwave_common::types::DataType;
25use serde::Deserialize;
26use serde_derive::Serialize;
27use serde_json::Value;
28use serde_with::serde_as;
29use thiserror_ext::AsReport;
30use with_options::WithOptions;
31
32use super::doris_starrocks_connector::{
33 DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, HeaderBuilder, InserterInner, InserterInnerBuilder,
34 POOL_IDLE_TIMEOUT,
35};
36use super::{
37 Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkWriterMetrics,
38};
39use crate::sink::encoder::{JsonEncoder, RowEncoder};
40use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
41use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
42
/// Connector name of the Doris sink; exposed through `Sink::SINK_NAME` of `DorisSink`.
pub const DORIS_SINK: &str = "doris";
44
45#[derive(Deserialize, Debug, Clone, WithOptions)]
46pub struct DorisCommon {
47 #[serde(rename = "doris.url")]
48 pub url: String,
49 #[serde(rename = "doris.user")]
50 pub user: String,
51 #[serde(rename = "doris.password")]
52 pub password: String,
53 #[serde(rename = "doris.database")]
54 pub database: String,
55 #[serde(rename = "doris.table")]
56 pub table: String,
57 #[serde(rename = "doris.partial_update")]
58 pub partial_update: Option<String>,
59}
60
61impl DorisCommon {
62 pub(crate) fn build_get_client(&self) -> DorisSchemaClient {
63 DorisSchemaClient::new(
64 self.url.clone(),
65 self.table.clone(),
66 self.database.clone(),
67 self.user.clone(),
68 self.password.clone(),
69 )
70 }
71}
72
73#[serde_as]
74#[derive(Clone, Debug, Deserialize, WithOptions)]
75pub struct DorisConfig {
76 #[serde(flatten)]
77 pub common: DorisCommon,
78
79 pub r#type: String, }
81impl DorisConfig {
82 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
83 let config =
84 serde_json::from_value::<DorisConfig>(serde_json::to_value(properties).unwrap())
85 .map_err(|e| SinkError::Config(anyhow!(e)))?;
86 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
87 return Err(SinkError::Config(anyhow!(
88 "`{}` must be {}, or {}",
89 SINK_TYPE_OPTION,
90 SINK_TYPE_APPEND_ONLY,
91 SINK_TYPE_UPSERT
92 )));
93 }
94 Ok(config)
95 }
96}
97
98#[derive(Debug)]
99pub struct DorisSink {
100 pub config: DorisConfig,
101 schema: Schema,
102 pk_indices: Vec<usize>,
103 is_append_only: bool,
104}
105
106impl DorisSink {
107 pub fn new(
108 config: DorisConfig,
109 schema: Schema,
110 pk_indices: Vec<usize>,
111 is_append_only: bool,
112 ) -> Result<Self> {
113 Ok(Self {
114 config,
115 schema,
116 pk_indices,
117 is_append_only,
118 })
119 }
120}
121
122impl DorisSink {
123 fn check_column_name_and_type(&self, doris_column_fields: Vec<DorisField>) -> Result<()> {
124 let doris_columns_desc: HashMap<String, String> = doris_column_fields
125 .iter()
126 .map(|s| (s.name.clone(), s.r#type.clone()))
127 .collect();
128
129 let rw_fields_name = self.schema.fields();
130 if rw_fields_name.len() > doris_columns_desc.len() {
131 return Err(SinkError::Doris(
132 "The columns of the sink must be equal to or a superset of the target table's columns.".to_owned(),
133 ));
134 }
135
136 for i in rw_fields_name {
137 let value = doris_columns_desc.get(&i.name).ok_or_else(|| {
138 SinkError::Doris(format!(
139 "Column name don't find in doris, risingwave is {:?} ",
140 i.name
141 ))
142 })?;
143 if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? {
144 return Err(SinkError::Doris(format!(
145 "Column type don't match, column name is {:?}. doris type is {:?} risingwave type is {:?} ",
146 i.name, value, i.data_type
147 )));
148 }
149 }
150 Ok(())
151 }
152
153 fn check_and_correct_column_type(
154 rw_data_type: &DataType,
155 doris_data_type: String,
156 ) -> Result<bool> {
157 match rw_data_type {
158 risingwave_common::types::DataType::Boolean => Ok(doris_data_type.contains("BOOLEAN")),
159 risingwave_common::types::DataType::Int16 => Ok(doris_data_type.contains("SMALLINT")),
160 risingwave_common::types::DataType::Int32 => Ok(doris_data_type.contains("INT")),
161 risingwave_common::types::DataType::Int64 => Ok(doris_data_type.contains("BIGINT")),
162 risingwave_common::types::DataType::Float32 => Ok(doris_data_type.contains("FLOAT")),
163 risingwave_common::types::DataType::Float64 => Ok(doris_data_type.contains("DOUBLE")),
164 risingwave_common::types::DataType::Decimal => Ok(doris_data_type.contains("DECIMAL")),
165 risingwave_common::types::DataType::Date => Ok(doris_data_type.contains("DATE")),
166 risingwave_common::types::DataType::Varchar => {
167 Ok(doris_data_type.contains("STRING") | doris_data_type.contains("VARCHAR"))
168 }
169 risingwave_common::types::DataType::Time => {
170 Err(SinkError::Doris("doris can not support Time".to_owned()))
171 }
172 risingwave_common::types::DataType::Timestamp => {
173 Ok(doris_data_type.contains("DATETIME"))
174 }
175 risingwave_common::types::DataType::Timestamptz => Err(SinkError::Doris(
176 "TIMESTAMP WITH TIMEZONE is not supported for Doris sink as Doris doesn't store time values with timezone information. Please convert to TIMESTAMP first.".to_owned(),
177 )),
178 risingwave_common::types::DataType::Interval => Err(SinkError::Doris(
179 "doris can not support Interval".to_owned(),
180 )),
181 risingwave_common::types::DataType::Struct(_) => Ok(doris_data_type.contains("STRUCT")),
182 risingwave_common::types::DataType::List(_) => Ok(doris_data_type.contains("ARRAY")),
183 risingwave_common::types::DataType::Bytea => {
184 Err(SinkError::Doris("doris can not support Bytea".to_owned()))
185 }
186 risingwave_common::types::DataType::Jsonb => Ok(doris_data_type.contains("JSON")),
187 risingwave_common::types::DataType::Serial => Ok(doris_data_type.contains("BIGINT")),
188 risingwave_common::types::DataType::Int256 => {
189 Err(SinkError::Doris("doris can not support Int256".to_owned()))
190 }
191 risingwave_common::types::DataType::Map(_) => {
192 Err(SinkError::Doris("doris can not support Map".to_owned()))
193 }
194 }
195 }
196}
197
198impl Sink for DorisSink {
199 type Coordinator = DummySinkCommitCoordinator;
200 type LogSinker = LogSinkerOf<DorisSinkWriter>;
201
202 const SINK_NAME: &'static str = DORIS_SINK;
203
204 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
205 Ok(DorisSinkWriter::new(
206 self.config.clone(),
207 self.schema.clone(),
208 self.pk_indices.clone(),
209 self.is_append_only,
210 )
211 .await?
212 .into_log_sinker(SinkWriterMetrics::new(&writer_param)))
213 }
214
215 async fn validate(&self) -> Result<()> {
216 if !self.is_append_only && self.pk_indices.is_empty() {
217 return Err(SinkError::Config(anyhow!(
218 "Primary key not defined for upsert doris sink (please define in `primary_key` field)"
219 )));
220 }
221 let client = self.config.common.build_get_client();
223 let doris_schema = client.get_schema_from_doris().await?;
224
225 if !self.is_append_only && doris_schema.keys_type.ne("UNIQUE_KEYS") {
226 return Err(SinkError::Config(anyhow!(
227 "If you want to use upsert, please set the keysType of doris to UNIQUE_KEYS"
228 )));
229 }
230 self.check_column_name_and_type(doris_schema.properties)?;
231 Ok(())
232 }
233}
234
235pub struct DorisSinkWriter {
236 pub config: DorisConfig,
237 #[expect(dead_code)]
238 schema: Schema,
239 #[expect(dead_code)]
240 pk_indices: Vec<usize>,
241 inserter_inner_builder: InserterInnerBuilder,
242 is_append_only: bool,
243 client: Option<DorisClient>,
244 row_encoder: JsonEncoder,
245}
246
247impl TryFrom<SinkParam> for DorisSink {
248 type Error = SinkError;
249
250 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
251 let schema = param.schema();
252 let config = DorisConfig::from_btreemap(param.properties)?;
253 DorisSink::new(
254 config,
255 schema,
256 param.downstream_pk,
257 param.sink_type.is_append_only(),
258 )
259 }
260}
261
262impl DorisSinkWriter {
263 pub async fn new(
264 config: DorisConfig,
265 schema: Schema,
266 pk_indices: Vec<usize>,
267 is_append_only: bool,
268 ) -> Result<Self> {
269 let mut decimal_map = HashMap::default();
270 let doris_schema = config
271 .common
272 .build_get_client()
273 .get_schema_from_doris()
274 .await?;
275 for s in &doris_schema.properties {
276 if let Some(v) = s.get_decimal_pre_scale()? {
277 decimal_map.insert(s.name.clone(), v);
278 }
279 }
280
281 let header_builder = HeaderBuilder::new()
282 .add_common_header()
283 .set_user_password(config.common.user.clone(), config.common.password.clone())
284 .add_json_format()
285 .set_partial_columns(config.common.partial_update.clone())
286 .add_read_json_by_line();
287 let header = if !is_append_only {
288 header_builder.add_hidden_column().build()
289 } else {
290 header_builder.build()
291 };
292
293 let doris_insert_builder = InserterInnerBuilder::new(
294 config.common.url.clone(),
295 config.common.database.clone(),
296 config.common.table.clone(),
297 header,
298 )?;
299 Ok(Self {
300 config,
301 schema: schema.clone(),
302 pk_indices,
303 inserter_inner_builder: doris_insert_builder,
304 is_append_only,
305 client: None,
306 row_encoder: JsonEncoder::new_with_doris(schema, None, decimal_map),
307 })
308 }
309
310 async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
311 for (op, row) in chunk.rows() {
312 if op != Op::Insert {
313 continue;
314 }
315 let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
316 self.client
317 .as_mut()
318 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
319 .write(row_json_string.into())
320 .await?;
321 }
322 Ok(())
323 }
324
325 async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
326 for (op, row) in chunk.rows() {
327 match op {
328 Op::Insert => {
329 let mut row_json_value = self.row_encoder.encode(row)?;
330 row_json_value
331 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
332 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
333 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
334 })?;
335 self.client
336 .as_mut()
337 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
338 .write(row_json_string.into())
339 .await?;
340 }
341 Op::Delete => {
342 let mut row_json_value = self.row_encoder.encode(row)?;
343 row_json_value
344 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("1".to_owned()));
345 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
346 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
347 })?;
348 self.client
349 .as_mut()
350 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
351 .write(row_json_string.into())
352 .await?;
353 }
354 Op::UpdateDelete => {}
355 Op::UpdateInsert => {
356 let mut row_json_value = self.row_encoder.encode(row)?;
357 row_json_value
358 .insert(DORIS_DELETE_SIGN.to_owned(), Value::String("0".to_owned()));
359 let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| {
360 SinkError::Doris(format!("Json derialize error: {}", e.as_report()))
361 })?;
362 self.client
363 .as_mut()
364 .ok_or_else(|| SinkError::Doris("Can't find doris sink insert".to_owned()))?
365 .write(row_json_string.into())
366 .await?;
367 }
368 }
369 }
370 Ok(())
371 }
372}
373
374#[async_trait]
375impl SinkWriter for DorisSinkWriter {
376 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
377 if self.client.is_none() {
378 self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
379 }
380 if self.is_append_only {
381 self.append_only(chunk).await
382 } else {
383 self.upsert(chunk).await
384 }
385 }
386
387 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
388 Ok(())
389 }
390
391 async fn abort(&mut self) -> Result<()> {
392 Ok(())
393 }
394
395 async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
396 if self.client.is_some() {
397 let client = self
398 .client
399 .take()
400 .ok_or_else(|| SinkError::Doris("Can't find doris inserter".to_owned()))?;
401 client.finish().await?;
402 }
403 Ok(())
404 }
405}
406
/// Client that fetches a table's schema from the Doris `_schema` HTTP endpoint.
pub struct DorisSchemaClient {
    /// Base URL of the Doris HTTP endpoint.
    url: String,
    /// Name of the table whose schema is requested.
    table: String,
    /// Database that contains the table.
    db: String,
    /// User name for HTTP basic authentication.
    user: String,
    /// Password for HTTP basic authentication.
    password: String,
}
414impl DorisSchemaClient {
415 pub fn new(url: String, table: String, db: String, user: String, password: String) -> Self {
416 Self {
417 url,
418 table,
419 db,
420 user,
421 password,
422 }
423 }
424
425 pub async fn get_schema_from_doris(&self) -> Result<DorisSchema> {
426 let uri = format!("{}/api/{}/{}/_schema", self.url, self.db, self.table);
427
428 let client = reqwest::Client::builder()
429 .pool_idle_timeout(POOL_IDLE_TIMEOUT)
430 .build()
431 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
432
433 let response = client
434 .get(uri)
435 .header(
436 "Authorization",
437 format!(
438 "Basic {}",
439 general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password))
440 ),
441 )
442 .send()
443 .await
444 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
445
446 let json: Value = response
447 .json()
448 .await
449 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
450 let json_data = if json.get("code").is_some() && json.get("msg").is_some() {
451 json.get("data")
452 .ok_or_else(|| {
453 SinkError::DorisStarrocksConnect(anyhow::anyhow!("Can't find data"))
454 })?
455 .clone()
456 } else {
457 json
458 };
459 let schema: DorisSchema = serde_json::from_value(json_data)
460 .context("Can't get schema from json")
461 .map_err(SinkError::DorisStarrocksConnect)?;
462 Ok(schema)
463 }
464}
465#[derive(Debug, Serialize, Deserialize)]
466pub struct DorisSchema {
467 status: i32,
468 #[serde(rename = "keysType")]
469 pub keys_type: String,
470 pub properties: Vec<DorisField>,
471}
472#[derive(Debug, Serialize, Deserialize)]
473pub struct DorisField {
474 pub name: String,
475 pub r#type: String,
476 comment: String,
477 pub precision: Option<String>,
478 pub scale: Option<String>,
479 aggregation_type: String,
480}
481impl DorisField {
482 pub fn get_decimal_pre_scale(&self) -> Result<Option<u8>> {
483 if self.r#type.contains("DECIMAL") {
484 let scale = self
485 .scale
486 .as_ref()
487 .ok_or_else(|| {
488 SinkError::Doris(format!(
489 "In doris, the type of {} is DECIMAL, but `scale` is not found",
490 self.name
491 ))
492 })?
493 .parse::<u8>()
494 .map_err(|err| {
495 SinkError::Doris(format!(
496 "Unable to convert decimal's scale to u8. error: {:?}",
497 err.kind()
498 ))
499 })?;
500 Ok(Some(scale))
501 } else {
502 Ok(None)
503 }
504 }
505}
506
507#[derive(Debug, Serialize, Deserialize)]
508pub struct DorisInsertResultResponse {
509 #[serde(rename = "TxnId")]
510 txn_id: i64,
511 #[serde(rename = "Label")]
512 label: String,
513 #[serde(rename = "Status")]
514 status: String,
515 #[serde(rename = "TwoPhaseCommit")]
516 two_phase_commit: String,
517 #[serde(rename = "Message")]
518 message: String,
519 #[serde(rename = "NumberTotalRows")]
520 number_total_rows: i64,
521 #[serde(rename = "NumberLoadedRows")]
522 number_loaded_rows: i64,
523 #[serde(rename = "NumberFilteredRows")]
524 number_filtered_rows: i32,
525 #[serde(rename = "NumberUnselectedRows")]
526 number_unselected_rows: i32,
527 #[serde(rename = "LoadBytes")]
528 load_bytes: i64,
529 #[serde(rename = "LoadTimeMs")]
530 load_time_ms: i32,
531 #[serde(rename = "BeginTxnTimeMs")]
532 begin_txn_time_ms: i32,
533 #[serde(rename = "StreamLoadPutTimeMs")]
534 stream_load_put_time_ms: i32,
535 #[serde(rename = "ReadDataTimeMs")]
536 read_data_time_ms: i32,
537 #[serde(rename = "WriteDataTimeMs")]
538 write_data_time_ms: i32,
539 #[serde(rename = "CommitAndPublishTimeMs")]
540 commit_and_publish_time_ms: i32,
541 #[serde(rename = "ErrorURL")]
542 err_url: Option<String>,
543}
544
545pub struct DorisClient {
546 insert: InserterInner,
547 is_first_record: bool,
548}
549impl DorisClient {
550 pub fn new(insert: InserterInner) -> Self {
551 Self {
552 insert,
553 is_first_record: true,
554 }
555 }
556
557 pub async fn write(&mut self, data: Bytes) -> Result<()> {
558 let mut data_build = BytesMut::new();
559 if self.is_first_record {
560 self.is_first_record = false;
561 } else {
562 data_build.put_slice("\n".as_bytes());
563 }
564 data_build.put_slice(&data);
565 self.insert.write(data_build.into()).await?;
566 Ok(())
567 }
568
569 pub async fn finish(self) -> Result<DorisInsertResultResponse> {
570 let raw = self.insert.finish().await?;
571 let res: DorisInsertResultResponse = serde_json::from_slice(&raw)
572 .map_err(|err| SinkError::DorisStarrocksConnect(err.into()))?;
573
574 if !DORIS_SUCCESS_STATUS.contains(&res.status.as_str()) {
575 return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!(
576 "Insert error: {:?}, error url: {:?}",
577 res.message,
578 res.err_url
579 )));
580 };
581 Ok(res)
582 }
583}