1use std::collections::HashMap;
16use std::mem;
17
18use anyhow::anyhow;
19use futures::stream::select;
20use futures::{FutureExt, TryFutureExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::array::Op;
23use risingwave_common::array::stream_chunk::StreamChunkMut;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{ColumnCatalog, Field};
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
27use risingwave_common_estimate_size::EstimateSize;
28use risingwave_common_estimate_size::collections::EstimatedVec;
29use risingwave_common_rate_limit::RateLimit;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::{SinkId, SinkType};
32use risingwave_connector::sink::log_store::{
33 FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
34 LogWriter, LogWriterExt, LogWriterMetrics,
35};
36use risingwave_connector::sink::{
37 GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
38};
39use thiserror_ext::AsReport;
40use tokio::select;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::oneshot;
43
44use crate::common::compact_chunk::{StreamChunkCompactor, merge_chunk_row};
45use crate::executor::prelude::*;
/// Streaming executor that delivers its input stream to an external sink.
///
/// The executor is split into two halves connected by a log store created from
/// `log_store_factory`: a writer half that persists the (pre-processed) input
/// messages into the log store, and a consumer half that reads the log back and
/// feeds it to the concrete sink implementation. See `execute_inner`.
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    /// Upstream executor producing the stream to be sunk.
    input: Executor,
    sink: SinkImpl,
    /// Full input column catalog, including hidden columns that are projected
    /// away before rows reach the external sink.
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    /// Maximum number of rows per chunk when chunks are rebuilt in `process_msg`.
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    /// Whether deletes must be emitted ahead of inserts within each epoch
    /// (set when the stream key does not cover the downstream pk; see `new`).
    need_advance_delete: bool,
    /// Whether buffered chunks are re-compacted keyed by the downstream pk.
    re_construct_with_sink_pk: bool,
    /// True when every stream-key column is contained in the downstream pk.
    pk_matched: bool,
    /// Whether input chunks are compacted by stream key before sinking
    /// (disabled for blackhole sinks, where it would be wasted work).
    compact_chunk: bool,
    /// Optional rate limit applied when the consumer half reads the log store.
    rate_limit: Option<u32>,
}
64
65fn force_append_only(c: StreamChunk) -> StreamChunk {
67 let mut c: StreamChunkMut = c.into();
68 for (_, mut r) in c.to_rows_mut() {
69 match r.op() {
70 Op::Insert => {}
71 Op::Delete | Op::UpdateDelete => r.set_vis(false),
72 Op::UpdateInsert => r.set_op(Op::Insert),
73 }
74 }
75 c.into()
76}
77
78fn force_delete_only(c: StreamChunk) -> StreamChunk {
80 let mut c: StreamChunkMut = c.into();
81 for (_, mut r) in c.to_rows_mut() {
82 match r.op() {
83 Op::Delete => {}
84 Op::Insert | Op::UpdateInsert => r.set_vis(false),
85 Op::UpdateDelete => r.set_op(Op::Delete),
86 }
87 }
88 c.into()
89}
90
91impl<F: LogStoreFactory> SinkExecutor<F> {
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    /// Creates a `SinkExecutor`, deriving the chunk-processing strategy from
    /// the relation between the stream key and the sink's downstream pk.
    ///
    /// Panics (via `assert_eq!`) if `columns` does not match the executor's
    /// schema, modulo the optional extra partition column.
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        // Sanity-check the sink input schema against the executor schema.
        // When an extra partition column is appended by the planner, it is
        // excluded from the comparison.
        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_type = info.schema.data_types();
                data_type.remove(col_dix);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let stream_key = info.pk_indices.clone();
        // True when at least one stream-key column is NOT part of the
        // downstream pk, i.e. distinct stream keys may collide on the sink pk.
        let stream_key_sink_pk_mismatch = {
            stream_key
                .iter()
                .any(|i| !sink_param.downstream_pk.contains(i))
        };
        // On a mismatch (and for non-append-only sinks), deletes of an epoch
        // must be emitted before inserts so that a delete on one stream key
        // cannot clobber an insert for the same sink pk; see `process_msg`.
        let need_advance_delete = stream_key_sink_pk_mismatch
            && !matches!(
                sink_param.sink_type,
                SinkType::AppendOnly | SinkType::ForceAppendOnly
            );
        // For upsert sinks with a non-empty downstream pk, additionally
        // re-compact the buffered chunks keyed by that pk.
        let re_construct_with_sink_pk = need_advance_delete
            && sink_param.sink_type == SinkType::Upsert
            && !sink_param.downstream_pk.is_empty();
        let pk_matched = !stream_key_sink_pk_mismatch;
        // Blackhole sinks discard everything, so compaction is pointless.
        let compact_chunk = !sink.is_blackhole();

        tracing::info!(
            sink_id = sink_param.sink_id.sink_id,
            actor_id = actor_context.id,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            rate_limit,
        })
    }
200
    /// Assembles the executor's message stream.
    ///
    /// The input is instrumented with metrics and pre-processed by
    /// `process_msg`. For sink-into-table the processed stream is returned
    /// directly; otherwise the log store is built and the writer stream
    /// (`execute_write_log`) is merged with the consumer stream
    /// (`execute_consume_log`) via `futures::stream::select`.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        let input = self.input.execute();

        // Count input rows/bytes without altering the stream.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            // Sink-into-table bypasses the log store entirely.
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Seed the rate-limit channel with the initial configuration so the
            // consumer half starts with the right limit.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    // `dispatch_sink!` monomorphizes over the concrete sink type.
                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.pk_matched,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never);
                        consume_log_stream.boxed()
                    });
                    // Merge the consumer (which only ever yields an error) with
                    // the writer stream that forwards messages downstream.
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
303
304 #[try_stream(ok = Message, error = StreamExecutorError)]
305 async fn execute_write_log<W: LogWriter>(
306 input: impl MessageStream,
307 mut log_writer: W,
308 actor_id: ActorId,
309 sink_id: SinkId,
310 rate_limit_tx: UnboundedSender<RateLimit>,
311 rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
312 ) {
313 pin_mut!(input);
314 let barrier = expect_first_barrier(&mut input).await?;
315 let epoch_pair = barrier.epoch;
316 let is_pause_on_startup = barrier.is_pause_on_startup();
317 yield Message::Barrier(barrier);
319
320 log_writer.init(epoch_pair, is_pause_on_startup).await?;
321
322 let mut is_paused = false;
323
324 #[for_await]
325 for msg in input {
326 match msg? {
327 Message::Watermark(w) => yield Message::Watermark(w),
328 Message::Chunk(chunk) => {
329 assert!(
330 !is_paused,
331 "Actor {actor_id} should not receive any data after pause"
332 );
333 log_writer.write_chunk(chunk.clone()).await?;
334 yield Message::Chunk(chunk);
335 }
336 Message::Barrier(barrier) => {
337 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
338 let post_flush = log_writer
339 .flush_current_epoch(
340 barrier.epoch.curr,
341 FlushCurrentEpochOptions {
342 is_checkpoint: barrier.kind.is_checkpoint(),
343 new_vnode_bitmap: update_vnode_bitmap.clone(),
344 is_stop: barrier.is_stop(actor_id),
345 },
346 )
347 .await?;
348
349 let mutation = barrier.mutation.clone();
350 yield Message::Barrier(barrier);
351 if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
352 && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
353 {
354 let (tx, rx) = oneshot::channel();
355 rebuild_sink_tx
356 .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
357 .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
358 rx.await
359 .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
360 }
361 post_flush.post_yield_barrier().await?;
362
363 if let Some(mutation) = mutation.as_deref() {
364 match mutation {
365 Mutation::Pause => {
366 log_writer.pause()?;
367 is_paused = true;
368 }
369 Mutation::Resume => {
370 log_writer.resume()?;
371 is_paused = false;
372 }
373 Mutation::Throttle(actor_to_apply) => {
374 if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
375 tracing::info!(
376 rate_limit = new_rate_limit,
377 "received sink rate limit on actor {actor_id}"
378 );
379 if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
380 error!(
381 error = %e.as_report(),
382 "fail to send sink ate limit update"
383 );
384 return Err(StreamExecutorError::from(
385 e.to_report_string(),
386 ));
387 }
388 }
389 }
390 Mutation::ConnectorPropsChange(config) => {
391 if let Some(map) = config.get(&sink_id.sink_id)
392 && let Err(e) = rebuild_sink_tx
393 .send(RebuildSinkMessage::UpdateConfig(map.clone()))
394 {
395 error!(
396 error = %e.as_report(),
397 "fail to send sink alter props"
398 );
399 return Err(StreamExecutorError::from(e.to_report_string()));
400 }
401 }
402 _ => (),
403 }
404 }
405 }
406 }
407 }
408 }
409
    /// Pre-processes the upstream stream before it is written to the log store.
    ///
    /// Two modes:
    /// * When `need_advance_delete` or `re_construct_with_sink_pk` is set,
    ///   chunks are buffered for the whole epoch and re-emitted on the
    ///   barrier — deletes first, then inserts (`need_advance_delete`),
    ///   optionally re-compacted keyed by the downstream pk
    ///   (`re_construct_with_sink_pk`).
    /// * Otherwise messages are forwarded directly, with optional per-chunk
    ///   compaction by stream key and append-only enforcement.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        compact_chunk: bool,
    ) {
        if need_advance_delete || re_construct_with_sink_pk {
            // Buffer every chunk of the epoch; flush on the barrier.
            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    // Only the latest watermark of the epoch is kept.
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
                        let chunks = if need_advance_delete {
                            // Split each compacted chunk into its delete part
                            // and its insert part, then emit all deletes before
                            // any insert so a delete on one stream key cannot
                            // clobber an insert sharing the same sink pk.
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks()
                            {
                                {
                                    let chunk = force_delete_only(c.clone());
                                    if chunk.cardinality() > 0 {
                                        delete_chunks.push(chunk);
                                    }
                                }
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks.into_iter())
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            // Re-compact keyed by the downstream pk; deletes
                            // are preserved except for force-append-only sinks.
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    sink_type != SinkType::ForceAppendOnly,
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            // Re-chunk the buffered rows to `chunk_size`.
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        // Emit the pending watermark (if any) before the barrier.
                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            // Pass-through mode: no epoch buffering needed.
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        if compact_chunk {
                            chunk = merge_chunk_row(chunk, &stream_key);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force append-only: drop deletes, keep inserts.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => yield Message::Chunk(chunk),
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }
530
    /// Consumer half of the sink pipeline: reads the log store and drives the
    /// concrete sink, retrying on sink errors and reacting to rebuild/config
    /// messages from the writer half. Returns only on unrecoverable error
    /// (return type `!` — the happy path loops forever).
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        pk_matched: bool,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        // Indices of the columns actually exposed to the sink.
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        // Per-chunk transform: compact by downstream pk for matched-pk upsert
        // sinks, then project away hidden columns. Note the `move` closure
        // captures only `sink_param.sink_type` (2021 disjoint capture), so
        // `sink_param` itself remains usable below.
        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                let chunk = if pk_matched && matches!(sink_param.sink_type, SinkType::Upsert) {
                    merge_chunk_row(chunk, &downstream_pk)
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            // Inner retry loop: build a log sinker and consume the log until a
            // sink error occurs; rewind and retry when the log store allows it.
            let future = async {
                loop {
                    // `consume_log_and_sink` returns `Result<!, _>`, so the
                    // `Err` pattern is irrefutable here.
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    // Best-effort: record the failure in the meta event log.
                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                warn!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                // Rewind failed: surface the original sink error.
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            // Race the retry loop against control messages from the writer half.
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            // Ack so the writer half can proceed past the barrier.
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties = config.into_iter().collect();
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config "
                                        );
                                        Err(anyhow!("fail to rewind log after alter table").into())
                                    }
                                }
                            } else {
                                // Without rewind support, apply the new config
                                // and deliberately error out to trigger recovery.
                                sink_param.properties = config.into_iter().collect();
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
708}
709
/// Control messages sent from the writer half (`execute_write_log`) to the
/// consumer half (`execute_consume_log`).
enum RebuildSinkMessage {
    /// Rebuild the sink with a new vnode bitmap; the oneshot sender is used to
    /// acknowledge that the consumer has picked up the new bitmap.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Apply updated connector properties (ALTER SINK CONFIG).
    UpdateConfig(HashMap<String, String>),
}
714
impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to `execute_inner`, which wires the writer/consumer halves.
        self.execute_inner()
    }
}
720
721#[cfg(test)]
722mod test {
723 use risingwave_common::catalog::{ColumnDesc, ColumnId};
724 use risingwave_common::util::epoch::test_epoch;
725 use risingwave_connector::sink::build_sink;
726
727 use super::*;
728 use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
729 use crate::executor::test_utils::*;
730
    /// A force-append-only blackhole sink should drop deletes and rewrite
    /// `U-`/`U+` pairs into plain inserts.
    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // Three columns; the last one is hidden and filtered out of SinkParam.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    U- 3 2 1
                    U+ 3 4 1
                    + 5 6 7",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            ))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        // NOTE(review): input_data_types below is Int32 while the schema
        // columns are Int64 — presumably harmless here since this path never
        // rebuilds chunks, but verify whether Int64 was intended.
        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Consume the init barrier.
        executor.next().await.unwrap().unwrap();

        // The first chunk passes through as a plain insert.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Consume the barrier for epoch 2.
        executor.next().await.unwrap().unwrap();

        // U-/U+ becomes a single insert; standalone delete is dropped entirely.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // The delete-only chunk yields no output; consume the trailing message.
        executor.next().await.unwrap().unwrap();
    }
857
    /// Stream key [0, 1] vs downstream pk [0]: the executor must buffer each
    /// epoch, emit deletes ahead of inserts, and re-compact by the sink pk so
    /// that the net effect per pk value is correct.
    #[tokio::test]
    async fn stream_key_sink_pk_mismatch() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        // Epoch 2 contains a no-op insert/delete pair on (1, 2) and a
        // delete+re-insert on (1, 1); only the latter should survive.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            // Downstream pk [0] is narrower than the stream key [0, 1],
            // triggering the advance-delete / reconstruct path.
            downstream_pk: vec![0],
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Consume the init barrier.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Consume the barrier for epoch 2.
        executor.next().await.unwrap().unwrap();

        // Epoch 2 collapses to a single update keyed by pk value 1.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            )
        );

        // Consume the trailing barrier.
        executor.next().await.unwrap().unwrap();
    }
983
    /// Barrier-only input must flow through the sink executor unchanged:
    /// no chunks are produced and every barrier is forwarded as-is.
    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        // Three consecutive barriers with no data in between.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
1075}