risingwave_stream/executor/rearranged_chain.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::channel::{mpsc, oneshot};
16use futures::stream;
17use futures::stream::select_with_strategy;
18
19use crate::executor::prelude::*;
20use crate::task::CreateMviewProgressReporter;
21
22/// `ChainExecutor` is an executor that enables synchronization between the existing stream and
23/// newly appended executors. Currently, `ChainExecutor` is mainly used to implement MV on MV
24/// feature. It pipes new data of existing MVs to newly created MV only all of the old data in the
25/// existing MVs are dispatched.
26///
27/// [`RearrangedChainExecutor`] resolves the latency problem when creating MV with a huge amount of
28/// existing data, by rearranging the barrier from the upstream. Check the design doc for details.
29pub struct RearrangedChainExecutor {
30 snapshot: Executor,
31
32 upstream: Executor,
33
34 progress: CreateMviewProgressReporter,
35
36 actor_id: ActorId,
37}
38
39#[derive(Debug)]
40enum RearrangedMessage {
41 RearrangedBarrier(Barrier),
42 PhantomBarrier(Barrier),
43 Chunk(StreamChunk),
44 // This watermark is just a place holder.
45 Watermark,
46}
47
48impl RearrangedMessage {
49 fn phantom_into(self) -> Option<Message> {
50 match self {
51 RearrangedMessage::RearrangedBarrier(_) | RearrangedMessage::Watermark => None,
52 RearrangedMessage::PhantomBarrier(barrier) => Message::Barrier(barrier).into(),
53 RearrangedMessage::Chunk(chunk) => Message::Chunk(chunk).into(),
54 }
55 }
56}
57
58impl RearrangedMessage {
59 fn rearranged_from(msg: Message) -> Self {
60 match msg {
61 Message::Watermark(_) => RearrangedMessage::Watermark,
62 Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
63 Message::Barrier(barrier) => RearrangedMessage::RearrangedBarrier(barrier),
64 }
65 }
66
67 fn phantom_from(msg: Message) -> Self {
68 match msg {
69 Message::Watermark(_) => RearrangedMessage::Watermark,
70 Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
71 Message::Barrier(barrier) => RearrangedMessage::PhantomBarrier(barrier),
72 }
73 }
74}
75
76impl RearrangedChainExecutor {
77 pub fn new(
78 snapshot: Executor,
79 upstream: Executor,
80 progress: CreateMviewProgressReporter,
81 ) -> Self {
82 Self {
83 snapshot,
84 upstream,
85 actor_id: progress.actor_id(),
86 progress,
87 }
88 }
89
90 #[try_stream(ok = Message, error = StreamExecutorError)]
91 async fn execute_inner(mut self) {
92 let mut upstream = pin!(self.upstream.execute());
93
94 // 1. Poll the upstream to get the first barrier.
95 let first_barrier = expect_first_barrier(&mut upstream).await?;
96 let create_epoch = first_barrier.epoch;
97
98 // If the barrier is a conf change of creating this mview, init snapshot from its epoch
99 // and begin to consume the snapshot.
100 // Otherwise, it means we've recovered and the snapshot is already consumed.
101 let to_consume_snapshot = first_barrier.is_newly_added(self.actor_id);
102
103 // The first barrier message should be propagated.
104 yield Message::Barrier(first_barrier.clone());
105
106 if to_consume_snapshot {
107 // If we need to consume the snapshot ...
108 // We will spawn a background task to poll the upstream actively, in order to get the
109 // barrier as soon as possible and then to rearrange(steal) it.
110 // The upstream after transforming the barriers to phantom barriers.
111 let (upstream_tx, upstream_rx) = mpsc::unbounded();
112 // When we catch-up the progress, notify the task to stop.
113 let (stop_rearrange_tx, stop_rearrange_rx) = oneshot::channel();
114
115 // 2. Actually, the `first_msg` is rearranged too. So we need to put a phantom barrier.
116 upstream_tx
117 .unbounded_send(RearrangedMessage::PhantomBarrier(first_barrier))
118 .unwrap();
119
120 let mut processed_rows: u64 = 0;
121
122 {
123 // 3. Rearrange stream, will yield the barriers polled from upstream to rearrange.
124 let rearranged_barrier = pin!(
125 Self::rearrange_barrier(&mut upstream, upstream_tx, stop_rearrange_rx)
126 .map(|result| result.map(RearrangedMessage::RearrangedBarrier)),
127 );
128
129 // 4. Init the snapshot with reading epoch.
130 let snapshot = self.snapshot.execute_with_epoch(create_epoch.prev);
131
132 // Chain the `snapshot` and `upstream_rx` to get a unified `rearranged_chunks`
133 // stream.
134 let rearranged_chunks = snapshot
135 .map(|result| result.map(RearrangedMessage::rearranged_from))
136 .chain(upstream_rx.map(Ok));
137
138 // 5. Merge the rearranged barriers with chunks, with the priority of barrier.
139 let mut rearranged =
140 select_with_strategy(rearranged_barrier, rearranged_chunks, |_: &mut ()| {
141 stream::PollNext::Left
142 });
143
144 // Record the epoch of the last rearranged barrier we received.
145 let mut last_rearranged_epoch = create_epoch;
146 let mut stop_rearrange_tx = Some(stop_rearrange_tx);
147
148 #[for_await]
149 for rearranged_msg in &mut rearranged {
150 match rearranged_msg? {
151 // If we received a phantom barrier, update the progress and check whether
152 // we catches up with the progress of upstream MV.
153 //
154 // Note that there's no phantom barrier in the snapshot. So we must have
155 // already consumed the whole snapshot and be on the
156 // upstream now.
157 RearrangedMessage::PhantomBarrier(barrier) => {
158 // Update the progress since we've consumed all chunks before this
159 // phantom.
160 self.progress.update(
161 last_rearranged_epoch,
162 barrier.epoch.curr,
163 processed_rows,
164 );
165
166 if barrier.epoch.curr >= last_rearranged_epoch.curr {
167 // Stop the background rearrangement task.
168 stop_rearrange_tx.take().unwrap().send(()).map_err(|_| {
169 StreamExecutorError::channel_closed("stop rearrange")
170 })?;
171 break;
172 }
173 }
174
175 // If we received a message, yield it.
176 RearrangedMessage::RearrangedBarrier(barrier) => {
177 last_rearranged_epoch = barrier.epoch;
178 yield Message::Barrier(barrier);
179 }
180 RearrangedMessage::Chunk(chunk) => {
181 processed_rows += chunk.cardinality() as u64;
182 yield Message::Chunk(chunk)
183 }
184 RearrangedMessage::Watermark => {
185 // Ignore watermark during snapshot consumption.
186 }
187 }
188 }
189
190 // 7. Rearranged task finished.
191 // The reason for finish must be that we told it to stop.
192 tracing::trace!("rearranged task finished");
193 if stop_rearrange_tx.is_some() {
194 tracing::error!("rearrangement finished passively");
195 }
196
197 // 8. Consume remainings.
198 // Note that there may still be some messages in `rearranged`. However the
199 // rearranged barriers must be ignored, we should take the phantoms.
200 #[for_await]
201 for msg in rearranged {
202 let msg: RearrangedMessage = msg?;
203 let Some(msg) = msg.phantom_into() else {
204 continue;
205 };
206 if let Some(barrier) = msg.as_barrier() {
207 self.progress.finish(barrier.epoch, processed_rows);
208 }
209 yield msg;
210 }
211 }
212
213 // Consume remaining upstream.
214 tracing::trace!("begin to consume remaining upstream");
215
216 #[for_await]
217 for msg in upstream {
218 let msg: Message = msg?;
219 if let Some(barrier) = msg.as_barrier() {
220 self.progress.finish(barrier.epoch, processed_rows);
221 }
222 yield msg;
223 }
224 } else {
225 // If there's no need to consume the snapshot ...
226 // We directly forward the messages from the upstream.
227
228 #[for_await]
229 for msg in upstream {
230 yield msg?;
231 }
232 }
233 }
234
235 /// Rearrangement stream. The `upstream: U` will be taken out from the mutex, then put back
236 /// after stopped.
237 ///
238 /// Check `execute_inner` for more details.
239 #[try_stream(ok = Barrier, error = StreamExecutorError)]
240 async fn rearrange_barrier<U>(
241 upstream: &mut U,
242 upstream_tx: mpsc::UnboundedSender<RearrangedMessage>,
243 mut stop_rearrange_rx: oneshot::Receiver<()>,
244 ) where
245 U: MessageStream + std::marker::Unpin,
246 {
247 loop {
248 use futures::future::{Either, select};
249
250 // Stop when `stop_rearrange_rx` is received.
251 match select(&mut stop_rearrange_rx, upstream.next()).await {
252 Either::Left((Ok(_), _)) => break,
253 Either::Left((Err(_e), _)) => {
254 return Err(StreamExecutorError::channel_closed("stop rearrange"));
255 }
256
257 Either::Right((Some(msg), _)) => {
258 let msg = msg?;
259
260 // If we polled a barrier, rearrange it by yielding and leave a phantom barrier
261 // with `RearrangedMessage::phantom_from` in-place.
262 // If we polled a chunk, simply put it to the `upstream_tx`.
263 if let Some(barrier) = msg.as_barrier().cloned() {
264 yield barrier;
265 }
266 upstream_tx
267 .unbounded_send(RearrangedMessage::phantom_from(msg))
268 .map_err(|_| StreamExecutorError::channel_closed("rearranged upstream"))?;
269 }
270 Either::Right((None, _)) => {
271 Err(StreamExecutorError::channel_closed("upstream"))?;
272 }
273 }
274 }
275 }
276}
277
278impl Execute for RearrangedChainExecutor {
279 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
280 self.execute_inner().boxed()
281 }
282}
283
284// TODO: add new unit tests for rearranged chain