use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, pending, poll_fn};
use std::mem::replace;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use futures::future::BoxFuture;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, StreamExt};
use prometheus::HistogramTimer;
use risingwave_common::catalog::TableId;
use risingwave_common::id::SourceId;
use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_storage::StateStoreImpl;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;

use crate::error::{StreamError, StreamResult};
use crate::executor::Barrier;
use crate::executor::monitor::StreamingMetrics;
use crate::task::progress::BackfillState;
use crate::task::{
    ActorId, LocalBarrierEvent, LocalBarrierManager, NewOutputRequest, PartialGraphId,
    StreamActorManager, UpDownActorIds,
};

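/// State of a barrier that has been issued but not yet collected from all actors:
/// `remaining_actors` is the set of actors we are still waiting on, and
/// `barrier_inflight_latency` times how long the barrier stays in flight.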
struct IssuedState {
    pub remaining_actors: BTreeSet<ActorId>,

    pub barrier_inflight_latency: HistogramTimer,
}

impl Debug for IssuedState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IssuedState")
            .field("remaining_actors", &self.remaining_actors)
            .finish()
    }
}

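/// The state of a barrier for one epoch: either still being collected (`Issued`), or
/// already collected from all actors (`AllCollected`), carrying the progress reports
/// buffered for that epoch.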
#[derive(Debug)]
enum ManagedBarrierStateInner {
    Issued(IssuedState),

    AllCollected {
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    },
}

#[derive(Debug)]
struct BarrierState {
    barrier: Barrier,
    table_ids: Option<HashSet<TableId>>,
    inner: ManagedBarrierStateInner,
}

use risingwave_common::must_match;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_service::InjectBarrierRequest;

use crate::executor::exchange::permit;
use crate::executor::exchange::permit::channel_from_config;
use crate::task::barrier_worker::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::barrier_worker::{ScoredStreamError, TakeReceiverRequest};
use crate::task::cdc_progress::CdcTableBackfillState;

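/// Debug-printable snapshot of the managed barrier state: the set of running actors and
/// a reference to the per-graph barrier state.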
pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    running_actors: BTreeSet<ActorId>,
    graph_state: &'a PartialGraphManagedBarrierState,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "running_actors: ")?;
        for actor_id in &self.running_actors {
            write!(f, "{}, ", actor_id)?;
        }
        writeln!(f, "graph states")?;
        write!(f, "{}", self.graph_state)?;
        Ok(())
    }
}

impl Display for &'_ PartialGraphManagedBarrierState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut prev_epoch = 0u64;
        for (epoch, barrier_state) in &self.epoch_barrier_state_map {
            write!(f, "> Epoch {}: ", epoch)?;
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(state) => {
                    write!(
                        f,
                        "Issued [{:?}]. Remaining actors: [",
                        barrier_state.barrier.kind
                    )?;
                    let mut is_prev_epoch_issued = false;
                    if prev_epoch != 0 {
                        let bs = &self.epoch_barrier_state_map[&prev_epoch];
                        if let ManagedBarrierStateInner::Issued(IssuedState {
                            remaining_actors: remaining_actors_prev,
                            ..
                        }) = &bs.inner
                        {
                            is_prev_epoch_issued = true;
                            let mut duplicates = 0usize;
                            for actor_id in &state.remaining_actors {
                                if !remaining_actors_prev.contains(actor_id) {
                                    write!(f, "{}, ", actor_id)?;
                                } else {
                                    duplicates += 1;
                                }
                            }
                            if duplicates > 0 {
                                write!(f, "...and {} actors in prev epoch", duplicates)?;
                            }
                        }
                    }
                    if !is_prev_epoch_issued {
                        for actor_id in &state.remaining_actors {
                            write!(f, "{}, ", actor_id)?;
                        }
                    }
                    write!(f, "]")?;
                }
                ManagedBarrierStateInner::AllCollected { .. } => {
                    write!(f, "AllCollected")?;
                }
            }
            prev_epoch = *epoch;
            writeln!(f)?;
        }

        if !self.create_mview_progress.is_empty() {
            writeln!(f, "Create MView Progress:")?;
            for (epoch, progress) in &self.create_mview_progress {
                write!(f, "> Epoch {}:", epoch)?;
                for (actor_id, (_, state)) in progress {
                    write!(f, ">> Actor {}: {}, ", actor_id, state)?;
                }
            }
        }

        Ok(())
    }
}

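/// Barrier status of an inflight actor. `IssuedFirst` holds the barriers issued before the
/// first one is collected; once the first barrier is collected, the status becomes `Running`
/// with the latest issued `prev_epoch`.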
enum InflightActorStatus {
    IssuedFirst(Vec<Barrier>),
    Running(u64),
}

impl InflightActorStatus {
    fn max_issued_epoch(&self) -> u64 {
        match self {
            InflightActorStatus::Running(epoch) => *epoch,
            InflightActorStatus::IssuedFirst(issued_barriers) => {
                issued_barriers.last().expect("non-empty").epoch.prev
            }
        }
    }
}

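/// Per-actor bookkeeping: the `prev_epoch`s of barriers issued to the actor but not yet
/// collected back, along with the channels and task handles owned by the actor.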
pub(crate) struct InflightActorState {
    actor_id: ActorId,
    barrier_senders: Vec<mpsc::UnboundedSender<Barrier>>,
    pub(in crate::task) inflight_barriers: VecDeque<u64>,
    status: InflightActorStatus,
    is_stopping: bool,

    new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
    join_handle: JoinHandle<()>,
    monitor_task_handle: Option<JoinHandle<()>>,
}

impl InflightActorState {
    pub(super) fn start(
        actor_id: ActorId,
        initial_barrier: &Barrier,
        new_output_request_tx: UnboundedSender<(ActorId, NewOutputRequest)>,
        join_handle: JoinHandle<()>,
        monitor_task_handle: Option<JoinHandle<()>>,
    ) -> Self {
        Self {
            actor_id,
            barrier_senders: vec![],
            inflight_barriers: VecDeque::from_iter([initial_barrier.epoch.prev]),
            status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]),
            is_stopping: false,
            new_output_request_tx,
            join_handle,
            monitor_task_handle,
        }
    }

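    /// Issue a new barrier to this actor. `is_stop` marks that this is the last barrier
    /// the actor will receive before it stops.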
    pub(super) fn issue_barrier(&mut self, barrier: &Barrier, is_stop: bool) -> StreamResult<()> {
        assert!(barrier.epoch.prev > self.status.max_issued_epoch());

        for barrier_sender in &self.barrier_senders {
            barrier_sender.send(barrier.clone()).map_err(|_| {
                StreamError::barrier_send(
                    barrier.clone(),
                    self.actor_id,
                    "failed to send to registered sender",
                )
            })?;
        }

        if let Some(prev_epoch) = self.inflight_barriers.back() {
            assert!(*prev_epoch < barrier.epoch.prev);
        }
        self.inflight_barriers.push_back(barrier.epoch.prev);

        match &mut self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                pending_barriers.push(barrier.clone());
            }
            InflightActorStatus::Running(prev_epoch) => {
                *prev_epoch = barrier.epoch.prev;
            }
        };

        if is_stop {
            assert!(
                !self.is_stopping,
                "should not issue a barrier to an actor that is already stopping"
            );
            self.is_stopping = true;
        }
        Ok(())
    }

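    /// Collect the barrier with the given epoch from this actor. Returns `true` if the
    /// actor is stopping and has no more inflight barriers, i.e. it has fully finished.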
    pub(super) fn collect(&mut self, epoch: EpochPair) -> bool {
        let prev_epoch = self.inflight_barriers.pop_front().expect("should exist");
        assert_eq!(prev_epoch, epoch.prev);
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                assert_eq!(
                    prev_epoch,
                    pending_barriers.first().expect("non-empty").epoch.prev
                );
                self.status = InflightActorStatus::Running(
                    pending_barriers.last().expect("non-empty").epoch.prev,
                );
            }
            InflightActorStatus::Running(_) => {}
        }

        self.inflight_barriers.is_empty() && self.is_stopping
    }
}

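/// Per-graph barrier state for concurrent checkpoints. `epoch_barrier_state_map` is keyed
/// by `prev_epoch`; the per-epoch maps below buffer progress reports (backfill, source
/// list/load, CDC backfill, table truncate/refresh) until the corresponding barrier is
/// collected from all actors.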
pub(crate) struct PartialGraphManagedBarrierState {
    epoch_barrier_state_map: BTreeMap<u64, BarrierState>,

    prev_barrier_table_ids: Option<(EpochPair, HashSet<TableId>)>,

    pub(crate) create_mview_progress: HashMap<u64, HashMap<ActorId, (FragmentId, BackfillState)>>,

    pub(crate) list_finished_source_ids: HashMap<u64, Vec<PbListFinishedSource>>,

    pub(crate) load_finished_source_ids: HashMap<u64, Vec<PbLoadFinishedSource>>,

    pub(crate) cdc_table_backfill_progress: HashMap<u64, HashMap<ActorId, CdcTableBackfillState>>,

    pub(crate) truncate_tables: HashMap<u64, HashSet<TableId>>,
    pub(crate) refresh_finished_tables: HashMap<u64, HashSet<TableId>>,

    state_store: StateStoreImpl,

    streaming_metrics: Arc<StreamingMetrics>,
}

impl PartialGraphManagedBarrierState {
    pub(super) fn new(actor_manager: &StreamActorManager) -> Self {
        Self::new_inner(
            actor_manager.env.state_store(),
            actor_manager.streaming_metrics.clone(),
        )
    }

    fn new_inner(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
        Self {
            epoch_barrier_state_map: Default::default(),
            prev_barrier_table_ids: None,
            create_mview_progress: Default::default(),
            list_finished_source_ids: Default::default(),
            load_finished_source_ids: Default::default(),
            cdc_table_backfill_progress: Default::default(),
            truncate_tables: Default::default(),
            refresh_finished_tables: Default::default(),
            state_store,
            streaming_metrics,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        Self::new_inner(
            StateStoreImpl::for_test(),
            Arc::new(StreamingMetrics::unused()),
        )
    }

    pub(super) fn is_empty(&self) -> bool {
        self.epoch_barrier_state_map.is_empty()
    }
}

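/// A partial graph taken out of service, retaining the original failure (if any) until the
/// graph is reset.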
pub(crate) struct SuspendedPartialGraphState {
    pub(super) suspend_time: Instant,
    inner: PartialGraphState,
    failure: Option<(Option<ActorId>, StreamError)>,
}

impl SuspendedPartialGraphState {
    fn new(
        state: PartialGraphState,
        failure: Option<(Option<ActorId>, StreamError)>,
        _completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> Self {
        Self {
            suspend_time: Instant::now(),
            inner: state,
            failure,
        }
    }

    async fn reset(mut self) -> ResetPartialGraphOutput {
        let root_err = self.inner.try_find_root_actor_failure(self.failure).await;
        self.inner.abort_and_wait_actors().await;
        ResetPartialGraphOutput { root_err }
    }
}

pub(crate) struct ResetPartialGraphOutput {
    pub(crate) root_err: Option<ScoredStreamError>,
}

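/// Status of a partial graph over its lifetime: exchange requests received before the graph
/// is created, running, suspended on failure, or being reset. `Unspecified` is a transient
/// placeholder used while swapping states and must never be observed.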
pub(in crate::task) enum PartialGraphStatus {
    ReceivedExchangeRequest(Vec<(UpDownActorIds, TakeReceiverRequest)>),
    Running(PartialGraphState),
    Suspended(SuspendedPartialGraphState),
    Resetting,
    Unspecified,
}

impl PartialGraphStatus {
    pub(crate) async fn abort(&mut self) {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, request) in pending_requests.drain(..) {
                    if let TakeReceiverRequest::Remote(sender) = request {
                        let _ = sender.send(Err(anyhow!("partial graph aborted").into()));
                    }
                }
            }
            PartialGraphStatus::Running(state) => {
                state.abort_and_wait_actors().await;
            }
            PartialGraphStatus::Suspended(SuspendedPartialGraphState { inner: state, .. }) => {
                state.abort_and_wait_actors().await;
            }
            PartialGraphStatus::Resetting => {}
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(crate) fn state_for_request(&mut self) -> Option<&mut PartialGraphState> {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                unreachable!("should not handle request")
            }
            PartialGraphStatus::Running(state) => Some(state),
            PartialGraphStatus::Suspended(_) => None,
            PartialGraphStatus::Resetting => {
                unreachable!("should not receive further request during cleaning")
            }
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        match self {
            PartialGraphStatus::ReceivedExchangeRequest(_) => Poll::Pending,
            PartialGraphStatus::Running(state) => state.poll_next_event(cx),
            PartialGraphStatus::Suspended(_) | PartialGraphStatus::Resetting => Poll::Pending,
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }

    pub(super) fn suspend(
        &mut self,
        failed_actor: Option<ActorId>,
        err: StreamError,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) {
        let state = must_match!(replace(self, PartialGraphStatus::Unspecified), PartialGraphStatus::Running(state) => state);
        *self = PartialGraphStatus::Suspended(SuspendedPartialGraphState::new(
            state,
            Some((failed_actor, err)),
            completing_futures,
        ));
    }

    pub(super) fn start_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        completing_futures: Option<FuturesOrdered<AwaitEpochCompletedFuture>>,
        table_ids_to_clear: &mut HashSet<TableId>,
    ) -> BoxFuture<'static, ResetPartialGraphOutput> {
        match replace(self, PartialGraphStatus::Resetting) {
            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                for (_, request) in pending_requests {
                    if let TakeReceiverRequest::Remote(sender) = request {
                        let _ = sender.send(Err(anyhow!("partial graph reset").into()));
                    }
                }
                async move { ResetPartialGraphOutput { root_err: None } }.boxed()
            }
            PartialGraphStatus::Running(state) => {
                assert_eq!(partial_graph_id, state.partial_graph_id);
                info!(
                    %partial_graph_id,
                    "start partial graph reset from Running"
                );
                table_ids_to_clear.extend(state.table_ids.iter().copied());
                SuspendedPartialGraphState::new(state, None, completing_futures)
                    .reset()
                    .boxed()
            }
            PartialGraphStatus::Suspended(state) => {
                assert!(
                    completing_futures.is_none(),
                    "should have been cleared when suspended"
                );
                assert_eq!(partial_graph_id, state.inner.partial_graph_id);
                info!(
                    %partial_graph_id,
                    suspend_elapsed = ?state.suspend_time.elapsed(),
                    "start partial graph reset after suspended"
                );
                table_ids_to_clear.extend(state.inner.table_ids.iter().copied());
                state.reset().boxed()
            }
            PartialGraphStatus::Resetting => {
                unreachable!("should not reset twice");
            }
            PartialGraphStatus::Unspecified => {
                unreachable!()
            }
        }
    }
}

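/// Top-level barrier state of the worker: all partial graphs by id, plus the join handles
/// of graphs that are currently being reset in the background.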
#[derive(Default)]
pub(in crate::task) struct ManagedBarrierState {
    pub(super) partial_graphs: HashMap<PartialGraphId, PartialGraphStatus>,
    pub(super) resetting_graphs:
        FuturesUnordered<JoinHandle<Vec<(PartialGraphId, ResetPartialGraphOutput)>>>,
}

pub(super) enum ManagedBarrierStateEvent {
    BarrierCollected {
        partial_graph_id: PartialGraphId,
        barrier: Barrier,
    },
    ActorError {
        partial_graph_id: PartialGraphId,
        actor_id: ActorId,
        err: StreamError,
    },
    PartialGraphsReset(Vec<(PartialGraphId, ResetPartialGraphOutput)>),
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        upstream_partial_graph_id: PartialGraphId,
        tx: permit::Sender,
    },
}

impl ManagedBarrierState {
    pub(super) fn next_event(&mut self) -> impl Future<Output = ManagedBarrierStateEvent> + '_ {
        poll_fn(|cx| {
            for graph in self.partial_graphs.values_mut() {
                if let Poll::Ready(event) = graph.poll_next_event(cx) {
                    return Poll::Ready(event);
                }
            }
            if let Poll::Ready(Some(result)) = self.resetting_graphs.poll_next_unpin(cx) {
                let outputs = result.expect("failed to join resetting future");
                for (partial_graph_id, _) in &outputs {
                    let PartialGraphStatus::Resetting = self
                        .partial_graphs
                        .remove(partial_graph_id)
                        .expect("should exist")
                    else {
                        panic!("should be resetting")
                    };
                }
                return Poll::Ready(ManagedBarrierStateEvent::PartialGraphsReset(outputs));
            }
            Poll::Pending
        })
    }
}

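/// Runtime state of a single running partial graph: its inflight actors, the per-graph
/// barrier state, the tables it syncs, and the channels through which actors report
/// barrier events and failures.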
pub(crate) struct PartialGraphState {
    partial_graph_id: PartialGraphId,
    pub(crate) actor_states: HashMap<ActorId, InflightActorState>,
    pub(super) actor_pending_new_output_requests:
        HashMap<ActorId, Vec<(ActorId, NewOutputRequest)>>,

    pub(crate) graph_state: PartialGraphManagedBarrierState,

    table_ids: HashSet<TableId>,

    actor_manager: Arc<StreamActorManager>,

    pub(super) local_barrier_manager: LocalBarrierManager,

    barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
    pub(super) actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
}

impl PartialGraphState {
    pub(super) fn new(
        partial_graph_id: PartialGraphId,
        term_id: String,
        actor_manager: Arc<StreamActorManager>,
    ) -> Self {
        let (local_barrier_manager, barrier_event_rx, actor_failure_rx) =
            LocalBarrierManager::new(term_id, actor_manager.env.clone());
        Self {
            partial_graph_id,
            actor_states: Default::default(),
            actor_pending_new_output_requests: Default::default(),
            graph_state: PartialGraphManagedBarrierState::new(&actor_manager),
            table_ids: Default::default(),
            actor_manager,
            local_barrier_manager,
            barrier_event_rx,
            actor_failure_rx,
        }
    }

    pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> {
        ManagedBarrierStateDebugInfo {
            running_actors: self.actor_states.keys().cloned().collect(),
            graph_state: &self.graph_state,
        }
    }

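    /// Abort all actors of this partial graph and wait for their tasks to exit.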
    async fn abort_and_wait_actors(&mut self) {
        for (actor_id, state) in &self.actor_states {
            tracing::debug!("force stopping actor {}", actor_id);
            state.join_handle.abort();
            if let Some(monitor_task_handle) = &state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }

        for (actor_id, state) in self.actor_states.drain() {
            tracing::debug!("join actor {}", actor_id);
            let result = state.join_handle.await;
            assert!(result.is_ok() || result.unwrap_err().is_cancelled());
        }
    }
}

impl InflightActorState {
    pub(super) fn register_barrier_sender(
        &mut self,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        match &self.status {
            InflightActorStatus::IssuedFirst(pending_barriers) => {
                for barrier in pending_barriers {
                    tx.send(barrier.clone()).map_err(|_| {
                        StreamError::barrier_send(
                            barrier.clone(),
                            self.actor_id,
                            "failed to send pending barriers to newly registered sender",
                        )
                    })?;
                }
                self.barrier_senders.push(tx);
            }
            InflightActorStatus::Running(_) => {
                unreachable!("should not register barrier sender after entering Running status")
            }
        }
        Ok(())
    }
}

impl PartialGraphState {
    pub(super) fn register_barrier_sender(
        &mut self,
        actor_id: ActorId,
        tx: mpsc::UnboundedSender<Barrier>,
    ) -> StreamResult<()> {
        self.actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .register_barrier_sender(tx)
    }
}

impl PartialGraphState {
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        assert_eq!(self.partial_graph_id, request.partial_graph_id);
        let actor_to_stop = barrier.all_stop_actors();
        let is_stop_actor = |actor_id| {
            actor_to_stop
                .map(|actors| actors.contains(&actor_id))
                .unwrap_or(false)
        };

        let table_ids = HashSet::from_iter(request.table_ids_to_sync);
        self.table_ids.extend(table_ids.iter().cloned());

        self.graph_state.transform_to_issued(
            barrier,
            request.actor_ids_to_collect.iter().copied(),
            table_ids,
        );

        let mut new_actors = HashSet::new();
        for (node, fragment_id, actor) in
            request
                .actors_to_build
                .into_iter()
                .flat_map(|fragment_actors| {
                    let node = Arc::new(fragment_actors.node.unwrap());
                    fragment_actors
                        .actors
                        .into_iter()
                        .map(move |actor| (node.clone(), fragment_actors.fragment_id, actor))
                })
        {
            let actor_id = actor.actor_id;
            assert!(!is_stop_actor(actor_id));
            assert!(new_actors.insert(actor_id));
            assert!(request.actor_ids_to_collect.contains(&actor_id));
            let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
            if let Some(pending_requests) = self.actor_pending_new_output_requests.remove(&actor_id)
            {
                for request in pending_requests {
                    let _ = new_output_request_tx.send(request);
                }
            }
            let (join_handle, monitor_join_handle) = self.actor_manager.spawn_actor(
                actor,
                fragment_id,
                node,
                self.local_barrier_manager.clone(),
                new_output_request_rx,
            );
            assert!(
                self.actor_states
                    .try_insert(
                        actor_id,
                        InflightActorState::start(
                            actor_id,
                            barrier,
                            new_output_request_tx,
                            join_handle,
                            monitor_join_handle
                        )
                    )
                    .is_ok()
            );
        }

        if cfg!(test) {
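            // In tests, actors to collect from may not actually be built. Register a
            // placeholder task that holds the channel open and never completes, so that
            // barrier collection bookkeeping can still be exercised.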
            for &actor_id in &request.actor_ids_to_collect {
                if !self.actor_states.contains_key(&actor_id) {
                    let (tx, rx) = unbounded_channel();
                    let join_handle = self.actor_manager.runtime.spawn(async move {
                        let _ = rx;
                        pending().await
                    });
                    assert!(
                        self.actor_states
                            .try_insert(
                                actor_id,
                                InflightActorState::start(actor_id, barrier, tx, join_handle, None)
                            )
                            .is_ok()
                    );
                    new_actors.insert(actor_id);
                }
            }
        }

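        // Issue the barrier to all pre-existing actors that are expected to collect it.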
        for &actor_id in &request.actor_ids_to_collect {
            if new_actors.contains(&actor_id) {
                continue;
            }
            self.actor_states
                .get_mut(&actor_id)
                .unwrap_or_else(|| {
                    panic!(
                        "should exist: {} {:?}",
                        actor_id, request.actor_ids_to_collect
                    );
                })
                .issue_barrier(barrier, is_stop_actor(actor_id))?;
        }

        Ok(())
    }

    pub(super) fn new_actor_output_request(
        &mut self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        request: TakeReceiverRequest,
    ) {
        let request = match request {
            TakeReceiverRequest::Remote(result_sender) => {
                let (tx, rx) = channel_from_config(self.local_barrier_manager.env.global_config());
                let _ = result_sender.send(Ok(rx));
                NewOutputRequest::Remote(tx)
            }
            TakeReceiverRequest::Local(tx) => NewOutputRequest::Local(tx),
        };
        if let Some(actor) = self.actor_states.get_mut(&upstream_actor_id) {
            let _ = actor.new_output_request_tx.send((actor_id, request));
        } else {
            self.actor_pending_new_output_requests
                .entry(upstream_actor_id)
                .or_default()
                .push((actor_id, request));
        }
    }

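    /// Poll the next event of this partial graph: actor failures first, then any barrier
    /// that may have been fully collected, then buffered barrier events from actors.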
    pub(super) fn poll_next_event(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<ManagedBarrierStateEvent> {
        if let Poll::Ready(option) = self.actor_failure_rx.poll_recv(cx) {
            let (actor_id, err) = option.expect("non-empty when tx in local_barrier_manager");
            return Poll::Ready(ManagedBarrierStateEvent::ActorError {
                actor_id,
                err,
                partial_graph_id: self.partial_graph_id,
            });
        }
        if let Some(barrier) = self.graph_state.may_have_collected_all() {
            return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                barrier,
                partial_graph_id: self.partial_graph_id,
            });
        }
        while let Poll::Ready(event) = self.barrier_event_rx.poll_recv(cx) {
            match event.expect("non-empty when tx in local_barrier_manager") {
                LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => {
                    if let Some(barrier) = self.collect(actor_id, epoch) {
                        return Poll::Ready(ManagedBarrierStateEvent::BarrierCollected {
                            barrier,
                            partial_graph_id: self.partial_graph_id,
                        });
                    }
                }
                LocalBarrierEvent::ReportCreateProgress {
                    epoch,
                    fragment_id,
                    actor,
                    state,
                } => {
                    self.update_create_mview_progress(epoch, fragment_id, actor, state);
                }
                LocalBarrierEvent::ReportSourceListFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_list_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::ReportSourceLoadFinished {
                    epoch,
                    actor_id,
                    table_id,
                    associated_source_id,
                } => {
                    self.report_source_load_finished(
                        epoch,
                        actor_id,
                        table_id,
                        associated_source_id,
                    );
                }
                LocalBarrierEvent::RefreshFinished {
                    epoch,
                    actor_id,
                    table_id,
                    staging_table_id,
                } => {
                    self.report_refresh_finished(epoch, actor_id, table_id, staging_table_id);
                }
                LocalBarrierEvent::RegisterBarrierSender {
                    actor_id,
                    barrier_sender,
                } => {
                    if let Err(err) = self.register_barrier_sender(actor_id, barrier_sender) {
                        return Poll::Ready(ManagedBarrierStateEvent::ActorError {
                            actor_id,
                            err,
                            partial_graph_id: self.partial_graph_id,
                        });
                    }
                }
                LocalBarrierEvent::RegisterLocalUpstreamOutput {
                    actor_id,
                    upstream_actor_id,
                    upstream_partial_graph_id,
                    tx,
                } => {
                    return Poll::Ready(ManagedBarrierStateEvent::RegisterLocalUpstreamOutput {
                        actor_id,
                        upstream_actor_id,
                        upstream_partial_graph_id,
                        tx,
                    });
                }
                LocalBarrierEvent::ReportCdcTableBackfillProgress {
                    actor_id,
                    epoch,
                    state,
                } => {
                    self.update_cdc_table_backfill_progress(epoch, actor_id, state);
                }
            }
        }

        debug_assert!(self.graph_state.may_have_collected_all().is_none());
        Poll::Pending
    }
}

impl PartialGraphState {
    #[must_use]
    pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) -> Option<Barrier> {
        let is_finished = self
            .actor_states
            .get_mut(&actor_id)
            .expect("should exist")
            .collect(epoch);
        if is_finished {
            let state = self.actor_states.remove(&actor_id).expect("should exist");
            if let Some(monitor_task_handle) = state.monitor_task_handle {
                monitor_task_handle.abort();
            }
        }
        self.graph_state.collect(actor_id, epoch);
        self.graph_state.may_have_collected_all()
    }

    pub(super) fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        self.graph_state.pop_barrier_to_complete(prev_epoch)
    }

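    /// Wait briefly for further actor errors to arrive, then pick the error that most likely
    /// reflects the root cause (the highest-scored one) among the first failure and any
    /// errors collected afterwards.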
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs)
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }

    pub(super) fn report_source_list_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && actor_state.inflight_barriers.contains(&epoch.prev)
        {
            self.graph_state
                .list_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbListFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source list finished"
            );
        }
    }

    pub(super) fn report_source_load_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        associated_source_id: SourceId,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor_id)
            && actor_state.inflight_barriers.contains(&epoch.prev)
        {
            self.graph_state
                .load_finished_source_ids
                .entry(epoch.curr)
                .or_default()
                .push(PbLoadFinishedSource {
                    reporter_actor_id: actor_id,
                    table_id,
                    associated_source_id,
                });
        } else {
            warn!(
                ?epoch,
                %actor_id, %table_id, %associated_source_id, "ignore source load finished"
            );
        }
    }

    pub(super) fn report_refresh_finished(
        &mut self,
        epoch: EpochPair,
        actor_id: ActorId,
        table_id: TableId,
        staging_table_id: TableId,
    ) {
        let Some(actor_state) = self.actor_states.get(&actor_id) else {
            warn!(
                ?epoch,
                %actor_id, %table_id, "ignore refresh finished table: actor_state not found"
            );
            return;
        };
        if !actor_state.inflight_barriers.contains(&epoch.prev) {
            warn!(
                ?epoch,
                %actor_id,
                %table_id,
                inflight_barriers = ?actor_state.inflight_barriers,
                "ignore refresh finished table: epoch not found in inflight_barriers"
            );
            return;
        }
        self.graph_state
            .refresh_finished_tables
            .entry(epoch.curr)
            .or_default()
            .insert(table_id);
        self.graph_state
            .truncate_tables
            .entry(epoch.curr)
            .or_default()
            .insert(staging_table_id);
    }
}

impl PartialGraphManagedBarrierState {
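    /// Check whether the earliest issued barrier has been collected from all actors. If so,
    /// transform its state to `AllCollected`, taking the progress reports buffered for the
    /// epoch, and return the newly all-collected barrier.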
    fn may_have_collected_all(&mut self) -> Option<Barrier> {
        for barrier_state in self.epoch_barrier_state_map.values_mut() {
            match &barrier_state.inner {
                ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors, ..
                }) if remaining_actors.is_empty() => {}
                ManagedBarrierStateInner::AllCollected { .. } => {
                    continue;
                }
                ManagedBarrierStateInner::Issued(_) => {
                    break;
                }
            }

            self.streaming_metrics.barrier_manager_progress.inc();

            let create_mview_progress = self
                .create_mview_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, (fragment_id, state))| state.to_pb(fragment_id, actor))
                .collect();

            let list_finished_source_ids = self
                .list_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let load_finished_source_ids = self
                .load_finished_source_ids
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default();

            let cdc_table_backfill_progress = self
                .cdc_table_backfill_progress
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .map(|(actor, state)| state.to_pb(actor, barrier_state.barrier.epoch.curr))
                .collect();

            let truncate_tables = self
                .truncate_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();

            let refresh_finished_tables = self
                .refresh_finished_tables
                .remove(&barrier_state.barrier.epoch.curr)
                .unwrap_or_default()
                .into_iter()
                .collect();
            let prev_state = replace(
                &mut barrier_state.inner,
                ManagedBarrierStateInner::AllCollected {
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    truncate_tables,
                    refresh_finished_tables,
                    cdc_table_backfill_progress,
                },
            );

            must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState {
                barrier_inflight_latency: timer,
                ..
            }) => {
                timer.observe_duration();
            });

            return Some(barrier_state.barrier.clone());
        }
        None
    }

    fn pop_barrier_to_complete(&mut self, prev_epoch: u64) -> BarrierToComplete {
        let (popped_prev_epoch, barrier_state) = self
            .epoch_barrier_state_map
            .pop_first()
            .expect("should exist");

        assert_eq!(prev_epoch, popped_prev_epoch);

        let (
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        ) = must_match!(barrier_state.inner, ManagedBarrierStateInner::AllCollected {
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        } => {
            (create_mview_progress, list_finished_source_ids, load_finished_source_ids, cdc_table_backfill_progress, truncate_tables, refresh_finished_tables)
        });
        BarrierToComplete {
            barrier: barrier_state.barrier,
            table_ids: barrier_state.table_ids,
            create_mview_progress,
            list_finished_source_ids,
            load_finished_source_ids,
            truncate_tables,
            refresh_finished_tables,
            cdc_table_backfill_progress,
        }
    }
}

pub(crate) struct BarrierToComplete {
    pub barrier: Barrier,
    pub table_ids: Option<HashSet<TableId>>,
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
    pub list_finished_source_ids: Vec<PbListFinishedSource>,
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,
    pub truncate_tables: Vec<TableId>,
    pub refresh_finished_tables: Vec<TableId>,
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
}

impl PartialGraphManagedBarrierState {
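    /// Record that the barrier with `epoch` has been collected from `actor_id`, removing
    /// the actor from the barrier's remaining set.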
    pub(super) fn collect(&mut self, actor_id: impl Into<ActorId>, epoch: EpochPair) {
        let actor_id = actor_id.into();
        tracing::debug!(
            target: "events::stream::barrier::manager::collect",
            ?epoch, %actor_id, state = ?self.epoch_barrier_state_map,
            "collect_barrier",
        );

        match self.epoch_barrier_state_map.get_mut(&epoch.prev) {
            None => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: None",
                    epoch,
                )
            }
            Some(&mut BarrierState {
                ref barrier,
                inner:
                    ManagedBarrierStateInner::Issued(IssuedState {
                        ref mut remaining_actors,
                        ..
                    }),
                ..
            }) => {
                let exist = remaining_actors.remove(&actor_id);
                assert!(
                    exist,
                    "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}",
                    actor_id, epoch.curr
                );
                assert_eq!(barrier.epoch.curr, epoch.curr);
            }
            Some(BarrierState { inner, .. }) => {
                panic!(
                    "cannot collect new actor barrier {:?} at current state: {:?}",
                    epoch, inner
                )
            }
        }
    }

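    /// Transition this partial graph to `Issued` for the given barrier: start the epoch on
    /// the state store if applicable, derive the table ids to sync from the previous
    /// barrier, and record the set of actors the barrier must be collected from.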
    pub(super) fn transform_to_issued(
        &mut self,
        barrier: &Barrier,
        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
        table_ids: HashSet<TableId>,
    ) {
        let timer = self
            .streaming_metrics
            .barrier_inflight_latency
            .start_timer();

        if let Some(hummock) = self.state_store.as_hummock() {
            hummock.start_epoch(barrier.epoch.curr, table_ids.clone());
        }

        let table_ids = match barrier.kind {
            BarrierKind::Unspecified => {
                unreachable!()
            }
            BarrierKind::Initial => {
                assert!(
                    self.prev_barrier_table_ids.is_none(),
                    "non-empty table_ids at initial barrier: {:?}",
                    self.prev_barrier_table_ids
                );
                info!(epoch = ?barrier.epoch, "initialize at Initial barrier");
                self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                None
            }
            BarrierKind::Barrier => {
                if let Some((prev_epoch, prev_table_ids)) = self.prev_barrier_table_ids.as_mut() {
                    assert_eq!(prev_epoch.curr, barrier.epoch.prev);
                    assert_eq!(prev_table_ids, &table_ids);
                    *prev_epoch = barrier.epoch;
                } else {
                    info!(epoch = ?barrier.epoch, "initialize at non-checkpoint barrier");
                    self.prev_barrier_table_ids = Some((barrier.epoch, table_ids));
                }
                None
            }
            BarrierKind::Checkpoint => Some(
                if let Some((prev_epoch, prev_table_ids)) = self
                    .prev_barrier_table_ids
                    .replace((barrier.epoch, table_ids))
                    && prev_epoch.curr == barrier.epoch.prev
                {
                    prev_table_ids
                } else {
                    debug!(epoch = ?barrier.epoch, "reinitialize at Checkpoint barrier");
                    HashSet::new()
                },
            ),
        };

        if let Some(&mut BarrierState { ref inner, .. }) =
            self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev)
        {
            panic!(
                "barrier epochs {:?} state has already been `Issued`. Current state: {:?}",
                barrier.epoch, inner
            );
        };

        self.epoch_barrier_state_map.insert(
            barrier.epoch.prev,
            BarrierState {
                barrier: barrier.clone(),
                inner: ManagedBarrierStateInner::Issued(IssuedState {
                    remaining_actors: BTreeSet::from_iter(actor_ids_to_collect),
                    barrier_inflight_latency: timer,
                }),
                table_ids,
            },
        );
    }

    #[cfg(test)]
    async fn pop_next_completed_epoch(&mut self) -> u64 {
        if let Some(barrier) = self.may_have_collected_all() {
            self.pop_barrier_to_complete(barrier.epoch.prev);
            return barrier.epoch.prev;
        }
        pending().await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use risingwave_common::util::epoch::test_epoch;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::managed_state::PartialGraphManagedBarrierState;

    #[tokio::test]
    async fn test_managed_state_add_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into()]);
        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into()]);
        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into(), 3.into()]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());
        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(1)
        );
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &test_epoch(2)
        );
        managed_barrier_state.collect(2, barrier3.epoch);
        managed_barrier_state.collect(3, barrier3.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }

    #[tokio::test]
    async fn test_managed_state_stop_actor() {
        let mut managed_barrier_state = PartialGraphManagedBarrierState::for_test();
        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let barrier3 = Barrier::new_test_barrier(test_epoch(3));
        let actor_ids_to_collect1 = HashSet::from([1.into(), 2.into(), 3.into(), 4.into()]);
        let actor_ids_to_collect2 = HashSet::from([1.into(), 2.into(), 3.into()]);
        let actor_ids_to_collect3 = HashSet::from([1.into(), 2.into()]);
        managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new());
        managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new());

        managed_barrier_state.collect(1, barrier1.epoch);
        managed_barrier_state.collect(1, barrier2.epoch);
        managed_barrier_state.collect(1, barrier3.epoch);
        managed_barrier_state.collect(2, barrier1.epoch);
        managed_barrier_state.collect(2, barrier2.epoch);
        managed_barrier_state.collect(2, barrier3.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(3, barrier1.epoch);
        managed_barrier_state.collect(3, barrier2.epoch);
        assert_eq!(
            managed_barrier_state
                .epoch_barrier_state_map
                .first_key_value()
                .unwrap()
                .0,
            &0
        );
        managed_barrier_state.collect(4, barrier1.epoch);
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(0)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(1)
        );
        assert_eq!(
            managed_barrier_state.pop_next_completed_epoch().await,
            test_epoch(2)
        );
        assert!(managed_barrier_state.epoch_barrier_state_map.is_empty());
    }
}