risingwave_stream/executor/
sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashMap};
17use std::mem;
18
19use anyhow::anyhow;
20use futures::stream::select;
21use futures::{FutureExt, TryFutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::array::stream_chunk::StreamChunkMut;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnCatalog, Field};
27use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_common_estimate_size::collections::EstimatedVec;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_connector::dispatch_sink;
32use risingwave_connector::sink::catalog::{SinkId, SinkType};
33use risingwave_connector::sink::log_store::{
34    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
35    LogWriter, LogWriterExt, LogWriterMetrics,
36};
37use risingwave_connector::sink::{
38    GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
39    SinkWriterParam,
40};
41use risingwave_pb::common::ThrottleType;
42use risingwave_pb::id::FragmentId;
43use risingwave_pb::stream_plan::stream_node::StreamKind;
44use thiserror_ext::AsReport;
45use tokio::select;
46use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
47use tokio::sync::oneshot;
48
49use crate::common::change_buffer::{OutputKind, output_kind};
50use crate::common::compact_chunk::{
51    InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
52};
53use crate::executor::prelude::*;
54pub struct SinkExecutor<F: LogStoreFactory> {
55    actor_context: ActorContextRef,
56    info: ExecutorInfo,
57    input: Executor,
58    sink: SinkImpl,
59    input_columns: Vec<ColumnCatalog>,
60    sink_param: SinkParam,
61    log_store_factory: F,
62    sink_writer_param: SinkWriterParam,
63    chunk_size: usize,
64    input_data_types: Vec<DataType>,
65    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
66    rate_limit: Option<u32>,
67}
68
69// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
70fn force_append_only(c: StreamChunk) -> StreamChunk {
71    let mut c: StreamChunkMut = c.into();
72    for (_, mut r) in c.to_rows_mut() {
73        match r.op() {
74            Op::Insert => {}
75            Op::Delete | Op::UpdateDelete => r.set_vis(false),
76            Op::UpdateInsert => r.set_op(Op::Insert),
77        }
78    }
79    c.into()
80}
81
82// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
83fn force_delete_only(c: StreamChunk) -> StreamChunk {
84    let mut c: StreamChunkMut = c.into();
85    for (_, mut r) in c.to_rows_mut() {
86        match r.op() {
87            Op::Delete => {}
88            Op::Insert | Op::UpdateInsert => r.set_vis(false),
89            Op::UpdateDelete => r.set_op(Op::Delete),
90        }
91    }
92    c.into()
93}
94
/// Extra work required for a non-append-only sink (i.e. upsert or retract), needed for both
/// correctness and performance.
#[derive(Debug, Clone, Copy)]
struct NonAppendOnlyBehavior {
    /// `true` when the user specified a primary key for the sink and it matches the derived
    /// stream key of the stream.
    ///
    /// "Matches" means that the stream key is a subset of the downstream pk.
    pk_specified_and_matched: bool,
    /// `true` when the user forces all chunks between two barriers to be buffered.
    force_compaction: bool,
}
107
108impl NonAppendOnlyBehavior {
109    /// NOTE(kwannoel):
110    ///
111    /// After the optimization in <https://github.com/risingwavelabs/risingwave/pull/12250>,
112    /// `DELETE`s will be sequenced before `INSERT`s in JDBC sinks and PG rust sink.
113    /// There's a risk that adjacent chunks with `DELETE`s on the same PK will get
114    /// merged into a single chunk, since the logstore format doesn't preserve chunk
115    /// boundaries. Then we will have double `DELETE`s followed by unspecified sequence
116    /// of `INSERT`s, and lead to inconsistent data downstream.
117    ///
118    /// We only need to do the compaction for non-append-only sinks, when the upstream and
119    /// downstream PKs are matched. When the upstream and downstream PKs are not matched,
120    /// we will buffer the chunks between two barriers, so the compaction is not needed,
121    /// since the barriers will preserve chunk boundaries.
122    ///
123    /// When `force_compaction` is true, we also skip compaction here, since the buffering
124    /// will also make compaction.
125    ///
126    /// When the sink is an append-only sink, it is either `force_append_only` or
127    /// `append_only`, we should only append to downstream, so there should not be any
128    /// overlapping keys.
129    fn should_compact_in_log_reader(self) -> bool {
130        self.pk_specified_and_matched && !self.force_compaction
131    }
132
133    /// When stream key is different from the user defined primary key columns for sinks.
134    /// The operations could be out of order.
135    ///
136    /// For example, we have a stream with derived stream key `a, b` and user-specified sink
137    /// primary key `a`. Assume that we have `(1, 1)` in the table. Then, we perform two `UPDATE`
138    /// operations:
139    ///
140    /// ```text
141    /// UPDATE SET b = 2 WHERE a = 1 ... which issues:
142    ///   - (1, 1)
143    ///   + (1, 2)
144    ///
145    /// UPDATE SET b = 3 WHERE a = 1 ... which issues:
146    ///   - (1, 2)
147    ///   + (1, 3)
148    /// ```
149    ///
150    /// When these changes go into streaming pipeline, they could be shuffled to different parallelism
151    /// (actor), given that they are under different stream keys.
152    ///
153    /// ```text
154    /// Actor 1:
155    /// - (1, 1)
156    ///
157    /// Actor 2:
158    /// + (1, 2)
159    /// - (1, 2)
160    ///
161    /// Actor 3:
162    /// + (1, 3)
163    /// ```
164    ///
165    /// When these records are merged back into sink actor, we may get the records from different
166    /// parallelism in arbitrary order, like:
167    ///
168    /// ```text
169    /// + (1, 2) -- Actor 2, first row
170    /// + (1, 3) -- Actor 3
171    /// - (1, 1) -- Actor 1
172    /// - (1, 2) -- Actor 2, second row
173    /// ```
174    ///
175    /// Note that in terms of stream key (`a, b`), the operations in the order above are completely
176    /// correct, because we are operating on 3 different rows. However, in terms of user defined sink
177    /// primary key `a`, we're violating the unique constraint all the time.
178    ///
179    /// Therefore, in this case, we have to do additional reordering in the sink executor per barrier.
180    /// Specifically, we need to:
181    ///
182    /// First, compact all the changes with the stream key, so we have:
183    /// ```text
184    /// + (1, 3)
185    /// - (1, 1)
186    /// ```
187    ///
188    /// Then, sink all the delete events before sinking all insert events, so we have:
189    /// ```text
190    /// - (1, 1)
191    /// + (1, 3)
192    /// ```
193    /// Since we've compacted the chunk with the stream key, the `DELETE` records survived must be to
194    /// delete an existing row, so we can safely move them to the front. After the deletion is done,
195    /// we can then safely sink the insert events with uniqueness guarantee.
196    ///
197    /// When `force_compaction` is true, we also perform additional reordering to gain the
198    /// benefits of compaction:
199    /// - reduce the number of output messages;
200    /// - emit at most one update per key within a barrier interval, simplifying downstream logic.
201    fn should_reorder_records(self) -> bool {
202        !self.pk_specified_and_matched || self.force_compaction
203    }
204}
205
206/// Get the output kind for chunk compaction based on the given sink type.
207fn compact_output_kind(sink_type: SinkType) -> OutputKind {
208    match sink_type {
209        SinkType::Upsert => output_kind::UPSERT,
210        SinkType::Retract => output_kind::RETRACT,
211        // There won't be any `Update` or `Delete` in the chunk, so it doesn't matter.
212        SinkType::AppendOnly => output_kind::RETRACT,
213    }
214}
215
/// Dispatch the code block to different output kinds for chunk compaction based on sink type.
///
/// `$sink_type` is passed to `compact_output_kind`, and `$body` is expanded once per possible
/// output kind. Inside each expansion the identifier given as `$KIND` is bound to a `const`
/// holding the matching output kind, so the body can use it where a constant is required:
///
/// ```ignore
/// dispatch_output_kind!(sink_type, KIND, { compact_chunk_inline::<KIND>(chunk, pk, ib) })
/// ```
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        // `$body` is usually a braced block, which would otherwise trigger the lint.
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                // Bind the caller-chosen identifier rather than a hard-coded name, so the
                // macro works for any `$KIND` the caller passes.
                const $KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const $KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}
232
233impl<F: LogStoreFactory> SinkExecutor<F> {
234    #[expect(clippy::too_many_arguments)]
235    pub fn new(
236        actor_context: ActorContextRef,
237        info: ExecutorInfo,
238        input: Executor,
239        sink_writer_param: SinkWriterParam,
240        sink: SinkImpl,
241        sink_param: SinkParam,
242        columns: Vec<ColumnCatalog>,
243        log_store_factory: F,
244        chunk_size: usize,
245        input_data_types: Vec<DataType>,
246        rate_limit: Option<u32>,
247    ) -> StreamExecutorResult<Self> {
248        let sink_input_schema: Schema = columns
249            .iter()
250            .map(|column| Field::from(&column.column_desc))
251            .collect();
252
253        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
254            // Remove the partition column from the schema.
255            assert_eq!(sink_input_schema.data_types(), {
256                let mut data_type = info.schema.data_types();
257                data_type.remove(col_dix);
258                data_type
259            });
260        } else {
261            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
262        }
263
264        let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
265            let stream_key = &info.stream_key;
266            let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
267                .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
268            let force_compaction = sink_param
269                .properties
270                .get(SINK_USER_FORCE_COMPACTION)
271                .map(|v| v.eq_ignore_ascii_case("true"))
272                .unwrap_or(false);
273            Some(NonAppendOnlyBehavior {
274                pk_specified_and_matched,
275                force_compaction,
276            })
277        } else {
278            None
279        };
280
281        tracing::info!(
282            sink_id = %sink_param.sink_id,
283            actor_id = %actor_context.id,
284            ?non_append_only_behavior,
285            "Sink executor info"
286        );
287
288        Ok(Self {
289            actor_context,
290            info,
291            input,
292            sink,
293            input_columns: columns,
294            sink_param,
295            log_store_factory,
296            sink_writer_param,
297            chunk_size,
298            input_data_types,
299            non_append_only_behavior,
300            rate_limit,
301        })
302    }
303
304    fn execute_inner(self) -> BoxedMessageStream {
305        let sink_id = self.sink_param.sink_id;
306        let actor_id = self.actor_context.id;
307        let fragment_id = self.actor_context.fragment_id;
308
309        let stream_key = self.info.stream_key.clone();
310        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
311            sink_id,
312            actor_id,
313            fragment_id,
314        );
315
316        // When processing upsert stream, we need to tolerate the inconsistency (mismatched `DELETE`
317        // and `INSERT` pairs) when compacting input chunks with derived stream key.
318        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
319            InconsistencyBehavior::Tolerate
320        } else {
321            InconsistencyBehavior::Panic
322        };
323
324        let input = self.input.execute();
325
326        let input = input.inspect_ok(move |msg| {
327            if let Message::Chunk(c) = msg {
328                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
329                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
330            }
331        });
332
333        let processed_input = Self::process_msg(
334            input,
335            self.sink_param.sink_type,
336            stream_key,
337            self.chunk_size,
338            self.input_data_types,
339            input_compact_ib,
340            self.sink_param.downstream_pk.clone(),
341            self.non_append_only_behavior,
342            metrics.sink_chunk_buffer_size,
343            self.sink.is_blackhole(), // skip compact for blackhole for better benchmark results
344        );
345
346        let processed_input = if self.sink_param.ignore_delete {
347            // Drop UPDATE/DELETE messages if specified `ignore_delete` (formerly `force_append_only`).
348            processed_input
349                .map_ok(|msg| match msg {
350                    Message::Chunk(chunk) => Message::Chunk(force_append_only(chunk)),
351                    other => other,
352                })
353                .left_stream()
354        } else {
355            processed_input.right_stream()
356        };
357
358        if self.sink.is_sink_into_table() {
359            // TODO(hzxa21): support rate limit?
360            processed_input.boxed()
361        } else {
362            let labels = [
363                &actor_id.to_string(),
364                &sink_id.to_string(),
365                self.sink_param.sink_name.as_str(),
366            ];
367            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
368                .log_store_first_write_epoch
369                .with_guarded_label_values(&labels);
370            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
371                .log_store_latest_write_epoch
372                .with_guarded_label_values(&labels);
373            let log_store_write_rows = GLOBAL_SINK_METRICS
374                .log_store_write_rows
375                .with_guarded_label_values(&labels);
376            let log_writer_metrics = LogWriterMetrics {
377                log_store_first_write_epoch,
378                log_store_latest_write_epoch,
379                log_store_write_rows,
380            };
381
382            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
383            // Init the rate limit
384            rate_limit_tx.send(self.rate_limit.into()).unwrap();
385
386            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();
387
388            self.log_store_factory
389                .build()
390                .map(move |(log_reader, log_writer)| {
391                    let write_log_stream = Self::execute_write_log(
392                        processed_input,
393                        log_writer.monitored(log_writer_metrics),
394                        actor_id,
395                        fragment_id,
396                        sink_id,
397                        rate_limit_tx,
398                        rebuild_sink_tx,
399                    );
400
401                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
402                        let consume_log_stream = Self::execute_consume_log(
403                            *sink,
404                            log_reader,
405                            self.input_columns,
406                            self.sink_param,
407                            self.sink_writer_param,
408                            self.non_append_only_behavior,
409                            self.actor_context,
410                            rate_limit_rx,
411                            rebuild_sink_rx,
412                        )
413                        .instrument_await(
414                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
415                        )
416                        .map_ok(|never| never); // unify return type to `Message`
417
418                        consume_log_stream.boxed()
419                    });
420                    select(consume_log_stream_future.into_stream(), write_log_stream)
421                })
422                .into_stream()
423                .flatten()
424                .boxed()
425        }
426    }
427
428    #[try_stream(ok = Message, error = StreamExecutorError)]
429    async fn execute_write_log<W: LogWriter>(
430        input: impl MessageStream,
431        mut log_writer: W,
432        actor_id: ActorId,
433        fragment_id: FragmentId,
434        sink_id: SinkId,
435        rate_limit_tx: UnboundedSender<RateLimit>,
436        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
437    ) {
438        pin_mut!(input);
439        let barrier = expect_first_barrier(&mut input).await?;
440        let epoch_pair = barrier.epoch;
441        let is_pause_on_startup = barrier.is_pause_on_startup();
442        // Propagate the first barrier
443        yield Message::Barrier(barrier);
444
445        log_writer.init(epoch_pair, is_pause_on_startup).await?;
446
447        let mut is_paused = false;
448
449        #[for_await]
450        for msg in input {
451            match msg? {
452                Message::Watermark(w) => yield Message::Watermark(w),
453                Message::Chunk(chunk) => {
454                    assert!(
455                        !is_paused,
456                        "Actor {actor_id} should not receive any data after pause"
457                    );
458                    log_writer.write_chunk(chunk.clone()).await?;
459                    yield Message::Chunk(chunk);
460                }
461                Message::Barrier(barrier) => {
462                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
463                    let schema_change = barrier.as_sink_schema_change(sink_id);
464                    if let Some(schema_change) = &schema_change {
465                        info!(?schema_change, %sink_id, "sink receive schema change");
466                    }
467                    let post_flush = log_writer
468                        .flush_current_epoch(
469                            barrier.epoch.curr,
470                            FlushCurrentEpochOptions {
471                                is_checkpoint: barrier.kind.is_checkpoint(),
472                                new_vnode_bitmap: update_vnode_bitmap.clone(),
473                                is_stop: barrier.is_stop(actor_id),
474                                schema_change,
475                            },
476                        )
477                        .await?;
478
479                    let mutation = barrier.mutation.clone();
480                    yield Message::Barrier(barrier);
481                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
482                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
483                    {
484                        let (tx, rx) = oneshot::channel();
485                        rebuild_sink_tx
486                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
487                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
488                        rx.await
489                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
490                    }
491                    post_flush.post_yield_barrier().await?;
492
493                    if let Some(mutation) = mutation.as_deref() {
494                        match mutation {
495                            Mutation::Pause => {
496                                log_writer.pause()?;
497                                is_paused = true;
498                            }
499                            Mutation::Resume => {
500                                log_writer.resume()?;
501                                is_paused = false;
502                            }
503                            Mutation::Throttle(fragment_to_apply) => {
504                                if let Some(entry) = fragment_to_apply.get(&fragment_id)
505                                    && entry.throttle_type() == ThrottleType::Sink
506                                {
507                                    tracing::info!(
508                                        rate_limit = entry.rate_limit,
509                                        "received sink rate limit on actor {actor_id}"
510                                    );
511                                    if let Err(e) = rate_limit_tx.send(entry.rate_limit.into()) {
512                                        error!(
513                                            error = %e.as_report(),
514                                            "fail to send sink rate limit update"
515                                        );
516                                        return Err(StreamExecutorError::from(
517                                            e.to_report_string(),
518                                        ));
519                                    }
520                                }
521                            }
522                            Mutation::ConnectorPropsChange(config) => {
523                                if let Some(map) = config.get(&sink_id.as_raw_id())
524                                    && let Err(e) = rebuild_sink_tx
525                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
526                                {
527                                    error!(
528                                        error = %e.as_report(),
529                                        "fail to send sink alter props"
530                                    );
531                                    return Err(StreamExecutorError::from(e.to_report_string()));
532                                }
533                            }
534                            _ => (),
535                        }
536                    }
537                }
538            }
539        }
540    }
541
542    #[expect(clippy::too_many_arguments)]
543    #[try_stream(ok = Message, error = StreamExecutorError)]
544    async fn process_msg(
545        input: impl MessageStream,
546        sink_type: SinkType,
547        stream_key: StreamKey,
548        chunk_size: usize,
549        input_data_types: Vec<DataType>,
550        input_compact_ib: InconsistencyBehavior,
551        downstream_pk: Option<Vec<usize>>,
552        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
553        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
554        skip_compact: bool,
555    ) {
556        // To reorder records, we need to buffer chunks of the entire epoch.
557        if let Some(b) = non_append_only_behavior
558            && b.should_reorder_records()
559        {
560            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);
561
562            let mut chunk_buffer = EstimatedVec::new();
563            let mut watermark: Option<super::Watermark> = None;
564            #[for_await]
565            for msg in input {
566                match msg? {
567                    Message::Watermark(w) => watermark = Some(w),
568                    Message::Chunk(c) => {
569                        chunk_buffer.push(c);
570                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
571                    }
572                    Message::Barrier(barrier) => {
573                        let chunks = mem::take(&mut chunk_buffer).into_inner();
574
575                        // 1. Compact the chunk based on the **stream key**, so that we have at most 2 rows for each
576                        //    stream key. Then, move all delete records to the front.
577                        let mut delete_chunks = vec![];
578                        let mut insert_chunks = vec![];
579
580                        for c in dispatch_output_kind!(sink_type, KIND, {
581                            StreamChunkCompactor::new(stream_key.clone(), chunks)
582                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
583                        }) {
584                            let chunk = force_delete_only(c.clone());
585                            if chunk.cardinality() > 0 {
586                                delete_chunks.push(chunk);
587                            }
588                            let chunk = force_append_only(c);
589                            if chunk.cardinality() > 0 {
590                                insert_chunks.push(chunk);
591                            }
592                        }
593                        let chunks = delete_chunks
594                            .into_iter()
595                            .chain(insert_chunks.into_iter())
596                            .collect();
597
598                        // 2. If user specifies a primary key, compact the chunk based on the **downstream pk**
599                        //    to eliminate any unnecessary updates to external systems. This also rewrites the
600                        //    `DELETE` and `INSERT` operations on the same key into `UPDATE` operations, which
601                        //    usually have more efficient implementation.
602                        if let Some(downstream_pk) = &downstream_pk {
603                            let chunks = dispatch_output_kind!(sink_type, KIND, {
604                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
605                                    .into_compacted_chunks_reconstructed::<KIND>(
606                                        chunk_size,
607                                        input_data_types.clone(),
608                                        // When compacting based on user provided primary key, we should never panic
609                                        // on inconsistency in case the user provided primary key is not unique.
610                                        InconsistencyBehavior::Warn,
611                                    )
612                            });
613                            for c in chunks {
614                                yield Message::Chunk(c);
615                            }
616                        } else {
617                            let mut chunk_builder =
618                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
619                            for chunk in chunks {
620                                for (op, row) in chunk.rows() {
621                                    if let Some(c) = chunk_builder.append_row(op, row) {
622                                        yield Message::Chunk(c);
623                                    }
624                                }
625                            }
626
627                            if let Some(c) = chunk_builder.take() {
628                                yield Message::Chunk(c);
629                            }
630                        };
631
632                        // 3. Forward watermark and barrier.
633                        if let Some(w) = mem::take(&mut watermark) {
634                            yield Message::Watermark(w)
635                        }
636                        yield Message::Barrier(barrier);
637                    }
638                }
639            }
640        } else {
641            // In this branch, we don't need to reorder records, either because the stream key matches
642            // the downstream pk, or the sink is append-only.
643            #[for_await]
644            for msg in input {
645                match msg? {
646                    Message::Watermark(w) => yield Message::Watermark(w),
647                    Message::Chunk(mut chunk) => {
648                        // Compact the chunk to eliminate any unnecessary updates to external systems.
649                        // This should be performed against the downstream pk, not the stream key, to
650                        // ensure correct retract/upsert semantics from the downstream's perspective.
651                        if !sink_type.is_append_only()
652                            && let Some(downstream_pk) = &downstream_pk
653                        {
654                            if skip_compact {
655                                // We can only skip compaction if the keys are exactly the same, not just
656                                // matching by being a subset.
657                                assert_eq!(&stream_key, downstream_pk);
658                            } else {
659                                chunk = dispatch_output_kind!(sink_type, KIND, {
660                                    compact_chunk_inline::<KIND>(
661                                        chunk,
662                                        downstream_pk,
663                                        // When compacting based on user provided primary key, we should never panic
664                                        // on inconsistency in case the user provided primary key is not unique.
665                                        InconsistencyBehavior::Warn,
666                                    )
667                                });
668                            }
669                        }
670                        yield Message::Chunk(chunk);
671                    }
672                    Message::Barrier(barrier) => {
673                        yield Message::Barrier(barrier);
674                    }
675                }
676            }
677        }
678    }
679
    /// Drives the consuming side of the sink: reads chunks from `log_reader` and hands them to
    /// log sinkers created from `sink`. The return type is `StreamExecutorResult<!>`, so this
    /// function only ever returns with an error.
    ///
    /// Before a chunk reaches the sinker it is optionally compacted by the user-provided
    /// downstream primary key and projected onto the non-hidden entries of `columns`. The
    /// reader is additionally wrapped with metrics and with a rate limiter fed by
    /// `rate_limit_rx`.
    ///
    /// The outer loop races the sinking future against `rebuild_sink_rx`:
    /// - On a sink error the failure is reported (error metric, plus a meta event log entry
    ///   when a meta client is configured). If `F::ALLOW_REWIND` is set and the reader rewinds
    ///   successfully, a new log sinker is created and consumption continues; otherwise the
    ///   sink error is returned.
    /// - `RebuildSinkMessage::RebuildSink` replaces the vnode bitmap in `sink_writer_param`,
    ///   signals the sender and re-initializes the log reader.
    /// - `RebuildSinkMessage::UpdateConfig` merges changed properties into `sink_param` and
    ///   rebuilds `sink`; this happens in place after a successful rewind, or is followed by an
    ///   intentional error when rewinding is not allowed.
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        // Indices of the columns that are not hidden; used for the projection below.
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        // Label values shared by all log-reader metrics: actor id, connector, sink id, sink name.
        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        // Cloned so the `move` closure below owns its copy while `sink_param` stays usable.
        let downstream_pk = sink_param.downstream_pk.clone();

        // Wrap the reader: per-chunk transform (compaction + projection), then metrics, then
        // rate limiting.
        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                let chunk = if let Some(b) = non_append_only_behavior
                    && b.should_compact_in_log_reader()
                {
                    // This guarantees that user has specified a `downstream_pk`.
                    let downstream_pk = downstream_pk.as_ref().unwrap();
                    dispatch_output_kind!(sink_param.sink_type, KIND, {
                        compact_chunk_inline::<KIND>(
                            chunk,
                            downstream_pk,
                            // When compacting based on user provided primary key, we should never panic
                            // on inconsistency in case the user provided primary key is not unique.
                            InconsistencyBehavior::Warn,
                        )
                    })
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    // Do projection here because we may have columns that aren't visible to
                    // the downstream.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            // Sinking future: keeps creating a log sinker and consuming the log. It can only
            // finish with an error, which happens when recovery via rewind is not possible.
            let future = async {
                loop {
                    // `consume_log_and_sink` never yields `Ok`, hence the irrefutable `Err` pattern.
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    // Record the failure in the meta event log when a meta client is available.
                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    // A successful rewind evaluates to `Ok(())`, so the inner loop retries with
                    // a fresh log sinker; any other outcome propagates the original sink error.
                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                error!(
                                    error = %e.as_report(),
                                    executor_id = %sink_writer_param.executor_id,
                                    sink_id = %sink_param.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                }
            };
            // Race sinking against control messages. When a control message wins, the outer
            // loop iterates and builds a new sinking future from the updated state.
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    // `recv()` returning `None` is turned into an error here.
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            // A failed notification is only logged; the rebuild still proceeds.
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if !sink_config_has_changes(&sink_param.properties, &config) {
                                info!(
                                    executor_id = %sink_writer_param.executor_id,
                                    sink_id = %sink_param.sink_id,
                                    "skip alter sink config because properties are unchanged"
                                );
                                Ok(())
                            } else if F::ALLOW_REWIND {
                                // Rewind first, then rebuild the sink from the merged properties.
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties.extend(config.into_iter());
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                                        info!(
                                            executor_id = %sink_writer_param.executor_id,
                                            sink_id = %sink_param.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config "
                                        );
                                        // NOTE(review): the message says "alter table" although this
                                        // path handles an alter-sink-config request — confirm wording.
                                        Err(anyhow!("fail to rewind log after alter table").into())
                                    }
                                }
                            } else {
                                // Without rewind support the new config is applied and an error is
                                // returned on purpose, as the error message itself explains.
                                sink_param.properties.extend(config.into_iter());
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
860}
861
/// Control messages that `execute_consume_log` receives through `rebuild_sink_rx` while it is
/// sinking.
enum RebuildSinkMessage {
    /// Carries a new vnode bitmap to store in the sink writer parameters, together with a
    /// oneshot sender that is signalled once the bitmap has been replaced.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Carries sink properties to merge into the current ones; the sink is rebuilt only when
    /// at least one of them differs from the current value.
    UpdateConfig(HashMap<String, String>),
}
866
/// Tells whether applying `incoming` on top of `current` would modify anything.
///
/// Returns `true` as soon as one incoming key is either absent from `current` or mapped to a
/// different value there; returns `false` otherwise, including when `incoming` is empty.
fn sink_config_has_changes(
    current: &BTreeMap<String, String>,
    incoming: &HashMap<String, String>,
) -> bool {
    for (name, new_value) in incoming {
        match current.get(name) {
            // Same key with the same value: this entry changes nothing.
            Some(existing) if existing == new_value => {}
            // Missing key or different value: the config would change.
            _ => return true,
        }
    }
    false
}
875
/// Exposes `SinkExecutor` through the `Execute` trait by delegating to `execute_inner`.
impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    /// Consumes the boxed executor and returns the message stream produced by `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}
881
882#[cfg(test)]
883mod test {
884    use risingwave_common::catalog::{ColumnDesc, ColumnId};
885    use risingwave_common::util::epoch::test_epoch;
886    use risingwave_connector::sink::build_sink;
887
888    use super::*;
889    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
890    use crate::executor::test_utils::*;
891
892    #[test]
893    fn test_sink_config_has_changes() {
894        let current = BTreeMap::from([
895            ("connector".to_owned(), "blackhole".to_owned()),
896            ("commit_checkpoint_interval".to_owned(), "1".to_owned()),
897        ]);
898
899        assert!(!sink_config_has_changes(
900            &current,
901            &HashMap::from([("commit_checkpoint_interval".to_owned(), "1".to_owned())])
902        ));
903        assert!(sink_config_has_changes(
904            &current,
905            &HashMap::from([("commit_checkpoint_interval".to_owned(), "2".to_owned())])
906        ));
907        assert!(sink_config_has_changes(
908            &current,
909            &HashMap::from([("force_append_only".to_owned(), "true".to_owned())])
910        ));
911    }
912
913    #[tokio::test]
914    async fn test_force_append_only_sink() {
915        use risingwave_common::array::StreamChunkTestExt;
916        use risingwave_common::array::stream_chunk::StreamChunk;
917        use risingwave_common::types::DataType;
918
919        use crate::executor::Barrier;
920
921        let properties = maplit::btreemap! {
922            "connector".into() => "blackhole".into(),
923            "type".into() => "append-only".into(),
924            "force_append_only".into() => "true".into()
925        };
926
927        // We have two visible columns and one hidden column. The hidden column will be pruned out
928        // within the sink executor.
929        let columns = vec![
930            ColumnCatalog {
931                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
932                is_hidden: false,
933            },
934            ColumnCatalog {
935                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
936                is_hidden: false,
937            },
938            ColumnCatalog {
939                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
940                is_hidden: true,
941            },
942        ];
943        let schema: Schema = columns
944            .iter()
945            .map(|column| Field::from(column.column_desc.clone()))
946            .collect();
947        let stream_key = vec![0];
948
949        let source = MockSource::with_messages(vec![
950            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
951            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
952                " I I I
953                    + 3 2 1",
954            ))),
955            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
956            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
957                "  I I I
958                    U- 3 2 1
959                    U+ 3 4 1
960                     + 5 6 7",
961            ))),
962            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
963                " I I I
964                    - 5 6 7",
965            ))),
966        ])
967        .into_executor(schema.clone(), stream_key.clone());
968
969        let sink_param = SinkParam {
970            sink_id: 0.into(),
971            sink_name: "test".into(),
972            properties,
973
974            columns: columns
975                .iter()
976                .filter(|col| !col.is_hidden)
977                .map(|col| col.column_desc.clone())
978                .collect(),
979            downstream_pk: Some(stream_key.clone()),
980            sink_type: SinkType::AppendOnly,
981            ignore_delete: true,
982            format_desc: None,
983            db_name: "test".into(),
984            sink_from_name: "test".into(),
985        };
986
987        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);
988
989        let sink = build_sink(sink_param.clone()).unwrap();
990
991        let sink_executor = SinkExecutor::new(
992            ActorContext::for_test(0),
993            info,
994            source,
995            SinkWriterParam::for_test(),
996            sink,
997            sink_param,
998            columns.clone(),
999            BoundedInMemLogStoreFactory::for_test(1),
1000            1024,
1001            vec![DataType::Int32, DataType::Int32, DataType::Int32],
1002            None,
1003        )
1004        .unwrap();
1005
1006        let mut executor = sink_executor.boxed().execute();
1007
1008        // Barrier message.
1009        executor.next().await.unwrap().unwrap();
1010
1011        let chunk_msg = executor.next().await.unwrap().unwrap();
1012        assert_eq!(
1013            chunk_msg.into_chunk().unwrap().compact_vis(),
1014            StreamChunk::from_pretty(
1015                " I I I
1016                + 3 2 1",
1017            )
1018        );
1019
1020        // Barrier message.
1021        executor.next().await.unwrap().unwrap();
1022
1023        let chunk_msg = executor.next().await.unwrap().unwrap();
1024        assert_eq!(
1025            chunk_msg.into_chunk().unwrap().compact_vis(),
1026            StreamChunk::from_pretty(
1027                " I I I
1028                + 3 4 1
1029                + 5 6 7",
1030            )
1031        );
1032
1033        // Should not receive the third stream chunk message because the force-append-only sink
1034        // executor will drop all DELETE messages.
1035
1036        // The last barrier message.
1037        executor.next().await.unwrap().unwrap();
1038    }
1039
    /// Runs the shared stream-key / sink-pk mismatch scenario with an upsert sink.
    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }
1044
    /// Runs the shared stream-key / sink-pk mismatch scenario with a retract sink.
    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }
1049
1050    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
1051        use risingwave_common::array::StreamChunkTestExt;
1052        use risingwave_common::array::stream_chunk::StreamChunk;
1053        use risingwave_common::types::DataType;
1054
1055        use crate::executor::Barrier;
1056
1057        let properties = maplit::btreemap! {
1058            "connector".into() => "blackhole".into(),
1059        };
1060
1061        // We have two visible columns and one hidden column. The hidden column will be pruned out
1062        // within the sink executor.
1063        let columns = vec![
1064            ColumnCatalog {
1065                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1066                is_hidden: false,
1067            },
1068            ColumnCatalog {
1069                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1070                is_hidden: false,
1071            },
1072            ColumnCatalog {
1073                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1074                is_hidden: true,
1075            },
1076        ];
1077        let schema: Schema = columns
1078            .iter()
1079            .map(|column| Field::from(column.column_desc.clone()))
1080            .collect();
1081
1082        let source = MockSource::with_messages(vec![
1083            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1084            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1085                " I I I
1086                    + 1 1 10",
1087            ))),
1088            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1089            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1090                " I I I
1091                    + 1 3 30",
1092            ))),
1093            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1094                " I I I
1095                    + 1 2 20
1096                    - 1 2 20",
1097            ))),
1098            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1099                " I I I
1100                    - 1 1 10
1101                    + 1 1 40",
1102            ))),
1103            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1104        ])
1105        .into_executor(schema.clone(), vec![0, 1]);
1106
1107        let sink_param = SinkParam {
1108            sink_id: 0.into(),
1109            sink_name: "test".into(),
1110            properties,
1111
1112            columns: columns
1113                .iter()
1114                .filter(|col| !col.is_hidden)
1115                .map(|col| col.column_desc.clone())
1116                .collect(),
1117            downstream_pk: Some(vec![0]),
1118            sink_type,
1119            ignore_delete: false,
1120            format_desc: None,
1121            db_name: "test".into(),
1122            sink_from_name: "test".into(),
1123        };
1124
1125        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
1126
1127        let sink = build_sink(sink_param.clone()).unwrap();
1128
1129        let sink_executor = SinkExecutor::new(
1130            ActorContext::for_test(0),
1131            info,
1132            source,
1133            SinkWriterParam::for_test(),
1134            sink,
1135            sink_param,
1136            columns.clone(),
1137            BoundedInMemLogStoreFactory::for_test(1),
1138            1024,
1139            vec![DataType::Int64, DataType::Int64, DataType::Int64],
1140            None,
1141        )
1142        .unwrap();
1143
1144        let mut executor = sink_executor.boxed().execute();
1145
1146        // Barrier message.
1147        executor.next().await.unwrap().unwrap();
1148
1149        let chunk_msg = executor.next().await.unwrap().unwrap();
1150        assert_eq!(
1151            chunk_msg.into_chunk().unwrap().compact_vis(),
1152            StreamChunk::from_pretty(
1153                " I I I
1154                + 1 1 10",
1155            )
1156        );
1157
1158        // Barrier message.
1159        executor.next().await.unwrap().unwrap();
1160
1161        let chunk_msg = executor.next().await.unwrap().unwrap();
1162        let expected = match sink_type {
1163            SinkType::Retract => StreamChunk::from_pretty(
1164                " I I I
1165                U- 1 1 10
1166                U+ 1 1 40",
1167            ),
1168            SinkType::Upsert => StreamChunk::from_pretty(
1169                " I I I
1170                + 1 1 40", // For upsert format, there won't be `U- 1 1 10`.
1171            ),
1172            _ => unreachable!(),
1173        };
1174        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);
1175
1176        // The last barrier message.
1177        executor.next().await.unwrap().unwrap();
1178    }
1179
1180    #[tokio::test]
1181    async fn test_empty_barrier_sink() {
1182        use risingwave_common::types::DataType;
1183
1184        use crate::executor::Barrier;
1185
1186        let properties = maplit::btreemap! {
1187            "connector".into() => "blackhole".into(),
1188            "type".into() => "append-only".into(),
1189            "force_append_only".into() => "true".into()
1190        };
1191        let columns = vec![
1192            ColumnCatalog {
1193                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1194                is_hidden: false,
1195            },
1196            ColumnCatalog {
1197                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1198                is_hidden: false,
1199            },
1200        ];
1201        let schema: Schema = columns
1202            .iter()
1203            .map(|column| Field::from(column.column_desc.clone()))
1204            .collect();
1205        let stream_key = vec![0];
1206
1207        let source = MockSource::with_messages(vec![
1208            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1209            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1210            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1211        ])
1212        .into_executor(schema.clone(), stream_key.clone());
1213
1214        let sink_param = SinkParam {
1215            sink_id: 0.into(),
1216            sink_name: "test".into(),
1217            properties,
1218
1219            columns: columns
1220                .iter()
1221                .filter(|col| !col.is_hidden)
1222                .map(|col| col.column_desc.clone())
1223                .collect(),
1224            downstream_pk: Some(stream_key.clone()),
1225            sink_type: SinkType::AppendOnly,
1226            ignore_delete: true,
1227            format_desc: None,
1228            db_name: "test".into(),
1229            sink_from_name: "test".into(),
1230        };
1231
1232        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);
1233
1234        let sink = build_sink(sink_param.clone()).unwrap();
1235
1236        let sink_executor = SinkExecutor::new(
1237            ActorContext::for_test(0),
1238            info,
1239            source,
1240            SinkWriterParam::for_test(),
1241            sink,
1242            sink_param,
1243            columns,
1244            BoundedInMemLogStoreFactory::for_test(1),
1245            1024,
1246            vec![DataType::Int64, DataType::Int64],
1247            None,
1248        )
1249        .unwrap();
1250
1251        let mut executor = sink_executor.boxed().execute();
1252
1253        // Barrier message.
1254        assert_eq!(
1255            executor.next().await.unwrap().unwrap(),
1256            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
1257        );
1258
1259        // Barrier message.
1260        assert_eq!(
1261            executor.next().await.unwrap().unwrap(),
1262            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
1263        );
1264
1265        // The last barrier message.
1266        assert_eq!(
1267            executor.next().await.unwrap().unwrap(),
1268            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
1269        );
1270    }
1271
    /// Runs an upsert blackhole sink with `force_compaction` enabled and checks the chunks the
    /// executor yields downstream.
    ///
    /// Stream key and downstream primary key are both `[0, 1]`. Four chunks arrive between the
    /// second and third barrier; the test expects them to come out as one chunk in which the
    /// insert/delete pairs for keys `(1, 2)` and `(1, 4)` have disappeared and key `(1, 1)`
    /// only carries its latest row.
    #[tokio::test]
    async fn test_force_compaction() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "force_compaction".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        // Epoch 1 holds a single insert; epoch 2 holds four chunks that touch keys
        // (1, 3), (1, 2), (1, 4) and (1, 1).
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20
                    + 1 4 10",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 4 30",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            // Only the visible columns are described to the sink.
            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0, 1]),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        // The four chunks of epoch 2 are expected to arrive as one compacted chunk.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 3 30
                + 1 1 40",
            )
        );

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }
1403}