use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::mem;

use anyhow::anyhow;
use futures::stream::select;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::{SinkId, SinkType};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
    LogWriter, LogWriterExt, LogWriterMetrics,
};
use risingwave_connector::sink::{
    GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
    SinkWriterParam,
};
use risingwave_pb::stream_plan::stream_node::StreamKind;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use crate::common::change_buffer::{OutputKind, output_kind};
use crate::common::compact_chunk::{
    InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
};
use crate::executor::prelude::*;

pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    rate_limit: Option<u32>,
}

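/// Rewrites a chunk to be append-only: keeps `Insert` rows, hides `Delete` and
/// `UpdateDelete` rows, and downgrades `UpdateInsert` to `Insert`. For example,
/// the pair `U- (1, 2)` / `U+ (1, 3)` becomes a single `+ (1, 3)`.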
fn force_append_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Insert => {}
            Op::Delete | Op::UpdateDelete => r.set_vis(false),
            Op::UpdateInsert => r.set_op(Op::Insert),
        }
    }
    c.into()
}

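/// Mirror of [`force_append_only`]: keeps `Delete` rows, hides `Insert` and
/// `UpdateInsert` rows, and downgrades `UpdateDelete` to `Delete`.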
fn force_delete_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Delete => {}
            Op::Insert | Op::UpdateInsert => r.set_vis(false),
            Op::UpdateDelete => r.set_op(Op::Delete),
        }
    }
    c.into()
}

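/// Extra handling required by non-append-only (upsert/retract) sinks, derived
/// from the relationship between the stream key and the downstream primary key
/// plus user configuration.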
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// Whether a downstream PK is specified and the stream key is a subset of
    /// it, so that per-key compaction on the downstream PK is safe.
    pk_specified_and_matched: bool,
    /// Whether the user forces buffering compaction via the
    /// `SINK_USER_FORCE_COMPACTION` property.
    force_compaction: bool,
}

impl NonAppendOnlyBehavior {
    /// When the downstream PK is specified and covers the stream key, chunks
    /// can be compacted by the downstream PK on the fly in the log reader,
    /// unless the user explicitly forces buffering compaction.
    fn should_compact_in_log_reader(self) -> bool {
        self.pk_specified_and_matched && !self.force_compaction
    }

    /// When the stream key is not covered by the downstream PK, different
    /// stream keys may map to the same downstream key, so records must be
    /// buffered per barrier and reordered (deletes before inserts) to avoid
    /// emitting conflicting changes on the same downstream key. Forced
    /// compaction takes the same buffering path.
    fn should_reorder_records(self) -> bool {
        !self.pk_specified_and_matched || self.force_compaction
    }
}

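/// Maps a sink type to the [`OutputKind`] used during compaction: upsert sinks
/// keep only the final image of each key, while retract sinks keep the full
/// retraction history.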
fn compact_output_kind(sink_type: SinkType) -> OutputKind {
    match sink_type {
        SinkType::Upsert => output_kind::UPSERT,
        SinkType::Retract => output_kind::RETRACT,
        // Append-only variants only reach compaction in the force-append-only
        // path, which uses retract semantics before deletes are stripped.
        SinkType::AppendOnly | SinkType::ForceAppendOnly => output_kind::RETRACT,
    }
}

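/// Expands `$body` once per possible [`OutputKind`], binding `$KIND` to a
/// `const` so the compaction path is monomorphized for the kind selected at
/// runtime by [`compact_output_kind`].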
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                const $KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const $KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}

impl<F: LogStoreFactory> SinkExecutor<F> {
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_idx) = sink_writer_param.extra_partition_col_idx {
            // The extra partition column is not part of the sink input columns.
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_type = info.schema.data_types();
                data_type.remove(col_idx);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
            let stream_key = &info.stream_key;
            let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
                .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
            let force_compaction = sink_param
                .properties
                .get(SINK_USER_FORCE_COMPACTION)
                .map(|v| v.eq_ignore_ascii_case("true"))
                .unwrap_or(false);
            Some(NonAppendOnlyBehavior {
                pk_specified_and_matched,
                force_compaction,
            })
        } else {
            None
        };

        tracing::info!(
            sink_id = %sink_param.sink_id,
            actor_id = %actor_context.id,
            ?non_append_only_behavior,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            non_append_only_behavior,
            rate_limit,
        })
    }

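    /// Builds the executor's output stream: instruments the input, preprocesses
    /// chunks via [`Self::process_msg`], then either forwards messages directly
    /// (sink-into-table) or runs the log-writing and log-consuming streams
    /// side by side.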
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.stream_key.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        // Upsert inputs may carry inconsistencies (e.g. an `Insert` on an
        // existing key), so tolerate them during compaction; for other inputs
        // treat them as bugs.
        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            self.non_append_only_behavior,
            metrics.sink_chunk_buffer_size,
            self.sink.is_blackhole(), // skip_compact
        );

        if self.sink.is_sink_into_table() {
            // Sink into table does not need a log store; forward messages directly.
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Initialize the rate limit with the configured value.
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.non_append_only_behavior,
                            input_compact_ib,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never);
                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }

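    /// Writes the processed stream into the log store while re-yielding every
    /// message downstream. Barriers flush the current epoch and carry
    /// pause/resume, rate-limit and connector-props mutations; vnode-bitmap
    /// updates may trigger a sink rebuild on the consuming side.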
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // Propagate the first barrier before initializing the log writer.
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let add_columns = barrier.as_sink_add_columns(sink_id);
                    if let Some(add_columns) = &add_columns {
                        info!(?add_columns, %sink_id, "sink receive add columns");
                    }
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                                add_columns,
                            },
                        )
                        .await?;

                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
                                    tracing::info!(
                                        rate_limit = new_rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                if let Some(map) = config.get(&sink_id.as_raw_id())
                                    && let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                {
                                    error!(
                                        error = %e.as_report(),
                                        "fail to send sink alter props"
                                    );
                                    return Err(StreamExecutorError::from(e.to_report_string()));
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }

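    /// Preprocesses input chunks before they reach the log store:
    ///
    /// - If records must be reordered (see
    ///   [`NonAppendOnlyBehavior::should_reorder_records`]), chunks are buffered
    ///   until a barrier, compacted by the stream key, split into deletes
    ///   followed by inserts, and, if a downstream PK exists, re-compacted by it.
    /// - Otherwise, chunks are compacted by the downstream PK (or forwarded
    ///   as-is) on the fly.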
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: StreamKey,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        input_compact_ib: InconsistencyBehavior,
        downstream_pk: Option<Vec<usize>>,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        skip_compact: bool,
    ) {
        if let Some(b) = non_append_only_behavior
            && b.should_reorder_records()
        {
            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);

            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);
                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();

                        // Emit all deletes before inserts, so that deleting and
                        // re-inserting the same downstream key within one epoch
                        // does not drop the new row.
                        let mut delete_chunks = vec![];
                        let mut insert_chunks = vec![];

                        for c in dispatch_output_kind!(sink_type, KIND, {
                            StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
                        }) {
                            let chunk = force_delete_only(c.clone());
                            if chunk.cardinality() > 0 {
                                delete_chunks.push(chunk);
                            }
                            let chunk = force_append_only(c);
                            if chunk.cardinality() > 0 {
                                insert_chunks.push(chunk);
                            }
                        }
                        let chunks = delete_chunks
                            .into_iter()
                            .chain(insert_chunks.into_iter())
                            .collect();

                        if let Some(downstream_pk) = &downstream_pk {
                            // Compact again by the downstream PK so each key
                            // changes at most once per barrier. Inconsistencies
                            // are only warned here since the chunks were
                            // reordered above.
                            let chunks = dispatch_output_kind!(sink_type, KIND, {
                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
                                    .into_compacted_chunks_reconstructed::<KIND>(
                                        chunk_size,
                                        input_data_types.clone(),
                                        InconsistencyBehavior::Warn,
                                    )
                            });
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        if sink_type != SinkType::AppendOnly
                            && let Some(downstream_pk) = &downstream_pk
                        {
                            if skip_compact {
                                // Blackhole sinks skip compaction; this is only
                                // valid when the stream key equals the
                                // downstream PK.
                                assert_eq!(&stream_key, downstream_pk);
                            } else {
                                chunk = dispatch_output_kind!(sink_type, KIND, {
                                    compact_chunk_inline::<KIND>(
                                        chunk,
                                        downstream_pk,
                                        input_compact_ib,
                                    )
                                });
                            }
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force append-only: strip deletes and rewrite
                                // `UpdateInsert` to `Insert` before emitting.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert | SinkType::Retract => yield Message::Chunk(chunk),
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        input_compact_ib: InconsistencyBehavior,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                let chunk = if let Some(b) = non_append_only_behavior
                    && b.should_compact_in_log_reader()
                {
                    // Safe to unwrap: compacting in the log reader requires a
                    // specified downstream PK.
                    let downstream_pk = downstream_pk.as_ref().unwrap();
                    dispatch_output_kind!(sink_param.sink_type, KIND, {
                        compact_chunk_inline::<KIND>(chunk, downstream_pk, input_compact_ib)
                    })
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    // Hide internal columns before delivering to the sink.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            let future = async {
                loop {
                    // `consume_log_and_sink` is expected to run indefinitely;
                    // reaching here means the sinker failed with an error.
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                error!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = %sink_param.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties.extend(config.into_iter());
                                        sink = TryFrom::try_from(sink_param.clone())
                                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = %sink_param.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config"
                                        );
                                        Err(anyhow!("fail to rewind log after alter table").into())
                                    }
                                }
                            } else {
                                sink_param.properties.extend(config.into_iter());
                                sink = TryFrom::try_from(sink_param.clone())
                                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                                Err(anyhow!(
                                    "This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG is fully applied."
                                ).into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
}

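/// Requests sent from the log-writing side to the log-consuming side.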
enum RebuildSinkMessage {
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    UpdateConfig(HashMap<String, String>),
}

impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // Two visible columns plus one hidden column.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                U- 3 2 1
                U+ 3 4 1
                + 5 6 7",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                - 5 6 7",
            ))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // The remaining delete-only chunk is emptied by force-append-only.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }

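    /// Exercises the buffering/reordering path: the stream key `[0, 1]` is not
    /// covered by the downstream PK `[0]`, so changes are compacted per barrier
    /// before being emitted.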
    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0]),
            sink_type,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        let expected = match sink_type {
            SinkType::Retract => StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            ),
            SinkType::Upsert => StreamChunk::from_pretty(
                " I I I
                + 1 1 40",
            ),
            _ => unreachable!(),
        };
        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);

        // Barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }

    #[tokio::test]
    async fn test_force_compaction() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "force_compaction".into() => "true".into()
        };

        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20
                + 1 4 10",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 4 30",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0, 1]),
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 3 30
                + 1 1 40",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();
    }
}