use std::cell::LazyCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::FutureExt;
use futures::stream::FuturesOrdered;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

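/// State of a barrier that has been issued to the actors of a partial graph
/// but not yet collected from all of them.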
struct IssuedState {
    /// Actors that have not yet collected (reported) this barrier.
    pub remaining_actors: BTreeSet<ActorId>,

    /// Timer started when the barrier is issued; observing it on full
    /// collection records the barrier's inflight latency.
    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

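/// Per-barrier state machine: a barrier is first `Issued` to a set of actors
/// and transitions to `AllCollected` once every actor has reported it.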
#[derive(Debug)]
enum ManagedBarrierStateInner {
    /// The barrier has been issued to the actors and is being collected.
    Issued(IssuedState),

    /// The barrier has been collected from all actors; the gathered progress
    /// reports are buffered here until the barrier is popped for completion.
    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        load_finished_source_ids: Vec<u32>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
    },
}

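/// A barrier tracked by a partial graph, together with the table ids to be
/// synced at this barrier and its collection state.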
#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    /// Only `Some(_)` when `barrier.kind` is `Checkpoint`.
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_pb::stream_service::InjectBarrierRequest;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitialPartialGraph,
};

use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::task::barrier_worker::ScoredStreamError;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::cdc_progress::CdcTableBackfillState;

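/// A borrowed snapshot of the managed barrier state, used to render
/// human-readable debug output (see the `Display` impls below).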
pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_states: &'a HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        for (partial_graph_id, graph_states) in self.graph_states {
            writeln!(f, "--- Partial Group {}", partial_graph_id.0)?;
            write!(f, "{}", graph_states)?;
        }
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            is_prev_epoch_issued = true;
                            // Only list the actors that are not also remaining
                            // in the previous epoch; summarize the rest as a count.
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, state) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

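/// Where an actor stands in the barrier protocol.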
enum InflightActorStatus {
    /// The actor has been issued some barriers but has not collected the first one.
    IssuedFirst(Vec<Barrier>),
    /// The actor has collected the first barrier; the payload is the `prev`
    /// epoch of the most recently issued barrier.
    Running(u64),
}

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}

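/// Per-actor bookkeeping: the barriers issued to the actor that are still
/// inflight (keyed by `epoch.prev` and mapped to the partial graph they were
/// issued for), plus the handles needed to drive and tear down the actor task.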
pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    /// `prev_epoch` of issued but uncollected barriers -> the partial graph
    /// they were issued for.
    pub(crate) inflight_barriers: BTreeMap<u64, PartialGraphId>,
    status: InflightActorStatus,
    /// Whether a stop barrier (one that removes this actor) has been issued.
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_partial_graph_id: PartialGraphId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: BTreeMap::from_iter([(
                initial_barrier.epoch.prev,
                initial_partial_graph_id,
            )]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

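    /// Issue a new barrier to this actor: forward it to every registered
    /// barrier sender and record it as inflight. Barriers must be issued in
    /// strictly increasing epoch order; `is_stop` marks the barrier that stops
    /// the actor, after which no further barrier may be issued.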
    pub(super) fn issue_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        barrier: &Barrier,
        is_stop: bool,
    ) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        assert!(
            self.inflight_barriers
                .insert(barrier.epoch.prev, partial_graph_id)
                .is_none()
        );

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(!self.is_stopping, "stopped actor should not issue barrier");
            self.is_stopping = true;
        }
        Ok(())
    }

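    /// Mark the oldest inflight barrier as collected from this actor. Returns
    /// the partial graph the barrier was issued for, and whether the actor has
    /// now fully finished (no inflight barriers left and a stop barrier was
    /// issued).
    ///
    /// A usage sketch (illustrative only, not a compiled doctest):
    /// ```ignore
    /// let (graph_id, finished) = actor_state.collect(epoch);
    /// if finished {
    ///     // The actor can be removed from `actor_states`.
    /// }
    /// ```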
    pub(super) fn collect(&mut self, epoch: EpochPair) -> (PartialGraphId, bool) {
        let (prev_epoch, prev_partial_graph_id) =
            self.inflight_barriers.pop_first().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }
        (
            prev_partial_graph_id,
            self.inflight_barriers.is_empty() && self.is_stopping,
        )
    }
}

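/// Barrier state scoped to a single partial graph: the per-epoch barrier map
/// plus the progress reports accumulated between issue and collection.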
pub(crate) struct PartialGraphManagedBarrierState {
    /// Barrier state of each inflight epoch, keyed by `epoch.prev`. Ordered so
    /// that barriers are collected and completed in epoch order.
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    /// The epoch and table ids carried by the previous barrier, used to check
    /// the consistency of `table_ids_to_sync` across consecutive barriers.
    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    /// Subscriber ids of each subscribed upstream mv table.
    mv_depended_subscriptions: HashMap<TableId, HashSet<u32>>,

    /// Backfill progress of creating mviews, keyed by `epoch.curr` and actor id.
    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, BackfillState>>,

    /// Source ids that have finished loading, keyed by `epoch.curr`.
    pub(crate) load_finished_source_ids: HashMap<u64, HashSet<u32>>,

    /// CDC table backfill progress, keyed by `epoch.curr` and actor id.
    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            mv_depended_subscriptions: Default::default(),
            create_mview_progress: Default::default(),
            load_finished_source_ids: Default::default(),
            cdc_table_backfill_progress: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

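/// A database whose barrier processing has been suspended after a failure,
/// keeping the inner state alive until the meta service requests a reset.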
pub(crate) struct SuspendedDatabaseState {
    pub(super) suspend_time: Instant,
    inner: DatabaseManagedBarrierState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedDatabaseState {
    fn new(
        state: DatabaseManagedBarrierState,
        failure: Option<(Option<ActorId>, StreamError)>,
        // The completing futures are simply dropped on suspension.
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetDatabaseOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
            hummock.clear_tables(self.inner.table_ids).await;
        }
        ResetDatabaseOutput { root_err }
    }
}

pub(crate) struct ResettingDatabaseState {
    join_handle: JoinHandle<ResetDatabaseOutput>,
    reset_request_id: u32,
}

pub(crate) struct ResetDatabaseOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

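/// Status of a database on this worker. The expected transitions, driven by
/// `suspend` and `start_reset` below, are roughly:
///
/// ```text
/// ReceivedExchangeRequest ─▶ Running ──(failure)──▶ Suspended
///            │                  │                       │
///            └────────(reset)───┴───────────────────────┴──▶ Resetting
/// ```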
pub(crate) enum DatabaseStatus {
    /// Exchange requests that arrived before the database started running on
    /// this worker, buffered to be replayed or failed later.
    ReceivedExchangeRequest(
        Vec<(
            UpDownActorIds,
            oneshot::Sender<StreamResult<permit::Receiver>>,
        )>,
    ),
    Running(DatabaseManagedBarrierState),
    Suspended(SuspendedDatabaseState),
    Resetting(ResettingDatabaseState),
    /// Transient placeholder used while swapping states in place; must never
    /// be observed.
    Unspecified,
}

impl DatabaseStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests.drain(..) {
                    let _ = sender.send(Err(anyhow!("database aborted").into()));
                }
            }
            DatabaseStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Suspended(SuspendedDatabaseState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            DatabaseStatus::Resetting(state) => {
                (&mut state.join_handle)
                    .await
                    .expect("failed to join reset database join handle");
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut DatabaseManagedBarrierState> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            DatabaseStatus::Running(state) => Some(state),
            DatabaseStatus::Suspended(_) => None,
            DatabaseStatus::Resetting(_) => {
                unreachable!("should not receive further request during cleaning")
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            DatabaseStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            DatabaseStatus::Running(state) => state.poll_next_event(cx),
            DatabaseStatus::Suspended(_) => Poll::Pending,
            DatabaseStatus::Resetting(state) => state.join_handle.poll_unpin(cx).map(|result| {
                let output = result.expect("should be able to join");
                ManagedBarrierStateEvent::DatabaseReset(output, state.reset_request_id)
            }),
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        }
    }

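    /// Transition from `Running` to `Suspended`, recording the failure that
    /// triggered the suspension. Panics if the database is not `Running`.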
    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(
            replace(self, DatabaseStatus::Unspecified),
            DatabaseStatus::Running(state) => state
        );
        *self = DatabaseStatus::Suspended(SuspendedDatabaseState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

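    /// Transition into `Resetting`, spawning a task that tears down the
    /// current state and clears its tables. Valid from any status except
    /// `Unspecified`; a duplicate reset request must carry a strictly larger
    /// `reset_request_id`.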
    pub(super) fn start_reset(
        &mut self,
        database_id: DatabaseId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        reset_request_id: u32,
    ) {
        let join_handle = match replace(self, DatabaseStatus::Unspecified) {
            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, sender) in pending_requests {
                    let _ = sender.send(Err(anyhow!("database reset").into()));
                }
                tokio::spawn(async move { ResetDatabaseOutput { root_err: None } })
            }
            DatabaseStatus::Running(state) => {
                assert_eq!(database_id, state.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, "start database reset from Running"
                );
                tokio::spawn(SuspendedDatabaseState::new(state, None, completing_futures).reset())
            }
            DatabaseStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(database_id, state.inner.database_id);
                info!(
                    database_id = database_id.database_id,
                    reset_request_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start database reset after suspended"
                );
                tokio::spawn(state.reset())
            }
            DatabaseStatus::Resetting(state) => {
                let prev_request_id = state.reset_request_id;
                info!(
                    database_id = database_id.database_id,
                    reset_request_id, prev_request_id, "receive duplicate reset request"
                );
                assert!(reset_request_id > prev_request_id);
                state.join_handle
            }
            DatabaseStatus::Unspecified => {
                unreachable!()
            }
        };
        *self = DatabaseStatus::Resetting(ResettingDatabaseState {
            join_handle,
            reset_request_id,
        });
    }
}

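/// Top-level barrier state of the worker: the status of every database it
/// hosts, polled together for the next barrier-related event.
///
/// A polling sketch (illustrative only, not a compiled doctest):
/// ```ignore
/// let (database_id, event) = managed_state.next_event().await;
/// match event {
///     ManagedBarrierStateEvent::BarrierCollected { partial_graph_id, barrier } => { /* ... */ }
///     ManagedBarrierStateEvent::ActorError { actor_id, err } => { /* ... */ }
///     ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => { /* ... */ }
/// }
/// ```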
pub(crate) struct ManagedBarrierState {
    pub(crate) databases: HashMap<DatabaseId, DatabaseStatus>,
}

pub(super) enum ManagedBarrierStateEvent {
    /// A barrier has been collected from all actors of a partial graph.
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    /// An actor has failed with an error.
    ActorError {
        actor_id: ActorId,
        err: StreamError,
    },
    /// A database reset has finished, carrying the reset request id.
    DatabaseReset(ResetDatabaseOutput, u32),
}

impl ManagedBarrierState {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let mut databases = HashMap::new();
        for database in initial_partial_graphs {
            let database_id = DatabaseId::new(database.database_id);
            assert!(!databases.contains_key(&database_id));
            let state = DatabaseManagedBarrierState::new(
                database_id,
                term_id.clone(),
                actor_manager.clone(),
                database.graphs,
            );
            databases.insert(database_id, DatabaseStatus::Running(state));
        }

        Self { databases }
    }

    pub(super) fn next_event(
        &mut self,
    ) -> impl Future<Output = (DatabaseId, ManagedBarrierStateEvent)> + '_ {
        poll_fn(|cx| {
            for (database_id, database) in &mut self.databases {
                if let Poll::Ready(event) = database.poll_next_event(cx) {
                    return Poll::Ready((*database_id, event));
                }
            }
            Poll::Pending
        })
    }
}

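/// Barrier state of a single running database: the inflight actors, the
/// barrier state of each partial graph, and the channels through which actors
/// report events and failures back to the barrier worker.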
pub(crate) struct DatabaseManagedBarrierState {
    database_id: DatabaseId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    /// Output requests that arrived before the upstream actor was spawned,
    /// buffered until the actor registers itself.
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,

    /// All table ids that have been synced by barriers of this database, used
    /// to clear state store data on reset.
    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl DatabaseManagedBarrierState {
    /// Create a barrier manager instance for the given database.
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<InitialPartialGraph>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(database_id, term_id, actor_manager.env.clone());
        Self {
            database_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_states: initial_partial_graphs
                .into_iter()
                .map(|graph| {
                    let mut state = PartialGraphManagedBarrierState::new(&actor_manager);
                    state.add_subscriptions(graph.subscriptions);
                    (PartialGraphId::new(graph.partial_graph_id), state)
                })
                .collect(),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_states: &self.graph_states,
        }
    }

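    /// Abort all actor tasks (and their monitor tasks), then await each join
    /// handle so that no actor of this database outlives the call.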
    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender when entering Running status")
            }
        }
        Ok(())
    }
}

impl DatabaseManagedBarrierState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl PartialGraphManagedBarrierState {
    pub(super) fn add_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_add in subscriptions {
            if !self
                .mv_depended_subscriptions
                .entry(TableId::new(subscription_to_add.upstream_mv_table_id))
                .or_default()
                .insert(subscription_to_add.subscriber_id)
            {
                if cfg!(debug_assertions) {
                    panic!("add an existing subscription: {:?}", subscription_to_add);
                }
                warn!(?subscription_to_add, "add an existing subscription");
            }
        }
    }

    pub(super) fn remove_subscriptions(&mut self, subscriptions: Vec<SubscriptionUpstreamInfo>) {
        for subscription_to_remove in subscriptions {
            let upstream_table_id = TableId::new(subscription_to_remove.upstream_mv_table_id);
            let Some(subscribers) = self.mv_depended_subscriptions.get_mut(&upstream_table_id)
            else {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find upstream mv table to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find upstream mv table to remove"
                );
                continue;
            };
            if !subscribers.remove(&subscription_to_remove.subscriber_id) {
                if cfg!(debug_assertions) {
                    panic!(
                        "unable to find subscriber to remove: {:?}",
                        subscription_to_remove
                    );
                }
                warn!(
                    ?subscription_to_remove,
                    "unable to find subscriber to remove"
                );
            }
            if subscribers.is_empty() {
                self.mv_depended_subscriptions.remove(&upstream_table_id);
            }
        }
    }
}

impl DatabaseManagedBarrierState {
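    /// Handle an `InjectBarrierRequest`: update subscriptions, mark the
    /// barrier as issued in the target partial graph, spawn any newly built
    /// actors, and forward the barrier to every pre-existing actor that must
    /// collect it. The barrier is issued to the graph state before the actors,
    /// so `start_epoch` is called on the state store before any actor can
    /// collect the barrier.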
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        let partial_graph_id = PartialGraphId::new(request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };
        let graph_state = self
            .graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist");

        graph_state.add_subscriptions(request.subscriptions_to_add);
        graph_state.remove_subscriptions(request.subscriptions_to_remove);

        let table_ids =
            HashSet::from_iter(request.table_ids_to_sync.iter().cloned().map(TableId::new));
        self.table_ids.extend(table_ids.iter().cloned());

        graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().cloned(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        let subscriptions =
            LazyCell::new(|| Arc::new(graph_state.mv_depended_subscriptions.clone()));
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                (*subscriptions).clone(),
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            partial_graph_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        // In tests, spawn a no-op task for every actor that is expected to
        // collect the barrier but has not been built, so that the barrier
        // protocol can be exercised without real actors.
        if cfg!(test) {
            for actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        // Move `rx` into the task; requests sent to this mock
                        // actor are simply discarded.
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                *actor_id,
                                InflightActorState::start(
                                    *actor_id,
                                    partial_graph_id,
                                    barrier,
                                    tx,
                                    join_handle,
                                    None,
                                )
                            )
                            .is_ok()
                    );
                    new_actors.insert(*actor_id);
                }
            }
        }

        // Forward the barrier to the remaining (pre-existing) actors.
        for actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?;
        }

        Ok(())
    }

    pub(super) fn new_actor_remote_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        result_sender: oneshot::Sender<StreamResult<permit::Receiver>>,
    ) {
        let (tx, rx) = channel_from_config(self.local_barrier_manager.env.config());
        self.new_actor_output_request(actor_id, upstream_actor_id, NewOutputRequest::Remote(tx));
        let _ = result_sender.send(Ok(rx));
    }

    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: NewOutputRequest,
    ) {
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            // The upstream actor has not been spawned yet; buffer the request
            // until it is.
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

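    /// Poll for the next barrier event of this database, in priority order:
    /// actor failures first, then graphs that have collected all actors for a
    /// barrier, then pending `LocalBarrierEvent`s from the actors.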
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
        }
        // Check for barriers that may have become fully collected before
        // polling new events.
        for (partial_graph_id, graph_state) in &mut self.graph_states {
            if let Some(barrier) = graph_state.may_have_collected_all() {
                return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                    partial_graph_id: *partial_graph_id,
                    barrier,
                });
            }
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some((partial_graph_id, barrier)) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, actor, state);
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError { actor_id, err });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    tx,
                } => {
                    self.new_actor_output_request(
                        actor_id,
                        upstream_actor_id,
                        NewOutputRequest::Local(tx),
                    );
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
            }
        }

        // Any event that completed a barrier returned early above, so no graph
        // should have a fully collected barrier at this point.
        debug_assert!(
            self.graph_states
                .values_mut()
                .all(|graph_state| graph_state.may_have_collected_all().is_none())
        );
        Poll::Pending
    }
}

impl DatabaseManagedBarrierState {
    /// Record that `actor_id` has collected the barrier of `epoch`, and return
    /// the partial graph and barrier if this completes the collection of a
    /// barrier in that graph.
    #[must_use]
    pub(super) fn collect(
        &mut self,
        actor_id: ActorId,
        epoch: EpochPair,
    ) -> Option<(PartialGraphId, Barrier)> {
        let (prev_partial_graph_id, is_finished) = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        let prev_graph_state = self
            .graph_states
            .get_mut(&prev_partial_graph_id)
            .expect("should exist");
        prev_graph_state.collect(actor_id, epoch);
        prev_graph_state
            .may_have_collected_all()
            .map(|barrier| (prev_partial_graph_id, barrier))
    }

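    /// Pop the fully collected barrier of `prev_epoch` from the given partial
    /// graph, together with the table ids to sync and the buffered progress
    /// reports, so that it can be reported back to the meta service.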
    #[allow(clippy::type_complexity)]
    pub(super) fn pop_barrier_to_complete(
        &mut self,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
        Vec<u32>,
        Vec<PbCdcTableBackfillProgress>,
    ) {
        self.graph_states
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .pop_barrier_to_complete(prev_epoch)
    }

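    /// Collect actor errors for a short while and pick the one most likely to
    /// be the root cause of the failure, as ranked by the error's score.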
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // Gather errors from the other actors for a while, but don't wait forever.
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs)
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }

    /// Record that the source backfill of `associated_source_id` has finished
    /// loading at the given epoch, if the reporting actor still has the
    /// corresponding barrier inflight; otherwise the report is ignored.
    pub(super) fn report_source_load_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        _table_id: u32,
        associated_source_id: u32,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .load_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .insert(associated_source_id);
        } else {
            warn!(
                ?epoch,
                actor_id, associated_source_id, "ignore source load finished"
            );
        }
    }
}

impl PartialGraphManagedBarrierState {
    /// Check whether the earliest barrier still in the `Issued` state has been
    /// collected from all of its actors. If so, transition it to
    /// `AllCollected`, taking the progress reports of its epoch, and return
    /// the barrier.
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor))
                .collect();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                },
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    #[allow(clippy::type_complexity)]
    fn pop_barrier_to_complete(
        &mut self,
        prev_epoch: u64,
    ) -> (
        Barrier,
        Option<HashSet<TableId>>,
        Vec<PbCreateMviewProgress>,
        Vec<u32>,
        Vec<PbCdcTableBackfillProgress>,
    ) {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let (create_mview_progress, load_finished_source_ids, cdc_table_backfill_progress) =
            must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
                create_mview_progress,
                load_finished_source_ids,
                cdc_table_backfill_progress,
            } => {
                (create_mview_progress, load_finished_source_ids, cdc_table_backfill_progress)
            });
        (
            barrier_state.barrier,
            barrier_state.table_ids,
            create_mview_progress,
            load_finished_source_ids,
            cdc_table_backfill_progress,
        )
    }
}

impl PartialGraphManagedBarrierState {
    /// Mark `actor_id` as having collected the barrier of `epoch`. The barrier
    /// must currently be in the `Issued` state.
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) {
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                // A barrier must be issued before any actor can collect it.
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

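    /// Register a newly issued barrier in this partial graph: start the
    /// inflight-latency timer, notify the state store of the new epoch, check
    /// the table ids against the previous barrier, and create the `Issued`
    /// entry that actors will be collected against.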
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        // Decide the table ids to be synced at this barrier: only checkpoint
        // barriers carry them, taken from the previous barrier's table ids.
        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(BarrierState { inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            panic!(
                "barrier epoch {:?} has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        }

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        // Three concurrent barriers; actor 3 joins at the third barrier.
        // Barriers must complete in epoch order as actors collect them.
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2]);
        let actor_ids_to_collect2 = HashSet::from([1, 2]);
        let actor_ids_to_collect3 = HashSet::from([1, 2, 3]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        // Actors 4 and 3 are stopped at the second and third barrier
        // respectively; earlier barriers must still be collected from them.
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1, 2, 3, 4]);
        let actor_ids_to_collect2 = HashSet::from([1, 2, 3]);
        let actor_ids_to_collect3 = HashSet::from([1, 2]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}