1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::future::{pending, poll_fn};
19use std::sync::Arc;
20use std::task::Poll;
21
22use anyhow::anyhow;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::future::{BoxFuture, join, join_all};
25use futures::stream::{BoxStream, FuturesOrdered};
26use futures::{FutureExt, StreamExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_pb::stream_plan::barrier::BarrierKind;
29use risingwave_pb::stream_service::barrier_complete_response::{
30 PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
31 PbListFinishedSource, PbLoadFinishedSource, PbLocalSstableInfo,
32};
33use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
34use risingwave_storage::store_impl::AsHummock;
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot;
38use tokio::task::JoinHandle;
39use tokio::{select, spawn};
40use tonic::{Code, Status};
41use tracing::warn;
42
43use self::managed_state::ManagedBarrierState;
44use crate::error::{ScoredStreamError, StreamError, StreamResult};
45#[cfg(test)]
46use crate::task::LocalBarrierManager;
47use crate::task::managed_state::{BarrierToComplete, ResetPartialGraphOutput};
48use crate::task::{
49 ActorId, AtomicU64Ref, CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY, ConfigOverrideCache, FragmentId,
50 PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
51};
52pub mod managed_state;
53#[cfg(test)]
54mod tests;
55
56use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
57use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
58use risingwave_pb::stream_service::streaming_control_stream_request::{
59 InitRequest, Request, ResetPartialGraphsRequest,
60};
61use risingwave_pb::stream_service::streaming_control_stream_response::{
62 InitResponse, ReportPartialGraphFailureResponse, ResetPartialGraphResponse, Response,
63 ShutdownResponse,
64};
65use risingwave_pb::stream_service::{
66 BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
67 StreamingControlStreamResponse, streaming_control_stream_response,
68};
69
70use crate::executor::Barrier;
71use crate::executor::exchange::permit::Receiver;
72use crate::executor::monitor::StreamingMetrics;
73use crate::task::barrier_worker::managed_state::{
74 ManagedBarrierStateDebugInfo, ManagedBarrierStateEvent, PartialGraphState, PartialGraphStatus,
75};
76
/// Feature flag for barrier aggregation; hard-disabled for now.
/// NOTE(review): no usage site is visible in this chunk — confirm consumers before removing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
80
/// Aggregated outcome of completing one barrier for a partial graph, used to
/// build the `BarrierCompleteResponse` sent back over the control stream
/// (see `LocalBarrierWorker::on_epoch_completed`).
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// State-store sync result. `Some` only for checkpoint barriers:
    /// `complete_barrier` issues `sync_epoch` solely on `BarrierKind::Checkpoint`.
    pub sync_result: Option<SyncResult>,

    /// Progress of creating materialized views collected for this epoch.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// Sources that have finished listing during this epoch.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,

    /// Sources that have finished loading during this epoch.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,

    /// CDC table backfill progress collected for this epoch.
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// CDC source offset updates collected for this epoch.
    pub cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,

    /// Tables requested to be truncated, forwarded in the response.
    pub truncate_tables: Vec<TableId>,

    /// Tables whose refresh finished in this epoch, forwarded in the response.
    pub refresh_finished_tables: Vec<TableId>,
}
106
/// Bidirectional handle to the streaming control stream with the meta service.
/// Holds the response sender and the request stream as one pair; `pair == None`
/// means the stream is disconnected (either never connected or reset).
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
117
impl ControlStreamHandle {
    /// A handle not connected to any control stream.
    fn empty() -> Self {
        Self { pair: None }
    }

    /// Wrap an established control-stream pair (response sender + request stream).
    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    /// Whether a control stream is currently attached.
    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    /// Drop the current stream pair and best-effort notify the peer of `err`.
    /// Taking `self.pair` first makes the handle disconnected even if the
    /// notification send fails.
    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send a `Shutdown` response and then wait for the peer (meta service) to
    /// close the stream, so the shutdown message is not lost in flight.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Block until the receiving side drops, i.e. meta has observed
                // the shutdown response.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    /// Acknowledge that a partial graph has finished resetting, optionally
    /// attaching the scored root error that caused the reset.
    pub(super) fn ack_reset_partial_graph(
        &mut self,
        partial_graph_id: PartialGraphId,
        root_err: Option<ScoredStreamError>,
    ) {
        self.send_response(Response::ResetPartialGraph(ResetPartialGraphResponse {
            partial_graph_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
        }));
    }

    /// Send a response on the control stream; on send failure the stream is
    /// considered reset and the pair is dropped.
    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("fail to send response. control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    /// Receive the next request from the control stream. On stream error or
    /// end-of-stream the handle resets itself and then awaits `pending()`
    /// forever — callers (the worker's `select!`) rely on this never resolving
    /// while disconnected.
    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e))
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}
225
/// A request to take an exchange receiver for an upstream→downstream actor pair.
pub(super) enum TakeReceiverRequest {
    /// From a remote worker: the receiver is sent back through `result_sender`.
    Remote {
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
        upstream_fragment_id: FragmentId,
    },
    /// From a local actor: carries the local permit-based sender directly.
    Local(permit::Sender),
}
233
/// Operations delivered to the `LocalBarrierWorker` event loop through the
/// actor-op channel. `strum_macros::Display` is derived so variants can be
/// named in the `unreachable!` message of `handle_actor_op`.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Attach a freshly established control stream and re-initialize the worker.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Take an exchange receiver for `ids` under the given partial graph/term.
    TakeReceiver {
        partial_graph_id: PartialGraphId,
        term_id: String,
        ids: UpDownActorIds,
        request: TakeReceiverRequest,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Dump the worker's debug state as a string.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Gracefully shut down the control stream; ack via `result_sender`.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
262
/// Snapshot of the worker's state used for debugging / `InspectState`.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    /// Per partial graph: a status label plus, for running graphs only,
    /// the managed barrier state's own debug info.
    managed_barrier_state:
        HashMap<PartialGraphId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}
268
269impl Display for LocalBarrierWorkerDebugInfo<'_> {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 writeln!(
272 f,
273 "\nhas_control_stream_connected: {}",
274 self.has_control_stream_connected
275 )?;
276
277 for (partial_graph_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
278 writeln!(
279 f,
280 "partial graph {} status: {} managed_barrier_state:\n{}",
281 partial_graph_id,
282 status,
283 managed_barrier_state
284 .as_ref()
285 .map(ToString::to_string)
286 .unwrap_or_default()
287 )?;
288 }
289 Ok(())
290 }
291}
292
/// The per-worker event loop that injects/collects barriers for all partial
/// graphs on this compute node and reports completion to meta over the
/// control stream.
pub(super) struct LocalBarrierWorker {
    /// Per-partial-graph barrier state machine.
    pub(super) state: ManagedBarrierState,

    /// In-flight barrier-completion futures, kept ordered per partial graph
    /// so epochs complete in issue order.
    await_epoch_completed_futures:
        HashMap<PartialGraphId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Handle to the control stream with meta; may be disconnected.
    control_stream_handle: ControlStreamHandle,

    /// Shared environment for building and running actors.
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Current term; exchange requests with a mismatched term are rejected.
    pub(super) term_id: String,
}
314
impl LocalBarrierWorker {
    /// Create a worker with empty state for the given term, not yet connected
    /// to any control stream.
    pub(super) fn new(actor_manager: Arc<StreamActorManager>, term_id: String) -> Self {
        Self {
            state: Default::default(),
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    /// Build a debug snapshot: one (label, optional detail) entry per partial
    /// graph plus the control-stream connection flag.
    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .partial_graphs
                .iter()
                .map(|(partial_graph_id, status)| {
                    (*partial_graph_id, {
                        match status {
                            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            PartialGraphStatus::Running(state) => {
                                // Only running graphs expose detailed state.
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            PartialGraphStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            PartialGraphStatus::Resetting => ("resetting".to_owned(), None),
                            PartialGraphStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

    /// Poll all per-graph completion queues and resolve with the first
    /// completed epoch. Each `FuturesOrdered` preserves per-graph epoch order;
    /// across graphs the polling order is the map's iteration order.
    async fn next_completed_epoch(
        futures: &mut HashMap<PartialGraphId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>) {
        poll_fn(|cx| {
            for (partial_graph_id, futures) in &mut *futures {
                if let Poll::Ready(Some((barrier, result))) = futures.poll_next_unpin(cx) {
                    return Poll::Ready((*partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    /// Main event loop. `biased` gives a fixed priority: internal state events
    /// first, then completed epochs, then actor ops, then new control-stream
    /// requests. Exits when the actor-op channel closes.
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                event = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            partial_graph_id,
                            actor_id,
                            err,
                        } => {
                            self.on_partial_graph_failure(partial_graph_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::PartialGraphsReset(output) => {
                            for (partial_graph_id, output) in output {
                                self.ack_partial_graph_reset(partial_graph_id, Some(output));
                            }
                        }
                        ManagedBarrierStateEvent::RegisterLocalUpstreamOutput{
                            actor_id,
                            upstream_actor_id,
                            upstream_partial_graph_id,
                            tx
                        } => {
                            // Local exchange registration is funneled through the
                            // same path as remote TakeReceiver requests.
                            self.handle_actor_op(LocalActorOperation::TakeReceiver {
                                partial_graph_id: upstream_partial_graph_id,
                                term_id: self.term_id.clone(),
                                ids: (upstream_actor_id, actor_id),
                                request: TakeReceiverRequest::Local(tx),
                            });
                        }
                    }
                }
                (partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // Epoch completion failure is unrecoverable for this
                            // stream: reset it and let meta re-establish.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {:?} {:?}", partial_graph_id, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            // NewControlStream and Shutdown need `.await` and are
                            // therefore handled here instead of in handle_actor_op.
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.partial_graphs.values().any(|graph| {
                                    match graph {
                                        PartialGraphStatus::Running(graph) => {
                                            !graph.actor_states.is_empty()
                                        }
                                        PartialGraphStatus::Suspended(_) | PartialGraphStatus::Resetting |
                                        PartialGraphStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        PartialGraphStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        // Actor-op channel closed: the worker is being torn down.
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((partial_graph_id, err)) = result {
                        self.on_partial_graph_failure(partial_graph_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    /// Dispatch one control-stream request. Only barrier injection can fail;
    /// the error is tagged with the partial graph it belongs to.
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (PartialGraphId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let partial_graph_id = req.partial_graph_id;
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (partial_graph_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(req.partial_graph_ids);
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(req.partial_graph_id);
                Ok(())
            }
            Request::ResetPartialGraphs(req) => {
                self.reset_partial_graphs(req);
                Ok(())
            }
            Request::Init(_) => {
                // Init is consumed when the control stream is established.
                unreachable!()
            }
        }
    }

    /// Handle synchronous actor ops. `NewControlStream` / `Shutdown` must not
    /// reach here — they require async handling in `run`.
    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                partial_graph_id,
                term_id,
                ids,
                request,
            } => {
                // On the happy paths below we `return` early; falling through to
                // the tail means `err` is set and a Remote requester is notified.
                let err = if self.term_id != term_id {
                    {
                        warn!(
                            ?ids,
                            term_id,
                            current_term_id = self.term_id,
                            "take receiver on unmatched term_id"
                        );
                        anyhow!(
                            "take receiver {:?} on unmatched term_id {} to current term_id {}",
                            ids,
                            term_id,
                            self.term_id
                        )
                    }
                } else {
                    match self.state.partial_graphs.entry(partial_graph_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                                // Graph not created yet: queue the request.
                                pending_requests.push((ids, request));
                                return;
                            }
                            PartialGraphStatus::Running(graph) => {
                                let (upstream_actor_id, actor_id) = ids;
                                graph.new_actor_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    request,
                                );
                                return;
                            }
                            PartialGraphStatus::Suspended(_) => {
                                anyhow!("partial graph suspended")
                            }
                            PartialGraphStatus::Resetting => {
                                anyhow!("partial graph resetting")
                            }
                            PartialGraphStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            // First exchange request for an unseen graph: start
                            // buffering until the graph is created.
                            entry.insert(PartialGraphStatus::ReceivedExchangeRequest(vec![(
                                ids, request,
                            )]));
                            return;
                        }
                    }
                };
                // Only remote requesters carry a result channel to notify;
                // local requests are silently dropped on error.
                if let TakeReceiverRequest::Remote { result_sender, .. } = request {
                    let _ = result_sender.send(Err(err.into()));
                }
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let partial_graph_status = self
                    .state
                    .partial_graphs
                    .get(&crate::task::TEST_PARTIAL_GRAPH_ID)
                    .unwrap();
                let partial_graph_state = risingwave_common::must_match!(partial_graph_status, PartialGraphStatus::Running(database_state) => database_state);
                let _ = sender.send(partial_graph_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let partial_graph_status = self
                    .state
                    .partial_graphs
                    .get_mut(&crate::task::TEST_PARTIAL_GRAPH_ID)
                    .unwrap();

                let partial_graph_state = risingwave_common::must_match!(partial_graph_status, PartialGraphStatus::Running(database_state) => database_state);
                assert!(!partial_graph_state.actor_states.contains_key(&actor_id));
                let requests = partial_graph_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                // Drain everything that is immediately ready, without awaiting.
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some(event) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            barrier,
                            partial_graph_id,
                        } => {
                            self.complete_barrier(partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::PartialGraphsReset { .. }
                        | ManagedBarrierStateEvent::RegisterLocalUpstreamOutput { .. } => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}
629
/// Isolates the opaque future type (TAIT) used by the worker's per-graph
/// `FuturesOrdered` queues, so the `#[define_opaque]` defining use lives in
/// exactly one place.
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_common::id::TableId;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
        PbListFinishedSource, PbLoadFinishedSource,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, await_tree_key};

    /// Future resolving to the barrier plus its completion result.
    pub(super) type AwaitEpochCompletedFuture =
        impl Future<Output = (Barrier, StreamResult<BarrierCompleteResult>)> + 'static;

    /// Bundle an optional state-store sync future with the collected per-epoch
    /// payloads into one `AwaitEpochCompletedFuture`, optionally instrumented
    /// in the await-tree registry under `SyncEpoch(<prev_epoch>)`.
    ///
    /// `complete_barrier_future` is `None` for non-checkpoint barriers, in
    /// which case `sync_result` resolves to `None`.
    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    cdc_source_offset_updated,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        // Either-wrap so both the instrumented and plain variants unify into
        // the single opaque type.
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}
699
700use await_epoch_completed_future::*;
701use risingwave_common::catalog::TableId;
702use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
703use risingwave_storage::{StateStoreImpl, dispatch_state_store};
704
705use crate::executor::exchange::permit;
706
707fn sync_epoch(
708 state_store: &StateStoreImpl,
709 streaming_metrics: &StreamingMetrics,
710 prev_epoch: u64,
711 table_ids: HashSet<TableId>,
712) -> BoxFuture<'static, StreamResult<SyncResult>> {
713 let timer = streaming_metrics.barrier_sync_latency.start_timer();
714
715 let state_store = state_store.clone();
716 let future = async move {
717 dispatch_state_store!(state_store, hummock, {
718 hummock.sync(vec![(prev_epoch, table_ids)]).await
719 })
720 };
721
722 future
723 .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
724 .inspect_ok(move |_| {
725 timer.observe_duration();
726 })
727 .map_err(move |e| {
728 tracing::error!(
729 prev_epoch,
730 error = %e.as_report(),
731 "Failed to sync state store",
732 );
733 e.into()
734 })
735 .boxed()
736}
737
738impl LocalBarrierWorker {
739 fn complete_barrier(&mut self, partial_graph_id: PartialGraphId, prev_epoch: u64) {
740 {
741 let Some(graph_state) = self
742 .state
743 .partial_graphs
744 .get_mut(&partial_graph_id)
745 .expect("should exist")
746 .state_for_request()
747 else {
748 return;
749 };
750 let BarrierToComplete {
751 barrier,
752 table_ids,
753 create_mview_progress,
754 list_finished_source_ids,
755 load_finished_source_ids,
756 cdc_table_backfill_progress,
757 cdc_source_offset_updated,
758 truncate_tables,
759 refresh_finished_tables,
760 } = graph_state.pop_barrier_to_complete(prev_epoch);
761
762 let complete_barrier_future = match &barrier.kind {
763 BarrierKind::Unspecified => unreachable!(),
764 BarrierKind::Initial => {
765 tracing::info!(
766 epoch = prev_epoch,
767 "ignore sealing data for the first barrier"
768 );
769 tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
770 None
771 }
772 BarrierKind::Barrier => None,
773 BarrierKind::Checkpoint => Some(sync_epoch(
774 &self.actor_manager.env.state_store(),
775 &self.actor_manager.streaming_metrics,
776 prev_epoch,
777 table_ids.expect("should be Some on BarrierKind::Checkpoint"),
778 )),
779 };
780
781 self.await_epoch_completed_futures
782 .entry(partial_graph_id)
783 .or_default()
784 .push_back({
785 instrument_complete_barrier_future(
786 complete_barrier_future,
787 barrier,
788 self.actor_manager.await_tree_reg.as_ref(),
789 create_mview_progress,
790 list_finished_source_ids,
791 load_finished_source_ids,
792 cdc_table_backfill_progress,
793 cdc_source_offset_updated,
794 truncate_tables,
795 refresh_finished_tables,
796 )
797 });
798 }
799 }
800
801 fn on_epoch_completed(
802 &mut self,
803 partial_graph_id: PartialGraphId,
804 epoch: u64,
805 result: BarrierCompleteResult,
806 ) {
807 let BarrierCompleteResult {
808 create_mview_progress,
809 sync_result,
810 list_finished_source_ids,
811 load_finished_source_ids,
812 cdc_table_backfill_progress,
813 cdc_source_offset_updated,
814 truncate_tables,
815 refresh_finished_tables,
816 } = result;
817
818 let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
819 .map(|sync_result| {
820 (
821 sync_result.uncommitted_ssts,
822 sync_result.table_watermarks,
823 sync_result.old_value_ssts,
824 sync_result.vector_index_adds,
825 )
826 })
827 .unwrap_or_default();
828
829 let result = {
830 {
831 streaming_control_stream_response::Response::CompleteBarrier(
832 BarrierCompleteResponse {
833 request_id: "todo".to_owned(),
834 partial_graph_id,
835 epoch,
836 status: None,
837 create_mview_progress,
838 synced_sstables: synced_sstables
839 .into_iter()
840 .map(
841 |LocalSstableInfo {
842 sst_info,
843 table_stats,
844 created_at,
845 }| PbLocalSstableInfo {
846 sst: Some(sst_info.into()),
847 table_stats_map: to_prost_table_stats_map(table_stats),
848 created_at,
849 },
850 )
851 .collect_vec(),
852 worker_id: self.actor_manager.env.worker_id(),
853 table_watermarks: table_watermarks
854 .into_iter()
855 .map(|(key, value)| (key, value.into()))
856 .collect(),
857 old_value_sstables: old_value_ssts
858 .into_iter()
859 .map(|sst| sst.sst_info.into())
860 .collect(),
861 list_finished_sources: list_finished_source_ids,
862 load_finished_sources: load_finished_source_ids,
863 cdc_source_offset_updated,
864 vector_index_adds: vector_index_adds
865 .into_iter()
866 .map(|(table_id, adds)| {
867 (
868 table_id,
869 PbVectorIndexAdds {
870 adds: adds.into_iter().map(|add| add.into()).collect(),
871 },
872 )
873 })
874 .collect(),
875 cdc_table_backfill_progress,
876 truncate_tables,
877 refresh_finished_tables,
878 },
879 )
880 }
881 };
882
883 self.control_stream_handle.send_response(result);
884 }
885
886 fn send_barrier(
893 &mut self,
894 barrier: &Barrier,
895 request: InjectBarrierRequest,
896 ) -> StreamResult<()> {
897 debug!(
898 target: "events::stream::barrier::manager::send",
899 "send barrier {:?}, actor_ids_to_collect = {:?}",
900 barrier,
901 request.actor_ids_to_collect
902 );
903
904 let status = self
905 .state
906 .partial_graphs
907 .get_mut(&request.partial_graph_id)
908 .expect("should exist");
909 if let Some(state) = status.state_for_request() {
910 state.transform_to_issued(barrier, request)?;
911 }
912 Ok(())
913 }
914
915 fn remove_partial_graphs(
916 &mut self,
917 partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
918 ) {
919 for partial_graph_id in partial_graph_ids {
920 if let Some(mut graph) = self.state.partial_graphs.remove(&partial_graph_id) {
921 if let Some(graph) = graph.state_for_request() {
922 assert!(
923 graph.graph_state.is_empty(),
924 "non empty graph to be removed: {}",
925 &graph.graph_state
926 );
927 }
928 } else {
929 warn!(
930 partial_graph_id = %partial_graph_id,
931 "no partial graph to remove"
932 );
933 }
934 }
935 }
936
937 fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
938 match self.state.partial_graphs.entry(partial_graph_id) {
939 Entry::Occupied(entry) => {
940 let status = entry.into_mut();
941 if let PartialGraphStatus::ReceivedExchangeRequest(pending_requests) = status {
942 let mut graph = PartialGraphState::new(
943 partial_graph_id,
944 self.term_id.clone(),
945 self.actor_manager.clone(),
946 );
947 for ((upstream_actor_id, actor_id), request) in pending_requests.drain(..) {
948 graph.new_actor_output_request(actor_id, upstream_actor_id, request);
949 }
950 *status = PartialGraphStatus::Running(graph);
951 } else {
952 panic!("duplicated partial graph: {}", partial_graph_id);
953 }
954
955 status
956 }
957 Entry::Vacant(entry) => {
958 entry.insert(PartialGraphStatus::Running(PartialGraphState::new(
959 partial_graph_id,
960 self.term_id.clone(),
961 self.actor_manager.clone(),
962 )))
963 }
964 };
965 }
966
967 fn reset_partial_graphs(&mut self, req: ResetPartialGraphsRequest) {
968 let mut table_ids_to_clear = HashSet::new();
969 let mut reset_futures = HashMap::new();
970 for partial_graph_id in req.partial_graph_ids {
971 if let Some(status) = self.state.partial_graphs.get_mut(&partial_graph_id) {
972 let reset_future = status.start_reset(
973 partial_graph_id,
974 self.await_epoch_completed_futures.remove(&partial_graph_id),
975 &mut table_ids_to_clear,
976 );
977 reset_futures.insert(partial_graph_id, reset_future);
978 } else {
979 self.ack_partial_graph_reset(partial_graph_id, None);
980 }
981 }
982 if reset_futures.is_empty() {
983 assert!(table_ids_to_clear.is_empty());
984 return;
985 }
986 let state_store = self.actor_manager.env.state_store();
987 self.state.resetting_graphs.push(spawn(async move {
988 let outputs =
989 join_all(
990 reset_futures
991 .into_iter()
992 .map(|(partial_graph_id, future)| async move {
993 (partial_graph_id, future.await)
994 }),
995 )
996 .await;
997 if !table_ids_to_clear.is_empty()
998 && let Some(hummock) = state_store.as_hummock()
999 {
1000 hummock.clear_tables(table_ids_to_clear).await;
1001 }
1002 outputs
1003 }));
1004 }
1005
1006 fn ack_partial_graph_reset(
1007 &mut self,
1008 partial_graph_id: PartialGraphId,
1009 reset_output: Option<ResetPartialGraphOutput>,
1010 ) {
1011 info!(
1012 %partial_graph_id,
1013 "partial graph reset successfully"
1014 );
1015 assert!(!self.state.partial_graphs.contains_key(&partial_graph_id));
1016 self.await_epoch_completed_futures.remove(&partial_graph_id);
1017 self.control_stream_handle.ack_reset_partial_graph(
1018 partial_graph_id,
1019 reset_output.and_then(|output| output.root_err),
1020 );
1021 }
1022
1023 fn on_partial_graph_failure(
1027 &mut self,
1028 partial_graph_id: PartialGraphId,
1029 failed_actor: Option<ActorId>,
1030 err: StreamError,
1031 message: impl Into<String>,
1032 ) {
1033 let message = message.into();
1034 error!(%partial_graph_id, ?failed_actor, message, err = ?err.as_report(), "suspend partial graph on error");
1035 let completing_futures = self.await_epoch_completed_futures.remove(&partial_graph_id);
1036 self.state
1037 .partial_graphs
1038 .get_mut(&partial_graph_id)
1039 .expect("should exist")
1040 .suspend(failed_actor, err, completing_futures);
1041 self.control_stream_handle
1042 .send_response(Response::ReportPartialGraphFailure(
1043 ReportPartialGraphFailureResponse { partial_graph_id },
1044 ));
1045 }
1046
1047 async fn reset(&mut self, init_request: InitRequest) {
1049 join(
1050 join_all(
1051 self.state
1052 .partial_graphs
1053 .values_mut()
1054 .map(|graph| graph.abort()),
1055 ),
1056 async {
1057 while let Some(join_result) = self.state.resetting_graphs.next().await {
1058 join_result.expect("failed to join reset partial graphs handle");
1059 }
1060 },
1061 )
1062 .await;
1063 if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
1064 m.clear();
1065 }
1066
1067 if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
1068 hummock
1069 .clear_shared_buffer()
1070 .instrument_await("store_clear_shared_buffer".verbose())
1071 .await
1072 }
1073 self.actor_manager.env.dml_manager_ref().clear();
1074 *self = Self::new(self.actor_manager.clone(), init_request.term_id);
1075 self.actor_manager.env.client_pool().invalidate_all();
1076 }
1077
1078 pub fn spawn(
1080 env: StreamEnvironment,
1081 streaming_metrics: Arc<StreamingMetrics>,
1082 await_tree_reg: Option<await_tree::Registry>,
1083 watermark_epoch: AtomicU64Ref,
1084 actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1085 ) -> JoinHandle<()> {
1086 let runtime = {
1087 let mut builder = tokio::runtime::Builder::new_multi_thread();
1088 if let Some(worker_threads_num) = env.global_config().actor_runtime_worker_threads_num {
1089 builder.worker_threads(worker_threads_num);
1090 }
1091 builder
1092 .thread_name("rw-streaming")
1093 .enable_all()
1094 .build()
1095 .unwrap()
1096 };
1097
1098 let actor_manager = Arc::new(StreamActorManager {
1099 env,
1100 streaming_metrics,
1101 watermark_epoch,
1102 await_tree_reg,
1103 runtime: runtime.into(),
1104 config_override_cache: ConfigOverrideCache::new(CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY),
1105 });
1106 let worker = LocalBarrierWorker::new(actor_manager, "uninitialized".into());
1107 tokio::spawn(worker.run(actor_op_rx))
1108 }
1109}
1110
/// Thin newtype over an unbounded sender that panics on send failure
/// (the receiving worker is expected to outlive all senders).
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1112
1113impl<T> Clone for EventSender<T> {
1114 fn clone(&self) -> Self {
1115 Self(self.0.clone())
1116 }
1117}
1118
1119impl<T> EventSender<T> {
1120 pub(super) fn send_event(&self, event: T) {
1121 self.0.send(event).expect("should be able to send event")
1122 }
1123
1124 pub(super) async fn send_and_await<RSP>(
1125 &self,
1126 make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1127 ) -> StreamResult<RSP> {
1128 let (tx, rx) = oneshot::channel();
1129 let event = make_event(tx);
1130 self.send_event(event);
1131 rx.await
1132 .map_err(|_| anyhow!("barrier manager maybe reset").into())
1133 }
1134}
1135
/// An actor's request for a new output channel, distinguishing whether the
/// downstream consumer is on this worker or a remote one.
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}
1140
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbCreatePartialGraphRequest,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, PbStreamingControlStreamRequest, StreamingControlStreamRequest,
        StreamingControlStreamResponse, streaming_control_stream_request,
        streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{ActorId, LocalBarrierManager, NewOutputRequest, TEST_PARTIAL_GRAPH_ID};

    /// Test harness: a spawned worker plus in-memory control-stream channels,
    /// letting tests act as the meta service.
    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        // Channels mimicking the meta-side of the control stream.
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawn a worker, pre-queue a CreatePartialGraph for
        /// `TEST_PARTIAL_GRAPH_ID`, perform the Init handshake, and fetch the
        /// graph's `LocalBarrierManager`.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            // Queue the graph creation before the control stream is attached;
            // the worker drains it once connected.
            request_tx
                .send(Ok(PbStreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            PbCreatePartialGraphRequest {
                                partial_graph_id: TEST_PARTIAL_GRAPH_ID,
                            },
                        ),
                    ),
                }))
                .unwrap();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    term_id: "for_test".into(),
                },
            });

            // The worker must ack the new control stream with Init first.
            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Inject `barrier` into the test partial graph, to be collected by
        /// the given actors.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID,
                            actors_to_build: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        /// Drain all immediately-ready control requests and state events in
        /// the worker (see `LocalActorOperation::Flush`).
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        /// Take the pending new-output requests buffered for a not-yet-built
        /// actor in the test partial graph.
        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}