1use std::cell::LazyCell;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17use std::fmt::{Debug, Display, Formatter};
18use std::future::{Future, pending, poll_fn};
19use std::mem::replace;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Instant;
23
24use futures::FutureExt;
25use futures::stream::FuturesOrdered;
26use prometheus::HistogramTimer;
27use risingwave_common::catalog::{DatabaseId, TableId};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_pb::stream_plan::barrier::BarrierKind;
30use risingwave_storage::StateStoreImpl;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::UnboundedReceiver;
33use tokio::task::JoinHandle;
34
35use super::progress::BackfillState;
36use crate::error::{StreamError, StreamResult};
37use crate::executor::Barrier;
38use crate::executor::monitor::StreamingMetrics;
39use crate::task::{
40 ActorId, LocalBarrierManager, PartialGraphId, SharedContext, StreamActorManager,
41};
42
/// Per-epoch state of a barrier that has been issued but not yet collected by all actors.
struct IssuedState {
    /// Actors that have not yet reported collecting this barrier.
    pub remaining_actors: BTreeSet<ActorId>,

    /// Timer started when the barrier was issued; its duration is observed once the remaining
    /// actors set becomes empty and the barrier turns `AllCollected`.
    pub barrier_inflight_latency: HistogramTimer,
}
49
50impl Debug for IssuedState {
51 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("IssuedState")
53 .field("remaining_actors", &self.remaining_actors)
54 .finish()
55 }
56}
57
/// Collection progress of one issued barrier within a partial graph.
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// Issued to actors; still waiting for the actors in `remaining_actors` to collect it.
    Issued(IssuedState),

    /// Every actor has collected the barrier. Carries the create-mview progress that was
    /// recorded for the barrier's `epoch.curr`, handed out by `pop_barrier_to_complete`.
    AllCollected(Vec<PbCreateMviewProgress>),
}
67
/// A barrier tracked by a partial graph, stored in `epoch_barrier_state_map` under its
/// `epoch.prev`.
#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    /// Table ids this barrier syncs. Only checkpoint barriers carry `Some`; `Initial` and
    /// plain `Barrier` kinds store `None` (see `PartialGraphManagedBarrierState::transform_to_issued`).
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}
75
76use risingwave_common::must_match;
77use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
78use risingwave_pb::stream_service::InjectBarrierRequest;
79use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
80use risingwave_pb::stream_service::streaming_control_stream_request::{
81 DatabaseInitialPartialGraph, InitialPartialGraph,
82};
83
84use crate::task::barrier_manager::await_epoch_completed_future::AwaitEpochCompletedFuture;
85use crate::task::barrier_manager::{LocalBarrierEvent, ScoredStreamError};
86
/// Snapshot of a database's barrier state, rendered through its `Display` implementation for
/// debugging.
pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    /// Ids of all actors that currently have inflight state in the database.
    running_actors: BTreeSet<ActorId>,
    /// Per-partial-graph barrier states, borrowed from the database state.
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}
91
92impl Display for ManagedBarrierStateDebugInfo<'_> {
93 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94 write!(f, "running_actors: ")?;
95 for actor_id in &self.running_actors {
96 write!(f, "{}, ", actor_id)?;
97 }
98 for (partial_graph_id, graph_states) in self.graph_states {
99 writeln!(f, "--- Partial Group {}", partial_graph_id.0)?;
100 write!(f, "{}", graph_states)?;
101 }
102 Ok(())
103 }
104}
105
106impl Display for &'_ PartialGraphManagedBarrierState {
107 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
108 let mut prev_epoch = 0u64;
109 for (epoch, barrier_state) in &self.epoch_barrier_state_map {
110 write!(f, "> Epoch {}: ", epoch)?;
111 match &barrier_state.inner {
112 ManagedBarrierStateInner::Issued(state) => {
113 write!(
114 f,
115 "Issued [{:?}]. Remaining actors: [",
116 barrier_state.barrier.kind
117 )?;
118 let mut is_prev_epoch_issued = false;
119 if prev_epoch != 0 {
120 let bs = &self.epoch_barrier_state_map[&prev_epoch];
121 if let ManagedBarrierStateInner::Issued(IssuedState {
122 remaining_actors: remaining_actors_prev,
123 ..
124 }) = &bs.inner
125 {
126 is_prev_epoch_issued = true;
128 let mut duplicates = 0usize;
129 for actor_id in &state.remaining_actors {
130 if !remaining_actors_prev.contains(actor_id) {
131 write!(f, "{}, ", actor_id)?;
132 } else {
133 duplicates += 1;
134 }
135 }
136 if duplicates > 0 {
137 write!(f, "...and {} actors in prev epoch", duplicates)?;
138 }
139 }
140 }
141 if !is_prev_epoch_issued {
142 for actor_id in &state.remaining_actors {
143 write!(f, "{}, ", actor_id)?;
144 }
145 }
146 write!(f, "]")?;
147 }
148 ManagedBarrierStateInner::AllCollected(_) => {
149 write!(f, "AllCollected")?;
150 }
151 }
152 prev_epoch = *epoch;
153 writeln!(f)?;
154 }
155
156 if !self.create_mview_progress.is_empty() {
157 writeln!(f, "Create MView Progress:")?;
158 for (epoch, progress) in &self.create_mview_progress {
159 write!(f, "> Epoch {}:", epoch)?;
160 for (actor_id, state) in progress {
161 write!(f, ">> Actor {}: {}, ", actor_id, state)?;
162 }
163 }
164 }
165
166 Ok(())
167 }
168}
169
/// Barrier-delivery status of a single actor.
enum InflightActorStatus {
    /// The actor has not collected its first barrier yet. Every barrier issued so far is kept,
    /// so that it can be replayed to barrier senders registered later
    /// (see `InflightActorState::register_barrier_sender`).
    IssuedFirst(Vec<Barrier>),
    /// The actor has collected at least one barrier. Holds the `epoch.prev` of the most
    /// recently issued barrier.
    Running(u64),
}
176
177impl InflightActorStatus {
178 fn max_issued_epoch(&self) -> u64 {
179 match self {
180 InflightActorStatus::Running(epoch) => *epoch,
181 InflightActorStatus::IssuedFirst(issued_barriers) => {
182 issued_barriers.last().expect("non-empty").epoch.prev
183 }
184 }
185 }
186}
187
/// Barrier bookkeeping for one spawned actor that has not finished yet.
pub(crate) struct InflightActorState {
    actor_id: ActorId,
    /// Registered barrier senders; every issued barrier is forwarded to all of them.
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// Barriers issued to this actor but not yet collected: `epoch.prev` -> issuing partial graph.
    pub(super) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    /// Set once a barrier that stops this actor has been issued.
    is_stopping: bool,

    /// Handle of the spawned actor task.
    join_handle: JoinHandle<()>,
    /// Optional auxiliary task; aborted when the actor finishes or is force-stopped.
    monitor_task_handle: Option<JoinHandle<()>>,
}
200
201impl InflightActorState {
202 pub(super) fn start(
203 actor_id: ActorId,
204 initial_partial_graph_id: PartialGraphId,
205 initial_barrier: &Barrier,
206 join_handle: JoinHandle<()>,
207 monitor_task_handle: Option<JoinHandle<()>>,
208 ) -> Self {
209 Self {
210 actor_id,
211 barrier_senders: vec![],
212 inflight_barriers: BTreeMap::from_iter([(
213 initial_barrier.epoch.prev,
214 initial_partial_graph_id,
215 )]),
216 status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
217 is_stopping: false,
218 join_handle,
219 monitor_task_handle,
220 }
221 }
222
223 pub(super) fn issue_barrier(
224 &mut self,
225 partial_graph_id: PartialGraphId,
226 barrier: &Barrier,
227 is_stop: bool,
228 ) -> StreamResult<()> {
229 assert!(barrier.epoch.prev > self.status.max_issued_epoch());
230
231 for barrier_sender in &self.barrier_senders {
232 barrier_sender.send(barrier.clone()).map_err(|_| {
233 StreamError::barrier_send(
234 barrier.clone(),
235 self.actor_id,
236 "failed to send to registered sender",
237 )
238 })?;
239 }
240
241 assert!(
242 self.inflight_barriers
243 .insert(barrier.epoch.prev, partial_graph_id)
244 .is_none()
245 );
246
247 match &mut self.status {
248 InflightActorStatus::IssuedFirst(pending_barriers) => {
249 pending_barriers.push(barrier.clone());
250 }
251 InflightActorStatus::Running(prev_epoch) => {
252 *prev_epoch = barrier.epoch.prev;
253 }
254 };
255
256 if is_stop {
257 assert!(!self.is_stopping, "stopped actor should not issue barrier");
258 self.is_stopping = true;
259 }
260 Ok(())
261 }
262
263 pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
264 let (prev_epoch, prev_partial_graph_id) =
265 self.inflight_barriers.pop_first().expect("should exist");
266 assert_eq!(prev_epoch, epoch.prev);
267 match &self.status {
268 InflightActorStatus::IssuedFirst(pending_barriers) => {
269 assert_eq!(
270 prev_epoch,
271 pending_barriers.first().expect("non-empty").epoch.prev
272 );
273 self.status = InflightActorStatus::Running(
274 pending_barriers.last().expect("non-empty").epoch.prev,
275 );
276 }
277 InflightActorStatus::Running(_) => {}
278 }
279 (
280 prev_partial_graph_id,
281 self.inflight_barriers.is_empty() && self.is_stopping,
282 )
283 }
284}
285
/// Barrier bookkeeping for one partial graph.
pub(super) struct PartialGraphManagedBarrierState {
    /// Barriers issued to this graph and not yet popped, keyed by `epoch.prev`. Barriers
    /// become `AllCollected` strictly in key order (see `may_have_collected_all`).
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    /// Epoch of the most recently issued barrier together with its table ids. When the next
    /// checkpoint barrier's `epoch.prev` continues this epoch, these table ids become the ones
    /// that checkpoint syncs.
    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    /// Upstream mview table id -> ids of the subscribers depending on it.
    mv_depended_subscriptions: HashMap<TableId, HashSet<u32>>,

    /// Reported create-mview (backfill) progress: epoch -> actor -> state. Entries are taken
    /// out by a barrier's `epoch.curr` once that barrier is fully collected.
    pub(super) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}
306
307impl PartialGraphManagedBarrierState {
308 pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
309 Self::new_inner(
310 actor_manager.env.state_store(),
311 actor_manager.streaming_metrics.clone(),
312 )
313 }
314
315 fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
316 Self {
317 epoch_barrier_state_map: Default::default(),
318 prev_barrier_table_ids: None,
319 mv_depended_subscriptions: Default::default(),
320 create_mview_progress: Default::default(),
321 state_store,
322 streaming_metrics,
323 }
324 }
325
326 #[cfg(test)]
327 pub(crate) fn for_test() -> Self {
328 Self::new_inner(
329 StateStoreImpl::for_test(),
330 Arc::new(StreamingMetrics::unused()),
331 )
332 }
333
334 pub(super) fn is_empty(&self) -> bool {
335 self.epoch_barrier_state_map.is_empty()
336 }
337}
338
/// A database whose barrier processing has been halted after a failure. It produces no
/// events until a reset is started.
pub(crate) struct SuspendedDatabaseState {
    /// When the database was suspended; logged when the reset starts.
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    /// The failure that triggered suspension: the failed actor (if known) and its error.
    failure: Option<(Option<ActorId>, StreamError)>,
}
344
345impl SuspendedDatabaseState {
346 fn new(
347 state: DatabaseManagedBarrierState,
348 failure: Option<(Option<ActorId>, StreamError)>,
349 _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>, ) -> Self {
351 Self {
352 suspend_time: Instant::now(),
353 inner: state,
354 failure,
355 }
356 }
357
358 async fn reset(mut self) -> ResetDatabaseOutput {
359 let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
360 self.inner.abort_and_wait_actors().await;
361 if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
362 hummock.clear_tables(self.inner.table_ids).await;
363 }
364 ResetDatabaseOutput { root_err }
365 }
366}
367
/// A database whose reset is running in a spawned task.
pub(crate) struct ResettingDatabaseState {
    /// Handle of the task running `SuspendedDatabaseState::reset`.
    join_handle: JoinHandle<ResetDatabaseOutput>,
    /// Id of the latest reset request; reported together with the reset output.
    reset_request_id: u32,
}
372
/// Result of a database reset.
pub(crate) struct ResetDatabaseOutput {
    /// Root-cause actor failure found during the reset, if any.
    pub(crate) root_err: Option<ScoredStreamError>,
}
376
/// Lifecycle of a database's barrier state.
///
/// `Running` moves to `Suspended` on failure. A reset request moves either `Running` or
/// `Suspended` to `Resetting`.
pub(crate) enum DatabaseStatus {
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    /// Transient placeholder while the state is moved between variants with
    /// `std::mem::replace`; every method treats it as unreachable.
    Unspecified,
}
384
impl DatabaseStatus {
    /// Force-stops the database's actors and waits for them to exit.
    ///
    /// For `Resetting`, waits for the reset task instead; that task aborts the actors itself.
    ///
    /// # Panics
    /// If the reset task cannot be joined, or on `Unspecified`.
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                // `state` is only borrowed, so poll the handle through `&mut` instead of moving it.
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// Returns the running state that an incoming request applies to, or `None` while the
    /// database is suspended.
    ///
    /// # Panics
    /// While resetting (no request is expected then), or on `Unspecified`.
    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further request during cleaning")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// Polls for the next event of this database.
    ///
    /// A running database yields barrier and actor events, and a suspended one never yields.
    /// A resetting one yields `DatabaseReset` once its reset task completes.
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    /// Moves a `Running` database to `Suspended`, recording the failed actor (if known) and the
    /// error. `completing_futures` is dropped along the way.
    ///
    /// # Panics
    /// If the database is not `Running`.
    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    /// Starts resetting the database for request `reset_request_id`.
    ///
    /// From `Running` or `Suspended`, the reset is spawned as a new task. When already
    /// `Resetting`, the existing task is kept and only the request id is updated.
    ///
    /// # Panics
    /// If `database_id` does not match the state, if `completing_futures` is non-empty for a
    /// suspended database, or if a duplicate request id is not larger than the previous one.
    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been clear when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, prev_request_id, "receive duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                // Reuse the in-progress reset task; only the request id changes.
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}
497
/// Barrier state of every database, keyed by database id.
pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
    /// Shared context of each database, created in `ManagedBarrierState::new`.
    pub(crate) current_shared_context: HashMap<DatabaseId, Arc<SharedContext>>,
}
502
/// Event produced by polling a database's barrier state.
pub(super) enum ManagedBarrierStateEvent {
    /// All actors of `partial_graph_id` have collected `barrier`.
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    /// An actor reported a failure, or registering its barrier sender failed.
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    /// A database reset finished; carries its output and the reset request id.
    DatabaseReset(ResetDatabaseOutput, u32),
}
514
515impl ManagedBarrierState {
516 pub(super) fn new(
517 actor_manager: Arc<StreamActorManager>,
518 initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
519 term_id: String,
520 ) -> Self {
521 let mut databases = HashMap::new();
522 let mut current_shared_context = HashMap::new();
523 for database in initial_partial_graphs {
524 let database_id = DatabaseId::new(database.database_id);
525 assert!(!databases.contains_key(&database_id));
526 let shared_context = Arc::new(SharedContext::new(
527 database_id,
528 &actor_manager.env,
529 term_id.clone(),
530 ));
531 shared_context.add_actors(
532 database
533 .graphs
534 .iter()
535 .flat_map(|graph| graph.actor_infos.iter())
536 .cloned(),
537 );
538 let state = DatabaseManagedBarrierState::new(
539 database_id,
540 actor_manager.clone(),
541 shared_context.clone(),
542 database.graphs,
543 );
544 databases.insert(database_id, DatabaseStatus::Running(state));
545 current_shared_context.insert(database_id, shared_context);
546 }
547
548 Self {
549 databases,
550 current_shared_context,
551 }
552 }
553
554 pub(super) fn next_event(
555 &mut self,
556 ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
557 poll_fn(|cx| {
558 for (database_id, database) in &mut self.databases {
559 if let Poll::Ready(event) = database.poll_next_event(cx) {
560 return Poll::Ready((*database_id, event));
561 }
562 }
563 Poll::Pending
564 })
565 }
566}
567
/// Barrier state of a single running database.
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    /// Spawned actors of this database that have not finished yet.
    pub(super) actor_states: HashMap<ActorId, InflightActorState>,

    pub(super) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    /// Every table id ever passed in `table_ids_to_sync`; cleared from Hummock on reset.
    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) current_shared_context: Arc<SharedContext>,
    /// Handle cloned into spawned actors; created together with the two receivers below.
    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}
584
585impl DatabaseManagedBarrierState {
586 pub(super) fn new(
588 database_id: DatabaseId,
589 actor_manager: Arc<StreamActorManager>,
590 current_shared_context: Arc<SharedContext>,
591 initial_partial_graphs: Vec<InitialPartialGraph>,
592 ) -> Self {
593 let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
594 LocalBarrierManager::new();
595 Self {
596 database_id,
597 actor_states: Default::default(),
598 graph_states: initial_partial_graphs
599 .into_iter()
600 .map(|graph| {
601 let mut state = PartialGraphManagedBarrierState::new(&actor_manager);
602 state.add_subscriptions(graph.subscriptions);
603 (PartialGraphId::new(graph.partial_graph_id), state)
604 })
605 .collect(),
606 table_ids: Default::default(),
607 actor_manager,
608 current_shared_context,
609 local_barrier_manager,
610 barrier_event_rx,
611 actor_failure_rx,
612 }
613 }
614
615 pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
616 ManagedBarrierStateDebugInfo {
617 running_actors: self.actor_states.keys().cloned().collect(),
618 graph_states: &self.graph_states,
619 }
620 }
621
622 async fn abort_and_wait_actors(&mut self) {
623 for (actor_id, state) in &self.actor_states {
624 tracing::debug!("force stopping actor {}", actor_id);
625 state.join_handle.abort();
626 if let Some(monitor_task_handle) = &state.monitor_task_handle {
627 monitor_task_handle.abort();
628 }
629 }
630
631 for (actor_id, state) in self.actor_states.drain() {
632 tracing::debug!("join actor {}", actor_id);
633 let result = state.join_handle.await;
634 assert!(result.is_ok() || result.unwrap_err().is_cancelled());
635 }
636 }
637}
638
639impl InflightActorState {
640 pub(super) fn register_barrier_sender(
641 &mut self,
642 tx: mpsc::UnboundedSender<Barrier>,
643 ) -> StreamResult<()> {
644 match &self.status {
645 InflightActorStatus::IssuedFirst(pending_barriers) => {
646 for barrier in pending_barriers {
647 tx.send(barrier.clone()).map_err(|_| {
648 StreamError::barrier_send(
649 barrier.clone(),
650 self.actor_id,
651 "failed to send pending barriers to newly registered sender",
652 )
653 })?;
654 }
655 self.barrier_senders.push(tx);
656 }
657 InflightActorStatus::Running(_) => {
658 unreachable!("should not register barrier sender when entering Running status")
659 }
660 }
661 Ok(())
662 }
663}
664
665impl DatabaseManagedBarrierState {
666 pub(super) fn register_barrier_sender(
667 &mut self,
668 actor_id: ActorId,
669 tx: mpsc::UnboundedSender<Barrier>,
670 ) -> StreamResult<()> {
671 self.actor_states
672 .get_mut(&actor_id)
673 .expect("should exist")
674 .register_barrier_sender(tx)
675 }
676}
677
678impl PartialGraphManagedBarrierState {
679 pub(super) fn add_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
680 for subscription_to_add in subscriptions {
681 if !self
682 .mv_depended_subscriptions
683 .entry(TableId::new(subscription_to_add.upstream_mv_table_id))
684 .or_default()
685 .insert(subscription_to_add.subscriber_id)
686 {
687 if cfg!(debug_assertions) {
688 panic!("add an existing subscription: {:?}", subscription_to_add);
689 }
690 warn!(?subscription_to_add, "add an existing subscription");
691 }
692 }
693 }
694
695 pub(super) fn remove_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
696 for subscription_to_remove in subscriptions {
697 let upstream_table_id = TableId::new(subscription_to_remove.upstream_mv_table_id);
698 let Some(subscribers) = self.mv_depended_subscriptions.get_mut(&upstream_table_id)
699 else {
700 if cfg!(debug_assertions) {
701 panic!(
702 "unable to find upstream mv table to remove: {:?}",
703 subscription_to_remove
704 );
705 }
706 warn!(
707 ?subscription_to_remove,
708 "unable to find upstream mv table to remove"
709 );
710 continue;
711 };
712 if !subscribers.remove(&subscription_to_remove.subscriber_id) {
713 if cfg!(debug_assertions) {
714 panic!(
715 "unable to find subscriber to remove: {:?}",
716 subscription_to_remove
717 );
718 }
719 warn!(
720 ?subscription_to_remove,
721 "unable to find subscriber to remove"
722 );
723 }
724 if subscribers.is_empty() {
725 self.mv_depended_subscriptions.remove(&upstream_table_id);
726 }
727 }
728 }
729}
730
impl DatabaseManagedBarrierState {
    /// Issues `barrier`, as described by `request`, to its partial graph and the actors involved.
    ///
    /// Steps, in order:
    /// 1. apply the request's subscription additions and removals to the partial graph;
    /// 2. record the tables to sync and register the barrier in the graph as `Issued`;
    /// 3. spawn every actor in `actors_to_build`, each starting with `barrier` as its first
    ///    barrier;
    /// 4. (tests only) create placeholder states for collected actors that were not spawned;
    /// 5. issue the barrier to every other actor in `actor_ids_to_collect`.
    ///
    /// # Errors
    /// Propagates a barrier-send error from `InflightActorState::issue_barrier`.
    ///
    /// # Panics
    /// The partial graph is unknown, or a fragment to build has no node. An actor to build is
    /// also being stopped, appears twice, is missing from `actor_ids_to_collect`, or already has
    /// state. An actor to collect has no state.
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        graph_state.add_subscriptions(request.subscriptions_to_add);
        graph_state.remove_subscriptions(request.subscriptions_to_remove);

        let table_ids =
            HashSet::from_iter(request.table_ids_to_sync.iter().cloned().map(TableId::new));
        // Remember every table touched by this database so that a reset can clear them.
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().cloned(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        // Cloned only if at least one actor is built; all new actors share the same snapshot.
        let subscriptions =
            LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone()));
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                (*subscriptions).clone(),
                self.current_shared_context.clone(),
                self.local_barrier_manager.clone(),
            );
            // `barrier` becomes the new actor's initial (first) barrier.
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        if cfg!(test) {
            // Tests may list actors to collect that were never built via `actors_to_build`;
            // give each a placeholder task that never completes.
            for actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(actor_id) {
                    let join_handle = self.actor_manager.runtime.spawn(async { pending().await });
                    assert!(
                        self.actor_states
                            .try_insert(
                                *actor_id,
                                InflightActorState::start(
                                    *actor_id,
                                    partial_graph_id,
                                    barrier,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(*actor_id);
                }
            }
        }

        for actor_id in &request.actor_ids_to_collect {
            // Newly created actors already hold `barrier` as their initial barrier.
            if new_actors.contains(actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?;
        }

        Ok(())
    }

    /// Polls for the next barrier-manager event of this database.
    ///
    /// Sources are checked in priority order:
    /// 1. actor failures;
    /// 2. partial graphs that already have a fully collected barrier;
    /// 3. queued local barrier events: collect reports, create-mview progress and barrier-sender
    ///    registrations.
    ///
    /// When a collected barrier stops actors, those actors are dropped from the shared context
    /// before the event is returned.
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        // A graph can have a fully collected barrier without a new collect report. For example,
        // a barrier may be issued with no actors to collect. Also, `may_have_collected_all`
        // reports only one barrier per call, so later ones are picked up here on later polls.
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                if let Some(actors_to_stop) = barrier.all_stop_actors() {
                    self.current_shared_context.drop_actors(actors_to_stop);
                }
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        if let Some(actors_to_stop) = barrier.all_stop_actors() {
                            self.current_shared_context.drop_actors(actors_to_stop);
                        }
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, actor, state);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
            }
        }

        // Nothing may be left collectible when returning `Pending`, or it would never be reported.
        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}
908
909impl DatabaseManagedBarrierState {
910 #[must_use]
911 pub(super) fn collect(
912 &mut self,
913 actor_id: ActorId,
914 epoch: EpochPair,
915 ) -> Option<(PartialGraphId, Barrier)> {
916 let (prev_partial_graph_id, is_finished) = self
917 .actor_states
918 .get_mut(&actor_id)
919 .expect("should exist")
920 .collect(epoch);
921 if is_finished {
922 let state = self.actor_states.remove(&actor_id).expect("should exist");
923 if let Some(monitor_task_handle) = state.monitor_task_handle {
924 monitor_task_handle.abort();
925 }
926 }
927 let prev_graph_state = self
928 .graph_states
929 .get_mut(&prev_partial_graph_id)
930 .expect("should exist");
931 prev_graph_state.collect(actor_id, epoch);
932 prev_graph_state
933 .may_have_collected_all()
934 .map(|barrier| (prev_partial_graph_id, barrier))
935 }
936
937 pub(super) fn pop_barrier_to_complete(
938 &mut self,
939 partial_graph_id: PartialGraphId,
940 prev_epoch: u64,
941 ) -> (
942 Barrier,
943 Option<HashSet<TableId>>,
944 Vec<PbCreateMviewProgress>,
945 ) {
946 self.graph_states
947 .get_mut(&partial_graph_id)
948 .expect("should exist")
949 .pop_barrier_to_complete(prev_epoch)
950 }
951}
952
953impl PartialGraphManagedBarrierState {
954 fn may_have_collected_all(&mut self) -> Option<Barrier> {
958 for barrier_state in self.epoch_barrier_state_map.values_mut() {
959 match &barrier_state.inner {
960 ManagedBarrierStateInner::Issued(IssuedState {
961 remaining_actors, ..
962 }) if remaining_actors.is_empty() => {}
963 ManagedBarrierStateInner::AllCollected(_) => {
964 continue;
965 }
966 ManagedBarrierStateInner::Issued(_) => {
967 break;
968 }
969 }
970
971 self.streaming_metrics.barrier_manager_progress.inc();
972
973 let create_mview_progress = self
974 .create_mview_progress
975 .remove(&barrier_state.barrier.epoch.curr)
976 .unwrap_or_default()
977 .into_iter()
978 .map(|(actor, state)| state.to_pb(actor))
979 .collect();
980
981 let prev_state = replace(
982 &mut barrier_state.inner,
983 ManagedBarrierStateInner::AllCollected(create_mview_progress),
984 );
985
986 must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
987 barrier_inflight_latency: timer,
988 ..
989 }) => {
990 timer.observe_duration();
991 });
992
993 return Some(barrier_state.barrier.clone());
994 }
995 None
996 }
997
998 fn pop_barrier_to_complete(
999 &mut self,
1000 prev_epoch: u64,
1001 ) -> (
1002 Barrier,
1003 Option<HashSet<TableId>>,
1004 Vec<PbCreateMviewProgress>,
1005 ) {
1006 let (popped_prev_epoch, barrier_state) = self
1007 .epoch_barrier_state_map
1008 .pop_first()
1009 .expect("should exist");
1010
1011 assert_eq!(prev_epoch, popped_prev_epoch);
1012
1013 let create_mview_progress = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected(create_mview_progress) => {
1014 create_mview_progress
1015 });
1016 (
1017 barrier_state.barrier,
1018 barrier_state.table_ids,
1019 create_mview_progress,
1020 )
1021 }
1022}
1023
1024impl PartialGraphManagedBarrierState {
1025 pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
1027 tracing::debug!(
1028 target: "events::stream::barrier::manager::collect",
1029 ?epoch, actor_id, state = ?self.epoch_barrier_state_map,
1030 "collect_barrier",
1031 );
1032
1033 match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
1034 None => {
1035 panic!(
1039 "cannot collect new actor barrier {:?} at current state: None",
1040 epoch,
1041 )
1042 }
1043 Some(&mut BarrierState {
1044 ref barrier,
1045 inner:
1046 ManagedBarrierStateInner::Issued(IssuedState {
1047 ref mut remaining_actors,
1048 ..
1049 }),
1050 ..
1051 }) => {
1052 let exist = remaining_actors.remove(&actor_id);
1053 assert!(
1054 exist,
1055 "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
1056 actor_id, epoch.curr
1057 );
1058 assert_eq!(barrier.epoch.curr, epoch.curr);
1059 }
1060 Some(BarrierState { inner, .. }) => {
1061 panic!(
1062 "cannot collect new actor barrier {:?} at current state: {:?}",
1063 epoch, inner
1064 )
1065 }
1066 }
1067 }
1068
1069 pub(super) fn transform_to_issued(
1072 &mut self,
1073 barrier: &Barrier,
1074 actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
1075 table_ids: HashSet<TableId>,
1076 ) {
1077 let timer = self
1078 .streaming_metrics
1079 .barrier_inflight_latency
1080 .start_timer();
1081
1082 if let Some(hummock) = self.state_store.as_hummock() {
1083 hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
1084 }
1085
1086 let table_ids = match barrier.kind {
1087 BarrierKind::Unspecified => {
1088 unreachable!()
1089 }
1090 BarrierKind::Initial => {
1091 assert!(
1092 self.prev_barrier_table_ids.is_none(),
1093 "non empty table_ids at initial barrier: {:?}",
1094 self.prev_barrier_table_ids
1095 );
1096 info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
1097 self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
1098 None
1099 }
1100 BarrierKind::Barrier => {
1101 if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
1102 assert_eq!(prev_epoch.curr, barrier.epoch.prev);
1103 assert_eq!(prev_table_ids, &table_ids);
1104 *prev_epoch = barrier.epoch;
1105 } else {
1106 info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
1107 self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
1108 }
1109 None
1110 }
1111 BarrierKind::Checkpoint => Some(
1112 if let Some((prev_epoch, prev_table_ids)) = self
1113 .prev_barrier_table_ids
1114 .replace((barrier.epoch, table_ids))
1115 && prev_epoch.curr == barrier.epoch.prev
1116 {
1117 prev_table_ids
1118 } else {
1119 debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
1120 HashSet::new()
1121 },
1122 ),
1123 };
1124
1125 if let Some(&mut BarrierState { ref inner, .. }) =
1126 self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
1127 {
1128 {
1129 panic!(
1130 "barrier epochs{:?} state has already been `Issued`. Current state: {:?}",
1131 barrier.epoch, inner
1132 );
1133 }
1134 };
1135
1136 self.epoch_barrier_state_map.insert(
1137 barrier.epoch.prev,
1138 BarrierState {
1139 barrier: barrier.clone(),
1140 inner: ManagedBarrierStateInner::Issued(IssuedState {
1141 remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
1142 barrier_inflight_latency: timer,
1143 }),
1144 table_ids,
1145 },
1146 );
1147 }
1148
1149 #[cfg(test)]
1150 async fn pop_next_completed_epoch(&mut self) -> u64 {
1151 if let Some(barrier) = self.may_have_collected_all() {
1152 self.pop_barrier_to_complete(barrier.epoch.prev);
1153 return barrier.epoch.prev;
1154 }
1155 pending().await
1156 }
1157}
1158
/// Tests for `PartialGraphManagedBarrierState` ordering: barriers complete strictly in epoch
/// order, regardless of the order in which actors report collection.
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_manager::managed_state::PartialGraphManagedBarrierState;

    /// The third barrier adds actor 3. Each barrier completes once all of its own actors
    /// collect it, and barriers are popped in epoch order.
    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2]);
        let actor_ids_to_collect2 = HashSet::from([1, 2]);
        let actor_ids_to_collect3 = HashSet::from([1, 2, 3]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        // Barriers are keyed by `epoch.prev`, so barrier1 pops as `test_epoch(0)`.
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    /// Later barriers drop actors 4 and then 3. Even when a later barrier is fully collected
    /// first, nothing completes until the oldest barrier does; after that, all of them pop in
    /// order.
    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]);
        let actor_ids_to_collect2 = HashSet::from([1, 2, 3]);
        let actor_ids_to_collect3 = HashSet::from([1, 2]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        // barrier3 is fully collected, but barrier1 (key 0) is still waiting for actors 3 and 4.
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}