1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::future::{pending, poll_fn};
19use std::sync::Arc;
20use std::task::Poll;
21use std::time::Duration;
22
23use anyhow::anyhow;
24use await_tree::InstrumentAwait;
25use futures::future::BoxFuture;
26use futures::stream::{BoxStream, FuturesOrdered};
27use futures::{FutureExt, StreamExt, TryFutureExt};
28use itertools::Itertools;
29use risingwave_common::error::tonic::extra::{Score, ScoredError};
30use risingwave_pb::stream_plan::barrier::BarrierKind;
31use risingwave_pb::stream_service::barrier_complete_response::{
32 PbCreateMviewProgress, PbLocalSstableInfo,
33};
34use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
35use risingwave_storage::store_impl::AsHummock;
36use thiserror_ext::AsReport;
37use tokio::select;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
39use tokio::sync::{mpsc, oneshot};
40use tokio::task::JoinHandle;
41use tonic::{Code, Status};
42use tracing::warn;
43
44use self::managed_state::ManagedBarrierState;
45use crate::error::{IntoUnexpectedExit, StreamError, StreamResult};
46use crate::task::{ActorId, AtomicU64Ref, PartialGraphId, StreamEnvironment, UpDownActorIds};
47
48mod managed_state;
49mod progress;
50#[cfg(test)]
51mod tests;
52
53pub use progress::CreateMviewProgressReporter;
54use risingwave_common::util::epoch::EpochPair;
55use risingwave_common::util::runtime::BackgroundShutdownRuntime;
56use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
57use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
58use risingwave_pb::stream_service::streaming_control_stream_request::{
59 DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
60};
61use risingwave_pb::stream_service::streaming_control_stream_response::{
62 InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
63};
64use risingwave_pb::stream_service::{
65 BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
66 StreamingControlStreamResponse, streaming_control_stream_response,
67};
68
69use crate::executor::exchange::permit::{Receiver, channel_from_config};
70use crate::executor::monitor::StreamingMetrics;
71use crate::executor::{Barrier, BarrierInner, StreamExecutorError};
72use crate::task::barrier_manager::managed_state::{
73 DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
74 ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
75};
76use crate::task::barrier_manager::progress::BackfillState;
77
/// Compile-time switch, currently disabled. Nothing in this part of the file reads it.
/// NOTE(review): presumably gates aggregation of barrier reports — confirm against its users.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
81
/// Outcome of completing one barrier: the output of the future built by
/// `instrument_complete_barrier_future`, consumed by `LocalBarrierWorker::on_epoch_completed`.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// Result of syncing the state store for the barrier's previous epoch.
    /// `complete_barrier` only schedules a sync for checkpoint barriers, so this is `None` for
    /// every other barrier kind.
    pub sync_result: Option<SyncResult>,

    /// Progress entries handed back by `pop_barrier_to_complete` for this barrier; they are
    /// forwarded unchanged in the `BarrierCompleteResponse`.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
}
91
92pub(super) struct ControlStreamHandle {
93 #[expect(clippy::type_complexity)]
94 pair: Option<(
95 UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
96 BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
97 )>,
98}
99
100impl ControlStreamHandle {
101 fn empty() -> Self {
102 Self { pair: None }
103 }
104
105 pub(super) fn new(
106 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
107 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
108 ) -> Self {
109 Self {
110 pair: Some((sender, request_stream)),
111 }
112 }
113
114 pub(super) fn connected(&self) -> bool {
115 self.pair.is_some()
116 }
117
118 fn reset_stream_with_err(&mut self, err: Status) {
119 if let Some((sender, _)) = self.pair.take() {
120 let err = TonicStatusWrapper::new(err);
122 warn!(error = %err.as_report(), "control stream reset with error");
123
124 let err = err.into_inner();
125 if sender.send(Err(err)).is_err() {
126 warn!("failed to notify reset of control stream");
127 }
128 }
129 }
130
131 async fn shutdown_stream(&mut self) {
134 if let Some((sender, _)) = self.pair.take() {
135 if sender
136 .send(Ok(StreamingControlStreamResponse {
137 response: Some(streaming_control_stream_response::Response::Shutdown(
138 ShutdownResponse::default(),
139 )),
140 }))
141 .is_err()
142 {
143 warn!("failed to notify shutdown of control stream");
144 } else {
145 tracing::info!("waiting for meta service to close control stream...");
146
147 sender.closed().await;
154 }
155 } else {
156 debug!("control stream has been reset, ignore shutdown");
157 }
158 }
159
160 pub(super) fn ack_reset_database(
161 &mut self,
162 database_id: DatabaseId,
163 root_err: Option<ScoredStreamError>,
164 reset_request_id: u32,
165 ) {
166 self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
167 database_id: database_id.database_id,
168 root_err: root_err.map(|err| PbScoredError {
169 err_msg: err.error.to_report_string(),
170 score: err.score.0,
171 }),
172 reset_request_id,
173 }));
174 }
175
176 fn send_response(&mut self, response: streaming_control_stream_response::Response) {
177 if let Some((sender, _)) = self.pair.as_ref() {
178 if sender
179 .send(Ok(StreamingControlStreamResponse {
180 response: Some(response),
181 }))
182 .is_err()
183 {
184 self.pair = None;
185 warn!("fail to send response. control stream reset");
186 }
187 } else {
188 debug!(?response, "control stream has been reset. ignore response");
189 }
190 }
191
192 async fn next_request(&mut self) -> StreamingControlStreamRequest {
193 if let Some((_, stream)) = &mut self.pair {
194 match stream.next().await {
195 Some(Ok(request)) => {
196 return request;
197 }
198 Some(Err(e)) => self.reset_stream_with_err(
199 anyhow!(TonicStatusWrapper::new(e)) .context("failed to get request")
201 .to_status_unnamed(Code::Internal),
202 ),
203 None => self.reset_stream_with_err(Status::internal("end of stream")),
204 }
205 }
206 pending().await
207 }
208}
209
/// Events sent through `LocalBarrierManager::barrier_event_sender`.
pub(super) enum LocalBarrierEvent {
    /// Sent by `LocalBarrierManager::collect`: `actor_id` reports the barrier with `epoch`.
    ReportActorCollected {
        actor_id: ActorId,
        epoch: EpochPair,
    },
    /// Carries a `BackfillState` for `actor` at `epoch`. Not constructed in this part of the
    /// file. NOTE(review): presumably emitted by the `progress` module — confirm.
    ReportCreateProgress {
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    },
    /// Sent by `LocalBarrierManager::subscribe_barrier`: carries the sending half of the barrier
    /// channel whose receiving half was returned to the caller.
    RegisterBarrierSender {
        actor_id: ActorId,
        barrier_sender: mpsc::UnboundedSender<Barrier>,
    },
    /// Sent by `LocalBarrierManager::register_local_upstream_output`: carries the sending half
    /// of the permit channel whose receiving half was returned to the caller.
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        tx: permit::Sender,
    },
}
230
/// Operations consumed by `LocalBarrierWorker::run` from its `actor_op_rx` channel.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Replace the control stream: `run` resets the old stream with an error, resets the worker
    /// with `init_request`, installs `handle` and replies with an `Init` response.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Request the receiver for the `(upstream, downstream)` actor pair in `ids`.
    /// `handle_actor_op` rejects it when `term_id` differs from the worker's current term, or
    /// when the database is suspended or resetting.
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    /// Test only: reply with a clone of the test database's `LocalBarrierManager`.
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    /// Test only: remove and reply with the pending new-output requests of the given actor.
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    /// Test only: process every control request and state event that is immediately ready, then
    /// notify the sender.
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Reply with the textual rendering of `LocalBarrierWorker::to_debug_info`.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Shut the control stream down; `result_sender` is notified once `shutdown_stream` returns.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
256
/// Shared context created once in `LocalBarrierWorker::spawn` and passed around behind an `Arc`.
pub(crate) struct StreamActorManager {
    /// Environment handle; this file reads its state store, worker id and config.
    pub(super) env: StreamEnvironment,
    /// Streaming metrics; this file uses `barrier_sync_latency` when syncing an epoch.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Not read in this part of the file.
    /// NOTE(review): presumably the watermark epoch shared with caches — confirm.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Optional await-tree registry; when present, barrier completion futures are registered
    /// and instrumented with it.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// The multi-threaded tokio runtime built in `spawn` (threads named `rw-streaming`).
    /// NOTE(review): presumably the runtime actors are spawned on — confirm.
    pub(super) runtime: BackgroundShutdownRuntime,
}
270
271pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
272 managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
273 has_control_stream_connected: bool,
274}
275
276impl Display for LocalBarrierWorkerDebugInfo<'_> {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 writeln!(
279 f,
280 "\nhas_control_stream_connected: {}",
281 self.has_control_stream_connected
282 )?;
283
284 for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
285 writeln!(
286 f,
287 "database {} status: {} managed_barrier_state:\n{}",
288 database_id.database_id,
289 status,
290 managed_barrier_state
291 .as_ref()
292 .map(ToString::to_string)
293 .unwrap_or_default()
294 )?;
295 }
296 Ok(())
297 }
298}
299
/// The worker driven by `run`: it owns the managed barrier state of every database, the futures
/// of barriers being completed, and the control stream.
pub(super) struct LocalBarrierWorker {
    /// Managed barrier state; `state.databases` holds one `DatabaseStatus` per database.
    pub(super) state: ManagedBarrierState,

    /// Per database, the ordered queue of barrier completion futures. `complete_barrier` pushes
    /// to it and `next_completed_epoch` polls it.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Handle of the current control stream; starts out empty in `new`.
    control_stream_handle: ControlStreamHandle,

    /// Shared context (environment, metrics, await-tree registry, runtime).
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Current term id; `TakeReceiver` requests carrying a different term id are rejected.
    pub(super) term_id: String,
}
316
317impl LocalBarrierWorker {
318 pub(super) fn new(
319 actor_manager: Arc<StreamActorManager>,
320 initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
321 term_id: String,
322 ) -> Self {
323 let state = ManagedBarrierState::new(
324 actor_manager.clone(),
325 initial_partial_graphs,
326 term_id.clone(),
327 );
328 Self {
329 state,
330 await_epoch_completed_futures: Default::default(),
331 control_stream_handle: ControlStreamHandle::empty(),
332 actor_manager,
333 term_id,
334 }
335 }
336
337 fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
338 LocalBarrierWorkerDebugInfo {
339 managed_barrier_state: self
340 .state
341 .databases
342 .iter()
343 .map(|(database_id, status)| {
344 (*database_id, {
345 match status {
346 DatabaseStatus::ReceivedExchangeRequest(_) => {
347 ("ReceivedExchangeRequest".to_owned(), None)
348 }
349 DatabaseStatus::Running(state) => {
350 ("running".to_owned(), Some(state.to_debug_info()))
351 }
352 DatabaseStatus::Suspended(state) => {
353 (format!("suspended: {:?}", state.suspend_time), None)
354 }
355 DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
356 DatabaseStatus::Unspecified => {
357 unreachable!()
358 }
359 }
360 })
361 })
362 .collect(),
363 has_control_stream_connected: self.control_stream_handle.connected(),
364 }
365 }
366
367 async fn next_completed_epoch(
368 futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
369 ) -> (
370 DatabaseId,
371 PartialGraphId,
372 Barrier,
373 StreamResult<BarrierCompleteResult>,
374 ) {
375 poll_fn(|cx| {
376 for (database_id, futures) in &mut *futures {
377 if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
378 futures.poll_next_unpin(cx)
379 {
380 return Poll::Ready((*database_id, partial_graph_id, barrier, result));
381 }
382 }
383 Poll::Pending
384 })
385 .await
386 }
387
388 async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
389 loop {
390 select! {
391 biased;
392 (database_id, event) = self.state.next_event() => {
393 match event {
394 ManagedBarrierStateEvent::BarrierCollected{
395 partial_graph_id,
396 barrier,
397 } => {
398 self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
399 }
400 ManagedBarrierStateEvent::ActorError{
401 actor_id,
402 err,
403 } => {
404 self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
405 }
406 ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
407 self.ack_database_reset(database_id, Some(output), reset_request_id);
408 }
409 }
410 }
411 (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
412 match result {
413 Ok(result) => {
414 self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
415 }
416 Err(err) => {
417 self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
421 }
422 }
423 },
424 actor_op = actor_op_rx.recv() => {
425 if let Some(actor_op) = actor_op {
426 match actor_op {
427 LocalActorOperation::NewControlStream { handle, init_request } => {
428 self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
429 self.reset(init_request).await;
430 self.control_stream_handle = handle;
431 self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
432 }
433 LocalActorOperation::Shutdown { result_sender } => {
434 if self.state.databases.values().any(|database| {
435 match database {
436 DatabaseStatus::Running(database) => {
437 !database.actor_states.is_empty()
438 }
439 DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
440 DatabaseStatus::ReceivedExchangeRequest(_) => {
441 false
442 }
443 DatabaseStatus::Unspecified => {
444 unreachable!()
445 }
446 }
447 }) {
448 tracing::warn!(
449 "shutdown with running actors, scaling or migration will be triggered"
450 );
451 }
452 self.control_stream_handle.shutdown_stream().await;
453 let _ = result_sender.send(());
454 }
455 actor_op => {
456 self.handle_actor_op(actor_op);
457 }
458 }
459 }
460 else {
461 break;
462 }
463 },
464 request = self.control_stream_handle.next_request() => {
465 let result = self.handle_streaming_control_request(request.request.expect("non empty"));
466 if let Err((database_id, err)) = result {
467 self.on_database_failure(database_id, None, err, "failed to inject barrier");
468 }
469 },
470 }
471 }
472 }
473
474 fn handle_streaming_control_request(
475 &mut self,
476 request: Request,
477 ) -> Result<(), (DatabaseId, StreamError)> {
478 match request {
479 Request::InjectBarrier(req) => {
480 let database_id = DatabaseId::new(req.database_id);
481 let result: StreamResult<()> = try {
482 let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
483 self.send_barrier(&barrier, req)?;
484 };
485 result.map_err(|e| (database_id, e))?;
486 Ok(())
487 }
488 Request::RemovePartialGraph(req) => {
489 self.remove_partial_graphs(
490 DatabaseId::new(req.database_id),
491 req.partial_graph_ids.into_iter().map(PartialGraphId::new),
492 );
493 Ok(())
494 }
495 Request::CreatePartialGraph(req) => {
496 self.add_partial_graph(
497 DatabaseId::new(req.database_id),
498 PartialGraphId::new(req.partial_graph_id),
499 );
500 Ok(())
501 }
502 Request::ResetDatabase(req) => {
503 self.reset_database(req);
504 Ok(())
505 }
506 Request::Init(_) => {
507 unreachable!()
508 }
509 }
510 }
511
512 fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
513 match actor_op {
514 LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
515 unreachable!("event {actor_op} should be handled separately in async context")
516 }
517 LocalActorOperation::TakeReceiver {
518 database_id,
519 term_id,
520 ids,
521 result_sender,
522 } => {
523 let err = if self.term_id != term_id {
524 {
525 warn!(
526 ?ids,
527 term_id,
528 current_term_id = self.term_id,
529 "take receiver on unmatched term_id"
530 );
531 anyhow!(
532 "take receiver {:?} on unmatched term_id {} to current term_id {}",
533 ids,
534 term_id,
535 self.term_id
536 )
537 }
538 } else {
539 match self.state.databases.entry(database_id) {
540 Entry::Occupied(mut entry) => match entry.get_mut() {
541 DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
542 pending_requests.push((ids, result_sender));
543 return;
544 }
545 DatabaseStatus::Running(database) => {
546 let (upstream_actor_id, actor_id) = ids;
547 database.new_actor_remote_output_request(
548 actor_id,
549 upstream_actor_id,
550 result_sender,
551 );
552 return;
553 }
554 DatabaseStatus::Suspended(_) => {
555 anyhow!("database suspended")
556 }
557 DatabaseStatus::Resetting(_) => {
558 anyhow!("database resetting")
559 }
560 DatabaseStatus::Unspecified => {
561 unreachable!()
562 }
563 },
564 Entry::Vacant(entry) => {
565 entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
566 ids,
567 result_sender,
568 )]));
569 return;
570 }
571 }
572 };
573 let _ = result_sender.send(Err(err.into()));
574 }
575 #[cfg(test)]
576 LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
577 let database_status = self
578 .state
579 .databases
580 .get(&crate::task::TEST_DATABASE_ID)
581 .unwrap();
582 let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
583 let _ = sender.send(database_state.local_barrier_manager.clone());
584 }
585 #[cfg(test)]
586 LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
587 let database_status = self
588 .state
589 .databases
590 .get_mut(&crate::task::TEST_DATABASE_ID)
591 .unwrap();
592
593 let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
594 assert!(!database_state.actor_states.contains_key(&actor_id));
595 let requests = database_state
596 .actor_pending_new_output_requests
597 .remove(&actor_id)
598 .unwrap();
599 let _ = sender.send(requests);
600 }
601 #[cfg(test)]
602 LocalActorOperation::Flush(sender) => {
603 use futures::FutureExt;
604 while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
605 self.handle_streaming_control_request(
606 request.request.expect("should not be empty"),
607 )
608 .unwrap();
609 }
610 while let Some((database_id, event)) = self.state.next_event().now_or_never() {
611 match event {
612 ManagedBarrierStateEvent::BarrierCollected {
613 partial_graph_id,
614 barrier,
615 } => {
616 self.complete_barrier(
617 database_id,
618 partial_graph_id,
619 barrier.epoch.prev,
620 );
621 }
622 ManagedBarrierStateEvent::ActorError { .. }
623 | ManagedBarrierStateEvent::DatabaseReset(..) => {
624 unreachable!()
625 }
626 }
627 }
628 sender.send(()).unwrap()
629 }
630 LocalActorOperation::InspectState { result_sender } => {
631 let debug_info = self.to_debug_info();
632 let _ = result_sender.send(debug_info.to_string());
633 }
634 }
635 }
636}
637
638mod await_epoch_completed_future {
639 use std::future::Future;
640
641 use futures::FutureExt;
642 use futures::future::BoxFuture;
643 use risingwave_hummock_sdk::SyncResult;
644 use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
645
646 use crate::error::StreamResult;
647 use crate::executor::Barrier;
648 use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};
649
650 pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
651 + 'static;
652
653 pub(super) fn instrument_complete_barrier_future(
654 partial_graph_id: PartialGraphId,
655 complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
656 barrier: Barrier,
657 barrier_await_tree_reg: Option<&await_tree::Registry>,
658 create_mview_progress: Vec<PbCreateMviewProgress>,
659 ) -> AwaitEpochCompletedFuture {
660 let prev_epoch = barrier.epoch.prev;
661 let future = async move {
662 if let Some(future) = complete_barrier_future {
663 let result = future.await;
664 result.map(Some)
665 } else {
666 Ok(None)
667 }
668 }
669 .map(move |result| {
670 (
671 partial_graph_id,
672 barrier,
673 result.map(|sync_result| BarrierCompleteResult {
674 sync_result,
675 create_mview_progress,
676 }),
677 )
678 });
679 if let Some(reg) = barrier_await_tree_reg {
680 reg.register(
681 await_tree_key::BarrierAwait { prev_epoch },
682 format!("SyncEpoch({})", prev_epoch),
683 )
684 .instrument(future)
685 .left_future()
686 } else {
687 future.right_future()
688 }
689 }
690}
691
692use await_epoch_completed_future::*;
693use risingwave_common::catalog::{DatabaseId, TableId};
694use risingwave_storage::{StateStoreImpl, dispatch_state_store};
695
696use crate::executor::exchange::permit;
697
698fn sync_epoch(
699 state_store: &StateStoreImpl,
700 streaming_metrics: &StreamingMetrics,
701 prev_epoch: u64,
702 table_ids: HashSet<TableId>,
703) -> BoxFuture<'static, StreamResult<SyncResult>> {
704 let timer = streaming_metrics.barrier_sync_latency.start_timer();
705
706 let state_store = state_store.clone();
707 let future = async move {
708 dispatch_state_store!(state_store, hummock, {
709 hummock.sync(vec![(prev_epoch, table_ids)]).await
710 })
711 };
712
713 future
714 .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
715 .inspect_ok(move |_| {
716 timer.observe_duration();
717 })
718 .map_err(move |e| {
719 tracing::error!(
720 prev_epoch,
721 error = %e.as_report(),
722 "Failed to sync state store",
723 );
724 e.into()
725 })
726 .boxed()
727}
728
729impl LocalBarrierWorker {
730 fn complete_barrier(
731 &mut self,
732 database_id: DatabaseId,
733 partial_graph_id: PartialGraphId,
734 prev_epoch: u64,
735 ) {
736 {
737 let Some(database_state) = self
738 .state
739 .databases
740 .get_mut(&database_id)
741 .expect("should exist")
742 .state_for_request()
743 else {
744 return;
745 };
746 let (barrier, table_ids, create_mview_progress) =
747 database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);
748
749 let complete_barrier_future = match &barrier.kind {
750 BarrierKind::Unspecified => unreachable!(),
751 BarrierKind::Initial => {
752 tracing::info!(
753 epoch = prev_epoch,
754 "ignore sealing data for the first barrier"
755 );
756 tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
757 None
758 }
759 BarrierKind::Barrier => None,
760 BarrierKind::Checkpoint => Some(sync_epoch(
761 &self.actor_manager.env.state_store(),
762 &self.actor_manager.streaming_metrics,
763 prev_epoch,
764 table_ids.expect("should be Some on BarrierKind::Checkpoint"),
765 )),
766 };
767
768 self.await_epoch_completed_futures
769 .entry(database_id)
770 .or_default()
771 .push_back({
772 instrument_complete_barrier_future(
773 partial_graph_id,
774 complete_barrier_future,
775 barrier,
776 self.actor_manager.await_tree_reg.as_ref(),
777 create_mview_progress,
778 )
779 });
780 }
781 }
782
783 fn on_epoch_completed(
784 &mut self,
785 database_id: DatabaseId,
786 partial_graph_id: PartialGraphId,
787 epoch: u64,
788 result: BarrierCompleteResult,
789 ) {
790 let BarrierCompleteResult {
791 create_mview_progress,
792 sync_result,
793 } = result;
794
795 let (synced_sstables, table_watermarks, old_value_ssts) = sync_result
796 .map(|sync_result| {
797 (
798 sync_result.uncommitted_ssts,
799 sync_result.table_watermarks,
800 sync_result.old_value_ssts,
801 )
802 })
803 .unwrap_or_default();
804
805 let result = {
806 {
807 streaming_control_stream_response::Response::CompleteBarrier(
808 BarrierCompleteResponse {
809 request_id: "todo".to_owned(),
810 partial_graph_id: partial_graph_id.into(),
811 epoch,
812 status: None,
813 create_mview_progress,
814 synced_sstables: synced_sstables
815 .into_iter()
816 .map(
817 |LocalSstableInfo {
818 sst_info,
819 table_stats,
820 created_at,
821 }| PbLocalSstableInfo {
822 sst: Some(sst_info.into()),
823 table_stats_map: to_prost_table_stats_map(table_stats),
824 created_at,
825 },
826 )
827 .collect_vec(),
828 worker_id: self.actor_manager.env.worker_id(),
829 table_watermarks: table_watermarks
830 .into_iter()
831 .map(|(key, value)| (key.table_id, value.into()))
832 .collect(),
833 old_value_sstables: old_value_ssts
834 .into_iter()
835 .map(|sst| sst.sst_info.into())
836 .collect(),
837 database_id: database_id.database_id,
838 },
839 )
840 }
841 };
842
843 self.control_stream_handle.send_response(result);
844 }
845
846 fn send_barrier(
853 &mut self,
854 barrier: &Barrier,
855 request: InjectBarrierRequest,
856 ) -> StreamResult<()> {
857 debug!(
858 target: "events::stream::barrier::manager::send",
859 "send barrier {:?}, actor_ids_to_collect = {:?}",
860 barrier,
861 request.actor_ids_to_collect
862 );
863
864 let database_status = self
865 .state
866 .databases
867 .get_mut(&DatabaseId::new(request.database_id))
868 .expect("should exist");
869 if let Some(state) = database_status.state_for_request() {
870 state.transform_to_issued(barrier, request)?;
871 }
872 Ok(())
873 }
874
875 fn remove_partial_graphs(
876 &mut self,
877 database_id: DatabaseId,
878 partial_graph_ids: impl Iterator<Item = PartialGraphId>,
879 ) {
880 let Some(database_status) = self.state.databases.get_mut(&database_id) else {
881 warn!(
882 database_id = database_id.database_id,
883 "database to remove partial graph not exist"
884 );
885 return;
886 };
887 let Some(database_state) = database_status.state_for_request() else {
888 warn!(
889 database_id = database_id.database_id,
890 "ignore remove partial graph request on err database",
891 );
892 return;
893 };
894 for partial_graph_id in partial_graph_ids {
895 if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
896 assert!(
897 graph.is_empty(),
898 "non empty graph to be removed: {}",
899 &graph
900 );
901 } else {
902 warn!(
903 partial_graph_id = partial_graph_id.0,
904 "no partial graph to remove"
905 );
906 }
907 }
908 }
909
910 fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
911 let status = match self.state.databases.entry(database_id) {
912 Entry::Occupied(entry) => {
913 let status = entry.into_mut();
914 if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
915 let mut database = DatabaseManagedBarrierState::new(
916 database_id,
917 self.term_id.clone(),
918 self.actor_manager.clone(),
919 vec![],
920 );
921 for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
922 {
923 database.new_actor_remote_output_request(
924 actor_id,
925 upstream_actor_id,
926 result_sender,
927 );
928 }
929 *status = DatabaseStatus::Running(database);
930 }
931
932 status
933 }
934 Entry::Vacant(entry) => {
935 entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
936 database_id,
937 self.term_id.clone(),
938 self.actor_manager.clone(),
939 vec![],
940 )))
941 }
942 };
943 if let Some(state) = status.state_for_request() {
944 assert!(
945 state
946 .graph_states
947 .insert(
948 partial_graph_id,
949 PartialGraphManagedBarrierState::new(&self.actor_manager)
950 )
951 .is_none()
952 );
953 }
954 }
955
956 fn reset_database(&mut self, req: ResetDatabaseRequest) {
957 let database_id = DatabaseId::new(req.database_id);
958 if let Some(database_status) = self.state.databases.get_mut(&database_id) {
959 database_status.start_reset(
960 database_id,
961 self.await_epoch_completed_futures.remove(&database_id),
962 req.reset_request_id,
963 );
964 } else {
965 self.ack_database_reset(database_id, None, req.reset_request_id);
966 }
967 }
968
969 fn ack_database_reset(
970 &mut self,
971 database_id: DatabaseId,
972 reset_output: Option<ResetDatabaseOutput>,
973 reset_request_id: u32,
974 ) {
975 info!(
976 database_id = database_id.database_id,
977 "database reset successfully"
978 );
979 if let Some(reset_database) = self.state.databases.remove(&database_id) {
980 match reset_database {
981 DatabaseStatus::Resetting(_) => {}
982 _ => {
983 unreachable!("must be resetting previously")
984 }
985 }
986 }
987 self.await_epoch_completed_futures.remove(&database_id);
988 self.control_stream_handle.ack_reset_database(
989 database_id,
990 reset_output.and_then(|output| output.root_err),
991 reset_request_id,
992 );
993 }
994
995 fn on_database_failure(
999 &mut self,
1000 database_id: DatabaseId,
1001 failed_actor: Option<ActorId>,
1002 err: StreamError,
1003 message: impl Into<String>,
1004 ) {
1005 let message = message.into();
1006 error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
1007 let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
1008 self.state
1009 .databases
1010 .get_mut(&database_id)
1011 .expect("should exist")
1012 .suspend(failed_actor, err, completing_futures);
1013 self.control_stream_handle
1014 .send_response(Response::ReportDatabaseFailure(
1015 ReportDatabaseFailureResponse {
1016 database_id: database_id.database_id,
1017 },
1018 ));
1019 }
1020}
1021
1022impl DatabaseManagedBarrierState {
1023 async fn try_find_root_actor_failure(
1027 &mut self,
1028 first_failure: Option<(Option<ActorId>, StreamError)>,
1029 ) -> Option<ScoredStreamError> {
1030 let mut later_errs = vec![];
1031 let _ = tokio::time::timeout(Duration::from_secs(3), async {
1033 let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
1034 if let Some((Some(failed_actor), _)) = &first_failure {
1035 uncollected_actors.remove(failed_actor);
1036 }
1037 while !uncollected_actors.is_empty()
1038 && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
1039 {
1040 uncollected_actors.remove(&actor_id);
1041 later_errs.push(error);
1042 }
1043 })
1044 .await;
1045
1046 first_failure
1047 .into_iter()
1048 .map(|(_, err)| err)
1049 .chain(later_errs.into_iter())
1050 .map(|e| e.with_score())
1051 .max_by_key(|e| e.score)
1052 }
1053}
1054
/// Cloneable handle used to report events and failures for one database. It is created
/// together with the matching receivers by `LocalBarrierManager::new`.
#[derive(Clone)]
pub struct LocalBarrierManager {
    /// Sending half of the `LocalBarrierEvent` channel; send errors are ignored.
    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
    /// Sending half of the actor failure channel; send errors are ignored.
    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
    pub(crate) database_id: DatabaseId,
    pub(crate) term_id: String,
    /// Environment; this file reads its config when creating local exchange channels.
    pub(crate) env: StreamEnvironment,
}
1063
1064impl LocalBarrierWorker {
1065 pub fn spawn(
1067 env: StreamEnvironment,
1068 streaming_metrics: Arc<StreamingMetrics>,
1069 await_tree_reg: Option<await_tree::Registry>,
1070 watermark_epoch: AtomicU64Ref,
1071 actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1072 ) -> JoinHandle<()> {
1073 let runtime = {
1074 let mut builder = tokio::runtime::Builder::new_multi_thread();
1075 if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
1076 builder.worker_threads(worker_threads_num);
1077 }
1078 builder
1079 .thread_name("rw-streaming")
1080 .enable_all()
1081 .build()
1082 .unwrap()
1083 };
1084
1085 let actor_manager = Arc::new(StreamActorManager {
1086 env: env.clone(),
1087 streaming_metrics,
1088 watermark_epoch,
1089 await_tree_reg,
1090 runtime: runtime.into(),
1091 });
1092 let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
1093 tokio::spawn(worker.run(actor_op_rx))
1094 }
1095}
1096
1097pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1098
1099impl<T> Clone for EventSender<T> {
1100 fn clone(&self) -> Self {
1101 Self(self.0.clone())
1102 }
1103}
1104
1105impl<T> EventSender<T> {
1106 pub(super) fn send_event(&self, event: T) {
1107 self.0.send(event).expect("should be able to send event")
1108 }
1109
1110 pub(super) async fn send_and_await<RSP>(
1111 &self,
1112 make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1113 ) -> StreamResult<RSP> {
1114 let (tx, rx) = oneshot::channel();
1115 let event = make_event(tx);
1116 self.send_event(event);
1117 rx.await
1118 .map_err(|_| anyhow!("barrier manager maybe reset").into())
1119 }
1120}
1121
/// A request to attach a new output, carrying the sending half of a permit channel.
/// NOTE(review): `Local` presumably comes from `register_local_upstream_output` and `Remote`
/// from exchange (`TakeReceiver`) requests — confirm against the managed state module.
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}
1126
1127impl LocalBarrierManager {
1128 pub(super) fn new(
1129 database_id: DatabaseId,
1130 term_id: String,
1131 env: StreamEnvironment,
1132 ) -> (
1133 Self,
1134 UnboundedReceiver<LocalBarrierEvent>,
1135 UnboundedReceiver<(ActorId, StreamError)>,
1136 ) {
1137 let (event_tx, event_rx) = unbounded_channel();
1138 let (err_tx, err_rx) = unbounded_channel();
1139 (
1140 Self {
1141 barrier_event_sender: event_tx,
1142 actor_failure_sender: err_tx,
1143 database_id,
1144 term_id,
1145 env,
1146 },
1147 event_rx,
1148 err_rx,
1149 )
1150 }
1151
1152 fn send_event(&self, event: LocalBarrierEvent) {
1153 let _ = self.barrier_event_sender.send(event);
1155 }
1156
1157 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
1160 self.send_event(LocalBarrierEvent::ReportActorCollected {
1161 actor_id,
1162 epoch: barrier.epoch,
1163 })
1164 }
1165
1166 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
1169 let _ = self
1170 .actor_failure_sender
1171 .send((actor_id, err.into_unexpected_exit(actor_id)));
1172 }
1173
1174 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
1175 let (tx, rx) = mpsc::unbounded_channel();
1176 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
1177 actor_id,
1178 barrier_sender: tx,
1179 });
1180 rx
1181 }
1182
1183 pub fn register_local_upstream_output(
1184 &self,
1185 actor_id: ActorId,
1186 upstream_actor_id: ActorId,
1187 ) -> permit::Receiver {
1188 let (tx, rx) = channel_from_config(self.env.config());
1189 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
1190 actor_id,
1191 upstream_actor_id,
1192 tx,
1193 });
1194 rx
1195 }
1196}
1197
1198type ScoredStreamError = ScoredError<StreamError>;
1200
1201impl StreamError {
1202 fn with_score(self) -> ScoredStreamError {
1204 fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
1208 use crate::executor::error::ErrorKind;
1209 match e.inner() {
1210 ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,
1213
1214 ErrorKind::Uncategorized(_)
1216 | ErrorKind::Storage(_)
1217 | ErrorKind::ArrayError(_)
1218 | ErrorKind::ExprError(_)
1219 | ErrorKind::SerdeError(_)
1220 | ErrorKind::SinkError(_, _)
1221 | ErrorKind::RpcError(_)
1222 | ErrorKind::AlignBarrier(_, _)
1223 | ErrorKind::ConnectorError(_)
1224 | ErrorKind::DmlError(_)
1225 | ErrorKind::NotImplemented(_) => 999,
1226 }
1227 }
1228
1229 fn stream_error_score(e: &StreamError) -> i32 {
1230 use crate::error::ErrorKind;
1231 match e.inner() {
1232 ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
1234
1235 ErrorKind::BarrierSend { .. } => 1,
1237
1238 ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
1240
1241 ErrorKind::Uncategorized(_)
1243 | ErrorKind::Storage(_)
1244 | ErrorKind::Expression(_)
1245 | ErrorKind::Array(_)
1246 | ErrorKind::Secret(_) => 1000,
1247 }
1248 }
1249
1250 let score = Score(stream_error_score(&self));
1251 ScoredStreamError { error: self, score }
1252 }
1253}
1254
#[cfg(test)]
impl LocalBarrierManager {
    /// Test-only helper that spawns a `LocalBarrierWorker` and returns an
    /// [`EventSender`] connected to it.
    ///
    /// The worker is spawned with a test environment, unused metrics, `None`
    /// for its third argument, a fresh zero-initialised `AtomicU64`, and the
    /// receiving half of a new unbounded channel. The sending half is wrapped
    /// in the returned `EventSender`.
    fn spawn_for_test() -> EventSender<LocalActorOperation> {
        use std::sync::atomic::AtomicU64;

        let (actor_op_tx, actor_op_rx) = unbounded_channel();
        let env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        let counter = Arc::new(AtomicU64::new(0));
        // The value returned by `spawn` is bound only so it lives until the
        // end of this function; nothing waits on it.
        let _join_handle = LocalBarrierWorker::spawn(env, metrics, None, counter, actor_op_rx);
        EventSender(actor_op_tx)
    }
}
1270
/// Test-only utilities for driving a barrier worker through an in-memory
/// control stream.
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_manager::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    /// Handles a test needs to talk to a spawned barrier worker.
    pub(crate) struct LocalBarrierTestEnv {
        /// Manager obtained from the worker via `GetCurrentLocalBarrierManager`.
        pub local_barrier_manager: LocalBarrierManager,
        /// Sender for `LocalActorOperation`s addressed to the worker.
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        /// Sending half of the channel that feeds control stream requests.
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        /// Receiving half of the channel that carries control stream responses.
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawns a worker, opens a control stream to it and waits for the
        /// `Init` response.
        ///
        /// The init request declares one database (`TEST_DATABASE_ID`) with a
        /// single partial graph (`TEST_PARTIAL_GRAPH_ID`) without
        /// subscriptions, and uses the term id `"for_test"`.
        ///
        /// # Panics
        ///
        /// Panics if the first response is missing, is an error, or is not an
        /// `Init` response, or if the current barrier manager cannot be fetched.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            let handle = ControlStreamHandle::new(
                response_tx,
                UnboundedReceiverStream::new(request_rx).boxed(),
            );
            let init_request = InitRequest {
                databases: vec![PbDatabaseInitialPartialGraph {
                    database_id: TEST_DATABASE_ID.database_id,
                    graphs: vec![PbInitialPartialGraph {
                        partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                        subscriptions: vec![],
                    }],
                }],
                term_id: "for_test".into(),
            };
            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle,
                init_request,
            });

            // The first message on the response channel must be `Init`.
            let first_response = response_rx.recv().await.unwrap().unwrap().response.unwrap();
            assert_matches!(
                first_response,
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Pushes an `InjectBarrier` request for `barrier` onto the request channel.
        ///
        /// `actor_to_collect` becomes `actor_ids_to_collect`. The request id is
        /// empty, and the sync, build and subscription lists are all empty.
        ///
        /// # Panics
        ///
        /// Panics if the request cannot be sent.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            let inject = InjectBarrierRequest {
                request_id: "".to_owned(),
                barrier: Some(barrier.to_protobuf()),
                database_id: TEST_DATABASE_ID.database_id,
                actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                table_ids_to_sync: vec![],
                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                actors_to_build: vec![],
                subscriptions_to_add: vec![],
                subscriptions_to_remove: vec![],
            };
            let request = StreamingControlStreamRequest {
                request: Some(streaming_control_stream_request::Request::InjectBarrier(inject)),
            };
            self.request_tx.send(Ok(request)).unwrap();
        }

        /// Sends a `Flush` operation on this environment's `actor_op_tx` and
        /// waits for its acknowledgement.
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        /// Sends a `Flush` operation through `actor_op_tx` and waits until the
        /// one-shot acknowledgement arrives.
        ///
        /// # Panics
        ///
        /// Panics if the operation cannot be sent or the acknowledgement
        /// receiver resolves to an error.
        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (done_tx, done_rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(done_tx));
            done_rx.await.unwrap()
        }

        /// Sends `TakePendingNewOutputRequest` for `actor_id` and returns the
        /// worker's reply.
        ///
        /// # Panics
        ///
        /// Panics if the request/response round trip fails.
        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            let reply = self
                .actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await;
            reply.unwrap()
        }
    }
}