risingwave_stream/task/barrier_manager/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::id::{SourceId, TableId};
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_pb::id::{FragmentId, PartialGraphId};
22use tokio::sync::mpsc;
23use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
24
25use crate::error::{IntoUnexpectedExit, StreamError};
26use crate::executor::exchange::permit::{self, channel_from_config};
27use crate::executor::{Barrier, BarrierInner};
28use crate::task::barrier_manager::progress::BackfillState;
29use crate::task::cdc_progress::CdcTableBackfillState;
30use crate::task::{ActorId, StreamEnvironment};
31
/// Events sent from actors via [`LocalBarrierManager`] to [`super::barrier_worker::managed_state::PartialGraphState`].
///
/// Every variant defined here is pushed through the manager's unbounded event channel; the
/// variants constructed in this file are each built by exactly one `LocalBarrierManager` method,
/// named in the per-variant docs below.
///
/// See [`crate::task`] for architecture overview.
pub(super) enum LocalBarrierEvent {
    /// Built by `LocalBarrierManager::collect`: `actor_id` reports the barrier whose epoch pair
    /// is `epoch`.
    ReportActorCollected {
        actor_id: ActorId,
        epoch: EpochPair,
    },
    /// Carries a [`BackfillState`] for `actor` (belonging to `fragment_id`) at `epoch`.
    ///
    /// NOTE(review): not constructed in this file; presumably emitted from the `progress`
    /// submodule — confirm there.
    ReportCreateProgress {
        epoch: EpochPair,
        fragment_id: FragmentId,
        actor: ActorId,
        state: BackfillState,
    },
    /// Built by `LocalBarrierManager::report_source_list_finished` from its arguments.
    ReportSourceListFinished {
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Built by `LocalBarrierManager::report_source_load_finished` from its arguments.
    ReportSourceLoadFinished {
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Built by `LocalBarrierManager::report_refresh_finished` from its arguments.
    RefreshFinished {
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        staging_table_id: TableId,
    },
    /// Built by `LocalBarrierManager::subscribe_barrier`.
    ///
    /// `barrier_sender` is the sending half of a fresh unbounded channel; the matching receiver
    /// is what `subscribe_barrier` returns to the caller.
    RegisterBarrierSender {
        actor_id: ActorId,
        barrier_sender: mpsc::UnboundedSender<Barrier>,
    },
    /// Built by `LocalBarrierManager::register_local_upstream_output`.
    ///
    /// `tx` is the sending half of a permit channel created with `channel_from_config`; the
    /// matching receiver is returned to the caller of that method.
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        upstream_partial_graph_id: PartialGraphId,
        tx: permit::Sender,
    },
    /// Carries a [`CdcTableBackfillState`] for `actor_id` at `epoch`.
    ///
    /// NOTE(review): not constructed in this file; presumably emitted from the `cdc_progress`
    /// submodule — confirm there.
    ReportCdcTableBackfillProgress {
        actor_id: ActorId,
        epoch: EpochPair,
        state: CdcTableBackfillState,
    },
}
80
/// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::PartialGraphState::poll_next_event`]
///
/// The type derives `Clone`; a clone carries clones of both channel senders, so every clone
/// feeds the same pair of receivers that `new` handed out.
///
/// See [`crate::task`] for architecture overview.
#[derive(Clone)]
pub struct LocalBarrierManager {
    /// Sending half of the event channel created in `new`; written to by `send_event`.
    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
    /// Sending half of the failure channel created in `new`; written to by `notify_failure`.
    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
    /// Identifier passed in by whoever calls `new`; it is only stored here.
    /// NOTE(review): its exact meaning is not visible in this file — confirm with the caller.
    pub(crate) term_id: String,
    /// Stream environment passed in by whoever calls `new`; `register_local_upstream_output`
    /// reads `global_config()` from it when creating permit channels.
    pub(crate) env: StreamEnvironment,
}
91
92impl LocalBarrierManager {
93    pub(super) fn new(
94        term_id: String,
95        env: StreamEnvironment,
96    ) -> (
97        Self,
98        UnboundedReceiver<LocalBarrierEvent>,
99        UnboundedReceiver<(ActorId, StreamError)>,
100    ) {
101        let (event_tx, event_rx) = unbounded_channel();
102        let (err_tx, err_rx) = unbounded_channel();
103        (
104            Self {
105                barrier_event_sender: event_tx,
106                actor_failure_sender: err_tx,
107                term_id,
108                env,
109            },
110            event_rx,
111            err_rx,
112        )
113    }
114
115    pub fn for_test() -> Self {
116        Self::new("114514".to_owned(), StreamEnvironment::for_test()).0
117    }
118
119    /// Event is handled by [`super::barrier_worker::managed_state::PartialGraphState::poll_next_event`]
120    fn send_event(&self, event: LocalBarrierEvent) {
121        // ignore error, because the current barrier manager maybe a stale one
122        let _ = self.barrier_event_sender.send(event);
123    }
124
125    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
126    /// and collect this barrier with its own `actor_id` using this function.
127    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
128        self.send_event(LocalBarrierEvent::ReportActorCollected {
129            actor_id,
130            epoch: barrier.epoch,
131        })
132    }
133
134    /// When a actor exit unexpectedly, it should report this event using this function, so meta
135    /// will notice actor's exit while collecting.
136    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
137        let _ = self
138            .actor_failure_sender
139            .send((actor_id, err.into_unexpected_exit(actor_id)));
140    }
141
142    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
143        let (tx, rx) = mpsc::unbounded_channel();
144        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
145            actor_id,
146            barrier_sender: tx,
147        });
148        rx
149    }
150
151    pub fn register_local_upstream_output(
152        &self,
153        actor_id: ActorId,
154        upstream_actor_id: ActorId,
155        upstream_partial_graph_id: PartialGraphId,
156    ) -> permit::Receiver {
157        let (tx, rx) = channel_from_config(self.env.global_config());
158        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
159            actor_id,
160            upstream_actor_id,
161            upstream_partial_graph_id,
162            tx,
163        });
164        rx
165    }
166
167    pub fn report_source_list_finished(
168        &self,
169        epoch: EpochPair,
170        actor_id: ActorId,
171        table_id: TableId,
172        associated_source_id: SourceId,
173    ) {
174        self.send_event(LocalBarrierEvent::ReportSourceListFinished {
175            epoch,
176            actor_id,
177            table_id,
178            associated_source_id,
179        });
180    }
181
182    pub fn report_source_load_finished(
183        &self,
184        epoch: EpochPair,
185        actor_id: ActorId,
186        table_id: TableId,
187        associated_source_id: SourceId,
188    ) {
189        self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
190            epoch,
191            actor_id,
192            table_id,
193            associated_source_id,
194        });
195    }
196
197    pub fn report_refresh_finished(
198        &self,
199        epoch: EpochPair,
200        actor_id: ActorId,
201        table_id: TableId,
202        staging_table_id: TableId,
203    ) {
204        self.send_event(LocalBarrierEvent::RefreshFinished {
205            epoch,
206            actor_id,
207            table_id,
208            staging_table_id,
209        });
210    }
211}
212
#[cfg(test)]
impl LocalBarrierManager {
    /// Test helper: spawns a `LocalBarrierWorker` on [`StreamEnvironment::for_test`] and returns
    /// the `EventSender` wrapping the sending half of the channel whose receiver was given to
    /// the worker.
    ///
    /// The value returned by `LocalBarrierWorker::spawn` is not kept beyond this function.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (actor_op_tx, actor_op_rx) = unbounded_channel();

        // Arguments are built in the same order in which they are passed to `spawn`.
        let env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        let atomic_zero = Arc::new(AtomicU64::new(0));
        let _join_handle = LocalBarrierWorker::spawn(env, metrics, None, atomic_zero, actor_op_rx);

        EventSender(actor_op_tx)
    }
}