use std::collections::HashMap;
use std::mem;

use anyhow::anyhow;
use futures::stream::select;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::{SinkId, SinkType};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
    LogWriter, LogWriterExt, LogWriterMetrics,
};
use risingwave_connector::sink::{
    GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
};
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use crate::common::compact_chunk::{StreamChunkCompactor, merge_chunk_row};
use crate::executor::prelude::*;
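
/// Executor that sinks the output of its upstream executor into an external system.
///
/// Messages are pre-processed according to the sink type, persisted into a log store, and then
/// consumed from the log store by the sink implementation.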
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    need_advance_delete: bool,
    re_construct_with_sink_pk: bool,
    compact_chunk: bool,
    rate_limit: Option<u32>,
}

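/// Rewrites a chunk so that it only carries `Insert` ops: `Delete`/`UpdateDelete` rows are
/// hidden, and `UpdateInsert` is turned into a plain `Insert`.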
fn force_append_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Insert => {}
            Op::Delete | Op::UpdateDelete => r.set_vis(false),
            Op::UpdateInsert => r.set_op(Op::Insert),
        }
    }
    c.into()
}

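/// Rewrites a chunk so that it only carries `Delete` ops: `Insert`/`UpdateInsert` rows are
/// hidden, and `UpdateDelete` is turned into a plain `Delete`.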
fn force_delete_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Delete => {}
            Op::Insert | Op::UpdateInsert => r.set_vis(false),
            Op::UpdateDelete => r.set_op(Op::Delete),
        }
    }
    c.into()
}

impl<F: LogStoreFactory> SinkExecutor<F> {
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_idx) = sink_writer_param.extra_partition_col_idx {
            // The extra partition column is not part of the sink input schema, so skip it when
            // checking that the input schema matches the executor schema.
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_type = info.schema.data_types();
                data_type.remove(col_idx);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let stream_key = info.pk_indices.clone();
        let stream_key_sink_pk_mismatch = {
            stream_key
                .iter()
                .any(|i| !sink_param.downstream_pk.contains(i))
        };
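        // When the stream key does not fully cover the sink's downstream pk, two different
        // stream-key rows may map to the same downstream-pk row, so compacting by stream key
        // alone cannot guarantee that a DELETE is ordered before an INSERT on the same
        // downstream-pk row. In that case, each epoch is buffered and all deletes are emitted
        // before all inserts ("advance delete"). For upsert sinks with a non-empty downstream
        // pk, the buffered rows are additionally re-compacted by the downstream pk.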
        let need_advance_delete =
            stream_key_sink_pk_mismatch && sink_param.sink_type != SinkType::AppendOnly;
        let re_construct_with_sink_pk = need_advance_delete
            && sink_param.sink_type == SinkType::Upsert
            && !sink_param.downstream_pk.is_empty();
        // Don't bother compacting chunks for blackhole sinks: the data is discarded anyway.
        let compact_chunk = !sink.is_blackhole();

        tracing::info!(
            sink_id = sink_param.sink_id.sink_id,
            actor_id = actor_context.id,
            need_advance_delete,
            re_construct_with_sink_pk,
            compact_chunk,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            need_advance_delete,
            re_construct_with_sink_pk,
            compact_chunk,
            rate_limit,
        })
    }

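    /// Assembles the executor's output stream: attaches input metrics, pre-processes messages,
    /// and, unless the sink writes into a table, wires up the log-store write/consume pipeline.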
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        let input = self.input.execute();

        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            // Sink-into-table bypasses the log store: the processed stream is forwarded
            // directly to the downstream table fragment.
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Send the initial rate limit; the receiver end is still alive here, so this
            // cannot fail.
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

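            // The log store factory yields a (reader, writer) pair: the write stage persists
            // the processed input into the log store, while the consume stage drains it into
            // the external sink. The two stages run concurrently and their streams are merged.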
            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never);

                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }

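    /// Forwards upstream messages downstream while persisting every chunk into the log store,
    /// and applies barrier mutations: pause/resume, per-actor rate limits, vnode-bitmap
    /// updates (which may require rebuilding the sink), and connector config changes.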
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // Propagate the first barrier before initializing the log writer.
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                            },
                        )
                        .await?;

                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
                                    tracing::info!(
                                        rate_limit = new_rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                if let Some(map) = config.get(&sink_id.sink_id) {
                                    if let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                    {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink alter props"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }

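    /// Pre-processes input messages according to the sink type: optionally compacts chunks by
    /// stream key, rewrites ops for (force-)append-only sinks, and, when deletes must be
    /// advanced or rows re-compacted by the downstream pk, buffers a whole epoch before
    /// emitting it.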
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        compact_chunk: bool,
    ) {
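        // Buffering mode: hold all chunks of the current epoch and only emit them at the next
        // barrier, so that rows can be reordered across the whole epoch.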
        if need_advance_delete || re_construct_with_sink_pk {
            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
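                        // Emit every DELETE of the epoch before any INSERT, so that a delete
                        // of an old row cannot clobber a newly inserted row that shares the
                        // same downstream pk.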
                        let chunks = if need_advance_delete {
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks()
                            {
                                if sink_type != SinkType::ForceAppendOnly {
                                    // Force-append-only sinks drop all deletes anyway, so
                                    // extracting delete rows is only needed for other types.
                                    let chunk = force_delete_only(c.clone());
                                    if chunk.cardinality() > 0 {
                                        delete_chunks.push(chunk);
                                    }
                                }
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks.into_iter())
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    sink_type != SinkType::ForceAppendOnly,
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
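            // Pass-through mode: no cross-chunk buffering is needed, so each chunk is handled
            // independently according to the sink type.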
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        if compact_chunk {
                            // Compact the chunk to eliminate useless intermediate results,
                            // e.g. a `U-`/`U+` pair that leaves the row unchanged.
                            chunk = merge_chunk_row(chunk, &stream_key);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force the data to be append-only by dropping DELETE messages
                                // and rewriting UPDATE messages into INSERTs.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => {
                                // Compact by stream key so that deletes and inserts on the
                                // same key stay correctly ordered within the chunk.
                                for chunk in
                                    StreamChunkCompactor::new(stream_key.clone(), vec![chunk])
                                        .into_compacted_chunks()
                                {
                                    yield Message::Chunk(chunk)
                                }
                            }
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }

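    /// Drains the log store into the external sink. On sink errors the log sinker is recreated
    /// and, when the log store supports it, the reader is rewound so no data is lost. Also
    /// handles rebuild-sink and config-update requests coming from the write side.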
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                if visible_columns.len() != columns.len() {
                    // Project out hidden columns so they never reach the downstream sink.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
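        // Keep recreating the log sinker and consuming the log. On failure, report the error;
        // if the log store allows rewinding, reset the reader and retry, otherwise propagate
        // the error and let recovery take over.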
        loop {
            let future = async {
                loop {
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                warn!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties = config.into_iter().collect();
                                        sink = TryFrom::try_from(sink_param.clone())
                                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config"
                                        );
                                        Err(anyhow!("fail to rewind log after alter table").into())
                                    }
                                }
                            } else {
                                sink_param.properties = config.into_iter().collect();
                                sink = TryFrom::try_from(sink_param.clone())
                                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
}

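/// Control messages sent from the log-write side to the log-consume side.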
enum RebuildSinkMessage {
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    UpdateConfig(HashMap<String, String>),
}

impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        let columns = vec![
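            // Two visible columns and one hidden column; the hidden column is pruned out
            // inside the sink executor before rows reach the external sink.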
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                U- 3 2 1
                U+ 3 4 1
                + 5 6 7",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 5 6 7",
            )),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,
            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

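        // Barrier message.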
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

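        // Barrier message.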
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

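        // Should not receive the third stream chunk message because the force-append-only sink
        // executor drops all DELETE messages.

        // The last barrier message.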
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        let columns = vec![
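            // The stream key is [0, 1] while the sink pk is [0]: a mismatch that forces the
            // executor to buffer each epoch and re-order deletes before inserts.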
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,
            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: vec![0],
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

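        // Barrier message.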
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

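        // Barrier message.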
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            )
        );

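        // The last barrier message.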
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,
            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

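        // With no data in between, all three barriers should pass through unchanged.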
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
}