risingwave_stream/task/barrier_manager/managed_state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::LazyCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::FutureExt;
use futures::stream::FuturesOrdered;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::task::JoinHandle;

use super::progress::BackfillState;
use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::{
    ActorId, LocalBarrierManager, PartialGraphId, SharedContext, StreamActorManager,
};

struct IssuedState {
    /// Actor ids remaining to be collected.
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

/// The state machine of the local barrier manager.
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// Meta service has issued a `send_barrier` request. We're collecting barriers now.
    Issued(IssuedState),

    /// The barrier has been collected by all remaining actors.
    AllCollected(Vec<PbCreateMviewProgress>),
}
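
// Lifecycle of a `ManagedBarrierStateInner` entry, as inferred from the code in this file
// (a sketch, not an authoritative diagram): `transform_to_issued` inserts an `Issued` entry
// keyed by `prev_epoch`; each `collect` removes one actor from `remaining_actors`; once the
// set is empty, `may_have_collected_all` replaces the state with `AllCollected`, and
// `pop_barrier_to_complete` finally removes the entry from the map.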

#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    /// Only `Some(_)` when `barrier.kind` is `Checkpoint`.
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::InjectBarrierRequest;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitialPartialGraph,
};

use crate::task::barrier_manager::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::barrier_manager::{LocalBarrierEvent, ScoredStreamError};

pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        for (partial_graph_id, graph_states) in self.graph_states {
            writeln!(f, "--- Partial Graph {}", partial_graph_id.0)?;
            write!(f, "{}", graph_states)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            // Only show the actors that are not in the previous epoch.
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected(_) => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, state) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

enum InflightActorStatus {
    /// The actor has been issued some barriers, but has not collected the first barrier
    IssuedFirst(Vec<Barrier>),
    /// The actor has been issued some barriers, and has collected the first barrier
    Running(u64),
}
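
// An actor starts in `IssuedFirst`, buffering every barrier issued so far so that a barrier
// sender registered later can be replayed the full history (see `register_barrier_sender`
// below). Once the first barrier is collected, the buffer is dropped and the status becomes
// `Running` with the maximum issued `prev_epoch`.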

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}

pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `prev_epoch` -> partial graph id
    pub(super) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    /// Whether the actor has been issued a stop barrier
    is_stopping: bool,

    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_partial_graph_id: PartialGraphId,
        initial_barrier: &Barrier,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: BTreeMap::from_iter([(
                initial_barrier.epoch.prev,
                initial_partial_graph_id,
            )]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            join_handle,
            monitor_task_handle,
        }
    }

    pub(super) fn issue_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        barrier: &Barrier,
        is_stop: bool,
    ) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        assert!(
            self.inflight_barriers
                .insert(barrier.epoch.prev, partial_graph_id)
                .is_none()
        );

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "should not issue a stop barrier to an already stopping actor");
            self.is_stopping = true;
        }
        Ok(())
    }

    pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
        let (prev_epoch, prev_partial_graph_id) =
            self.inflight_barriers.pop_first().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }
        (
            prev_partial_graph_id,
            self.inflight_barriers.is_empty() && self.is_stopping,
        )
    }
}
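
// A worked example with hypothetical epochs: after `issue_barrier` has been called for
// barriers with `epoch.prev` 1, 2 and 3, `inflight_barriers` holds the keys {1, 2, 3}.
// Since `collect` pops the smallest key first, it must observe `epoch.prev == 1`, then 2,
// then 3, matching the FIFO order in which the actor itself processes barriers.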

pub(super) struct PartialGraphManagedBarrierState {
    /// Record barrier state for each epoch of concurrent checkpoints.
    ///
    /// The key is `prev_epoch`, and the value is the state of the in-flight barrier whose
    /// `epoch.prev` equals the key.
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

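    /// Subscriptions that depend on an upstream mview, keyed by the upstream mv `TableId`,
    /// with the ids of its subscribers as the value. Maintained by `add_subscriptions` and
    /// `remove_subscriptions` below.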
    mv_depended_subscriptions: HashMap<TableId, HashSet<u32>>,

    /// Record the progress updates of creating mviews for each epoch of concurrent checkpoints.
    ///
    /// This is updated by [`super::CreateMviewProgressReporter::update`] and will be reported to meta
    /// in [`crate::task::barrier_manager::BarrierCompleteResult`].
    pub(super) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            mv_depended_subscriptions: Default::default(),
            create_mview_progress: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

pub(crate) struct SuspendedDatabaseState {
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedDatabaseState {
    fn new(
        state: DatabaseManagedBarrierState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, /* discard the completing futures */
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetDatabaseOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
            hummock.clear_tables(self.inner.table_ids).await;
        }
        ResetDatabaseOutput { root_err }
    }
}

pub(crate) struct ResettingDatabaseState {
    join_handle: JoinHandle<ResetDatabaseOutput>,
    reset_request_id: u32,
}

pub(crate) struct ResetDatabaseOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

pub(crate) enum DatabaseStatus {
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    /// Temporary placeholder used only while transitioning between the other variants.
    Unspecified,
}
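
// Status transitions, as implemented below (a sketch): `Running -> Suspended` via `suspend`,
// and `Running | Suspended -> Resetting` via `start_reset`, where a duplicate reset request
// reuses the in-flight join handle. `Unspecified` exists only transiently inside
// `replace(self, ..)` while switching between the other variants.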

impl DatabaseStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further requests while resetting")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, prev_request_id, "receive duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}

pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
    pub(crate) current_shared_context: HashMap<DatabaseId, Arc<SharedContext>>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    DatabaseReset(ResetDatabaseOutput, u32),
}

impl ManagedBarrierState {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let mut databases = HashMap::new();
        let mut current_shared_context = HashMap::new();
        for database in initial_partial_graphs {
            let database_id = DatabaseId::new(database.database_id);
            assert!(!databases.contains_key(&database_id));
            let shared_context = Arc::new(SharedContext::new(
                database_id,
                &actor_manager.env,
                term_id.clone(),
            ));
            shared_context.add_actors(
                database
                    .graphs
                    .iter()
                    .flat_map(|graph| graph.actor_infos.iter())
                    .cloned(),
            );
            let state = DatabaseManagedBarrierState::new(
                database_id,
                actor_manager.clone(),
                shared_context.clone(),
                database.graphs,
            );
            databases.insert(database_id, DatabaseStatus::Running(state));
            current_shared_context.insert(database_id, shared_context);
        }

        Self {
            databases,
            current_shared_context,
        }
    }

    pub(super) fn next_event(
        &mut self,
    ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
        poll_fn(|cx| {
            for (database_id, database) in &mut self.databases {
                if let Poll::Ready(event) = database.poll_next_event(cx) {
                    return Poll::Ready((*database_id, event));
                }
            }
            Poll::Pending
        })
    }
}
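
// A minimal sketch of how a caller might drive `next_event` (a hypothetical worker loop,
// not code from this crate):
//
//     loop {
//         let (database_id, event) = managed_barrier_state.next_event().await;
//         match event {
//             ManagedBarrierStateEvent::BarrierCollected { partial_graph_id, barrier } => {
//                 // report/complete the barrier for (database_id, partial_graph_id)
//             }
//             ManagedBarrierStateEvent::ActorError { actor_id, err } => {
//                 // suspend or reset the database that owns the failed actor
//             }
//             ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
//                 // acknowledge the completed reset to the meta service
//             }
//         }
//     }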

pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    pub(super) actor_states: HashMap<ActorId, InflightActorState>,

    pub(super) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) current_shared_context: Arc<SharedContext>,
    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl DatabaseManagedBarrierState {
    /// Create the barrier manager state for a database. This will be called only once.
    pub(super) fn new(
        database_id: DatabaseId,
        actor_manager: Arc<StreamActorManager>,
        current_shared_context: Arc<SharedContext>,
        initial_partial_graphs: Vec<InitialPartialGraph>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new();
        Self {
            database_id,
            actor_states: Default::default(),
            graph_states: initial_partial_graphs
                .into_iter()
                .map(|graph| {
                    let mut state = PartialGraphManagedBarrierState::new(&actor_manager);
                    state.add_subscriptions(graph.subscriptions);
                    (PartialGraphId::new(graph.partial_graph_id), state)
                })
                .collect(),
            table_ids: Default::default(),
            actor_manager,
            current_shared_context,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_states: &self.graph_states,
        }
    }

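    /// Abort all actor join handles first so that the actors shut down concurrently, then
    /// await each handle, treating both normal completion and cancellation as success.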
    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl PartialGraphManagedBarrierState {
    pub(super) fn add_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_add in subscriptions {
            if !self
                .mv_depended_subscriptions
                .entry(TableId::new(subscription_to_add.upstream_mv_table_id))
                .or_default()
                .insert(subscription_to_add.subscriber_id)
            {
                if cfg!(debug_assertions) {
                    panic!("add an existing subscription: {:?}", subscription_to_add);
                }
                warn!(?subscription_to_add, "add an existing subscription");
            }
        }
    }

    pub(super) fn remove_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_remove in subscriptions {
            let upstream_table_id = TableId::new(subscription_to_remove.upstream_mv_table_id);
            let Some(subscribers) = self.mv_depended_subscriptions.get_mut(&upstream_table_id)
            else {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find upstream mv table to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find upstream mv table to remove"
                );
                continue;
            };
            if !subscribers.remove(&subscription_to_remove.subscriber_id) {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find subscriber to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find subscriber to remove"
                );
            }
            if subscribers.is_empty() {
                self.mv_depended_subscriptions.remove(&upstream_table_id);
            }
        }
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        graph_state.add_subscriptions(request.subscriptions_to_add);
        graph_state.remove_subscriptions(request.subscriptions_to_remove);

        let table_ids =
            HashSet::from_iter(request.table_ids_to_sync.iter().cloned().map(TableId::new));
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().cloned(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        let subscriptions =
            LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone()));
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                (*subscriptions).clone(),
                self.current_shared_context.clone(),
                self.local_barrier_manager.clone(),
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // Spawn a trivial join handle to stay compatible with the unit tests.
        if cfg!(test) {
            for actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(actor_id) {
                    let join_handle = self.actor_manager.runtime.spawn(async { pending().await });
                    assert!(
                        self.actor_states
                            .try_insert(
                                *actor_id,
                                InflightActorState::start(
                                    *actor_id,
                                    partial_graph_id,
                                    barrier,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(*actor_id);
                }
            }
        }

        // Note: it's important to issue the barrier to actors only after issuing it to the
        // graph, so that `start_epoch` is called on the graph before any actor receives the
        // barrier.
        for actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?;
        }

        Ok(())
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        // yield some pending collected epochs
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                if let Some(actors_to_stop) = barrier.all_stop_actors() {
                    self.current_shared_context.drop_actors(actors_to_stop);
                }
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        if let Some(actors_to_stop) = barrier.all_stop_actors() {
                            self.current_shared_context.drop_actors(actors_to_stop);
                        }
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, actor, state);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
            }
        }

        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}

impl DatabaseManagedBarrierState {
    #[must_use]
    pub(super) fn collect(
        &mut self,
        actor_id: ActorId,
        epoch: EpochPair,
    ) -> Option<(PartialGraphId, Barrier)> {
        let (prev_partial_graph_id, is_finished) = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        let prev_graph_state = self
            .graph_states
            .get_mut(&prev_partial_graph_id)
            .expect("should exist");
        prev_graph_state.collect(actor_id, epoch);
        prev_graph_state
            .may_have_collected_all()
            .map(|barrier| (prev_partial_graph_id, barrier))
    }

    pub(super) fn pop_barrier_to_complete(
        &mut self,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
    ) {
        self.graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .pop_barrier_to_complete(prev_epoch)
    }
}

impl PartialGraphManagedBarrierState {
    /// This method is called when the barrier state may have changed, to check whether the
    /// earliest `Issued` barrier has been collected from all actors. If so, its state is
    /// transformed to `AllCollected` and the barrier is returned.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected(_) => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor))
                .collect();

            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected(create_mview_progress),
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    fn pop_barrier_to_complete(
        &mut self,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
    ) {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let create_mview_progress = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected(create_mview_progress) => {
            create_mview_progress
        });
        (
            barrier_state.barrier,
            barrier_state.table_ids,
            create_mview_progress,
        )
    }
}

impl PartialGraphManagedBarrierState {
    /// Collect a `barrier` from the actor with `actor_id`.
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                // A barrier can be missing from the map only if it has not yet been injected
                // by the barrier manager, or if the barrier message is still blocked at the
                // `RemoteInput` side waiting for injection. In either case, no actor can have
                // started collecting it, so attempting to collect here is a bug.
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

    /// When the meta service issues a `send_barrier` request, call this function to transform
    /// the state to `Issued` and start collecting the barrier from the given actors.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty prev_barrier_table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };
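
        // A worked example with hypothetical epochs over an unchanged table set T: a
        // non-checkpoint barrier b1 (prev=1, curr=2) returns `None` here and records
        // (b1.epoch, T); a following checkpoint barrier b2 (prev=2, curr=3) then takes
        // `Some(T)` out of the replaced entry, since the recorded `prev_epoch.curr` (2)
        // equals `b2.epoch.prev` (2).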

        if let Some(BarrierState { inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            panic!(
                "barrier epoch {:?} has already been issued. Current state: {:?}",
                barrier.epoch, inner
            );
        }

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_manager::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2]);
        let actor_ids_to_collect2 = HashSet::from([1, 2]);
        let actor_ids_to_collect3 = HashSet::from([1, 2, 3]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]);
        let actor_ids_to_collect2 = HashSet::from([1, 2, 3]);
        let actor_ids_to_collect3 = HashSet::from([1, 2]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}