use std::collections::HashMap;
use std::mem;

use anyhow::anyhow;
use futures::stream::select;
use futures::{FutureExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::{SinkId, SinkType};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
    LogWriter, LogWriterExt, LogWriterMetrics,
};
use risingwave_connector::sink::{
    GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
};
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot;

use crate::common::compact_chunk::{StreamChunkCompactor, merge_chunk_row};
use crate::executor::prelude::*;
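
/// Executor that streams its input changelog into an external sink.
///
/// The input is first pre-processed (see `process_msg`: stream-key compaction,
/// advancing deletes, or re-compaction by the sink's primary key where required) and
/// written to a log store; a separate future consumes the log store and drives the
/// actual sink, decoupling sink progress from the barrier stream.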
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    need_advance_delete: bool,
    re_construct_with_sink_pk: bool,
    pk_matched: bool,
    compact_chunk: bool,
    rate_limit: Option<u32>,
}

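/// Convert a chunk to append-only form: `Insert` rows are kept, `Delete` and
/// `UpdateDelete` rows are hidden, and `UpdateInsert` is downgraded to `Insert`.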
fn force_append_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Insert => {}
            Op::Delete | Op::UpdateDelete => r.set_vis(false),
            Op::UpdateInsert => r.set_op(Op::Insert),
        }
    }
    c.into()
}

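/// Mirror of `force_append_only`: `Delete` rows are kept, `Insert` and `UpdateInsert`
/// rows are hidden, and `UpdateDelete` is downgraded to `Delete`.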
fn force_delete_only(c: StreamChunk) -> StreamChunk {
    let mut c: StreamChunkMut = c.into();
    for (_, mut r) in c.to_rows_mut() {
        match r.op() {
            Op::Delete => {}
            Op::Insert | Op::UpdateInsert => r.set_vis(false),
            Op::UpdateDelete => r.set_op(Op::Delete),
        }
    }
    c.into()
}

impl<F: LogStoreFactory> SinkExecutor<F> {
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_idx) = sink_writer_param.extra_partition_col_idx {
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_type = info.schema.data_types();
                data_type.remove(col_idx);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let stream_key = info.pk_indices.clone();
        let stream_key_sink_pk_mismatch = {
            stream_key
                .iter()
                .any(|i| !sink_param.downstream_pk.contains(i))
        };
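
        // If some stream-key column is not part of the downstream pk, several stream
        // rows may map to the same sink key. Non-append-only sinks then buffer chunks
        // per barrier and emit all deletes ahead of inserts (`need_advance_delete`);
        // upsert sinks with a declared pk additionally re-compact the buffered chunks
        // by that pk (`re_construct_with_sink_pk`).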
        let need_advance_delete =
            stream_key_sink_pk_mismatch && sink_param.sink_type != SinkType::AppendOnly;
        let re_construct_with_sink_pk = need_advance_delete
            && sink_param.sink_type == SinkType::Upsert
            && !sink_param.downstream_pk.is_empty();
        let pk_matched = !stream_key_sink_pk_mismatch;
        let compact_chunk = !sink.is_blackhole();

        tracing::info!(
            sink_id = sink_param.sink_id.sink_id,
            actor_id = actor_context.id,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            rate_limit,
        })
    }

    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        let input = self.input.execute();

        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

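            // Channels through which the log-writing future pushes rate-limit updates
            // and sink-rebuild / config-change requests to the log-consuming future.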
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.pk_matched,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never);
                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }

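    /// Write each chunk of the processed input to the log store while forwarding every
    /// message downstream. Barriers flush the current epoch and relay mutations
    /// (pause/resume, throttle, connector config changes, vnode-bitmap updates) to the
    /// log-consuming side.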
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                            },
                        )
                        .await?;

                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait for rebuild sink to finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(actor_to_apply) => {
                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
                                    tracing::info!(
                                        rate_limit = new_rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                if let Some(map) = config.get(&sink_id.sink_id)
                                    && let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                {
                                    error!(
                                        error = %e.as_report(),
                                        "fail to send sink alter props"
                                    );
                                    return Err(StreamExecutorError::from(e.to_report_string()));
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }

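    /// Pre-process the input before it reaches the log store. In the buffering path
    /// (`need_advance_delete` or `re_construct_with_sink_pk`) chunks are held back and
    /// rewritten per barrier; otherwise chunks are forwarded directly, optionally merged
    /// by the stream key and/or forced to append-only.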
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        compact_chunk: bool,
    ) {
        if need_advance_delete || re_construct_with_sink_pk {
            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
                        let chunks = if need_advance_delete {
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks()
                            {
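                                // Split each compacted chunk into its delete half and
                                // insert half, collecting deletes separately so that all
                                // deletes of a barrier are emitted before any insert.
                                // Force-append-only sinks drop the delete half entirely.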
                                if sink_type != SinkType::ForceAppendOnly {
                                    let chunk = force_delete_only(c.clone());
                                    if chunk.cardinality() > 0 {
                                        delete_chunks.push(chunk);
                                    }
                                }
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks.into_iter())
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    sink_type != SinkType::ForceAppendOnly,
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        if compact_chunk {
                            chunk = merge_chunk_row(chunk, &stream_key);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => yield Message::Chunk(chunk),
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }

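    /// Consume the log store and drive the sink. On sink errors the log reader is
    /// rewound and consumption restarts if the log store allows it; rebuild and
    /// config-update requests from the writer side are handled between attempts. The
    /// `!` in the return type indicates this only ever returns with an error.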
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        pk_matched: bool,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
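                // When every stream-key column is already part of the sink pk on an
                // upsert sink, merge each chunk by the downstream pk so at most one
                // operation per key reaches the sink per chunk.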
                let chunk = if pk_matched && matches!(sink_param.sink_type, SinkType::Upsert) {
                    merge_chunk_row(chunk, &downstream_pk)
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
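            // Drive the sinker until it fails: on error, rewind the log reader and
            // retry when the log store supports rewind, otherwise propagate. Rebuild
            // and config-update requests interrupt consumption via `select!` below.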
            let future = async {
                loop {
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                warn!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties = config.into_iter().collect();
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| {
                                            StreamExecutorError::from((e, sink_param.sink_id.sink_id))
                                        })?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config"
                                        );
                                        Err(anyhow!("fail to rewind log after alter sink config").into())
                                    }
                                }
                            } else {
                                sink_param.properties = config.into_iter().collect();
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| {
                                    StreamExecutorError::from((e, sink_param.sink_id.sink_id))
                                })?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure the ALTER SINK CONFIG is fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
}

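/// Control messages sent from the log-writing future to the log-consuming future.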
enum RebuildSinkMessage {
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    UpdateConfig(HashMap<String, String>),
}

impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}

#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                U- 3 2 1
                U+ 3 4 1
                + 5 6 7",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                - 5 6 7",
            ))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

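        // The first message is the initial barrier.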
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 3 30",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                + 1 2 20
                - 1 2 20",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                - 1 1 10
                + 1 1 40",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: vec![0],
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

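        // The first message is the initial barrier.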
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            )
        );

        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
}
1071}