1use std::assert_matches::assert_matches;
16use std::collections::HashMap;
17use std::mem;
18
19use anyhow::anyhow;
20use futures::stream::select;
21use futures::{FutureExt, TryFutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::array::stream_chunk::StreamChunkMut;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnCatalog, Field};
27use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_common_estimate_size::collections::EstimatedVec;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_connector::dispatch_sink;
32use risingwave_connector::sink::catalog::{SinkId, SinkType};
33use risingwave_connector::sink::log_store::{
34 FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
35 LogWriter, LogWriterExt, LogWriterMetrics,
36};
37use risingwave_connector::sink::{
38 GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
39 SinkWriterParam,
40};
41use risingwave_pb::id::FragmentId;
42use risingwave_pb::stream_plan::stream_node::StreamKind;
43use thiserror_ext::AsReport;
44use tokio::select;
45use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
46use tokio::sync::oneshot;
47
48use crate::common::change_buffer::{OutputKind, output_kind};
49use crate::common::compact_chunk::{
50 InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
51};
52use crate::executor::prelude::*;
/// Stream executor that forwards its input downstream while delivering it to a sink.
///
/// Unless the sink is a sink-into-table, the (normalized) input is first written into a log
/// store built by `log_store_factory`, and a concurrent consumer reads the log store and writes
/// into `sink`.
pub struct SinkExecutor<F: LogStoreFactory> {
    /// Actor context; provides the actor id, fragment id and streaming metrics.
    actor_context: ActorContextRef,
    /// Executor info; its schema is checked against `input_columns` and its stream key is used
    /// for compaction.
    info: ExecutorInfo,
    /// Upstream executor.
    input: Executor,
    /// The sink that finally receives the data.
    sink: SinkImpl,
    /// Catalog of the input columns; hidden columns are projected out before reaching the sink.
    input_columns: Vec<ColumnCatalog>,
    /// Sink parameters (id, name, type, properties, downstream pk, ...).
    sink_param: SinkParam,
    /// Builds the log reader / log writer pair used between the input and the sink.
    log_store_factory: F,
    /// Parameters used to create log sinkers; cloned for every (re)created sinker.
    sink_writer_param: SinkWriterParam,
    /// Maximum chunk size used when chunks are rebuilt after compaction.
    chunk_size: usize,
    /// Data types of the input, used to build output chunks.
    input_data_types: Vec<DataType>,
    /// How a non-append-only sink treats its input; `None` for append-only sinks.
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    /// Initial rate limit handed to the log reader.
    rate_limit: Option<u32>,
}
67
68fn force_append_only(c: StreamChunk) -> StreamChunk {
70 let mut c: StreamChunkMut = c.into();
71 for (_, mut r) in c.to_rows_mut() {
72 match r.op() {
73 Op::Insert => {}
74 Op::Delete | Op::UpdateDelete => r.set_vis(false),
75 Op::UpdateInsert => r.set_op(Op::Insert),
76 }
77 }
78 c.into()
79}
80
81fn force_delete_only(c: StreamChunk) -> StreamChunk {
83 let mut c: StreamChunkMut = c.into();
84 for (_, mut r) in c.to_rows_mut() {
85 match r.op() {
86 Op::Delete => {}
87 Op::Insert | Op::UpdateInsert => r.set_vis(false),
88 Op::UpdateDelete => r.set_op(Op::Delete),
89 }
90 }
91 c.into()
92}
93
/// Decides how a non-append-only sink processes its input.
///
/// Exactly one of two strategies applies: either chunks are compacted by the downstream pk
/// while being read from the log store, or the records of each epoch are buffered and
/// reordered before being written to the log store.
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// A downstream pk is declared and contains every column of the stream key.
    pk_specified_and_matched: bool,
    /// The user asked for compaction through the sink properties.
    force_compaction: bool,
}

impl NonAppendOnlyBehavior {
    /// Returns `true` when the pk matches and compaction is not forced, i.e. it is enough to
    /// compact each chunk by the downstream pk on the log reader side.
    fn should_compact_in_log_reader(self) -> bool {
        matches!(
            (self.pk_specified_and_matched, self.force_compaction),
            (true, false)
        )
    }

    /// Returns `true` when records must be buffered per epoch and reordered before being
    /// written; this is always the negation of [`Self::should_compact_in_log_reader`].
    fn should_reorder_records(self) -> bool {
        !self.should_compact_in_log_reader()
    }
}
204
205fn compact_output_kind(sink_type: SinkType) -> OutputKind {
207 match sink_type {
208 SinkType::Upsert => output_kind::UPSERT,
209 SinkType::Retract => output_kind::RETRACT,
210 SinkType::AppendOnly => output_kind::RETRACT,
212 }
213}
214
/// Evaluates `$body` with a `const $KIND: OutputKind` bound to the compaction output kind of
/// `$sink_type` (as returned by `compact_output_kind`).
///
/// This turns the runtime sink type into a compile-time constant, so `$body` can pass it as a
/// const generic argument (e.g. `compact_chunk_inline::<KIND>(..)`). `$body` must be a single
/// token tree (typically a `{ ... }` block) and is expanded once per output kind.
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        // `$body` is usually a braced block spliced in as an arm's tail, which can trip the
        // `unused_braces` lint.
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                // Bind the caller-chosen identifier rather than a hard-coded `KIND`.
                const $KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const $KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}
231
232impl<F: LogStoreFactory> SinkExecutor<F> {
233 #[allow(clippy::too_many_arguments)]
234 #[expect(clippy::unused_async)]
235 pub async fn new(
236 actor_context: ActorContextRef,
237 info: ExecutorInfo,
238 input: Executor,
239 sink_writer_param: SinkWriterParam,
240 sink: SinkImpl,
241 sink_param: SinkParam,
242 columns: Vec<ColumnCatalog>,
243 log_store_factory: F,
244 chunk_size: usize,
245 input_data_types: Vec<DataType>,
246 rate_limit: Option<u32>,
247 ) -> StreamExecutorResult<Self> {
248 let sink_input_schema: Schema = columns
249 .iter()
250 .map(|column| Field::from(&column.column_desc))
251 .collect();
252
253 if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
254 assert_eq!(sink_input_schema.data_types(), {
256 let mut data_type = info.schema.data_types();
257 data_type.remove(col_dix);
258 data_type
259 });
260 } else {
261 assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
262 }
263
264 let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
265 let stream_key = &info.stream_key;
266 let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
267 .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
268 let force_compaction = sink_param
269 .properties
270 .get(SINK_USER_FORCE_COMPACTION)
271 .map(|v| v.eq_ignore_ascii_case("true"))
272 .unwrap_or(false);
273 Some(NonAppendOnlyBehavior {
274 pk_specified_and_matched,
275 force_compaction,
276 })
277 } else {
278 None
279 };
280
281 tracing::info!(
282 sink_id = %sink_param.sink_id,
283 actor_id = %actor_context.id,
284 ?non_append_only_behavior,
285 "Sink executor info"
286 );
287
288 Ok(Self {
289 actor_context,
290 info,
291 input,
292 sink,
293 input_columns: columns,
294 sink_param,
295 log_store_factory,
296 sink_writer_param,
297 chunk_size,
298 input_data_types,
299 non_append_only_behavior,
300 rate_limit,
301 })
302 }
303
304 fn execute_inner(self) -> BoxedMessageStream {
305 let sink_id = self.sink_param.sink_id;
306 let actor_id = self.actor_context.id;
307 let fragment_id = self.actor_context.fragment_id;
308
309 let stream_key = self.info.stream_key.clone();
310 let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
311 sink_id,
312 actor_id,
313 fragment_id,
314 );
315
316 let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
319 InconsistencyBehavior::Tolerate
320 } else {
321 InconsistencyBehavior::Panic
322 };
323
324 let input = self.input.execute();
325
326 let input = input.inspect_ok(move |msg| {
327 if let Message::Chunk(c) = msg {
328 metrics.sink_input_row_count.inc_by(c.capacity() as u64);
329 metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
330 }
331 });
332
333 let processed_input = Self::process_msg(
334 input,
335 self.sink_param.sink_type,
336 self.sink_param.ignore_delete,
337 stream_key,
338 self.chunk_size,
339 self.input_data_types,
340 input_compact_ib,
341 self.sink_param.downstream_pk.clone(),
342 self.non_append_only_behavior,
343 metrics.sink_chunk_buffer_size,
344 self.sink.is_blackhole(), );
346
347 if self.sink.is_sink_into_table() {
348 processed_input.boxed()
350 } else {
351 let labels = [
352 &actor_id.to_string(),
353 &sink_id.to_string(),
354 self.sink_param.sink_name.as_str(),
355 ];
356 let log_store_first_write_epoch = GLOBAL_SINK_METRICS
357 .log_store_first_write_epoch
358 .with_guarded_label_values(&labels);
359 let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
360 .log_store_latest_write_epoch
361 .with_guarded_label_values(&labels);
362 let log_store_write_rows = GLOBAL_SINK_METRICS
363 .log_store_write_rows
364 .with_guarded_label_values(&labels);
365 let log_writer_metrics = LogWriterMetrics {
366 log_store_first_write_epoch,
367 log_store_latest_write_epoch,
368 log_store_write_rows,
369 };
370
371 let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
372 rate_limit_tx.send(self.rate_limit.into()).unwrap();
374
375 let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();
376
377 self.log_store_factory
378 .build()
379 .map(move |(log_reader, log_writer)| {
380 let write_log_stream = Self::execute_write_log(
381 processed_input,
382 log_writer.monitored(log_writer_metrics),
383 actor_id,
384 fragment_id,
385 sink_id,
386 rate_limit_tx,
387 rebuild_sink_tx,
388 );
389
390 let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
391 let consume_log_stream = Self::execute_consume_log(
392 *sink,
393 log_reader,
394 self.input_columns,
395 self.sink_param,
396 self.sink_writer_param,
397 self.non_append_only_behavior,
398 self.actor_context,
399 rate_limit_rx,
400 rebuild_sink_rx,
401 )
402 .instrument_await(
403 await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
404 )
405 .map_ok(|never| never); consume_log_stream.boxed()
408 });
409 select(consume_log_stream_future.into_stream(), write_log_stream)
410 })
411 .into_stream()
412 .flatten()
413 .boxed()
414 }
415 }
416
417 #[try_stream(ok = Message, error = StreamExecutorError)]
418 async fn execute_write_log<W: LogWriter>(
419 input: impl MessageStream,
420 mut log_writer: W,
421 actor_id: ActorId,
422 fragment_id: FragmentId,
423 sink_id: SinkId,
424 rate_limit_tx: UnboundedSender<RateLimit>,
425 rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
426 ) {
427 pin_mut!(input);
428 let barrier = expect_first_barrier(&mut input).await?;
429 let epoch_pair = barrier.epoch;
430 let is_pause_on_startup = barrier.is_pause_on_startup();
431 yield Message::Barrier(barrier);
433
434 log_writer.init(epoch_pair, is_pause_on_startup).await?;
435
436 let mut is_paused = false;
437
438 #[for_await]
439 for msg in input {
440 match msg? {
441 Message::Watermark(w) => yield Message::Watermark(w),
442 Message::Chunk(chunk) => {
443 assert!(
444 !is_paused,
445 "Actor {actor_id} should not receive any data after pause"
446 );
447 log_writer.write_chunk(chunk.clone()).await?;
448 yield Message::Chunk(chunk);
449 }
450 Message::Barrier(barrier) => {
451 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
452 let schema_change = barrier.as_sink_schema_change(sink_id);
453 if let Some(schema_change) = &schema_change {
454 info!(?schema_change, %sink_id, "sink receive schema change");
455 }
456 let post_flush = log_writer
457 .flush_current_epoch(
458 barrier.epoch.curr,
459 FlushCurrentEpochOptions {
460 is_checkpoint: barrier.kind.is_checkpoint(),
461 new_vnode_bitmap: update_vnode_bitmap.clone(),
462 is_stop: barrier.is_stop(actor_id),
463 schema_change,
464 },
465 )
466 .await?;
467
468 let mutation = barrier.mutation.clone();
469 yield Message::Barrier(barrier);
470 if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
471 && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
472 {
473 let (tx, rx) = oneshot::channel();
474 rebuild_sink_tx
475 .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
476 .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
477 rx.await
478 .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
479 }
480 post_flush.post_yield_barrier().await?;
481
482 if let Some(mutation) = mutation.as_deref() {
483 match mutation {
484 Mutation::Pause => {
485 log_writer.pause()?;
486 is_paused = true;
487 }
488 Mutation::Resume => {
489 log_writer.resume()?;
490 is_paused = false;
491 }
492 Mutation::Throttle(fragment_to_apply) => {
493 if let Some(new_rate_limit) = fragment_to_apply.get(&fragment_id) {
494 tracing::info!(
495 rate_limit = new_rate_limit,
496 "received sink rate limit on actor {actor_id}"
497 );
498 if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
499 error!(
500 error = %e.as_report(),
501 "fail to send sink ate limit update"
502 );
503 return Err(StreamExecutorError::from(
504 e.to_report_string(),
505 ));
506 }
507 }
508 }
509 Mutation::ConnectorPropsChange(config) => {
510 if let Some(map) = config.get(&sink_id.as_raw_id())
511 && let Err(e) = rebuild_sink_tx
512 .send(RebuildSinkMessage::UpdateConfig(map.clone()))
513 {
514 error!(
515 error = %e.as_report(),
516 "fail to send sink alter props"
517 );
518 return Err(StreamExecutorError::from(e.to_report_string()));
519 }
520 }
521 _ => (),
522 }
523 }
524 }
525 }
526 }
527 }
528
529 #[allow(clippy::too_many_arguments)]
530 #[try_stream(ok = Message, error = StreamExecutorError)]
531 async fn process_msg(
532 input: impl MessageStream,
533 sink_type: SinkType,
534 ignore_delete: bool,
535 stream_key: StreamKey,
536 chunk_size: usize,
537 input_data_types: Vec<DataType>,
538 input_compact_ib: InconsistencyBehavior,
539 downstream_pk: Option<Vec<usize>>,
540 non_append_only_behavior: Option<NonAppendOnlyBehavior>,
541 sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
542 skip_compact: bool,
543 ) {
544 if let Some(b) = non_append_only_behavior
546 && b.should_reorder_records()
547 {
548 assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);
549
550 let mut chunk_buffer = EstimatedVec::new();
551 let mut watermark: Option<super::Watermark> = None;
552 #[for_await]
553 for msg in input {
554 match msg? {
555 Message::Watermark(w) => watermark = Some(w),
556 Message::Chunk(c) => {
557 chunk_buffer.push(c);
558 sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
559 }
560 Message::Barrier(barrier) => {
561 let chunks = mem::take(&mut chunk_buffer).into_inner();
562
563 let mut delete_chunks = vec![];
566 let mut insert_chunks = vec![];
567
568 for c in dispatch_output_kind!(sink_type, KIND, {
569 StreamChunkCompactor::new(stream_key.clone(), chunks)
570 .into_compacted_chunks_inline::<KIND>(input_compact_ib)
571 }) {
572 let chunk = force_delete_only(c.clone());
573 if chunk.cardinality() > 0 {
574 delete_chunks.push(chunk);
575 }
576 let chunk = force_append_only(c);
577 if chunk.cardinality() > 0 {
578 insert_chunks.push(chunk);
579 }
580 }
581 let chunks = delete_chunks
582 .into_iter()
583 .chain(insert_chunks.into_iter())
584 .collect();
585
586 if let Some(downstream_pk) = &downstream_pk {
591 let chunks = dispatch_output_kind!(sink_type, KIND, {
592 StreamChunkCompactor::new(downstream_pk.clone(), chunks)
593 .into_compacted_chunks_reconstructed::<KIND>(
594 chunk_size,
595 input_data_types.clone(),
596 InconsistencyBehavior::Warn,
599 )
600 });
601 for c in chunks {
602 yield Message::Chunk(c);
603 }
604 } else {
605 let mut chunk_builder =
606 StreamChunkBuilder::new(chunk_size, input_data_types.clone());
607 for chunk in chunks {
608 for (op, row) in chunk.rows() {
609 if let Some(c) = chunk_builder.append_row(op, row) {
610 yield Message::Chunk(c);
611 }
612 }
613 }
614
615 if let Some(c) = chunk_builder.take() {
616 yield Message::Chunk(c);
617 }
618 };
619
620 if let Some(w) = mem::take(&mut watermark) {
622 yield Message::Watermark(w)
623 }
624 yield Message::Barrier(barrier);
625 }
626 }
627 }
628 } else {
629 #[for_await]
632 for msg in input {
633 match msg? {
634 Message::Watermark(w) => yield Message::Watermark(w),
635 Message::Chunk(mut chunk) => {
636 if !sink_type.is_append_only()
640 && let Some(downstream_pk) = &downstream_pk
641 {
642 if skip_compact {
643 assert_eq!(&stream_key, downstream_pk);
646 } else {
647 chunk = dispatch_output_kind!(sink_type, KIND, {
648 compact_chunk_inline::<KIND>(
649 chunk,
650 downstream_pk,
651 InconsistencyBehavior::Warn,
654 )
655 });
656 }
657 }
658 if ignore_delete {
659 chunk = force_append_only(chunk);
663 }
664 yield Message::Chunk(chunk);
665 }
666 Message::Barrier(barrier) => {
667 yield Message::Barrier(barrier);
668 }
669 }
670 }
671 }
672 }
673
674 #[expect(clippy::too_many_arguments)]
675 async fn execute_consume_log<S: Sink, R: LogReader>(
676 mut sink: S,
677 log_reader: R,
678 columns: Vec<ColumnCatalog>,
679 mut sink_param: SinkParam,
680 mut sink_writer_param: SinkWriterParam,
681 non_append_only_behavior: Option<NonAppendOnlyBehavior>,
682 actor_context: ActorContextRef,
683 rate_limit_rx: UnboundedReceiver<RateLimit>,
684 mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
685 ) -> StreamExecutorResult<!> {
686 let visible_columns = columns
687 .iter()
688 .enumerate()
689 .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
690 .collect_vec();
691
692 let labels = [
693 &actor_context.id.to_string(),
694 sink_writer_param.connector.as_str(),
695 &sink_writer_param.sink_id.to_string(),
696 sink_writer_param.sink_name.as_str(),
697 ];
698 let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
699 .log_store_reader_wait_new_future_duration_ns
700 .with_guarded_label_values(&labels);
701 let log_store_read_rows = GLOBAL_SINK_METRICS
702 .log_store_read_rows
703 .with_guarded_label_values(&labels);
704 let log_store_read_bytes = GLOBAL_SINK_METRICS
705 .log_store_read_bytes
706 .with_guarded_label_values(&labels);
707 let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
708 .log_store_latest_read_epoch
709 .with_guarded_label_values(&labels);
710 let metrics = LogReaderMetrics {
711 log_store_latest_read_epoch,
712 log_store_read_rows,
713 log_store_read_bytes,
714 log_store_reader_wait_new_future_duration_ns,
715 };
716
717 let downstream_pk = sink_param.downstream_pk.clone();
718
719 let mut log_reader = log_reader
720 .transform_chunk(move |chunk| {
721 let chunk = if let Some(b) = non_append_only_behavior
722 && b.should_compact_in_log_reader()
723 {
724 let downstream_pk = downstream_pk.as_ref().unwrap();
726 dispatch_output_kind!(sink_param.sink_type, KIND, {
727 compact_chunk_inline::<KIND>(
728 chunk,
729 downstream_pk,
730 InconsistencyBehavior::Warn,
733 )
734 })
735 } else {
736 chunk
737 };
738 if visible_columns.len() != columns.len() {
739 chunk.project(&visible_columns)
742 } else {
743 chunk
744 }
745 })
746 .monitored(metrics)
747 .rate_limited(rate_limit_rx);
748
749 log_reader.init().await?;
750 loop {
751 let future = async {
752 loop {
753 let Err(e) = sink
754 .new_log_sinker(sink_writer_param.clone())
755 .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
756 .await;
757 GLOBAL_ERROR_METRICS.user_sink_error.report([
758 "sink_executor_error".to_owned(),
759 sink_param.sink_id.to_string(),
760 sink_param.sink_name.clone(),
761 actor_context.fragment_id.to_string(),
762 ]);
763
764 if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
765 meta_client
766 .add_sink_fail_evet_log(
767 sink_writer_param.sink_id,
768 sink_writer_param.sink_name.clone(),
769 sink_writer_param.connector.clone(),
770 e.to_report_string(),
771 )
772 .await;
773 }
774
775 if F::ALLOW_REWIND {
776 match log_reader.rewind().await {
777 Ok(()) => {
778 error!(
779 error = %e.as_report(),
780 executor_id = %sink_writer_param.executor_id,
781 sink_id = %sink_param.sink_id,
782 "reset log reader stream successfully after sink error"
783 );
784 Ok(())
785 }
786 Err(rewind_err) => {
787 error!(
788 error = %rewind_err.as_report(),
789 "fail to rewind log reader"
790 );
791 Err(e)
792 }
793 }
794 } else {
795 Err(e)
796 }
797 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
798 }
799 };
800 select! {
801 result = future => {
802 let Err(e): StreamExecutorResult<!> = result;
803 return Err(e);
804 }
805 result = rebuild_sink_rx.recv() => {
806 match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
807 RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
808 sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
809 if notify.send(()).is_err() {
810 warn!("failed to notify rebuild sink");
811 }
812 log_reader.init().await?;
813 },
814 RebuildSinkMessage::UpdateConfig(config) => {
815 if F::ALLOW_REWIND {
816 match log_reader.rewind().await {
817 Ok(()) => {
818 sink_param.properties.extend(config.into_iter());
819 sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
820 info!(
821 executor_id = %sink_writer_param.executor_id,
822 sink_id = %sink_param.sink_id,
823 "alter sink config successfully with rewind"
824 );
825 Ok(())
826 }
827 Err(rewind_err) => {
828 error!(
829 error = %rewind_err.as_report(),
830 "fail to rewind log reader for alter sink config "
831 );
832 Err(anyhow!("fail to rewind log after alter table").into())
833 }
834 }
835 } else {
836 sink_param.properties.extend(config.into_iter());
837 sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
838 Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
839 }
840 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
841 },
842 }
843 }
844 }
845 }
846 }
847}
848
/// Requests sent from the log-writing side (`execute_write_log`) to the log-consuming side
/// (`execute_consume_log`) of a sink executor.
enum RebuildSinkMessage {
    /// The actor's vnode bitmap changed. The consumer stores the new bitmap in its sink writer
    /// params, signals the sender, and then re-initializes the log reader.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// New connector properties for this sink, merged into the sink params before the sink is
    /// rebuilt.
    UpdateConfig(HashMap<String, String>),
}
853
854impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
855 fn execute(self: Box<Self>) -> BoxedMessageStream {
856 self.execute_inner()
857 }
858}
859
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    /// An append-only sink with `ignore_delete` hides deletes and turns update-inserts into
    /// plain inserts.
    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // The third column is hidden and therefore excluded from `sink_param.columns`.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                "  I I I
                U- 3 2 1
                U+ 3 4 1
                 + 5 6 7",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 5 6 7",
            )),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::AppendOnly,
            ignore_delete: true,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            // Must agree with the all-Int64 schema above.
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier of epoch 1.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier of epoch 2.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // The delete-only chunk is still forwarded, with all of its rows hidden.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }

    /// The stream key `(0, 1)` is not covered by the downstream pk `(0)`, so each epoch is
    /// buffered, reordered and compacted by the downstream pk at the barrier.
    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // The third column is hidden and therefore excluded from `sink_param.columns`.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0]),
            sink_type,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier of epoch 1.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier of epoch 2.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        let expected = match sink_type {
            SinkType::Retract => StreamChunk::from_pretty(
                "  I I I
                U- 1 1 10
                U+ 1 1 40",
            ),
            SinkType::Upsert => StreamChunk::from_pretty(
                " I I I
                + 1 1 40",
            ),
            _ => unreachable!(),
        };
        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);

        // Barrier of epoch 3.
        executor.next().await.unwrap().unwrap();
    }

    /// A sink that only receives barriers forwards exactly those barriers.
    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::AppendOnly,
            ignore_delete: true,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }

    /// Even though the downstream pk equals the stream key, `force_compaction` makes the
    /// executor buffer, reorder and compact each epoch.
    #[tokio::test]
    async fn test_force_compaction() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "force_compaction".into() => "true".into()
        };

        // The third column is hidden and therefore excluded from `sink_param.columns`.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20
                + 1 4 10",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 4 30",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0, 1]),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier of epoch 1.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier of epoch 2.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 3 30
                + 1 1 40",
            )
        );

        // Barrier of epoch 3.
        executor.next().await.unwrap().unwrap();
    }
}