1use std::collections::HashMap;
16use std::mem;
17
18use anyhow::anyhow;
19use futures::stream::select;
20use futures::{FutureExt, TryFutureExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::array::Op;
23use risingwave_common::array::stream_chunk::StreamChunkMut;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{ColumnCatalog, Field};
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
27use risingwave_common_estimate_size::EstimateSize;
28use risingwave_common_estimate_size::collections::EstimatedVec;
29use risingwave_common_rate_limit::RateLimit;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::{SinkId, SinkType};
32use risingwave_connector::sink::log_store::{
33 FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
34 LogWriter, LogWriterExt, LogWriterMetrics,
35};
36use risingwave_connector::sink::{
37 GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
38};
39use risingwave_pb::stream_plan::stream_node::StreamKind;
40use thiserror_ext::AsReport;
41use tokio::select;
42use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
43use tokio::sync::oneshot;
44
45use crate::common::compact_chunk::{InconsistencyBehavior, StreamChunkCompactor, merge_chunk_row};
46use crate::executor::prelude::*;
/// Executor that delivers upstream stream changes to an external sink.
///
/// Messages are pre-processed (compaction / delete-first reordering), written
/// into a log store, and consumed from the log store by the actual sink
/// implementation — see `execute_inner` for how the pieces are wired.
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    /// Catalog of the sink's input columns; hidden columns are projected out
    /// before rows reach the external sink.
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    /// Emit all deletes before all inserts within a barrier epoch. Set when
    /// the stream key is not fully covered by the downstream pk and the sink
    /// is not (force-)append-only.
    need_advance_delete: bool,
    /// Re-compact buffered chunks keyed by the sink's downstream pk before
    /// emitting (upsert sinks with a non-empty downstream pk).
    re_construct_with_sink_pk: bool,
    /// True when every stream-key column is contained in the downstream pk.
    pk_matched: bool,
    /// Whether to merge rows sharing a stream key within a chunk; disabled
    /// for blackhole sinks.
    compact_chunk: bool,
    /// Initial sink rate limit; `None` presumably means unthrottled — it is
    /// converted into a `RateLimit` and fed to the log reader.
    rate_limit: Option<u32>,
}
65
66fn force_append_only(c: StreamChunk) -> StreamChunk {
68 let mut c: StreamChunkMut = c.into();
69 for (_, mut r) in c.to_rows_mut() {
70 match r.op() {
71 Op::Insert => {}
72 Op::Delete | Op::UpdateDelete => r.set_vis(false),
73 Op::UpdateInsert => r.set_op(Op::Insert),
74 }
75 }
76 c.into()
77}
78
79fn force_delete_only(c: StreamChunk) -> StreamChunk {
81 let mut c: StreamChunkMut = c.into();
82 for (_, mut r) in c.to_rows_mut() {
83 match r.op() {
84 Op::Delete => {}
85 Op::Insert | Op::UpdateInsert => r.set_vis(false),
86 Op::UpdateDelete => r.set_op(Op::Delete),
87 }
88 }
89 c.into()
90}
91
92impl<F: LogStoreFactory> SinkExecutor<F> {
    /// Creates a `SinkExecutor`.
    ///
    /// Validates that the sink input column types match the executor schema
    /// (modulo an optional extra partition column) and derives the
    /// chunk-processing flags from how the stream key relates to the sink's
    /// downstream primary key.
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
            // The extra partition column belongs to the writer only, so it is
            // excluded from the executor schema before comparison.
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_type = info.schema.data_types();
                data_type.remove(col_dix);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let stream_key = info.pk_indices.clone();
        // True when at least one stream-key column is missing from the
        // downstream pk — such sinks may see distinct stream keys mapping to
        // the same downstream key.
        let stream_key_sink_pk_mismatch = {
            stream_key
                .iter()
                .any(|i| !sink_param.downstream_pk.contains(i))
        };
        // (Force-)append-only sinks never emit deletes, so no delete-first
        // reordering is needed for them.
        let need_advance_delete = stream_key_sink_pk_mismatch
            && !matches!(
                sink_param.sink_type,
                SinkType::AppendOnly | SinkType::ForceAppendOnly
            );
        // Upsert sinks with a non-empty downstream pk additionally get their
        // buffered chunks re-compacted keyed by that pk.
        let re_construct_with_sink_pk = need_advance_delete
            && sink_param.sink_type == SinkType::Upsert
            && !sink_param.downstream_pk.is_empty();
        let pk_matched = !stream_key_sink_pk_mismatch;
        // Skip per-chunk compaction for blackhole sinks — the output is
        // dropped, so compacting would be pure overhead.
        let compact_chunk = !sink.is_blackhole();

        tracing::info!(
            sink_id = sink_param.sink_id.sink_id,
            actor_id = actor_context.id,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            rate_limit,
        })
    }
201
    /// Assembles the executor's output message stream.
    ///
    /// The input is instrumented with row/byte metrics and pre-processed by
    /// [`Self::process_msg`]. For sink-into-table the processed stream is
    /// forwarded directly; otherwise it is split into a log-writer stream and
    /// a log-consumer future that run concurrently and are merged via
    /// `futures::stream::select`.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        // Upsert inputs may legitimately contain inconsistent ops, so they
        // are tolerated during compaction; any other stream kind treats
        // inconsistency as a bug and panics.
        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        // Account input rows/bytes before any processing happens.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            // Sink-into-table bypasses the log store entirely.
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Seed the rate-limit channel with the configured initial value so
            // the consumer side starts throttled accordingly.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.pk_matched,
                            input_compact_ib,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never); consume_log_stream.boxed()
                    });
                    // Merge writer and consumer: either side finishing (or
                    // erroring) surfaces on the combined stream.
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
314
315 #[try_stream(ok = Message, error = StreamExecutorError)]
316 async fn execute_write_log<W: LogWriter>(
317 input: impl MessageStream,
318 mut log_writer: W,
319 actor_id: ActorId,
320 sink_id: SinkId,
321 rate_limit_tx: UnboundedSender<RateLimit>,
322 rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
323 ) {
324 pin_mut!(input);
325 let barrier = expect_first_barrier(&mut input).await?;
326 let epoch_pair = barrier.epoch;
327 let is_pause_on_startup = barrier.is_pause_on_startup();
328 yield Message::Barrier(barrier);
330
331 log_writer.init(epoch_pair, is_pause_on_startup).await?;
332
333 let mut is_paused = false;
334
335 #[for_await]
336 for msg in input {
337 match msg? {
338 Message::Watermark(w) => yield Message::Watermark(w),
339 Message::Chunk(chunk) => {
340 assert!(
341 !is_paused,
342 "Actor {actor_id} should not receive any data after pause"
343 );
344 log_writer.write_chunk(chunk.clone()).await?;
345 yield Message::Chunk(chunk);
346 }
347 Message::Barrier(barrier) => {
348 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
349 let add_columns = barrier.as_sink_add_columns(sink_id);
350 if let Some(add_columns) = &add_columns {
351 info!(?add_columns, %sink_id, "sink receive add columns");
352 }
353 let post_flush = log_writer
354 .flush_current_epoch(
355 barrier.epoch.curr,
356 FlushCurrentEpochOptions {
357 is_checkpoint: barrier.kind.is_checkpoint(),
358 new_vnode_bitmap: update_vnode_bitmap.clone(),
359 is_stop: barrier.is_stop(actor_id),
360 add_columns,
361 },
362 )
363 .await?;
364
365 let mutation = barrier.mutation.clone();
366 yield Message::Barrier(barrier);
367 if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
368 && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
369 {
370 let (tx, rx) = oneshot::channel();
371 rebuild_sink_tx
372 .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
373 .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
374 rx.await
375 .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
376 }
377 post_flush.post_yield_barrier().await?;
378
379 if let Some(mutation) = mutation.as_deref() {
380 match mutation {
381 Mutation::Pause => {
382 log_writer.pause()?;
383 is_paused = true;
384 }
385 Mutation::Resume => {
386 log_writer.resume()?;
387 is_paused = false;
388 }
389 Mutation::Throttle(actor_to_apply) => {
390 if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
391 tracing::info!(
392 rate_limit = new_rate_limit,
393 "received sink rate limit on actor {actor_id}"
394 );
395 if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
396 error!(
397 error = %e.as_report(),
398 "fail to send sink ate limit update"
399 );
400 return Err(StreamExecutorError::from(
401 e.to_report_string(),
402 ));
403 }
404 }
405 }
406 Mutation::ConnectorPropsChange(config) => {
407 if let Some(map) = config.get(&sink_id.sink_id)
408 && let Err(e) = rebuild_sink_tx
409 .send(RebuildSinkMessage::UpdateConfig(map.clone()))
410 {
411 error!(
412 error = %e.as_report(),
413 "fail to send sink alter props"
414 );
415 return Err(StreamExecutorError::from(e.to_report_string()));
416 }
417 }
418 _ => (),
419 }
420 }
421 }
422 }
423 }
424 }
425
    /// Pre-processes the input message stream before it reaches the log writer.
    ///
    /// Two modes:
    /// - When `need_advance_delete` or `re_construct_with_sink_pk` is set,
    ///   chunks are buffered per barrier and re-emitted in a safe order:
    ///   all deletes before all inserts, and/or re-compacted keyed by the
    ///   sink's downstream pk.
    /// - Otherwise chunks are forwarded eagerly, optionally merged by stream
    ///   key and adjusted for the sink type.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        input_compact_ib: InconsistencyBehavior,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        compact_chunk: bool,
    ) {
        if need_advance_delete || re_construct_with_sink_pk {
            // Buffer all chunks between barriers; watermarks are held back
            // and re-emitted just before the barrier.
            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
                        let chunks = if need_advance_delete {
                            // Split each compacted chunk into its delete-only
                            // and insert-only halves, then chain all deletes
                            // before all inserts so a delete and a later
                            // insert on the same downstream key cannot be
                            // applied out of order.
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks(input_compact_ib)
                            {
                                {
                                    // Empty halves are dropped to avoid
                                    // emitting all-invisible chunks.
                                    let chunk = force_delete_only(c.clone());
                                    if chunk.cardinality() > 0 {
                                        delete_chunks.push(chunk);
                                    }
                                }
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks.into_iter())
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    // Force-append-only may have produced ops
                                    // that look inconsistent under the sink
                                    // pk; tolerate them, otherwise just warn.
                                    if sink_type == SinkType::ForceAppendOnly {
                                        InconsistencyBehavior::Tolerate
                                    } else {
                                        InconsistencyBehavior::Warn
                                    },
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            // Re-chunk the buffered rows to respect
                            // `chunk_size`.
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        // Merge rows sharing a stream key within the chunk
                        // (skipped for blackhole sinks — see `compact_chunk`).
                        if compact_chunk {
                            chunk = merge_chunk_row(chunk, &stream_key, input_compact_ib);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force append-only: drop deletes, rewrite
                                // update-inserts to inserts.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => yield Message::Chunk(chunk),
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }
553
    /// Consumes the log store and delivers its contents to the sink.
    ///
    /// Never returns normally (`-> StreamExecutorResult<!>`): the inner loop
    /// rebuilds the log sinker after every failure (rewinding the log reader
    /// when the factory allows it), while `select!` concurrently handles
    /// rebuild / config-update notifications from the writer side.
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        pk_matched: bool,
        input_compact_ib: InconsistencyBehavior,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        // Indices of the columns actually delivered to the sink.
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        // Per-chunk transform on the read path: for upsert sinks whose
        // downstream pk covers the stream key, merge rows on the downstream
        // pk; then project away hidden columns (if any).
        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                let chunk = if pk_matched && matches!(sink_param.sink_type, SinkType::Upsert) {
                    merge_chunk_row(chunk, &downstream_pk, input_compact_ib)
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    // Hidden columns present: keep only the visible subset.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            // This future can only complete with an error: the log sinker
            // either runs forever or fails, in which case we report the error,
            // then retry (after rewind, when supported) or bail out.
            let future = async {
                loop {
                    // `consume_log_and_sink` returns `Result<!>`, so the `Ok`
                    // variant is uninhabited and `let Err(e)` is irrefutable.
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    // Best effort: record the failure in the meta event log.
                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                // Rewind succeeded: log the original error and
                                // retry from the rewound position.
                                error!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                // Propagate the original sink error, not the
                                // rewind error.
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            // Adopt the new vnode bitmap, ack the writer side,
                            // and re-init the reader; the sinker is rebuilt on
                            // the next loop iteration.
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        // Rebuild the sink from the updated
                                        // properties and resume from the
                                        // rewound position.
                                        sink_param.properties = config.into_iter().collect();
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config "
                                        );
                                        Err(anyhow!("fail to rewind log after alter table").into())
                                    }
                                }
                            } else {
                                // No rewind support: apply the new config,
                                // then return an error on purpose so recovery
                                // re-creates everything with it applied.
                                sink_param.properties = config.into_iter().collect();
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
732}
733
/// Control messages sent from the log-writer task to the log-consumer task.
enum RebuildSinkMessage {
    /// Rebuild the sink with a new vnode bitmap; the `oneshot::Sender` is
    /// signalled once the consumer has adopted the new bitmap.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Apply updated connector properties (ALTER SINK CONFIG).
    UpdateConfig(HashMap<String, String>),
}
738
impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    // Delegates to `execute_inner`, which builds the full sink pipeline.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}
744
745#[cfg(test)]
746mod test {
747 use risingwave_common::catalog::{ColumnDesc, ColumnId};
748 use risingwave_common::util::epoch::test_epoch;
749 use risingwave_connector::sink::build_sink;
750
751 use super::*;
752 use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
753 use crate::executor::test_utils::*;
754
755 #[tokio::test]
756 async fn test_force_append_only_sink() {
757 use risingwave_common::array::StreamChunkTestExt;
758 use risingwave_common::array::stream_chunk::StreamChunk;
759 use risingwave_common::types::DataType;
760
761 use crate::executor::Barrier;
762
763 let properties = maplit::btreemap! {
764 "connector".into() => "blackhole".into(),
765 "type".into() => "append-only".into(),
766 "force_append_only".into() => "true".into()
767 };
768
769 let columns = vec![
772 ColumnCatalog {
773 column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
774 is_hidden: false,
775 },
776 ColumnCatalog {
777 column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
778 is_hidden: false,
779 },
780 ColumnCatalog {
781 column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
782 is_hidden: true,
783 },
784 ];
785 let schema: Schema = columns
786 .iter()
787 .map(|column| Field::from(column.column_desc.clone()))
788 .collect();
789 let pk_indices = vec![0];
790
791 let source = MockSource::with_messages(vec![
792 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
793 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
794 " I I I
795 + 3 2 1",
796 ))),
797 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
798 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
799 " I I I
800 U- 3 2 1
801 U+ 3 4 1
802 + 5 6 7",
803 ))),
804 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
805 " I I I
806 - 5 6 7",
807 ))),
808 ])
809 .into_executor(schema.clone(), pk_indices.clone());
810
811 let sink_param = SinkParam {
812 sink_id: 0.into(),
813 sink_name: "test".into(),
814 properties,
815
816 columns: columns
817 .iter()
818 .filter(|col| !col.is_hidden)
819 .map(|col| col.column_desc.clone())
820 .collect(),
821 downstream_pk: pk_indices.clone(),
822 sink_type: SinkType::ForceAppendOnly,
823 format_desc: None,
824 db_name: "test".into(),
825 sink_from_name: "test".into(),
826 };
827
828 let info = ExecutorInfo::for_test(schema, pk_indices, "SinkExecutor".to_owned(), 0);
829
830 let sink = build_sink(sink_param.clone()).unwrap();
831
832 let sink_executor = SinkExecutor::new(
833 ActorContext::for_test(0),
834 info,
835 source,
836 SinkWriterParam::for_test(),
837 sink,
838 sink_param,
839 columns.clone(),
840 BoundedInMemLogStoreFactory::for_test(1),
841 1024,
842 vec![DataType::Int32, DataType::Int32, DataType::Int32],
843 None,
844 )
845 .await
846 .unwrap();
847
848 let mut executor = sink_executor.boxed().execute();
849
850 executor.next().await.unwrap().unwrap();
852
853 let chunk_msg = executor.next().await.unwrap().unwrap();
854 assert_eq!(
855 chunk_msg.into_chunk().unwrap().compact(),
856 StreamChunk::from_pretty(
857 " I I I
858 + 3 2 1",
859 )
860 );
861
862 executor.next().await.unwrap().unwrap();
864
865 let chunk_msg = executor.next().await.unwrap().unwrap();
866 assert_eq!(
867 chunk_msg.into_chunk().unwrap().compact(),
868 StreamChunk::from_pretty(
869 " I I I
870 + 3 4 1
871 + 5 6 7",
872 )
873 );
874
875 executor.next().await.unwrap().unwrap();
880 }
881
882 #[tokio::test]
883 async fn stream_key_sink_pk_mismatch() {
884 use risingwave_common::array::StreamChunkTestExt;
885 use risingwave_common::array::stream_chunk::StreamChunk;
886 use risingwave_common::types::DataType;
887
888 use crate::executor::Barrier;
889
890 let properties = maplit::btreemap! {
891 "connector".into() => "blackhole".into(),
892 };
893
894 let columns = vec![
897 ColumnCatalog {
898 column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
899 is_hidden: false,
900 },
901 ColumnCatalog {
902 column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
903 is_hidden: false,
904 },
905 ColumnCatalog {
906 column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
907 is_hidden: true,
908 },
909 ];
910 let schema: Schema = columns
911 .iter()
912 .map(|column| Field::from(column.column_desc.clone()))
913 .collect();
914
915 let source = MockSource::with_messages(vec![
916 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
917 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
918 " I I I
919 + 1 1 10",
920 ))),
921 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
922 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
923 " I I I
924 + 1 3 30",
925 ))),
926 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
927 " I I I
928 + 1 2 20
929 - 1 2 20",
930 ))),
931 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
932 " I I I
933 - 1 1 10
934 + 1 1 40",
935 ))),
936 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
937 ])
938 .into_executor(schema.clone(), vec![0, 1]);
939
940 let sink_param = SinkParam {
941 sink_id: 0.into(),
942 sink_name: "test".into(),
943 properties,
944
945 columns: columns
946 .iter()
947 .filter(|col| !col.is_hidden)
948 .map(|col| col.column_desc.clone())
949 .collect(),
950 downstream_pk: vec![0],
951 sink_type: SinkType::Upsert,
952 format_desc: None,
953 db_name: "test".into(),
954 sink_from_name: "test".into(),
955 };
956
957 let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
958
959 let sink = build_sink(sink_param.clone()).unwrap();
960
961 let sink_executor = SinkExecutor::new(
962 ActorContext::for_test(0),
963 info,
964 source,
965 SinkWriterParam::for_test(),
966 sink,
967 sink_param,
968 columns.clone(),
969 BoundedInMemLogStoreFactory::for_test(1),
970 1024,
971 vec![DataType::Int64, DataType::Int64, DataType::Int64],
972 None,
973 )
974 .await
975 .unwrap();
976
977 let mut executor = sink_executor.boxed().execute();
978
979 executor.next().await.unwrap().unwrap();
981
982 let chunk_msg = executor.next().await.unwrap().unwrap();
983 assert_eq!(
984 chunk_msg.into_chunk().unwrap().compact(),
985 StreamChunk::from_pretty(
986 " I I I
987 + 1 1 10",
988 )
989 );
990
991 executor.next().await.unwrap().unwrap();
993
994 let chunk_msg = executor.next().await.unwrap().unwrap();
995 assert_eq!(
996 chunk_msg.into_chunk().unwrap().compact(),
997 StreamChunk::from_pretty(
998 " I I I
999 U- 1 1 10
1000 U+ 1 1 40",
1001 )
1002 );
1003
1004 executor.next().await.unwrap().unwrap();
1006 }
1007
    /// Barriers must flow through the sink executor unchanged even when no
    /// data chunks arrive between them.
    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        // Three consecutive barriers, no chunks in between.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Each input barrier is forwarded as-is.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
1099}