risingwave_stream/executor/lookup/
sides.rs1use std::pin::pin;
16
17use anyhow::Context;
18use either::Either;
19use futures::future::select;
20use futures::{StreamExt, future};
21use futures_async_stream::try_stream;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnDesc;
25use risingwave_common::types::DataType;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_storage::StateStore;
28use risingwave_storage::row_serde::value_serde::ValueRowSerde;
29
30use crate::common::table::state_table::ReplicatedStateTable;
31use crate::executor::error::StreamExecutorError;
32use crate::executor::{Barrier, BoxedMessageStream, Message, MessageStream};
33
34pub(crate) struct StreamJoinSide {
36 pub key_indices: Vec<usize>,
38
39 pub pk_indices: Vec<usize>,
41
42 pub col_types: Vec<DataType>,
44}
45
46impl std::fmt::Debug for StreamJoinSide {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("ArrangeJoinSide")
49 .field("key_indices", &self.key_indices)
50 .field("pk_indices", &self.pk_indices)
51 .field("col_types", &self.col_types)
52 .finish()
53 }
54}
55
56pub(crate) struct ArrangeJoinSide<S: StateStore, SD: ValueRowSerde> {
58 pub pk_indices: Vec<usize>,
60
61 pub col_types: Vec<DataType>,
63
64 pub col_descs: Vec<ColumnDesc>,
66
67 pub order_rules: Vec<ColumnOrder>,
70
71 pub key_indices: Vec<usize>,
76
77 pub use_current_epoch: bool,
79
80 pub state_table: ReplicatedStateTable<S, SD>,
81}
82
83#[derive(Debug)]
85pub enum ArrangeMessage {
86 ArrangeReady(Vec<StreamChunk>, Barrier),
90
91 Stream(StreamChunk),
93
94 Barrier(Barrier),
96}
97
98pub type BarrierAlignedMessage = Either<Message, Message>;
99
100#[try_stream(ok = Message, error = StreamExecutorError)]
101pub async fn poll_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
102 #[for_await]
103 for item in stream {
104 match item? {
105 Message::Watermark(_) => {
106 }
108 c @ Message::Chunk(_) => yield c,
109 Message::Barrier(b) => {
110 if b.epoch != expected_barrier.epoch {
111 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
112 } else {
113 yield Message::Barrier(b);
114 break;
115 }
116 }
117 }
118 }
119}
120
121#[try_stream(ok = BarrierAlignedMessage, error = StreamExecutorError)]
124pub async fn align_barrier(mut left: BoxedMessageStream, mut right: BoxedMessageStream) {
125 enum SideStatus {
126 LeftBarrier,
127 RightBarrier,
128 }
129
130 'outer: loop {
131 let (side_status, side_barrier) = 'inner: loop {
132 let select_result = match select(right.next(), left.next()).await {
134 future::Either::Left(x) => future::Either::Right(x),
135 future::Either::Right(x) => future::Either::Left(x),
136 };
137 match select_result {
138 future::Either::Left((None, _)) => {
139 while let Some(msg) = right.next().await {
141 match msg? {
142 w @ Message::Watermark(_) => yield Either::Left(w),
143 c @ Message::Chunk(_) => yield Either::Left(c),
144 Message::Barrier(_) => {
145 bail!("right barrier received while left stream end");
146 }
147 }
148 }
149 break 'outer;
150 }
151 future::Either::Right((None, _)) => {
152 while let Some(msg) = left.next().await {
154 match msg? {
155 w @ Message::Watermark(_) => yield Either::Right(w),
156 c @ Message::Chunk(_) => yield Either::Right(c),
157 Message::Barrier(_) => {
158 bail!("left barrier received while right stream end");
159 }
160 }
161 }
162 break 'outer;
163 }
164 future::Either::Left((Some(msg), _)) => match msg? {
165 w @ Message::Watermark(_) => yield Either::Left(w),
166 c @ Message::Chunk(_) => yield Either::Left(c),
167 Message::Barrier(b) => {
168 yield Either::Left(Message::Barrier(b.clone()));
169 break 'inner (SideStatus::LeftBarrier, b);
170 }
171 },
172 future::Either::Right((Some(msg), _)) => match msg? {
173 w @ Message::Watermark(_) => yield Either::Right(w),
174 c @ Message::Chunk(_) => yield Either::Right(c),
175 Message::Barrier(b) => {
176 yield Either::Right(Message::Barrier(b.clone()));
177 break 'inner (SideStatus::RightBarrier, b);
178 }
179 },
180 }
181 };
182
183 match side_status {
184 SideStatus::LeftBarrier => {
185 #[for_await]
186 for item in poll_until_barrier(right.by_ref(), side_barrier) {
187 yield Either::Right(item?);
188 }
189 }
190 SideStatus::RightBarrier => {
191 #[for_await]
192 for item in poll_until_barrier(left.by_ref(), side_barrier) {
193 yield Either::Left(item?);
194 }
195 }
196 }
197 }
198}
199
200#[try_stream(ok = ArrangeMessage, error = StreamExecutorError)]
215pub async fn stream_lookup_arrange_prev_epoch(
216 stream: BoxedMessageStream,
217 arrangement: BoxedMessageStream,
218) {
219 let mut input = pin!(align_barrier(stream, arrangement));
220 let mut arrange_buf = vec![];
221 let mut stream_side_end = false;
222
223 loop {
224 let mut arrange_barrier = None;
225
226 while let Some(item) = input.next().await {
227 match item? {
228 Either::Left(Message::Chunk(msg)) => {
229 yield ArrangeMessage::Stream(msg);
232 }
233 Either::Right(Message::Chunk(chunk)) => {
234 arrange_buf.push(chunk);
236 }
237 Either::Left(Message::Barrier(barrier)) => {
238 yield ArrangeMessage::Barrier(barrier);
239 stream_side_end = true;
240 }
241 Either::Right(Message::Barrier(barrier)) => {
242 if stream_side_end {
243 yield ArrangeMessage::ArrangeReady(
244 std::mem::take(&mut arrange_buf),
245 barrier,
246 );
247 stream_side_end = false;
248 } else {
249 arrange_barrier = Some(barrier);
250 break;
251 }
252 }
253 Either::Left(Message::Watermark(_)) => {
254 }
256 Either::Right(Message::Watermark(_)) => {
257 }
259 }
260 }
261
262 loop {
263 match input
264 .next()
265 .await
266 .context("unexpected close of barrier aligner")??
267 {
268 Either::Left(Message::Watermark(_)) => {
269 }
271 Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
272 Either::Left(Message::Barrier(b)) => {
273 yield ArrangeMessage::Barrier(b);
274 break;
275 }
276 Either::Right(_) => unreachable!(),
277 }
278 }
279
280 yield ArrangeMessage::ArrangeReady(
281 std::mem::take(&mut arrange_buf),
282 arrange_barrier.take().unwrap(),
283 );
284 }
285}
286
287#[try_stream(ok = ArrangeMessage, error = StreamExecutorError)]
299pub async fn stream_lookup_arrange_this_epoch(
300 stream: BoxedMessageStream,
301 arrangement: BoxedMessageStream,
302) {
303 let mut input = pin!(align_barrier(stream, arrangement));
304 let mut stream_buf = vec![];
305 let mut arrange_buf = vec![];
306
307 enum Status {
308 ArrangeReady,
309 StreamReady(Barrier),
310 }
311
312 loop {
313 let status = 'inner: loop {
314 match input
315 .next()
316 .await
317 .context("unexpected close of barrier aligner")??
318 {
319 Either::Left(Message::Chunk(msg)) => {
320 stream_buf.push(msg);
322 }
323 Either::Right(Message::Chunk(chunk)) => {
324 arrange_buf.push(chunk);
326 }
327 Either::Left(Message::Barrier(barrier)) => {
328 break 'inner Status::StreamReady(barrier);
329 }
330 Either::Right(Message::Barrier(barrier)) => {
331 yield ArrangeMessage::ArrangeReady(std::mem::take(&mut arrange_buf), barrier);
332 for msg in std::mem::take(&mut stream_buf) {
333 yield ArrangeMessage::Stream(msg);
334 }
335 break 'inner Status::ArrangeReady;
336 }
337 Either::Left(Message::Watermark(_)) => {
338 }
340 Either::Right(Message::Watermark(_)) => {
341 }
343 }
344 };
345 match status {
346 Status::ArrangeReady => loop {
349 match input
350 .next()
351 .await
352 .context("unexpected close of barrier aligner")??
353 {
354 Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
355 Either::Left(Message::Barrier(b)) => {
356 yield ArrangeMessage::Barrier(b);
357 break;
358 }
359 Either::Left(Message::Watermark(_)) => {
360 }
362 Either::Right(Message::Watermark(_)) => {
363 }
365 Either::Right(_) => unreachable!(),
366 }
367 },
368 Status::StreamReady(stream_barrier) => loop {
371 match input
372 .next()
373 .await
374 .context("unexpected close of barrier aligner")??
375 {
376 Either::Left(_) => unreachable!(),
377 Either::Right(Message::Chunk(chunk)) => {
378 arrange_buf.push(chunk);
379 }
380 Either::Right(Message::Barrier(barrier)) => {
381 yield ArrangeMessage::ArrangeReady(
382 std::mem::take(&mut arrange_buf),
383 barrier,
384 );
385 for msg in std::mem::take(&mut stream_buf) {
386 yield ArrangeMessage::Stream(msg);
387 }
388 yield ArrangeMessage::Barrier(stream_barrier);
389 break;
390 }
391 Either::Right(Message::Watermark(_)) => {
392 }
394 }
395 },
396 }
397 }
398}
399
400impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for ArrangeJoinSide<S, SD> {
401 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402 f.debug_struct("ArrangeJoinSide")
403 .field("pk_indices", &self.pk_indices)
404 .field("col_types", &self.col_types)
405 .field("col_descs", &self.col_descs)
406 .field("order_rules", &self.order_rules)
407 .field("use_current_epoch", &self.use_current_epoch)
408 .finish()
409 }
410}
411
412#[cfg(test)]
413mod tests {
414 use futures::StreamExt;
415 use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
416 use risingwave_common::catalog::{Field, Schema};
417 use risingwave_common::types::DataType;
418 use risingwave_common::util::epoch::test_epoch;
419
420 use crate::executor::StreamExecutorResult;
421 use crate::executor::lookup::sides::stream_lookup_arrange_this_epoch;
422 use crate::executor::test_utils::MockSource;
423
424 #[tokio::test]
425 async fn test_stream_lookup_arrange_this_epoch() -> StreamExecutorResult<()> {
426 let chunk_l1 = StreamChunk::from_pretty(
427 " I I
428 + 1 1",
429 );
430
431 let schema = Schema {
432 fields: vec![
433 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
435 ],
436 };
437 let (mut tx_l, source_l) = MockSource::channel();
438 let source_l = source_l
439 .stop_on_finish(false)
440 .into_executor(schema.clone(), vec![1]);
441 let (tx_r, source_r) = MockSource::channel();
442 let source_r = source_r
443 .stop_on_finish(false)
444 .into_executor(schema, vec![1]);
445
446 let mut stream =
447 stream_lookup_arrange_this_epoch(source_l.execute(), source_r.execute()).boxed();
448
449 drop(tx_r);
451
452 tx_l.push_barrier(test_epoch(1), false);
453
454 tx_l.push_chunk(chunk_l1);
455
456 stream.next().await.unwrap().unwrap_err();
458
459 Ok(())
460 }
461}