1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::future::{pending, poll_fn};
19use std::sync::Arc;
20use std::task::Poll;
21use std::time::Duration;
22
23use anyhow::anyhow;
24use await_tree::InstrumentAwait;
25use futures::future::BoxFuture;
26use futures::stream::{BoxStream, FuturesOrdered};
27use futures::{FutureExt, StreamExt, TryFutureExt};
28use itertools::Itertools;
29use risingwave_common::error::tonic::extra::{Score, ScoredError};
30use risingwave_pb::stream_plan::barrier::BarrierKind;
31use risingwave_pb::stream_service::barrier_complete_response::{
32 PbCreateMviewProgress, PbLocalSstableInfo,
33};
34use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
35use risingwave_storage::store_impl::AsHummock;
36use thiserror_ext::AsReport;
37use tokio::select;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
39use tokio::sync::{mpsc, oneshot};
40use tokio::task::JoinHandle;
41use tonic::{Code, Status};
42use tracing::warn;
43
44use self::managed_state::ManagedBarrierState;
45use crate::error::{IntoUnexpectedExit, StreamError, StreamResult};
46use crate::task::{ActorId, AtomicU64Ref, PartialGraphId, StreamEnvironment, UpDownActorIds};
47
48mod managed_state;
49mod progress;
50#[cfg(test)]
51mod tests;
52
53pub use progress::CreateMviewProgressReporter;
54use risingwave_common::util::epoch::EpochPair;
55use risingwave_common::util::runtime::BackgroundShutdownRuntime;
56use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
57use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
58use risingwave_pb::stream_service::streaming_control_stream_request::{
59 DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
60};
61use risingwave_pb::stream_service::streaming_control_stream_response::{
62 InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
63};
64use risingwave_pb::stream_service::{
65 BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
66 StreamingControlStreamResponse, streaming_control_stream_response,
67};
68
69use crate::executor::exchange::permit::{Receiver, channel_from_config};
70use crate::executor::monitor::StreamingMetrics;
71use crate::executor::{Barrier, BarrierInner, StreamExecutorError};
72use crate::task::barrier_manager::managed_state::{
73 DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
74 ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
75};
76use crate::task::barrier_manager::progress::BackfillState;
77
/// Compile-time switch for barrier aggregation; currently hard-wired to `false`.
///
/// NOTE(review): this constant is not read anywhere in this part of the file — confirm where it
/// is consumed before relying on its semantics.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
81
/// Outcome of completing one barrier, reported back to meta via `on_epoch_completed`.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// Result of syncing the state store. `None` when no sync was performed for this barrier
    /// (initial and non-checkpoint barriers, see `complete_barrier`).
    pub sync_result: Option<SyncResult>,

    /// Create-materialized-view progress collected for this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
}
91
/// Handle to the bidirectional streaming control RPC with the meta service.
///
/// `pair` is `None` when no stream is attached or after the stream has been reset or shut down.
/// Otherwise it holds the sender for responses to meta and the stream of incoming requests.
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
99
100impl ControlStreamHandle {
101 fn empty() -> Self {
102 Self { pair: None }
103 }
104
105 pub(super) fn new(
106 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
107 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
108 ) -> Self {
109 Self {
110 pair: Some((sender, request_stream)),
111 }
112 }
113
114 pub(super) fn connected(&self) -> bool {
115 self.pair.is_some()
116 }
117
118 fn reset_stream_with_err(&mut self, err: Status) {
119 if let Some((sender, _)) = self.pair.take() {
120 let err = TonicStatusWrapper::new(err);
122 warn!(error = %err.as_report(), "control stream reset with error");
123
124 let err = err.into_inner();
125 if sender.send(Err(err)).is_err() {
126 warn!("failed to notify reset of control stream");
127 }
128 }
129 }
130
131 async fn shutdown_stream(&mut self) {
134 if let Some((sender, _)) = self.pair.take() {
135 if sender
136 .send(Ok(StreamingControlStreamResponse {
137 response: Some(streaming_control_stream_response::Response::Shutdown(
138 ShutdownResponse::default(),
139 )),
140 }))
141 .is_err()
142 {
143 warn!("failed to notify shutdown of control stream");
144 } else {
145 tracing::info!("waiting for meta service to close control stream...");
146
147 sender.closed().await;
154 }
155 } else {
156 debug!("control stream has been reset, ignore shutdown");
157 }
158 }
159
160 pub(super) fn ack_reset_database(
161 &mut self,
162 database_id: DatabaseId,
163 root_err: Option<ScoredStreamError>,
164 reset_request_id: u32,
165 ) {
166 self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
167 database_id: database_id.database_id,
168 root_err: root_err.map(|err| PbScoredError {
169 err_msg: err.error.to_report_string(),
170 score: err.score.0,
171 }),
172 reset_request_id,
173 }));
174 }
175
176 fn send_response(&mut self, response: streaming_control_stream_response::Response) {
177 if let Some((sender, _)) = self.pair.as_ref() {
178 if sender
179 .send(Ok(StreamingControlStreamResponse {
180 response: Some(response),
181 }))
182 .is_err()
183 {
184 self.pair = None;
185 warn!("fail to send response. control stream reset");
186 }
187 } else {
188 debug!(?response, "control stream has been reset. ignore response");
189 }
190 }
191
192 async fn next_request(&mut self) -> StreamingControlStreamRequest {
193 if let Some((_, stream)) = &mut self.pair {
194 match stream.next().await {
195 Some(Ok(request)) => {
196 return request;
197 }
198 Some(Err(e)) => self.reset_stream_with_err(
199 anyhow!(TonicStatusWrapper::new(e)) .context("failed to get request")
201 .to_status_unnamed(Code::Internal),
202 ),
203 None => self.reset_stream_with_err(Status::internal("end of stream")),
204 }
205 }
206 pending().await
207 }
208}
209
/// Events sent from actors (through `LocalBarrierManager`) to the local barrier machinery.
pub(super) enum LocalBarrierEvent {
    /// Sent by `LocalBarrierManager::collect` when `actor_id` has collected the barrier of `epoch`.
    ReportActorCollected {
        actor_id: ActorId,
        epoch: EpochPair,
    },
    /// Backfill / create-mview progress of `actor` at `epoch`.
    /// NOTE(review): not sent from this file — presumably emitted by the `progress` module.
    ReportCreateProgress {
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    },
    /// Sent by `LocalBarrierManager::subscribe_barrier` to register the channel on which
    /// `actor_id` receives barriers.
    RegisterBarrierSender {
        actor_id: ActorId,
        barrier_sender: mpsc::UnboundedSender<Barrier>,
    },
    /// Sent by `LocalBarrierManager::register_local_upstream_output` to hand the sending half of
    /// a local channel from `upstream_actor_id` to `actor_id`.
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        tx: permit::Sender,
    },
}
230
/// Operations delivered to the `LocalBarrierWorker` event loop through its operation channel.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Replace the control stream with `handle` and reset local state using `init_request`.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Request the exchange receiver for the `(upstream, downstream)` actor pair in `ids`.
    /// Rejected if `term_id` differs from the worker's current term.
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    /// Test only: fetch the `LocalBarrierManager` of the test database.
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    /// Test only: take the pending new-output requests recorded for an actor.
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    /// Test only: drain ready control requests and state events, then reply.
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Render the worker's debug info as a string.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Shut down the control stream and reply once done.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
256
/// Shared, immutable context for building and running stream actors on this node.
/// Constructed once in `LocalBarrierWorker::spawn` and shared via `Arc`.
pub(crate) struct StreamActorManager {
    pub(super) env: StreamEnvironment,
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// NOTE(review): shared atomic epoch value; its exact meaning (e.g. watermark for state
    /// cleaning) is defined outside this file — confirm before documenting further.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Optional await-tree registry; when present, barrier completion futures are instrumented.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Dedicated multi-threaded tokio runtime (threads named "rw-streaming", see `spawn`).
    pub(super) runtime: BackgroundShutdownRuntime,
}
270
/// Snapshot of the worker's state used for `InspectState` output.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    /// Per database: a human-readable status string, plus the managed barrier state debug info
    /// when the database is running.
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    /// Whether a control stream to meta is currently attached.
    has_control_stream_connected: bool,
}
275
276impl Display for LocalBarrierWorkerDebugInfo<'_> {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 writeln!(
279 f,
280 "\nhas_control_stream_connected: {}",
281 self.has_control_stream_connected
282 )?;
283
284 for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
285 writeln!(
286 f,
287 "database {} status: {} managed_barrier_state:\n{}",
288 database_id.database_id,
289 status,
290 managed_barrier_state
291 .as_ref()
292 .map(ToString::to_string)
293 .unwrap_or_default()
294 )?;
295 }
296 Ok(())
297 }
298}
299
/// Per-node worker that drives barrier injection, collection and completion, and talks to meta
/// over the streaming control stream. Runs its event loop in `LocalBarrierWorker::run`.
pub(super) struct LocalBarrierWorker {
    /// Per-database managed barrier state.
    pub(super) state: ManagedBarrierState,

    /// Futures completing collected barriers (including state-store sync for checkpoints),
    /// queued per database in push order.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Control stream to meta; empty until a `NewControlStream` operation arrives.
    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Current term; `TakeReceiver` requests with a different term are rejected.
    /// Set to "uninitialized" by `spawn`.
    pub(super) term_id: String,
}
316
impl LocalBarrierWorker {
    /// Creates a worker whose managed state is built from `initial_partial_graphs` under
    /// `term_id`. No control stream is attached yet (`ControlStreamHandle::empty()`).
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    /// Builds a debug snapshot: a status label per database (with full managed-state info only
    /// for running databases) plus the control-stream connection flag.
    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

    /// Resolves with the next completed barrier across all databases.
    ///
    /// Databases are polled in `HashMap` iteration order and the first ready result wins. An
    /// empty `FuturesOrdered` yields `Ready(None)`, which is skipped. If nothing is ready the
    /// future stays pending.
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    /// Main event loop of the worker.
    ///
    /// `biased` makes `select!` check branches in the written order on every iteration:
    /// 1. managed barrier state events (collected barriers, actor errors, finished resets);
    /// 2. completed barrier futures (reported to meta, or the stream is reset on failure);
    /// 3. local actor operations (the loop exits when the operation channel is closed);
    /// 4. requests from the control stream.
    ///
    /// Do not reorder these branches without reviewing the ordering implications.
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // A failed completion (e.g. state-store sync) tears down the whole
                            // control stream rather than a single database.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            // Handled here rather than in `handle_actor_op` because it awaits.
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            // Handled here rather than in `handle_actor_op` because it awaits.
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        // All operation senders dropped: stop the worker.
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    /// Dispatches one request received from meta.
    ///
    /// Only `InjectBarrier` can fail. Its error is returned with the database id so the caller
    /// can mark that database as failed. `Init` is never expected here: initialization arrives
    /// as `LocalActorOperation::NewControlStream`.
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    /// Handles the synchronous local actor operations. `NewControlStream` and `Shutdown` need to
    /// await, so `run` handles them directly; reaching them here is a bug.
    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                // Each branch either hands `result_sender` over and returns early, or produces
                // an error that is sent back at the end.
                let err = if self.term_id != term_id {
                    {
                        warn!(
                            ?ids,
                            term_id,
                            current_term_id = self.term_id,
                            "take receiver on unmatched term_id"
                        );
                        anyhow!(
                            "take receiver {:?} on unmatched term_id {} to current term_id {}",
                            ids,
                            term_id,
                            self.term_id
                        )
                    }
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            // Database not created yet: queue the request until
                            // `add_partial_graph` turns it into a running database.
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        // First sign of this database: remember the request.
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                // Drain every request and state event that is ready right now, without waiting.
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}
637
638mod await_epoch_completed_future {
639 use std::future::Future;
640
641 use futures::FutureExt;
642 use futures::future::BoxFuture;
643 use risingwave_hummock_sdk::SyncResult;
644 use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
645
646 use crate::error::StreamResult;
647 use crate::executor::Barrier;
648 use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};
649
650 pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
651 + 'static;
652
653 #[define_opaque(AwaitEpochCompletedFuture)]
654 pub(super) fn instrument_complete_barrier_future(
655 partial_graph_id: PartialGraphId,
656 complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
657 barrier: Barrier,
658 barrier_await_tree_reg: Option<&await_tree::Registry>,
659 create_mview_progress: Vec<PbCreateMviewProgress>,
660 ) -> AwaitEpochCompletedFuture {
661 let prev_epoch = barrier.epoch.prev;
662 let future = async move {
663 if let Some(future) = complete_barrier_future {
664 let result = future.await;
665 result.map(Some)
666 } else {
667 Ok(None)
668 }
669 }
670 .map(move |result| {
671 (
672 partial_graph_id,
673 barrier,
674 result.map(|sync_result| BarrierCompleteResult {
675 sync_result,
676 create_mview_progress,
677 }),
678 )
679 });
680 if let Some(reg) = barrier_await_tree_reg {
681 reg.register(
682 await_tree_key::BarrierAwait { prev_epoch },
683 format!("SyncEpoch({})", prev_epoch),
684 )
685 .instrument(future)
686 .left_future()
687 } else {
688 future.right_future()
689 }
690 }
691}
692
693use await_epoch_completed_future::*;
694use risingwave_common::catalog::{DatabaseId, TableId};
695use risingwave_storage::{StateStoreImpl, dispatch_state_store};
696
697use crate::executor::exchange::permit;
698
699fn sync_epoch(
700 state_store: &StateStoreImpl,
701 streaming_metrics: &StreamingMetrics,
702 prev_epoch: u64,
703 table_ids: HashSet<TableId>,
704) -> BoxFuture<'static, StreamResult<SyncResult>> {
705 let timer = streaming_metrics.barrier_sync_latency.start_timer();
706
707 let state_store = state_store.clone();
708 let future = async move {
709 dispatch_state_store!(state_store, hummock, {
710 hummock.sync(vec![(prev_epoch, table_ids)]).await
711 })
712 };
713
714 future
715 .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
716 .inspect_ok(move |_| {
717 timer.observe_duration();
718 })
719 .map_err(move |e| {
720 tracing::error!(
721 prev_epoch,
722 error = %e.as_report(),
723 "Failed to sync state store",
724 );
725 e.into()
726 })
727 .boxed()
728}
729
730impl LocalBarrierWorker {
731 fn complete_barrier(
732 &mut self,
733 database_id: DatabaseId,
734 partial_graph_id: PartialGraphId,
735 prev_epoch: u64,
736 ) {
737 {
738 let Some(database_state) = self
739 .state
740 .databases
741 .get_mut(&database_id)
742 .expect("should exist")
743 .state_for_request()
744 else {
745 return;
746 };
747 let (barrier, table_ids, create_mview_progress) =
748 database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);
749
750 let complete_barrier_future = match &barrier.kind {
751 BarrierKind::Unspecified => unreachable!(),
752 BarrierKind::Initial => {
753 tracing::info!(
754 epoch = prev_epoch,
755 "ignore sealing data for the first barrier"
756 );
757 tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
758 None
759 }
760 BarrierKind::Barrier => None,
761 BarrierKind::Checkpoint => Some(sync_epoch(
762 &self.actor_manager.env.state_store(),
763 &self.actor_manager.streaming_metrics,
764 prev_epoch,
765 table_ids.expect("should be Some on BarrierKind::Checkpoint"),
766 )),
767 };
768
769 self.await_epoch_completed_futures
770 .entry(database_id)
771 .or_default()
772 .push_back({
773 instrument_complete_barrier_future(
774 partial_graph_id,
775 complete_barrier_future,
776 barrier,
777 self.actor_manager.await_tree_reg.as_ref(),
778 create_mview_progress,
779 )
780 });
781 }
782 }
783
784 fn on_epoch_completed(
785 &mut self,
786 database_id: DatabaseId,
787 partial_graph_id: PartialGraphId,
788 epoch: u64,
789 result: BarrierCompleteResult,
790 ) {
791 let BarrierCompleteResult {
792 create_mview_progress,
793 sync_result,
794 } = result;
795
796 let (synced_sstables, table_watermarks, old_value_ssts) = sync_result
797 .map(|sync_result| {
798 (
799 sync_result.uncommitted_ssts,
800 sync_result.table_watermarks,
801 sync_result.old_value_ssts,
802 )
803 })
804 .unwrap_or_default();
805
806 let result = {
807 {
808 streaming_control_stream_response::Response::CompleteBarrier(
809 BarrierCompleteResponse {
810 request_id: "todo".to_owned(),
811 partial_graph_id: partial_graph_id.into(),
812 epoch,
813 status: None,
814 create_mview_progress,
815 synced_sstables: synced_sstables
816 .into_iter()
817 .map(
818 |LocalSstableInfo {
819 sst_info,
820 table_stats,
821 created_at,
822 }| PbLocalSstableInfo {
823 sst: Some(sst_info.into()),
824 table_stats_map: to_prost_table_stats_map(table_stats),
825 created_at,
826 },
827 )
828 .collect_vec(),
829 worker_id: self.actor_manager.env.worker_id(),
830 table_watermarks: table_watermarks
831 .into_iter()
832 .map(|(key, value)| (key.table_id, value.into()))
833 .collect(),
834 old_value_sstables: old_value_ssts
835 .into_iter()
836 .map(|sst| sst.sst_info.into())
837 .collect(),
838 database_id: database_id.database_id,
839 },
840 )
841 }
842 };
843
844 self.control_stream_handle.send_response(result);
845 }
846
847 fn send_barrier(
854 &mut self,
855 barrier: &Barrier,
856 request: InjectBarrierRequest,
857 ) -> StreamResult<()> {
858 debug!(
859 target: "events::stream::barrier::manager::send",
860 "send barrier {:?}, actor_ids_to_collect = {:?}",
861 barrier,
862 request.actor_ids_to_collect
863 );
864
865 let database_status = self
866 .state
867 .databases
868 .get_mut(&DatabaseId::new(request.database_id))
869 .expect("should exist");
870 if let Some(state) = database_status.state_for_request() {
871 state.transform_to_issued(barrier, request)?;
872 }
873 Ok(())
874 }
875
876 fn remove_partial_graphs(
877 &mut self,
878 database_id: DatabaseId,
879 partial_graph_ids: impl Iterator<Item = PartialGraphId>,
880 ) {
881 let Some(database_status) = self.state.databases.get_mut(&database_id) else {
882 warn!(
883 database_id = database_id.database_id,
884 "database to remove partial graph not exist"
885 );
886 return;
887 };
888 let Some(database_state) = database_status.state_for_request() else {
889 warn!(
890 database_id = database_id.database_id,
891 "ignore remove partial graph request on err database",
892 );
893 return;
894 };
895 for partial_graph_id in partial_graph_ids {
896 if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
897 assert!(
898 graph.is_empty(),
899 "non empty graph to be removed: {}",
900 &graph
901 );
902 } else {
903 warn!(
904 partial_graph_id = partial_graph_id.0,
905 "no partial graph to remove"
906 );
907 }
908 }
909 }
910
911 fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
912 let status = match self.state.databases.entry(database_id) {
913 Entry::Occupied(entry) => {
914 let status = entry.into_mut();
915 if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
916 let mut database = DatabaseManagedBarrierState::new(
917 database_id,
918 self.term_id.clone(),
919 self.actor_manager.clone(),
920 vec![],
921 );
922 for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
923 {
924 database.new_actor_remote_output_request(
925 actor_id,
926 upstream_actor_id,
927 result_sender,
928 );
929 }
930 *status = DatabaseStatus::Running(database);
931 }
932
933 status
934 }
935 Entry::Vacant(entry) => {
936 entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
937 database_id,
938 self.term_id.clone(),
939 self.actor_manager.clone(),
940 vec![],
941 )))
942 }
943 };
944 if let Some(state) = status.state_for_request() {
945 assert!(
946 state
947 .graph_states
948 .insert(
949 partial_graph_id,
950 PartialGraphManagedBarrierState::new(&self.actor_manager)
951 )
952 .is_none()
953 );
954 }
955 }
956
957 fn reset_database(&mut self, req: ResetDatabaseRequest) {
958 let database_id = DatabaseId::new(req.database_id);
959 if let Some(database_status) = self.state.databases.get_mut(&database_id) {
960 database_status.start_reset(
961 database_id,
962 self.await_epoch_completed_futures.remove(&database_id),
963 req.reset_request_id,
964 );
965 } else {
966 self.ack_database_reset(database_id, None, req.reset_request_id);
967 }
968 }
969
970 fn ack_database_reset(
971 &mut self,
972 database_id: DatabaseId,
973 reset_output: Option<ResetDatabaseOutput>,
974 reset_request_id: u32,
975 ) {
976 info!(
977 database_id = database_id.database_id,
978 "database reset successfully"
979 );
980 if let Some(reset_database) = self.state.databases.remove(&database_id) {
981 match reset_database {
982 DatabaseStatus::Resetting(_) => {}
983 _ => {
984 unreachable!("must be resetting previously")
985 }
986 }
987 }
988 self.await_epoch_completed_futures.remove(&database_id);
989 self.control_stream_handle.ack_reset_database(
990 database_id,
991 reset_output.and_then(|output| output.root_err),
992 reset_request_id,
993 );
994 }
995
996 fn on_database_failure(
1000 &mut self,
1001 database_id: DatabaseId,
1002 failed_actor: Option<ActorId>,
1003 err: StreamError,
1004 message: impl Into<String>,
1005 ) {
1006 let message = message.into();
1007 error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
1008 let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
1009 self.state
1010 .databases
1011 .get_mut(&database_id)
1012 .expect("should exist")
1013 .suspend(failed_actor, err, completing_futures);
1014 self.control_stream_handle
1015 .send_response(Response::ReportDatabaseFailure(
1016 ReportDatabaseFailureResponse {
1017 database_id: database_id.database_id,
1018 },
1019 ));
1020 }
1021}
1022
1023impl DatabaseManagedBarrierState {
1024 async fn try_find_root_actor_failure(
1028 &mut self,
1029 first_failure: Option<(Option<ActorId>, StreamError)>,
1030 ) -> Option<ScoredStreamError> {
1031 let mut later_errs = vec![];
1032 let _ = tokio::time::timeout(Duration::from_secs(3), async {
1034 let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
1035 if let Some((Some(failed_actor), _)) = &first_failure {
1036 uncollected_actors.remove(failed_actor);
1037 }
1038 while !uncollected_actors.is_empty()
1039 && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
1040 {
1041 uncollected_actors.remove(&actor_id);
1042 later_errs.push(error);
1043 }
1044 })
1045 .await;
1046
1047 first_failure
1048 .into_iter()
1049 .map(|(_, err)| err)
1050 .chain(later_errs.into_iter())
1051 .map(|e| e.with_score())
1052 .max_by_key(|e| e.score)
1053 }
1054}
1055
/// Cloneable handle used by actors of one database to talk to the local barrier machinery.
/// It reports collected barriers and failures and registers barrier and output channels.
#[derive(Clone)]
pub struct LocalBarrierManager {
    /// Channel for `LocalBarrierEvent`s; the receiver is returned by `LocalBarrierManager::new`.
    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
    /// Channel for actor failures; the receiver is returned by `LocalBarrierManager::new`.
    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
    pub(crate) database_id: DatabaseId,
    pub(crate) term_id: String,
    pub(crate) env: StreamEnvironment,
}
1064
1065impl LocalBarrierWorker {
1066 pub fn spawn(
1068 env: StreamEnvironment,
1069 streaming_metrics: Arc<StreamingMetrics>,
1070 await_tree_reg: Option<await_tree::Registry>,
1071 watermark_epoch: AtomicU64Ref,
1072 actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1073 ) -> JoinHandle<()> {
1074 let runtime = {
1075 let mut builder = tokio::runtime::Builder::new_multi_thread();
1076 if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
1077 builder.worker_threads(worker_threads_num);
1078 }
1079 builder
1080 .thread_name("rw-streaming")
1081 .enable_all()
1082 .build()
1083 .unwrap()
1084 };
1085
1086 let actor_manager = Arc::new(StreamActorManager {
1087 env: env.clone(),
1088 streaming_metrics,
1089 watermark_epoch,
1090 await_tree_reg,
1091 runtime: runtime.into(),
1092 });
1093 let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
1094 tokio::spawn(worker.run(actor_op_rx))
1095 }
1096}
1097
1098pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1099
1100impl<T> Clone for EventSender<T> {
1101 fn clone(&self) -> Self {
1102 Self(self.0.clone())
1103 }
1104}
1105
1106impl<T> EventSender<T> {
1107 pub(super) fn send_event(&self, event: T) {
1108 self.0.send(event).expect("should be able to send event")
1109 }
1110
1111 pub(super) async fn send_and_await<RSP>(
1112 &self,
1113 make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1114 ) -> StreamResult<RSP> {
1115 let (tx, rx) = oneshot::channel();
1116 let event = make_event(tx);
1117 self.send_event(event);
1118 rx.await
1119 .map_err(|_| anyhow!("barrier manager maybe reset").into())
1120 }
1121}
1122
/// A request to attach a new output channel (the sending half) to an actor.
///
/// NOTE(review): presumably `Local` is for an in-process downstream and `Remote` for one
/// reached through exchange. Neither variant is constructed in this part of the file, so
/// confirm before relying on this.
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}
1127
1128impl LocalBarrierManager {
1129 pub(super) fn new(
1130 database_id: DatabaseId,
1131 term_id: String,
1132 env: StreamEnvironment,
1133 ) -> (
1134 Self,
1135 UnboundedReceiver<LocalBarrierEvent>,
1136 UnboundedReceiver<(ActorId, StreamError)>,
1137 ) {
1138 let (event_tx, event_rx) = unbounded_channel();
1139 let (err_tx, err_rx) = unbounded_channel();
1140 (
1141 Self {
1142 barrier_event_sender: event_tx,
1143 actor_failure_sender: err_tx,
1144 database_id,
1145 term_id,
1146 env,
1147 },
1148 event_rx,
1149 err_rx,
1150 )
1151 }
1152
1153 fn send_event(&self, event: LocalBarrierEvent) {
1154 let _ = self.barrier_event_sender.send(event);
1156 }
1157
1158 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
1161 self.send_event(LocalBarrierEvent::ReportActorCollected {
1162 actor_id,
1163 epoch: barrier.epoch,
1164 })
1165 }
1166
1167 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
1170 let _ = self
1171 .actor_failure_sender
1172 .send((actor_id, err.into_unexpected_exit(actor_id)));
1173 }
1174
1175 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
1176 let (tx, rx) = mpsc::unbounded_channel();
1177 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
1178 actor_id,
1179 barrier_sender: tx,
1180 });
1181 rx
1182 }
1183
1184 pub fn register_local_upstream_output(
1185 &self,
1186 actor_id: ActorId,
1187 upstream_actor_id: ActorId,
1188 ) -> permit::Receiver {
1189 let (tx, rx) = channel_from_config(self.env.config());
1190 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
1191 actor_id,
1192 upstream_actor_id,
1193 tx,
1194 });
1195 rx
1196 }
1197}
1198
/// A [`StreamError`] paired with a [`Score`]; built by `StreamError::with_score`.
type ScoredStreamError = ScoredError<StreamError>;
1201
1202impl StreamError {
1203 fn with_score(self) -> ScoredStreamError {
1205 fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
1209 use crate::executor::error::ErrorKind;
1210 match e.inner() {
1211 ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,
1214
1215 ErrorKind::Uncategorized(_)
1217 | ErrorKind::Storage(_)
1218 | ErrorKind::ArrayError(_)
1219 | ErrorKind::ExprError(_)
1220 | ErrorKind::SerdeError(_)
1221 | ErrorKind::SinkError(_, _)
1222 | ErrorKind::RpcError(_)
1223 | ErrorKind::AlignBarrier(_, _)
1224 | ErrorKind::ConnectorError(_)
1225 | ErrorKind::DmlError(_)
1226 | ErrorKind::NotImplemented(_) => 999,
1227 }
1228 }
1229
1230 fn stream_error_score(e: &StreamError) -> i32 {
1231 use crate::error::ErrorKind;
1232 match e.inner() {
1233 ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
1235
1236 ErrorKind::BarrierSend { .. } => 1,
1238
1239 ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
1241
1242 ErrorKind::Uncategorized(_)
1244 | ErrorKind::Storage(_)
1245 | ErrorKind::Expression(_)
1246 | ErrorKind::Array(_)
1247 | ErrorKind::Secret(_) => 1000,
1248 }
1249 }
1250
1251 let score = Score(stream_error_score(&self));
1252 ScoredStreamError { error: self, score }
1253 }
1254}
1255
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a `LocalBarrierWorker` on a test environment (unused metrics, a counter starting at
    /// 0) and returns the sender used to submit `LocalActorOperation`s to it.
    fn spawn_for_test() -> EventSender<LocalActorOperation> {
        let (op_tx, op_rx) = unbounded_channel();
        let env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        let counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
        // The join handle is bound but never awaited.
        let _join_handle = LocalBarrierWorker::spawn(env, metrics, None, counter, op_rx);
        EventSender(op_tx)
    }
}
1271
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_manager::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    /// Test harness connecting a spawned barrier worker to in-memory control-stream channels.
    pub(crate) struct LocalBarrierTestEnv {
        /// Manager handle fetched from the worker after initialization.
        pub local_barrier_manager: LocalBarrierManager,
        /// Sender for operations handled by the spawned worker.
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        /// Pushes requests into the worker's control stream.
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        /// Receives responses the worker writes to the control stream.
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Builds a ready-to-use test environment.
        ///
        /// Spawns a worker, opens a control stream initialized with the test database and
        /// partial graph, waits for the `Init` response, then fetches the current
        /// [`LocalBarrierManager`].
        ///
        /// # Panics
        ///
        /// Panics if the first response is not `Init`, or if a channel closes early.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            let handle = ControlStreamHandle::new(
                response_tx,
                UnboundedReceiverStream::new(request_rx).boxed(),
            );
            let init_request = InitRequest {
                databases: vec![PbDatabaseInitialPartialGraph {
                    database_id: TEST_DATABASE_ID.database_id,
                    graphs: vec![PbInitialPartialGraph {
                        partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                        subscriptions: vec![],
                    }],
                }],
                term_id: "for_test".into(),
            };
            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle,
                init_request,
            });

            // The first message on the control stream must acknowledge initialization.
            let first_response = response_rx.recv().await.unwrap().unwrap();
            assert_matches!(
                first_response.response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Sends an `InjectBarrier` request for `barrier` on the control stream.
        ///
        /// The request targets the test database and partial graph and sets
        /// `actor_ids_to_collect` from `actor_to_collect`.
        ///
        /// # Panics
        ///
        /// Panics if the request channel is closed.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            let inject = InjectBarrierRequest {
                request_id: String::new(),
                barrier: Some(barrier.to_protobuf()),
                database_id: TEST_DATABASE_ID.database_id,
                actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                table_ids_to_sync: vec![],
                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                actors_to_build: vec![],
                subscriptions_to_add: vec![],
                subscriptions_to_remove: vec![],
            };
            let request = StreamingControlStreamRequest {
                request: Some(streaming_control_stream_request::Request::InjectBarrier(inject)),
            };
            self.request_tx.send(Ok(request)).unwrap();
        }

        /// Sends a `Flush` operation to the worker and waits for its acknowledgement.
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        /// Same as [`Self::flush_all_events`], but takes the operation sender directly.
        ///
        /// # Panics
        ///
        /// Panics if the acknowledgement sender is dropped without replying.
        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (ack_tx, ack_rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(ack_tx));
            ack_rx.await.unwrap()
        }

        /// Asks the worker for the pending new-output requests recorded for `actor_id`.
        ///
        /// # Panics
        ///
        /// Panics if the worker drops the reply channel.
        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|reply_tx| {
                    LocalActorOperation::TakePendingNewOutputRequest(actor_id, reply_tx)
                })
                .await
                .unwrap()
        }
    }
}