risingwave_stream/task/barrier_manager/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod progress;
16pub use progress::CreateMviewProgressReporter;
17use risingwave_common::catalog::DatabaseId;
18use risingwave_common::util::epoch::EpochPair;
19use tokio::sync::mpsc;
20use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
21
22use crate::error::{IntoUnexpectedExit, StreamError};
23use crate::executor::exchange::permit::{self, channel_from_config};
24use crate::executor::{Barrier, BarrierInner};
25use crate::task::barrier_manager::progress::BackfillState;
26use crate::task::{ActorId, StreamEnvironment};
27
28/// Events sent from actors via [`LocalBarrierManager`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState`].
29///
30/// See [`crate::task`] for architecture overview.
31pub(super) enum LocalBarrierEvent {
32    ReportActorCollected {
33        actor_id: ActorId,
34        epoch: EpochPair,
35    },
36    ReportCreateProgress {
37        epoch: EpochPair,
38        actor: ActorId,
39        state: BackfillState,
40    },
41    ReportSourceLoadFinished {
42        epoch: EpochPair,
43        actor_id: ActorId,
44        table_id: u32,
45        associated_source_id: u32,
46    },
47    RegisterBarrierSender {
48        actor_id: ActorId,
49        barrier_sender: mpsc::UnboundedSender<Barrier>,
50    },
51    RegisterLocalUpstreamOutput {
52        actor_id: ActorId,
53        upstream_actor_id: ActorId,
54        tx: permit::Sender,
55    },
56}
57
58/// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
59///
60/// See [`crate::task`] for architecture overview.
61#[derive(Clone)]
62pub struct LocalBarrierManager {
63    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
64    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
65    pub(crate) database_id: DatabaseId,
66    pub(crate) term_id: String,
67    pub(crate) env: StreamEnvironment,
68}
69
70impl LocalBarrierManager {
71    pub(super) fn new(
72        database_id: DatabaseId,
73        term_id: String,
74        env: StreamEnvironment,
75    ) -> (
76        Self,
77        UnboundedReceiver<LocalBarrierEvent>,
78        UnboundedReceiver<(ActorId, StreamError)>,
79    ) {
80        let (event_tx, event_rx) = unbounded_channel();
81        let (err_tx, err_rx) = unbounded_channel();
82        (
83            Self {
84                barrier_event_sender: event_tx,
85                actor_failure_sender: err_tx,
86                database_id,
87                term_id,
88                env,
89            },
90            event_rx,
91            err_rx,
92        )
93    }
94
95    #[cfg(test)]
96    pub fn for_test() -> Self {
97        Self::new(
98            DatabaseId {
99                database_id: 114514,
100            },
101            "114514".to_owned(),
102            StreamEnvironment::for_test(),
103        )
104        .0
105    }
106
107    /// Event is handled by [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
108    fn send_event(&self, event: LocalBarrierEvent) {
109        // ignore error, because the current barrier manager maybe a stale one
110        let _ = self.barrier_event_sender.send(event);
111    }
112
113    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
114    /// and collect this barrier with its own `actor_id` using this function.
115    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
116        self.send_event(LocalBarrierEvent::ReportActorCollected {
117            actor_id,
118            epoch: barrier.epoch,
119        })
120    }
121
122    /// When a actor exit unexpectedly, it should report this event using this function, so meta
123    /// will notice actor's exit while collecting.
124    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
125        let _ = self
126            .actor_failure_sender
127            .send((actor_id, err.into_unexpected_exit(actor_id)));
128    }
129
130    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
131        let (tx, rx) = mpsc::unbounded_channel();
132        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
133            actor_id,
134            barrier_sender: tx,
135        });
136        rx
137    }
138
139    pub fn register_local_upstream_output(
140        &self,
141        actor_id: ActorId,
142        upstream_actor_id: ActorId,
143    ) -> permit::Receiver {
144        let (tx, rx) = channel_from_config(self.env.config());
145        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
146            actor_id,
147            upstream_actor_id,
148            tx,
149        });
150        rx
151    }
152
153    pub fn report_source_load_finished(
154        &self,
155        epoch: EpochPair,
156        actor_id: ActorId,
157        table_id: u32,
158        associated_source_id: u32,
159    ) {
160        self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
161            epoch,
162            actor_id,
163            table_id,
164            associated_source_id,
165        });
166    }
167}
168
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a `LocalBarrierWorker` with test defaults and returns the sender used to submit
    /// `LocalActorOperation`s to it.
    ///
    /// The worker is given a test environment, unused metrics, `None` for its optional third
    /// argument and a shared `AtomicU64` starting at `0`.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (actor_op_tx, actor_op_rx) = unbounded_channel();
        let env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        // NOTE(review): the role of this counter is defined by `LocalBarrierWorker::spawn`.
        let shared_counter = Arc::new(AtomicU64::new(0));

        // The handle is not awaited; it is simply dropped when this function returns.
        let _join_handle =
            LocalBarrierWorker::spawn(env, metrics, None, shared_counter, actor_op_rx);
        EventSender(actor_op_tx)
    }
}