Skip to main content

risingwave_stream/executor/
sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::{assert_matches, mem};
17
18use anyhow::anyhow;
19use futures::stream::select;
20use futures::{FutureExt, TryFutureExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::array::Op;
23use risingwave_common::array::stream_chunk::StreamChunkMut;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{ColumnCatalog, Field};
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
27use risingwave_common_estimate_size::EstimateSize;
28use risingwave_common_estimate_size::collections::EstimatedVec;
29use risingwave_common_rate_limit::RateLimit;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::{SinkId, SinkType};
32use risingwave_connector::sink::log_store::{
33    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
34    LogWriter, LogWriterExt, LogWriterMetrics,
35};
36use risingwave_connector::sink::{
37    GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
38    SinkWriterParam,
39};
40use risingwave_pb::common::ThrottleType;
41use risingwave_pb::id::FragmentId;
42use risingwave_pb::stream_plan::stream_node::StreamKind;
43use thiserror_ext::AsReport;
44use tokio::select;
45use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
46use tokio::sync::oneshot;
47
48use crate::common::change_buffer::{OutputKind, output_kind};
49use crate::common::compact_chunk::{
50    InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
51};
52use crate::executor::prelude::*;
/// Stream executor that delivers its input to a sink.
///
/// The input is pre-processed (optionally buffered, reordered and compacted per epoch, see
/// `NonAppendOnlyBehavior`). For sink-into-table the processed stream is returned directly;
/// otherwise it is written into a log store whose reader side drives the concrete sink.
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    /// Upstream executor whose output is sunk.
    input: Executor,
    /// The concrete sink connector.
    sink: SinkImpl,
    /// Catalog of the sink's columns; hidden columns are projected away before writing.
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    /// Builds the log store reader/writer pair that decouples the stream from the sink.
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    /// Chunk size used when re-building chunks after reordering/compaction.
    chunk_size: usize,
    /// Data types of the input columns, used when re-building chunks.
    input_data_types: Vec<DataType>,
    /// `Some` iff the sink type is not append-only.
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    /// Initial rate limit handed to the log consumer; later `Throttle` mutations are
    /// forwarded to it separately.
    rate_limit: Option<u32>,
}
67
68// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
69fn force_append_only(c: StreamChunk) -> StreamChunk {
70    let mut c: StreamChunkMut = c.into();
71    for (_, mut r) in c.to_rows_mut() {
72        match r.op() {
73            Op::Insert => {}
74            Op::Delete | Op::UpdateDelete => r.set_vis(false),
75            Op::UpdateInsert => r.set_op(Op::Insert),
76        }
77    }
78    c.into()
79}
80
81// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
82fn force_delete_only(c: StreamChunk) -> StreamChunk {
83    let mut c: StreamChunkMut = c.into();
84    for (_, mut r) in c.to_rows_mut() {
85        match r.op() {
86            Op::Delete => {}
87            Op::Insert | Op::UpdateInsert => r.set_vis(false),
88            Op::UpdateDelete => r.set_op(Op::Delete),
89        }
90    }
91    c.into()
92}
93
/// Extra work a non-append-only (upsert or retract) sink needs, both for correctness and
/// for performance.
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// `true` when the user declared a primary key for the sink and it matches the derived
    /// stream key of the input, where "matches" means the stream key is a subset of the
    /// downstream pk.
    pk_specified_and_matched: bool,
    /// `true` when the user forces buffering of all chunks between two barriers.
    force_compaction: bool,
}

impl NonAppendOnlyBehavior {
    /// Whether chunks read back from the log store must be compacted on the downstream pk.
    ///
    /// NOTE(kwannoel):
    ///
    /// Since <https://github.com/risingwavelabs/risingwave/pull/12250>, JDBC sinks and the
    /// PG rust sink sequence `DELETE`s before `INSERT`s. The log store format does not keep
    /// chunk boundaries, so adjacent chunks carrying `DELETE`s on the same PK may be merged
    /// into a single chunk. The result is duplicated `DELETE`s followed by `INSERT`s in an
    /// unspecified order, which leaves downstream data inconsistent.
    ///
    /// Compaction is therefore only needed for non-append-only sinks whose upstream and
    /// downstream PKs match. When they do not match, chunks are buffered between two barriers
    /// instead; barriers preserve chunk boundaries, so no compaction is needed here.
    ///
    /// Likewise, when `force_compaction` is set, this step is skipped because the per-epoch
    /// buffering already compacts the chunks.
    ///
    /// Append-only sinks (`force_append_only` or `append_only`) only ever append downstream,
    /// so there are no overlapping keys to worry about.
    fn should_compact_in_log_reader(self) -> bool {
        matches!(
            self,
            Self {
                pk_specified_and_matched: true,
                force_compaction: false,
            }
        )
    }

    /// Whether all records of an epoch must be buffered and reordered before being sunk.
    ///
    /// If the stream key differs from the user-defined primary key of the sink, operations
    /// can arrive out of order.
    ///
    /// Example: the derived stream key is `a, b`, the sink primary key is `a`, and `(1, 1)` is
    /// already in the table. Two `UPDATE`s are then executed:
    ///
    /// ```text
    /// UPDATE SET b = 2 WHERE a = 1 ... which issues:
    ///   - (1, 1)
    ///   + (1, 2)
    ///
    /// UPDATE SET b = 3 WHERE a = 1 ... which issues:
    ///   - (1, 2)
    ///   + (1, 3)
    /// ```
    ///
    /// Because these changes have different stream keys, the pipeline may shuffle them to
    /// different parallel actors:
    ///
    /// ```text
    /// Actor 1:
    /// - (1, 1)
    ///
    /// Actor 2:
    /// + (1, 2)
    /// - (1, 2)
    ///
    /// Actor 3:
    /// + (1, 3)
    /// ```
    ///
    /// When merged back into the sink actor, the records may come out in any interleaving,
    /// for instance:
    ///
    /// ```text
    /// + (1, 2) -- Actor 2, first row
    /// + (1, 3) -- Actor 3
    /// - (1, 1) -- Actor 1
    /// - (1, 2) -- Actor 2, second row
    /// ```
    ///
    /// Keyed by the stream key (`a, b`), this order is perfectly valid: it operates on three
    /// distinct rows. Keyed by the sink primary key `a`, however, it violates the uniqueness
    /// constraint throughout.
    ///
    /// So in this case the sink executor reorders records per barrier. First, all changes
    /// are compacted on the stream key:
    ///
    /// ```text
    /// + (1, 3)
    /// - (1, 1)
    /// ```
    ///
    /// Then all deletes are emitted before all inserts:
    ///
    /// ```text
    /// - (1, 1)
    /// + (1, 3)
    /// ```
    ///
    /// After stream-key compaction, every surviving `DELETE` removes a row that actually
    /// exists, so moving deletes to the front is safe. Once they are applied, the inserts
    /// respect the uniqueness of the sink primary key.
    ///
    /// With `force_compaction` set, records are reordered as well, to gain the benefits of
    /// compaction:
    /// - fewer output messages;
    /// - at most one update per key within a barrier interval, simplifying downstream logic.
    fn should_reorder_records(self) -> bool {
        // De Morgan: `!matched || force` is exactly `!(matched && !force)`.
        !self.should_compact_in_log_reader()
    }
}
204
205/// Get the output kind for chunk compaction based on the given sink type.
206fn compact_output_kind(sink_type: SinkType) -> OutputKind {
207    match sink_type {
208        SinkType::Upsert => output_kind::UPSERT,
209        SinkType::Retract => output_kind::RETRACT,
210        // There won't be any `Update` or `Delete` in the chunk, so it doesn't matter.
211        SinkType::AppendOnly => output_kind::RETRACT,
212    }
213}
214
/// Dispatches `$body` over the chunk-compaction output kinds for `$sink_type`.
///
/// `compact_output_kind($sink_type)` is evaluated once and matched against the known
/// `output_kind` constants. In the selected arm, a `const $KIND: OutputKind` holding that
/// value is in scope, so `$body` can use the caller-chosen name as a const generic argument
/// (e.g. `compact_chunk_inline::<KIND>(..)`).
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        // Call sites pass `$body` as a braced block; this presumably silences
        // `unused_braces` on it.
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                const $KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const $KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}
231
232impl<F: LogStoreFactory> SinkExecutor<F> {
233    #[expect(clippy::too_many_arguments)]
234    pub fn new(
235        actor_context: ActorContextRef,
236        info: ExecutorInfo,
237        input: Executor,
238        sink_writer_param: SinkWriterParam,
239        sink: SinkImpl,
240        sink_param: SinkParam,
241        columns: Vec<ColumnCatalog>,
242        log_store_factory: F,
243        chunk_size: usize,
244        input_data_types: Vec<DataType>,
245        rate_limit: Option<u32>,
246    ) -> StreamExecutorResult<Self> {
247        let sink_input_schema: Schema = columns
248            .iter()
249            .map(|column| Field::from(&column.column_desc))
250            .collect();
251
252        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
253            // Remove the partition column from the schema.
254            assert_eq!(sink_input_schema.data_types(), {
255                let mut data_type = info.schema.data_types();
256                data_type.remove(col_dix);
257                data_type
258            });
259        } else {
260            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
261        }
262
263        let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
264            let stream_key = &info.stream_key;
265            let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
266                .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
267            let force_compaction = sink_param
268                .properties
269                .get(SINK_USER_FORCE_COMPACTION)
270                .map(|v| v.eq_ignore_ascii_case("true"))
271                .unwrap_or(false);
272            Some(NonAppendOnlyBehavior {
273                pk_specified_and_matched,
274                force_compaction,
275            })
276        } else {
277            None
278        };
279
280        tracing::info!(
281            sink_id = %sink_param.sink_id,
282            actor_id = %actor_context.id,
283            ?non_append_only_behavior,
284            "Sink executor info"
285        );
286
287        Ok(Self {
288            actor_context,
289            info,
290            input,
291            sink,
292            input_columns: columns,
293            sink_param,
294            log_store_factory,
295            sink_writer_param,
296            chunk_size,
297            input_data_types,
298            non_append_only_behavior,
299            rate_limit,
300        })
301    }
302
    /// Builds the executor's output stream.
    ///
    /// The steps are:
    /// 1. Input chunks are counted into the sink input metrics.
    /// 2. The stream is pre-processed by `process_msg`.
    /// 3. When `ignore_delete` is set, chunks are passed through `force_append_only`.
    /// 4. For sink-into-table, that stream is returned as is.
    /// 5. Otherwise a log store is built. `execute_write_log` writes the processed stream into
    ///    it and forwards the messages, while `execute_consume_log` reads it back and drives
    ///    the concrete sink. The two are merged with `select`, so both are polled together.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.stream_key.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        // When processing upsert stream, we need to tolerate the inconsistency (mismatched `DELETE`
        // and `INSERT` pairs) when compacting input chunks with derived stream key.
        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        // Count every input chunk before any processing. With edition-2021 disjoint closure
        // captures, this `move` closure only takes the two counters it uses, which is why
        // `metrics.sink_chunk_buffer_size` is still usable below.
        // NOTE(review): `capacity()` presumably includes invisible rows; confirm that it,
        // rather than `cardinality()`, is the intended row count.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            self.non_append_only_behavior,
            metrics.sink_chunk_buffer_size,
            self.sink.is_blackhole(), // skip compact for blackhole for better benchmark results
        );

        let processed_input = if self.sink_param.ignore_delete {
            // Drop UPDATE/DELETE messages if specified `ignore_delete` (formerly `force_append_only`).
            processed_input
                .map_ok(|msg| match msg {
                    Message::Chunk(chunk) => Message::Chunk(force_append_only(chunk)),
                    other => other,
                })
                .left_stream()
        } else {
            processed_input.right_stream()
        };

        if self.sink.is_sink_into_table() {
            // TODO(hzxa21): support rate limit?
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Rate-limit updates flow from the writer side (on `Throttle` mutations) to the
            // consumer side.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Init the rate limit. The `unwrap` cannot fail here because `rate_limit_rx` is
            // still alive in this scope.
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            // Carries `RebuildSinkMessage`s (vnode-bitmap rebuilds and config updates) from
            // the writer side to the consumer side.
            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        fragment_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.non_append_only_behavior,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never); // unify return type to `Message`

                        consume_log_stream.boxed()
                    });
                    // The consumer never yields `Ok` (its success type is `!`), so output
                    // messages come from the writer stream; the consumer only surfaces errors.
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
426
    /// Writes the processed input into the log store while forwarding every message.
    ///
    /// The first barrier is forwarded before the log writer is initialized with its epoch.
    /// On every later barrier, the steps run in this order:
    /// 1. The current epoch is flushed, carrying checkpoint, vnode-bitmap update, stop and
    ///    sink schema-change information.
    /// 2. The barrier is yielded.
    /// 3. If the factory requires it and the vnode bitmap changed, a sink rebuild is requested
    ///    and awaited.
    /// 4. The post-yield flush step runs.
    /// 5. `Pause`, `Resume`, `Throttle` (sink throttles targeting this fragment) and
    ///    `ConnectorPropsChange` (entries for this sink) mutations are applied.
    ///
    /// # Errors
    ///
    /// Returns an error if the input fails, a log-writer operation fails, or the rate-limit /
    /// rebuild-sink channel to the consumer side is closed.
    ///
    /// # Panics
    ///
    /// Panics if a chunk arrives after a `Pause` mutation and before the matching `Resume`.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        fragment_id: FragmentId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // Propagate the first barrier
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        // Tracks `Pause`/`Resume` mutations, only for the sanity assertion below.
        // NOTE(review): this starts as `false` even when `is_pause_on_startup` is true;
        // confirm whether that is intended.
        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let schema_change = barrier.as_sink_schema_change(sink_id);
                    if let Some(schema_change) = &schema_change {
                        info!(?schema_change, %sink_id, "sink receive schema change");
                    }
                    // Flush before yielding the barrier; the returned handle finishes the
                    // flush after the barrier has been yielded.
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                                schema_change,
                            },
                        )
                        .await?;

                    // Keep the mutation, since the barrier is moved out by the yield.
                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    // Ask the consumer side to rebuild the sink for the new vnode bitmap and
                    // wait until it acknowledges through the oneshot channel.
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(fragment_to_apply) => {
                                // Only sink-type throttles aimed at this fragment apply here.
                                if let Some(entry) = fragment_to_apply.get(&fragment_id)
                                    && entry.throttle_type() == ThrottleType::Sink
                                {
                                    tracing::info!(
                                        rate_limit = entry.rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send(entry.rate_limit.into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                // Forward property changes for this sink to the consumer side.
                                if let Some(map) = config.get(&sink_id.as_raw_id())
                                    && let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                {
                                    error!(
                                        error = %e.as_report(),
                                        "fail to send sink alter props"
                                    );
                                    return Err(StreamExecutorError::from(e.to_report_string()));
                                }
                            }
                            _ => (),
                        }
                    }
                }
            }
        }
    }
540
541    #[expect(clippy::too_many_arguments)]
542    #[try_stream(ok = Message, error = StreamExecutorError)]
543    async fn process_msg(
544        input: impl MessageStream,
545        sink_type: SinkType,
546        stream_key: StreamKey,
547        chunk_size: usize,
548        input_data_types: Vec<DataType>,
549        input_compact_ib: InconsistencyBehavior,
550        downstream_pk: Option<Vec<usize>>,
551        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
552        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
553        skip_compact: bool,
554    ) {
555        // To reorder records, we need to buffer chunks of the entire epoch.
556        if let Some(b) = non_append_only_behavior
557            && b.should_reorder_records()
558        {
559            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);
560
561            let mut chunk_buffer = EstimatedVec::new();
562            let mut watermark: Option<super::Watermark> = None;
563            #[for_await]
564            for msg in input {
565                match msg? {
566                    Message::Watermark(w) => watermark = Some(w),
567                    Message::Chunk(c) => {
568                        chunk_buffer.push(c);
569                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
570                    }
571                    Message::Barrier(barrier) => {
572                        let chunks = mem::take(&mut chunk_buffer).into_inner();
573
574                        // 1. Compact the chunk based on the **stream key**, so that we have at most 2 rows for each
575                        //    stream key. Then, move all delete records to the front.
576                        let mut delete_chunks = vec![];
577                        let mut insert_chunks = vec![];
578
579                        for c in dispatch_output_kind!(sink_type, KIND, {
580                            StreamChunkCompactor::new(stream_key.clone(), chunks)
581                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
582                        }) {
583                            let chunk = force_delete_only(c.clone());
584                            if chunk.cardinality() > 0 {
585                                delete_chunks.push(chunk);
586                            }
587                            let chunk = force_append_only(c);
588                            if chunk.cardinality() > 0 {
589                                insert_chunks.push(chunk);
590                            }
591                        }
592                        let chunks = delete_chunks
593                            .into_iter()
594                            .chain(insert_chunks.into_iter())
595                            .collect();
596
597                        // 2. If user specifies a primary key, compact the chunk based on the **downstream pk**
598                        //    to eliminate any unnecessary updates to external systems. This also rewrites the
599                        //    `DELETE` and `INSERT` operations on the same key into `UPDATE` operations, which
600                        //    usually have more efficient implementation.
601                        if let Some(downstream_pk) = &downstream_pk {
602                            let chunks = dispatch_output_kind!(sink_type, KIND, {
603                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
604                                    .into_compacted_chunks_reconstructed::<KIND>(
605                                        chunk_size,
606                                        input_data_types.clone(),
607                                        // When compacting based on user provided primary key, we should never panic
608                                        // on inconsistency in case the user provided primary key is not unique.
609                                        InconsistencyBehavior::Warn,
610                                    )
611                            });
612                            for c in chunks {
613                                yield Message::Chunk(c);
614                            }
615                        } else {
616                            let mut chunk_builder =
617                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
618                            for chunk in chunks {
619                                for (op, row) in chunk.rows() {
620                                    if let Some(c) = chunk_builder.append_row(op, row) {
621                                        yield Message::Chunk(c);
622                                    }
623                                }
624                            }
625
626                            if let Some(c) = chunk_builder.take() {
627                                yield Message::Chunk(c);
628                            }
629                        };
630
631                        // 3. Forward watermark and barrier.
632                        if let Some(w) = mem::take(&mut watermark) {
633                            yield Message::Watermark(w)
634                        }
635                        yield Message::Barrier(barrier);
636                    }
637                }
638            }
639        } else {
640            // In this branch, we don't need to reorder records, either because the stream key matches
641            // the downstream pk, or the sink is append-only.
642            #[for_await]
643            for msg in input {
644                match msg? {
645                    Message::Watermark(w) => yield Message::Watermark(w),
646                    Message::Chunk(mut chunk) => {
647                        // Compact the chunk to eliminate any unnecessary updates to external systems.
648                        // This should be performed against the downstream pk, not the stream key, to
649                        // ensure correct retract/upsert semantics from the downstream's perspective.
650                        if !sink_type.is_append_only()
651                            && let Some(downstream_pk) = &downstream_pk
652                        {
653                            if skip_compact {
654                                // We can only skip compaction if the keys are exactly the same, not just
655                                // matching by being a subset.
656                                assert_eq!(&stream_key, downstream_pk);
657                            } else {
658                                chunk = dispatch_output_kind!(sink_type, KIND, {
659                                    compact_chunk_inline::<KIND>(
660                                        chunk,
661                                        downstream_pk,
662                                        // When compacting based on user provided primary key, we should never panic
663                                        // on inconsistency in case the user provided primary key is not unique.
664                                        InconsistencyBehavior::Warn,
665                                    )
666                                });
667                            }
668                        }
669                        yield Message::Chunk(chunk);
670                    }
671                    Message::Barrier(barrier) => {
672                        yield Message::Barrier(barrier);
673                    }
674                }
675            }
676        }
677    }
678
679    #[expect(clippy::too_many_arguments)]
680    async fn execute_consume_log<S: Sink, R: LogReader>(
681        mut sink: S,
682        log_reader: R,
683        columns: Vec<ColumnCatalog>,
684        mut sink_param: SinkParam,
685        mut sink_writer_param: SinkWriterParam,
686        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
687        actor_context: ActorContextRef,
688        rate_limit_rx: UnboundedReceiver<RateLimit>,
689        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
690    ) -> StreamExecutorResult<!> {
691        let visible_columns = columns
692            .iter()
693            .enumerate()
694            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
695            .collect_vec();
696
697        let labels = [
698            &actor_context.id.to_string(),
699            sink_writer_param.connector.as_str(),
700            &sink_writer_param.sink_id.to_string(),
701            sink_writer_param.sink_name.as_str(),
702        ];
703        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
704            .log_store_reader_wait_new_future_duration_ns
705            .with_guarded_label_values(&labels);
706        let log_store_read_rows = GLOBAL_SINK_METRICS
707            .log_store_read_rows
708            .with_guarded_label_values(&labels);
709        let log_store_read_bytes = GLOBAL_SINK_METRICS
710            .log_store_read_bytes
711            .with_guarded_label_values(&labels);
712        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
713            .log_store_latest_read_epoch
714            .with_guarded_label_values(&labels);
715        let metrics = LogReaderMetrics {
716            log_store_latest_read_epoch,
717            log_store_read_rows,
718            log_store_read_bytes,
719            log_store_reader_wait_new_future_duration_ns,
720        };
721
722        let downstream_pk = sink_param.downstream_pk.clone();
723
724        let mut log_reader = log_reader
725            .transform_chunk(move |chunk| {
726                let chunk = if let Some(b) = non_append_only_behavior
727                    && b.should_compact_in_log_reader()
728                {
729                    // This guarantees that user has specified a `downstream_pk`.
730                    let downstream_pk = downstream_pk.as_ref().unwrap();
731                    dispatch_output_kind!(sink_param.sink_type, KIND, {
732                        compact_chunk_inline::<KIND>(
733                            chunk,
734                            downstream_pk,
735                            // When compacting based on user provided primary key, we should never panic
736                            // on inconsistency in case the user provided primary key is not unique.
737                            InconsistencyBehavior::Warn,
738                        )
739                    })
740                } else {
741                    chunk
742                };
743                if visible_columns.len() != columns.len() {
744                    // Do projection here because we may have columns that aren't visible to
745                    // the downstream.
746                    chunk.project(&visible_columns)
747                } else {
748                    chunk
749                }
750            })
751            .monitored(metrics)
752            .rate_limited(rate_limit_rx);
753
754        log_reader.init().await?;
755        loop {
756            let future = async {
757                loop {
758                    let Err(e) = sink
759                        .new_log_sinker(sink_writer_param.clone())
760                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
761                        .await;
762                    GLOBAL_ERROR_METRICS.user_sink_error.report([
763                        "sink_executor_error".to_owned(),
764                        sink_param.sink_id.to_string(),
765                        sink_param.sink_name.clone(),
766                        actor_context.fragment_id.to_string(),
767                    ]);
768
769                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
770                        meta_client
771                            .add_sink_fail_evet_log(
772                                sink_writer_param.sink_id,
773                                sink_writer_param.sink_name.clone(),
774                                sink_writer_param.connector.clone(),
775                                e.to_report_string(),
776                            )
777                            .await;
778                    }
779
780                    if F::ALLOW_REWIND {
781                        match log_reader.rewind().await {
782                            Ok(()) => {
783                                error!(
784                                    error = %e.as_report(),
785                                    executor_id = %sink_writer_param.executor_id,
786                                    sink_id = %sink_param.sink_id,
787                                    "reset log reader stream successfully after sink error"
788                                );
789                                Ok(())
790                            }
791                            Err(rewind_err) => {
792                                error!(
793                                    error = %rewind_err.as_report(),
794                                    "fail to rewind log reader"
795                                );
796                                Err(e)
797                            }
798                        }
799                    } else {
800                        Err(e)
801                    }
802                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
803                }
804            };
805            select! {
806                result = future => {
807                    let Err(e): StreamExecutorResult<!> = result;
808                    return Err(e);
809                }
810                result = rebuild_sink_rx.recv() => {
811                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
812                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
813                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
814                            if notify.send(()).is_err() {
815                                warn!("failed to notify rebuild sink");
816                            }
817                            log_reader.init().await?;
818                        },
819                        RebuildSinkMessage::UpdateConfig(config) => {
820                            if !sink_config_has_changes(&sink_param.properties, &config) {
821                                info!(
822                                    executor_id = %sink_writer_param.executor_id,
823                                    sink_id = %sink_param.sink_id,
824                                    "skip alter sink config because properties are unchanged"
825                                );
826                                Ok(())
827                            } else if F::ALLOW_REWIND {
828                                match log_reader.rewind().await {
829                                    Ok(()) => {
830                                        sink_param.properties.extend(config.into_iter());
831                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
832                                        info!(
833                                            executor_id = %sink_writer_param.executor_id,
834                                            sink_id = %sink_param.sink_id,
835                                            "alter sink config successfully with rewind"
836                                        );
837                                        Ok(())
838                                    }
839                                    Err(rewind_err) => {
840                                        error!(
841                                            error = %rewind_err.as_report(),
842                                            "fail to rewind log reader for alter sink config "
843                                        );
844                                        Err(anyhow!("fail to rewind log after alter table").into())
845                                    }
846                                }
847                            } else {
848                                sink_param.properties.extend(config.into_iter());
849                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
850                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
851                            }
852                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
853                        },
854                    }
855                }
856            }
857        }
858    }
859}
860
/// Control messages consumed by `execute_consume_log` through its `rebuild_sink_rx` channel.
enum RebuildSinkMessage {
    /// Replace the sink writer's vnode bitmap with the given one, acknowledge through the
    /// oneshot sender, then re-initialize the log reader.
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// New sink properties to merge into the current ones; ignored if no value changes.
    UpdateConfig(HashMap<String, String>),
}
865
/// Returns `true` if `incoming` would alter `current` when merged into it.
///
/// An incoming entry counts as a change when its key is absent from `current` or maps to a
/// different value there. Keys that exist only in `current` never count as a change.
fn sink_config_has_changes(
    current: &BTreeMap<String, String>,
    incoming: &HashMap<String, String>,
) -> bool {
    for (key, new_value) in incoming {
        match current.get(key) {
            Some(old_value) if old_value == new_value => continue,
            _ => return true,
        }
    }
    false
}
874
/// Exposes `SinkExecutor` through the generic `Execute` interface.
impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    /// Consumes the boxed executor and returns the message stream built by `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}
880
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    /// A config update counts as a change only when an incoming key is new or carries a
    /// different value than the current properties.
    #[test]
    fn test_sink_config_has_changes() {
        let current = BTreeMap::from([
            ("connector".to_owned(), "blackhole".to_owned()),
            ("commit_checkpoint_interval".to_owned(), "1".to_owned()),
        ]);

        assert!(!sink_config_has_changes(
            &current,
            &HashMap::from([("commit_checkpoint_interval".to_owned(), "1".to_owned())])
        ));
        assert!(sink_config_has_changes(
            &current,
            &HashMap::from([("commit_checkpoint_interval".to_owned(), "2".to_owned())])
        ));
        assert!(sink_config_has_changes(
            &current,
            &HashMap::from([("force_append_only".to_owned(), "true".to_owned())])
        ));
    }

    /// With `force_append_only`, the executor's output keeps only inserts: `U+` rows become
    /// plain inserts, and `U-` / `-` rows are dropped.
    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                "  I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            )),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::AppendOnly,
            ignore_delete: true,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            // Input data types must match the `Int64` columns declared above.
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // Should not receive the third stream chunk message because the force-append-only sink
        // executor will drop all DELETE messages.

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_upsert() {
        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
    }

    #[tokio::test]
    async fn stream_key_sink_pk_mismatch_retract() {
        stream_key_sink_pk_mismatch(SinkType::Retract).await;
    }

    /// Stream key `[0, 1]` differs from downstream pk `[0]`. Per the expected output, the
    /// second epoch's chunks are emitted as a single chunk compacted on the downstream pk.
    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0]),
            sink_type,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        let expected = match sink_type {
            SinkType::Retract => StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            ),
            SinkType::Upsert => StreamChunk::from_pretty(
                " I I I
                + 1 1 40", // For upsert format, there won't be `U- 1 1 10`.
            ),
            _ => unreachable!(),
        };
        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    /// Barriers with no data chunks in between are forwarded unchanged.
    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let stream_key = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), stream_key.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(stream_key.clone()),
            sink_type: SinkType::AppendOnly,
            ignore_delete: true,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        // The last barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }

    /// With `force_compaction` on an upsert sink keyed by `[0, 1]`, the second epoch is emitted
    /// as a single compacted chunk. Per the expected output, rows inserted and deleted within
    /// the epoch disappear.
    #[tokio::test]
    async fn test_force_compaction() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "force_compaction".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20
                    + 1 4 10",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 4 30",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: Some(vec![0, 1]),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::for_test(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact_vis(),
            StreamChunk::from_pretty(
                " I I I
                + 1 3 30
                + 1 1 40",
            )
        );

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }
}