1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::future::{pending, poll_fn};
19use std::sync::Arc;
20use std::task::Poll;
21
22use anyhow::anyhow;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::future::{BoxFuture, join, join_all};
25use futures::stream::{BoxStream, FuturesOrdered};
26use futures::{FutureExt, StreamExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_pb::stream_plan::barrier::BarrierKind;
29use risingwave_pb::stream_service::barrier_complete_response::{
30 PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
31 PbListFinishedSource, PbLoadFinishedSource, PbLocalSstableInfo,
32};
33use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
34use risingwave_storage::store_impl::AsHummock;
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot;
38use tokio::task::JoinHandle;
39use tokio::{select, spawn};
40use tonic::{Code, Status};
41use tracing::warn;
42
43use self::managed_state::ManagedBarrierState;
44use crate::error::{ScoredStreamError, StreamError, StreamResult};
45#[cfg(test)]
46use crate::task::LocalBarrierManager;
47use crate::task::managed_state::{BarrierToComplete, ResetPartialGraphOutput};
48use crate::task::{
49 ActorId, AtomicU64Ref, CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY, ConfigOverrideCache,
50 PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
51};
52pub mod managed_state;
53#[cfg(test)]
54mod tests;
55
56use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
57use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
58use risingwave_pb::stream_service::streaming_control_stream_request::{
59 InitRequest, Request, ResetPartialGraphsRequest,
60};
61use risingwave_pb::stream_service::streaming_control_stream_response::{
62 InitResponse, ReportPartialGraphFailureResponse, ResetPartialGraphResponse, Response,
63 ShutdownResponse,
64};
65use risingwave_pb::stream_service::{
66 BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
67 StreamingControlStreamResponse, streaming_control_stream_response,
68};
69
70use crate::executor::Barrier;
71use crate::executor::exchange::permit::Receiver;
72use crate::executor::monitor::StreamingMetrics;
73use crate::task::barrier_worker::managed_state::{
74 ManagedBarrierStateDebugInfo, ManagedBarrierStateEvent, PartialGraphState, PartialGraphStatus,
75};
76
/// Compile-time switch for barrier aggregation; currently `false`.
///
/// NOTE(review): not referenced anywhere in this file — confirm where it is consumed.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
80
/// Outcome of completing one collected barrier of a partial graph.
///
/// Produced by the future built in `instrument_complete_barrier_future` and turned into a
/// `BarrierCompleteResponse` by `LocalBarrierWorker::on_epoch_completed`.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// Result of syncing the state store for this epoch. `None` when no sync was started
    /// (`BarrierKind::Initial` and `BarrierKind::Barrier` in `complete_barrier`).
    pub sync_result: Option<SyncResult>,

    /// Materialized-view creation progress collected with the barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// `PbListFinishedSource` reports collected with the barrier.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,

    /// `PbLoadFinishedSource` reports collected with the barrier.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,

    /// CDC table backfill progress collected with the barrier.
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// CDC source offset updates collected with the barrier.
    pub cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,

    /// Table ids forwarded to the response's `truncate_tables` field.
    pub truncate_tables: Vec<TableId>,
    /// Table ids forwarded to the response's `refresh_finished_tables` field.
    pub refresh_finished_tables: Vec<TableId>,
}
106
/// Handle to the bidirectional control stream with the meta service.
///
/// While connected it owns the response sender and the request stream. Once the stream has been
/// reset or shut down, `pair` is `None`. Responses are then dropped and `next_request` never
/// resolves.
pub(super) struct ControlStreamHandle {
    /// `(response sender, request stream)`, or `None` when disconnected.
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
117
118impl ControlStreamHandle {
119 fn empty() -> Self {
120 Self { pair: None }
121 }
122
123 pub(super) fn new(
124 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
125 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
126 ) -> Self {
127 Self {
128 pair: Some((sender, request_stream)),
129 }
130 }
131
132 pub(super) fn connected(&self) -> bool {
133 self.pair.is_some()
134 }
135
136 fn reset_stream_with_err(&mut self, err: Status) {
137 if let Some((sender, _)) = self.pair.take() {
138 let err = TonicStatusWrapper::new(err);
140 warn!(error = %err.as_report(), "control stream reset with error");
141
142 let err = err.into_inner();
143 if sender.send(Err(err)).is_err() {
144 warn!("failed to notify reset of control stream");
145 }
146 }
147 }
148
149 async fn shutdown_stream(&mut self) {
152 if let Some((sender, _)) = self.pair.take() {
153 if sender
154 .send(Ok(StreamingControlStreamResponse {
155 response: Some(streaming_control_stream_response::Response::Shutdown(
156 ShutdownResponse::default(),
157 )),
158 }))
159 .is_err()
160 {
161 warn!("failed to notify shutdown of control stream");
162 } else {
163 tracing::info!("waiting for meta service to close control stream...");
164
165 sender.closed().await;
172 }
173 } else {
174 debug!("control stream has been reset, ignore shutdown");
175 }
176 }
177
178 pub(super) fn ack_reset_partial_graph(
179 &mut self,
180 partial_graph_id: PartialGraphId,
181 root_err: Option<ScoredStreamError>,
182 ) {
183 self.send_response(Response::ResetPartialGraph(ResetPartialGraphResponse {
184 partial_graph_id,
185 root_err: root_err.map(|err| PbScoredError {
186 err_msg: err.error.to_report_string(),
187 score: err.score.0,
188 }),
189 }));
190 }
191
192 fn send_response(&mut self, response: streaming_control_stream_response::Response) {
193 if let Some((sender, _)) = self.pair.as_ref() {
194 if sender
195 .send(Ok(StreamingControlStreamResponse {
196 response: Some(response),
197 }))
198 .is_err()
199 {
200 self.pair = None;
201 warn!("fail to send response. control stream reset");
202 }
203 } else {
204 debug!(?response, "control stream has been reset. ignore response");
205 }
206 }
207
208 async fn next_request(&mut self) -> StreamingControlStreamRequest {
209 if let Some((_, stream)) = &mut self.pair {
210 match stream.next().await {
211 Some(Ok(request)) => {
212 return request;
213 }
214 Some(Err(e)) => self.reset_stream_with_err(
215 anyhow!(TonicStatusWrapper::new(e)) .context("failed to get request")
217 .to_status_unnamed(Code::Internal),
218 ),
219 None => self.reset_stream_with_err(Status::internal("end of stream")),
220 }
221 }
222 pending().await
223 }
224}
225
/// How the receiving end of an exchange channel is delivered to whoever asked for it.
pub(super) enum TakeReceiverRequest {
    /// The requester waits on this oneshot for the `Receiver`. Errors (unmatched term,
    /// suspended or resetting graph) are also reported through it.
    Remote(oneshot::Sender<StreamResult<Receiver>>),
    /// The requester already holds the receiver and supplies the permit sender. Used for
    /// `ManagedBarrierStateEvent::RegisterLocalUpstreamOutput`. Errors for this variant are
    /// not reported back.
    Local(permit::Sender),
}
230
/// Operations posted to the `LocalBarrierWorker` event loop through its operation channel.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Reset the worker with `init_request` (which carries the new term id) and attach `handle`
    /// as the new control stream. Handled directly in `run` because it needs to await.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Ask for the exchange channel between the actors in `ids`, given as (upstream, downstream),
    /// of `partial_graph_id`. Rejected when `term_id` differs from the worker's current term.
    TakeReceiver {
        partial_graph_id: PartialGraphId,
        term_id: String,
        ids: UpDownActorIds,
        request: TakeReceiverRequest,
    },
    /// Test only: reply with the `LocalBarrierManager` of the test partial graph.
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    /// Test only: take the pending output requests recorded for a not-yet-built actor.
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    /// Test only: process every already-ready request and state event, then reply.
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Reply with a textual dump of the worker state.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Shut down the control stream, then reply on `result_sender`.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
259
/// Snapshot of `LocalBarrierWorker` state, rendered for `LocalActorOperation::InspectState`.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    /// For each partial graph: a status label and, for running graphs only, the debug info of
    /// its barrier state.
    managed_barrier_state:
        HashMap<PartialGraphId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    /// Whether a control stream was attached when the snapshot was taken.
    has_control_stream_connected: bool,
}
265
266impl Display for LocalBarrierWorkerDebugInfo<'_> {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 writeln!(
269 f,
270 "\nhas_control_stream_connected: {}",
271 self.has_control_stream_connected
272 )?;
273
274 for (partial_graph_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
275 writeln!(
276 f,
277 "partial graph {} status: {} managed_barrier_state:\n{}",
278 partial_graph_id,
279 status,
280 managed_barrier_state
281 .as_ref()
282 .map(ToString::to_string)
283 .unwrap_or_default()
284 )?;
285 }
286 Ok(())
287 }
288}
289
/// Event loop that drives barriers for every partial graph on this worker.
///
/// It talks to the meta service over the control stream.
pub(super) struct LocalBarrierWorker {
    /// Status of every partial graph, and the source of `ManagedBarrierStateEvent`s polled in
    /// `run`.
    pub(super) state: ManagedBarrierState,

    /// Per partial graph, the in-flight completion futures of collected barriers.
    ///
    /// `FuturesOrdered` yields them in push order, i.e. in collection order.
    await_epoch_completed_futures:
        HashMap<PartialGraphId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Control stream to the meta service; may be disconnected.
    control_stream_handle: ControlStreamHandle,

    /// Shared environment, metrics, runtime and registries used to build and run actors.
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Term id received in the last `InitRequest`. `TakeReceiver` requests carrying another
    /// term are rejected.
    pub(super) term_id: String,
}
311
312impl LocalBarrierWorker {
313 pub(super) fn new(actor_manager: Arc<StreamActorManager>, term_id: String) -> Self {
314 Self {
315 state: Default::default(),
316 await_epoch_completed_futures: Default::default(),
317 control_stream_handle: ControlStreamHandle::empty(),
318 actor_manager,
319 term_id,
320 }
321 }
322
323 fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
324 LocalBarrierWorkerDebugInfo {
325 managed_barrier_state: self
326 .state
327 .partial_graphs
328 .iter()
329 .map(|(partial_graph_id, status)| {
330 (*partial_graph_id, {
331 match status {
332 PartialGraphStatus::ReceivedExchangeRequest(_) => {
333 ("ReceivedExchangeRequest".to_owned(), None)
334 }
335 PartialGraphStatus::Running(state) => {
336 ("running".to_owned(), Some(state.to_debug_info()))
337 }
338 PartialGraphStatus::Suspended(state) => {
339 (format!("suspended: {:?}", state.suspend_time), None)
340 }
341 PartialGraphStatus::Resetting => ("resetting".to_owned(), None),
342 PartialGraphStatus::Unspecified => {
343 unreachable!()
344 }
345 }
346 })
347 })
348 .collect(),
349 has_control_stream_connected: self.control_stream_handle.connected(),
350 }
351 }
352
353 async fn next_completed_epoch(
354 futures: &mut HashMap<PartialGraphId, FuturesOrdered<AwaitEpochCompletedFuture>>,
355 ) -> (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>) {
356 poll_fn(|cx| {
357 for (partial_graph_id, futures) in &mut *futures {
358 if let Poll::Ready(Some((barrier, result))) = futures.poll_next_unpin(cx) {
359 return Poll::Ready((*partial_graph_id, barrier, result));
360 }
361 }
362 Poll::Pending
363 })
364 .await
365 }
366
367 async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
368 loop {
369 select! {
370 biased;
371 event = self.state.next_event() => {
372 match event {
373 ManagedBarrierStateEvent::BarrierCollected{
374 partial_graph_id,
375 barrier,
376 } => {
377 self.complete_barrier(partial_graph_id, barrier.epoch.prev);
380 }
381 ManagedBarrierStateEvent::ActorError{
382 partial_graph_id,
383 actor_id,
384 err,
385 } => {
386 self.on_partial_graph_failure(partial_graph_id, Some(actor_id), err, "recv actor failure");
387 }
388 ManagedBarrierStateEvent::PartialGraphsReset(output) => {
389 for (partial_graph_id, output) in output {
390 self.ack_partial_graph_reset(partial_graph_id, Some(output));
391 }
392 }
393 ManagedBarrierStateEvent::RegisterLocalUpstreamOutput{
394 actor_id,
395 upstream_actor_id,
396 upstream_partial_graph_id,
397 tx
398 } => {
399 self.handle_actor_op(LocalActorOperation::TakeReceiver {
400 partial_graph_id: upstream_partial_graph_id,
401 term_id: self.term_id.clone(),
402 ids: (upstream_actor_id, actor_id),
403 request: TakeReceiverRequest::Local(tx),
404 });
405 }
406 }
407 }
408 (partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
409 match result {
410 Ok(result) => {
411 self.on_epoch_completed(partial_graph_id, barrier.epoch.prev, result);
412 }
413 Err(err) => {
414 self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {:?} {:?}", partial_graph_id, barrier.epoch, err.as_report())));
418 }
419 }
420 },
421 actor_op = actor_op_rx.recv() => {
422 if let Some(actor_op) = actor_op {
423 match actor_op {
424 LocalActorOperation::NewControlStream { handle, init_request } => {
425 self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
426 self.reset(init_request).await;
427 self.control_stream_handle = handle;
428 self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
429 }
430 LocalActorOperation::Shutdown { result_sender } => {
431 if self.state.partial_graphs.values().any(|graph| {
432 match graph {
433 PartialGraphStatus::Running(graph) => {
434 !graph.actor_states.is_empty()
435 }
436 PartialGraphStatus::Suspended(_) | PartialGraphStatus::Resetting |
437 PartialGraphStatus::ReceivedExchangeRequest(_) => {
438 false
439 }
440 PartialGraphStatus::Unspecified => {
441 unreachable!()
442 }
443 }
444 }) {
445 tracing::warn!(
446 "shutdown with running actors, scaling or migration will be triggered"
447 );
448 }
449 self.control_stream_handle.shutdown_stream().await;
450 let _ = result_sender.send(());
451 }
452 actor_op => {
453 self.handle_actor_op(actor_op);
454 }
455 }
456 }
457 else {
458 break;
459 }
460 },
461 request = self.control_stream_handle.next_request() => {
462 let result = self.handle_streaming_control_request(request.request.expect("non empty"));
463 if let Err((partial_graph_id, err)) = result {
464 self.on_partial_graph_failure(partial_graph_id, None, err, "failed to inject barrier");
465 }
466 },
467 }
468 }
469 }
470
471 fn handle_streaming_control_request(
472 &mut self,
473 request: Request,
474 ) -> Result<(), (PartialGraphId, StreamError)> {
475 match request {
476 Request::InjectBarrier(req) => {
477 let partial_graph_id = req.partial_graph_id;
478 let result: StreamResult<()> = try {
479 let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
480 self.send_barrier(&barrier, req)?;
481 };
482 result.map_err(|e| (partial_graph_id, e))?;
483 Ok(())
484 }
485 Request::RemovePartialGraph(req) => {
486 self.remove_partial_graphs(req.partial_graph_ids);
487 Ok(())
488 }
489 Request::CreatePartialGraph(req) => {
490 self.add_partial_graph(req.partial_graph_id);
491 Ok(())
492 }
493 Request::ResetPartialGraphs(req) => {
494 self.reset_partial_graphs(req);
495 Ok(())
496 }
497 Request::Init(_) => {
498 unreachable!()
499 }
500 }
501 }
502
503 fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
504 match actor_op {
505 LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
506 unreachable!("event {actor_op} should be handled separately in async context")
507 }
508 LocalActorOperation::TakeReceiver {
509 partial_graph_id,
510 term_id,
511 ids,
512 request,
513 } => {
514 let err = if self.term_id != term_id {
515 {
516 warn!(
517 ?ids,
518 term_id,
519 current_term_id = self.term_id,
520 "take receiver on unmatched term_id"
521 );
522 anyhow!(
523 "take receiver {:?} on unmatched term_id {} to current term_id {}",
524 ids,
525 term_id,
526 self.term_id
527 )
528 }
529 } else {
530 match self.state.partial_graphs.entry(partial_graph_id) {
531 Entry::Occupied(mut entry) => match entry.get_mut() {
532 PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
533 pending_requests.push((ids, request));
534 return;
535 }
536 PartialGraphStatus::Running(graph) => {
537 let (upstream_actor_id, actor_id) = ids;
538 graph.new_actor_output_request(
539 actor_id,
540 upstream_actor_id,
541 request,
542 );
543 return;
544 }
545 PartialGraphStatus::Suspended(_) => {
546 anyhow!("partial graph suspended")
547 }
548 PartialGraphStatus::Resetting => {
549 anyhow!("partial graph resetting")
550 }
551 PartialGraphStatus::Unspecified => {
552 unreachable!()
553 }
554 },
555 Entry::Vacant(entry) => {
556 entry.insert(PartialGraphStatus::ReceivedExchangeRequest(vec![(
557 ids, request,
558 )]));
559 return;
560 }
561 }
562 };
563 if let TakeReceiverRequest::Remote(result_sender) = request {
564 let _ = result_sender.send(Err(err.into()));
565 }
566 }
567 #[cfg(test)]
568 LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
569 let partial_graph_status = self
570 .state
571 .partial_graphs
572 .get(&crate::task::TEST_PARTIAL_GRAPH_ID)
573 .unwrap();
574 let partial_graph_state = risingwave_common::must_match!(partial_graph_status, PartialGraphStatus::Running(database_state) => database_state);
575 let _ = sender.send(partial_graph_state.local_barrier_manager.clone());
576 }
577 #[cfg(test)]
578 LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
579 let partial_graph_status = self
580 .state
581 .partial_graphs
582 .get_mut(&crate::task::TEST_PARTIAL_GRAPH_ID)
583 .unwrap();
584
585 let partial_graph_state = risingwave_common::must_match!(partial_graph_status, PartialGraphStatus::Running(database_state) => database_state);
586 assert!(!partial_graph_state.actor_states.contains_key(&actor_id));
587 let requests = partial_graph_state
588 .actor_pending_new_output_requests
589 .remove(&actor_id)
590 .unwrap();
591 let _ = sender.send(requests);
592 }
593 #[cfg(test)]
594 LocalActorOperation::Flush(sender) => {
595 use futures::FutureExt;
596 while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
597 self.handle_streaming_control_request(
598 request.request.expect("should not be empty"),
599 )
600 .unwrap();
601 }
602 while let Some(event) = self.state.next_event().now_or_never() {
603 match event {
604 ManagedBarrierStateEvent::BarrierCollected {
605 barrier,
606 partial_graph_id,
607 } => {
608 self.complete_barrier(partial_graph_id, barrier.epoch.prev);
609 }
610 ManagedBarrierStateEvent::ActorError { .. }
611 | ManagedBarrierStateEvent::PartialGraphsReset { .. }
612 | ManagedBarrierStateEvent::RegisterLocalUpstreamOutput { .. } => {
613 unreachable!()
614 }
615 }
616 }
617 sender.send(()).unwrap()
618 }
619 LocalActorOperation::InspectState { result_sender } => {
620 let debug_info = self.to_debug_info();
621 let _ = result_sender.send(debug_info.to_string());
622 }
623 }
624 }
625}
626
/// Home of the opaque `AwaitEpochCompletedFuture` type and its single defining function.
///
/// NOTE(review): presumably kept in its own module to limit the defining scope of the
/// type-alias `impl Trait`; confirm before merging into the parent module.
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_common::id::TableId;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
        PbListFinishedSource, PbLoadFinishedSource,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, await_tree_key};

    /// Future completing one collected barrier.
    ///
    /// It yields the barrier together with either the assembled `BarrierCompleteResult` or the
    /// sync error.
    pub(super) type AwaitEpochCompletedFuture =
        impl Future<Output = (Barrier, StreamResult<BarrierCompleteResult>)> + 'static;

    /// Builds the `AwaitEpochCompletedFuture` for `barrier`.
    ///
    /// When `complete_barrier_future` is present (a state-store sync), it is awaited and its
    /// `SyncResult` becomes `sync_result`; otherwise `sync_result` is `None`. On success, the
    /// collected progress lists are moved unchanged into the `BarrierCompleteResult`. When an
    /// await-tree registry is given, the future is registered under
    /// `BarrierAwait { prev_epoch }` with the label `SyncEpoch(<prev_epoch>)`.
    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    cdc_source_offset_updated,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        // `left_future` / `right_future` unify the instrumented and plain futures into one
        // concrete type, as required by the opaque return type.
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}
696
697use await_epoch_completed_future::*;
698use risingwave_common::catalog::TableId;
699use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
700use risingwave_storage::{StateStoreImpl, dispatch_state_store};
701
702use crate::executor::exchange::permit;
703
704fn sync_epoch(
705 state_store: &StateStoreImpl,
706 streaming_metrics: &StreamingMetrics,
707 prev_epoch: u64,
708 table_ids: HashSet<TableId>,
709) -> BoxFuture<'static, StreamResult<SyncResult>> {
710 let timer = streaming_metrics.barrier_sync_latency.start_timer();
711
712 let state_store = state_store.clone();
713 let future = async move {
714 dispatch_state_store!(state_store, hummock, {
715 hummock.sync(vec![(prev_epoch, table_ids)]).await
716 })
717 };
718
719 future
720 .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
721 .inspect_ok(move |_| {
722 timer.observe_duration();
723 })
724 .map_err(move |e| {
725 tracing::error!(
726 prev_epoch,
727 error = %e.as_report(),
728 "Failed to sync state store",
729 );
730 e.into()
731 })
732 .boxed()
733}
734
735impl LocalBarrierWorker {
736 fn complete_barrier(&mut self, partial_graph_id: PartialGraphId, prev_epoch: u64) {
737 {
738 let Some(graph_state) = self
739 .state
740 .partial_graphs
741 .get_mut(&partial_graph_id)
742 .expect("should exist")
743 .state_for_request()
744 else {
745 return;
746 };
747 let BarrierToComplete {
748 barrier,
749 table_ids,
750 create_mview_progress,
751 list_finished_source_ids,
752 load_finished_source_ids,
753 cdc_table_backfill_progress,
754 cdc_source_offset_updated,
755 truncate_tables,
756 refresh_finished_tables,
757 } = graph_state.pop_barrier_to_complete(prev_epoch);
758
759 let complete_barrier_future = match &barrier.kind {
760 BarrierKind::Unspecified => unreachable!(),
761 BarrierKind::Initial => {
762 tracing::info!(
763 epoch = prev_epoch,
764 "ignore sealing data for the first barrier"
765 );
766 tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
767 None
768 }
769 BarrierKind::Barrier => None,
770 BarrierKind::Checkpoint => Some(sync_epoch(
771 &self.actor_manager.env.state_store(),
772 &self.actor_manager.streaming_metrics,
773 prev_epoch,
774 table_ids.expect("should be Some on BarrierKind::Checkpoint"),
775 )),
776 };
777
778 self.await_epoch_completed_futures
779 .entry(partial_graph_id)
780 .or_default()
781 .push_back({
782 instrument_complete_barrier_future(
783 complete_barrier_future,
784 barrier,
785 self.actor_manager.await_tree_reg.as_ref(),
786 create_mview_progress,
787 list_finished_source_ids,
788 load_finished_source_ids,
789 cdc_table_backfill_progress,
790 cdc_source_offset_updated,
791 truncate_tables,
792 refresh_finished_tables,
793 )
794 });
795 }
796 }
797
798 fn on_epoch_completed(
799 &mut self,
800 partial_graph_id: PartialGraphId,
801 epoch: u64,
802 result: BarrierCompleteResult,
803 ) {
804 let BarrierCompleteResult {
805 create_mview_progress,
806 sync_result,
807 list_finished_source_ids,
808 load_finished_source_ids,
809 cdc_table_backfill_progress,
810 cdc_source_offset_updated,
811 truncate_tables,
812 refresh_finished_tables,
813 } = result;
814
815 let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
816 .map(|sync_result| {
817 (
818 sync_result.uncommitted_ssts,
819 sync_result.table_watermarks,
820 sync_result.old_value_ssts,
821 sync_result.vector_index_adds,
822 )
823 })
824 .unwrap_or_default();
825
826 let result = {
827 {
828 streaming_control_stream_response::Response::CompleteBarrier(
829 BarrierCompleteResponse {
830 request_id: "todo".to_owned(),
831 partial_graph_id,
832 epoch,
833 status: None,
834 create_mview_progress,
835 synced_sstables: synced_sstables
836 .into_iter()
837 .map(
838 |LocalSstableInfo {
839 sst_info,
840 table_stats,
841 created_at,
842 }| PbLocalSstableInfo {
843 sst: Some(sst_info.into()),
844 table_stats_map: to_prost_table_stats_map(table_stats),
845 created_at,
846 },
847 )
848 .collect_vec(),
849 worker_id: self.actor_manager.env.worker_id(),
850 table_watermarks: table_watermarks
851 .into_iter()
852 .map(|(key, value)| (key, value.into()))
853 .collect(),
854 old_value_sstables: old_value_ssts
855 .into_iter()
856 .map(|sst| sst.sst_info.into())
857 .collect(),
858 list_finished_sources: list_finished_source_ids,
859 load_finished_sources: load_finished_source_ids,
860 cdc_source_offset_updated,
861 vector_index_adds: vector_index_adds
862 .into_iter()
863 .map(|(table_id, adds)| {
864 (
865 table_id,
866 PbVectorIndexAdds {
867 adds: adds.into_iter().map(|add| add.into()).collect(),
868 },
869 )
870 })
871 .collect(),
872 cdc_table_backfill_progress,
873 truncate_tables,
874 refresh_finished_tables,
875 },
876 )
877 }
878 };
879
880 self.control_stream_handle.send_response(result);
881 }
882
883 fn send_barrier(
890 &mut self,
891 barrier: &Barrier,
892 request: InjectBarrierRequest,
893 ) -> StreamResult<()> {
894 debug!(
895 target: "events::stream::barrier::manager::send",
896 "send barrier {:?}, actor_ids_to_collect = {:?}",
897 barrier,
898 request.actor_ids_to_collect
899 );
900
901 let status = self
902 .state
903 .partial_graphs
904 .get_mut(&request.partial_graph_id)
905 .expect("should exist");
906 if let Some(state) = status.state_for_request() {
907 state.transform_to_issued(barrier, request)?;
908 }
909 Ok(())
910 }
911
912 fn remove_partial_graphs(
913 &mut self,
914 partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
915 ) {
916 for partial_graph_id in partial_graph_ids {
917 if let Some(mut graph) = self.state.partial_graphs.remove(&partial_graph_id) {
918 if let Some(graph) = graph.state_for_request() {
919 assert!(
920 graph.graph_state.is_empty(),
921 "non empty graph to be removed: {}",
922 &graph.graph_state
923 );
924 }
925 } else {
926 warn!(
927 partial_graph_id = %partial_graph_id,
928 "no partial graph to remove"
929 );
930 }
931 }
932 }
933
934 fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
935 match self.state.partial_graphs.entry(partial_graph_id) {
936 Entry::Occupied(entry) => {
937 let status = entry.into_mut();
938 if let PartialGraphStatus::ReceivedExchangeRequest(pending_requests) = status {
939 let mut graph = PartialGraphState::new(
940 partial_graph_id,
941 self.term_id.clone(),
942 self.actor_manager.clone(),
943 );
944 for ((upstream_actor_id, actor_id), request) in pending_requests.drain(..) {
945 graph.new_actor_output_request(actor_id, upstream_actor_id, request);
946 }
947 *status = PartialGraphStatus::Running(graph);
948 } else {
949 panic!("duplicated partial graph: {}", partial_graph_id);
950 }
951
952 status
953 }
954 Entry::Vacant(entry) => {
955 entry.insert(PartialGraphStatus::Running(PartialGraphState::new(
956 partial_graph_id,
957 self.term_id.clone(),
958 self.actor_manager.clone(),
959 )))
960 }
961 };
962 }
963
964 fn reset_partial_graphs(&mut self, req: ResetPartialGraphsRequest) {
965 let mut table_ids_to_clear = HashSet::new();
966 let mut reset_futures = HashMap::new();
967 for partial_graph_id in req.partial_graph_ids {
968 if let Some(status) = self.state.partial_graphs.get_mut(&partial_graph_id) {
969 let reset_future = status.start_reset(
970 partial_graph_id,
971 self.await_epoch_completed_futures.remove(&partial_graph_id),
972 &mut table_ids_to_clear,
973 );
974 reset_futures.insert(partial_graph_id, reset_future);
975 } else {
976 self.ack_partial_graph_reset(partial_graph_id, None);
977 }
978 }
979 if reset_futures.is_empty() {
980 assert!(table_ids_to_clear.is_empty());
981 return;
982 }
983 let state_store = self.actor_manager.env.state_store();
984 self.state.resetting_graphs.push(spawn(async move {
985 let outputs =
986 join_all(
987 reset_futures
988 .into_iter()
989 .map(|(partial_graph_id, future)| async move {
990 (partial_graph_id, future.await)
991 }),
992 )
993 .await;
994 if !table_ids_to_clear.is_empty()
995 && let Some(hummock) = state_store.as_hummock()
996 {
997 hummock.clear_tables(table_ids_to_clear).await;
998 }
999 outputs
1000 }));
1001 }
1002
1003 fn ack_partial_graph_reset(
1004 &mut self,
1005 partial_graph_id: PartialGraphId,
1006 reset_output: Option<ResetPartialGraphOutput>,
1007 ) {
1008 info!(
1009 %partial_graph_id,
1010 "partial graph reset successfully"
1011 );
1012 assert!(!self.state.partial_graphs.contains_key(&partial_graph_id));
1013 self.await_epoch_completed_futures.remove(&partial_graph_id);
1014 self.control_stream_handle.ack_reset_partial_graph(
1015 partial_graph_id,
1016 reset_output.and_then(|output| output.root_err),
1017 );
1018 }
1019
1020 fn on_partial_graph_failure(
1024 &mut self,
1025 partial_graph_id: PartialGraphId,
1026 failed_actor: Option<ActorId>,
1027 err: StreamError,
1028 message: impl Into<String>,
1029 ) {
1030 let message = message.into();
1031 error!(%partial_graph_id, ?failed_actor, message, err = ?err.as_report(), "suspend partial graph on error");
1032 let completing_futures = self.await_epoch_completed_futures.remove(&partial_graph_id);
1033 self.state
1034 .partial_graphs
1035 .get_mut(&partial_graph_id)
1036 .expect("should exist")
1037 .suspend(failed_actor, err, completing_futures);
1038 self.control_stream_handle
1039 .send_response(Response::ReportPartialGraphFailure(
1040 ReportPartialGraphFailureResponse { partial_graph_id },
1041 ));
1042 }
1043
1044 async fn reset(&mut self, init_request: InitRequest) {
1046 join(
1047 join_all(
1048 self.state
1049 .partial_graphs
1050 .values_mut()
1051 .map(|graph| graph.abort()),
1052 ),
1053 async {
1054 while let Some(join_result) = self.state.resetting_graphs.next().await {
1055 join_result.expect("failed to join reset partial graphs handle");
1056 }
1057 },
1058 )
1059 .await;
1060 if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
1061 m.clear();
1062 }
1063
1064 if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
1065 hummock
1066 .clear_shared_buffer()
1067 .instrument_await("store_clear_shared_buffer".verbose())
1068 .await
1069 }
1070 self.actor_manager.env.dml_manager_ref().clear();
1071 *self = Self::new(self.actor_manager.clone(), init_request.term_id);
1072 self.actor_manager.env.client_pool().invalidate_all();
1073 }
1074
1075 pub fn spawn(
1077 env: StreamEnvironment,
1078 streaming_metrics: Arc<StreamingMetrics>,
1079 await_tree_reg: Option<await_tree::Registry>,
1080 watermark_epoch: AtomicU64Ref,
1081 actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1082 ) -> JoinHandle<()> {
1083 let runtime = {
1084 let mut builder = tokio::runtime::Builder::new_multi_thread();
1085 if let Some(worker_threads_num) = env.global_config().actor_runtime_worker_threads_num {
1086 builder.worker_threads(worker_threads_num);
1087 }
1088 builder
1089 .thread_name("rw-streaming")
1090 .enable_all()
1091 .build()
1092 .unwrap()
1093 };
1094
1095 let actor_manager = Arc::new(StreamActorManager {
1096 env,
1097 streaming_metrics,
1098 watermark_epoch,
1099 await_tree_reg,
1100 runtime: runtime.into(),
1101 config_override_cache: ConfigOverrideCache::new(CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY),
1102 });
1103 let worker = LocalBarrierWorker::new(actor_manager, "uninitialized".into());
1104 tokio::spawn(worker.run(actor_op_rx))
1105 }
1106}
1107
/// Cloneable wrapper around an unbounded sender of events (e.g. `LocalActorOperation`).
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1109
1110impl<T> Clone for EventSender<T> {
1111 fn clone(&self) -> Self {
1112 Self(self.0.clone())
1113 }
1114}
1115
1116impl<T> EventSender<T> {
1117 pub(super) fn send_event(&self, event: T) {
1118 self.0.send(event).expect("should be able to send event")
1119 }
1120
1121 pub(super) async fn send_and_await<RSP>(
1122 &self,
1123 make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1124 ) -> StreamResult<RSP> {
1125 let (tx, rx) = oneshot::channel();
1126 let event = make_event(tx);
1127 self.send_event(event);
1128 rx.await
1129 .map_err(|_| anyhow!("barrier manager maybe reset").into())
1130 }
1131}
1132
/// Request to attach a new output channel (a permit sender) to an actor.
///
/// NOTE(review): `Local` / `Remote` presumably distinguish in-process requesters from
/// exchange-service requesters; both carry the same payload type. Confirm against the
/// consumers.
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}
1137
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbCreatePartialGraphRequest,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, PbStreamingControlStreamRequest, StreamingControlStreamRequest,
        StreamingControlStreamResponse, streaming_control_stream_request,
        streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{ActorId, LocalBarrierManager, NewOutputRequest, TEST_PARTIAL_GRAPH_ID};

    /// Test harness around a spawned barrier worker.
    ///
    /// The worker is attached to an in-memory control stream: tests push requests through
    /// `request_tx` and read the worker's responses from `response_rx`.
    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawns a worker, attaches a control stream (term `for_test`) whose first queued
        /// request creates `TEST_PARTIAL_GRAPH_ID`, waits for the `Init` response, and fetches
        /// that graph's `LocalBarrierManager`.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            // Queued before the stream is attached, so it is already waiting when the worker
            // starts reading requests.
            let create_test_graph = streaming_control_stream_request::Request::CreatePartialGraph(
                PbCreatePartialGraphRequest {
                    partial_graph_id: TEST_PARTIAL_GRAPH_ID,
                },
            );
            request_tx
                .send(Ok(PbStreamingControlStreamRequest {
                    request: Some(create_test_graph),
                }))
                .unwrap();

            let handle = ControlStreamHandle::new(
                response_tx,
                UnboundedReceiverStream::new(request_rx).boxed(),
            );
            let init_request = InitRequest {
                term_id: "for_test".into(),
            };
            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle,
                init_request,
            });

            let first_response = response_rx.recv().await.unwrap().unwrap().response.unwrap();
            assert_matches!(
                first_response,
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Sends an `InjectBarrier` request for the test partial graph.
        ///
        /// The request asks the graph to collect `barrier` from `actor_to_collect`, with no
        /// tables to sync and no actors to build.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            let inject = InjectBarrierRequest {
                request_id: "".to_owned(),
                barrier: Some(barrier.to_protobuf()),
                actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                table_ids_to_sync: vec![],
                partial_graph_id: TEST_PARTIAL_GRAPH_ID,
                actors_to_build: vec![],
            };
            let wrapped = StreamingControlStreamRequest {
                request: Some(streaming_control_stream_request::Request::InjectBarrier(
                    inject,
                )),
            };
            self.request_tx.send(Ok(wrapped)).unwrap();
        }

        /// Makes the worker process every already-ready request and event, then returns.
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        /// Sends `LocalActorOperation::Flush` through `actor_op_tx` and waits for its reply.
        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (done_tx, done_rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(done_tx));
            done_rx.await.unwrap()
        }

        /// Takes the output requests recorded for `actor_id` before that actor was built.
        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            let make_op =
                |reply_tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, reply_tx);
            self.actor_op_tx.send_and_await(make_op).await.unwrap()
        }
    }
}