risingwave_stream/executor/
sink.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::HashMap;
17use std::mem;
18
19use anyhow::anyhow;
20use futures::stream::select;
21use futures::{FutureExt, TryFutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::array::stream_chunk::StreamChunkMut;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnCatalog, Field};
27use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_common_estimate_size::collections::EstimatedVec;
30use risingwave_common_rate_limit::RateLimit;
31use risingwave_connector::dispatch_sink;
32use risingwave_connector::sink::catalog::{SinkId, SinkType};
33use risingwave_connector::sink::log_store::{
34    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
35    LogWriter, LogWriterExt, LogWriterMetrics,
36};
37use risingwave_connector::sink::{
38    GLOBAL_SINK_METRICS, LogSinker, SINK_USER_FORCE_COMPACTION, Sink, SinkImpl, SinkParam,
39    SinkWriterParam,
40};
41use risingwave_pb::id::FragmentId;
42use risingwave_pb::stream_plan::stream_node::StreamKind;
43use thiserror_ext::AsReport;
44use tokio::select;
45use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
46use tokio::sync::oneshot;
47
48use crate::common::change_buffer::{OutputKind, output_kind};
49use crate::common::compact_chunk::{
50    InconsistencyBehavior, StreamChunkCompactor, compact_chunk_inline,
51};
52use crate::executor::prelude::*;
53pub struct SinkExecutor<F: LogStoreFactory> {
54    actor_context: ActorContextRef,
55    info: ExecutorInfo,
56    input: Executor,
57    sink: SinkImpl,
58    input_columns: Vec<ColumnCatalog>,
59    sink_param: SinkParam,
60    log_store_factory: F,
61    sink_writer_param: SinkWriterParam,
62    chunk_size: usize,
63    input_data_types: Vec<DataType>,
64    non_append_only_behavior: Option<NonAppendOnlyBehavior>,
65    rate_limit: Option<u32>,
66}
67
68// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
69fn force_append_only(c: StreamChunk) -> StreamChunk {
70    let mut c: StreamChunkMut = c.into();
71    for (_, mut r) in c.to_rows_mut() {
72        match r.op() {
73            Op::Insert => {}
74            Op::Delete | Op::UpdateDelete => r.set_vis(false),
75            Op::UpdateInsert => r.set_op(Op::Insert),
76        }
77    }
78    c.into()
79}
80
81// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
82fn force_delete_only(c: StreamChunk) -> StreamChunk {
83    let mut c: StreamChunkMut = c.into();
84    for (_, mut r) in c.to_rows_mut() {
85        match r.op() {
86            Op::Delete => {}
87            Op::Insert | Op::UpdateInsert => r.set_vis(false),
88            Op::UpdateDelete => r.set_op(Op::Delete),
89        }
90    }
91    c.into()
92}
93
94/// When the sink is non-append-only, i.e. upsert or retract, we need to do some extra work for
95/// correctness and performance.
96#[derive(Clone, Copy, Debug)]
97struct NonAppendOnlyBehavior {
98    /// Whether the user specifies a primary key for the sink, and it matches the derived stream
99    /// key of the stream.
100    ///
101    /// By matching, we mean that stream key is a subset of the downstream pk.
102    pk_specified_and_matched: bool,
103    /// Whether the user forces buffering all chunks between two barriers.
104    force_compaction: bool,
105}
106
107impl NonAppendOnlyBehavior {
108    /// NOTE(kwannoel):
109    ///
110    /// After the optimization in <https://github.com/risingwavelabs/risingwave/pull/12250>,
111    /// `DELETE`s will be sequenced before `INSERT`s in JDBC sinks and PG rust sink.
112    /// There's a risk that adjacent chunks with `DELETE`s on the same PK will get
113    /// merged into a single chunk, since the logstore format doesn't preserve chunk
114    /// boundaries. Then we will have double `DELETE`s followed by unspecified sequence
115    /// of `INSERT`s, and lead to inconsistent data downstream.
116    ///
117    /// We only need to do the compaction for non-append-only sinks, when the upstream and
118    /// downstream PKs are matched. When the upstream and downstream PKs are not matched,
119    /// we will buffer the chunks between two barriers, so the compaction is not needed,
120    /// since the barriers will preserve chunk boundaries.
121    ///
122    /// When `force_compaction` is true, we also skip compaction here, since the buffering
123    /// will also make compaction.
124    ///
125    /// When the sink is an append-only sink, it is either `force_append_only` or
126    /// `append_only`, we should only append to downstream, so there should not be any
127    /// overlapping keys.
128    fn should_compact_in_log_reader(self) -> bool {
129        self.pk_specified_and_matched && !self.force_compaction
130    }
131
132    /// When stream key is different from the user defined primary key columns for sinks.
133    /// The operations could be out of order.
134    ///
135    /// For example, we have a stream with derived stream key `a, b` and user-specified sink
136    /// primary key `a`. Assume that we have `(1, 1)` in the table. Then, we perform two `UPDATE`
137    /// operations:
138    ///
139    /// ```text
140    /// UPDATE SET b = 2 WHERE a = 1 ... which issues:
141    ///   - (1, 1)
142    ///   + (1, 2)
143    ///
144    /// UPDATE SET b = 3 WHERE a = 1 ... which issues:
145    ///   - (1, 2)
146    ///   + (1, 3)
147    /// ```
148    ///
149    /// When these changes go into streaming pipeline, they could be shuffled to different parallelism
150    /// (actor), given that they are under different stream keys.
151    ///
152    /// ```text
153    /// Actor 1:
154    /// - (1, 1)
155    ///
156    /// Actor 2:
157    /// + (1, 2)
158    /// - (1, 2)
159    ///
160    /// Actor 3:
161    /// + (1, 3)
162    /// ```
163    ///
164    /// When these records are merged back into sink actor, we may get the records from different
165    /// parallelism in arbitrary order, like:
166    ///
167    /// ```text
168    /// + (1, 2) -- Actor 2, first row
169    /// + (1, 3) -- Actor 3
170    /// - (1, 1) -- Actor 1
171    /// - (1, 2) -- Actor 2, second row
172    /// ```
173    ///
174    /// Note that in terms of stream key (`a, b`), the operations in the order above are completely
175    /// correct, because we are operating on 3 different rows. However, in terms of user defined sink
176    /// primary key `a`, we're violating the unique constraint all the time.
177    ///
178    /// Therefore, in this case, we have to do additional reordering in the sink executor per barrier.
179    /// Specifically, we need to:
180    ///
181    /// First, compact all the changes with the stream key, so we have:
182    /// ```text
183    /// + (1, 3)
184    /// - (1, 1)
185    /// ```
186    ///
187    /// Then, sink all the delete events before sinking all insert events, so we have:
188    /// ```text
189    /// - (1, 1)
190    /// + (1, 3)
191    /// ```
192    /// Since we've compacted the chunk with the stream key, the `DELETE` records survived must be to
193    /// delete an existing row, so we can safely move them to the front. After the deletion is done,
194    /// we can then safely sink the insert events with uniqueness guarantee.
195    ///
196    /// When `force_compaction` is true, we also perform additional reordering to gain the
197    /// benefits of compaction:
198    /// - reduce the number of output messages;
199    /// - emit at most one update per key within a barrier interval, simplifying downstream logic.
200    fn should_reorder_records(self) -> bool {
201        !self.pk_specified_and_matched || self.force_compaction
202    }
203}
204
205/// Get the output kind for chunk compaction based on the given sink type.
206fn compact_output_kind(sink_type: SinkType) -> OutputKind {
207    match sink_type {
208        SinkType::Upsert => output_kind::UPSERT,
209        SinkType::Retract => output_kind::RETRACT,
210        // There won't be any `Update` or `Delete` in the chunk, so it doesn't matter.
211        SinkType::AppendOnly => output_kind::RETRACT,
212    }
213}
214
215/// Dispatch the code block to different output kinds for chunk compaction based on sink type.
216macro_rules! dispatch_output_kind {
217    ($sink_type:expr, $KIND:ident, $body:tt) => {
218        #[allow(unused_braces)]
219        match compact_output_kind($sink_type) {
220            output_kind::UPSERT => {
221                const KIND: OutputKind = output_kind::UPSERT;
222                $body
223            }
224            output_kind::RETRACT => {
225                const KIND: OutputKind = output_kind::RETRACT;
226                $body
227            }
228        }
229    };
230}
231
232impl<F: LogStoreFactory> SinkExecutor<F> {
233    #[allow(clippy::too_many_arguments)]
234    #[expect(clippy::unused_async)]
235    pub async fn new(
236        actor_context: ActorContextRef,
237        info: ExecutorInfo,
238        input: Executor,
239        sink_writer_param: SinkWriterParam,
240        sink: SinkImpl,
241        sink_param: SinkParam,
242        columns: Vec<ColumnCatalog>,
243        log_store_factory: F,
244        chunk_size: usize,
245        input_data_types: Vec<DataType>,
246        rate_limit: Option<u32>,
247    ) -> StreamExecutorResult<Self> {
248        let sink_input_schema: Schema = columns
249            .iter()
250            .map(|column| Field::from(&column.column_desc))
251            .collect();
252
253        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
254            // Remove the partition column from the schema.
255            assert_eq!(sink_input_schema.data_types(), {
256                let mut data_type = info.schema.data_types();
257                data_type.remove(col_dix);
258                data_type
259            });
260        } else {
261            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
262        }
263
264        let non_append_only_behavior = if !sink_param.sink_type.is_append_only() {
265            let stream_key = &info.stream_key;
266            let pk_specified_and_matched = (sink_param.downstream_pk.as_ref())
267                .is_some_and(|downstream_pk| stream_key.iter().all(|i| downstream_pk.contains(i)));
268            let force_compaction = sink_param
269                .properties
270                .get(SINK_USER_FORCE_COMPACTION)
271                .map(|v| v.eq_ignore_ascii_case("true"))
272                .unwrap_or(false);
273            Some(NonAppendOnlyBehavior {
274                pk_specified_and_matched,
275                force_compaction,
276            })
277        } else {
278            None
279        };
280
281        tracing::info!(
282            sink_id = %sink_param.sink_id,
283            actor_id = %actor_context.id,
284            ?non_append_only_behavior,
285            "Sink executor info"
286        );
287
288        Ok(Self {
289            actor_context,
290            info,
291            input,
292            sink,
293            input_columns: columns,
294            sink_param,
295            log_store_factory,
296            sink_writer_param,
297            chunk_size,
298            input_data_types,
299            non_append_only_behavior,
300            rate_limit,
301        })
302    }
303
304    fn execute_inner(self) -> BoxedMessageStream {
305        let sink_id = self.sink_param.sink_id;
306        let actor_id = self.actor_context.id;
307        let fragment_id = self.actor_context.fragment_id;
308
309        let stream_key = self.info.stream_key.clone();
310        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
311            sink_id,
312            actor_id,
313            fragment_id,
314        );
315
316        // When processing upsert stream, we need to tolerate the inconsistency (mismatched `DELETE`
317        // and `INSERT` pairs) when compacting input chunks with derived stream key.
318        let input_compact_ib = if self.input.stream_kind() == StreamKind::Upsert {
319            InconsistencyBehavior::Tolerate
320        } else {
321            InconsistencyBehavior::Panic
322        };
323
324        let input = self.input.execute();
325
326        let input = input.inspect_ok(move |msg| {
327            if let Message::Chunk(c) = msg {
328                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
329                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
330            }
331        });
332
333        let processed_input = Self::process_msg(
334            input,
335            self.sink_param.sink_type,
336            self.sink_param.ignore_delete,
337            stream_key,
338            self.chunk_size,
339            self.input_data_types,
340            input_compact_ib,
341            self.sink_param.downstream_pk.clone(),
342            self.non_append_only_behavior,
343            metrics.sink_chunk_buffer_size,
344            self.sink.is_blackhole(), // skip compact for blackhole for better benchmark results
345        );
346
347        if self.sink.is_sink_into_table() {
348            // TODO(hzxa21): support rate limit?
349            processed_input.boxed()
350        } else {
351            let labels = [
352                &actor_id.to_string(),
353                &sink_id.to_string(),
354                self.sink_param.sink_name.as_str(),
355            ];
356            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
357                .log_store_first_write_epoch
358                .with_guarded_label_values(&labels);
359            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
360                .log_store_latest_write_epoch
361                .with_guarded_label_values(&labels);
362            let log_store_write_rows = GLOBAL_SINK_METRICS
363                .log_store_write_rows
364                .with_guarded_label_values(&labels);
365            let log_writer_metrics = LogWriterMetrics {
366                log_store_first_write_epoch,
367                log_store_latest_write_epoch,
368                log_store_write_rows,
369            };
370
371            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
372            // Init the rate limit
373            rate_limit_tx.send(self.rate_limit.into()).unwrap();
374
375            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();
376
377            self.log_store_factory
378                .build()
379                .map(move |(log_reader, log_writer)| {
380                    let write_log_stream = Self::execute_write_log(
381                        processed_input,
382                        log_writer.monitored(log_writer_metrics),
383                        actor_id,
384                        fragment_id,
385                        sink_id,
386                        rate_limit_tx,
387                        rebuild_sink_tx,
388                    );
389
390                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
391                        let consume_log_stream = Self::execute_consume_log(
392                            *sink,
393                            log_reader,
394                            self.input_columns,
395                            self.sink_param,
396                            self.sink_writer_param,
397                            self.non_append_only_behavior,
398                            self.actor_context,
399                            rate_limit_rx,
400                            rebuild_sink_rx,
401                        )
402                        .instrument_await(
403                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
404                        )
405                        .map_ok(|never| never); // unify return type to `Message`
406
407                        consume_log_stream.boxed()
408                    });
409                    select(consume_log_stream_future.into_stream(), write_log_stream)
410                })
411                .into_stream()
412                .flatten()
413                .boxed()
414        }
415    }
416
417    #[try_stream(ok = Message, error = StreamExecutorError)]
418    async fn execute_write_log<W: LogWriter>(
419        input: impl MessageStream,
420        mut log_writer: W,
421        actor_id: ActorId,
422        fragment_id: FragmentId,
423        sink_id: SinkId,
424        rate_limit_tx: UnboundedSender<RateLimit>,
425        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
426    ) {
427        pin_mut!(input);
428        let barrier = expect_first_barrier(&mut input).await?;
429        let epoch_pair = barrier.epoch;
430        let is_pause_on_startup = barrier.is_pause_on_startup();
431        // Propagate the first barrier
432        yield Message::Barrier(barrier);
433
434        log_writer.init(epoch_pair, is_pause_on_startup).await?;
435
436        let mut is_paused = false;
437
438        #[for_await]
439        for msg in input {
440            match msg? {
441                Message::Watermark(w) => yield Message::Watermark(w),
442                Message::Chunk(chunk) => {
443                    assert!(
444                        !is_paused,
445                        "Actor {actor_id} should not receive any data after pause"
446                    );
447                    log_writer.write_chunk(chunk.clone()).await?;
448                    yield Message::Chunk(chunk);
449                }
450                Message::Barrier(barrier) => {
451                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
452                    let schema_change = barrier.as_sink_schema_change(sink_id);
453                    if let Some(schema_change) = &schema_change {
454                        info!(?schema_change, %sink_id, "sink receive schema change");
455                    }
456                    let post_flush = log_writer
457                        .flush_current_epoch(
458                            barrier.epoch.curr,
459                            FlushCurrentEpochOptions {
460                                is_checkpoint: barrier.kind.is_checkpoint(),
461                                new_vnode_bitmap: update_vnode_bitmap.clone(),
462                                is_stop: barrier.is_stop(actor_id),
463                                schema_change,
464                            },
465                        )
466                        .await?;
467
468                    let mutation = barrier.mutation.clone();
469                    yield Message::Barrier(barrier);
470                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
471                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
472                    {
473                        let (tx, rx) = oneshot::channel();
474                        rebuild_sink_tx
475                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
476                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
477                        rx.await
478                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
479                    }
480                    post_flush.post_yield_barrier().await?;
481
482                    if let Some(mutation) = mutation.as_deref() {
483                        match mutation {
484                            Mutation::Pause => {
485                                log_writer.pause()?;
486                                is_paused = true;
487                            }
488                            Mutation::Resume => {
489                                log_writer.resume()?;
490                                is_paused = false;
491                            }
492                            Mutation::Throttle(fragment_to_apply) => {
493                                if let Some(new_rate_limit) = fragment_to_apply.get(&fragment_id) {
494                                    tracing::info!(
495                                        rate_limit = new_rate_limit,
496                                        "received sink rate limit on actor {actor_id}"
497                                    );
498                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
499                                        error!(
500                                            error = %e.as_report(),
501                                            "fail to send sink ate limit update"
502                                        );
503                                        return Err(StreamExecutorError::from(
504                                            e.to_report_string(),
505                                        ));
506                                    }
507                                }
508                            }
509                            Mutation::ConnectorPropsChange(config) => {
510                                if let Some(map) = config.get(&sink_id.as_raw_id())
511                                    && let Err(e) = rebuild_sink_tx
512                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
513                                {
514                                    error!(
515                                        error = %e.as_report(),
516                                        "fail to send sink alter props"
517                                    );
518                                    return Err(StreamExecutorError::from(e.to_report_string()));
519                                }
520                            }
521                            _ => (),
522                        }
523                    }
524                }
525            }
526        }
527    }
528
529    #[allow(clippy::too_many_arguments)]
530    #[try_stream(ok = Message, error = StreamExecutorError)]
531    async fn process_msg(
532        input: impl MessageStream,
533        sink_type: SinkType,
534        ignore_delete: bool,
535        stream_key: StreamKey,
536        chunk_size: usize,
537        input_data_types: Vec<DataType>,
538        input_compact_ib: InconsistencyBehavior,
539        downstream_pk: Option<Vec<usize>>,
540        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
541        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
542        skip_compact: bool,
543    ) {
544        // To reorder records, we need to buffer chunks of the entire epoch.
545        if let Some(b) = non_append_only_behavior
546            && b.should_reorder_records()
547        {
548            assert_matches!(sink_type, SinkType::Upsert | SinkType::Retract);
549
550            let mut chunk_buffer = EstimatedVec::new();
551            let mut watermark: Option<super::Watermark> = None;
552            #[for_await]
553            for msg in input {
554                match msg? {
555                    Message::Watermark(w) => watermark = Some(w),
556                    Message::Chunk(c) => {
557                        chunk_buffer.push(c);
558                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
559                    }
560                    Message::Barrier(barrier) => {
561                        let chunks = mem::take(&mut chunk_buffer).into_inner();
562
563                        // 1. Compact the chunk based on the **stream key**, so that we have at most 2 rows for each
564                        //    stream key. Then, move all delete records to the front.
565                        let mut delete_chunks = vec![];
566                        let mut insert_chunks = vec![];
567
568                        for c in dispatch_output_kind!(sink_type, KIND, {
569                            StreamChunkCompactor::new(stream_key.clone(), chunks)
570                                .into_compacted_chunks_inline::<KIND>(input_compact_ib)
571                        }) {
572                            let chunk = force_delete_only(c.clone());
573                            if chunk.cardinality() > 0 {
574                                delete_chunks.push(chunk);
575                            }
576                            let chunk = force_append_only(c);
577                            if chunk.cardinality() > 0 {
578                                insert_chunks.push(chunk);
579                            }
580                        }
581                        let chunks = delete_chunks
582                            .into_iter()
583                            .chain(insert_chunks.into_iter())
584                            .collect();
585
586                        // 2. If user specifies a primary key, compact the chunk based on the **downstream pk**
587                        //    to eliminate any unnecessary updates to external systems. This also rewrites the
588                        //    `DELETE` and `INSERT` operations on the same key into `UPDATE` operations, which
589                        //    usually have more efficient implementation.
590                        if let Some(downstream_pk) = &downstream_pk {
591                            let chunks = dispatch_output_kind!(sink_type, KIND, {
592                                StreamChunkCompactor::new(downstream_pk.clone(), chunks)
593                                    .into_compacted_chunks_reconstructed::<KIND>(
594                                        chunk_size,
595                                        input_data_types.clone(),
596                                        // When compacting based on user provided primary key, we should never panic
597                                        // on inconsistency in case the user provided primary key is not unique.
598                                        InconsistencyBehavior::Warn,
599                                    )
600                            });
601                            for c in chunks {
602                                yield Message::Chunk(c);
603                            }
604                        } else {
605                            let mut chunk_builder =
606                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
607                            for chunk in chunks {
608                                for (op, row) in chunk.rows() {
609                                    if let Some(c) = chunk_builder.append_row(op, row) {
610                                        yield Message::Chunk(c);
611                                    }
612                                }
613                            }
614
615                            if let Some(c) = chunk_builder.take() {
616                                yield Message::Chunk(c);
617                            }
618                        };
619
620                        // 3. Forward watermark and barrier.
621                        if let Some(w) = mem::take(&mut watermark) {
622                            yield Message::Watermark(w)
623                        }
624                        yield Message::Barrier(barrier);
625                    }
626                }
627            }
628        } else {
629            // In this branch, we don't need to reorder records, either because the stream key matches
630            // the downstream pk, or the sink is append-only.
631            #[for_await]
632            for msg in input {
633                match msg? {
634                    Message::Watermark(w) => yield Message::Watermark(w),
635                    Message::Chunk(mut chunk) => {
636                        // Compact the chunk to eliminate any unnecessary updates to external systems.
637                        // This should be performed against the downstream pk, not the stream key, to
638                        // ensure correct retract/upsert semantics from the downstream's perspective.
639                        if !sink_type.is_append_only()
640                            && let Some(downstream_pk) = &downstream_pk
641                        {
642                            if skip_compact {
643                                // We can only skip compaction if the keys are exactly the same, not just
644                                // matching by being a subset.
645                                assert_eq!(&stream_key, downstream_pk);
646                            } else {
647                                chunk = dispatch_output_kind!(sink_type, KIND, {
648                                    compact_chunk_inline::<KIND>(
649                                        chunk,
650                                        downstream_pk,
651                                        // When compacting based on user provided primary key, we should never panic
652                                        // on inconsistency in case the user provided primary key is not unique.
653                                        InconsistencyBehavior::Warn,
654                                    )
655                                });
656                            }
657                        }
658                        if ignore_delete {
659                            // Force append-only by dropping UPDATE/DELETE messages. We do this when the
660                            // user forces the sink to be append-only while it is actually not based on
661                            // the frontend derivation result.
662                            chunk = force_append_only(chunk);
663                        }
664                        yield Message::Chunk(chunk);
665                    }
666                    Message::Barrier(barrier) => {
667                        yield Message::Barrier(barrier);
668                    }
669                }
670            }
671        }
672    }
673
674    #[expect(clippy::too_many_arguments)]
675    async fn execute_consume_log<S: Sink, R: LogReader>(
676        mut sink: S,
677        log_reader: R,
678        columns: Vec<ColumnCatalog>,
679        mut sink_param: SinkParam,
680        mut sink_writer_param: SinkWriterParam,
681        non_append_only_behavior: Option<NonAppendOnlyBehavior>,
682        actor_context: ActorContextRef,
683        rate_limit_rx: UnboundedReceiver<RateLimit>,
684        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
685    ) -> StreamExecutorResult<!> {
686        let visible_columns = columns
687            .iter()
688            .enumerate()
689            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
690            .collect_vec();
691
692        let labels = [
693            &actor_context.id.to_string(),
694            sink_writer_param.connector.as_str(),
695            &sink_writer_param.sink_id.to_string(),
696            sink_writer_param.sink_name.as_str(),
697        ];
698        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
699            .log_store_reader_wait_new_future_duration_ns
700            .with_guarded_label_values(&labels);
701        let log_store_read_rows = GLOBAL_SINK_METRICS
702            .log_store_read_rows
703            .with_guarded_label_values(&labels);
704        let log_store_read_bytes = GLOBAL_SINK_METRICS
705            .log_store_read_bytes
706            .with_guarded_label_values(&labels);
707        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
708            .log_store_latest_read_epoch
709            .with_guarded_label_values(&labels);
710        let metrics = LogReaderMetrics {
711            log_store_latest_read_epoch,
712            log_store_read_rows,
713            log_store_read_bytes,
714            log_store_reader_wait_new_future_duration_ns,
715        };
716
717        let downstream_pk = sink_param.downstream_pk.clone();
718
719        let mut log_reader = log_reader
720            .transform_chunk(move |chunk| {
721                let chunk = if let Some(b) = non_append_only_behavior
722                    && b.should_compact_in_log_reader()
723                {
724                    // This guarantees that user has specified a `downstream_pk`.
725                    let downstream_pk = downstream_pk.as_ref().unwrap();
726                    dispatch_output_kind!(sink_param.sink_type, KIND, {
727                        compact_chunk_inline::<KIND>(
728                            chunk,
729                            downstream_pk,
730                            // When compacting based on user provided primary key, we should never panic
731                            // on inconsistency in case the user provided primary key is not unique.
732                            InconsistencyBehavior::Warn,
733                        )
734                    })
735                } else {
736                    chunk
737                };
738                if visible_columns.len() != columns.len() {
739                    // Do projection here because we may have columns that aren't visible to
740                    // the downstream.
741                    chunk.project(&visible_columns)
742                } else {
743                    chunk
744                }
745            })
746            .monitored(metrics)
747            .rate_limited(rate_limit_rx);
748
749        log_reader.init().await?;
750        loop {
751            let future = async {
752                loop {
753                    let Err(e) = sink
754                        .new_log_sinker(sink_writer_param.clone())
755                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
756                        .await;
757                    GLOBAL_ERROR_METRICS.user_sink_error.report([
758                        "sink_executor_error".to_owned(),
759                        sink_param.sink_id.to_string(),
760                        sink_param.sink_name.clone(),
761                        actor_context.fragment_id.to_string(),
762                    ]);
763
764                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
765                        meta_client
766                            .add_sink_fail_evet_log(
767                                sink_writer_param.sink_id,
768                                sink_writer_param.sink_name.clone(),
769                                sink_writer_param.connector.clone(),
770                                e.to_report_string(),
771                            )
772                            .await;
773                    }
774
775                    if F::ALLOW_REWIND {
776                        match log_reader.rewind().await {
777                            Ok(()) => {
778                                error!(
779                                    error = %e.as_report(),
780                                    executor_id = %sink_writer_param.executor_id,
781                                    sink_id = %sink_param.sink_id,
782                                    "reset log reader stream successfully after sink error"
783                                );
784                                Ok(())
785                            }
786                            Err(rewind_err) => {
787                                error!(
788                                    error = %rewind_err.as_report(),
789                                    "fail to rewind log reader"
790                                );
791                                Err(e)
792                            }
793                        }
794                    } else {
795                        Err(e)
796                    }
797                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
798                }
799            };
800            select! {
801                result = future => {
802                    let Err(e): StreamExecutorResult<!> = result;
803                    return Err(e);
804                }
805                result = rebuild_sink_rx.recv() => {
806                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
807                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
808                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
809                            if notify.send(()).is_err() {
810                                warn!("failed to notify rebuild sink");
811                            }
812                            log_reader.init().await?;
813                        },
814                        RebuildSinkMessage::UpdateConfig(config) => {
815                            if F::ALLOW_REWIND {
816                                match log_reader.rewind().await {
817                                    Ok(()) => {
818                                        sink_param.properties.extend(config.into_iter());
819                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
820                                        info!(
821                                            executor_id = %sink_writer_param.executor_id,
822                                            sink_id = %sink_param.sink_id,
823                                            "alter sink config successfully with rewind"
824                                        );
825                                        Ok(())
826                                    }
827                                    Err(rewind_err) => {
828                                        error!(
829                                            error = %rewind_err.as_report(),
830                                            "fail to rewind log reader for alter sink config "
831                                        );
832                                        Err(anyhow!("fail to rewind log after alter table").into())
833                                    }
834                                }
835                            } else {
836                                sink_param.properties.extend(config.into_iter());
837                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
838                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
839                            }
840                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id)))?;
841                        },
842                    }
843                }
844            }
845        }
846    }
847}
848
849enum RebuildSinkMessage {
850    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
851    UpdateConfig(HashMap<String, String>),
852}
853
854impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
855    fn execute(self: Box<Self>) -> BoxedMessageStream {
856        self.execute_inner()
857    }
858}
859
860#[cfg(test)]
861mod test {
862    use risingwave_common::catalog::{ColumnDesc, ColumnId};
863    use risingwave_common::util::epoch::test_epoch;
864    use risingwave_connector::sink::build_sink;
865
866    use super::*;
867    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
868    use crate::executor::test_utils::*;
869
870    #[tokio::test]
871    async fn test_force_append_only_sink() {
872        use risingwave_common::array::StreamChunkTestExt;
873        use risingwave_common::array::stream_chunk::StreamChunk;
874        use risingwave_common::types::DataType;
875
876        use crate::executor::Barrier;
877
878        let properties = maplit::btreemap! {
879            "connector".into() => "blackhole".into(),
880            "type".into() => "append-only".into(),
881            "force_append_only".into() => "true".into()
882        };
883
884        // We have two visible columns and one hidden column. The hidden column will be pruned out
885        // within the sink executor.
886        let columns = vec![
887            ColumnCatalog {
888                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
889                is_hidden: false,
890            },
891            ColumnCatalog {
892                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
893                is_hidden: false,
894            },
895            ColumnCatalog {
896                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
897                is_hidden: true,
898            },
899        ];
900        let schema: Schema = columns
901            .iter()
902            .map(|column| Field::from(column.column_desc.clone()))
903            .collect();
904        let stream_key = vec![0];
905
906        let source = MockSource::with_messages(vec![
907            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
908            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
909                " I I I
910                    + 3 2 1",
911            ))),
912            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
913            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
914                "  I I I
915                    U- 3 2 1
916                    U+ 3 4 1
917                     + 5 6 7",
918            ))),
919            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
920                " I I I
921                    - 5 6 7",
922            ))),
923        ])
924        .into_executor(schema.clone(), stream_key.clone());
925
926        let sink_param = SinkParam {
927            sink_id: 0.into(),
928            sink_name: "test".into(),
929            properties,
930
931            columns: columns
932                .iter()
933                .filter(|col| !col.is_hidden)
934                .map(|col| col.column_desc.clone())
935                .collect(),
936            downstream_pk: Some(stream_key.clone()),
937            sink_type: SinkType::AppendOnly,
938            ignore_delete: true,
939            format_desc: None,
940            db_name: "test".into(),
941            sink_from_name: "test".into(),
942        };
943
944        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);
945
946        let sink = build_sink(sink_param.clone()).unwrap();
947
948        let sink_executor = SinkExecutor::new(
949            ActorContext::for_test(0),
950            info,
951            source,
952            SinkWriterParam::for_test(),
953            sink,
954            sink_param,
955            columns.clone(),
956            BoundedInMemLogStoreFactory::for_test(1),
957            1024,
958            vec![DataType::Int32, DataType::Int32, DataType::Int32],
959            None,
960        )
961        .await
962        .unwrap();
963
964        let mut executor = sink_executor.boxed().execute();
965
966        // Barrier message.
967        executor.next().await.unwrap().unwrap();
968
969        let chunk_msg = executor.next().await.unwrap().unwrap();
970        assert_eq!(
971            chunk_msg.into_chunk().unwrap().compact_vis(),
972            StreamChunk::from_pretty(
973                " I I I
974                + 3 2 1",
975            )
976        );
977
978        // Barrier message.
979        executor.next().await.unwrap().unwrap();
980
981        let chunk_msg = executor.next().await.unwrap().unwrap();
982        assert_eq!(
983            chunk_msg.into_chunk().unwrap().compact_vis(),
984            StreamChunk::from_pretty(
985                " I I I
986                + 3 4 1
987                + 5 6 7",
988            )
989        );
990
991        // Should not receive the third stream chunk message because the force-append-only sink
992        // executor will drop all DELETE messages.
993
994        // The last barrier message.
995        executor.next().await.unwrap().unwrap();
996    }
997
998    #[tokio::test]
999    async fn stream_key_sink_pk_mismatch_upsert() {
1000        stream_key_sink_pk_mismatch(SinkType::Upsert).await;
1001    }
1002
1003    #[tokio::test]
1004    async fn stream_key_sink_pk_mismatch_retract() {
1005        stream_key_sink_pk_mismatch(SinkType::Retract).await;
1006    }
1007
1008    async fn stream_key_sink_pk_mismatch(sink_type: SinkType) {
1009        use risingwave_common::array::StreamChunkTestExt;
1010        use risingwave_common::array::stream_chunk::StreamChunk;
1011        use risingwave_common::types::DataType;
1012
1013        use crate::executor::Barrier;
1014
1015        let properties = maplit::btreemap! {
1016            "connector".into() => "blackhole".into(),
1017        };
1018
1019        // We have two visible columns and one hidden column. The hidden column will be pruned out
1020        // within the sink executor.
1021        let columns = vec![
1022            ColumnCatalog {
1023                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1024                is_hidden: false,
1025            },
1026            ColumnCatalog {
1027                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1028                is_hidden: false,
1029            },
1030            ColumnCatalog {
1031                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1032                is_hidden: true,
1033            },
1034        ];
1035        let schema: Schema = columns
1036            .iter()
1037            .map(|column| Field::from(column.column_desc.clone()))
1038            .collect();
1039
1040        let source = MockSource::with_messages(vec![
1041            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1042            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1043                " I I I
1044                    + 1 1 10",
1045            ))),
1046            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1047            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1048                " I I I
1049                    + 1 3 30",
1050            ))),
1051            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1052                " I I I
1053                    + 1 2 20
1054                    - 1 2 20",
1055            ))),
1056            Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
1057                " I I I
1058                    - 1 1 10
1059                    + 1 1 40",
1060            ))),
1061            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1062        ])
1063        .into_executor(schema.clone(), vec![0, 1]);
1064
1065        let sink_param = SinkParam {
1066            sink_id: 0.into(),
1067            sink_name: "test".into(),
1068            properties,
1069
1070            columns: columns
1071                .iter()
1072                .filter(|col| !col.is_hidden)
1073                .map(|col| col.column_desc.clone())
1074                .collect(),
1075            downstream_pk: Some(vec![0]),
1076            sink_type,
1077            ignore_delete: false,
1078            format_desc: None,
1079            db_name: "test".into(),
1080            sink_from_name: "test".into(),
1081        };
1082
1083        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
1084
1085        let sink = build_sink(sink_param.clone()).unwrap();
1086
1087        let sink_executor = SinkExecutor::new(
1088            ActorContext::for_test(0),
1089            info,
1090            source,
1091            SinkWriterParam::for_test(),
1092            sink,
1093            sink_param,
1094            columns.clone(),
1095            BoundedInMemLogStoreFactory::for_test(1),
1096            1024,
1097            vec![DataType::Int64, DataType::Int64, DataType::Int64],
1098            None,
1099        )
1100        .await
1101        .unwrap();
1102
1103        let mut executor = sink_executor.boxed().execute();
1104
1105        // Barrier message.
1106        executor.next().await.unwrap().unwrap();
1107
1108        let chunk_msg = executor.next().await.unwrap().unwrap();
1109        assert_eq!(
1110            chunk_msg.into_chunk().unwrap().compact_vis(),
1111            StreamChunk::from_pretty(
1112                " I I I
1113                + 1 1 10",
1114            )
1115        );
1116
1117        // Barrier message.
1118        executor.next().await.unwrap().unwrap();
1119
1120        let chunk_msg = executor.next().await.unwrap().unwrap();
1121        let expected = match sink_type {
1122            SinkType::Retract => StreamChunk::from_pretty(
1123                " I I I
1124                U- 1 1 10
1125                U+ 1 1 40",
1126            ),
1127            SinkType::Upsert => StreamChunk::from_pretty(
1128                " I I I
1129                + 1 1 40", // For upsert format, there won't be `U- 1 1 10`.
1130            ),
1131            _ => unreachable!(),
1132        };
1133        assert_eq!(chunk_msg.into_chunk().unwrap().compact_vis(), expected);
1134
1135        // The last barrier message.
1136        executor.next().await.unwrap().unwrap();
1137    }
1138
1139    #[tokio::test]
1140    async fn test_empty_barrier_sink() {
1141        use risingwave_common::types::DataType;
1142
1143        use crate::executor::Barrier;
1144
1145        let properties = maplit::btreemap! {
1146            "connector".into() => "blackhole".into(),
1147            "type".into() => "append-only".into(),
1148            "force_append_only".into() => "true".into()
1149        };
1150        let columns = vec![
1151            ColumnCatalog {
1152                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1153                is_hidden: false,
1154            },
1155            ColumnCatalog {
1156                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1157                is_hidden: false,
1158            },
1159        ];
1160        let schema: Schema = columns
1161            .iter()
1162            .map(|column| Field::from(column.column_desc.clone()))
1163            .collect();
1164        let stream_key = vec![0];
1165
1166        let source = MockSource::with_messages(vec![
1167            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1168            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1169            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1170        ])
1171        .into_executor(schema.clone(), stream_key.clone());
1172
1173        let sink_param = SinkParam {
1174            sink_id: 0.into(),
1175            sink_name: "test".into(),
1176            properties,
1177
1178            columns: columns
1179                .iter()
1180                .filter(|col| !col.is_hidden)
1181                .map(|col| col.column_desc.clone())
1182                .collect(),
1183            downstream_pk: Some(stream_key.clone()),
1184            sink_type: SinkType::AppendOnly,
1185            ignore_delete: true,
1186            format_desc: None,
1187            db_name: "test".into(),
1188            sink_from_name: "test".into(),
1189        };
1190
1191        let info = ExecutorInfo::for_test(schema, stream_key, "SinkExecutor".to_owned(), 0);
1192
1193        let sink = build_sink(sink_param.clone()).unwrap();
1194
1195        let sink_executor = SinkExecutor::new(
1196            ActorContext::for_test(0),
1197            info,
1198            source,
1199            SinkWriterParam::for_test(),
1200            sink,
1201            sink_param,
1202            columns,
1203            BoundedInMemLogStoreFactory::for_test(1),
1204            1024,
1205            vec![DataType::Int64, DataType::Int64],
1206            None,
1207        )
1208        .await
1209        .unwrap();
1210
1211        let mut executor = sink_executor.boxed().execute();
1212
1213        // Barrier message.
1214        assert_eq!(
1215            executor.next().await.unwrap().unwrap(),
1216            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
1217        );
1218
1219        // Barrier message.
1220        assert_eq!(
1221            executor.next().await.unwrap().unwrap(),
1222            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
1223        );
1224
1225        // The last barrier message.
1226        assert_eq!(
1227            executor.next().await.unwrap().unwrap(),
1228            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
1229        );
1230    }
1231
1232    #[tokio::test]
1233    async fn test_force_compaction() {
1234        use risingwave_common::array::StreamChunkTestExt;
1235        use risingwave_common::array::stream_chunk::StreamChunk;
1236        use risingwave_common::types::DataType;
1237
1238        use crate::executor::Barrier;
1239
1240        let properties = maplit::btreemap! {
1241            "connector".into() => "blackhole".into(),
1242            "force_compaction".into() => "true".into()
1243        };
1244
1245        // We have two visible columns and one hidden column. The hidden column will be pruned out
1246        // within the sink executor.
1247        let columns = vec![
1248            ColumnCatalog {
1249                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
1250                is_hidden: false,
1251            },
1252            ColumnCatalog {
1253                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
1254                is_hidden: false,
1255            },
1256            ColumnCatalog {
1257                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
1258                is_hidden: true,
1259            },
1260        ];
1261        let schema: Schema = columns
1262            .iter()
1263            .map(|column| Field::from(column.column_desc.clone()))
1264            .collect();
1265
1266        let source = MockSource::with_messages(vec![
1267            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1268            Message::Chunk(StreamChunk::from_pretty(
1269                " I I I
1270                    + 1 1 10",
1271            )),
1272            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1273            Message::Chunk(StreamChunk::from_pretty(
1274                " I I I
1275                    + 1 3 30",
1276            )),
1277            Message::Chunk(StreamChunk::from_pretty(
1278                " I I I
1279                    + 1 2 20
1280                    - 1 2 20
1281                    + 1 4 10",
1282            )),
1283            Message::Chunk(StreamChunk::from_pretty(
1284                " I I I
1285                    - 1 1 10
1286                    + 1 1 40",
1287            )),
1288            Message::Chunk(StreamChunk::from_pretty(
1289                " I I I
1290                    - 1 4 30",
1291            )),
1292            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1293        ])
1294        .into_executor(schema.clone(), vec![0, 1]);
1295
1296        let sink_param = SinkParam {
1297            sink_id: 0.into(),
1298            sink_name: "test".into(),
1299            properties,
1300
1301            columns: columns
1302                .iter()
1303                .filter(|col| !col.is_hidden)
1304                .map(|col| col.column_desc.clone())
1305                .collect(),
1306            downstream_pk: Some(vec![0, 1]),
1307            sink_type: SinkType::Upsert,
1308            ignore_delete: false,
1309            format_desc: None,
1310            db_name: "test".into(),
1311            sink_from_name: "test".into(),
1312        };
1313
1314        let info = ExecutorInfo::for_test(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);
1315
1316        let sink = build_sink(sink_param.clone()).unwrap();
1317
1318        let sink_executor = SinkExecutor::new(
1319            ActorContext::for_test(0),
1320            info,
1321            source,
1322            SinkWriterParam::for_test(),
1323            sink,
1324            sink_param,
1325            columns.clone(),
1326            BoundedInMemLogStoreFactory::for_test(1),
1327            1024,
1328            vec![DataType::Int64, DataType::Int64, DataType::Int64],
1329            None,
1330        )
1331        .await
1332        .unwrap();
1333
1334        let mut executor = sink_executor.boxed().execute();
1335
1336        // Barrier message.
1337        executor.next().await.unwrap().unwrap();
1338
1339        let chunk_msg = executor.next().await.unwrap().unwrap();
1340        assert_eq!(
1341            chunk_msg.into_chunk().unwrap().compact_vis(),
1342            StreamChunk::from_pretty(
1343                " I I I
1344                + 1 1 10",
1345            )
1346        );
1347
1348        // Barrier message.
1349        executor.next().await.unwrap().unwrap();
1350
1351        let chunk_msg = executor.next().await.unwrap().unwrap();
1352        assert_eq!(
1353            chunk_msg.into_chunk().unwrap().compact_vis(),
1354            StreamChunk::from_pretty(
1355                " I I I
1356                + 1 3 30
1357                + 1 1 40",
1358            )
1359        );
1360
1361        // The last barrier message.
1362        executor.next().await.unwrap().unwrap();
1363    }
1364}