risingwave_stream/executor/
rearranged_chain.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::channel::{mpsc, oneshot};
use futures::stream;
use futures::stream::select_with_strategy;

use crate::executor::prelude::*;
use crate::task::CreateMviewProgressReporter;

/// `ChainExecutor` is an executor that enables synchronization between the existing stream and
/// newly appended executors. Currently, `ChainExecutor` is mainly used to implement the MV on MV
/// feature. It pipes new data of existing MVs to the newly created MV only after all of the old
/// data in the existing MVs has been dispatched.
///
/// [`RearrangedChainExecutor`] resolves the latency problem when creating MV with a huge amount of
/// existing data, by rearranging the barriers from the upstream. Check the design doc for details.
pub struct RearrangedChainExecutor {
    /// Existing data of the upstream MV. It is read at the `prev` epoch of the first upstream
    /// barrier (see `execute_with_epoch` in `execute_inner`).
    snapshot: Executor,

    /// New data of the upstream MV. Its barriers are rearranged (forwarded ahead of the data)
    /// while the snapshot is being consumed.
    upstream: Executor,

    /// Reports the backfill progress: `update` on phantom barriers while catching up, `finish`
    /// on every barrier afterwards.
    progress: CreateMviewProgressReporter,

    /// Id of this actor, taken from `progress.actor_id()` on construction. Used to check whether
    /// the first barrier newly adds this actor.
    actor_id: ActorId,
}

/// A message flowing through the rearranged pipeline of [`RearrangedChainExecutor`].
///
/// Each barrier taken from the upstream is represented twice:
/// - once as a [`RearrangedMessage::RearrangedBarrier`], which is forwarded downstream right away;
/// - once as a [`RearrangedMessage::PhantomBarrier`], which stays in its original position among
///   the upstream chunks.
#[derive(Debug)]
enum RearrangedMessage {
    /// A barrier "stolen" from the upstream and forwarded ahead of the data before it.
    RearrangedBarrier(Barrier),
    /// The in-place marker of a barrier that has been rearranged. Reaching it means every chunk
    /// that preceded this barrier in the upstream has been consumed.
    PhantomBarrier(Barrier),
    /// A data chunk, coming either from the snapshot or from the upstream.
    Chunk(StreamChunk),
    /// This watermark is just a placeholder: the payload of the original watermark is dropped
    /// when converting from a `Message`.
    Watermark,
}

impl RearrangedMessage {
    /// Turns a message drained after the rearrangement has stopped back into a plain
    /// [`Message`].
    ///
    /// Phantom barriers become ordinary barriers and chunks pass through untouched. Rearranged
    /// barriers (already forwarded downstream) and watermark placeholders map to `None`.
    fn phantom_into(self) -> Option<Message> {
        let message = match self {
            RearrangedMessage::PhantomBarrier(barrier) => Message::Barrier(barrier),
            RearrangedMessage::Chunk(chunk) => Message::Chunk(chunk),
            RearrangedMessage::RearrangedBarrier(_) | RearrangedMessage::Watermark => {
                return None;
            }
        };
        Some(message)
    }
}

impl RearrangedMessage {
    fn rearranged_from(msg: Message) -> Self {
        match msg {
            Message::Watermark(_) => RearrangedMessage::Watermark,
            Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
            Message::Barrier(barrier) => RearrangedMessage::RearrangedBarrier(barrier),
        }
    }

    fn phantom_from(msg: Message) -> Self {
        match msg {
            Message::Watermark(_) => RearrangedMessage::Watermark,
            Message::Chunk(chunk) => RearrangedMessage::Chunk(chunk),
            Message::Barrier(barrier) => RearrangedMessage::PhantomBarrier(barrier),
        }
    }
}

impl RearrangedChainExecutor {
    /// Creates a [`RearrangedChainExecutor`] that reads existing data from `snapshot` and new data
    /// from `upstream`. The actor id is taken from `progress`.
    pub fn new(
        snapshot: Executor,
        upstream: Executor,
        progress: CreateMviewProgressReporter,
    ) -> Self {
        Self {
            snapshot,
            upstream,
            actor_id: progress.actor_id(),
            progress,
        }
    }

    /// Main loop of the executor.
    ///
    /// The first upstream barrier is always forwarded. If that barrier newly adds this actor, the
    /// snapshot (read at the barrier's `prev` epoch) is consumed while upstream barriers are
    /// rearranged, i.e. forwarded ahead of the data. Backfill progress is reported on each phantom
    /// barrier until the phantoms catch up with the rearranged barriers. After that, the remaining
    /// messages are forwarded and completion is reported on every barrier. If the actor is not
    /// newly added (recovery), the upstream is forwarded as is.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mut upstream = pin!(self.upstream.execute());

        // 1. Poll the upstream to get the first barrier.
        let first_barrier = expect_first_barrier(&mut upstream).await?;
        let create_epoch = first_barrier.epoch;

        // If the barrier is a conf change of creating this mview, init snapshot from its epoch
        // and begin to consume the snapshot.
        // Otherwise, it means we've recovered and the snapshot is already consumed.
        let to_consume_snapshot = first_barrier.is_newly_added(self.actor_id);

        // The first barrier message should be propagated.
        yield Message::Barrier(first_barrier.clone());

        if to_consume_snapshot {
            // If we need to consume the snapshot ...
            // We drive a rearrangement stream (`rearrange_barrier`) that is polled with priority
            // and actively polls the upstream. This gets the barriers as soon as possible so they
            // can be rearranged (stolen).
            // `upstream_rx` receives the upstream messages in their original order, with the
            // barriers turned into phantom barriers.
            let (upstream_tx, upstream_rx) = mpsc::unbounded();
            // When we catch up with the upstream, notify the rearrangement stream to stop.
            let (stop_rearrange_tx, stop_rearrange_rx) = oneshot::channel();

            // 2. The first barrier has already been yielded above, i.e. it is rearranged too. So
            // put its phantom barrier at the head of `upstream_rx`. This cannot fail, since
            // `upstream_rx` is still alive.
            upstream_tx
                .unbounded_send(RearrangedMessage::PhantomBarrier(first_barrier))
                .unwrap();

            // Number of rows in the chunks yielded before catching up; reported with the progress.
            let mut processed_rows: u64 = 0;

            {
                // 3. Rearrange stream, will yield the barriers polled from upstream to rearrange.
                // It mutably borrows `upstream` until the end of this scope.
                let rearranged_barrier =
                    pin!(
                        Self::rearrange_barrier(&mut upstream, upstream_tx, stop_rearrange_rx)
                            .map(|result| result.map(RearrangedMessage::RearrangedBarrier)),
                    );

                // 4. Init the snapshot, reading at the `prev` epoch of the first barrier.
                let snapshot = self.snapshot.execute_with_epoch(create_epoch.prev);

                // Chain the `snapshot` and `upstream_rx` to get a unified `rearranged_chunks`
                // stream.
                let rearranged_chunks = snapshot
                    .map(|result| result.map(RearrangedMessage::rearranged_from))
                    .chain(upstream_rx.map(Ok));

                // 5. Merge the rearranged barriers with chunks, with the priority of barrier.
                let mut rearranged =
                    select_with_strategy(rearranged_barrier, rearranged_chunks, |_: &mut ()| {
                        stream::PollNext::Left
                    });

                // Record the epoch of the last rearranged barrier we received.
                let mut last_rearranged_epoch = create_epoch;
                let mut stop_rearrange_tx = Some(stop_rearrange_tx);

                // 6. Forward the merged stream until a phantom barrier catches up with the last
                // rearranged barrier.
                #[for_await]
                for rearranged_msg in &mut rearranged {
                    match rearranged_msg? {
                        // If we received a phantom barrier, update the progress and check whether
                        // we have caught up with the progress of the upstream MV.
                        //
                        // Note that there's no phantom barrier in the snapshot. So we must have
                        // already consumed the whole snapshot and be on the
                        // upstream now.
                        RearrangedMessage::PhantomBarrier(barrier) => {
                            // Update the progress since we've consumed all chunks before this
                            // phantom.
                            self.progress.update(
                                last_rearranged_epoch,
                                barrier.epoch.curr,
                                processed_rows,
                            );

                            if barrier.epoch.curr >= last_rearranged_epoch.curr {
                                // Stop the rearrangement stream. `take` runs at most once
                                // because we break right after.
                                stop_rearrange_tx.take().unwrap().send(()).map_err(|_| {
                                    StreamExecutorError::channel_closed("stop rearrange")
                                })?;
                                break;
                            }
                        }

                        // If we received a rearranged barrier, record its epoch and forward it
                        // downstream ahead of the data.
                        RearrangedMessage::RearrangedBarrier(barrier) => {
                            last_rearranged_epoch = barrier.epoch;
                            yield Message::Barrier(barrier);
                        }
                        RearrangedMessage::Chunk(chunk) => {
                            processed_rows += chunk.cardinality() as u64;
                            yield Message::Chunk(chunk)
                        }
                        RearrangedMessage::Watermark => {
                            // Ignore watermark during snapshot consumption.
                        }
                    }
                }

                // 7. Rearranged task finished.
                // The reason for finish must be that we told it to stop.
                // `stop_rearrange_tx` is only taken right before `break`. If it is still present,
                // the loop ended because `rearranged` was exhausted instead.
                tracing::trace!("rearranged task finished");
                if stop_rearrange_tx.is_some() {
                    tracing::error!("rearrangement finished passively");
                }

                // 8. Consume remainings.
                // Note that there may still be some messages in `rearranged`. However the
                // rearranged barriers must be ignored, we should take the phantoms.
                #[for_await]
                for msg in rearranged {
                    let msg: RearrangedMessage = msg?;
                    let Some(msg) = msg.phantom_into() else {
                        continue;
                    };
                    if let Some(barrier) = msg.as_barrier() {
                        self.progress.finish(barrier.epoch, processed_rows);
                    }
                    yield msg;
                }
            }

            // Consume remaining upstream.
            // The rearrangement stream was dropped with the scope above, so `upstream` can be
            // polled directly again.
            tracing::trace!("begin to consume remaining upstream");

            #[for_await]
            for msg in upstream {
                let msg: Message = msg?;
                if let Some(barrier) = msg.as_barrier() {
                    self.progress.finish(barrier.epoch, processed_rows);
                }
                yield msg;
            }
        } else {
            // If there's no need to consume the snapshot ...
            // We directly forward the messages from the upstream.

            #[for_await]
            for msg in upstream {
                yield msg?;
            }
        }
    }

    /// Rearrangement stream, polled with priority over the data by `execute_inner`.
    ///
    /// Keeps polling `upstream` until a stop signal is received on `stop_rearrange_rx`. Each
    /// polled barrier is yielded right away, so it can be forwarded downstream ahead of the data.
    /// Each polled message is also sent to `upstream_tx` in its original position, with barriers
    /// converted to phantom barriers via `RearrangedMessage::phantom_from`.
    ///
    /// `upstream` is only mutably borrowed, so the caller can continue to consume it after this
    /// stream is dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following happens:
    /// - the upstream yields an error or ends;
    /// - the sender of `stop_rearrange_rx` is dropped without sending;
    /// - the receiver of `upstream_tx` has been dropped.
    #[try_stream(ok = Barrier, error = StreamExecutorError)]
    async fn rearrange_barrier<U>(
        upstream: &mut U,
        upstream_tx: mpsc::UnboundedSender<RearrangedMessage>,
        mut stop_rearrange_rx: oneshot::Receiver<()>,
    ) where
        U: MessageStream + std::marker::Unpin,
    {
        loop {
            use futures::future::{select, Either};

            // Stop when `stop_rearrange_rx` is received.
            match select(&mut stop_rearrange_rx, upstream.next()).await {
                Either::Left((Ok(_), _)) => break,
                Either::Left((Err(_e), _)) => {
                    return Err(StreamExecutorError::channel_closed("stop rearrange"))
                }

                Either::Right((Some(msg), _)) => {
                    let msg = msg?;

                    // If we polled a barrier, rearrange it by yielding and leave a phantom barrier
                    // with `RearrangedMessage::phantom_from` in-place.
                    // If we polled a chunk, simply put it to the `upstream_tx`.
                    if let Some(barrier) = msg.as_barrier().cloned() {
                        yield barrier;
                    }
                    upstream_tx
                        .unbounded_send(RearrangedMessage::phantom_from(msg))
                        .map_err(|_| StreamExecutorError::channel_closed("rearranged upstream"))?;
                }
                Either::Right((None, _)) => {
                    Err(StreamExecutorError::channel_closed("upstream"))?;
                }
            }
        }
    }
}

impl Execute for RearrangedChainExecutor {
    /// Builds the executor's message stream and erases its concrete type by boxing it.
    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
        let stream = self.execute_inner();
        stream.boxed()
    }
}

// TODO: add new unit tests for rearranged chain