risingwave_stream/executor/lookup/
sides.rs1use std::pin::pin;
16
17use anyhow::Context;
18use either::Either;
19use futures::future::select;
20use futures::{StreamExt, future};
21use futures_async_stream::try_stream;
22use risingwave_common::array::StreamChunk;
23use risingwave_common::bail;
24use risingwave_common::catalog::ColumnDesc;
25use risingwave_common::types::DataType;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_storage::StateStore;
28use risingwave_storage::table::batch_table::BatchTable;
29
30use crate::executor::error::StreamExecutorError;
31use crate::executor::{Barrier, BoxedMessageStream, Executor, Message, MessageStream};
32
/// Metadata describing the stream side of the lookup executor.
// NOTE(review): presumably the input whose rows are matched against the arrangement — the
// consumer of this struct is not visible here; confirm.
pub(crate) struct StreamJoinSide {
    /// Indices of the join-key columns on this side (meaning inferred from the name).
    pub key_indices: Vec<usize>,

    /// Indices of this side's primary-key columns (meaning inferred from the name).
    pub pk_indices: Vec<usize>,

    /// Data types of this side's columns.
    // NOTE(review): assumed to cover every input column, not only the join keys — confirm.
    pub col_types: Vec<DataType>,
}
44
45impl std::fmt::Debug for StreamJoinSide {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("ArrangeJoinSide")
48 .field("key_indices", &self.key_indices)
49 .field("pk_indices", &self.pk_indices)
50 .field("col_types", &self.col_types)
51 .finish()
52 }
53}
54
/// Metadata and storage handle for the arrangement side of the lookup executor.
pub(crate) struct ArrangeJoinSide<S: StateStore> {
    /// Indices of the arrangement's primary-key columns (meaning inferred from the name).
    pub pk_indices: Vec<usize>,

    /// Data types of the arrangement's columns.
    pub col_types: Vec<DataType>,

    /// Column descriptors of the arrangement.
    pub col_descs: Vec<ColumnDesc>,

    /// Column orderings of the arrangement.
    // NOTE(review): presumably the sort order of the arrangement's storage key — confirm.
    pub order_rules: Vec<ColumnOrder>,

    /// Indices of the join-key columns on the arrangement side (meaning inferred from the name).
    pub key_indices: Vec<usize>,

    /// Whether the arrangement is read at the current epoch rather than the previous one.
    // NOTE(review): presumably selects between `stream_lookup_arrange_this_epoch` and
    // `stream_lookup_arrange_prev_epoch`; that selection is not visible here — confirm.
    pub use_current_epoch: bool,

    /// Table handle over the state store `S` used to read the arrangement.
    pub batch_table: BatchTable<S>,
}
81
/// Output item of [`stream_lookup_arrange_prev_epoch`] and
/// [`stream_lookup_arrange_this_epoch`].
#[derive(Debug)]
pub enum ArrangeMessage {
    /// All arrangement-side chunks buffered for one epoch, together with the
    /// arrangement-side barrier that closed that epoch.
    ArrangeReady(Vec<StreamChunk>, Barrier),

    /// A chunk from the stream side.
    Stream(StreamChunk),

    /// A barrier from the stream side.
    Barrier(Barrier),
}
96
/// Item yielded by [`align_barrier`], tagged with the input it came from: `Either::Left` for
/// the left input and `Either::Right` for the right input.
// NOTE(review): verify that `align_barrier`'s end-of-input passthrough branches keep this
// tagging as well.
pub type BarrierAlignedMessage = Either<Message, Message>;
98
99#[try_stream(ok = Message, error = StreamExecutorError)]
100pub async fn poll_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
101 #[for_await]
102 for item in stream {
103 match item? {
104 Message::Watermark(_) => {
105 }
107 c @ Message::Chunk(_) => yield c,
108 Message::Barrier(b) => {
109 if b.epoch != expected_barrier.epoch {
110 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
111 } else {
112 yield Message::Barrier(b);
113 break;
114 }
115 }
116 }
117 }
118}
119
120#[try_stream(ok = BarrierAlignedMessage, error = StreamExecutorError)]
123pub async fn align_barrier(mut left: BoxedMessageStream, mut right: BoxedMessageStream) {
124 enum SideStatus {
125 LeftBarrier,
126 RightBarrier,
127 }
128
129 'outer: loop {
130 let (side_status, side_barrier) = 'inner: loop {
131 let select_result = match select(right.next(), left.next()).await {
133 future::Either::Left(x) => future::Either::Right(x),
134 future::Either::Right(x) => future::Either::Left(x),
135 };
136 match select_result {
137 future::Either::Left((None, _)) => {
138 while let Some(msg) = right.next().await {
140 match msg? {
141 w @ Message::Watermark(_) => yield Either::Left(w),
142 c @ Message::Chunk(_) => yield Either::Left(c),
143 Message::Barrier(_) => {
144 bail!("right barrier received while left stream end");
145 }
146 }
147 }
148 break 'outer;
149 }
150 future::Either::Right((None, _)) => {
151 while let Some(msg) = left.next().await {
153 match msg? {
154 w @ Message::Watermark(_) => yield Either::Right(w),
155 c @ Message::Chunk(_) => yield Either::Right(c),
156 Message::Barrier(_) => {
157 bail!("left barrier received while right stream end");
158 }
159 }
160 }
161 break 'outer;
162 }
163 future::Either::Left((Some(msg), _)) => match msg? {
164 w @ Message::Watermark(_) => yield Either::Left(w),
165 c @ Message::Chunk(_) => yield Either::Left(c),
166 Message::Barrier(b) => {
167 yield Either::Left(Message::Barrier(b.clone()));
168 break 'inner (SideStatus::LeftBarrier, b);
169 }
170 },
171 future::Either::Right((Some(msg), _)) => match msg? {
172 w @ Message::Watermark(_) => yield Either::Right(w),
173 c @ Message::Chunk(_) => yield Either::Right(c),
174 Message::Barrier(b) => {
175 yield Either::Right(Message::Barrier(b.clone()));
176 break 'inner (SideStatus::RightBarrier, b);
177 }
178 },
179 }
180 };
181
182 match side_status {
183 SideStatus::LeftBarrier => {
184 #[for_await]
185 for item in poll_until_barrier(right.by_ref(), side_barrier) {
186 yield Either::Right(item?);
187 }
188 }
189 SideStatus::RightBarrier => {
190 #[for_await]
191 for item in poll_until_barrier(left.by_ref(), side_barrier) {
192 yield Either::Left(item?);
193 }
194 }
195 }
196 }
197}
198
199#[try_stream(ok = ArrangeMessage, error = StreamExecutorError)]
214pub async fn stream_lookup_arrange_prev_epoch(stream: Executor, arrangement: Executor) {
215 let mut input = pin!(align_barrier(stream.execute(), arrangement.execute()));
216 let mut arrange_buf = vec![];
217 let mut stream_side_end = false;
218
219 loop {
220 let mut arrange_barrier = None;
221
222 while let Some(item) = input.next().await {
223 match item? {
224 Either::Left(Message::Chunk(msg)) => {
225 yield ArrangeMessage::Stream(msg);
228 }
229 Either::Right(Message::Chunk(chunk)) => {
230 arrange_buf.push(chunk);
232 }
233 Either::Left(Message::Barrier(barrier)) => {
234 yield ArrangeMessage::Barrier(barrier);
235 stream_side_end = true;
236 }
237 Either::Right(Message::Barrier(barrier)) => {
238 if stream_side_end {
239 yield ArrangeMessage::ArrangeReady(
240 std::mem::take(&mut arrange_buf),
241 barrier,
242 );
243 stream_side_end = false;
244 } else {
245 arrange_barrier = Some(barrier);
246 break;
247 }
248 }
249 Either::Left(Message::Watermark(_)) => {
250 }
252 Either::Right(Message::Watermark(_)) => {
253 }
255 }
256 }
257
258 loop {
259 match input
260 .next()
261 .await
262 .context("unexpected close of barrier aligner")??
263 {
264 Either::Left(Message::Watermark(_)) => {
265 }
267 Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
268 Either::Left(Message::Barrier(b)) => {
269 yield ArrangeMessage::Barrier(b);
270 break;
271 }
272 Either::Right(_) => unreachable!(),
273 }
274 }
275
276 yield ArrangeMessage::ArrangeReady(
277 std::mem::take(&mut arrange_buf),
278 arrange_barrier.take().unwrap(),
279 );
280 }
281}
282
/// Combines the stream side (`stream`) and the arrangement side (`arrangement`) of a lookup
/// join that reads the arrangement at the current epoch.
///
/// Both inputs are merged with [`align_barrier`]. Stream-side chunks are held back until the
/// arrangement-side barrier of their epoch has arrived. For each epoch the output is:
/// 1. one [`ArrangeMessage::ArrangeReady`] with the buffered arrangement chunks and the
///    arrangement-side barrier,
/// 2. the buffered stream-side chunks as [`ArrangeMessage::Stream`],
/// 3. any stream-side chunks that arrive afterwards, also as [`ArrangeMessage::Stream`],
/// 4. the stream-side barrier as [`ArrangeMessage::Barrier`].
///
/// Watermarks from either side are dropped.
///
/// # Errors
///
/// Propagates errors from the barrier aligner. Fails with
/// "unexpected close of barrier aligner" if the aligner ends.
///
/// # Panics
///
/// Panics via `unreachable!` in either of these cases, each before the other side's barrier
/// arrives:
/// - a non-watermark arrangement-side message follows the arrangement barrier;
/// - any stream-side message follows the stream barrier.
#[try_stream(ok = ArrangeMessage, error = StreamExecutorError)]
pub async fn stream_lookup_arrange_this_epoch(stream: Executor, arrangement: Executor) {
    let mut input = pin!(align_barrier(stream.execute(), arrangement.execute()));
    // Stream-side chunks of the current epoch, held back until the arrangement is ready.
    let mut stream_buf = vec![];
    // Arrangement-side chunks of the current epoch.
    let mut arrange_buf = vec![];

    /// Which side delivered its barrier first in the current epoch.
    enum Status {
        /// The arrangement side; its `ArrangeReady` has already been emitted.
        ArrangeReady,
        /// The stream side; its barrier is emitted once the arrangement side catches up.
        StreamReady(Barrier),
    }

    loop {
        let status = 'inner: loop {
            match input
                .next()
                .await
                .context("unexpected close of barrier aligner")??
            {
                Either::Left(Message::Chunk(msg)) => {
                    stream_buf.push(msg);
                }
                Either::Right(Message::Chunk(chunk)) => {
                    arrange_buf.push(chunk);
                }
                Either::Left(Message::Barrier(barrier)) => {
                    break 'inner Status::StreamReady(barrier);
                }
                Either::Right(Message::Barrier(barrier)) => {
                    // The arrangement for this epoch is complete: publish it first, then the
                    // stream chunks that were waiting for it.
                    yield ArrangeMessage::ArrangeReady(std::mem::take(&mut arrange_buf), barrier);
                    for msg in std::mem::take(&mut stream_buf) {
                        yield ArrangeMessage::Stream(msg);
                    }
                    break 'inner Status::ArrangeReady;
                }
                Either::Left(Message::Watermark(_)) => {
                    // Watermarks are dropped.
                }
                Either::Right(Message::Watermark(_)) => {
                    // Watermarks are dropped.
                }
            }
        };
        match status {
            // The arrangement is ready: forward stream-side chunks directly until the
            // stream-side barrier of the same epoch.
            Status::ArrangeReady => loop {
                match input
                    .next()
                    .await
                    .context("unexpected close of barrier aligner")??
                {
                    Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
                    Either::Left(Message::Barrier(b)) => {
                        yield ArrangeMessage::Barrier(b);
                        break;
                    }
                    Either::Left(Message::Watermark(_)) => {
                        // Watermarks are dropped.
                    }
                    Either::Right(Message::Watermark(_)) => {
                        // Watermarks are dropped.
                    }
                    Either::Right(_) => unreachable!(),
                }
            },
            // The stream side is done for this epoch: keep buffering arrangement chunks until
            // its barrier, then flush everything in order.
            Status::StreamReady(stream_barrier) => loop {
                match input
                    .next()
                    .await
                    .context("unexpected close of barrier aligner")??
                {
                    Either::Left(_) => unreachable!(),
                    Either::Right(Message::Chunk(chunk)) => {
                        arrange_buf.push(chunk);
                    }
                    Either::Right(Message::Barrier(barrier)) => {
                        yield ArrangeMessage::ArrangeReady(
                            std::mem::take(&mut arrange_buf),
                            barrier,
                        );
                        for msg in std::mem::take(&mut stream_buf) {
                            yield ArrangeMessage::Stream(msg);
                        }
                        yield ArrangeMessage::Barrier(stream_barrier);
                        break;
                    }
                    Either::Right(Message::Watermark(_)) => {
                        // Watermarks are dropped.
                    }
                }
            },
        }
    }
}
392
393impl<S: StateStore> std::fmt::Debug for ArrangeJoinSide<S> {
394 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395 f.debug_struct("ArrangeJoinSide")
396 .field("pk_indices", &self.pk_indices)
397 .field("col_types", &self.col_types)
398 .field("col_descs", &self.col_descs)
399 .field("order_rules", &self.order_rules)
400 .field("use_current_epoch", &self.use_current_epoch)
401 .finish()
402 }
403}
404
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::StreamExecutorResult;
    use crate::executor::lookup::sides::stream_lookup_arrange_this_epoch;
    use crate::executor::test_utils::MockSource;

    /// The arrangement side is closed before anything is sent. The stream side then sends a
    /// barrier and a chunk. The combined stream must report an error instead of waiting.
    #[tokio::test]
    async fn test_stream_lookup_arrange_this_epoch() -> StreamExecutorResult<()> {
        let schema = Schema {
            fields: (0..2).map(|_| Field::unnamed(DataType::Int64)).collect(),
        };

        let (mut left_tx, left_source) = MockSource::channel();
        let left = left_source
            .stop_on_finish(false)
            .into_executor(schema.clone(), vec![1]);

        let (right_tx, right_source) = MockSource::channel();
        let right = right_source
            .stop_on_finish(false)
            .into_executor(schema, vec![1]);

        let mut combined = stream_lookup_arrange_this_epoch(left, right).boxed();

        // Close the arrangement side up front.
        drop(right_tx);

        left_tx.push_barrier(test_epoch(1), false);
        left_tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1",
        ));

        combined.next().await.unwrap().unwrap_err();

        Ok(())
    }
}