use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::mem;

use anyhow::anyhow;
use futures::stream::select;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::{SinkId, SinkType};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
    LogWriter, LogWriterExt, LogWriterMetrics,
};
use risingwave_connector::sink::{
    GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
    SinkWriterParam,
};
use risingwave_pb::common::ThrottleType;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_plan::stream_node::StreamKind;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use crate::common::change_buffer::{OutputKind, output_kind};
use crate::common::compact_chunk::{
    InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
};
use crate::executor::prelude::*;
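
/// Executor that writes its input stream into a sink.
///
/// Input messages are forwarded into a log writer first; for external sinks, a
/// concurrent consumer reads the log store back and delivers the records to
/// the sink implementation, decoupling the sink from upstream processing.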
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    rate_limit: Option<u32>,
}

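/// Rewrite a chunk to contain only insertions: deletes are hidden, and
/// `UpdateInsert` ops are downgraded to plain `Insert`s.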
fn force_append_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Insert => {}
            Op::Delete | Op::UpdateDelete => r.set_vis(false),
            Op::UpdateInsert => r.set_op(Op::Insert),
        }
    }
    c.into()
}

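/// Rewrite a chunk to contain only deletions: inserts are hidden, and
/// `UpdateDelete` ops are downgraded to plain `Delete`s.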
fn force_delete_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Delete => {}
            Op::Insert | Op::UpdateInsert => r.set_vis(false),
            Op::UpdateDelete => r.set_op(Op::Delete),
        }
    }
    c.into()
}

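/// Describes how a non-append-only (upsert or retract) sink handles its input.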
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// Whether a downstream primary key is specified and every stream key
    /// column is included in it.
    pk_specified_and_matched: bool,
    /// Whether compaction is forced via the `force_compaction` sink property.
    force_compaction: bool,
}

impl NonAppendOnlyBehavior {
    /// Compact chunks by the downstream primary key lazily in the log reader,
    /// right before they are handed to the sink. This is only valid when the
    /// stream key is covered by the downstream primary key.
    fn should_compact_in_log_reader(self) -> bool {
        self.pk_specified_and_matched && !self.force_compaction
    }

    /// Buffer records between barriers and emit deletes before inserts, which
    /// is required when the stream key is not covered by the downstream
    /// primary key, or when compaction is explicitly forced.
    fn should_reorder_records(self) -> bool {
        !self.pk_specified_and_matched || self.force_compaction
    }
}

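/// The `OutputKind` to use when compacting chunks for the given sink type.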
fn compact_output_kind(sink_type: SinkType) -> OutputKind {
    match sink_type {
        SinkType::Upsert => output_kind::UPSERT,
        SinkType::Retract => output_kind::RETRACT,
        // Compaction is only performed for non-append-only sinks, so this arm
        // is effectively unreachable; `RETRACT` is a conservative default.
        SinkType::AppendOnly => output_kind::RETRACT,
    }
}

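/// Dispatches on [`compact_output_kind`] and binds the result as the constant
/// `$KIND` within `$body`, so the compaction routines are monomorphized per
/// output kind.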
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                const $KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const $KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}

impl<F: LogStoreFactory> SinkExecutor<F> {
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_idx) = sink_writer_param.extra_partition_col_idx {
            assert_eq!(sink_input_schema.data_types(), {
                // Strip the extra partition column when comparing data types.
                let mut data_type = info.schema.data_types();
                data_type.remove(col_idx);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
            let stream_key = &info.stream_key;
            let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
                .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
            let force_compaction = sink_param
                .properties
                .get(SINK_USER_FORCE_COMPACTION)
                .map(|v| v.eq_ignore_ascii_case("true"))
                .unwrap_or(false);
            Some(NonAppendOnlyBehavior {
                pk_specified_and_matched,
                force_compaction,
            })
        } else {
            None
        };

        tracing::info!(
            sink_id = %sink_param.sink_id,
            actor_id = %actor_context.id,
            ?non_append_only_behavior,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            non_append_only_behavior,
            rate_limit,
        })
    }

    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.stream_key.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            // An upsert input stream doesn't guarantee op consistency, so
            // tolerate inconsistencies found during compaction.
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            self.non_append_only_behavior,
            metrics.sink_chunk_buffer_size,
            self.sink.is_blackhole(),
        );

        let processed_input = if self.sink_param.ignore_delete {
            processed_input
                // Keep only the insert side of the stream.
                .map_ok(|msg| match msg {
                    Message::Chunk(chunk) => Message::Chunk(force_append_only(chunk)),
                    other => other,
                })
                .left_stream()
        } else {
            processed_input.right_stream()
        };

        if self.sink.is_sink_into_table() {
            // Sinking into a table doesn't go through the log store; pass the
            // processed stream through directly.
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Send the initial rate limit; the receiver is alive, so this cannot fail.
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        fragment_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.non_append_only_behavior,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never);
                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }

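    /// Write the input stream into the log writer while forwarding messages
    /// downstream, handling pause/resume, rate-limit, vnode-bitmap, and
    /// connector-config mutations carried on barriers.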
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        fragment_id: FragmentId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // Propagate the first barrier immediately, before initializing the log writer.
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let schema_change = barrier.as_sink_schema_change(sink_id);
                    if let Some(schema_change) = &schema_change {
                        info!(?schema_change, %sink_id, "sink received schema change");
                    }
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                                schema_change,
                            },
                        )
                        .await?;

                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("failed to send rebuild sink request to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("failed to wait for rebuild sink to finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(fragment_to_apply) => {
                                if let Some(entry) = fragment_to_apply.get(&fragment_id)
                                    && entry.throttle_type() == ThrottleType::Sink
                                {
                                    tracing::info!(
                                        rate_limit = entry.rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send(entry.rate_limit.into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "failed to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                if let Some(map) = config.get(&sink_id.as_raw_id())
                                    && let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                {
                                    error!(
                                        error = %e.as_report(),
                                        "failed to send sink alter props"
                                    );
                                    return Err(StreamExecutorError::from(e.to_report_string()));
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }

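    /// Pre-process the input before it reaches the log store: depending on the
    /// sink type and [`NonAppendOnlyBehavior`], either buffer and reorder
    /// records between barriers, or compact each chunk by the downstream
    /// primary key inline.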
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: StreamKey,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        input_compact_ib: InconsistencyBehavior,
        downstream_pk: Option<Vec<usize>>,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        skip_compact: bool,
    ) {
        // Buffer all records between two barriers and reorder them before
        // they are written to the log store.
        if let Some(b) = non_append_only_behavior
            && b.should_reorder_records()
        {
            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);

            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);
                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();

                        // Compact by the stream key first, then emit all
                        // deletes before all inserts, so that a delete on one
                        // stream key cannot clobber an insert that lands on
                        // the same downstream primary key.
                        let mut delete_chunks = vec![];
                        let mut insert_chunks = vec![];

                        for c in dispatch_output_kind!(sink_type, KIND, {
                            StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
                        }) {
                            let chunk = force_delete_only(c.clone());
                            if chunk.cardinality() > 0 {
                                delete_chunks.push(chunk);
                            }
                            let chunk = force_append_only(c);
                            if chunk.cardinality() > 0 {
                                insert_chunks.push(chunk);
                            }
                        }
                        let chunks = delete_chunks
                            .into_iter()
                            .chain(insert_chunks.into_iter())
                            .collect();

                        if let Some(downstream_pk) = &downstream_pk {
                            // Compact again by the downstream pk to cancel out
                            // delete-insert pairs on the same sink key.
                            let chunks = dispatch_output_kind!(sink_type, KIND, {
                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
                                    .into_compacted_chunks_reconstructed::<KIND>(
                                        chunk_size,
                                        input_data_types.clone(),
                                        // Reordering can surface benign
                                        // inconsistencies, so only warn here.
                                        InconsistencyBehavior::Warn,
                                    )
                            });
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        // Emit the buffered watermark, if any, before the barrier.
                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            // No reordering is needed; optionally compact each chunk inline.
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        // For non-append-only sinks with a downstream pk,
                        // compact each chunk by the pk before writing it to
                        // the log store.
                        if !sink_type.is_append_only()
                            && let Some(downstream_pk) = &downstream_pk
                        {
                            if skip_compact {
                                // Blackhole sinks skip compaction.
                                assert_eq!(&stream_key, downstream_pk);
                            } else {
                                chunk = dispatch_output_kind!(sink_type, KIND, {
                                    compact_chunk_inline::<KIND>(
                                        chunk,
                                        downstream_pk,
                                        InconsistencyBehavior::Warn,
                                    )
                                });
                            }
                        }
                        yield Message::Chunk(chunk);
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }

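    /// Read the log store and deliver records to the sink, restarting the log
    /// sinker on failure (rewinding the log reader when the factory allows it)
    /// and reacting to rebuild-sink and config-update requests.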
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                let chunk = if let Some(b) = non_append_only_behavior
                    && b.should_compact_in_log_reader()
                {
                    // `should_compact_in_log_reader` implies a downstream pk exists.
                    let downstream_pk = downstream_pk.as_ref().unwrap();
                    dispatch_output_kind!(sink_param.sink_type, KIND, {
                        compact_chunk_inline::<KIND>(
                            chunk,
                            downstream_pk,
                            InconsistencyBehavior::Warn,
                        )
                    })
                } else {
                    chunk
                };
                // Project away hidden columns before handing rows to the sink.
                if visible_columns.len() != columns.len() {
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            let future = async {
                loop {
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                error!(
                                    error = %e.as_report(),
                                    executor_id = %sink_writer_param.executor_id,
                                    sink_id = %sink_param.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "failed to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties.extend(config.into_iter());
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(
                                            |e| StreamExecutorError::from((e, sink_param.sink_id)),
                                        )?;
                                        info!(
                                            executor_id = %sink_writer_param.executor_id,
                                            sink_id = %sink_param.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "failed to rewind log reader for alter sink config"
                                        );
                                        Err(anyhow!("failed to rewind log after alter sink config").into())
                                    }
                                }
                            } else {
                                sink_param.properties.extend(config.into_iter());
                                sink = TryFrom::try_from(sink_param.clone()).map_err(
                                    |e| StreamExecutorError::from((e, sink_param.sink_id)),
                                )?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure the ALTER SINK CONFIG is fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
}

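/// Requests sent from the log writer side to the log consumer.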
enum RebuildSinkMessage {
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    UpdateConfig(HashMap<String, String>),
}

impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // Two visible columns and one hidden column, which is projected away
        // before rows reach the sink.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                U- 3 2 1
                U+ 3 4 1
                + 5 6 7",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 5 6 7",
            )),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::AppendOnly,
            ignore_delete: true,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Consume the first barrier.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Consume the second barrier.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // The last chunk is delete-only, so `force_append_only` hides all of
        // its rows.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }

    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // Two visible columns and one hidden column; the stream key is
        // `[0, 1]` while the downstream pk is `[0]`, so records are buffered
        // and reordered between barriers.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0]),
            sink_type,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Consume the first barrier.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Consume the second barrier.
        executor.next().await.unwrap().unwrap();

        // All updates within the epoch are compacted by the downstream pk.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        let expected = match sink_type {
            SinkType::Retract => StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            ),
            SinkType::Upsert => StreamChunk::from_pretty(
                " I I I
                + 1 1 40",
            ),
            _ => unreachable!(),
        };
        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);

        // Consume the last barrier.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::AppendOnly,
            ignore_delete: true,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barriers are forwarded as-is, even when no data arrives in between.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }

    #[tokio::test]
    async fn test_force_compaction() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "force_compaction".into() => "true".into()
        };

        // Two visible columns and one hidden column; `force_compaction` makes
        // the executor buffer and compact records even though the stream key
        // matches the downstream pk.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20
                + 1 4 10",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 4 30",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0, 1]),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Consume the first barrier.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Consume the second barrier.
        executor.next().await.unwrap().unwrap();

        // All records in the epoch are compacted: `(1, 2)` and `(1, 4)` cancel
        // out, and `(1, 1)` collapses to its final value.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 3 30
                + 1 1 40",
            )
        );

        // Consume the last barrier.
        executor.next().await.unwrap().unwrap();
    }
}