risingwave_stream/executor/
merge.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, VecDeque};
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19use anyhow::Context as _;
20use futures::future::try_join_all;
21use futures::stream::{FusedStream, FuturesUnordered, StreamFuture};
22use prometheus::Histogram;
23use risingwave_common::array::StreamChunkBuilder;
24use risingwave_common::config::MetricLevel;
25use risingwave_common::metrics::LabelGuardedMetric;
26use tokio::sync::mpsc;
27use tokio::time::Instant;
28
29use super::exchange::input::BoxedInput;
30use super::watermark::*;
31use super::*;
32use crate::executor::exchange::input::{
33    assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
34};
35use crate::executor::prelude::*;
36use crate::task::LocalBarrierManager;
37
/// The upstream(s) of a merge-like executor: either a single channel or multiple
/// channels whose messages are merged. See [`MergeExecutorInput::into_executor`] for
/// how each variant is turned into a concrete executor.
pub(crate) enum MergeExecutorUpstream {
    /// Exactly one upstream channel, handled by `ReceiverExecutor`.
    Singleton(BoxedInput),
    /// Multiple upstream channels merged together, handled by `MergeExecutor`.
    Merge(SelectReceivers),
}
42
/// Everything needed to build a `ReceiverExecutor` or `MergeExecutor`. Also implements
/// [`Stream`] itself by delegating to the underlying upstream(s), so it can be polled
/// before being turned into an executor.
pub(crate) struct MergeExecutorInput {
    /// The upstream(s) to read from.
    upstream: MergeExecutorUpstream,
    /// The context of the actor this input belongs to.
    actor_context: ActorContextRef,
    /// Id of the fragment the upstream(s) belong to.
    upstream_fragment_id: UpstreamFragmentId,
    local_barrier_manager: LocalBarrierManager,
    /// Streaming metrics.
    executor_stats: Arc<StreamingMetrics>,
    pub(crate) info: ExecutorInfo,
    /// Chunk size for the `StreamChunkBuilder` of the built `MergeExecutor`.
    chunk_size: usize,
}
52
53impl MergeExecutorInput {
54    pub(crate) fn new(
55        upstream: MergeExecutorUpstream,
56        actor_context: ActorContextRef,
57        upstream_fragment_id: UpstreamFragmentId,
58        local_barrier_manager: LocalBarrierManager,
59        executor_stats: Arc<StreamingMetrics>,
60        info: ExecutorInfo,
61        chunk_size: usize,
62    ) -> Self {
63        Self {
64            upstream,
65            actor_context,
66            upstream_fragment_id,
67            local_barrier_manager,
68            executor_stats,
69            info,
70            chunk_size,
71        }
72    }
73
74    pub(crate) fn into_executor(self, barrier_rx: mpsc::UnboundedReceiver<Barrier>) -> Executor {
75        let fragment_id = self.actor_context.fragment_id;
76        let executor = match self.upstream {
77            MergeExecutorUpstream::Singleton(input) => ReceiverExecutor::new(
78                self.actor_context,
79                fragment_id,
80                self.upstream_fragment_id,
81                input,
82                self.local_barrier_manager,
83                self.executor_stats,
84                barrier_rx,
85            )
86            .boxed(),
87            MergeExecutorUpstream::Merge(inputs) => MergeExecutor::new(
88                self.actor_context,
89                fragment_id,
90                self.upstream_fragment_id,
91                inputs,
92                self.local_barrier_manager,
93                self.executor_stats,
94                barrier_rx,
95                self.chunk_size,
96                self.info.schema.clone(),
97            )
98            .boxed(),
99        };
100        (self.info, executor).into()
101    }
102}
103
104impl Stream for MergeExecutorInput {
105    type Item = DispatcherMessageStreamItem;
106
107    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108        match &mut self.get_mut().upstream {
109            MergeExecutorUpstream::Singleton(input) => input.poll_next_unpin(cx),
110            MergeExecutorUpstream::Merge(inputs) => inputs.poll_next_unpin(cx),
111        }
112    }
113}
114
/// `MergeExecutor` merges data from multiple channels. Dataflow from one channel
/// will be stopped on barrier.
pub struct MergeExecutor {
    /// The context of the actor.
    actor_context: ActorContextRef,

    /// Upstream channels.
    upstreams: SelectReceivers,

    /// Belonged fragment id.
    fragment_id: FragmentId,

    /// Upstream fragment id.
    upstream_fragment_id: FragmentId,

    /// Used to create new upstream inputs when actors are added by a scaling update
    /// (see `new_input` in `execute_inner`).
    local_barrier_manager: LocalBarrierManager,

    /// Streaming metrics.
    metrics: Arc<StreamingMetrics>,

    /// Barriers received from the local barrier worker; used by `process_dispatcher_msg`
    /// to upgrade dispatcher barriers into full `Barrier`s carrying mutations.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,

    /// Chunk size for the `StreamChunkBuilder`
    chunk_size: usize,

    /// Data types for the `StreamChunkBuilder`
    schema: Schema,
}
143
144impl MergeExecutor {
145    #[allow(clippy::too_many_arguments)]
146    pub fn new(
147        ctx: ActorContextRef,
148        fragment_id: FragmentId,
149        upstream_fragment_id: FragmentId,
150        upstreams: SelectReceivers,
151        local_barrier_manager: LocalBarrierManager,
152        metrics: Arc<StreamingMetrics>,
153        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
154        chunk_size: usize,
155        schema: Schema,
156    ) -> Self {
157        Self {
158            actor_context: ctx,
159            upstreams,
160            fragment_id,
161            upstream_fragment_id,
162            local_barrier_manager,
163            metrics,
164            barrier_rx,
165            chunk_size,
166            schema,
167        }
168    }
169
170    #[cfg(test)]
171    pub fn for_test(
172        actor_id: ActorId,
173        inputs: Vec<super::exchange::permit::Receiver>,
174        local_barrier_manager: crate::task::LocalBarrierManager,
175        schema: Schema,
176    ) -> Self {
177        use super::exchange::input::LocalInput;
178        use crate::executor::exchange::input::Input;
179
180        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
181
182        let metrics = StreamingMetrics::unused();
183        let actor_ctx = ActorContext::for_test(actor_id);
184        let upstream = Self::new_select_receiver(
185            inputs
186                .into_iter()
187                .enumerate()
188                .map(|(idx, input)| LocalInput::new(input, idx as ActorId).boxed_input())
189                .collect(),
190            &metrics,
191            &actor_ctx,
192        );
193
194        Self::new(
195            actor_ctx,
196            514,
197            1919,
198            upstream,
199            local_barrier_manager,
200            metrics.into(),
201            barrier_rx,
202            100,
203            schema,
204        )
205    }
206
207    pub(crate) fn new_select_receiver(
208        upstreams: Vec<BoxedInput>,
209        metrics: &StreamingMetrics,
210        actor_context: &ActorContext,
211    ) -> SelectReceivers {
212        let merge_barrier_align_duration = if metrics.level >= MetricLevel::Debug {
213            Some(
214                metrics
215                    .merge_barrier_align_duration
216                    .with_guarded_label_values(&[
217                        &actor_context.id.to_string(),
218                        &actor_context.fragment_id.to_string(),
219                    ]),
220            )
221        } else {
222            None
223        };
224
225        // Futures of all active upstreams.
226        SelectReceivers::new(
227            actor_context.id,
228            upstreams,
229            merge_barrier_align_duration.clone(),
230        )
231    }
232
233    #[try_stream(ok = Message, error = StreamExecutorError)]
234    async fn execute_inner(mut self: Box<Self>) {
235        let select_all = self.upstreams;
236        let select_all = BufferChunks::new(select_all, self.chunk_size, self.schema);
237        let actor_id = self.actor_context.id;
238
239        let mut metrics = self.metrics.new_actor_input_metrics(
240            actor_id,
241            self.fragment_id,
242            self.upstream_fragment_id,
243        );
244
245        // Channels that're blocked by the barrier to align.
246        let mut start_time = Instant::now();
247        pin_mut!(select_all);
248        while let Some(msg) = select_all.next().await {
249            metrics
250                .actor_input_buffer_blocking_duration_ns
251                .inc_by(start_time.elapsed().as_nanos() as u64);
252            let msg: DispatcherMessage = msg?;
253            let mut msg: Message = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
254
255            match &mut msg {
256                Message::Watermark(_) => {
257                    // Do nothing.
258                }
259                Message::Chunk(chunk) => {
260                    metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
261                }
262                Message::Barrier(barrier) => {
263                    tracing::debug!(
264                        target: "events::stream::barrier::path",
265                        actor_id = actor_id,
266                        "receiver receives barrier from path: {:?}",
267                        barrier.passed_actors
268                    );
269                    barrier.passed_actors.push(actor_id);
270
271                    if let Some(Mutation::Update(UpdateMutation { dispatchers, .. })) =
272                        barrier.mutation.as_deref()
273                    {
274                        if select_all
275                            .upstream_actor_ids()
276                            .iter()
277                            .any(|actor_id| dispatchers.contains_key(actor_id))
278                        {
279                            // `Watermark` of upstream may become stale after downstream scaling.
280                            select_all
281                                .buffered_watermarks
282                                .values_mut()
283                                .for_each(|buffers| buffers.clear());
284                        }
285                    }
286
287                    if let Some(update) =
288                        barrier.as_update_merge(self.actor_context.id, self.upstream_fragment_id)
289                    {
290                        let new_upstream_fragment_id = update
291                            .new_upstream_fragment_id
292                            .unwrap_or(self.upstream_fragment_id);
293                        let removed_upstream_actor_id: HashSet<_> =
294                            if update.new_upstream_fragment_id.is_some() {
295                                select_all.upstream_actor_ids().iter().copied().collect()
296                            } else {
297                                update.removed_upstream_actor_id.iter().copied().collect()
298                            };
299
300                        // `Watermark` of upstream may become stale after upstream scaling.
301                        select_all
302                            .buffered_watermarks
303                            .values_mut()
304                            .for_each(|buffers| buffers.clear());
305
306                        if !update.added_upstream_actors.is_empty() {
307                            // Create new upstreams receivers.
308                            let new_upstreams: Vec<_> = try_join_all(
309                                update.added_upstream_actors.iter().map(|upstream_actor| {
310                                    new_input(
311                                        &self.local_barrier_manager,
312                                        self.metrics.clone(),
313                                        self.actor_context.id,
314                                        self.fragment_id,
315                                        upstream_actor,
316                                        new_upstream_fragment_id,
317                                    )
318                                }),
319                            )
320                            .await
321                            .context("failed to create upstream receivers")?;
322
323                            // Poll the first barrier from the new upstreams. It must be the same as
324                            // the one we polled from original upstreams.
325                            let mut select_new = SelectReceivers::new(
326                                self.actor_context.id,
327                                new_upstreams,
328                                select_all.merge_barrier_align_duration(),
329                            );
330                            let new_barrier = expect_first_barrier(&mut select_new).await?;
331                            assert_equal_dispatcher_barrier(barrier, &new_barrier);
332
333                            // Add the new upstreams to select.
334                            select_all.add_upstreams_from(select_new);
335
336                            // Add buffers to the buffered watermarks for all cols
337                            select_all
338                                .buffered_watermarks
339                                .values_mut()
340                                .for_each(|buffers| {
341                                    buffers.add_buffers(
342                                        update
343                                            .added_upstream_actors
344                                            .iter()
345                                            .map(|actor| actor.actor_id),
346                                    )
347                                });
348                        }
349
350                        if !removed_upstream_actor_id.is_empty() {
351                            // Remove upstreams.
352                            select_all.remove_upstreams(&removed_upstream_actor_id);
353
354                            for buffers in select_all.buffered_watermarks.values_mut() {
355                                // Call `check_heap` in case the only upstream(s) that does not have
356                                // watermark in heap is removed
357                                buffers.remove_buffer(removed_upstream_actor_id.clone());
358                            }
359                        }
360
361                        self.upstream_fragment_id = new_upstream_fragment_id;
362                        metrics = self.metrics.new_actor_input_metrics(
363                            actor_id,
364                            self.fragment_id,
365                            self.upstream_fragment_id,
366                        );
367
368                        select_all.update_actor_ids();
369                    }
370
371                    if barrier.is_stop(actor_id) {
372                        yield msg;
373                        break;
374                    }
375                }
376            }
377
378            yield msg;
379            start_time = Instant::now();
380        }
381    }
382}
383
impl Execute for MergeExecutor {
    // Run the merge loop; see `MergeExecutor::execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
389
/// A stream for merging messages from multiple upstreams.
///
/// Chunks and watermarks are forwarded as they arrive, while barriers are aligned:
/// an upstream that yields a barrier is blocked until all upstreams have yielded the
/// same barrier, which is then emitted exactly once. See the `Stream` impl below.
pub struct SelectReceivers {
    /// The barrier we're aligning to. If this is `None`, then `blocked_upstreams` is empty.
    barrier: Option<DispatcherBarrier>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedInput>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedInput>>,
    /// All upstream actor ids.
    upstream_actor_ids: Vec<ActorId>,

    /// The actor id of this fragment.
    actor_id: u32,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<ActorId>>,
    /// If None, then we don't take `Instant::now()` and `observe` during `poll_next`
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
408
impl Stream for SelectReceivers {
    type Item = DispatcherMessageStreamItem;

    /// Poll the active upstreams. Chunks and watermarks are forwarded immediately;
    /// an upstream yielding a barrier is moved to `blocked` until every upstream has
    /// yielded the same barrier, which is then emitted once and all upstreams are
    /// reactivated.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.active.is_terminated() {
            // This only happens if we've been asked to stop.
            assert!(self.blocked.is_empty());
            return Poll::Ready(None);
        }

        // Time when the first upstream got blocked on the current barrier, used for the
        // alignment duration metric (only taken when the metric is enabled).
        let mut start = None;
        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // Handle the message from some upstream.
                Some((Some(Ok(message)), remaining)) => {
                    let actor_id = remaining.actor_id();
                    match message {
                        DispatcherMessage::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(DispatcherMessage::Chunk(chunk))));
                        }
                        DispatcherMessage::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            // `handle_watermark` decides whether a combined watermark can
                            // be emitted; otherwise keep polling.
                            if let Some(watermark) = self.handle_watermark(actor_id, watermark) {
                                return Poll::Ready(Some(Ok(DispatcherMessage::Watermark(
                                    watermark,
                                ))));
                            }
                        }
                        DispatcherMessage::Barrier(barrier) => {
                            // Block this upstream by pushing it to `blocked`.
                            if self.blocked.is_empty()
                                && self.merge_barrier_align_duration.is_some()
                            {
                                start = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    // Upstreams disagree on the barrier epoch: bail out.
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // We use barrier as the control message of the stream. That is, we always stop the
                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
                // termination.
                //
                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
                // So this branch will never be reached in all cases.
                Some((None, _)) => unreachable!(),
                // There's no active upstreams. Process the barrier and resume the blocked ones.
                None => {
                    if let Some(start) = start
                        && let Some(merge_barrier_align_duration) =
                            &self.merge_barrier_align_duration
                    {
                        // Observe did a few atomic operation inside, we want to avoid the overhead.
                        merge_barrier_align_duration.observe(start.elapsed().as_secs_f64())
                    }
                    break;
                }
            }
        }

        // All upstreams have yielded the same barrier: emit it and reactivate everyone.
        assert!(self.active.is_terminated());
        let barrier = self.barrier.take().unwrap();

        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(DispatcherMessage::Barrier(barrier))))
    }
}
499
500impl SelectReceivers {
501    fn new(
502        actor_id: u32,
503        upstreams: Vec<BoxedInput>,
504        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
505    ) -> Self {
506        assert!(!upstreams.is_empty());
507        let upstream_actor_ids = upstreams.iter().map(|input| input.actor_id()).collect();
508        let mut this = Self {
509            blocked: Vec::with_capacity(upstreams.len()),
510            active: Default::default(),
511            actor_id,
512            barrier: None,
513            upstream_actor_ids,
514            buffered_watermarks: Default::default(),
515            merge_barrier_align_duration,
516        };
517        this.extend_active(upstreams);
518        this
519    }
520
521    /// Extend the active upstreams with the given upstreams. The current stream must be at the
522    /// clean state right after a barrier.
523    fn extend_active(&mut self, upstreams: impl IntoIterator<Item = BoxedInput>) {
524        assert!(self.blocked.is_empty() && self.barrier.is_none());
525
526        self.active
527            .extend(upstreams.into_iter().map(|s| s.into_future()));
528    }
529
530    fn upstream_actor_ids(&self) -> &[ActorId] {
531        &self.upstream_actor_ids
532    }
533
534    fn update_actor_ids(&mut self) {
535        self.upstream_actor_ids = self
536            .blocked
537            .iter()
538            .map(|input| input.actor_id())
539            .chain(
540                self.active
541                    .iter()
542                    .map(|input| input.get_ref().unwrap().actor_id()),
543            )
544            .collect();
545    }
546
547    /// Handle a new watermark message. Optionally returns the watermark message to emit.
548    fn handle_watermark(&mut self, actor_id: ActorId, watermark: Watermark) -> Option<Watermark> {
549        let col_idx = watermark.col_idx;
550        // Insert a buffer watermarks when first received from a column.
551        let watermarks = self
552            .buffered_watermarks
553            .entry(col_idx)
554            .or_insert_with(|| BufferedWatermarks::with_ids(self.upstream_actor_ids.clone()));
555        watermarks.handle_watermark(actor_id, watermark)
556    }
557
558    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
559    /// right after a barrier.
560    fn add_upstreams_from(&mut self, other: Self) {
561        assert!(self.blocked.is_empty() && self.barrier.is_none());
562        assert!(other.blocked.is_empty() && other.barrier.is_none());
563        assert_eq!(self.actor_id, other.actor_id);
564
565        self.active.extend(other.active);
566    }
567
568    /// Remove upstreams from `self` in `upstream_actor_ids`. The current stream must be at the
569    /// clean state right after a barrier.
570    fn remove_upstreams(&mut self, upstream_actor_ids: &HashSet<ActorId>) {
571        assert!(self.blocked.is_empty() && self.barrier.is_none());
572
573        let new_upstreams = std::mem::take(&mut self.active)
574            .into_iter()
575            .map(|s| s.into_inner().unwrap())
576            .filter(|u| !upstream_actor_ids.contains(&u.actor_id()));
577        self.extend_active(new_upstreams);
578    }
579
580    fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
581        self.merge_barrier_align_duration.clone()
582    }
583}
584
/// A wrapper that buffers the `StreamChunk`s from upstream until no more ready items are available.
/// Besides, any message other than `StreamChunk` will trigger the buffered `StreamChunk`s
/// to be emitted immediately along with the message itself.
struct BufferChunks<S: Stream> {
    /// The wrapped stream, also reachable through `Deref`/`DerefMut`.
    inner: S,
    /// Compacts incoming chunk rows into chunks of up to the configured size.
    chunk_builder: StreamChunkBuilder,

    /// The items to be emitted. Whenever there's something here, we should return a `Poll::Ready` immediately.
    pending_items: VecDeque<S::Item>,
}
595
596impl<S: Stream> BufferChunks<S> {
597    pub(super) fn new(inner: S, chunk_size: usize, schema: Schema) -> Self {
598        assert!(chunk_size > 0);
599        let chunk_builder = StreamChunkBuilder::new(chunk_size, schema.data_types());
600        Self {
601            inner,
602            chunk_builder,
603            pending_items: VecDeque::new(),
604        }
605    }
606}
607
// Deliberate `Deref` to the inner stream, so that callers (e.g. `execute_inner`) can
// keep calling `SelectReceivers` methods on the wrapped value.
impl<S: Stream> std::ops::Deref for BufferChunks<S> {
    type Target = S;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
615
// Mutable counterpart of the `Deref` impl above.
impl<S: Stream> std::ops::DerefMut for BufferChunks<S> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
621
impl<S: Stream> Stream for BufferChunks<S>
where
    S: Stream<Item = DispatcherMessageStreamItem> + Unpin,
{
    type Item = S::Item;

    /// Drain `pending_items` first; otherwise poll the inner stream, feeding chunk rows
    /// into the builder and flushing the builder whenever the inner stream runs out of
    /// ready items or yields a non-chunk message.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if let Some(item) = self.pending_items.pop_front() {
                return Poll::Ready(Some(item));
            }

            match self.inner.poll_next_unpin(cx) {
                Poll::Pending => {
                    // No more ready items: flush the partially-filled chunk, if any.
                    return if let Some(chunk_out) = self.chunk_builder.take() {
                        Poll::Ready(Some(Ok(MessageInner::Chunk(chunk_out))))
                    } else {
                        Poll::Pending
                    };
                }

                Poll::Ready(Some(result)) => {
                    if let Ok(MessageInner::Chunk(chunk)) = result {
                        for row in chunk.records() {
                            // Queue each full chunk produced by the builder for emission.
                            if let Some(chunk_out) = self.chunk_builder.append_record(row) {
                                self.pending_items
                                    .push_back(Ok(MessageInner::Chunk(chunk_out)));
                            }
                        }
                    } else {
                        // A non-chunk message (or error): flush the buffered chunk first so
                        // that it's emitted before the message itself.
                        return if let Some(chunk_out) = self.chunk_builder.take() {
                            self.pending_items.push_back(result);
                            Poll::Ready(Some(Ok(MessageInner::Chunk(chunk_out))))
                        } else {
                            Poll::Ready(Some(result))
                        };
                    }
                }

                Poll::Ready(None) => {
                    // See also the comments in `SelectReceivers::poll_next`.
                    unreachable!("SelectReceivers should never return None");
                }
            }
        }
    }
}
669
670#[cfg(test)]
671mod tests {
672    use std::sync::atomic::{AtomicBool, Ordering};
673    use std::time::Duration;
674
675    use assert_matches::assert_matches;
676    use futures::FutureExt;
677    use futures::future::try_join_all;
678    use risingwave_common::array::Op;
679    use risingwave_common::util::epoch::test_epoch;
680    use risingwave_pb::task_service::exchange_service_server::{
681        ExchangeService, ExchangeServiceServer,
682    };
683    use risingwave_pb::task_service::{
684        GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
685    };
686    use tokio::time::sleep;
687    use tokio_stream::wrappers::ReceiverStream;
688    use tonic::{Request, Response, Status, Streaming};
689
690    use super::*;
691    use crate::executor::exchange::input::{Input, LocalInput, RemoteInput};
692    use crate::executor::exchange::permit::channel_for_test;
693    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
694    use crate::task::NewOutputRequest;
695    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
696    use crate::task::test_utils::helper_make_local_actor;
697
698    fn build_test_chunk(size: u64) -> StreamChunk {
699        let ops = vec![Op::Insert; size as usize];
700        StreamChunk::new(ops, vec![])
701    }
702
    /// Verifies `BufferChunks` behavior: chunks are merged until a watermark, barrier,
    /// or `Pending` forces a flush, and the non-chunk message follows the flushed chunk.
    #[tokio::test]
    async fn test_buffer_chunks() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let (tx, rx) = channel_for_test();
        let input = LocalInput::new(rx, 1).boxed_input();
        let mut buffer = BufferChunks::new(input, 100, Schema::new(vec![]));

        // Send a chunk
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            assert_eq!(chunk.ops().len() as u64, 10);
        });

        // Send 2 chunks and expect them to be merged.
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            assert_eq!(chunk.ops().len() as u64, 20);
        });

        // Send a watermark.
        tx.send(
            Message::Watermark(Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(233),
            })
            .into(),
        )
        .await
        .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Watermark(watermark) => {
            assert_eq!(watermark.val, ScalarImpl::Int64(233));
        });

        // Send 2 chunks before a watermark. Expect the 2 chunks to be merged and the watermark to be emitted.
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        tx.send(
            Message::Watermark(Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(233),
            })
            .into(),
        )
        .await
        .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            assert_eq!(chunk.ops().len() as u64, 20);
        });
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Watermark(watermark) => {
            assert_eq!(watermark.val, ScalarImpl::Int64(233));
        });

        // Send a barrier.
        let barrier = Barrier::new_test_barrier(test_epoch(1));
        test_env.inject_barrier(&barrier, [2]);
        tx.send(Message::Barrier(barrier.clone().into_dispatcher()).into())
            .await
            .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => {
            assert_eq!(barrier_epoch.curr, test_epoch(1));
        });

        // Send 2 chunks before a barrier. Expect the 2 chunks to be merged and the barrier to be emitted.
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        let barrier = Barrier::new_test_barrier(test_epoch(2));
        test_env.inject_barrier(&barrier, [2]);
        tx.send(Message::Barrier(barrier.clone().into_dispatcher()).into())
            .await
            .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            assert_eq!(chunk.ops().len() as u64, 20);
        });
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => {
            assert_eq!(barrier_epoch.curr, test_epoch(2));
        });
    }
798
    #[tokio::test]
    async fn test_merger() {
        // Number of upstream channels feeding the merge executor.
        const CHANNEL_NUMBER: usize = 10;
        let mut txs = Vec::with_capacity(CHANNEL_NUMBER);
        let mut rxs = Vec::with_capacity(CHANNEL_NUMBER);
        for _i in 0..CHANNEL_NUMBER {
            let (tx, rx) = channel_for_test();
            txs.push(tx);
            rxs.push(rx);
        }
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let actor_id = 233;
        let mut handles = Vec::with_capacity(CHANNEL_NUMBER);

        // Pre-build epochs 10, 20, ..., 990 and inject one barrier per epoch,
        // threading `prev_epoch` so each barrier's prev matches the previous one.
        let epochs = (10..1000u64)
            .step_by(10)
            .map(|idx| (idx, test_epoch(idx)))
            .collect_vec();
        let mut prev_epoch = 0;
        let prev_epoch = &mut prev_epoch;
        let barriers: HashMap<_, _> = epochs
            .iter()
            .map(|(_, epoch)| {
                let barrier = Barrier::with_prev_epoch_for_test(*epoch, *prev_epoch);
                *prev_epoch = *epoch;
                barrier_test_env.inject_barrier(&barrier, [actor_id]);
                (*epoch, barrier)
            })
            .collect();
        // The final barrier carries a `Stop` mutation to terminate the stream.
        let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch)
            .with_mutation(Mutation::Stop(HashSet::default()));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        barrier_test_env.flush_all_events().await;

        // Spawn one sender task per upstream channel. For each epoch the task
        // sends either a chunk (when `idx % 20 == 0`) or a watermark, then the
        // barrier for that epoch; it finishes with the stop barrier `b2`.
        for (tx_id, tx) in txs.into_iter().enumerate() {
            let epochs = epochs.clone();
            let barriers = barriers.clone();
            let b2 = b2.clone();
            let handle = tokio::spawn(async move {
                for (idx, epoch) in epochs {
                    if idx % 20 == 0 {
                        tx.send(Message::Chunk(build_test_chunk(10)).into())
                            .await
                            .unwrap();
                    } else {
                        // Rotate the watermark column per sender (offset by
                        // `tx_id`) so different upstreams target different
                        // columns at the same step.
                        tx.send(
                            Message::Watermark(Watermark {
                                col_idx: (idx as usize / 20 + tx_id) % CHANNEL_NUMBER,
                                data_type: DataType::Int64,
                                val: ScalarImpl::Int64(idx as i64),
                            })
                            .into(),
                        )
                        .await
                        .unwrap();
                    }
                    tx.send(Message::Barrier(barriers[&epoch].clone().into_dispatcher()).into())
                        .await
                        .unwrap();
                    sleep(Duration::from_millis(1)).await;
                }
                tx.send(Message::Barrier(b2.clone().into_dispatcher()).into())
                    .await
                    .unwrap();
            });
            handles.push(handle);
        }

        let merger = MergeExecutor::for_test(
            actor_id,
            rxs,
            barrier_test_env.local_barrier_manager.clone(),
            Schema::new(vec![]),
        );
        let mut merger = merger.boxed().execute();
        // Drive the merged output and check the expected messages per epoch.
        for (idx, epoch) in epochs {
            if idx % 20 == 0 {
                // expect 1 or more chunks with 100 rows in total
                // (10 rows from each of the 10 upstreams; the merger may split
                // or combine them into any number of chunks)
                let mut count = 0usize;
                while count < 100 {
                    assert_matches!(merger.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
                        count += chunk.ops().len();
                    });
                }
                assert_eq!(count, 100);
            } else if idx as usize / 20 >= CHANNEL_NUMBER - 1 {
                // expect n watermarks
                // The expected value lags the current `idx` by
                // `20 * (CHANNEL_NUMBER - 1)`, matching the per-sender column
                // rotation above — presumably a merged watermark is emitted
                // only once every upstream has provided one for that column.
                for _ in 0..CHANNEL_NUMBER {
                    assert_matches!(merger.next().await.unwrap().unwrap(), Message::Watermark(watermark) => {
                        assert_eq!(watermark.val, ScalarImpl::Int64((idx - 20 * (CHANNEL_NUMBER as u64 - 1)) as i64));
                    });
                }
            }
            // expect a barrier
            assert_matches!(merger.next().await.unwrap().unwrap(), Message::Barrier(Barrier{epoch:barrier_epoch,mutation:_,..}) => {
                assert_eq!(barrier_epoch.curr, epoch);
            });
        }
        // Finally, the stop barrier must come through with its mutation intact.
        assert_matches!(
            merger.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_deref().unwrap().is_stop()
        );

        for handle in handles {
            handle.await.unwrap();
        }
    }
909
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233;
        let (untouched, old, new) = (234, 235, 238); // upstream actors
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let metrics = Arc::new(StreamingMetrics::unused());

        // Upstream topology under test (each sends to `actor_id`):
        // untouched -> actor_id
        // old -> actor_id
        // new -> actor_id

        let (upstream_fragment_id, fragment_id) = (10, 18);

        // Initially only `untouched` and `old` are connected as upstreams.
        let inputs: Vec<_> =
            try_join_all([untouched, old].into_iter().map(async |upstream_actor_id| {
                new_input(
                    &barrier_test_env.local_barrier_manager,
                    metrics.clone(),
                    actor_id,
                    fragment_id,
                    &helper_make_local_actor(upstream_actor_id),
                    upstream_fragment_id,
                )
                .await
            }))
            .await
            .unwrap();

        // The configuration change: add `new` as an upstream and remove `old`;
        // `untouched` is unaffected.
        let merge_updates = maplit::hashmap! {
            (actor_id, upstream_fragment_id) => MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actors: vec![helper_make_local_actor(new)],
                removed_upstream_actor_id: vec![old],
            }
        };

        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: Default::default(),
                merges: merge_updates,
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let barrier_rx = barrier_test_env
            .local_barrier_manager
            .subscribe_barrier(actor_id);
        let actor_ctx = ActorContext::for_test(actor_id);
        let upstream = MergeExecutor::new_select_receiver(inputs, &metrics, &actor_ctx);

        let mut merge = MergeExecutor::new(
            actor_ctx,
            fragment_id,
            upstream_fragment_id,
            upstream,
            barrier_test_env.local_barrier_manager.clone(),
            metrics.clone(),
            barrier_rx,
            100,
            Schema::new(vec![]),
        )
        .boxed()
        .execute();

        let mut txs = HashMap::new();
        // Send `$msg` to the merger through the tx of each actor in `$actors`.
        macro_rules! send {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap();
                }
            };
        }

        // Assert the merger currently has no message ready (poll once without
        // awaiting via `now_or_never`).
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    merge
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }
        // Await and return the next message from the merger.
        macro_rules! recv {
            () => {
                merge.next().await.transpose().unwrap()
            };
        }

        // Take the pending local output request of each upstream in `$actors`
        // and stash its tx in `txs` so `send!` can use it.
        macro_rules! collect_upstream_tx {
            ($actors:expr) => {
                for upstream_id in $actors {
                    let mut output_requests = barrier_test_env
                        .take_pending_new_output_requests(upstream_id)
                        .await;
                    assert_eq!(output_requests.len(), 1);
                    let (downstream_actor_id, request) = output_requests.pop().unwrap();
                    assert_eq!(actor_id, downstream_actor_id);
                    let NewOutputRequest::Local(tx) = request else {
                        unreachable!()
                    };
                    txs.insert(upstream_id, tx);
                }
            };
        }

        // 1. Nothing should be received before any upstream sends.
        assert_recv_pending!();
        barrier_test_env.flush_all_events().await;

        // 2. Take downstream receivers.
        collect_upstream_tx!([untouched, old]);

        // 3. Send a chunk.
        send!([untouched, old], Message::Chunk(build_test_chunk(1)).into());
        assert_eq!(2, recv!().unwrap().as_chunk().unwrap().cardinality()); // We should be able to receive the chunk twice.
        assert_recv_pending!();

        // 4. Send the config-change barrier from the current upstreams only.
        send!(
            [untouched, old],
            Message::Barrier(b1.clone().into_dispatcher()).into()
        );
        assert_recv_pending!(); // We should not receive the barrier, since the merger is waiting for the new upstream (`new`).

        collect_upstream_tx!([new]);

        send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
        recv!().unwrap().as_barrier().unwrap(); // We should now receive the barrier.

        // 5. Send a chunk.
        send!([untouched, new], Message::Chunk(build_test_chunk(1)).into());
        assert_eq!(2, recv!().unwrap().as_chunk().unwrap().cardinality()); // We should be able to receive the chunk twice.
        assert_recv_pending!();
    }
1053
    /// A mock `ExchangeService` used by `test_stream_exchange_client`: serves a
    /// fixed chunk + barrier stream and records whether the RPC was reached.
    struct FakeExchangeService {
        // Set to `true` when `get_stream` is invoked, so the test can assert
        // the client actually issued the RPC.
        rpc_called: Arc<AtomicBool>,
    }
1057
    /// The single test barrier (epoch 1) exchanged between the fake server and
    /// the remote input in `test_stream_exchange_client`.
    fn exchange_client_test_barrier() -> crate::executor::Barrier {
        Barrier::new_test_barrier(test_epoch(1))
    }
1061
1062    #[async_trait::async_trait]
1063    impl ExchangeService for FakeExchangeService {
1064        type GetDataStream = ReceiverStream<std::result::Result<GetDataResponse, Status>>;
1065        type GetStreamStream = ReceiverStream<std::result::Result<GetStreamResponse, Status>>;
1066
1067        async fn get_data(
1068            &self,
1069            _: Request<GetDataRequest>,
1070        ) -> std::result::Result<Response<Self::GetDataStream>, Status> {
1071            unimplemented!()
1072        }
1073
1074        async fn get_stream(
1075            &self,
1076            _request: Request<Streaming<GetStreamRequest>>,
1077        ) -> std::result::Result<Response<Self::GetStreamStream>, Status> {
1078            let (tx, rx) = tokio::sync::mpsc::channel(10);
1079            self.rpc_called.store(true, Ordering::SeqCst);
1080            // send stream_chunk
1081            let stream_chunk = StreamChunk::default().to_protobuf();
1082            tx.send(Ok(GetStreamResponse {
1083                message: Some(PbStreamMessageBatch {
1084                    stream_message_batch: Some(
1085                        risingwave_pb::stream_plan::stream_message_batch::StreamMessageBatch::StreamChunk(
1086                            stream_chunk,
1087                        ),
1088                    ),
1089                }),
1090                permits: Some(PbPermits::default()),
1091            }))
1092            .await
1093            .unwrap();
1094            // send barrier
1095            let barrier = exchange_client_test_barrier();
1096            tx.send(Ok(GetStreamResponse {
1097                message: Some(PbStreamMessageBatch {
1098                    stream_message_batch: Some(
1099                        risingwave_pb::stream_plan::stream_message_batch::StreamMessageBatch::BarrierBatch(
1100                            BarrierBatch {
1101                                barriers: vec![barrier.to_protobuf()],
1102                            },
1103                        ),
1104                    ),
1105                }),
1106                permits: Some(PbPermits::default()),
1107            }))
1108            .await
1109            .unwrap();
1110            Ok(Response::new(ReceiverStream::new(rx)))
1111        }
1112    }
1113
1114    #[tokio::test]
1115    async fn test_stream_exchange_client() {
1116        let rpc_called = Arc::new(AtomicBool::new(false));
1117        let server_run = Arc::new(AtomicBool::new(false));
1118        let addr = "127.0.0.1:12348".parse().unwrap();
1119
1120        // Start a server.
1121        let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel();
1122        let exchange_svc = ExchangeServiceServer::new(FakeExchangeService {
1123            rpc_called: rpc_called.clone(),
1124        });
1125        let cp_server_run = server_run.clone();
1126        let join_handle = tokio::spawn(async move {
1127            cp_server_run.store(true, Ordering::SeqCst);
1128            tonic::transport::Server::builder()
1129                .add_service(exchange_svc)
1130                .serve_with_shutdown(addr, async move {
1131                    shutdown_recv.await.unwrap();
1132                })
1133                .await
1134                .unwrap();
1135        });
1136
1137        sleep(Duration::from_secs(1)).await;
1138        assert!(server_run.load(Ordering::SeqCst));
1139
1140        let test_env = LocalBarrierTestEnv::for_test().await;
1141
1142        let remote_input = {
1143            RemoteInput::new(
1144                &test_env.local_barrier_manager,
1145                addr.into(),
1146                (0, 0),
1147                (0, 0),
1148                Arc::new(StreamingMetrics::unused()),
1149            )
1150            .await
1151            .unwrap()
1152        };
1153
1154        test_env.inject_barrier(&exchange_client_test_barrier(), [remote_input.actor_id()]);
1155
1156        pin_mut!(remote_input);
1157
1158        assert_matches!(remote_input.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
1159            let (ops, columns, visibility) = chunk.into_inner();
1160            assert!(ops.is_empty());
1161            assert!(columns.is_empty());
1162            assert!(visibility.is_empty());
1163        });
1164        assert_matches!(remote_input.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => {
1165            assert_eq!(barrier_epoch.curr, test_epoch(1));
1166        });
1167        assert!(rpc_called.load(Ordering::SeqCst));
1168
1169        shutdown_send.send(()).unwrap();
1170        join_handle.await.unwrap();
1171    }
1172}