risingwave_stream/executor/
sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::HashMap;
17use std::mem;
18
19use anyhow::anyhow;
20use futures::stream::select;
21use futures::{FutureExt, TryFutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::array::stream_chunk::StreamChunkMut;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnCatalog, Field};
27use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_common_estimate_size::collections::EstimatedVec;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_connector::dispatch_sink;
32use risingwave_connector::sink::catalog::{SinkId, SinkType};
33use risingwave_connector::sink::log_store::{
34    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
35    LogWriter, LogWriterExt, LogWriterMetrics,
36};
37use risingwave_connector::sink::{
38    GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
39    SinkWriterParam,
40};
41use risingwave_pb::common::ThrottleType;
42use risingwave_pb::id::FragmentId;
43use risingwave_pb::stream_plan::stream_node::StreamKind;
44use thiserror_ext::AsReport;
45use tokio::select;
46use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
47use tokio::sync::oneshot;
48
49use crate::common::change_buffer::{OutputKind, output_kind};
50use crate::common::compact_chunk::{
51    InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
52};
53use crate::executor::prelude::*;
/// Stream executor that forwards the changes of its input to an external sink.
///
/// `F` is the [`LogStoreFactory`] used to build the log reader/writer pair that sits between
/// the input stream and the sink.
pub struct SinkExecutor<F: LogStoreFactory> {
    /// Context of the owning actor; provides the actor id, fragment id and streaming metrics.
    actor_context: ActorContextRef,
    /// Executor info; its schema is validated against the sink columns and its stream key is
    /// used when compacting chunks.
    info: ExecutorInfo,
    /// Upstream executor whose messages are delivered to the sink.
    input: Executor,
    /// The sink implementation to write into.
    sink: SinkImpl,
    /// Column catalogs of the sink input, handed to the log consumer.
    input_columns: Vec<ColumnCatalog>,
    /// Sink parameters such as the sink id, sink type, downstream primary key and properties.
    sink_param: SinkParam,
    /// Factory that builds the log reader and log writer.
    log_store_factory: F,
    /// Parameters handed to the log consumer; also carries the optional extra partition column
    /// index that is checked in `new`.
    sink_writer_param: SinkWriterParam,
    /// Chunk size used when chunks are rebuilt after reordering.
    chunk_size: usize,
    /// Data types of the input columns, used when chunks are rebuilt after reordering.
    input_data_types: Vec<DataType>,
    /// Extra behavior for non-append-only sinks; `None` when the sink type is append-only.
    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
    /// Initial rate limit sent to the log consumer; `None` is converted via `Into<RateLimit>`.
    rate_limit: Option<u32>,
}
68
69// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
70fn force_append_only(c: StreamChunk) -> StreamChunk {
71    let mut c: StreamChunkMut = c.into();
72    for (_, mut r) in c.to_rows_mut() {
73        match r.op() {
74            Op::Insert => {}
75            Op::Delete | Op::UpdateDelete => r.set_vis(false),
76            Op::UpdateInsert => r.set_op(Op::Insert),
77        }
78    }
79    c.into()
80}
81
82// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
83fn force_delete_only(c: StreamChunk) -> StreamChunk {
84    let mut c: StreamChunkMut = c.into();
85    for (_, mut r) in c.to_rows_mut() {
86        match r.op() {
87            Op::Delete => {}
88            Op::Insert | Op::UpdateInsert => r.set_vis(false),
89            Op::UpdateDelete => r.set_op(Op::Delete),
90        }
91    }
92    c.into()
93}
94
/// When the sink is non-append-only, i.e. upsert or retract, we need to do some extra work for
/// correctness and performance.
///
/// The two flags together decide whether records are buffered and reordered per barrier or
/// compacted chunk by chunk in the log reader.
#[derive(Clone, Copy, Debug)]
struct NonAppendOnlyBehavior {
    /// Whether the user specifies a primary key for the sink, and it matches the derived stream
    /// key of the stream.
    ///
    /// By matching, we mean that stream key is a subset of the downstream pk, i.e. every
    /// stream key column index is contained in the downstream pk.
    pk_specified_and_matched: bool,
    /// Whether the user forces buffering all chunks between two barriers.
    ///
    /// This is read from the `SINK_USER_FORCE_COMPACTION` sink property and is `true` only when
    /// that property equals `"true"` (ASCII case-insensitive).
    force_compaction: bool,
}
107
108impl NonAppendOnlyBehavior {
109    /// NOTE(kwannoel):
110    ///
111    /// After the optimization in <https://github.com/risingwavelabs/risingwave/pull/12250>,
112    /// `DELETE`s will be sequenced before `INSERT`s in JDBC sinks and PG rust sink.
113    /// There's a risk that adjacent chunks with `DELETE`s on the same PK will get
114    /// merged into a single chunk, since the logstore format doesn't preserve chunk
115    /// boundaries. Then we will have double `DELETE`s followed by unspecified sequence
116    /// of `INSERT`s, and lead to inconsistent data downstream.
117    ///
118    /// We only need to do the compaction for non-append-only sinks, when the upstream and
119    /// downstream PKs are matched. When the upstream and downstream PKs are not matched,
120    /// we will buffer the chunks between two barriers, so the compaction is not needed,
121    /// since the barriers will preserve chunk boundaries.
122    ///
123    /// When `force_compaction` is true, we also skip compaction here, since the buffering
124    /// will also make compaction.
125    ///
126    /// When the sink is an append-only sink, it is either `force_append_only` or
127    /// `append_only`, we should only append to downstream, so there should not be any
128    /// overlapping keys.
129    fn should_compact_in_log_reader(self) -> bool {
130        self.pk_specified_and_matched && !self.force_compaction
131    }
132
133    /// When stream key is different from the user defined primary key columns for sinks.
134    /// The operations could be out of order.
135    ///
136    /// For example, we have a stream with derived stream key `a, b` and user-specified sink
137    /// primary key `a`. Assume that we have `(1, 1)` in the table. Then, we perform two `UPDATE`
138    /// operations:
139    ///
140    /// ```text
141    /// UPDATE SET b = 2 WHERE a = 1 ... which issues:
142    ///   - (1, 1)
143    ///   + (1, 2)
144    ///
145    /// UPDATE SET b = 3 WHERE a = 1 ... which issues:
146    ///   - (1, 2)
147    ///   + (1, 3)
148    /// ```
149    ///
150    /// When these changes go into streaming pipeline, they could be shuffled to different parallelism
151    /// (actor), given that they are under different stream keys.
152    ///
153    /// ```text
154    /// Actor 1:
155    /// - (1, 1)
156    ///
157    /// Actor 2:
158    /// + (1, 2)
159    /// - (1, 2)
160    ///
161    /// Actor 3:
162    /// + (1, 3)
163    /// ```
164    ///
165    /// When these records are merged back into sink actor, we may get the records from different
166    /// parallelism in arbitrary order, like:
167    ///
168    /// ```text
169    /// + (1, 2) -- Actor 2, first row
170    /// + (1, 3) -- Actor 3
171    /// - (1, 1) -- Actor 1
172    /// - (1, 2) -- Actor 2, second row
173    /// ```
174    ///
175    /// Note that in terms of stream key (`a, b`), the operations in the order above are completely
176    /// correct, because we are operating on 3 different rows. However, in terms of user defined sink
177    /// primary key `a`, we're violating the unique constraint all the time.
178    ///
179    /// Therefore, in this case, we have to do additional reordering in the sink executor per barrier.
180    /// Specifically, we need to:
181    ///
182    /// First, compact all the changes with the stream key, so we have:
183    /// ```text
184    /// + (1, 3)
185    /// - (1, 1)
186    /// ```
187    ///
188    /// Then, sink all the delete events before sinking all insert events, so we have:
189    /// ```text
190    /// - (1, 1)
191    /// + (1, 3)
192    /// ```
193    /// Since we've compacted the chunk with the stream key, the `DELETE` records survived must be to
194    /// delete an existing row, so we can safely move them to the front. After the deletion is done,
195    /// we can then safely sink the insert events with uniqueness guarantee.
196    ///
197    /// When `force_compaction` is true, we also perform additional reordering to gain the
198    /// benefits of compaction:
199    /// - reduce the number of output messages;
200    /// - emit at most one update per key within a barrier interval, simplifying downstream logic.
201    fn should_reorder_records(self) -> bool {
202        !self.pk_specified_and_matched || self.force_compaction
203    }
204}
205
206/// Get the output kind for chunk compaction based on the given sink type.
207fn compact_output_kind(sink_type: SinkType) -> OutputKind {
208    match sink_type {
209        SinkType::Upsert => output_kind::UPSERT,
210        SinkType::Retract => output_kind::RETRACT,
211        // There won't be any `Update` or `Delete` in the chunk, so it doesn't matter.
212        SinkType::AppendOnly => output_kind::RETRACT,
213    }
214}
215
/// Dispatch the code block to different output kinds for chunk compaction based on sink type.
///
/// The macro matches on `compact_output_kind($sink_type)` and expands `$body` once per output
/// kind, each time with a `const KIND: OutputKind` item in scope holding that kind, so the body
/// can use `KIND` where a constant is required (e.g. as a const generic argument).
///
/// Note that the `$KIND` argument is not substituted anywhere in the expansion; the body must
/// refer to the constant by the literal name `KIND`, which is what all call sites in this file
/// pass.
macro_rules! dispatch_output_kind {
    ($sink_type:expr, $KIND:ident, $body:tt) => {
        // `$body` is passed as a braced block, which would otherwise trigger `unused_braces`.
        #[allow(unused_braces)]
        match compact_output_kind($sink_type) {
            output_kind::UPSERT => {
                const KIND: OutputKind = output_kind::UPSERT;
                $body
            }
            output_kind::RETRACT => {
                const KIND: OutputKind = output_kind::RETRACT;
                $body
            }
        }
    };
}
232
233impl<F: LogStoreFactory> SinkExecutor<F> {
234    #[allow(clippy::too_many_arguments)]
235    #[expect(clippy::unused_async)]
236    pub async fn new(
237        actor_context: ActorContextRef,
238        info: ExecutorInfo,
239        input: Executor,
240        sink_writer_param: SinkWriterParam,
241        sink: SinkImpl,
242        sink_param: SinkParam,
243        columns: Vec<ColumnCatalog>,
244        log_store_factory: F,
245        chunk_size: usize,
246        input_data_types: Vec<DataType>,
247        rate_limit: Option<u32>,
248    ) -> StreamExecutorResult<Self> {
249        let sink_input_schema: Schema = columns
250            .iter()
251            .map(|column| Field::from(&column.column_desc))
252            .collect();
253
254        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
255            // Remove the partition column from the schema.
256            assert_eq!(sink_input_schema.data_types(), {
257                let mut data_type = info.schema.data_types();
258                data_type.remove(col_dix);
259                data_type
260            });
261        } else {
262            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
263        }
264
265        let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
266            let stream_key = &info.stream_key;
267            let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
268                .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
269            let force_compaction = sink_param
270                .properties
271                .get(SINK_USER_FORCE_COMPACTION)
272                .map(|v| v.eq_ignore_ascii_case("true"))
273                .unwrap_or(false);
274            Some(NonAppendOnlyBehavior {
275                pk_specified_and_matched,
276                force_compaction,
277            })
278        } else {
279            None
280        };
281
282        tracing::info!(
283            sink_id = %sink_param.sink_id,
284            actor_id = %actor_context.id,
285            ?non_append_only_behavior,
286            "Sink executor info"
287        );
288
289        Ok(Self {
290            actor_context,
291            info,
292            input,
293            sink,
294            input_columns: columns,
295            sink_param,
296            log_store_factory,
297            sink_writer_param,
298            chunk_size,
299            input_data_types,
300            non_append_only_behavior,
301            rate_limit,
302        })
303    }
304
    /// Builds the output message stream of the sink executor.
    ///
    /// The input stream is instrumented with input row/byte metrics, passed through
    /// `process_msg`, and, when `sink_param.ignore_delete` is set, through `force_append_only`.
    ///
    /// If the sink is a sink-into-table, the processed stream is returned as is. Otherwise a
    /// log reader/writer pair is built from the log store factory: `execute_write_log` writes
    /// the processed stream into the log writer while forwarding messages, and
    /// `execute_consume_log` drives the sink from the log reader. Both are combined into a
    /// single stream with `select`.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.stream_key.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        // When processing upsert stream, we need to tolerate the inconsistency (mismatched `DELETE`
        // and `INSERT` pairs) when compacting input chunks with derived stream key.
        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
            InconsistencyBehavior::Tolerate
        } else {
            InconsistencyBehavior::Panic
        };

        let input = self.input.execute();

        // Record the number of rows and the estimated bytes of every input chunk.
        // NOTE(review): `capacity()` may include invisible rows — confirm that this, rather
        // than the visible row count, is the intended value for the row-count metric.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.chunk_size,
            self.input_data_types,
            input_compact_ib,
            self.sink_param.downstream_pk.clone(),
            self.non_append_only_behavior,
            metrics.sink_chunk_buffer_size,
            self.sink.is_blackhole(), // skip compact for blackhole for better benchmark results
        );

        // `left_stream` / `right_stream` unify the two branches into one stream type.
        let processed_input = if self.sink_param.ignore_delete {
            // Drop UPDATE/DELETE messages if specified `ignore_delete` (formerly `force_append_only`).
            processed_input
                .map_ok(|msg| match msg {
                    Message::Chunk(chunk) => Message::Chunk(force_append_only(chunk)),
                    other => other,
                })
                .left_stream()
        } else {
            processed_input.right_stream()
        };

        if self.sink.is_sink_into_table() {
            // No log store is involved for sink-into-table: the processed stream is the output.
            // TODO(hzxa21): support rate limit?
            processed_input.boxed()
        } else {
            // Label values for the log writer metrics: actor id, sink id, sink name.
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Channel used by the writer side to push rate limit updates to the consumer side.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Init the rate limit
            // The receiver is still held in this scope, so the send is not expected to fail.
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            // Channel used by the writer side to request sink rebuilds / config updates.
            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    // Writer side: persists processed messages and forwards them downstream.
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        fragment_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    // Consumer side: a future that reads the log and writes to the sink. Its
                    // success type is `!`, so it only ever resolves with an error.
                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.non_append_only_behavior,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never); // unify return type to `Message`

                        consume_log_stream.boxed()
                    });
                    // Merge both sides so that polling the output drives the consumer as well
                    // as the writer.
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
428
    /// Writes the processed input into the log writer while forwarding every message.
    ///
    /// The first message of `input` must be a barrier: it is yielded, and the log writer is
    /// then initialized with its epoch and pause-on-startup flag. Afterwards:
    ///
    /// - watermarks are forwarded unchanged;
    /// - chunks are written to the log writer and then yielded;
    /// - for barriers, the current epoch is flushed, the barrier is yielded, a sink rebuild is
    ///   requested if the vnode bitmap is updated and `F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP`
    ///   is set, the post-flush hook runs, and finally the barrier's mutation is applied
    ///   (pause, resume, sink throttle, or connector property change).
    ///
    /// `rate_limit_tx` and `rebuild_sink_tx` are the sending halves of the channels whose
    /// receivers are passed to `execute_consume_log`.
    ///
    /// # Errors
    ///
    /// Yields an error if the input or the log writer fails, or if a message cannot be sent
    /// to (or acknowledged by) the log consumer.
    ///
    /// # Panics
    ///
    /// Panics if a chunk is received while the actor is paused.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_write_log<W: LogWriter>(
        input: impl MessageStream,
        mut log_writer: W,
        actor_id: ActorId,
        fragment_id: FragmentId,
        sink_id: SinkId,
        rate_limit_tx: UnboundedSender<RateLimit>,
        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
    ) {
        pin_mut!(input);
        let barrier = expect_first_barrier(&mut input).await?;
        let epoch_pair = barrier.epoch;
        let is_pause_on_startup = barrier.is_pause_on_startup();
        // Propagate the first barrier
        yield Message::Barrier(barrier);

        log_writer.init(epoch_pair, is_pause_on_startup).await?;

        // Tracks `Pause` / `Resume` mutations seen on barriers.
        let mut is_paused = false;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(w) => yield Message::Watermark(w),
                Message::Chunk(chunk) => {
                    assert!(
                        !is_paused,
                        "Actor {actor_id} should not receive any data after pause"
                    );
                    // Persist the chunk first, then forward it.
                    log_writer.write_chunk(chunk.clone()).await?;
                    yield Message::Chunk(chunk);
                }
                Message::Barrier(barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    let schema_change = barrier.as_sink_schema_change(sink_id);
                    if let Some(schema_change) = &schema_change {
                        info!(?schema_change, %sink_id, "sink receive schema change");
                    }
                    let post_flush = log_writer
                        .flush_current_epoch(
                            barrier.epoch.curr,
                            FlushCurrentEpochOptions {
                                is_checkpoint: barrier.kind.is_checkpoint(),
                                new_vnode_bitmap: update_vnode_bitmap.clone(),
                                is_stop: barrier.is_stop(actor_id),
                                schema_change,
                            },
                        )
                        .await?;

                    // Keep the mutation around: the barrier itself is moved into the yield.
                    let mutation = barrier.mutation.clone();
                    yield Message::Barrier(barrier);
                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
                    {
                        // Ask the log consumer to rebuild the sink with the new vnode bitmap
                        // and wait on the oneshot channel before continuing.
                        let (tx, rx) = oneshot::channel();
                        rebuild_sink_tx
                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
                        rx.await
                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
                    }
                    post_flush.post_yield_barrier().await?;

                    if let Some(mutation) = mutation.as_deref() {
                        match mutation {
                            Mutation::Pause => {
                                log_writer.pause()?;
                                is_paused = true;
                            }
                            Mutation::Resume => {
                                log_writer.resume()?;
                                is_paused = false;
                            }
                            Mutation::Throttle(fragment_to_apply) => {
                                // Only throttle entries for this fragment with the `Sink`
                                // throttle type are applied.
                                if let Some(entry) = fragment_to_apply.get(&fragment_id)
                                    && entry.throttle_type() == ThrottleType::Sink
                                {
                                    tracing::info!(
                                        rate_limit = entry.rate_limit,
                                        "received sink rate limit on actor {actor_id}"
                                    );
                                    if let Err(e) = rate_limit_tx.send(entry.rate_limit.into()) {
                                        error!(
                                            error = %e.as_report(),
                                            "fail to send sink rate limit update"
                                        );
                                        return Err(StreamExecutorError::from(
                                            e.to_report_string(),
                                        ));
                                    }
                                }
                            }
                            Mutation::ConnectorPropsChange(config) => {
                                // Forward the new properties for this sink, if any, to the
                                // log consumer.
                                if let Some(map) = config.get(&sink_id.as_raw_id())
                                    && let Err(e) = rebuild_sink_tx
                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
                                {
                                    error!(
                                        error = %e.as_report(),
                                        "fail to send sink alter props"
                                    );
                                    return Err(StreamExecutorError::from(e.to_report_string()));
                                }
                            }
                            // Other mutations do not affect the log writer.
                            _ => (),
                        }
                    }
                }
            }
        }
    }
542
    /// Pre-processes the input stream before it is written to the sink or the log store.
    ///
    /// There are two modes, selected by `non_append_only_behavior`:
    ///
    /// - **Reorder mode** (`should_reorder_records()` is true): all chunks of an epoch are
    ///   buffered. On each barrier they are compacted on the stream key, split so that all
    ///   deletes come before all inserts, optionally compacted again on the downstream pk, and
    ///   then emitted, followed by the latest buffered watermark (if any) and the barrier.
    /// - **Pass-through mode** (otherwise): messages are forwarded as they arrive; for
    ///   non-append-only sinks with a downstream pk each chunk is compacted in place on that pk
    ///   unless `skip_compact` is set.
    ///
    /// Parameters:
    /// - `input`: the upstream message stream.
    /// - `sink_type`: selects the output kind used for compaction.
    /// - `stream_key`: key used for the first compaction in reorder mode.
    /// - `chunk_size`, `input_data_types`: used to rebuild output chunks in reorder mode.
    /// - `input_compact_ib`: inconsistency behavior for the stream-key compaction.
    /// - `downstream_pk`: user-specified primary key of the sink, if any.
    /// - `sink_chunk_buffer_size_metrics`: gauge set to the estimated size of the chunk buffer
    ///   after every push in reorder mode.
    /// - `skip_compact`: skips the per-chunk compaction in pass-through mode.
    ///
    /// # Panics
    ///
    /// Panics in reorder mode if `sink_type` is neither upsert nor retract, and in pass-through
    /// mode if `skip_compact` is set while `stream_key` differs from the downstream pk.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: StreamKey,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        input_compact_ib: InconsistencyBehavior,
        downstream_pk: Option<Vec<usize>>,
        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        skip_compact: bool,
    ) {
        // To reorder records, we need to buffer chunks of the entire epoch.
        if let Some(b) = non_append_only_behavior
            && b.should_reorder_records()
        {
            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);

            let mut chunk_buffer = EstimatedVec::new();
            // Only the most recent watermark is kept; an earlier one received in the same
            // epoch is overwritten.
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);
                        // NOTE(review): the gauge is only updated on push, not when the buffer
                        // is drained at a barrier — confirm whether that is intended.
                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        // Drain the buffer, leaving an empty one for the next epoch.
                        let chunks = mem::take(&mut chunk_buffer).into_inner();

                        // 1. Compact the chunk based on the **stream key**, so that we have at most 2 rows for each
                        //    stream key. Then, move all delete records to the front.
                        let mut delete_chunks = vec![];
                        let mut insert_chunks = vec![];

                        for c in dispatch_output_kind!(sink_type, KIND, {
                            StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
                        }) {
                            // Split each compacted chunk into its delete part and its insert
                            // part, dropping parts that have no visible rows.
                            let chunk = force_delete_only(c.clone());
                            if chunk.cardinality() > 0 {
                                delete_chunks.push(chunk);
                            }
                            let chunk = force_append_only(c);
                            if chunk.cardinality() > 0 {
                                insert_chunks.push(chunk);
                            }
                        }
                        let chunks = delete_chunks
                            .into_iter()
                            .chain(insert_chunks.into_iter())
                            .collect();

                        // 2. If user specifies a primary key, compact the chunk based on the **downstream pk**
                        //    to eliminate any unnecessary updates to external systems. This also rewrites the
                        //    `DELETE` and `INSERT` operations on the same key into `UPDATE` operations, which
                        //    usually have more efficient implementation.
                        if let Some(downstream_pk) = &downstream_pk {
                            let chunks = dispatch_output_kind!(sink_type, KIND, {
                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
                                    .into_compacted_chunks_reconstructed::<KIND>(
                                        chunk_size,
                                        input_data_types.clone(),
                                        // When compacting based on user provided primary key, we should never panic
                                        // on inconsistency in case the user provided primary key is not unique.
                                        InconsistencyBehavior::Warn,
                                    )
                            });
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            // No downstream pk: re-chunk the reordered rows into chunks of at
                            // most `chunk_size` rows without further compaction.
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            // Emit whatever is left in the builder.
                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        // 3. Forward watermark and barrier.
                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            // In this branch, we don't need to reorder records, either because the stream key matches
            // the downstream pk, or the sink is append-only.
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        // Compact the chunk to eliminate any unnecessary updates to external systems.
                        // This should be performed against the downstream pk, not the stream key, to
                        // ensure correct retract/upsert semantics from the downstream's perspective.
                        if !sink_type.is_append_only()
                            && let Some(downstream_pk) = &downstream_pk
                        {
                            if skip_compact {
                                // We can only skip compaction if the keys are exactly the same, not just
                                // matching by being a subset.
                                assert_eq!(&stream_key, downstream_pk);
                            } else {
                                chunk = dispatch_output_kind!(sink_type, KIND, {
                                    compact_chunk_inline::<KIND>(
                                        chunk,
                                        downstream_pk,
                                        // When compacting based on user provided primary key, we should never panic
                                        // on inconsistency in case the user provided primary key is not unique.
                                        InconsistencyBehavior::Warn,
                                    )
                                });
                            }
                        }
                        yield Message::Chunk(chunk);
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }
680
681    #[expect(clippy::too_many_arguments)]
682    async fn execute_consume_log<S: Sink, R: LogReader>(
683        mut sink: S,
684        log_reader: R,
685        columns: Vec<ColumnCatalog>,
686        mut sink_param: SinkParam,
687        mut sink_writer_param: SinkWriterParam,
688        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
689        actor_context: ActorContextRef,
690        rate_limit_rx: UnboundedReceiver<RateLimit>,
691        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
692    ) -> StreamExecutorResult<!> {
693        let visible_columns = columns
694            .iter()
695            .enumerate()
696            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
697            .collect_vec();
698
699        let labels = [
700            &actor_context.id.to_string(),
701            sink_writer_param.connector.as_str(),
702            &sink_writer_param.sink_id.to_string(),
703            sink_writer_param.sink_name.as_str(),
704        ];
705        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
706            .log_store_reader_wait_new_future_duration_ns
707            .with_guarded_label_values(&labels);
708        let log_store_read_rows = GLOBAL_SINK_METRICS
709            .log_store_read_rows
710            .with_guarded_label_values(&labels);
711        let log_store_read_bytes = GLOBAL_SINK_METRICS
712            .log_store_read_bytes
713            .with_guarded_label_values(&labels);
714        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
715            .log_store_latest_read_epoch
716            .with_guarded_label_values(&labels);
717        let metrics = LogReaderMetrics {
718            log_store_latest_read_epoch,
719            log_store_read_rows,
720            log_store_read_bytes,
721            log_store_reader_wait_new_future_duration_ns,
722        };
723
724        let downstream_pk = sink_param.downstream_pk.clone();
725
726        let mut log_reader = log_reader
727            .transform_chunk(move |chunk| {
728                let chunk = if let Some(b) = non_append_only_behavior
729                    && b.should_compact_in_log_reader()
730                {
731                    // This guarantees that user has specified a `downstream_pk`.
732                    let downstream_pk = downstream_pk.as_ref().unwrap();
733                    dispatch_output_kind!(sink_param.sink_type, KIND, {
734                        compact_chunk_inline::<KIND>(
735                            chunk,
736                            downstream_pk,
737                            // When compacting based on user provided primary key, we should never panic
738                            // on inconsistency in case the user provided primary key is not unique.
739                            InconsistencyBehavior::Warn,
740                        )
741                    })
742                } else {
743                    chunk
744                };
745                if visible_columns.len() != columns.len() {
746                    // Do projection here because we may have columns that aren't visible to
747                    // the downstream.
748                    chunk.project(&visible_columns)
749                } else {
750                    chunk
751                }
752            })
753            .monitored(metrics)
754            .rate_limited(rate_limit_rx);
755
756        log_reader.init().await?;
757        loop {
758            let future = async {
759                loop {
760                    let Err(e) = sink
761                        .new_log_sinker(sink_writer_param.clone())
762                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
763                        .await;
764                    GLOBAL_ERROR_METRICS.user_sink_error.report([
765                        "sink_executor_error".to_owned(),
766                        sink_param.sink_id.to_string(),
767                        sink_param.sink_name.clone(),
768                        actor_context.fragment_id.to_string(),
769                    ]);
770
771                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
772                        meta_client
773                            .add_sink_fail_evet_log(
774                                sink_writer_param.sink_id,
775                                sink_writer_param.sink_name.clone(),
776                                sink_writer_param.connector.clone(),
777                                e.to_report_string(),
778                            )
779                            .await;
780                    }
781
782                    if F::ALLOW_REWIND {
783                        match log_reader.rewind().await {
784                            Ok(()) => {
785                                error!(
786                                    error = %e.as_report(),
787                                    executor_id = %sink_writer_param.executor_id,
788                                    sink_id = %sink_param.sink_id,
789                                    "reset log reader stream successfully after sink error"
790                                );
791                                Ok(())
792                            }
793                            Err(rewind_err) => {
794                                error!(
795                                    error = %rewind_err.as_report(),
796                                    "fail to rewind log reader"
797                                );
798                                Err(e)
799                            }
800                        }
801                    } else {
802                        Err(e)
803                    }
804                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
805                }
806            };
807            select! {
808                result = future => {
809                    let Err(e): StreamExecutorResult<!> = result;
810                    return Err(e);
811                }
812                result = rebuild_sink_rx.recv() => {
813                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
814                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
815                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
816                            if notify.send(()).is_err() {
817                                warn!("failed to notify rebuild sink");
818                            }
819                            log_reader.init().await?;
820                        },
821                        RebuildSinkMessage::UpdateConfig(config) => {
822                            if F::ALLOW_REWIND {
823                                match log_reader.rewind().await {
824                                    Ok(()) => {
825                                        sink_param.properties.extend(config.into_iter());
826                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
827                                        info!(
828                                            executor_id = %sink_writer_param.executor_id,
829                                            sink_id = %sink_param.sink_id,
830                                            "alter sink config successfully with rewind"
831                                        );
832                                        Ok(())
833                                    }
834                                    Err(rewind_err) => {
835                                        error!(
836                                            error = %rewind_err.as_report(),
837                                            "fail to rewind log reader for alter sink config "
838                                        );
839                                        Err(anyhow!("fail to rewind log after alter table").into())
840                                    }
841                                }
842                            } else {
843                                sink_param.properties.extend(config.into_iter());
844                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
845                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
846                            }
847                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
848                        },
849                    }
850                }
851            }
852        }
853    }
854}
855
856enum RebuildSinkMessage {
857    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
858    UpdateConfig(HashMap<String, String>),
859}
860
861impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
862    fn execute(self: Box<Self>) -> BoxedMessageStream {
863        self.execute_inner()
864    }
865}
866
867#[cfg(test)]
868mod test {
869    use risingwave_common::catalog::{ColumnDesc, ColumnId};
870    use risingwave_common::util::epoch::test_epoch;
871    use risingwave_connector::sink::build_sink;
872
873    use super::*;
874    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
875    use crate::executor::test_utils::*;
876
877    #[tokio::test]
878    async fn test_force_append_only_sink() {
879        use risingwave_common::array::StreamChunkTestExt;
880        use risingwave_common::array::stream_chunk::StreamChunk;
881        use risingwave_common::types::DataType;
882
883        use crate::executor::Barrier;
884
885        let properties = maplit::btreemap! {
886            "connector".into() => "blackhole".into(),
887            "type".into() => "append-only".into(),
888            "force_append_only".into() => "true".into()
889        };
890
891        // We have two visible columns and one hidden column. The hidden column will be pruned out
892        // within the sink executor.
893        let columns = vec![
894            ColumnCatalog {
895                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
896                is_hidden: false,
897            },
898            ColumnCatalog {
899                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
900                is_hidden: false,
901            },
902            ColumnCatalog {
903                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
904                is_hidden: true,
905            },
906        ];
907        let schema: Schema = columns
908            .iter()
909            .map(|column| Field::from(column.column_desc.clone()))
910            .collect();
911        let stream_key = vec![0];
912
913        let source = MockSource::with_messages(vec![
914            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
915            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
916                " I I I
917                    + 3 2 1",
918            ))),
919            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
920            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
921                "  I I I
922                    U- 3 2 1
923                    U+ 3 4 1
924                     + 5 6 7",
925            ))),
926            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
927                " I I I
928                    - 5 6 7",
929            ))),
930        ])
931        .into_executor(schema.clone(), stream_key.clone());
932
933        let sink_param = SinkParam {
934            sink_id: 0.into(),
935            sink_name: "test".into(),
936            properties,
937
938            columns: columns
939                .iter()
940                .filter(|col| !col.is_hidden)
941                .map(|col| col.column_desc.clone())
942                .collect(),
943            downstream_pk: Some(stream_key.clone()),
944            sink_type: SinkType::AppendOnly,
945            ignore_delete: true,
946            format_desc: None,
947            db_name: "test".into(),
948            sink_from_name: "test".into(),
949        };
950
951        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);
952
953        let sink = build_sink(sink_param.clone()).unwrap();
954
955        let sink_executor = SinkExecutor::new(
956            ActorContext::for_test(0),
957            info,
958            source,
959            SinkWriterParam::for_test(),
960            sink,
961            sink_param,
962            columns.clone(),
963            BoundedInMemLogStoreFactory::for_test(1),
964            1024,
965            vec![DataType::Int32, DataType::Int32, DataType::Int32],
966            None,
967        )
968        .await
969        .unwrap();
970
971        let mut executor = sink_executor.boxed().execute();
972
973        // Barrier message.
974        executor.next().await.unwrap().unwrap();
975
976        let chunk_msg = executor.next().await.unwrap().unwrap();
977        assert_eq!(
978            chunk_msg.into_chunk().unwrap().compact_vis(),
979            StreamChunk::from_pretty(
980                " I I I
981                + 3 2 1",
982            )
983        );
984
985        // Barrier message.
986        executor.next().await.unwrap().unwrap();
987
988        let chunk_msg = executor.next().await.unwrap().unwrap();
989        assert_eq!(
990            chunk_msg.into_chunk().unwrap().compact_vis(),
991            StreamChunk::from_pretty(
992                " I I I
993                + 3 4 1
994                + 5 6 7",
995            )
996        );
997
998        // Should not receive the third stream chunk message because the force-append-only sink
999        // executor will drop all DELETE messages.
1000
1001        // The last barrier message.
1002        executor.next().await.unwrap().unwrap();
1003    }
1004
1005    #[tokio::test]
1006    async fn stream_key_sink_pk_mismatch_upsert() {
1007        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
1008    }
1009
1010    #[tokio::test]
1011    async fn stream_key_sink_pk_mismatch_retract() {
1012        stream_key_sink_pk_mismatch(SinkType::Retract).await;
1013    }
1014
1015    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
1016        use risingwave_common::array::StreamChunkTestExt;
1017        use risingwave_common::array::stream_chunk::StreamChunk;
1018        use risingwave_common::types::DataType;
1019
1020        use crate::executor::Barrier;
1021
1022        let properties = maplit::btreemap! {
1023            "connector".into() => "blackhole".into(),
1024        };
1025
1026        // We have two visible columns and one hidden column. The hidden column will be pruned out
1027        // within the sink executor.
1028        let columns = vec![
1029            ColumnCatalog {
1030                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1031                is_hidden: false,
1032            },
1033            ColumnCatalog {
1034                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1035                is_hidden: false,
1036            },
1037            ColumnCatalog {
1038                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1039                is_hidden: true,
1040            },
1041        ];
1042        let schema: Schema = columns
1043            .iter()
1044            .map(|column| Field::from(column.column_desc.clone()))
1045            .collect();
1046
1047        let source = MockSource::with_messages(vec![
1048            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1049            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1050                " I I I
1051                    + 1 1 10",
1052            ))),
1053            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1054            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1055                " I I I
1056                    + 1 3 30",
1057            ))),
1058            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1059                " I I I
1060                    + 1 2 20
1061                    - 1 2 20",
1062            ))),
1063            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1064                " I I I
1065                    - 1 1 10
1066                    + 1 1 40",
1067            ))),
1068            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1069        ])
1070        .into_executor(schema.clone(), vec![0, 1]);
1071
1072        let sink_param = SinkParam {
1073            sink_id: 0.into(),
1074            sink_name: "test".into(),
1075            properties,
1076
1077            columns: columns
1078                .iter()
1079                .filter(|col| !col.is_hidden)
1080                .map(|col| col.column_desc.clone())
1081                .collect(),
1082            downstream_pk: Some(vec![0]),
1083            sink_type,
1084            ignore_delete: false,
1085            format_desc: None,
1086            db_name: "test".into(),
1087            sink_from_name: "test".into(),
1088        };
1089
1090        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
1091
1092        let sink = build_sink(sink_param.clone()).unwrap();
1093
1094        let sink_executor = SinkExecutor::new(
1095            ActorContext::for_test(0),
1096            info,
1097            source,
1098            SinkWriterParam::for_test(),
1099            sink,
1100            sink_param,
1101            columns.clone(),
1102            BoundedInMemLogStoreFactory::for_test(1),
1103            1024,
1104            vec![DataType::Int64, DataType::Int64, DataType::Int64],
1105            None,
1106        )
1107        .await
1108        .unwrap();
1109
1110        let mut executor = sink_executor.boxed().execute();
1111
1112        // Barrier message.
1113        executor.next().await.unwrap().unwrap();
1114
1115        let chunk_msg = executor.next().await.unwrap().unwrap();
1116        assert_eq!(
1117            chunk_msg.into_chunk().unwrap().compact_vis(),
1118            StreamChunk::from_pretty(
1119                " I I I
1120                + 1 1 10",
1121            )
1122        );
1123
1124        // Barrier message.
1125        executor.next().await.unwrap().unwrap();
1126
1127        let chunk_msg = executor.next().await.unwrap().unwrap();
1128        let expected = match sink_type {
1129            SinkType::Retract => StreamChunk::from_pretty(
1130                " I I I
1131                U- 1 1 10
1132                U+ 1 1 40",
1133            ),
1134            SinkType::Upsert => StreamChunk::from_pretty(
1135                " I I I
1136                + 1 1 40", // For upsert format, there won't be `U- 1 1 10`.
1137            ),
1138            _ => unreachable!(),
1139        };
1140        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);
1141
1142        // The last barrier message.
1143        executor.next().await.unwrap().unwrap();
1144    }
1145
1146    #[tokio::test]
1147    async fn test_empty_barrier_sink() {
1148        use risingwave_common::types::DataType;
1149
1150        use crate::executor::Barrier;
1151
1152        let properties = maplit::btreemap! {
1153            "connector".into() => "blackhole".into(),
1154            "type".into() => "append-only".into(),
1155            "force_append_only".into() => "true".into()
1156        };
1157        let columns = vec![
1158            ColumnCatalog {
1159                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1160                is_hidden: false,
1161            },
1162            ColumnCatalog {
1163                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1164                is_hidden: false,
1165            },
1166        ];
1167        let schema: Schema = columns
1168            .iter()
1169            .map(|column| Field::from(column.column_desc.clone()))
1170            .collect();
1171        let stream_key = vec![0];
1172
1173        let source = MockSource::with_messages(vec![
1174            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1175            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1176            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1177        ])
1178        .into_executor(schema.clone(), stream_key.clone());
1179
1180        let sink_param = SinkParam {
1181            sink_id: 0.into(),
1182            sink_name: "test".into(),
1183            properties,
1184
1185            columns: columns
1186                .iter()
1187                .filter(|col| !col.is_hidden)
1188                .map(|col| col.column_desc.clone())
1189                .collect(),
1190            downstream_pk: Some(stream_key.clone()),
1191            sink_type: SinkType::AppendOnly,
1192            ignore_delete: true,
1193            format_desc: None,
1194            db_name: "test".into(),
1195            sink_from_name: "test".into(),
1196        };
1197
1198        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);
1199
1200        let sink = build_sink(sink_param.clone()).unwrap();
1201
1202        let sink_executor = SinkExecutor::new(
1203            ActorContext::for_test(0),
1204            info,
1205            source,
1206            SinkWriterParam::for_test(),
1207            sink,
1208            sink_param,
1209            columns,
1210            BoundedInMemLogStoreFactory::for_test(1),
1211            1024,
1212            vec![DataType::Int64, DataType::Int64],
1213            None,
1214        )
1215        .await
1216        .unwrap();
1217
1218        let mut executor = sink_executor.boxed().execute();
1219
1220        // Barrier message.
1221        assert_eq!(
1222            executor.next().await.unwrap().unwrap(),
1223            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
1224        );
1225
1226        // Barrier message.
1227        assert_eq!(
1228            executor.next().await.unwrap().unwrap(),
1229            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
1230        );
1231
1232        // The last barrier message.
1233        assert_eq!(
1234            executor.next().await.unwrap().unwrap(),
1235            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
1236        );
1237    }
1238
1239    #[tokio::test]
1240    async fn test_force_compaction() {
1241        use risingwave_common::array::StreamChunkTestExt;
1242        use risingwave_common::array::stream_chunk::StreamChunk;
1243        use risingwave_common::types::DataType;
1244
1245        use crate::executor::Barrier;
1246
1247        let properties = maplit::btreemap! {
1248            "connector".into() => "blackhole".into(),
1249            "force_compaction".into() => "true".into()
1250        };
1251
1252        // We have two visible columns and one hidden column. The hidden column will be pruned out
1253        // within the sink executor.
1254        let columns = vec![
1255            ColumnCatalog {
1256                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1257                is_hidden: false,
1258            },
1259            ColumnCatalog {
1260                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1261                is_hidden: false,
1262            },
1263            ColumnCatalog {
1264                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1265                is_hidden: true,
1266            },
1267        ];
1268        let schema: Schema = columns
1269            .iter()
1270            .map(|column| Field::from(column.column_desc.clone()))
1271            .collect();
1272
1273        let source = MockSource::with_messages(vec![
1274            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1275            Message::Chunk(StreamChunk::from_pretty(
1276                " I I I
1277                    + 1 1 10",
1278            )),
1279            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1280            Message::Chunk(StreamChunk::from_pretty(
1281                " I I I
1282                    + 1 3 30",
1283            )),
1284            Message::Chunk(StreamChunk::from_pretty(
1285                " I I I
1286                    + 1 2 20
1287                    - 1 2 20
1288                    + 1 4 10",
1289            )),
1290            Message::Chunk(StreamChunk::from_pretty(
1291                " I I I
1292                    - 1 1 10
1293                    + 1 1 40",
1294            )),
1295            Message::Chunk(StreamChunk::from_pretty(
1296                " I I I
1297                    - 1 4 30",
1298            )),
1299            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1300        ])
1301        .into_executor(schema.clone(), vec![0, 1]);
1302
1303        let sink_param = SinkParam {
1304            sink_id: 0.into(),
1305            sink_name: "test".into(),
1306            properties,
1307
1308            columns: columns
1309                .iter()
1310                .filter(|col| !col.is_hidden)
1311                .map(|col| col.column_desc.clone())
1312                .collect(),
1313            downstream_pk: Some(vec![0, 1]),
1314            sink_type: SinkType::Upsert,
1315            ignore_delete: false,
1316            format_desc: None,
1317            db_name: "test".into(),
1318            sink_from_name: "test".into(),
1319        };
1320
1321        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
1322
1323        let sink = build_sink(sink_param.clone()).unwrap();
1324
1325        let sink_executor = SinkExecutor::new(
1326            ActorContext::for_test(0),
1327            info,
1328            source,
1329            SinkWriterParam::for_test(),
1330            sink,
1331            sink_param,
1332            columns.clone(),
1333            BoundedInMemLogStoreFactory::for_test(1),
1334            1024,
1335            vec![DataType::Int64, DataType::Int64, DataType::Int64],
1336            None,
1337        )
1338        .await
1339        .unwrap();
1340
1341        let mut executor = sink_executor.boxed().execute();
1342
1343        // Barrier message.
1344        executor.next().await.unwrap().unwrap();
1345
1346        let chunk_msg = executor.next().await.unwrap().unwrap();
1347        assert_eq!(
1348            chunk_msg.into_chunk().unwrap().compact_vis(),
1349            StreamChunk::from_pretty(
1350                " I I I
1351                + 1 1 10",
1352            )
1353        );
1354
1355        // Barrier message.
1356        executor.next().await.unwrap().unwrap();
1357
1358        let chunk_msg = executor.next().await.unwrap().unwrap();
1359        assert_eq!(
1360            chunk_msg.into_chunk().unwrap().compact_vis(),
1361            StreamChunk::from_pretty(
1362                " I I I
1363                + 1 3 30
1364                + 1 1 40",
1365            )
1366        );
1367
1368        // The last barrier message.
1369        executor.next().await.unwrap().unwrap();
1370    }
1371}