// risingwave_stream/executor/sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem;
16
17use anyhow::anyhow;
18use futures::stream::select;
19use futures::{FutureExt, TryFutureExt, TryStreamExt};
20use itertools::Itertools;
21use risingwave_common::array::Op;
22use risingwave_common::array::stream_chunk::StreamChunkMut;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnCatalog, Field};
25use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
26use risingwave_common_estimate_size::EstimateSize;
27use risingwave_common_estimate_size::collections::EstimatedVec;
28use risingwave_common_rate_limit::RateLimit;
29use risingwave_connector::dispatch_sink;
30use risingwave_connector::sink::catalog::SinkType;
31use risingwave_connector::sink::log_store::{
32    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
33    LogWriter, LogWriterExt, LogWriterMetrics,
34};
35use risingwave_connector::sink::{
36    GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
37};
38use thiserror_ext::AsReport;
39use tokio::select;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
41use tokio::sync::oneshot;
42
43use crate::common::compact_chunk::{StreamChunkCompactor, merge_chunk_row};
44use crate::executor::prelude::*;
/// Streaming executor that delivers its input stream to an external sink.
///
/// Input messages are (optionally) compacted/re-ordered per barrier, written into a
/// log store, and consumed from the log store by a separate task that feeds the
/// actual sink implementation (see `execute_inner`).
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    // Schema / pk indices / identity of this executor.
    info: ExecutorInfo,
    // Upstream input executor.
    input: Executor,
    sink: SinkImpl,
    // Catalog of input columns; may include hidden columns that are projected
    // away before reaching the sink (see `execute_consume_log`).
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    // Builds the (log reader, log writer) pair decoupling input from sink delivery.
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    // Max rows per chunk when rebuilding chunks in `process_msg`.
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    // Sink all DELETE events of a barrier before the INSERT events; needed when the
    // stream key differs from the user-defined sink pk (see comments in `new`).
    need_advance_delete: bool,
    // Additionally re-compact buffered chunks keyed by the downstream (sink) pk.
    re_construct_with_sink_pk: bool,
    // Whether to compact chunks at all (disabled for blackhole sinks for benchmarks).
    compact_chunk: bool,
    // Initial rate limit applied on the log-reader side; `None` means unlimited.
    rate_limit: Option<u32>,
}
61
62// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
63fn force_append_only(c: StreamChunk) -> StreamChunk {
64    let mut c: StreamChunkMut = c.into();
65    for (_, mut r) in c.to_rows_mut() {
66        match r.op() {
67            Op::Insert => {}
68            Op::Delete | Op::UpdateDelete => r.set_vis(false),
69            Op::UpdateInsert => r.set_op(Op::Insert),
70        }
71    }
72    c.into()
73}
74
75// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
76fn force_delete_only(c: StreamChunk) -> StreamChunk {
77    let mut c: StreamChunkMut = c.into();
78    for (_, mut r) in c.to_rows_mut() {
79        match r.op() {
80            Op::Delete => {}
81            Op::Insert | Op::UpdateInsert => r.set_vis(false),
82            Op::UpdateDelete => r.set_op(Op::Delete),
83        }
84    }
85    c.into()
86}
87
88impl<F: LogStoreFactory> SinkExecutor<F> {
89    #[allow(clippy::too_many_arguments)]
90    #[expect(clippy::unused_async)]
91    pub async fn new(
92        actor_context: ActorContextRef,
93        info: ExecutorInfo,
94        input: Executor,
95        sink_writer_param: SinkWriterParam,
96        sink: SinkImpl,
97        sink_param: SinkParam,
98        columns: Vec<ColumnCatalog>,
99        log_store_factory: F,
100        chunk_size: usize,
101        input_data_types: Vec<DataType>,
102        rate_limit: Option<u32>,
103    ) -> StreamExecutorResult<Self> {
104        let sink_input_schema: Schema = columns
105            .iter()
106            .map(|column| Field::from(&column.column_desc))
107            .collect();
108
109        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
110            // Remove the partition column from the schema.
111            assert_eq!(sink_input_schema.data_types(), {
112                let mut data_type = info.schema.data_types();
113                data_type.remove(col_dix);
114                data_type
115            });
116        } else {
117            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
118        }
119
120        let stream_key = info.pk_indices.clone();
121        let stream_key_sink_pk_mismatch = {
122            stream_key
123                .iter()
124                .any(|i| !sink_param.downstream_pk.contains(i))
125        };
126        // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order
127        // stream key: a,b
128        // sink pk: a
129
130        // original:
131        // (1,1) -> (1,2)
132        // (1,2) -> (1,3)
133
134        // mv fragment 1:
135        // delete (1,1)
136
137        // mv fragment 2:
138        // insert (1,2)
139        // delete (1,2)
140
141        // mv fragment 3:
142        // insert (1,3)
143
144        // merge to sink fragment:
145        // insert (1,3)
146        // insert (1,2)
147        // delete (1,2)
148        // delete (1,1)
149        // So we do additional compaction in the sink executor per barrier.
150
151        //     1. compact all the changes with the stream key.
152        //     2. sink all the delete events and then sink all insert events.
153
154        // after compacting with the stream key, the two event with the same user defined sink pk must have different stream key.
155        // So the delete event is not to delete the inserted record in our internal streaming SQL semantic.
156        let need_advance_delete =
157            stream_key_sink_pk_mismatch && sink_param.sink_type != SinkType::AppendOnly;
158        // NOTE(st1page): reconstruct with sink pk need extra cost to buffer a barrier's data, so currently we bind it with mismatch case.
159        let re_construct_with_sink_pk = need_advance_delete
160            && sink_param.sink_type == SinkType::Upsert
161            && !sink_param.downstream_pk.is_empty();
162        // Don't compact chunk for blackhole sink for better benchmark performance.
163        let compact_chunk = !sink.is_blackhole();
164
165        tracing::info!(
166            sink_id = sink_param.sink_id.sink_id,
167            actor_id = actor_context.id,
168            need_advance_delete,
169            re_construct_with_sink_pk,
170            compact_chunk,
171            "Sink executor info"
172        );
173
174        Ok(Self {
175            actor_context,
176            info,
177            input,
178            sink,
179            input_columns: columns,
180            sink_param,
181            log_store_factory,
182            sink_writer_param,
183            chunk_size,
184            input_data_types,
185            need_advance_delete,
186            re_construct_with_sink_pk,
187            compact_chunk,
188            rate_limit,
189        })
190    }
191
192    fn execute_inner(self) -> BoxedMessageStream {
193        let sink_id = self.sink_param.sink_id;
194        let actor_id = self.actor_context.id;
195        let fragment_id = self.actor_context.fragment_id;
196
197        let stream_key = self.info.pk_indices.clone();
198        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
199            sink_id,
200            actor_id,
201            fragment_id,
202        );
203
204        let input = self.input.execute();
205
206        let input = input.inspect_ok(move |msg| {
207            if let Message::Chunk(c) = msg {
208                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
209                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
210            }
211        });
212
213        let processed_input = Self::process_msg(
214            input,
215            self.sink_param.sink_type,
216            stream_key,
217            self.need_advance_delete,
218            self.re_construct_with_sink_pk,
219            self.chunk_size,
220            self.input_data_types,
221            self.sink_param.downstream_pk.clone(),
222            metrics.sink_chunk_buffer_size,
223            self.compact_chunk,
224        );
225
226        if self.sink.is_sink_into_table() {
227            // TODO(hzxa21): support rate limit?
228            processed_input.boxed()
229        } else {
230            let labels = [
231                &actor_id.to_string(),
232                &sink_id.to_string(),
233                self.sink_param.sink_name.as_str(),
234            ];
235            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
236                .log_store_first_write_epoch
237                .with_guarded_label_values(&labels);
238            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
239                .log_store_latest_write_epoch
240                .with_guarded_label_values(&labels);
241            let log_store_write_rows = GLOBAL_SINK_METRICS
242                .log_store_write_rows
243                .with_guarded_label_values(&labels);
244            let log_writer_metrics = LogWriterMetrics {
245                log_store_first_write_epoch,
246                log_store_latest_write_epoch,
247                log_store_write_rows,
248            };
249
250            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
251            // Init the rate limit
252            rate_limit_tx.send(self.rate_limit.into()).unwrap();
253
254            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();
255
256            self.log_store_factory
257                .build()
258                .map(move |(log_reader, log_writer)| {
259                    let write_log_stream = Self::execute_write_log(
260                        processed_input,
261                        log_writer.monitored(log_writer_metrics),
262                        actor_id,
263                        rate_limit_tx,
264                        rebuild_sink_tx,
265                    );
266
267                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
268                        let consume_log_stream = Self::execute_consume_log(
269                            *sink,
270                            log_reader,
271                            self.input_columns,
272                            self.sink_param,
273                            self.sink_writer_param,
274                            self.actor_context,
275                            rate_limit_rx,
276                            rebuild_sink_rx,
277                        )
278                        .instrument_await(
279                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
280                        )
281                        .map_ok(|never| never); // unify return type to `Message`
282
283                        consume_log_stream.boxed()
284                    });
285                    select(consume_log_stream_future.into_stream(), write_log_stream)
286                })
287                .into_stream()
288                .flatten()
289                .boxed()
290        }
291    }
292
293    #[try_stream(ok = Message, error = StreamExecutorError)]
294    async fn execute_write_log<W: LogWriter>(
295        input: impl MessageStream,
296        mut log_writer: W,
297        actor_id: ActorId,
298        rate_limit_tx: UnboundedSender<RateLimit>,
299        rebuild_sink_tx: UnboundedSender<(Arc<Bitmap>, oneshot::Sender<()>)>,
300    ) {
301        pin_mut!(input);
302        let barrier = expect_first_barrier(&mut input).await?;
303        let epoch_pair = barrier.epoch;
304        let is_pause_on_startup = barrier.is_pause_on_startup();
305        // Propagate the first barrier
306        yield Message::Barrier(barrier);
307
308        log_writer.init(epoch_pair, is_pause_on_startup).await?;
309
310        let mut is_paused = false;
311
312        #[for_await]
313        for msg in input {
314            match msg? {
315                Message::Watermark(w) => yield Message::Watermark(w),
316                Message::Chunk(chunk) => {
317                    assert!(
318                        !is_paused,
319                        "Actor {actor_id} should not receive any data after pause"
320                    );
321                    log_writer.write_chunk(chunk.clone()).await?;
322                    yield Message::Chunk(chunk);
323                }
324                Message::Barrier(barrier) => {
325                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
326                    let post_flush = log_writer
327                        .flush_current_epoch(
328                            barrier.epoch.curr,
329                            FlushCurrentEpochOptions {
330                                is_checkpoint: barrier.kind.is_checkpoint(),
331                                new_vnode_bitmap: update_vnode_bitmap.clone(),
332                                is_stop: barrier.is_stop(actor_id),
333                            },
334                        )
335                        .await?;
336
337                    let mutation = barrier.mutation.clone();
338                    yield Message::Barrier(barrier);
339                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
340                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
341                    {
342                        let (tx, rx) = oneshot::channel();
343                        rebuild_sink_tx
344                            .send((new_vnode_bitmap, tx))
345                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
346                        rx.await
347                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
348                    }
349                    post_flush.post_yield_barrier().await?;
350
351                    if let Some(mutation) = mutation.as_deref() {
352                        match mutation {
353                            Mutation::Pause => {
354                                log_writer.pause()?;
355                                is_paused = true;
356                            }
357                            Mutation::Resume => {
358                                log_writer.resume()?;
359                                is_paused = false;
360                            }
361                            Mutation::Throttle(actor_to_apply) => {
362                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
363                                    tracing::info!(
364                                        rate_limit = new_rate_limit,
365                                        "received sink rate limit on actor {actor_id}"
366                                    );
367                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
368                                        error!(
369                                            error = %e.as_report(),
370                                            "fail to send sink ate limit update"
371                                        );
372                                        return Err(StreamExecutorError::from(
373                                            e.to_report_string(),
374                                        ));
375                                    }
376                                }
377                            }
378                            _ => (),
379                        }
380                    }
381                }
382            }
383        }
384    }
385
386    #[allow(clippy::too_many_arguments)]
387    #[try_stream(ok = Message, error = StreamExecutorError)]
388    async fn process_msg(
389        input: impl MessageStream,
390        sink_type: SinkType,
391        stream_key: PkIndices,
392        need_advance_delete: bool,
393        re_construct_with_sink_pk: bool,
394        chunk_size: usize,
395        input_data_types: Vec<DataType>,
396        down_stream_pk: Vec<usize>,
397        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge<3>,
398        compact_chunk: bool,
399    ) {
400        // need to buffer chunks during one barrier
401        if need_advance_delete || re_construct_with_sink_pk {
402            let mut chunk_buffer = EstimatedVec::new();
403            let mut watermark: Option<super::Watermark> = None;
404            #[for_await]
405            for msg in input {
406                match msg? {
407                    Message::Watermark(w) => watermark = Some(w),
408                    Message::Chunk(c) => {
409                        chunk_buffer.push(c);
410
411                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
412                    }
413                    Message::Barrier(barrier) => {
414                        let chunks = mem::take(&mut chunk_buffer).into_inner();
415                        let chunks = if need_advance_delete {
416                            let mut delete_chunks = vec![];
417                            let mut insert_chunks = vec![];
418
419                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
420                                .into_compacted_chunks()
421                            {
422                                if sink_type != SinkType::ForceAppendOnly {
423                                    // Force append-only by dropping UPDATE/DELETE messages. We do this when the
424                                    // user forces the sink to be append-only while it is actually not based on
425                                    // the frontend derivation result.
426                                    let chunk = force_delete_only(c.clone());
427                                    if chunk.cardinality() > 0 {
428                                        delete_chunks.push(chunk);
429                                    }
430                                }
431                                let chunk = force_append_only(c);
432                                if chunk.cardinality() > 0 {
433                                    insert_chunks.push(chunk);
434                                }
435                            }
436                            delete_chunks
437                                .into_iter()
438                                .chain(insert_chunks.into_iter())
439                                .collect()
440                        } else {
441                            chunks
442                        };
443                        if re_construct_with_sink_pk {
444                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
445                                .reconstructed_compacted_chunks(
446                                    chunk_size,
447                                    input_data_types.clone(),
448                                    sink_type != SinkType::ForceAppendOnly,
449                                );
450                            for c in chunks {
451                                yield Message::Chunk(c);
452                            }
453                        } else {
454                            let mut chunk_builder =
455                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
456                            for chunk in chunks {
457                                for (op, row) in chunk.rows() {
458                                    if let Some(c) = chunk_builder.append_row(op, row) {
459                                        yield Message::Chunk(c);
460                                    }
461                                }
462                            }
463
464                            if let Some(c) = chunk_builder.take() {
465                                yield Message::Chunk(c);
466                            }
467                        };
468
469                        if let Some(w) = mem::take(&mut watermark) {
470                            yield Message::Watermark(w)
471                        }
472                        yield Message::Barrier(barrier);
473                    }
474                }
475            }
476        } else {
477            #[for_await]
478            for msg in input {
479                match msg? {
480                    Message::Watermark(w) => yield Message::Watermark(w),
481                    Message::Chunk(mut chunk) => {
482                        // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
483                        // V->V).
484                        if compact_chunk {
485                            chunk = merge_chunk_row(chunk, &stream_key);
486                        }
487                        match sink_type {
488                            SinkType::AppendOnly => yield Message::Chunk(chunk),
489                            SinkType::ForceAppendOnly => {
490                                // Force append-only by dropping UPDATE/DELETE messages. We do this when the
491                                // user forces the sink to be append-only while it is actually not based on
492                                // the frontend derivation result.
493                                yield Message::Chunk(force_append_only(chunk))
494                            }
495                            SinkType::Upsert => {
496                                // Making sure the optimization in https://github.com/risingwavelabs/risingwave/pull/12250 is correct,
497                                // it is needed to do the compaction here.
498                                for chunk in
499                                    StreamChunkCompactor::new(stream_key.clone(), vec![chunk])
500                                        .into_compacted_chunks()
501                                {
502                                    yield Message::Chunk(chunk)
503                                }
504                            }
505                        }
506                    }
507                    Message::Barrier(barrier) => {
508                        yield Message::Barrier(barrier);
509                    }
510                }
511            }
512        }
513    }
514
515    #[expect(clippy::too_many_arguments)]
516    async fn execute_consume_log<S: Sink, R: LogReader>(
517        sink: S,
518        log_reader: R,
519        columns: Vec<ColumnCatalog>,
520        sink_param: SinkParam,
521        mut sink_writer_param: SinkWriterParam,
522        actor_context: ActorContextRef,
523        rate_limit_rx: UnboundedReceiver<RateLimit>,
524        mut rebuild_sink_rx: UnboundedReceiver<(Arc<Bitmap>, oneshot::Sender<()>)>,
525    ) -> StreamExecutorResult<!> {
526        let visible_columns = columns
527            .iter()
528            .enumerate()
529            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
530            .collect_vec();
531
532        let labels = [
533            &actor_context.id.to_string(),
534            sink_writer_param.connector.as_str(),
535            &sink_writer_param.sink_id.to_string(),
536            sink_writer_param.sink_name.as_str(),
537        ];
538        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
539            .log_store_reader_wait_new_future_duration_ns
540            .with_guarded_label_values(&labels);
541        let log_store_read_rows = GLOBAL_SINK_METRICS
542            .log_store_read_rows
543            .with_guarded_label_values(&labels);
544        let log_store_read_bytes = GLOBAL_SINK_METRICS
545            .log_store_read_bytes
546            .with_guarded_label_values(&labels);
547        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
548            .log_store_latest_read_epoch
549            .with_guarded_label_values(&labels);
550        let metrics = LogReaderMetrics {
551            log_store_latest_read_epoch,
552            log_store_read_rows,
553            log_store_read_bytes,
554            log_store_reader_wait_new_future_duration_ns,
555        };
556
557        let mut log_reader = log_reader
558            .transform_chunk(move |chunk| {
559                if visible_columns.len() != columns.len() {
560                    // Do projection here because we may have columns that aren't visible to
561                    // the downstream.
562                    chunk.project(&visible_columns)
563                } else {
564                    chunk
565                }
566            })
567            .monitored(metrics)
568            .rate_limited(rate_limit_rx);
569
570        loop {
571            let future = async {
572                log_reader.init().await?;
573
574                loop {
575                    let Err(e) = sink
576                        .new_log_sinker(sink_writer_param.clone())
577                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
578                        .await;
579                    GLOBAL_ERROR_METRICS.user_sink_error.report([
580                        "sink_executor_error".to_owned(),
581                        sink_param.sink_id.to_string(),
582                        sink_param.sink_name.clone(),
583                        actor_context.fragment_id.to_string(),
584                    ]);
585
586                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
587                        meta_client
588                            .add_sink_fail_evet_log(
589                                sink_writer_param.sink_id.sink_id,
590                                sink_writer_param.sink_name.clone(),
591                                sink_writer_param.connector.clone(),
592                                e.to_report_string(),
593                            )
594                            .await;
595                    }
596
597                    if F::ALLOW_REWIND {
598                        match log_reader.rewind().await {
599                            Ok(()) => {
600                                warn!(
601                                    error = %e.as_report(),
602                                    executor_id = sink_writer_param.executor_id,
603                                    sink_id = sink_param.sink_id.sink_id,
604                                    "reset log reader stream successfully after sink error"
605                                );
606                                Ok(())
607                            }
608                            Err(rewind_err) => {
609                                error!(
610                                    error = %rewind_err.as_report(),
611                                    "fail to rewind log reader"
612                                );
613                                Err(e)
614                            }
615                        }
616                    } else {
617                        Err(e)
618                    }
619                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
620                }
621            };
622            select! {
623                result = future => {
624                    let Err(e): StreamExecutorResult<!> = result;
625                    return Err(e);
626                }
627                result = rebuild_sink_rx.recv() => {
628                    let (new_vnode, notify) = result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))?;
629                    sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
630                    if notify.send(()).is_err() {
631                        warn!("failed to notify rebuild sink");
632                    }
633                }
634            }
635        }
636    }
637}
638
impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to `execute_inner`, which wires up the log store writer and
        // sink consumer streams.
        self.execute_inner()
    }
}
644
645#[cfg(test)]
646mod test {
647    use risingwave_common::catalog::{ColumnDesc, ColumnId};
648    use risingwave_common::util::epoch::test_epoch;
649    use risingwave_connector::sink::build_sink;
650
651    use super::*;
652    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
653    use crate::executor::test_utils::*;
654
    /// End-to-end test of force-append-only behavior through the sink executor:
    /// DELETEs are dropped and UPDATE pairs are collapsed into plain INSERTs
    /// before reaching the (blackhole) sink. Also covers pruning of hidden columns
    /// declared in the column catalog.
    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            ))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                "  I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
            ))),
            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            ))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            // Only visible columns are declared to the sink.
            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            // NOTE(review): columns are declared Int64 above but Int32 is passed here;
            // this path does not appear to consume `input_data_types` (no chunk
            // re-buffering for force-append-only) — confirm intent.
            vec![DataType::Int32, DataType::Int32, DataType::Int32],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // Should not receive the third stream chunk message because the force-append-only sink
        // executor will drop all DELETE messages.

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }
781
782    #[tokio::test]
783    async fn stream_key_sink_pk_mismatch() {
784        use risingwave_common::array::StreamChunkTestExt;
785        use risingwave_common::array::stream_chunk::StreamChunk;
786        use risingwave_common::types::DataType;
787
788        use crate::executor::Barrier;
789
790        let properties = maplit::btreemap! {
791            "connector".into() => "blackhole".into(),
792        };
793
794        // We have two visible columns and one hidden column. The hidden column will be pruned out
795        // within the sink executor.
796        let columns = vec![
797            ColumnCatalog {
798                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
799                is_hidden: false,
800            },
801            ColumnCatalog {
802                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
803                is_hidden: false,
804            },
805            ColumnCatalog {
806                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
807                is_hidden: true,
808            },
809        ];
810        let schema: Schema = columns
811            .iter()
812            .map(|column| Field::from(column.column_desc.clone()))
813            .collect();
814
815        let source = MockSource::with_messages(vec![
816            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
817            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
818                " I I I
819                    + 1 1 10",
820            ))),
821            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
822            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
823                " I I I
824                    + 1 3 30",
825            ))),
826            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
827                " I I I
828                    + 1 2 20
829                    - 1 2 20",
830            ))),
831            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
832                " I I I
833                    - 1 1 10
834                    + 1 1 40",
835            ))),
836            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
837        ])
838        .into_executor(schema.clone(), vec![0, 1]);
839
840        let sink_param = SinkParam {
841            sink_id: 0.into(),
842            sink_name: "test".into(),
843            properties,
844
845            columns: columns
846                .iter()
847                .filter(|col| !col.is_hidden)
848                .map(|col| col.column_desc.clone())
849                .collect(),
850            downstream_pk: vec![0],
851            sink_type: SinkType::Upsert,
852            format_desc: None,
853            db_name: "test".into(),
854            sink_from_name: "test".into(),
855        };
856
857        let info = ExecutorInfo::new(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
858
859        let sink = build_sink(sink_param.clone()).unwrap();
860
861        let sink_executor = SinkExecutor::new(
862            ActorContext::for_test(0),
863            info,
864            source,
865            SinkWriterParam::for_test(),
866            sink,
867            sink_param,
868            columns.clone(),
869            BoundedInMemLogStoreFactory::new(1),
870            1024,
871            vec![DataType::Int64, DataType::Int64, DataType::Int64],
872            None,
873        )
874        .await
875        .unwrap();
876
877        let mut executor = sink_executor.boxed().execute();
878
879        // Barrier message.
880        executor.next().await.unwrap().unwrap();
881
882        let chunk_msg = executor.next().await.unwrap().unwrap();
883        assert_eq!(
884            chunk_msg.into_chunk().unwrap().compact(),
885            StreamChunk::from_pretty(
886                " I I I
887                + 1 1 10",
888            )
889        );
890
891        // Barrier message.
892        executor.next().await.unwrap().unwrap();
893
894        let chunk_msg = executor.next().await.unwrap().unwrap();
895        assert_eq!(
896            chunk_msg.into_chunk().unwrap().compact(),
897            StreamChunk::from_pretty(
898                " I I I
899                U- 1 1 10
900                U+ 1 1 40",
901            )
902        );
903
904        // The last barrier message.
905        executor.next().await.unwrap().unwrap();
906    }
907
908    #[tokio::test]
909    async fn test_empty_barrier_sink() {
910        use risingwave_common::types::DataType;
911
912        use crate::executor::Barrier;
913
914        let properties = maplit::btreemap! {
915            "connector".into() => "blackhole".into(),
916            "type".into() => "append-only".into(),
917            "force_append_only".into() => "true".into()
918        };
919        let columns = vec![
920            ColumnCatalog {
921                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
922                is_hidden: false,
923            },
924            ColumnCatalog {
925                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
926                is_hidden: false,
927            },
928        ];
929        let schema: Schema = columns
930            .iter()
931            .map(|column| Field::from(column.column_desc.clone()))
932            .collect();
933        let pk_indices = vec![0];
934
935        let source = MockSource::with_messages(vec![
936            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
937            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
938            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
939        ])
940        .into_executor(schema.clone(), pk_indices.clone());
941
942        let sink_param = SinkParam {
943            sink_id: 0.into(),
944            sink_name: "test".into(),
945            properties,
946
947            columns: columns
948                .iter()
949                .filter(|col| !col.is_hidden)
950                .map(|col| col.column_desc.clone())
951                .collect(),
952            downstream_pk: pk_indices.clone(),
953            sink_type: SinkType::ForceAppendOnly,
954            format_desc: None,
955            db_name: "test".into(),
956            sink_from_name: "test".into(),
957        };
958
959        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);
960
961        let sink = build_sink(sink_param.clone()).unwrap();
962
963        let sink_executor = SinkExecutor::new(
964            ActorContext::for_test(0),
965            info,
966            source,
967            SinkWriterParam::for_test(),
968            sink,
969            sink_param,
970            columns,
971            BoundedInMemLogStoreFactory::new(1),
972            1024,
973            vec![DataType::Int64, DataType::Int64],
974            None,
975        )
976        .await
977        .unwrap();
978
979        let mut executor = sink_executor.boxed().execute();
980
981        // Barrier message.
982        assert_eq!(
983            executor.next().await.unwrap().unwrap(),
984            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
985        );
986
987        // Barrier message.
988        assert_eq!(
989            executor.next().await.unwrap().unwrap(),
990            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
991        );
992
993        // The last barrier message.
994        assert_eq!(
995            executor.next().await.unwrap().unwrap(),
996            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
997        );
998    }
999}