// risingwave_stream/executor/dispatch/dispatch_sync_log_store.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::VecDeque;
16use std::future::{Future, pending};
17use std::pin::Pin;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use futures::future::{Either, select};
22use pin_project::pin_project;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::must_match;
25use risingwave_pb::stream_plan;
26use risingwave_storage::StateStore;
27use risingwave_storage::store::StateStoreRead;
28use rw_futures_util::drop_either_future;
29use tokio::sync::mpsc::UnboundedReceiver;
30
31use super::{DispatchExecutor, DispatchExecutorInner, dispatch_message_batch};
32use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
33use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
34use crate::common::log_store_impl::kv_log_store::state::LogStoreReadState;
35use crate::common::log_store_impl::kv_log_store::{
36    FIRST_SEQ_ID, KV_LOG_STORE_V2_INFO, LogStoreVnodeProgress,
37};
38use crate::executor::prelude::*;
39use crate::executor::sync_kv_log_store::{
40    ReadFuture, SyncKvLogStoreContext, SyncedKvLogStoreExecutor, SyncedLogStoreBuffer, WriteFuture,
41    WriteFutureEvent,
42};
43use crate::executor::{MessageBatch, StreamConsumer, SyncedKvLogStoreMetrics};
44use crate::task::NewOutputRequest;
45
46/// Executor that pairs a synced KV log store with a dispatcher so the log store can
47/// advance and write independently of downstream backpressure.
48///
49/// This keeps upstream executors being polled even when downstream is slow or blocked, decoupling
50/// log store progress from downstream polling.
51pub struct SyncLogStoreDispatchExecutor<S: StateStore> {
52    pub(super) input: Executor,
53    pub(super) inner: DispatchExecutorInner,
54    pub(super) log_store_context: SyncKvLogStoreContext<S>,
55}
56
57impl<S: StateStore> SyncLogStoreDispatchExecutor<S> {
58    pub(crate) async fn new(
59        input: Executor,
60        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
61        dispatchers: Vec<stream_plan::Dispatcher>,
62        actor_context: &ActorContextRef,
63        sync: &stream_plan::SyncLogStoreNode,
64        vnode_bitmap: Option<Bitmap>,
65        state_store: S,
66    ) -> StreamResult<Self> {
67        let chunk_size = actor_context.config.developer.chunk_size;
68        let fragment_id = actor_context.fragment_id;
69        let log_store_metrics = SyncedKvLogStoreMetrics::new(
70            &actor_context.streaming_metrics,
71            actor_context.id,
72            fragment_id,
73            "sync_log_store_dispatch",
74            "sync_log_store_dispatch",
75        );
76
77        let table = sync
78            .log_store_table
79            .as_ref()
80            .ok_or_else(|| anyhow!("missing log_store_table in SyncLogStoreNode"))?;
81
82        let pause_duration_ms = actor_context
83            .config
84            .developer
85            .sync_log_store_pause_duration_ms;
86        let max_buffer_size = actor_context.config.developer.sync_log_store_buffer_size;
87
88        let serde =
89            LogStoreRowSerde::new(table, vnode_bitmap.map(Into::into), &KV_LOG_STORE_V2_INFO);
90        let log_store_context = SyncKvLogStoreContext {
91            table_id: table.id,
92            fragment_id,
93            serde,
94            state_store,
95            max_buffer_size,
96            pause_duration_ms: Duration::from_millis(pause_duration_ms as _),
97            aligned: sync.aligned,
98            chunk_size,
99            metrics: log_store_metrics,
100        };
101
102        Self::new_with_log_store_context(
103            input,
104            new_output_request_rx,
105            dispatchers,
106            actor_context,
107            log_store_context,
108        )
109        .await
110    }
111
112    async fn new_with_log_store_context(
113        input: Executor,
114        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
115        dispatchers: Vec<stream_plan::Dispatcher>,
116        actor_context: &ActorContextRef,
117        log_store_context: SyncKvLogStoreContext<S>,
118    ) -> StreamResult<Self> {
119        let DispatchExecutor { input, inner } =
120            DispatchExecutor::new(input, new_output_request_rx, dispatchers, actor_context).await?;
121
122        tracing::info!(
123            actor_id = %actor_context.id,
124            "synclogstore dispatch executor info"
125        );
126
127        Ok(Self {
128            input,
129            inner,
130            log_store_context,
131        })
132    }
133}
134
135type DispatchingFuture =
136    impl Future<Output = (DispatchExecutorInner, StreamResult<Option<Barrier>>)> + 'static;
137
138#[define_opaque(DispatchingFuture)]
139fn dispatching_future(mut inner: DispatchExecutorInner, message: Message) -> DispatchingFuture {
140    async move {
141        let batch: MessageBatch = message.into();
142        let r = dispatch_message_batch(&mut inner, batch)
143            .await
144            .map(|barrier_batch| {
145                barrier_batch.map(|mut barrier_batch| {
146                    debug_assert_eq!(barrier_batch.len(), 1);
147                    barrier_batch
148                        .pop()
149                        .expect("barrier batch should contain one barrier")
150                })
151            });
152        (inner, r)
153    }
154}
155
156/// State machine for the consumer side, which reads chunks from the log store and dispatches
157/// chunks or barriers downstream.
158#[pin_project(project = ConsumerFutureProj, project_replace = ConsumerFutureProjReplace)]
159enum ConsumerFuture {
160    /// Polls the log store for the next chunk. The read future is kept outside this state machine
161    /// so both meaningful states can share it.
162    ReadingChunk { inner: DispatchExecutorInner },
163    /// Dispatches the current message downstream. Any barrier received while dispatching is queued
164    /// here and dispatched before reading another chunk.
165    Dispatching {
166        #[pin]
167        future: DispatchingFuture,
168        barrier_queue: VecDeque<Message>,
169    },
170    /// Temporary placeholder used while moving fields out during state transitions.
171    PlaceHolder,
172}
173
174enum ConsumerFutureEvent {
175    BarrierDispatched(Barrier),
176    CleanStateReached,
177}
178
179impl ConsumerFuture {
180    fn dispatch(inner: DispatchExecutorInner, message: Message) -> Self {
181        tracing::trace!("consumer_future: dispatching future created");
182        Self::Dispatching {
183            future: dispatching_future(inner, message),
184            barrier_queue: VecDeque::new(),
185        }
186    }
187
188    fn read_chunk(inner: DispatchExecutorInner) -> Self {
189        tracing::trace!("consumer_future: reading chunk future created");
190        Self::ReadingChunk { inner }
191    }
192
193    fn push_barrier(mut self: Pin<&mut Self>, barrier: Barrier) {
194        let message = Message::Barrier(barrier);
195        match self.as_mut().project() {
196            ConsumerFutureProj::ReadingChunk { .. } => {
197                let inner = must_match!(
198                    self.as_mut().project_replace(ConsumerFuture::PlaceHolder),
199                    ConsumerFutureProjReplace::ReadingChunk { inner } => inner
200                );
201                self.set(Self::dispatch(inner, message));
202            }
203            ConsumerFutureProj::Dispatching { barrier_queue, .. } => {
204                barrier_queue.push_front(message);
205            }
206            ConsumerFutureProj::PlaceHolder => {
207                unreachable!("ConsumerFuture::PlaceHolder should be handled!")
208            }
209        }
210    }
211
212    #[expect(clippy::too_many_arguments)]
213    async fn next_event<S: StateStoreRead>(
214        mut self: Pin<&mut Self>,
215        read_future: &mut ReadFuture<S>,
216        read_paused: bool,
217        clean_state: &mut bool,
218        progress: &mut LogStoreVnodeProgress,
219        read_state: &LogStoreReadState<S>,
220        buffer: &mut SyncedLogStoreBuffer,
221        metrics: &SyncedKvLogStoreMetrics,
222    ) -> StreamResult<ConsumerFutureEvent> {
223        loop {
224            match self.as_mut().project() {
225                ConsumerFutureProj::ReadingChunk { .. } => {
226                    if read_paused {
227                        pending().await
228                    }
229
230                    let chunk = read_future
231                        .next_chunk(progress, read_state, buffer, metrics)
232                        .await?;
233                    metrics.total_read_count.inc_by(chunk.cardinality() as _);
234
235                    let clean_state_reached =
236                        read_future.mark_clean_state(clean_state, buffer, metrics);
237                    let inner = must_match!(
238                        self.as_mut().project_replace(ConsumerFuture::PlaceHolder),
239                        ConsumerFutureProjReplace::ReadingChunk { inner } => inner
240                    );
241                    self.set(Self::dispatch(inner, Message::Chunk(chunk)));
242
243                    if clean_state_reached {
244                        return Ok(ConsumerFutureEvent::CleanStateReached);
245                    }
246                    continue;
247                }
248                ConsumerFutureProj::Dispatching {
249                    future,
250                    barrier_queue,
251                } => {
252                    let (inner, result) = future.await;
253                    let barrier = result?;
254
255                    if let Some(next_barrier) = barrier_queue.pop_back() {
256                        tracing::trace!("consumer_future: dispatching future created");
257                        let ConsumerFutureProj::Dispatching { mut future, .. } =
258                            self.as_mut().project()
259                        else {
260                            unreachable!("ConsumerFuture::ReadingChunk should be handled!")
261                        };
262                        future.set(dispatching_future(inner, next_barrier));
263                    } else {
264                        self.set(Self::read_chunk(inner));
265                    }
266
267                    if let Some(barrier) = barrier {
268                        return Ok(ConsumerFutureEvent::BarrierDispatched(barrier));
269                    }
270                }
271                ConsumerFutureProj::PlaceHolder => {
272                    unreachable!("ConsumerFuture::PlaceHolder should be handled!")
273                }
274            }
275        }
276    }
277}
278
279impl<S: StateStore> StreamConsumer for SyncLogStoreDispatchExecutor<S> {
280    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;
281
282    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
283        #[try_stream]
284        async move {
285            let actor_id = self.inner.actor_id;
286            let log_store_config = self.log_store_context;
287
288            let mut input = self.input.execute();
289
290            let first_barrier = expect_first_barrier(&mut input).await?;
291            let first_write_epoch = first_barrier.epoch;
292
293            // Dispatch the first barrier before initializing the log store states
294            let first_barrier_batch = dispatch_message_batch(
295                &mut self.inner,
296                Message::Barrier(first_barrier.clone()).into(),
297            )
298            .await?;
299            debug_assert_eq!(
300                first_barrier_batch
301                    .as_ref()
302                    .map(|barrier_batch| barrier_batch.len()),
303                Some(1)
304            );
305            yield first_barrier.clone();
306
307            let (read_state, initial_write_state) =
308                SyncedKvLogStoreExecutor::<S>::init_local_log_store_state(
309                    &log_store_config,
310                    first_write_epoch,
311                )
312                .await?;
313
314            let initial_write_epoch = first_write_epoch;
315            let mut pause_stream = first_barrier.is_pause_on_startup();
316
317            if log_store_config.aligned {
318                let aligned_stream = SyncedKvLogStoreExecutor::<S>::aligned_message_stream(
319                    actor_id,
320                    input,
321                    read_state,
322                    initial_write_state,
323                    log_store_config.metrics.clone(),
324                    initial_write_epoch,
325                );
326
327                #[for_await]
328                for message in aligned_stream {
329                    if let Some(barrier_batch) =
330                        dispatch_message_batch(&mut self.inner, message?.into()).await?
331                    {
332                        // Now Synclogstoredispatchexecutor only support sending out single barrier
333                        debug_assert_eq!(barrier_batch.len(), 1);
334                        for barrier in barrier_batch {
335                            yield barrier;
336                        }
337                    }
338                }
339                return Ok(());
340            }
341
342            let mut seq_id = FIRST_SEQ_ID;
343            let mut buffer = SyncedLogStoreBuffer::new(
344                log_store_config.max_buffer_size,
345                log_store_config.chunk_size,
346                &log_store_config.metrics,
347            );
348
349            let log_store_stream = read_state
350                .read_persisted_log_store(
351                    log_store_config.metrics.persistent_log_read_metrics.clone(),
352                    initial_write_epoch.curr,
353                    LogStoreReadStateStreamRangeStart::Unbounded,
354                )
355                .await?;
356
357            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
358            let mut clean_state = log_store_stream.peek().await.is_none();
359            tracing::trace!(?clean_state);
360
361            let mut progress = LogStoreVnodeProgress::None;
362            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);
363            let consumer_future_state = ConsumerFuture::ReadingChunk { inner: self.inner };
364            pin_mut!(consumer_future_state);
365
366            let mut write_future_state =
367                WriteFuture::receive_from_upstream(input, initial_write_state);
368            let mut end_of_stream = false;
369
370            loop {
371                let select_result = {
372                    let consumer_future = async {
373                        consumer_future_state
374                            .as_mut()
375                            .next_event(
376                                &mut read_future_state,
377                                pause_stream,
378                                &mut clean_state,
379                                &mut progress,
380                                &read_state,
381                                &mut buffer,
382                                &log_store_config.metrics,
383                            )
384                            .await
385                    };
386                    pin_mut!(consumer_future);
387                    let write_future = async {
388                        if end_of_stream {
389                            pending().await
390                        } else {
391                            write_future_state
392                                .next_event(&log_store_config.metrics)
393                                .await
394                        }
395                    };
396                    pin_mut!(write_future);
397                    let output = select(write_future, consumer_future).await;
398                    drop_either_future(output)
399                };
400
401                match select_result {
402                    Either::Left(_write_result) => {
403                        drop(write_future_state);
404                        let (stream, mut write_state, either) = _write_result?;
405                        match either {
406                            WriteFutureEvent::UpstreamMessageReceived(msg) => match msg {
407                                Message::Chunk(chunk) => {
408                                    let (new_seq_id, next_write_future) =
409                                        SyncedKvLogStoreExecutor::<S>::process_upstream_chunk(
410                                            seq_id,
411                                            stream,
412                                            write_state,
413                                            chunk,
414                                            &mut buffer,
415                                        );
416                                    seq_id = new_seq_id;
417                                    write_future_state = next_write_future;
418                                }
419                                Message::Barrier(barrier) => {
420                                    if clean_state
421                                        && barrier.kind.is_checkpoint()
422                                        && !buffer.is_empty()
423                                    {
424                                        write_future_state = WriteFuture::paused(
425                                            log_store_config.pause_duration_ms,
426                                            barrier,
427                                            stream,
428                                            write_state,
429                                        );
430                                        clean_state = false;
431                                        log_store_config.metrics.unclean_state.inc();
432                                    } else {
433                                        SyncedKvLogStoreExecutor::<S>::apply_pause_resume_mutation(
434                                            &barrier,
435                                            &mut pause_stream,
436                                        );
437                                        let write_state_post_write_barrier =
438                                            SyncedKvLogStoreExecutor::<S>::write_barrier(
439                                                actor_id,
440                                                &mut write_state,
441                                                barrier.clone(),
442                                                &log_store_config.metrics,
443                                                progress.take(),
444                                                &mut buffer,
445                                            )
446                                            .await?;
447                                        seq_id = FIRST_SEQ_ID;
448                                        barrier.assume_no_update_vnode_bitmap(actor_id)?;
449
450                                        write_state_post_write_barrier
451                                            .post_yield_barrier(None)
452                                            .await?;
453
454                                        let is_stop_barrier = barrier.is_stop(actor_id);
455                                        if is_stop_barrier {
456                                            // Stop polling upstream after the stop barrier is
457                                            // written into the log store.
458                                            end_of_stream = true;
459                                            write_future_state = WriteFuture::Empty;
460                                        } else {
461                                            write_future_state = WriteFuture::receive_from_upstream(
462                                                stream,
463                                                write_state,
464                                            );
465                                        }
466                                        consumer_future_state.as_mut().push_barrier(barrier);
467                                    }
468                                }
469                                // TODO: handle upstream watermark in sync log store dispatch.
470                                Message::Watermark(_watermark) => {
471                                    write_future_state =
472                                        WriteFuture::receive_from_upstream(stream, write_state);
473                                }
474                            },
475                            WriteFutureEvent::ChunkFlushed(info) => {
476                                write_future_state =
477                                    SyncedKvLogStoreExecutor::<S>::process_flushed_chunk(
478                                        stream,
479                                        write_state,
480                                        info,
481                                        &mut buffer,
482                                        &log_store_config.metrics,
483                                    );
484                            }
485                        }
486                    }
487                    Either::Right(consumer_result) => {
488                        let event = consumer_result?;
489                        match event {
490                            ConsumerFutureEvent::CleanStateReached => {
491                                if let WriteFuture::Paused { sleep_future, .. } =
492                                    &mut write_future_state
493                                {
494                                    assert!(buffer.has_available_capacity());
495                                    *sleep_future = None;
496                                }
497                            }
498                            ConsumerFutureEvent::BarrierDispatched(barrier) => {
499                                yield barrier;
500                            }
501                        }
502                    }
503                }
504            }
505        }
506    }
507}
508
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::time::{Duration, timeout};

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table,
    };
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::test_utils::MockSource;
    use crate::executor::{ActorContext, BarrierInner as Barrier};
    use crate::task::ActorId;

    const ACTOR_ID: u32 = 4242;
    const DOWNSTREAM_ACTOR_ID: u32 = 5252;
    /// Upper bound on how long the test waits for any single message.
    const RECV_TIMEOUT: Duration = Duration::from_secs(1);

    /// Installs a tracing subscriber driven by the environment filter; repeated calls are no-ops.
    fn init_logger() {
        let subscriber = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false);
        let _ = subscriber.try_init();
    }

    /// Feeds barrier(1), two chunks and barrier(2) into the executor and checks both what the
    /// downstream channel receives and which barriers the executor's barrier stream yields.
    async fn run_barrier_chunk_ordering_test(aligned: bool) {
        /// Receives from `$rx`, panicking if nothing arrives within `RECV_TIMEOUT`.
        macro_rules! recv_in_time {
            ($rx:expr) => {
                timeout(RECV_TIMEOUT, $rx.recv()).await.unwrap()
            };
        }

        init_logger();

        let actor_id = ActorId::new(ACTOR_ID);
        let downstream_actor = ActorId::new(DOWNSTREAM_ACTOR_ID);
        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let (down_tx, mut down_rx) = channel_for_test();
        new_output_request_tx
            .send((downstream_actor, NewOutputRequest::Local(down_tx)))
            .unwrap();

        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            + 10  11",
        );
        let chunk_2 = StreamChunk::from_pretty(
            "  I   T
            -  5  10
            -  6  10
            -  8  10
            U- 10  11
            U+ 10  10",
        );

        let dispatcher = stream_plan::Dispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: 7.into(),
            downstream_actor_id: vec![DOWNSTREAM_ACTOR_ID.into()],
            output_mapping: PbDispatchOutputMapping::identical(2).into(),
            ..Default::default()
        };

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let table = gen_test_log_store_table(pk_info);
        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));
        let serde = LogStoreRowSerde::new(&table, vnodes, pk_info);
        let log_store_config = SyncKvLogStoreContext {
            table_id: table.id,
            fragment_id: 0.into(),
            serde,
            state_store: MemoryStateStore::new(),
            max_buffer_size: 1024,
            pause_duration_ms: Duration::from_millis(10),
            aligned,
            chunk_size: 1024,
            metrics: SyncedKvLogStoreMetrics::for_test(),
        };

        let (mut input_tx, source) = MockSource::channel();
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Varchar),
            ],
        };
        let input = source.into_executor(schema, vec![0]);

        let executor = SyncLogStoreDispatchExecutor::new_with_log_store_context(
            input,
            new_output_request_rx,
            vec![dispatcher],
            &ActorContext::for_test(actor_id),
            log_store_config,
        )
        .await
        .unwrap();

        // Drive the executor in the background and forward everything it yields.
        let (barrier_out_tx, mut barrier_out_rx) = unbounded_channel();
        let barrier_driver = tokio::spawn(async move {
            let barrier_stream = Box::new(executor).execute();
            futures::pin_mut!(barrier_stream);
            while let Some(item) = barrier_stream.next().await {
                barrier_out_tx.send(item).ok();
            }
        });

        // barrier(1): yielded by the barrier stream and delivered downstream.
        input_tx.send_barrier(barrier1.clone());
        let observed1 = recv_in_time!(barrier_out_rx).unwrap().unwrap();
        assert_eq!(observed1.epoch.curr, test_epoch(1));

        let msg = recv_in_time!(down_rx).expect("downstream should receive barrier(1)");
        let barriers = msg.as_barrier_batch().unwrap();
        assert_eq!(barriers.len(), 1);
        assert_eq!(barriers[0].epoch.curr, test_epoch(1));

        // Both chunks arrive downstream in the order they were pushed.
        input_tx.push_chunk(chunk_1.clone());
        input_tx.push_chunk(chunk_2.clone());
        let msg = recv_in_time!(down_rx).expect("downstream should receive chunk(1)");
        assert_stream_chunk_eq!(msg.as_chunk().unwrap(), chunk_1);

        let msg = recv_in_time!(down_rx).expect("downstream should receive chunk(2)");
        assert_stream_chunk_eq!(msg.as_chunk().unwrap(), chunk_2);

        // barrier(2): delivered downstream after the chunks, then yielded by the barrier stream.
        input_tx.send_barrier(barrier2.clone());
        let msg = recv_in_time!(down_rx).expect("downstream should receive barrier(2)");
        let barriers = msg.as_barrier_batch().unwrap();
        assert_eq!(barriers.len(), 1);
        assert_eq!(barriers[0].epoch.curr, test_epoch(2));

        let observed2 = recv_in_time!(barrier_out_rx).unwrap().unwrap();
        assert_eq!(observed2.epoch.curr, test_epoch(2));

        barrier_driver.abort();
    }

    /// Mirror `sync_kv_log_store::test_barrier_persisted_read`, but assert the dispatched output
    /// order: chunk(1) -> chunk(2) -> barrier(2), while barrier(1) is surfaced via barrier stream.
    #[tokio::test]
    async fn test_barrier_chunk_ordering_in_dispatch() {
        run_barrier_chunk_ordering_test(false).await;
    }

    /// Same scenario as above with the log store running in aligned mode.
    #[tokio::test]
    async fn test_aligned_barrier_chunk_ordering_in_dispatch() {
        run_barrier_chunk_ordering_test(true).await;
    }
}