1use std::mem;
16
17use anyhow::anyhow;
18use futures::stream::select;
19use futures::{FutureExt, TryFutureExt, TryStreamExt};
20use itertools::Itertools;
21use risingwave_common::array::Op;
22use risingwave_common::array::stream_chunk::StreamChunkMut;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnCatalog, Field};
25use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_common_estimate_size::collections::EstimatedVec;
28use risingwave_common_rate_limit::RateLimit;
29use risingwave_connector::dispatch_sink;
30use risingwave_connector::sink::catalog::SinkType;
31use risingwave_connector::sink::log_store::{
32 FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
33 LogWriter, LogWriterExt, LogWriterMetrics,
34};
35use risingwave_connector::sink::{
36 GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
37};
38use thiserror_ext::AsReport;
39use tokio::select;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
41use tokio::sync::oneshot;
42
43use crate::common::compact_chunk::{StreamChunkCompactor, merge_chunk_row};
44use crate::executor::prelude::*;
/// Executor that delivers its input stream to an external sink.
///
/// Input messages are pre-processed by [`Self::process_msg`], written into a
/// log store (decoupling the upstream from sink latency), and consumed from
/// that log store by a sink-specific log sinker.
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    /// Upstream executor producing the stream to be sinked.
    input: Executor,
    sink: SinkImpl,
    /// Catalog of input columns; hidden columns are projected away on the
    /// log-read path before rows reach the sink.
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    /// Builds the (log writer, log reader) pair used to buffer sink data.
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    /// Max rows per chunk when chunks are re-built in `process_msg`.
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    /// Emit all deletes of an epoch before the inserts; derived in `new` from
    /// the stream-key / downstream-pk mismatch.
    need_advance_delete: bool,
    /// Re-compact buffered chunks keyed by the downstream pk; derived in `new`.
    re_construct_with_sink_pk: bool,
    /// Merge rows sharing a stream key within a chunk (skipped for blackhole).
    compact_chunk: bool,
    /// Initial rate limit pushed to the log reader; `None` = unthrottled.
    rate_limit: Option<u32>,
}
61
62fn force_append_only(c: StreamChunk) -> StreamChunk {
64 let mut c: StreamChunkMut = c.into();
65 for (_, mut r) in c.to_rows_mut() {
66 match r.op() {
67 Op::Insert => {}
68 Op::Delete | Op::UpdateDelete => r.set_vis(false),
69 Op::UpdateInsert => r.set_op(Op::Insert),
70 }
71 }
72 c.into()
73}
74
75fn force_delete_only(c: StreamChunk) -> StreamChunk {
77 let mut c: StreamChunkMut = c.into();
78 for (_, mut r) in c.to_rows_mut() {
79 match r.op() {
80 Op::Delete => {}
81 Op::Insert | Op::UpdateInsert => r.set_vis(false),
82 Op::UpdateDelete => r.set_op(Op::Delete),
83 }
84 }
85 c.into()
86}
87
88impl<F: LogStoreFactory> SinkExecutor<F> {
89 #[allow(clippy::too_many_arguments)]
90 #[expect(clippy::unused_async)]
91 pub async fn new(
92 actor_context: ActorContextRef,
93 info: ExecutorInfo,
94 input: Executor,
95 sink_writer_param: SinkWriterParam,
96 sink: SinkImpl,
97 sink_param: SinkParam,
98 columns: Vec<ColumnCatalog>,
99 log_store_factory: F,
100 chunk_size: usize,
101 input_data_types: Vec<DataType>,
102 rate_limit: Option<u32>,
103 ) -> StreamExecutorResult<Self> {
104 let sink_input_schema: Schema = columns
105 .iter()
106 .map(|column| Field::from(&column.column_desc))
107 .collect();
108
109 if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
110 assert_eq!(sink_input_schema.data_types(), {
112 let mut data_type = info.schema.data_types();
113 data_type.remove(col_dix);
114 data_type
115 });
116 } else {
117 assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
118 }
119
120 let stream_key = info.pk_indices.clone();
121 let stream_key_sink_pk_mismatch = {
122 stream_key
123 .iter()
124 .any(|i| !sink_param.downstream_pk.contains(i))
125 };
126 let need_advance_delete =
157 stream_key_sink_pk_mismatch && sink_param.sink_type != SinkType::AppendOnly;
158 let re_construct_with_sink_pk = need_advance_delete
160 && sink_param.sink_type == SinkType::Upsert
161 && !sink_param.downstream_pk.is_empty();
162 let compact_chunk = !sink.is_blackhole();
164
165 tracing::info!(
166 sink_id = sink_param.sink_id.sink_id,
167 actor_id = actor_context.id,
168 need_advance_delete,
169 re_construct_with_sink_pk,
170 compact_chunk,
171 "Sink executor info"
172 );
173
174 Ok(Self {
175 actor_context,
176 info,
177 input,
178 sink,
179 input_columns: columns,
180 sink_param,
181 log_store_factory,
182 sink_writer_param,
183 chunk_size,
184 input_data_types,
185 need_advance_delete,
186 re_construct_with_sink_pk,
187 compact_chunk,
188 rate_limit,
189 })
190 }
191
    /// Assemble the executor's message stream.
    ///
    /// The input is instrumented with row/byte metrics and pre-processed by
    /// `process_msg`. For a sink-into-table, the processed stream is returned
    /// directly; otherwise two concurrent streams are merged via
    /// `futures::stream::select`: one writing messages into the log store,
    /// one consuming the log store into the actual sink.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        let input = self.input.execute();

        // Count input rows/bytes without consuming the messages.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            // Sink-into-table bypasses the log store entirely.
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Seed the rate-limit channel with the configured initial value so
            // the reader picks it up on startup.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            // Channel used by the writer side to ask the reader side to
            // rebuild the sink on vnode bitmap updates.
            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    // `dispatch_sink!` monomorphizes over the concrete sink type.
                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never);
                        consume_log_stream.boxed()
                    });
                    // Merge: the consume side never yields Ok (it returns `!`),
                    // so all messages come from the write side.
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
292
293 #[try_stream(ok = Message, error = StreamExecutorError)]
294 async fn execute_write_log<W: LogWriter>(
295 input: impl MessageStream,
296 mut log_writer: W,
297 actor_id: ActorId,
298 rate_limit_tx: UnboundedSender<RateLimit>,
299 rebuild_sink_tx: UnboundedSender<(Arc<Bitmap>, oneshot::Sender<()>)>,
300 ) {
301 pin_mut!(input);
302 let barrier = expect_first_barrier(&mut input).await?;
303 let epoch_pair = barrier.epoch;
304 let is_pause_on_startup = barrier.is_pause_on_startup();
305 yield Message::Barrier(barrier);
307
308 log_writer.init(epoch_pair, is_pause_on_startup).await?;
309
310 let mut is_paused = false;
311
312 #[for_await]
313 for msg in input {
314 match msg? {
315 Message::Watermark(w) => yield Message::Watermark(w),
316 Message::Chunk(chunk) => {
317 assert!(
318 !is_paused,
319 "Actor {actor_id} should not receive any data after pause"
320 );
321 log_writer.write_chunk(chunk.clone()).await?;
322 yield Message::Chunk(chunk);
323 }
324 Message::Barrier(barrier) => {
325 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
326 let post_flush = log_writer
327 .flush_current_epoch(
328 barrier.epoch.curr,
329 FlushCurrentEpochOptions {
330 is_checkpoint: barrier.kind.is_checkpoint(),
331 new_vnode_bitmap: update_vnode_bitmap.clone(),
332 is_stop: barrier.is_stop(actor_id),
333 },
334 )
335 .await?;
336
337 let mutation = barrier.mutation.clone();
338 yield Message::Barrier(barrier);
339 if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
340 && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
341 {
342 let (tx, rx) = oneshot::channel();
343 rebuild_sink_tx
344 .send((new_vnode_bitmap, tx))
345 .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
346 rx.await
347 .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
348 }
349 post_flush.post_yield_barrier().await?;
350
351 if let Some(mutation) = mutation.as_deref() {
352 match mutation {
353 Mutation::Pause => {
354 log_writer.pause()?;
355 is_paused = true;
356 }
357 Mutation::Resume => {
358 log_writer.resume()?;
359 is_paused = false;
360 }
361 Mutation::Throttle(actor_to_apply) => {
362 if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
363 tracing::info!(
364 rate_limit = new_rate_limit,
365 "received sink rate limit on actor {actor_id}"
366 );
367 if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
368 error!(
369 error = %e.as_report(),
370 "fail to send sink ate limit update"
371 );
372 return Err(StreamExecutorError::from(
373 e.to_report_string(),
374 ));
375 }
376 }
377 }
378 _ => (),
379 }
380 }
381 }
382 }
383 }
384 }
385
    /// Pre-process the input stream before it is written to the log store.
    ///
    /// Two modes:
    /// - When `need_advance_delete` or `re_construct_with_sink_pk` is set,
    ///   chunks are buffered per barrier epoch; on each barrier the buffered
    ///   chunks are compacted by stream key, split so that all deletes are
    ///   emitted before all inserts (unless force-append-only), optionally
    ///   re-compacted keyed by the downstream pk, and then flushed. The last
    ///   watermark of the epoch, if any, is emitted just before the barrier.
    /// - Otherwise, chunks pass through per message, optionally merged by
    ///   stream key (`compact_chunk`) and adjusted per sink type.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge<3>,
        compact_chunk: bool,
    ) {
        if need_advance_delete || re_construct_with_sink_pk {
            let mut chunk_buffer = EstimatedVec::new();
            // Only the latest watermark of the epoch is kept.
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
                        let chunks = if need_advance_delete {
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks()
                            {
                                // Force-append-only sinks never receive deletes,
                                // so skip building the delete side entirely.
                                if sink_type != SinkType::ForceAppendOnly {
                                    let chunk = force_delete_only(c.clone());
                                    if chunk.cardinality() > 0 {
                                        delete_chunks.push(chunk);
                                    }
                                }
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            // All deletes of the epoch precede all inserts.
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks.into_iter())
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            // Re-compact keyed by the downstream pk and rebuild
                            // fixed-size chunks.
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    sink_type != SinkType::ForceAppendOnly,
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            // No pk-based reconstruction: just re-chunk rows to
                            // `chunk_size`.
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            // Pass-through mode: no epoch buffering required.
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        if compact_chunk {
                            chunk = merge_chunk_row(chunk, &stream_key);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Silently drop any update or delete message per
                                // force-append-only semantics.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => {
                                for chunk in
                                    StreamChunkCompactor::new(stream_key.clone(), vec![chunk])
                                        .into_compacted_chunks()
                                {
                                    yield Message::Chunk(chunk)
                                }
                            }
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }
514
    /// Consume the log store and deliver its contents to the sink `S`.
    ///
    /// Never returns `Ok` (return type `!`). The inner loop repeatedly builds
    /// a log sinker and feeds it the log reader; on sink failure it reports an
    /// error metric, records a failure event via the meta client (if present),
    /// and — when the factory allows rewinding — rewinds the reader and
    /// retries, otherwise propagates the error. Concurrently (`select!`) it
    /// services rebuild requests from the write side by updating the vnode
    /// bitmap in the writer params and restarting the consume future.
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<(Arc<Bitmap>, oneshot::Sender<()>)>,
    ) -> StreamExecutorResult<!> {
        // Indices of non-hidden columns; hidden columns are projected away
        // below before chunks reach the sink.
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                if visible_columns.len() != columns.len() {
                    // Project away hidden columns before sinking.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        loop {
            let future = async {
                log_reader.init().await?;

                loop {
                    // `consume_log_and_sink` only returns on error, so the
                    // `Err` pattern is irrefutable here.
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        // NOTE(review): the misspelled method name (`evet`)
                        // comes from the meta client API.
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                warn!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                // Rewind failed: surface the original sink error.
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    let (new_vnode, notify) = result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))?;
                    // Apply the new vnode bitmap; the outer loop restarts the
                    // consume future with the updated params.
                    sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                    if notify.send(()).is_err() {
                        warn!("failed to notify rebuild sink");
                    }
                }
            }
        }
    }
637}
638
639impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
640 fn execute(self: Box<Self>) -> BoxedMessageStream {
641 self.execute_inner()
642 }
643}
644
645#[cfg(test)]
646mod test {
647 use risingwave_common::catalog::{ColumnDesc, ColumnId};
648 use risingwave_common::util::epoch::test_epoch;
649 use risingwave_connector::sink::build_sink;
650
651 use super::*;
652 use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
653 use crate::executor::test_utils::*;
654
655 #[tokio::test]
656 async fn test_force_append_only_sink() {
657 use risingwave_common::array::StreamChunkTestExt;
658 use risingwave_common::array::stream_chunk::StreamChunk;
659 use risingwave_common::types::DataType;
660
661 use crate::executor::Barrier;
662
663 let properties = maplit::btreemap! {
664 "connector".into() => "blackhole".into(),
665 "type".into() => "append-only".into(),
666 "force_append_only".into() => "true".into()
667 };
668
669 let columns = vec![
672 ColumnCatalog {
673 column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
674 is_hidden: false,
675 },
676 ColumnCatalog {
677 column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
678 is_hidden: false,
679 },
680 ColumnCatalog {
681 column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
682 is_hidden: true,
683 },
684 ];
685 let schema: Schema = columns
686 .iter()
687 .map(|column| Field::from(column.column_desc.clone()))
688 .collect();
689 let pk_indices = vec![0];
690
691 let source = MockSource::with_messages(vec![
692 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
693 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
694 " I I I
695 + 3 2 1",
696 ))),
697 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
698 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
699 " I I I
700 U- 3 2 1
701 U+ 3 4 1
702 + 5 6 7",
703 ))),
704 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
705 " I I I
706 - 5 6 7",
707 ))),
708 ])
709 .into_executor(schema.clone(), pk_indices.clone());
710
711 let sink_param = SinkParam {
712 sink_id: 0.into(),
713 sink_name: "test".into(),
714 properties,
715
716 columns: columns
717 .iter()
718 .filter(|col| !col.is_hidden)
719 .map(|col| col.column_desc.clone())
720 .collect(),
721 downstream_pk: pk_indices.clone(),
722 sink_type: SinkType::ForceAppendOnly,
723 format_desc: None,
724 db_name: "test".into(),
725 sink_from_name: "test".into(),
726 };
727
728 let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);
729
730 let sink = build_sink(sink_param.clone()).unwrap();
731
732 let sink_executor = SinkExecutor::new(
733 ActorContext::for_test(0),
734 info,
735 source,
736 SinkWriterParam::for_test(),
737 sink,
738 sink_param,
739 columns.clone(),
740 BoundedInMemLogStoreFactory::new(1),
741 1024,
742 vec![DataType::Int32, DataType::Int32, DataType::Int32],
743 None,
744 )
745 .await
746 .unwrap();
747
748 let mut executor = sink_executor.boxed().execute();
749
750 executor.next().await.unwrap().unwrap();
752
753 let chunk_msg = executor.next().await.unwrap().unwrap();
754 assert_eq!(
755 chunk_msg.into_chunk().unwrap().compact(),
756 StreamChunk::from_pretty(
757 " I I I
758 + 3 2 1",
759 )
760 );
761
762 executor.next().await.unwrap().unwrap();
764
765 let chunk_msg = executor.next().await.unwrap().unwrap();
766 assert_eq!(
767 chunk_msg.into_chunk().unwrap().compact(),
768 StreamChunk::from_pretty(
769 " I I I
770 + 3 4 1
771 + 5 6 7",
772 )
773 );
774
775 executor.next().await.unwrap().unwrap();
780 }
781
    /// Stream key `[0, 1]` is not a subset of downstream pk `[0]`, so the
    /// executor buffers chunks per epoch, advances deletes, and re-compacts by
    /// the sink pk — net effect per key is emitted as a single U-/U+ pair.
    #[tokio::test]
    async fn stream_key_sink_pk_mismatch() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // Three Int64 columns; the last is hidden (not sinked).
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            // Within epoch 2: an insert+delete pair cancels out, and the
            // delete/insert of key (1,1) becomes an update.
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            // Downstream pk deliberately narrower than the stream key.
            downstream_pk: vec![0],
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Init barrier.
        executor.next().await.unwrap().unwrap();

        // Epoch 1 flushes the single insert.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier of epoch 2.
        executor.next().await.unwrap().unwrap();

        // Epoch 2 compacts to an update of key (1,1); (1,2) cancels out.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            )
        );

        // Final barrier.
        executor.next().await.unwrap().unwrap();
    }
907
    /// Barriers with no data in between must flow through the sink executor
    /// unchanged and in order.
    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        // Input: three consecutive barriers, no chunks.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Each barrier is forwarded as-is.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
999}