risingwave_stream/executor/
sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem;
17
18use anyhow::anyhow;
19use futures::stream::select;
20use futures::{FutureExt, TryFutureExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::array::Op;
23use risingwave_common::array::stream_chunk::StreamChunkMut;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{ColumnCatalog, Field};
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntGauge};
27use risingwave_common_estimate_size::EstimateSize;
28use risingwave_common_estimate_size::collections::EstimatedVec;
29use risingwave_common_rate_limit::RateLimit;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::catalog::{SinkId, SinkType};
32use risingwave_connector::sink::log_store::{
33    FlushCurrentEpochOptions, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory,
34    LogWriter, LogWriterExt, LogWriterMetrics,
35};
36use risingwave_connector::sink::{
37    GLOBAL_SINK_METRICS, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam,
38};
39use thiserror_ext::AsReport;
40use tokio::select;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::oneshot;
43
44use crate::common::compact_chunk::{StreamChunkCompactor, merge_chunk_row};
45use crate::executor::prelude::*;
/// Executor that delivers upstream stream changes to an external sink,
/// optionally buffering them in a log store between the log writer and the
/// sink consumer halves of the pipeline.
pub struct SinkExecutor<F: LogStoreFactory> {
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    input: Executor,
    sink: SinkImpl,
    /// Catalog of the sink's input columns; may include hidden columns that
    /// are projected away before reaching the downstream system.
    input_columns: Vec<ColumnCatalog>,
    sink_param: SinkParam,
    log_store_factory: F,
    sink_writer_param: SinkWriterParam,
    /// Max number of rows per chunk when rebuilding chunks after buffering.
    chunk_size: usize,
    input_data_types: Vec<DataType>,
    /// Whether all DELETEs must be emitted before INSERTs within one barrier
    /// (set when the stream key and sink pk mismatch for non-append-only sinks).
    need_advance_delete: bool,
    /// Whether buffered chunks are re-compacted keyed by the sink pk.
    re_construct_with_sink_pk: bool,
    /// Upstream and Downstream PK matched.
    pk_matched: bool,
    /// Whether to compact chunks by stream key (disabled for blackhole sinks
    /// to keep benchmarks cheap).
    compact_chunk: bool,
    /// Optional sink rate limit; `None` leaves throttling disabled.
    rate_limit: Option<u32>,
}
64
65// Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
66fn force_append_only(c: StreamChunk) -> StreamChunk {
67    let mut c: StreamChunkMut = c.into();
68    for (_, mut r) in c.to_rows_mut() {
69        match r.op() {
70            Op::Insert => {}
71            Op::Delete | Op::UpdateDelete => r.set_vis(false),
72            Op::UpdateInsert => r.set_op(Op::Insert),
73        }
74    }
75    c.into()
76}
77
78// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE.
79fn force_delete_only(c: StreamChunk) -> StreamChunk {
80    let mut c: StreamChunkMut = c.into();
81    for (_, mut r) in c.to_rows_mut() {
82        match r.op() {
83            Op::Delete => {}
84            Op::Insert | Op::UpdateInsert => r.set_vis(false),
85            Op::UpdateDelete => r.set_op(Op::Delete),
86        }
87    }
88    c.into()
89}
90
91impl<F: LogStoreFactory> SinkExecutor<F> {
    /// Create a new `SinkExecutor`.
    ///
    /// Asserts that the sink input schema matches the executor's schema
    /// (after removing the optional extra partition column), then derives
    /// the per-barrier compaction flags from how the stream key relates to
    /// the user-defined sink primary key.
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::unused_async)]
    pub async fn new(
        actor_context: ActorContextRef,
        info: ExecutorInfo,
        input: Executor,
        sink_writer_param: SinkWriterParam,
        sink: SinkImpl,
        sink_param: SinkParam,
        columns: Vec<ColumnCatalog>,
        log_store_factory: F,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        rate_limit: Option<u32>,
    ) -> StreamExecutorResult<Self> {
        let sink_input_schema: Schema = columns
            .iter()
            .map(|column| Field::from(&column.column_desc))
            .collect();

        if let Some(col_dix) = sink_writer_param.extra_partition_col_idx {
            // Remove the partition column from the schema.
            assert_eq!(sink_input_schema.data_types(), {
                let mut data_type = info.schema.data_types();
                data_type.remove(col_dix);
                data_type
            });
        } else {
            assert_eq!(sink_input_schema.data_types(), info.schema.data_types());
        }

        let stream_key = info.pk_indices.clone();
        // True when at least one stream-key column is absent from the sink pk.
        let stream_key_sink_pk_mismatch = {
            stream_key
                .iter()
                .any(|i| !sink_param.downstream_pk.contains(i))
        };
        // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order
        // stream key: a,b
        // sink pk: a

        // original:
        // (1,1) -> (1,2)
        // (1,2) -> (1,3)

        // mv fragment 1:
        // delete (1,1)

        // mv fragment 2:
        // insert (1,2)
        // delete (1,2)

        // mv fragment 3:
        // insert (1,3)

        // merge to sink fragment:
        // insert (1,3)
        // insert (1,2)
        // delete (1,2)
        // delete (1,1)
        // So we do additional compaction in the sink executor per barrier.

        //     1. compact all the changes with the stream key.
        //     2. sink all the delete events and then sink all insert events.

        // after compacting with the stream key, the two event with the same user defined sink pk must have different stream key.
        // So the delete event is not to delete the inserted record in our internal streaming SQL semantic.
        let need_advance_delete = stream_key_sink_pk_mismatch
            && !matches!(
                sink_param.sink_type,
                SinkType::AppendOnly | SinkType::ForceAppendOnly
            );
        // NOTE(st1page): reconstruct with sink pk need extra cost to buffer a barrier's data, so currently we bind it with mismatch case.
        let re_construct_with_sink_pk = need_advance_delete
            && sink_param.sink_type == SinkType::Upsert
            && !sink_param.downstream_pk.is_empty();
        let pk_matched = !stream_key_sink_pk_mismatch;
        // Don't compact chunk for blackhole sink for better benchmark performance.
        let compact_chunk = !sink.is_blackhole();

        // Log the derived strategy so operators can diagnose sink behavior.
        tracing::info!(
            sink_id = sink_param.sink_id.sink_id,
            actor_id = actor_context.id,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            "Sink executor info"
        );

        Ok(Self {
            actor_context,
            info,
            input,
            sink,
            input_columns: columns,
            sink_param,
            log_store_factory,
            sink_writer_param,
            chunk_size,
            input_data_types,
            need_advance_delete,
            re_construct_with_sink_pk,
            pk_matched,
            compact_chunk,
            rate_limit,
        })
    }
200
    /// Build the executor's output message stream.
    ///
    /// The input is first pre-processed by [`Self::process_msg`]. For a
    /// sink-into-table, the processed stream is returned directly. Otherwise
    /// the pipeline is split into a log-writer half (`execute_write_log`) and
    /// a log-consumer half (`execute_consume_log`) connected through the log
    /// store, and the two are driven concurrently via `futures::stream::select`.
    fn execute_inner(self) -> BoxedMessageStream {
        let sink_id = self.sink_param.sink_id;
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        let stream_key = self.info.pk_indices.clone();
        let metrics = self.actor_context.streaming_metrics.new_sink_exec_metrics(
            sink_id,
            actor_id,
            fragment_id,
        );

        let input = self.input.execute();

        // Account input rows/bytes as they flow past, without altering them.
        let input = input.inspect_ok(move |msg| {
            if let Message::Chunk(c) = msg {
                metrics.sink_input_row_count.inc_by(c.capacity() as u64);
                metrics.sink_input_bytes.inc_by(c.estimated_size() as u64);
            }
        });

        let processed_input = Self::process_msg(
            input,
            self.sink_param.sink_type,
            stream_key,
            self.need_advance_delete,
            self.re_construct_with_sink_pk,
            self.chunk_size,
            self.input_data_types,
            self.sink_param.downstream_pk.clone(),
            metrics.sink_chunk_buffer_size,
            self.compact_chunk,
        );

        if self.sink.is_sink_into_table() {
            // TODO(hzxa21): support rate limit?
            processed_input.boxed()
        } else {
            let labels = [
                &actor_id.to_string(),
                &sink_id.to_string(),
                self.sink_param.sink_name.as_str(),
            ];
            let log_store_first_write_epoch = GLOBAL_SINK_METRICS
                .log_store_first_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_latest_write_epoch = GLOBAL_SINK_METRICS
                .log_store_latest_write_epoch
                .with_guarded_label_values(&labels);
            let log_store_write_rows = GLOBAL_SINK_METRICS
                .log_store_write_rows
                .with_guarded_label_values(&labels);
            let log_writer_metrics = LogWriterMetrics {
                log_store_first_write_epoch,
                log_store_latest_write_epoch,
                log_store_write_rows,
            };

            // Channel carrying rate-limit updates from the writer half to the
            // rate-limited log reader on the consumer half.
            let (rate_limit_tx, rate_limit_rx) = unbounded_channel();
            // Init the rate limit
            rate_limit_tx.send(self.rate_limit.into()).unwrap();

            // Channel carrying rebuild/alter-config requests to the consumer half.
            let (rebuild_sink_tx, rebuild_sink_rx) = unbounded_channel();

            self.log_store_factory
                .build()
                .map(move |(log_reader, log_writer)| {
                    let write_log_stream = Self::execute_write_log(
                        processed_input,
                        log_writer.monitored(log_writer_metrics),
                        actor_id,
                        sink_id,
                        rate_limit_tx,
                        rebuild_sink_tx,
                    );

                    let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
                        let consume_log_stream = Self::execute_consume_log(
                            *sink,
                            log_reader,
                            self.input_columns,
                            self.sink_param,
                            self.sink_writer_param,
                            self.pk_matched,
                            self.actor_context,
                            rate_limit_rx,
                            rebuild_sink_rx,
                        )
                        .instrument_await(
                            await_tree::span!("consume_log (sink_id {sink_id})").long_running(),
                        )
                        .map_ok(|never| never); // unify return type to `Message`

                        consume_log_stream.boxed()
                    });
                    select(consume_log_stream_future.into_stream(), write_log_stream)
                })
                .into_stream()
                .flatten()
                .boxed()
        }
    }
303
304    #[try_stream(ok = Message, error = StreamExecutorError)]
305    async fn execute_write_log<W: LogWriter>(
306        input: impl MessageStream,
307        mut log_writer: W,
308        actor_id: ActorId,
309        sink_id: SinkId,
310        rate_limit_tx: UnboundedSender<RateLimit>,
311        rebuild_sink_tx: UnboundedSender<RebuildSinkMessage>,
312    ) {
313        pin_mut!(input);
314        let barrier = expect_first_barrier(&mut input).await?;
315        let epoch_pair = barrier.epoch;
316        let is_pause_on_startup = barrier.is_pause_on_startup();
317        // Propagate the first barrier
318        yield Message::Barrier(barrier);
319
320        log_writer.init(epoch_pair, is_pause_on_startup).await?;
321
322        let mut is_paused = false;
323
324        #[for_await]
325        for msg in input {
326            match msg? {
327                Message::Watermark(w) => yield Message::Watermark(w),
328                Message::Chunk(chunk) => {
329                    assert!(
330                        !is_paused,
331                        "Actor {actor_id} should not receive any data after pause"
332                    );
333                    log_writer.write_chunk(chunk.clone()).await?;
334                    yield Message::Chunk(chunk);
335                }
336                Message::Barrier(barrier) => {
337                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
338                    let post_flush = log_writer
339                        .flush_current_epoch(
340                            barrier.epoch.curr,
341                            FlushCurrentEpochOptions {
342                                is_checkpoint: barrier.kind.is_checkpoint(),
343                                new_vnode_bitmap: update_vnode_bitmap.clone(),
344                                is_stop: barrier.is_stop(actor_id),
345                            },
346                        )
347                        .await?;
348
349                    let mutation = barrier.mutation.clone();
350                    yield Message::Barrier(barrier);
351                    if F::REBUILD_SINK_ON_UPDATE_VNODE_BITMAP
352                        && let Some(new_vnode_bitmap) = update_vnode_bitmap.clone()
353                    {
354                        let (tx, rx) = oneshot::channel();
355                        rebuild_sink_tx
356                            .send(RebuildSinkMessage::RebuildSink(new_vnode_bitmap, tx))
357                            .map_err(|_| anyhow!("fail to send rebuild sink to reader"))?;
358                        rx.await
359                            .map_err(|_| anyhow!("fail to wait rebuild sink finish"))?;
360                    }
361                    post_flush.post_yield_barrier().await?;
362
363                    if let Some(mutation) = mutation.as_deref() {
364                        match mutation {
365                            Mutation::Pause => {
366                                log_writer.pause()?;
367                                is_paused = true;
368                            }
369                            Mutation::Resume => {
370                                log_writer.resume()?;
371                                is_paused = false;
372                            }
373                            Mutation::Throttle(actor_to_apply) => {
374                                if let Some(new_rate_limit) = actor_to_apply.get(&actor_id) {
375                                    tracing::info!(
376                                        rate_limit = new_rate_limit,
377                                        "received sink rate limit on actor {actor_id}"
378                                    );
379                                    if let Err(e) = rate_limit_tx.send((*new_rate_limit).into()) {
380                                        error!(
381                                            error = %e.as_report(),
382                                            "fail to send sink ate limit update"
383                                        );
384                                        return Err(StreamExecutorError::from(
385                                            e.to_report_string(),
386                                        ));
387                                    }
388                                }
389                            }
390                            Mutation::ConnectorPropsChange(config) => {
391                                if let Some(map) = config.get(&sink_id.sink_id)
392                                    && let Err(e) = rebuild_sink_tx
393                                        .send(RebuildSinkMessage::UpdateConfig(map.clone()))
394                                {
395                                    error!(
396                                        error = %e.as_report(),
397                                        "fail to send sink alter props"
398                                    );
399                                    return Err(StreamExecutorError::from(e.to_report_string()));
400                                }
401                            }
402                            _ => (),
403                        }
404                    }
405                }
406            }
407        }
408    }
409
    /// Pre-process the input stream before it reaches the log writer.
    ///
    /// When `need_advance_delete` or `re_construct_with_sink_pk` is set,
    /// chunks are buffered per barrier, compacted by stream key, reordered so
    /// all deletes precede inserts, and optionally re-compacted keyed by the
    /// sink pk. Otherwise chunks are forwarded (optionally compacted by
    /// stream key) with append-only rewriting applied as the sink type demands.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn process_msg(
        input: impl MessageStream,
        sink_type: SinkType,
        stream_key: PkIndices,
        need_advance_delete: bool,
        re_construct_with_sink_pk: bool,
        chunk_size: usize,
        input_data_types: Vec<DataType>,
        down_stream_pk: Vec<usize>,
        sink_chunk_buffer_size_metrics: LabelGuardedIntGauge,
        compact_chunk: bool,
    ) {
        // need to buffer chunks during one barrier
        if need_advance_delete || re_construct_with_sink_pk {
            let mut chunk_buffer = EstimatedVec::new();
            let mut watermark: Option<super::Watermark> = None;
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => watermark = Some(w),
                    Message::Chunk(c) => {
                        chunk_buffer.push(c);

                        sink_chunk_buffer_size_metrics.set(chunk_buffer.estimated_size() as i64);
                    }
                    Message::Barrier(barrier) => {
                        let chunks = mem::take(&mut chunk_buffer).into_inner();
                        let chunks = if need_advance_delete {
                            let mut delete_chunks = vec![];
                            let mut insert_chunks = vec![];

                            for c in StreamChunkCompactor::new(stream_key.clone(), chunks)
                                .into_compacted_chunks()
                            {
                                // We only enter the branch if need_advance_delete, in which case `sink_type` is not ForceAppendOnly or AppendOnly.
                                {
                                    // Extract the delete-only part of the compacted
                                    // chunk so that all deletes can be emitted before
                                    // any inserts within this barrier (see the
                                    // ordering discussion in `SinkExecutor::new`).
                                    let chunk = force_delete_only(c.clone());
                                    if chunk.cardinality() > 0 {
                                        delete_chunks.push(chunk);
                                    }
                                }
                                // And the insert-only part, emitted after all deletes.
                                let chunk = force_append_only(c);
                                if chunk.cardinality() > 0 {
                                    insert_chunks.push(chunk);
                                }
                            }
                            delete_chunks
                                .into_iter()
                                .chain(insert_chunks.into_iter())
                                .collect()
                        } else {
                            chunks
                        };
                        if re_construct_with_sink_pk {
                            let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
                                .reconstructed_compacted_chunks(
                                    chunk_size,
                                    input_data_types.clone(),
                                    sink_type != SinkType::ForceAppendOnly,
                                );
                            for c in chunks {
                                yield Message::Chunk(c);
                            }
                        } else {
                            // Repack the (possibly reordered) rows into chunks of
                            // at most `chunk_size` rows.
                            let mut chunk_builder =
                                StreamChunkBuilder::new(chunk_size, input_data_types.clone());
                            for chunk in chunks {
                                for (op, row) in chunk.rows() {
                                    if let Some(c) = chunk_builder.append_row(op, row) {
                                        yield Message::Chunk(c);
                                    }
                                }
                            }

                            if let Some(c) = chunk_builder.take() {
                                yield Message::Chunk(c);
                            }
                        };

                        // Emit the buffered watermark (if any) before the barrier.
                        if let Some(w) = mem::take(&mut watermark) {
                            yield Message::Watermark(w)
                        }
                        yield Message::Barrier(barrier);
                    }
                }
            }
        } else {
            #[for_await]
            for msg in input {
                match msg? {
                    Message::Watermark(w) => yield Message::Watermark(w),
                    Message::Chunk(mut chunk) => {
                        // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
                        // V->V).
                        if compact_chunk {
                            chunk = merge_chunk_row(chunk, &stream_key);
                        }
                        match sink_type {
                            SinkType::AppendOnly => yield Message::Chunk(chunk),
                            SinkType::ForceAppendOnly => {
                                // Force append-only by dropping UPDATE/DELETE messages. We do this when the
                                // user forces the sink to be append-only while it is actually not based on
                                // the frontend derivation result.
                                yield Message::Chunk(force_append_only(chunk))
                            }
                            SinkType::Upsert => yield Message::Chunk(chunk),
                        }
                    }
                    Message::Barrier(barrier) => {
                        yield Message::Barrier(barrier);
                    }
                }
            }
        }
    }
530
    /// Consumer half of the non-table sink pipeline: read the persisted log
    /// and feed it into the external sink, retrying on sink errors.
    ///
    /// The inner loop keeps recreating the log sinker after failures; when
    /// the factory supports rewind (`F::ALLOW_REWIND`) the log reader is
    /// rewound so no data is lost, otherwise the error is propagated (which
    /// triggers recovery). Concurrently, `rebuild_sink_rx` delivers requests
    /// to rebuild the sink on vnode-bitmap changes or to apply new connector
    /// properties (ALTER SINK CONFIG).
    ///
    /// Returns `!`: it only terminates by error.
    #[expect(clippy::too_many_arguments)]
    async fn execute_consume_log<S: Sink, R: LogReader>(
        mut sink: S,
        log_reader: R,
        columns: Vec<ColumnCatalog>,
        mut sink_param: SinkParam,
        mut sink_writer_param: SinkWriterParam,
        pk_matched: bool,
        actor_context: ActorContextRef,
        rate_limit_rx: UnboundedReceiver<RateLimit>,
        mut rebuild_sink_rx: UnboundedReceiver<RebuildSinkMessage>,
    ) -> StreamExecutorResult<!> {
        // Indices of the columns that are visible to the downstream system.
        let visible_columns = columns
            .iter()
            .enumerate()
            .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx))
            .collect_vec();

        let labels = [
            &actor_context.id.to_string(),
            sink_writer_param.connector.as_str(),
            &sink_writer_param.sink_id.to_string(),
            sink_writer_param.sink_name.as_str(),
        ];
        let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS
            .log_store_reader_wait_new_future_duration_ns
            .with_guarded_label_values(&labels);
        let log_store_read_rows = GLOBAL_SINK_METRICS
            .log_store_read_rows
            .with_guarded_label_values(&labels);
        let log_store_read_bytes = GLOBAL_SINK_METRICS
            .log_store_read_bytes
            .with_guarded_label_values(&labels);
        let log_store_latest_read_epoch = GLOBAL_SINK_METRICS
            .log_store_latest_read_epoch
            .with_guarded_label_values(&labels);
        let metrics = LogReaderMetrics {
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
        };

        let downstream_pk = sink_param.downstream_pk.clone();

        let mut log_reader = log_reader
            .transform_chunk(move |chunk| {
                // NOTE(kwannoel):
                // After the optimization in https://github.com/risingwavelabs/risingwave/pull/12250,
                // `DELETE`s will be sequenced before `INSERT`s in JDBC sinks and PG rust sink.
                // There's a risk that adjacent chunks with `DELETE`s on the same PK will get
                // merged into a single chunk, since the logstore format doesn't preserve chunk
                // boundaries. Then we will have double `DELETE`s followed by unspecified sequence
                // of `INSERT`s, and lead to inconsistent data downstream.
                //
                // We only need to do the compaction for upsert sinks, when the upstream and
                // downstream PKs are matched. When the upstream and downstream PKs are not matched,
                // we will buffer the chunks between two barriers, so the compaction is not needed,
                // since the barriers will preserve chunk boundaries.
                //
                // When the sink is not an upsert sink, it is either `force_append_only` or
                // `append_only`, we should only append to downstream, so there should not be any
                // overlapping keys.
                let chunk = if pk_matched && matches!(sink_param.sink_type, SinkType::Upsert) {
                    merge_chunk_row(chunk, &downstream_pk)
                } else {
                    chunk
                };
                if visible_columns.len() != columns.len() {
                    // Do projection here because we may have columns that aren't visible to
                    // the downstream.
                    chunk.project(&visible_columns)
                } else {
                    chunk
                }
            })
            .monitored(metrics)
            .rate_limited(rate_limit_rx);

        log_reader.init().await?;
        loop {
            // Retry loop: keep (re)creating the log sinker and consuming the
            // log; only exits by returning an error.
            let future = async {
                loop {
                    let Err(e) = sink
                        .new_log_sinker(sink_writer_param.clone())
                        .and_then(|log_sinker| log_sinker.consume_log_and_sink(&mut log_reader))
                        .await;
                    GLOBAL_ERROR_METRICS.user_sink_error.report([
                        "sink_executor_error".to_owned(),
                        sink_param.sink_id.to_string(),
                        sink_param.sink_name.clone(),
                        actor_context.fragment_id.to_string(),
                    ]);

                    // Best-effort: record the failure in the meta event log.
                    if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
                        meta_client
                            .add_sink_fail_evet_log(
                                sink_writer_param.sink_id.sink_id,
                                sink_writer_param.sink_name.clone(),
                                sink_writer_param.connector.clone(),
                                e.to_report_string(),
                            )
                            .await;
                    }

                    if F::ALLOW_REWIND {
                        match log_reader.rewind().await {
                            Ok(()) => {
                                warn!(
                                    error = %e.as_report(),
                                    executor_id = sink_writer_param.executor_id,
                                    sink_id = sink_param.sink_id.sink_id,
                                    "reset log reader stream successfully after sink error"
                                );
                                Ok(())
                            }
                            Err(rewind_err) => {
                                error!(
                                    error = %rewind_err.as_report(),
                                    "fail to rewind log reader"
                                );
                                Err(e)
                            }
                        }
                    } else {
                        Err(e)
                    }
                    .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                }
            };
            // Race the consume loop against control messages from the writer half.
            select! {
                result = future => {
                    let Err(e): StreamExecutorResult<!> = result;
                    return Err(e);
                }
                result = rebuild_sink_rx.recv() => {
                    match result.ok_or_else(|| anyhow!("failed to receive rebuild sink notify"))? {
                        RebuildSinkMessage::RebuildSink(new_vnode, notify) => {
                            sink_writer_param.vnode_bitmap = Some((*new_vnode).clone());
                            if notify.send(()).is_err() {
                                warn!("failed to notify rebuild sink");
                            }
                            log_reader.init().await?;
                        },
                        RebuildSinkMessage::UpdateConfig(config) => {
                            if F::ALLOW_REWIND {
                                match log_reader.rewind().await {
                                    Ok(()) => {
                                        sink_param.properties = config.into_iter().collect();
                                        sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                        info!(
                                            executor_id = sink_writer_param.executor_id,
                                            sink_id = sink_param.sink_id.sink_id,
                                            "alter sink config successfully with rewind"
                                        );
                                        Ok(())
                                    }
                                    Err(rewind_err) => {
                                        error!(
                                            error = %rewind_err.as_report(),
                                            "fail to rewind log reader for alter sink config "
                                        );
                                        Err(anyhow!("fail to rewind log after alter table").into())
                                    }
                                }
                            } else {
                                // No rewind support: apply the config and force a
                                // recovery so it takes full effect.
                                sink_param.properties = config.into_iter().collect();
                                sink = TryFrom::try_from(sink_param.clone()).map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                                Err(anyhow!("This is not an actual error condition. The system is intentionally triggering recovery procedures to ensure ALTER SINK CONFIG are fully applied.").into())
                            }
                            .map_err(|e| StreamExecutorError::from((e, sink_param.sink_id.sink_id)))?;
                        },
                    }
                }
            }
        }
    }
708}
709
/// Control messages delivered to the sink-consumer task to reconfigure the
/// running sink without restarting the whole executor.
enum RebuildSinkMessage {
    /// Rebuild the sink writer with a new vnode bitmap. The `oneshot::Sender`
    /// is signalled once the new bitmap has been installed into the
    /// `SinkWriterParam` (the sender side may be dropped; failure to notify is
    /// only logged as a warning).
    RebuildSink(Arc<Bitmap>, oneshot::Sender<()>),
    /// Replace the sink's connector properties (`ALTER SINK CONFIG`). Depending
    /// on whether the log store supports rewind (`F::ALLOW_REWIND`), this either
    /// rewinds the log reader in place or intentionally triggers recovery so the
    /// new config is fully applied.
    UpdateConfig(HashMap<String, String>),
}
714
impl<F: LogStoreFactory> Execute for SinkExecutor<F> {
    /// Entry point of the executor framework: delegates to `execute_inner`,
    /// which produces the boxed message stream for this sink.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner()
    }
}
720
#[cfg(test)]
mod test {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_connector::sink::build_sink;

    use super::*;
    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::executor::test_utils::*;

    /// A force-append-only sink must drop `Delete` rows and turn
    /// `UpdateInsert` into plain `Insert`, so only insert chunks reach the
    /// downstream log.
    #[tokio::test]
    async fn test_force_append_only_sink() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 3 2 1",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                "  I I I
                    U- 3 2 1
                    U+ 3 4 1
                     + 5 6 7",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 5 6 7",
            )),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            // Data types must match the Int64 column catalog above.
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 2 1",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        // U-/U+ pair becomes a single insert of the new value; the standalone
        // insert passes through unchanged.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 3 4 1
                + 5 6 7",
            )
        );

        // Should not receive the third stream chunk message because the force-append-only sink
        // executor will drop all DELETE messages.

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    /// When the stream key (`[0, 1]`) is not a prefix of the sink's downstream
    /// pk (`[0]`), the executor must compact each epoch's chunks per pk so the
    /// sink sees at most one change per key per epoch.
    #[tokio::test]
    async fn stream_key_sink_pk_mismatch() {
        use risingwave_common::array::StreamChunkTestExt;
        use risingwave_common::array::stream_chunk::StreamChunk;
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
        };

        // We have two visible columns and one hidden column. The hidden column will be pruned out
        // within the sink executor.
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
                is_hidden: true,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 1 10",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 3 30",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    + 1 2 20
                    - 1 2 20",
            )),
            Message::Chunk(StreamChunk::from_pretty(
                " I I I
                    - 1 1 10
                    + 1 1 40",
            )),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), vec![0, 1]);

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: vec![0],
            sink_type: SinkType::Upsert,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, vec![0, 1], "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns.clone(),
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                + 1 1 10",
            )
        );

        // Barrier message.
        executor.next().await.unwrap().unwrap();

        // Within the second epoch, the +/- of (1,2,20) cancels out and the
        // -/+ on key 1 collapses into a single update pair.
        let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                " I I I
                U- 1 1 10
                U+ 1 1 40",
            )
        );

        // The last barrier message.
        executor.next().await.unwrap().unwrap();
    }

    /// Epochs that carry no data must still forward their barriers unchanged.
    #[tokio::test]
    async fn test_empty_barrier_sink() {
        use risingwave_common::types::DataType;

        use crate::executor::Barrier;

        let properties = maplit::btreemap! {
            "connector".into() => "blackhole".into(),
            "type".into() => "append-only".into(),
            "force_append_only".into() => "true".into()
        };
        let columns = vec![
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                is_hidden: false,
            },
            ColumnCatalog {
                column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
                is_hidden: false,
            },
        ];
        let schema: Schema = columns
            .iter()
            .map(|column| Field::from(column.column_desc.clone()))
            .collect();
        let pk_indices = vec![0];

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), pk_indices.clone());

        let sink_param = SinkParam {
            sink_id: 0.into(),
            sink_name: "test".into(),
            properties,

            columns: columns
                .iter()
                .filter(|col| !col.is_hidden)
                .map(|col| col.column_desc.clone())
                .collect(),
            downstream_pk: pk_indices.clone(),
            sink_type: SinkType::ForceAppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let info = ExecutorInfo::new(schema, pk_indices, "SinkExecutor".to_owned(), 0);

        let sink = build_sink(sink_param.clone()).unwrap();

        let sink_executor = SinkExecutor::new(
            ActorContext::for_test(0),
            info,
            source,
            SinkWriterParam::for_test(),
            sink,
            sink_param,
            columns,
            BoundedInMemLogStoreFactory::new(1),
            1024,
            vec![DataType::Int64, DataType::Int64],
            None,
        )
        .await
        .unwrap();

        let mut executor = sink_executor.boxed().execute();

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1)))
        );

        // Barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))
        );

        // The last barrier message.
        assert_eq!(
            executor.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3)))
        );
    }
}