risingwave_stream/executor/
rearranged_chain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::channel::{mpsc, oneshot};
16use futures::stream;
17use futures::stream::select_with_strategy;
18
19use crate::executor::prelude::*;
20use crate::task::CreateMviewProgressReporter;
21
/// `ChainExecutor` is an executor that enables synchronization between the existing stream and
/// newly appended executors. Currently, `ChainExecutor` is mainly used to implement the MV on MV
/// feature. It pipes new data of existing MVs to the newly created MV only after all of the old
/// data in the existing MVs has been dispatched.
///
/// [`RearrangedChainExecutor`] resolves the latency problem when creating MV with a huge amount of
/// existing data, by rearranging the barrier from the upstream. Check the design doc for details.
pub struct RearrangedChainExecutor {
    /// Executor providing the snapshot. It is executed with the previous epoch of the first
    /// barrier, and only when the snapshot has to be consumed.
    snapshot: Executor,

    /// Executor providing the upstream messages. Its first message is expected to be a barrier.
    upstream: Executor,

    /// Reporter through which the progress of consuming the snapshot is updated and finished.
    progress: CreateMviewProgressReporter,

    /// Actor id obtained from `progress` on construction. It is used to check whether the first
    /// barrier newly adds this actor.
    actor_id: ActorId,
}
38
/// Message type of the merged stream consumed in `RearrangedChainExecutor::execute_inner`.
#[derive(Debug)]
enum RearrangedMessage {
    /// A barrier that is yielded downstream directly by the main loop of `execute_inner`.
    /// `phantom_into` drops it.
    RearrangedBarrier(Barrier),
    /// A barrier kept at its original position among the upstream messages. `phantom_into` turns
    /// it back into a `Message::Barrier`.
    PhantomBarrier(Barrier),
    /// A data chunk, forwarded as is.
    Chunk(StreamChunk),
    // This watermark is just a placeholder: the payload of the original watermark is discarded
    // on conversion.
    Watermark,
}
47
48impl RearrangedMessage {
49    fn phantom_into(self) -> Option<Message> {
50        match self {
51            RearrangedMessage::RearrangedBarrier(_) | RearrangedMessage::Watermark => None,
52            RearrangedMessage::PhantomBarrier(barrier) => Message::Barrier(barrier).into(),
53            RearrangedMessage::Chunk(chunk) => Message::Chunk(chunk).into(),
54        }
55    }
56}
57
58impl RearrangedMessage {
59    fn rearranged_from(msg: Message) -> Self {
60        match msg {
61            Message::Watermark(_) => RearrangedMessage::Watermark,
62            Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
63            Message::Barrier(barrier) => RearrangedMessage::RearrangedBarrier(barrier),
64        }
65    }
66
67    fn phantom_from(msg: Message) -> Self {
68        match msg {
69            Message::Watermark(_) => RearrangedMessage::Watermark,
70            Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
71            Message::Barrier(barrier) => RearrangedMessage::PhantomBarrier(barrier),
72        }
73    }
74}
75
76impl RearrangedChainExecutor {
77    pub fn new(
78        snapshot: Executor,
79        upstream: Executor,
80        progress: CreateMviewProgressReporter,
81    ) -> Self {
82        Self {
83            snapshot,
84            upstream,
85            actor_id: progress.actor_id(),
86            progress,
87        }
88    }
89
    /// Produces the output message stream of this executor.
    ///
    /// The first barrier of the upstream is always yielded first. If that barrier newly adds this
    /// actor, the snapshot is consumed while barriers polled from the upstream are yielded ahead
    /// of their original position ("rearranged"); once a phantom barrier shows that we have
    /// caught up, the buffered and then the live upstream messages are forwarded. Otherwise the
    /// upstream is forwarded as is.
    ///
    /// Watermarks encountered while rearranging are dropped.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut upstream = pin!(self.upstream.execute());

        // 1. Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let create_epoch = first_barrier.epoch;

        // If the barrier is a conf change of creating this mview, init snapshot from its epoch
        // and begin to consume the snapshot.
        // Otherwise, it means we've recovered and the snapshot is already consumed.
        let to_consume_snapshot = first_barrier.is_newly_added(self.actor_id);

        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier.clone());

        if to_consume_snapshot {
            // If we need to consume the snapshot ...
            // We will poll the upstream actively through the rearrangement stream, in order to
            // get the barrier as soon as possible and then to rearrange(steal) it.
            // This channel carries the upstream messages after their barriers have been
            // transformed to phantom barriers.
            let (upstream_tx, upstream_rx) = mpsc::unbounded();
            // When we catch up with the progress, notify the rearrangement stream to stop.
            let (stop_rearrange_tx, stop_rearrange_rx) = oneshot::channel();

            // 2. Actually, the `first_barrier` is rearranged too (it has been yielded above). So
            // we need to put a phantom barrier for it.
            upstream_tx
                .unbounded_send(RearrangedMessage::PhantomBarrier(first_barrier))
                .unwrap();

            // Number of rows of all chunks yielded so far, reported along with the progress.
            let mut processed_rows: u64 = 0;

            {
                // 3. Rearrange stream, will yield the barriers polled from upstream to rearrange.
                let rearranged_barrier = pin!(
                    Self::rearrange_barrier(&mut upstream, upstream_tx, stop_rearrange_rx)
                        .map(|result| result.map(RearrangedMessage::RearrangedBarrier)),
                );

                // 4. Init the snapshot with reading epoch.
                let snapshot = self.snapshot.execute_with_epoch(create_epoch.prev);

                // Chain the `snapshot` and `upstream_rx` to get a unified `rearranged_chunks`
                // stream.
                let rearranged_chunks = snapshot
                    .map(|result| result.map(RearrangedMessage::rearranged_from))
                    .chain(upstream_rx.map(Ok));

                // 5. Merge the rearranged barriers with chunks, with the priority of barrier.
                let mut rearranged =
                    select_with_strategy(rearranged_barrier, rearranged_chunks, |_: &mut ()| {
                        stream::PollNext::Left
                    });

                // Record the epoch of the last rearranged barrier we received.
                let mut last_rearranged_epoch = create_epoch;
                // Wrapped in an `Option` so that the sender can be consumed exactly once, and so
                // that we can tell afterwards whether the stop signal has been sent.
                let mut stop_rearrange_tx = Some(stop_rearrange_tx);

                // 6. Consume the merged stream until we catch up with the upstream.
                #[for_await]
                for rearranged_msg in &mut rearranged {
                    match rearranged_msg? {
                        // If we received a phantom barrier, update the progress and check whether
                        // we have caught up with the progress of upstream MV.
                        //
                        // Note that there's no phantom barrier in the snapshot. So we must have
                        // already consumed the whole snapshot and be on the
                        // upstream now.
                        RearrangedMessage::PhantomBarrier(barrier) => {
                            // Update the progress since we've consumed all chunks before this
                            // phantom.
                            self.progress.update(
                                last_rearranged_epoch,
                                barrier.epoch.curr,
                                processed_rows,
                            );

                            // The phantom is at least as new as the last barrier we have yielded,
                            // i.e. we have caught up.
                            if barrier.epoch.curr >= last_rearranged_epoch.curr {
                                // Stop the rearrangement stream.
                                stop_rearrange_tx.take().unwrap().send(()).map_err(|_| {
                                    StreamExecutorError::channel_closed("stop rearrange")
                                })?;
                                break;
                            }
                        }

                        // If we received a message, yield it.
                        RearrangedMessage::RearrangedBarrier(barrier) => {
                            last_rearranged_epoch = barrier.epoch;
                            yield Message::Barrier(barrier);
                        }
                        RearrangedMessage::Chunk(chunk) => {
                            processed_rows += chunk.cardinality() as u64;
                            yield Message::Chunk(chunk)
                        }
                        RearrangedMessage::Watermark => {
                            // Ignore watermark during snapshot consumption.
                        }
                    }
                }

                // 7. Rearranged task finished.
                // The reason for finish must be that we told it to stop. If the sender is still
                // here, the loop above ended without us sending the stop signal.
                tracing::trace!("rearranged task finished");
                if stop_rearrange_tx.is_some() {
                    tracing::error!("rearrangement finished passively");
                }

                // 8. Consume remainings.
                // Note that there may still be some messages in `rearranged`. However the
                // rearranged barriers must be ignored, we should take the phantoms.
                #[for_await]
                for msg in rearranged {
                    let msg: RearrangedMessage = msg?;
                    // `phantom_into` gives `None` for rearranged barriers and watermarks.
                    let Some(msg) = msg.phantom_into() else {
                        continue;
                    };
                    if let Some(barrier) = msg.as_barrier() {
                        self.progress.finish(barrier.epoch, processed_rows);
                    }
                    yield msg;
                }
            }

            // Consume remaining upstream.
            tracing::trace!("begin to consume remaining upstream");

            #[for_await]
            for msg in upstream {
                let msg: Message = msg?;
                if let Some(barrier) = msg.as_barrier() {
                    self.progress.finish(barrier.epoch, processed_rows);
                }
                yield msg;
            }
        } else {
            // If there's no need to consume the snapshot ...
            // We directly forward the messages from the upstream.

            #[for_await]
            for msg in upstream {
                yield msg?;
            }
        }
    }
234
235    /// Rearrangement stream. The `upstream: U` will be taken out from the mutex, then put back
236    /// after stopped.
237    ///
238    /// Check `execute_inner` for more details.
239    #[try_stream(ok = Barrier, error = StreamExecutorError)]
240    async fn rearrange_barrier<U>(
241        upstream: &mut U,
242        upstream_tx: mpsc::UnboundedSender<RearrangedMessage>,
243        mut stop_rearrange_rx: oneshot::Receiver<()>,
244    ) where
245        U: MessageStream + std::marker::Unpin,
246    {
247        loop {
248            use futures::future::{Either, select};
249
250            // Stop when `stop_rearrange_rx` is received.
251            match select(&mut stop_rearrange_rx, upstream.next()).await {
252                Either::Left((Ok(_), _)) => break,
253                Either::Left((Err(_e), _)) => {
254                    return Err(StreamExecutorError::channel_closed("stop rearrange"));
255                }
256
257                Either::Right((Some(msg), _)) => {
258                    let msg = msg?;
259
260                    // If we polled a barrier, rearrange it by yielding and leave a phantom barrier
261                    // with `RearrangedMessage::phantom_from` in-place.
262                    // If we polled a chunk, simply put it to the `upstream_tx`.
263                    if let Some(barrier) = msg.as_barrier().cloned() {
264                        yield barrier;
265                    }
266                    upstream_tx
267                        .unbounded_send(RearrangedMessage::phantom_from(msg))
268                        .map_err(|_| StreamExecutorError::channel_closed("rearranged upstream"))?;
269                }
270                Either::Right((None, _)) => {
271                    Err(StreamExecutorError::channel_closed("upstream"))?;
272                }
273            }
274        }
275    }
276}
277
278impl Execute for RearrangedChainExecutor {
279    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
280        self.execute_inner().boxed()
281    }
282}
283
284// TODO: add new unit tests for rearranged chain