1use std::assert_matches::assert_matches;
16use std::collections::HashMap;
17use std::mem;
18
19use anyhow::anyhow;
20use futures::stream::select;
21use futures::{FutureExt, TryFutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::array::stream_chunk::StreamChunkMut;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnCatalog, Field};
27use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_common_estimate_size::collections::EstimatedVec;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_connector::dispatch_sink;
32use risingwave_connector::sink::catalog::{SinkId, SinkType};
33use risingwave_connector::sink::log_store::{
34 FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
35 LogWriter, LogWriterExt, LogWriterMetrics,
36};
37use risingwave_connector::sink::{
38 GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
39};
40use risingwave_pb::stream_plan::stream_node::StreamKind;
41use thiserror_ext::AsReport;
42use tokio::select;
43use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
44use tokio::sync::oneshot;
45
46use crate::common::change_buffer::{OutputKind, output_kind};
47use crate::common::compact_chunk::{
48 InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
49};
50use crate::executor::prelude::*;
/// Stream executor that forwards its input to an external sink.
///
/// Depending on the sink, the (normalized) input is either returned directly (sink into table)
/// or written into a log store built from `log_store_factory` and consumed by `sink`.
pub struct SinkExecutor<F: LogStoreFactory> {
    /// Actor context: provides the actor/fragment ids and the streaming metrics registry.
    actor_context: ActorContextRef,
    /// Schema and stream key of this executor.
    info: ExecutorInfo,
    /// Upstream executor whose messages are sunk.
    input: Executor,
    /// The concrete sink implementation.
    sink: SinkImpl,
    /// Catalog of the sink input columns; hidden columns are projected away before sinking.
    input_columns: Vec<ColumnCatalog>,
    /// Sink parameters (sink type, downstream pk, properties, ...).
    sink_param: SinkParam,
    /// Factory for the log store that decouples writing the input from consuming it.
    log_store_factory: F,
    /// Parameters used to create log sinkers.
    sink_writer_param: SinkWriterParam,
    /// Maximum chunk size used when chunks are rebuilt.
    chunk_size: usize,
    /// Data types of the input chunks, used when chunks are rebuilt.
    input_data_types: Vec<DataType>,
    /// `None` for append-only sinks; otherwise how to handle stream key vs. downstream pk.
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    /// Initial rate limit handed to the log reader.
    rate_limit: Option<u32>,
}
65
66fn force_append_only(c: StreamChunk) -> StreamChunk {
68 let mut c: StreamChunkMut = c.into();
69 for (_, mut r) in c.to_rows_mut() {
70 match r.op() {
71 Op::Insert => {}
72 Op::Delete | Op::UpdateDelete => r.set_vis(false),
73 Op::UpdateInsert => r.set_op(Op::Insert),
74 }
75 }
76 c.into()
77}
78
79fn force_delete_only(c: StreamChunk) -> StreamChunk {
81 let mut c: StreamChunkMut = c.into();
82 for (_, mut r) in c.to_rows_mut() {
83 match r.op() {
84 Op::Delete => {}
85 Op::Insert | Op::UpdateInsert => r.set_vis(false),
86 Op::UpdateDelete => r.set_op(Op::Delete),
87 }
88 }
89 c.into()
90}
91
/// How a non-append-only sink reconciles the upstream stream key with the downstream pk.
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// `true` when a downstream pk is specified and every stream-key column belongs to it.
    pk_specified_and_matched: bool,
}

impl NonAppendOnlyBehavior {
    /// Whether chunks read from the log store should be compacted by the downstream pk.
    ///
    /// This holds exactly when the stream key is covered by the downstream pk.
    fn should_compact_in_log_reader(self) -> bool {
        let Self {
            pk_specified_and_matched,
        } = self;
        pk_specified_and_matched
    }

    /// Whether records must be buffered per epoch and reordered before being sunk.
    ///
    /// This is the complement of [`Self::should_compact_in_log_reader`].
    fn should_reorder_records(self) -> bool {
        !self.should_compact_in_log_reader()
    }
}
192
193fn compact_output_kind(sink_type: SinkType) -> OutputKind {
195 match sink_type {
196 SinkType::Upsert => output_kind::UPSERT,
197 SinkType::Retract => output_kind::RETRACT,
198 SinkType::AppendOnly | SinkType::ForceAppendOnly => output_kind::RETRACT,
200 }
201}
202
/// Evaluates `$body` with a const named `$KIND` bound to the [`OutputKind`] of `$sink_type`.
///
/// This turns the runtime choice made by [`compact_output_kind`] into a const that `$body` can
/// use as a const-generic argument, e.g. `compact_chunk_inline::<KIND>(...)`.
///
/// The const is declared with the caller-supplied identifier `$KIND`, so the name the caller
/// passes is the name `$body` sees.
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        // `$body` is a braced block placed in tail position, which would otherwise trigger
        // `unused_braces`.
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                const $KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const $KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}
219
220impl<F: LogStoreFactory> SinkExecutor<F> {
221 #[allow(clippy::too_many_arguments)]
222 #[expect(clippy::unused_async)]
223 pub async fn new(
224 actor_context: ActorContextRef,
225 info: ExecutorInfo,
226 input: Executor,
227 sink_writer_param: SinkWriterParam,
228 sink: SinkImpl,
229 sink_param: SinkParam,
230 columns: Vec<ColumnCatalog>,
231 log_store_factory: F,
232 chunk_size: usize,
233 input_data_types: Vec<DataType>,
234 rate_limit: Option<u32>,
235 ) -> StreamExecutorResult<Self> {
236 let sink_input_schema: Schema = columns
237 .iter()
238 .map(|column| Field::from(&column.column_desc))
239 .collect();
240
241 if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
242 assert_eq!(sink_input_schema.data_types(), {
244 let mut data_type = info.schema.data_types();
245 data_type.remove(col_dix);
246 data_type
247 });
248 } else {
249 assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
250 }
251
252 let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
253 let stream_key = &info.stream_key;
254 let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
255 .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
256 Some(NonAppendOnlyBehavior {
257 pk_specified_and_matched,
258 })
259 } else {
260 None
261 };
262
263 tracing::info!(
264 sink_id = sink_param.sink_id.sink_id,
265 actor_id = actor_context.id,
266 ?non_append_only_behavior,
267 "Sink executor info"
268 );
269
270 Ok(Self {
271 actor_context,
272 info,
273 input,
274 sink,
275 input_columns: columns,
276 sink_param,
277 log_store_factory,
278 sink_writer_param,
279 chunk_size,
280 input_data_types,
281 non_append_only_behavior,
282 rate_limit,
283 })
284 }
285
    /// Builds the output stream of the sink executor.
    ///
    /// The input is counted into the sink input metrics and normalized by `process_msg`.
    /// For sink-into-table, that processed stream is returned directly. Otherwise a log store is
    /// built, and the returned stream merges two parts:
    /// - `execute_write_log` writes the processed input into the log store and forwards
    ///   messages downstream;
    /// - `execute_consume_log` reads the log store and feeds the concrete sink. It never yields
    ///   `Ok` and only surfaces errors.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.stream_key.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        // Inconsistent changes found while compacting the input are tolerated for upsert input
        // streams and are fatal for any other stream kind.
        // NOTE(review): presumably because upsert streams are not guaranteed to be consistent.
        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        // The `move` closure only uses two fields of `metrics`. With disjoint closure captures
        // (Rust 2021+), `metrics.sink_chunk_buffer_size` therefore stays available below.
        // `capacity()` counts all rows of the chunk, including invisible ones.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            self.non_append_only_behavior,
            metrics.sink_chunk_buffer_size,
            // `skip_compact`: per-chunk compaction is skipped for blackhole sinks.
            self.sink.is_blackhole(),
        );

        if self.sink.is_sink_into_table() {
            // Sink into table bypasses the log store: the processed stream is the output.
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Seed the consumer side with the initial rate limit. The receiver is still alive
            // here, so this send cannot fail.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            // Channel used by the writer side to request sink rebuilds / config updates.
            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.non_append_only_behavior,
                            input_compact_ib,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        // The consumer resolves to `StreamExecutorResult<!>`. This identity map
                        // lets `!` coerce into `Message`, so the consumer can be merged with the
                        // write stream below.
                        .map_ok(|never| never);
                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
397
398 #[try_stream(ok = Message, error = StreamExecutorError)]
399 async fn execute_write_log<W: LogWriter>(
400 input: impl MessageStream,
401 mut log_writer: W,
402 actor_id: ActorId,
403 sink_id: SinkId,
404 rate_limit_tx: UnboundedSender<RateLimit>,
405 rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
406 ) {
407 pin_mut!(input);
408 let barrier = expect_first_barrier(&mut input).await?;
409 let epoch_pair = barrier.epoch;
410 let is_pause_on_startup = barrier.is_pause_on_startup();
411 yield Message::Barrier(barrier);
413
414 log_writer.init(epoch_pair, is_pause_on_startup).await?;
415
416 let mut is_paused = false;
417
418 #[for_await]
419 for msg in input {
420 match msg? {
421 Message::Watermark(w) => yield Message::Watermark(w),
422 Message::Chunk(chunk) => {
423 assert!(
424 !is_paused,
425 "Actor {actor_id} should not receive any data after pause"
426 );
427 log_writer.write_chunk(chunk.clone()).await?;
428 yield Message::Chunk(chunk);
429 }
430 Message::Barrier(barrier) => {
431 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
432 let add_columns = barrier.as_sink_add_columns(sink_id);
433 if let Some(add_columns) = &add_columns {
434 info!(?add_columns, %sink_id, "sink receive add columns");
435 }
436 let post_flush = log_writer
437 .flush_current_epoch(
438 barrier.epoch.curr,
439 FlushCurrentEpochOptions {
440 is_checkpoint: barrier.kind.is_checkpoint(),
441 new_vnode_bitmap: update_vnode_bitmap.clone(),
442 is_stop: barrier.is_stop(actor_id),
443 add_columns,
444 },
445 )
446 .await?;
447
448 let mutation = barrier.mutation.clone();
449 yield Message::Barrier(barrier);
450 if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
451 && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
452 {
453 let (tx, rx) = oneshot::channel();
454 rebuild_sink_tx
455 .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
456 .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
457 rx.await
458 .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
459 }
460 post_flush.post_yield_barrier().await?;
461
462 if let Some(mutation) = mutation.as_deref() {
463 match mutation {
464 Mutation::Pause => {
465 log_writer.pause()?;
466 is_paused = true;
467 }
468 Mutation::Resume => {
469 log_writer.resume()?;
470 is_paused = false;
471 }
472 Mutation::Throttle(actor_to_apply) => {
473 if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
474 tracing::info!(
475 rate_limit = new_rate_limit,
476 "received sink rate limit on actor {actor_id}"
477 );
478 if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
479 error!(
480 error = %e.as_report(),
481 "fail to send sink ate limit update"
482 );
483 return Err(StreamExecutorError::from(
484 e.to_report_string(),
485 ));
486 }
487 }
488 }
489 Mutation::ConnectorPropsChange(config) => {
490 if let Some(map) = config.get(&sink_id.sink_id)
491 && let Err(e) = rebuild_sink_tx
492 .send(RebuildSinkMessage::UpdateConfig(map.clone()))
493 {
494 error!(
495 error = %e.as_report(),
496 "fail to send sink alter props"
497 );
498 return Err(StreamExecutorError::from(e.to_report_string()));
499 }
500 }
501 _ => (),
502 }
503 }
504 }
505 }
506 }
507 }
508
509 #[allow(clippy::too_many_arguments)]
510 #[try_stream(ok = Message, error = StreamExecutorError)]
511 async fn process_msg(
512 input: impl MessageStream,
513 sink_type: SinkType,
514 stream_key: StreamKey,
515 chunk_size: usize,
516 input_data_types: Vec<DataType>,
517 input_compact_ib: InconsistencyBehavior,
518 downstream_pk: Option<Vec<usize>>,
519 non_append_only_behavior: Option<NonAppendOnlyBehavior>,
520 sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
521 skip_compact: bool,
522 ) {
523 if let Some(b) = non_append_only_behavior
525 && b.should_reorder_records()
526 {
527 assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);
528
529 let mut chunk_buffer = EstimatedVec::new();
530 let mut watermark: Option<super::Watermark> = None;
531 #[for_await]
532 for msg in input {
533 match msg? {
534 Message::Watermark(w) => watermark = Some(w),
535 Message::Chunk(c) => {
536 chunk_buffer.push(c);
537 sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
538 }
539 Message::Barrier(barrier) => {
540 let chunks = mem::take(&mut chunk_buffer).into_inner();
541
542 let mut delete_chunks = vec![];
545 let mut insert_chunks = vec![];
546
547 for c in dispatch_output_kind!(sink_type, KIND, {
548 StreamChunkCompactor::new(stream_key.clone(), chunks)
549 .into_compacted_chunks_inline::<KIND>(input_compact_ib)
550 }) {
551 let chunk = force_delete_only(c.clone());
552 if chunk.cardinality() > 0 {
553 delete_chunks.push(chunk);
554 }
555 let chunk = force_append_only(c);
556 if chunk.cardinality() > 0 {
557 insert_chunks.push(chunk);
558 }
559 }
560 let chunks = delete_chunks
561 .into_iter()
562 .chain(insert_chunks.into_iter())
563 .collect();
564
565 if let Some(downstream_pk) = &downstream_pk {
570 let chunks = dispatch_output_kind!(sink_type, KIND, {
571 StreamChunkCompactor::new(downstream_pk.clone(), chunks)
572 .into_compacted_chunks_reconstructed::<KIND>(
573 chunk_size,
574 input_data_types.clone(),
575 InconsistencyBehavior::Warn,
578 )
579 });
580 for c in chunks {
581 yield Message::Chunk(c);
582 }
583 } else {
584 let mut chunk_builder =
585 StreamChunkBuilder::new(chunk_size, input_data_types.clone());
586 for chunk in chunks {
587 for (op, row) in chunk.rows() {
588 if let Some(c) = chunk_builder.append_row(op, row) {
589 yield Message::Chunk(c);
590 }
591 }
592 }
593
594 if let Some(c) = chunk_builder.take() {
595 yield Message::Chunk(c);
596 }
597 };
598
599 if let Some(w) = mem::take(&mut watermark) {
601 yield Message::Watermark(w)
602 }
603 yield Message::Barrier(barrier);
604 }
605 }
606 }
607 } else {
608 #[for_await]
609 for msg in input {
610 match msg? {
611 Message::Watermark(w) => yield Message::Watermark(w),
612 Message::Chunk(mut chunk) => {
613 if !skip_compact {
615 chunk = dispatch_output_kind!(sink_type, KIND, {
616 compact_chunk_inline::<KIND>(chunk, &stream_key, input_compact_ib)
617 });
618 }
619 match sink_type {
620 SinkType::AppendOnly => yield Message::Chunk(chunk),
621 SinkType::ForceAppendOnly => {
622 yield Message::Chunk(force_append_only(chunk))
626 }
627 SinkType::Upsert | SinkType::Retract => yield Message::Chunk(chunk),
628 }
629 }
630 Message::Barrier(barrier) => {
631 yield Message::Barrier(barrier);
632 }
633 }
634 }
635 }
636 }
637
638 #[expect(clippy::too_many_arguments)]
639 async fn execute_consume_log<S: Sink, R: LogReader>(
640 mut sink: S,
641 log_reader: R,
642 columns: Vec<ColumnCatalog>,
643 mut sink_param: SinkParam,
644 mut sink_writer_param: SinkWriterParam,
645 non_append_only_behavior: Option<NonAppendOnlyBehavior>,
646 input_compact_ib: InconsistencyBehavior,
647 actor_context: ActorContextRef,
648 rate_limit_rx: UnboundedReceiver<RateLimit>,
649 mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
650 ) -> StreamExecutorResult<!> {
651 let visible_columns = columns
652 .iter()
653 .enumerate()
654 .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
655 .collect_vec();
656
657 let labels = [
658 &actor_context.id.to_string(),
659 sink_writer_param.connector.as_str(),
660 &sink_writer_param.sink_id.to_string(),
661 sink_writer_param.sink_name.as_str(),
662 ];
663 let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
664 .log_store_reader_wait_new_future_duration_ns
665 .with_guarded_label_values(&labels);
666 let log_store_read_rows = GLOBAL_SINK_METRICS
667 .log_store_read_rows
668 .with_guarded_label_values(&labels);
669 let log_store_read_bytes = GLOBAL_SINK_METRICS
670 .log_store_read_bytes
671 .with_guarded_label_values(&labels);
672 let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
673 .log_store_latest_read_epoch
674 .with_guarded_label_values(&labels);
675 let metrics = LogReaderMetrics {
676 log_store_latest_read_epoch,
677 log_store_read_rows,
678 log_store_read_bytes,
679 log_store_reader_wait_new_future_duration_ns,
680 };
681
682 let downstream_pk = sink_param.downstream_pk.clone();
683
684 let mut log_reader = log_reader
685 .transform_chunk(move |chunk| {
686 let chunk = if let Some(b) = non_append_only_behavior
687 && b.should_compact_in_log_reader()
688 {
689 let downstream_pk = downstream_pk.as_ref().unwrap();
691 dispatch_output_kind!(sink_param.sink_type, KIND, {
692 compact_chunk_inline::<KIND>(chunk, downstream_pk, input_compact_ib)
693 })
694 } else {
695 chunk
696 };
697 if visible_columns.len() != columns.len() {
698 chunk.project(&visible_columns)
701 } else {
702 chunk
703 }
704 })
705 .monitored(metrics)
706 .rate_limited(rate_limit_rx);
707
708 log_reader.init().await?;
709 loop {
710 let future = async {
711 loop {
712 let Err(e) = sink
713 .new_log_sinker(sink_writer_param.clone())
714 .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
715 .await;
716 GLOBAL_ERROR_METRICS.user_sink_error.report([
717 "sink_executor_error".to_owned(),
718 sink_param.sink_id.to_string(),
719 sink_param.sink_name.clone(),
720 actor_context.fragment_id.to_string(),
721 ]);
722
723 if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
724 meta_client
725 .add_sink_fail_evet_log(
726 sink_writer_param.sink_id.sink_id,
727 sink_writer_param.sink_name.clone(),
728 sink_writer_param.connector.clone(),
729 e.to_report_string(),
730 )
731 .await;
732 }
733
734 if F::ALLOW_REWIND {
735 match log_reader.rewind().await {
736 Ok(()) => {
737 error!(
738 error = %e.as_report(),
739 executor_id = sink_writer_param.executor_id,
740 sink_id = sink_param.sink_id.sink_id,
741 "reset log reader stream successfully after sink error"
742 );
743 Ok(())
744 }
745 Err(rewind_err) => {
746 error!(
747 error = %rewind_err.as_report(),
748 "fail to rewind log reader"
749 );
750 Err(e)
751 }
752 }
753 } else {
754 Err(e)
755 }
756 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
757 }
758 };
759 select! {
760 result = future => {
761 let Err(e): StreamExecutorResult<!> = result;
762 return Err(e);
763 }
764 result = rebuild_sink_rx.recv() => {
765 match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
766 RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
767 sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
768 if notify.send(()).is_err() {
769 warn!("failed to notify rebuild sink");
770 }
771 log_reader.init().await?;
772 },
773 RebuildSinkMessage::UpdateConfig(config) => {
774 if F::ALLOW_REWIND {
775 match log_reader.rewind().await {
776 Ok(()) => {
777 sink_param.properties.extend(config.into_iter());
778 sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
779 info!(
780 executor_id = sink_writer_param.executor_id,
781 sink_id = sink_param.sink_id.sink_id,
782 "alter sink config successfully with rewind"
783 );
784 Ok(())
785 }
786 Err(rewind_err) => {
787 error!(
788 error = %rewind_err.as_report(),
789 "fail to rewind log reader for alter sink config "
790 );
791 Err(anyhow!("fail to rewind log after alter table").into())
792 }
793 }
794 } else {
795 sink_param.properties.extend(config.into_iter());
796 sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
797 Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
798 }
799 .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
800 },
801 }
802 }
803 }
804 }
805 }
806}
807
808enum RebuildSinkMessage {
809 RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
810 UpdateConfig(HashMap<String, String>),
811}
812
813impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
814 fn execute(self: Box<Self>) -> BoxedMessageStream {
815 self.execute_inner()
816 }
817}
818
#[cfg(test)]
mod test {
    use risingwave_common::array::StreamChunkTestExt;
    use risingwave_common::array::stream_chunk::StreamChunk;
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::Barrier;
    use crate::executor::test_utils::*;

    /// `Int64` columns with ids `0, 1, ...`; `hidden[i]` tells whether column `i` is hidden.
    fn int64_columns(hidden: &[bool]) -> Vec<ColumnCatalog> {
        hidden
            .iter()
            .zip(0..)
            .map(|(&is_hidden, id)| ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(id), DataType::Int64),
                is_hidden,
            })
            .collect()
    }

    /// Runs a `SinkExecutor` over `messages` and returns its output stream.
    ///
    /// The sink is built from `properties`. Its visible columns are the non-hidden `columns`,
    /// and `downstream_pk` is used as the downstream pk.
    async fn execute_test_sink(
        properties: std::collections::BTreeMap<String, String>,
        columns: Vec<ColumnCatalog>,
        messages: Vec<Message>,
        stream_key: Vec<usize>,
        downstream_pk: Vec<usize>,
        sink_type: SinkType,
        input_data_types: Vec<DataType>,
    ) -> BoxedMessageStream {
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(downstream_pk),
            sink_type,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            input_data_types,
            None,
        )
        .await
        .unwrap()
        .boxed()
        .execute()
    }

    #[tokio::test]
    async fn test_force_append_only_sink() {
        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        let mut executor = execute_test_sink(
            properties,
            // Two visible columns followed by one hidden column.
            int64_columns(&[false, false, true]),
            vec![
                Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
                Message::Chunk(StreamChunk::from_pretty(
                    " I I I
                    + 3 2 1",
                )),
                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
                Message::Chunk(StreamChunk::from_pretty(
                    " I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
                )),
                Message::Chunk(StreamChunk::from_pretty(
                    " I I I
                    - 5 6 7",
                )),
            ],
            vec![0],
            vec![0],
            SinkType::ForceAppendOnly,
            // Matches the `Int64` columns above.
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
        )
        .await;

        // Barrier of epoch 1.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier of epoch 2.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // The last message of the input must still come through without error.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }

    /// Stream key `[0, 1]` is not covered by downstream pk `[0]`, so records are reordered.
    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        let mut executor = execute_test_sink(
            properties,
            int64_columns(&[false, false, true]),
            vec![
                Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
                Message::Chunk(StreamChunk::from_pretty(
                    " I I I
                    + 1 1 10",
                )),
                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
                Message::Chunk(StreamChunk::from_pretty(
                    " I I I
                    + 1 3 30",
                )),
                Message::Chunk(StreamChunk::from_pretty(
                    " I I I
                    + 1 2 20
                    - 1 2 20",
                )),
                Message::Chunk(StreamChunk::from_pretty(
                    " I I I
                    - 1 1 10
                    + 1 1 40",
                )),
                Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            ],
            vec![0, 1],
            vec![0],
            sink_type,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
        )
        .await;

        // Barrier of epoch 1.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier of epoch 2.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        let expected = match sink_type {
            SinkType::Retract => StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            ),
            SinkType::Upsert => StreamChunk::from_pretty(
                " I I I
                + 1 1 40",
            ),
            _ => unreachable!(),
        };
        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);

        // Barrier of epoch 3.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_empty_barrier_sink() {
        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        let mut executor = execute_test_sink(
            properties,
            int64_columns(&[false, false]),
            vec![
                Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
                Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            ],
            vec![0],
            vec![0],
            SinkType::ForceAppendOnly,
            vec![DataType::Int64, DataType::Int64],
        )
        .await;

        // Each barrier is forwarded unchanged, in order.
        for epoch in 1..=3 {
            assert_eq!(
                executor.next().await.unwrap().unwrap(),
                Message::Barrier(Barrier::new_test_barrier(test_epoch(epoch)))
            );
        }
    }
}