use std::cell::LazyCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::FutureExt;
use futures::stream::FuturesOrdered;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

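/// State of a barrier that has been issued to the actors of a partial graph
/// but not yet collected from all of them.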
struct IssuedState {
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

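/// Lifecycle of a barrier within a partial graph: `Issued` while some actors
/// have yet to collect it, then `AllCollected`, carrying the create-mview
/// progress reported for its epoch, until it is popped for completion.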
#[derive(Debug)]
enum ManagedBarrierStateInner {
    Issued(IssuedState),

    AllCollected(Vec<PbCreateMviewProgress>),
}

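/// Bookkeeping for a single barrier: the barrier itself, the table ids to sync
/// when it is a checkpoint barrier, and its collection state.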
#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::InjectBarrierRequest;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitialPartialGraph,
};

use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::task::barrier_worker::ScoredStreamError;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;

pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        for (partial_graph_id, graph_states) in self.graph_states {
            writeln!(f, "--- Partial Group {}", partial_graph_id.0)?;
            write!(f, "{}", graph_states)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected(_) => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, state) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

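/// Progress of an actor in the current term. `IssuedFirst` buffers every
/// barrier issued before the actor collects its first one, so that they can be
/// replayed to a late-registering barrier sender; `Running(prev_epoch)` holds
/// the `prev` epoch of the latest issued barrier after the first collection.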
enum InflightActorStatus {
    IssuedFirst(Vec<Barrier>),
    Running(u64),
}

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}

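/// Barrier-related state of a single running actor: the senders registered by
/// its executors, the issued-but-uncollected barriers keyed by `prev` epoch,
/// and the task handles needed to abort the actor on reset.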
pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    pub(crate) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_partial_graph_id: PartialGraphId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: BTreeMap::from_iter([(
                initial_barrier.epoch.prev,
                initial_partial_graph_id,
            )]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

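    /// Issue a new barrier to this actor: forward it to every registered
    /// barrier sender, record it as inflight under `barrier.epoch.prev`, and
    /// mark the actor as stopping when `is_stop` is set.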
    pub(super) fn issue_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        barrier: &Barrier,
        is_stop: bool,
    ) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        assert!(
            self.inflight_barriers
                .insert(barrier.epoch.prev, partial_graph_id)
                .is_none()
        );

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "stopped actor should not issue barrier");
            self.is_stopping = true;
        }
        Ok(())
    }

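    /// Collect the oldest inflight barrier of this actor. Returns the partial
    /// graph the barrier belongs to, and whether the actor has finished (i.e.
    /// it is stopping and has no remaining inflight barriers).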
    pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
        let (prev_epoch, prev_partial_graph_id) =
            self.inflight_barriers.pop_first().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }
        (
            prev_partial_graph_id,
            self.inflight_barriers.is_empty() && self.is_stopping,
        )
    }
}

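/// Barrier state of a single partial graph. `epoch_barrier_state_map` is keyed
/// by each barrier's `prev` epoch, while `create_mview_progress` is keyed by
/// the `curr` epoch that the backfill progress was reported for.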
pub(crate) struct PartialGraphManagedBarrierState {
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    mv_depended_subscriptions: HashMap<TableId, HashSet<u32>>,

    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            mv_depended_subscriptions: Default::default(),
            create_mview_progress: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

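/// A database whose barrier processing has been suspended after a failure,
/// kept around until a reset request arrives.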
pub(crate) struct SuspendedDatabaseState {
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedDatabaseState {
    fn new(
        state: DatabaseManagedBarrierState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetDatabaseOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
            hummock.clear_tables(self.inner.table_ids).await;
        }
        ResetDatabaseOutput { root_err }
    }
}

pub(crate) struct ResettingDatabaseState {
    join_handle: JoinHandle<ResetDatabaseOutput>,
    reset_request_id: u32,
}

pub(crate) struct ResetDatabaseOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

pub(crate) enum DatabaseStatus {
    ReceivedExchangeRequest(
        Vec<(
            UpDownActorIds,
            oneshot::Sender<StreamResult<permit::Receiver>>,
        )>,
    ),
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    Unspecified,
}

impl DatabaseStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests.drain(..) {
                    let _ = sender.send(Err(anyhow!("database aborted").into()));
                }
            }
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further request during cleaning")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, DatabaseStatus::Unspecified), DatabaseStatus::Running(state) => state);
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests {
                    let _ = sender.send(Err(anyhow!("database reset").into()));
                }
                tokio::spawn(async move { ResetDatabaseOutput { root_err: None } })
            }
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, prev_request_id, "receive duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}

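/// Top-level barrier state of the worker: one `DatabaseStatus` per database.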
pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    DatabaseReset(ResetDatabaseOutput, u32),
}

impl ManagedBarrierState {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let mut databases = HashMap::new();
        for database in initial_partial_graphs {
            let database_id = DatabaseId::new(database.database_id);
            assert!(!databases.contains_key(&database_id));
            let state = DatabaseManagedBarrierState::new(
                database_id,
                term_id.clone(),
                actor_manager.clone(),
                database.graphs,
            );
            databases.insert(database_id, DatabaseStatus::Running(state));
        }

        Self { databases }
    }

    pub(super) fn next_event(
        &mut self,
    ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
        poll_fn(|cx| {
            for (database_id, database) in &mut self.databases {
                if let Poll::Ready(event) = database.poll_next_event(cx) {
                    return Poll::Ready((*database_id, event));
                }
            }
            Poll::Pending
        })
    }
}

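/// Barrier state of a single running database: per-actor inflight state,
/// per-partial-graph barrier state, and the receiving ends of the channels
/// shared with the actors through `LocalBarrierManager`.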
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl DatabaseManagedBarrierState {
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<InitialPartialGraph>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(database_id, term_id, actor_manager.env.clone());
        Self {
            database_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_states: initial_partial_graphs
                .into_iter()
                .map(|graph| {
                    let mut state = PartialGraphManagedBarrierState::new(&actor_manager);
                    state.add_subscriptions(graph.subscriptions);
                    (PartialGraphId::new(graph.partial_graph_id), state)
                })
                .collect(),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_states: &self.graph_states,
        }
    }

    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl PartialGraphManagedBarrierState {
    pub(super) fn add_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_add in subscriptions {
            if !self
                .mv_depended_subscriptions
                .entry(TableId::new(subscription_to_add.upstream_mv_table_id))
                .or_default()
                .insert(subscription_to_add.subscriber_id)
            {
                if cfg!(debug_assertions) {
                    panic!("add an existing subscription: {:?}", subscription_to_add);
                }
                warn!(?subscription_to_add, "add an existing subscription");
            }
        }
    }

    pub(super) fn remove_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_remove in subscriptions {
            let upstream_table_id = TableId::new(subscription_to_remove.upstream_mv_table_id);
            let Some(subscribers) = self.mv_depended_subscriptions.get_mut(&upstream_table_id)
            else {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find upstream mv table to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find upstream mv table to remove"
                );
                continue;
            };
            if !subscribers.remove(&subscription_to_remove.subscriber_id) {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find subscriber to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find subscriber to remove"
                );
            }
            if subscribers.is_empty() {
                self.mv_depended_subscriptions.remove(&upstream_table_id);
            }
        }
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        graph_state.add_subscriptions(request.subscriptions_to_add);
        graph_state.remove_subscriptions(request.subscriptions_to_remove);

        let table_ids =
            HashSet::from_iter(request.table_ids_to_sync.iter().cloned().map(TableId::new));
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().cloned(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        let subscriptions =
            LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone()));
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                (*subscriptions).clone(),
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

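        // In tests, some actors in `actor_ids_to_collect` may never be built.
        // Spawn a placeholder task for each missing actor so that the
        // bookkeeping below can still track them.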
        if cfg!(test) {
            for actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                *actor_id,
                                InflightActorState::start(
                                    *actor_id,
                                    partial_graph_id,
                                    barrier,
                                    tx,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(*actor_id);
                }
            }
        }

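        // Issue the barrier to the pre-existing actors that should collect it;
        // actors built above were already initialized with this barrier.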
        for actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?;
        }

        Ok(())
    }

    pub(super) fn new_actor_remote_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        result_sender: oneshot::Sender<StreamResult<permit::Receiver>>,
    ) {
        let (tx, rx) = channel_from_config(self.local_barrier_manager.env.config());
        self.new_actor_output_request(actor_id, upstream_actor_id, NewOutputRequest::Remote(tx));
        let _ = result_sender.send(Ok(rx));
    }

    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: NewOutputRequest,
    ) {
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

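    /// Poll the next event of this database, in priority order: reported actor
    /// failures first, then partial graphs whose earliest barrier has become
    /// fully collected, and finally the events from `barrier_event_rx`.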
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, actor, state);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    tx,
                } => {
                    self.new_actor_output_request(
                        actor_id,
                        upstream_actor_id,
                        NewOutputRequest::Local(tx),
                    );
                }
            }
        }

        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}

impl DatabaseManagedBarrierState {
    #[must_use]
    pub(super) fn collect(
        &mut self,
        actor_id: ActorId,
        epoch: EpochPair,
    ) -> Option<(PartialGraphId, Barrier)> {
        let (prev_partial_graph_id, is_finished) = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        let prev_graph_state = self
            .graph_states
            .get_mut(&prev_partial_graph_id)
            .expect("should exist");
        prev_graph_state.collect(actor_id, epoch);
        prev_graph_state
            .may_have_collected_all()
            .map(|barrier| (prev_partial_graph_id, barrier))
    }

    pub(super) fn pop_barrier_to_complete(
        &mut self,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
    ) {
        self.graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .pop_barrier_to_complete(prev_epoch)
    }

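    /// Wait up to 3 seconds to collect failures from the remaining actors,
    /// then return the error with the highest score as the most likely root
    /// cause, starting from the failure that triggered the suspension, if any.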
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs)
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }
}

impl PartialGraphManagedBarrierState {
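    /// Check the earliest pending barrier of this partial graph: if all of its
    /// actors have collected it, transition it to `AllCollected`, observe the
    /// inflight-latency timer, and return the barrier; otherwise return `None`.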
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected(_) => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor))
                .collect();

            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected(create_mview_progress),
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    fn pop_barrier_to_complete(
        &mut self,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
    ) {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let create_mview_progress = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected(create_mview_progress) => {
            create_mview_progress
        });
        (
            barrier_state.barrier,
            barrier_state.table_ids,
            create_mview_progress,
        )
    }
}

impl PartialGraphManagedBarrierState {
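    /// Record that `actor_id` has collected the barrier identified by `epoch`.
    /// Panics if the barrier is unknown or is no longer in the `Issued` state.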
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

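    /// Register a newly injected barrier as `Issued` for this partial graph:
    /// start the inflight-latency timer, notify the hummock state store of the
    /// new epoch, and compute the table ids to sync (`Some` only for checkpoint
    /// barriers, taken from the table ids tracked since the last checkpoint).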
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(BarrierState { inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            panic!(
                "barrier epochs {:?} state has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        };

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2]);
        let actor_ids_to_collect2 = HashSet::from([1, 2]);
        let actor_ids_to_collect3 = HashSet::from([1, 2, 3]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]);
        let actor_ids_to_collect2 = HashSet::from([1, 2, 3]);
        let actor_ids_to_collect3 = HashSet::from([1, 2]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}