1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::future::{pending, poll_fn};
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use await_tree::InstrumentAwait;
24use futures::future::BoxFuture;
25use futures::stream::{BoxStream, FuturesOrdered};
26use futures::{FutureExt, StreamExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_common::error::tonic::extra::{Score, ScoredError};
29use risingwave_pb::stream_plan::barrier::BarrierKind;
30use risingwave_pb::stream_service::barrier_complete_response::{
31 PbCreateMviewProgress, PbLocalSstableInfo,
32};
33use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
34use risingwave_storage::store_impl::AsHummock;
35use thiserror_ext::AsReport;
36use tokio::select;
37use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
38use tokio::sync::{mpsc, oneshot};
39use tokio::task::JoinHandle;
40use tonic::{Code, Status};
41use tracing::warn;
42
43use self::managed_state::ManagedBarrierState;
44use crate::error::{IntoUnexpectedExit, StreamError, StreamResult};
45use crate::task::{
46 ActorId, AtomicU64Ref, PartialGraphId, SharedContext, StreamEnvironment, UpDownActorIds,
47};
48
49mod managed_state;
50mod progress;
51#[cfg(test)]
52mod tests;
53
54pub use progress::CreateMviewProgressReporter;
55use risingwave_common::util::epoch::EpochPair;
56use risingwave_common::util::runtime::BackgroundShutdownRuntime;
57use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
58use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
59use risingwave_pb::stream_service::streaming_control_stream_request::{
60 DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
61};
62use risingwave_pb::stream_service::streaming_control_stream_response::{
63 InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
64};
65use risingwave_pb::stream_service::{
66 BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
67 StreamingControlStreamResponse, streaming_control_stream_response,
68};
69
70use crate::executor::exchange::permit::Receiver;
71use crate::executor::monitor::StreamingMetrics;
72use crate::executor::{Barrier, BarrierInner, StreamExecutorError};
73use crate::task::barrier_manager::managed_state::{
74 DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
75 ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
76};
77use crate::task::barrier_manager::progress::BackfillState;
78
/// Compile-time switch, currently disabled (`false`).
///
/// NOTE(review): presumably gates barrier aggregation; no use of it is visible in this part of
/// the file, so confirm against its users.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
82
/// Outcome of completing a barrier, as yielded by the epoch-completion futures.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// Result of syncing the state store. `None` when no sync future was run for the barrier
    /// (in `complete_barrier`, only checkpoint barriers start a sync).
    pub sync_result: Option<SyncResult>,

    /// Materialized-view creation progress entries carried along with the barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
}
92
/// Handle of the bidirectional control stream.
///
/// `pair` holds the response sender together with the request stream while the stream is
/// connected. It becomes `None` after the stream is reset, shut down, or a send fails.
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
100
101impl ControlStreamHandle {
102 fn empty() -> Self {
103 Self { pair: None }
104 }
105
106 pub(super) fn new(
107 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
108 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
109 ) -> Self {
110 Self {
111 pair: Some((sender, request_stream)),
112 }
113 }
114
115 pub(super) fn connected(&self) -> bool {
116 self.pair.is_some()
117 }
118
119 fn reset_stream_with_err(&mut self, err: Status) {
120 if let Some((sender, _)) = self.pair.take() {
121 let err = TonicStatusWrapper::new(err);
123 warn!(error = %err.as_report(), "control stream reset with error");
124
125 let err = err.into_inner();
126 if sender.send(Err(err)).is_err() {
127 warn!("failed to notify reset of control stream");
128 }
129 }
130 }
131
132 async fn shutdown_stream(&mut self) {
135 if let Some((sender, _)) = self.pair.take() {
136 if sender
137 .send(Ok(StreamingControlStreamResponse {
138 response: Some(streaming_control_stream_response::Response::Shutdown(
139 ShutdownResponse::default(),
140 )),
141 }))
142 .is_err()
143 {
144 warn!("failed to notify shutdown of control stream");
145 } else {
146 tracing::info!("waiting for meta service to close control stream...");
147
148 sender.closed().await;
155 }
156 } else {
157 debug!("control stream has been reset, ignore shutdown");
158 }
159 }
160
161 pub(super) fn ack_reset_database(
162 &mut self,
163 database_id: DatabaseId,
164 root_err: Option<ScoredStreamError>,
165 reset_request_id: u32,
166 ) {
167 self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
168 database_id: database_id.database_id,
169 root_err: root_err.map(|err| PbScoredError {
170 err_msg: err.error.to_report_string(),
171 score: err.score.0,
172 }),
173 reset_request_id,
174 }));
175 }
176
177 fn send_response(&mut self, response: streaming_control_stream_response::Response) {
178 if let Some((sender, _)) = self.pair.as_ref() {
179 if sender
180 .send(Ok(StreamingControlStreamResponse {
181 response: Some(response),
182 }))
183 .is_err()
184 {
185 self.pair = None;
186 warn!("fail to send response. control stream reset");
187 }
188 } else {
189 debug!(?response, "control stream has been reset. ignore response");
190 }
191 }
192
193 async fn next_request(&mut self) -> StreamingControlStreamRequest {
194 if let Some((_, stream)) = &mut self.pair {
195 match stream.next().await {
196 Some(Ok(request)) => {
197 return request;
198 }
199 Some(Err(e)) => self.reset_stream_with_err(
200 anyhow!(TonicStatusWrapper::new(e)) .context("failed to get request")
202 .to_status_unnamed(Code::Internal),
203 ),
204 None => self.reset_stream_with_err(Status::internal("end of stream")),
205 }
206 }
207 pending().await
208 }
209}
210
/// Events sent through the `barrier_event_sender` of [`LocalBarrierManager`].
pub(super) enum LocalBarrierEvent {
    /// Sent by `LocalBarrierManager::collect`: `actor_id` reports the barrier of `epoch`.
    ReportActorCollected {
        actor_id: ActorId,
        epoch: EpochPair,
    },
    /// Carries the backfill `state` of `actor` for `epoch`.
    ReportCreateProgress {
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    },
    /// Sent by `LocalBarrierManager::subscribe_barrier`: registers the sender whose receiving
    /// half was handed to the subscriber for `actor_id`.
    RegisterBarrierSender {
        actor_id: ActorId,
        barrier_sender: mpsc::UnboundedSender<Barrier>,
    },
}
226
/// Operations received by `LocalBarrierWorker::run` through its `actor_op_rx` channel.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Replaces the control stream: the worker resets the old stream with an error, resets
    /// itself with `init_request`, installs `handle` and replies with an `Init` response.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Takes the receiver identified by `ids` from the shared context of `database_id`.
    /// Fails when `term_id` differs from the worker's current term id.
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    /// Test only: replies with the shared context and local barrier manager of the test
    /// database.
    #[cfg(test)]
    GetCurrentSharedContext(oneshot::Sender<(Arc<SharedContext>, LocalBarrierManager)>),
    /// Test only: handles all immediately available control requests and state events, then
    /// signals the sender.
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Replies with the string rendering of the worker's debug info.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Shuts down the control stream and signals `result_sender` afterwards.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
250
/// State shared (behind an `Arc`) between the barrier worker and the per-database states.
/// Constructed in `LocalBarrierWorker::spawn`.
pub(crate) struct StreamActorManager {
    /// Streaming environment; provides, among others, the state store and the worker id.
    pub(super) env: StreamEnvironment,
    /// Streaming metrics, e.g. the barrier sync latency timer used by `sync_epoch`.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Shared atomic epoch value.
    /// NOTE(review): presumably the watermark epoch used for cache eviction — confirm.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Optional await-tree registry used to instrument barrier completion futures.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Multi-threaded tokio runtime built in `LocalBarrierWorker::spawn` with the thread name
    /// `rw-streaming`.
    pub(super) runtime: BackgroundShutdownRuntime,
}
264
/// Snapshot of the worker state for debugging, rendered through its `Display` impl.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    /// Per database: a textual status and, for running databases only, the detailed barrier
    /// state debug info.
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    /// Whether a control stream was attached when the snapshot was taken.
    has_control_stream_connected: bool,
}
269
270impl Display for LocalBarrierWorkerDebugInfo<'_> {
271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272 writeln!(
273 f,
274 "\nhas_control_stream_connected: {}",
275 self.has_control_stream_connected
276 )?;
277
278 for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
279 writeln!(
280 f,
281 "database {} status: {} managed_barrier_state:\n{}",
282 database_id.database_id,
283 status,
284 managed_barrier_state
285 .as_ref()
286 .map(ToString::to_string)
287 .unwrap_or_default()
288 )?;
289 }
290 Ok(())
291 }
292}
293
/// The worker that owns the barrier state of all databases on this node and drives it from a
/// single event loop (see `run`).
pub(super) struct LocalBarrierWorker {
    /// Managed barrier state, including the per-database statuses and shared contexts.
    pub(super) state: ManagedBarrierState,

    /// Per database: futures of barriers being completed, polled in insertion order.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Handle of the current control stream; empty until a `NewControlStream` operation
    /// installs one.
    control_stream_handle: ControlStreamHandle,

    /// Shared environment, metrics and runtime.
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Current term id; `TakeReceiver` operations carrying a different term id are rejected.
    pub(super) term_id: String,
}
310
311impl LocalBarrierWorker {
312 pub(super) fn new(
313 actor_manager: Arc<StreamActorManager>,
314 initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
315 term_id: String,
316 ) -> Self {
317 let state = ManagedBarrierState::new(
318 actor_manager.clone(),
319 initial_partial_graphs,
320 term_id.clone(),
321 );
322 Self {
323 state,
324 await_epoch_completed_futures: Default::default(),
325 control_stream_handle: ControlStreamHandle::empty(),
326 actor_manager,
327 term_id,
328 }
329 }
330
331 fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
332 LocalBarrierWorkerDebugInfo {
333 managed_barrier_state: self
334 .state
335 .databases
336 .iter()
337 .map(|(database_id, status)| {
338 (*database_id, {
339 match status {
340 DatabaseStatus::Running(state) => {
341 ("running".to_owned(), Some(state.to_debug_info()))
342 }
343 DatabaseStatus::Suspended(state) => {
344 (format!("suspended: {:?}", state.suspend_time), None)
345 }
346 DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
347 DatabaseStatus::Unspecified => {
348 unreachable!()
349 }
350 }
351 })
352 })
353 .collect(),
354 has_control_stream_connected: self.control_stream_handle.connected(),
355 }
356 }
357
358 pub(crate) fn get_or_insert_database_shared_context<'a>(
359 current_shared_context: &'a mut HashMap<DatabaseId, Arc<SharedContext>>,
360 database_id: DatabaseId,
361 actor_manager: &StreamActorManager,
362 term_id: &String,
363 ) -> &'a Arc<SharedContext> {
364 current_shared_context
365 .entry(database_id)
366 .or_insert_with(|| {
367 Arc::new(SharedContext::new(
368 database_id,
369 &actor_manager.env,
370 term_id.clone(),
371 ))
372 })
373 }
374
375 async fn next_completed_epoch(
376 futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
377 ) -> (
378 DatabaseId,
379 PartialGraphId,
380 Barrier,
381 StreamResult<BarrierCompleteResult>,
382 ) {
383 poll_fn(|cx| {
384 for (database_id, futures) in &mut *futures {
385 if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
386 futures.poll_next_unpin(cx)
387 {
388 return Poll::Ready((*database_id, partial_graph_id, barrier, result));
389 }
390 }
391 Poll::Pending
392 })
393 .await
394 }
395
396 async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
397 loop {
398 select! {
399 biased;
400 (database_id, event) = self.state.next_event() => {
401 match event {
402 ManagedBarrierStateEvent::BarrierCollected{
403 partial_graph_id,
404 barrier,
405 } => {
406 self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
407 }
408 ManagedBarrierStateEvent::ActorError{
409 actor_id,
410 err,
411 } => {
412 self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
413 }
414 ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
415 self.ack_database_reset(database_id, Some(output), reset_request_id);
416 }
417 }
418 }
419 (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
420 match result {
421 Ok(result) => {
422 self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
423 }
424 Err(err) => {
425 self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
429 }
430 }
431 },
432 actor_op = actor_op_rx.recv() => {
433 if let Some(actor_op) = actor_op {
434 match actor_op {
435 LocalActorOperation::NewControlStream { handle, init_request } => {
436 self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
437 self.reset(init_request).await;
438 self.control_stream_handle = handle;
439 self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
440 }
441 LocalActorOperation::Shutdown { result_sender } => {
442 if self.state.databases.values().any(|database| {
443 match database {
444 DatabaseStatus::Running(database) => {
445 !database.actor_states.is_empty()
446 }
447 DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) => {
448 false
449 }
450 DatabaseStatus::Unspecified => {
451 unreachable!()
452 }
453 }
454 }) {
455 tracing::warn!(
456 "shutdown with running actors, scaling or migration will be triggered"
457 );
458 }
459 self.control_stream_handle.shutdown_stream().await;
460 let _ = result_sender.send(());
461 }
462 actor_op => {
463 self.handle_actor_op(actor_op);
464 }
465 }
466 }
467 else {
468 break;
469 }
470 },
471 request = self.control_stream_handle.next_request() => {
472 let result = self.handle_streaming_control_request(request.request.expect("non empty"));
473 if let Err((database_id, err)) = result {
474 self.on_database_failure(database_id, None, err, "failed to inject barrier");
475 }
476 },
477 }
478 }
479 }
480
481 fn handle_streaming_control_request(
482 &mut self,
483 request: Request,
484 ) -> Result<(), (DatabaseId, StreamError)> {
485 match request {
486 Request::InjectBarrier(req) => {
487 let database_id = DatabaseId::new(req.database_id);
488 let result: StreamResult<()> = try {
489 let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
490 self.update_actor_info(database_id, req.broadcast_info.iter().cloned());
491 self.send_barrier(&barrier, req)?;
492 };
493 result.map_err(|e| (database_id, e))?;
494 Ok(())
495 }
496 Request::RemovePartialGraph(req) => {
497 self.remove_partial_graphs(
498 DatabaseId::new(req.database_id),
499 req.partial_graph_ids.into_iter().map(PartialGraphId::new),
500 );
501 Ok(())
502 }
503 Request::CreatePartialGraph(req) => {
504 self.add_partial_graph(
505 DatabaseId::new(req.database_id),
506 PartialGraphId::new(req.partial_graph_id),
507 );
508 Ok(())
509 }
510 Request::ResetDatabase(req) => {
511 self.reset_database(req);
512 Ok(())
513 }
514 Request::Init(_) => {
515 unreachable!()
516 }
517 }
518 }
519
520 fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
521 match actor_op {
522 LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
523 unreachable!("event {actor_op} should be handled separately in async context")
524 }
525 LocalActorOperation::TakeReceiver {
526 database_id,
527 term_id,
528 ids,
529 result_sender,
530 } => {
531 let result = try {
532 if self.term_id != term_id {
533 warn!(
534 ?ids,
535 term_id,
536 current_term_id = self.term_id,
537 "take receiver on unmatched term_id"
538 );
539 Err(anyhow!(
540 "take receiver {:?} on unmatched term_id {} to current term_id {}",
541 ids,
542 term_id,
543 self.term_id
544 ))?;
545 }
546 LocalBarrierWorker::get_or_insert_database_shared_context(
547 &mut self.state.current_shared_context,
548 database_id,
549 &self.actor_manager,
550 &self.term_id,
551 )
552 .take_receiver(ids)?
553 };
554 let _ = result_sender.send(result);
555 }
556 #[cfg(test)]
557 LocalActorOperation::GetCurrentSharedContext(sender) => {
558 let database_status = self
559 .state
560 .databases
561 .get(&crate::task::TEST_DATABASE_ID)
562 .unwrap();
563 let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
564 let _ = sender.send((
565 database_state.current_shared_context.clone(),
566 database_state.local_barrier_manager.clone(),
567 ));
568 }
569 #[cfg(test)]
570 LocalActorOperation::Flush(sender) => {
571 use futures::FutureExt;
572 while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
573 self.handle_streaming_control_request(
574 request.request.expect("should not be empty"),
575 )
576 .unwrap();
577 }
578 while let Some((database_id, event)) = self.state.next_event().now_or_never() {
579 match event {
580 ManagedBarrierStateEvent::BarrierCollected {
581 partial_graph_id,
582 barrier,
583 } => {
584 self.complete_barrier(
585 database_id,
586 partial_graph_id,
587 barrier.epoch.prev,
588 );
589 }
590 ManagedBarrierStateEvent::ActorError { .. }
591 | ManagedBarrierStateEvent::DatabaseReset(..) => {
592 unreachable!()
593 }
594 }
595 }
596 sender.send(()).unwrap()
597 }
598 LocalActorOperation::InspectState { result_sender } => {
599 let debug_info = self.to_debug_info();
600 let _ = result_sender.send(debug_info.to_string());
601 }
602 }
603 }
604}
605
606mod await_epoch_completed_future {
607 use std::future::Future;
608
609 use futures::FutureExt;
610 use futures::future::BoxFuture;
611 use risingwave_hummock_sdk::SyncResult;
612 use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
613
614 use crate::error::StreamResult;
615 use crate::executor::Barrier;
616 use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};
617
618 pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
619 + 'static;
620
621 pub(super) fn instrument_complete_barrier_future(
622 partial_graph_id: PartialGraphId,
623 complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
624 barrier: Barrier,
625 barrier_await_tree_reg: Option<&await_tree::Registry>,
626 create_mview_progress: Vec<PbCreateMviewProgress>,
627 ) -> AwaitEpochCompletedFuture {
628 let prev_epoch = barrier.epoch.prev;
629 let future = async move {
630 if let Some(future) = complete_barrier_future {
631 let result = future.await;
632 result.map(Some)
633 } else {
634 Ok(None)
635 }
636 }
637 .map(move |result| {
638 (
639 partial_graph_id,
640 barrier,
641 result.map(|sync_result| BarrierCompleteResult {
642 sync_result,
643 create_mview_progress,
644 }),
645 )
646 });
647 if let Some(reg) = barrier_await_tree_reg {
648 reg.register(
649 await_tree_key::BarrierAwait { prev_epoch },
650 format!("SyncEpoch({})", prev_epoch),
651 )
652 .instrument(future)
653 .left_future()
654 } else {
655 future.right_future()
656 }
657 }
658}
659
660use await_epoch_completed_future::*;
661use risingwave_common::catalog::{DatabaseId, TableId};
662use risingwave_storage::{StateStoreImpl, dispatch_state_store};
663
664fn sync_epoch(
665 state_store: &StateStoreImpl,
666 streaming_metrics: &StreamingMetrics,
667 prev_epoch: u64,
668 table_ids: HashSet<TableId>,
669) -> BoxFuture<'static, StreamResult<SyncResult>> {
670 let timer = streaming_metrics.barrier_sync_latency.start_timer();
671
672 let state_store = state_store.clone();
673 let future = async move {
674 dispatch_state_store!(state_store, hummock, {
675 hummock.sync(vec![(prev_epoch, table_ids)]).await
676 })
677 };
678
679 future
680 .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
681 .inspect_ok(move |_| {
682 timer.observe_duration();
683 })
684 .map_err(move |e| {
685 tracing::error!(
686 prev_epoch,
687 error = %e.as_report(),
688 "Failed to sync state store",
689 );
690 e.into()
691 })
692 .boxed()
693}
694
695impl LocalBarrierWorker {
696 fn complete_barrier(
697 &mut self,
698 database_id: DatabaseId,
699 partial_graph_id: PartialGraphId,
700 prev_epoch: u64,
701 ) {
702 {
703 let Some(database_state) = self
704 .state
705 .databases
706 .get_mut(&database_id)
707 .expect("should exist")
708 .state_for_request()
709 else {
710 return;
711 };
712 let (barrier, table_ids, create_mview_progress) =
713 database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);
714
715 let complete_barrier_future = match &barrier.kind {
716 BarrierKind::Unspecified => unreachable!(),
717 BarrierKind::Initial => {
718 tracing::info!(
719 epoch = prev_epoch,
720 "ignore sealing data for the first barrier"
721 );
722 tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
723 None
724 }
725 BarrierKind::Barrier => None,
726 BarrierKind::Checkpoint => Some(sync_epoch(
727 &self.actor_manager.env.state_store(),
728 &self.actor_manager.streaming_metrics,
729 prev_epoch,
730 table_ids.expect("should be Some on BarrierKind::Checkpoint"),
731 )),
732 };
733
734 self.await_epoch_completed_futures
735 .entry(database_id)
736 .or_default()
737 .push_back({
738 instrument_complete_barrier_future(
739 partial_graph_id,
740 complete_barrier_future,
741 barrier,
742 self.actor_manager.await_tree_reg.as_ref(),
743 create_mview_progress,
744 )
745 });
746 }
747 }
748
749 fn on_epoch_completed(
750 &mut self,
751 database_id: DatabaseId,
752 partial_graph_id: PartialGraphId,
753 epoch: u64,
754 result: BarrierCompleteResult,
755 ) {
756 let BarrierCompleteResult {
757 create_mview_progress,
758 sync_result,
759 } = result;
760
761 let (synced_sstables, table_watermarks, old_value_ssts) = sync_result
762 .map(|sync_result| {
763 (
764 sync_result.uncommitted_ssts,
765 sync_result.table_watermarks,
766 sync_result.old_value_ssts,
767 )
768 })
769 .unwrap_or_default();
770
771 let result = {
772 {
773 streaming_control_stream_response::Response::CompleteBarrier(
774 BarrierCompleteResponse {
775 request_id: "todo".to_owned(),
776 partial_graph_id: partial_graph_id.into(),
777 epoch,
778 status: None,
779 create_mview_progress,
780 synced_sstables: synced_sstables
781 .into_iter()
782 .map(
783 |LocalSstableInfo {
784 sst_info,
785 table_stats,
786 created_at,
787 }| PbLocalSstableInfo {
788 sst: Some(sst_info.into()),
789 table_stats_map: to_prost_table_stats_map(table_stats),
790 created_at,
791 },
792 )
793 .collect_vec(),
794 worker_id: self.actor_manager.env.worker_id(),
795 table_watermarks: table_watermarks
796 .into_iter()
797 .map(|(key, value)| (key.table_id, value.into()))
798 .collect(),
799 old_value_sstables: old_value_ssts
800 .into_iter()
801 .map(|sst| sst.sst_info.into())
802 .collect(),
803 database_id: database_id.database_id,
804 },
805 )
806 }
807 };
808
809 self.control_stream_handle.send_response(result);
810 }
811
812 fn send_barrier(
819 &mut self,
820 barrier: &Barrier,
821 request: InjectBarrierRequest,
822 ) -> StreamResult<()> {
823 debug!(
824 target: "events::stream::barrier::manager::send",
825 "send barrier {:?}, actor_ids_to_collect = {:?}",
826 barrier,
827 request.actor_ids_to_collect
828 );
829
830 let database_status = self
831 .state
832 .databases
833 .get_mut(&DatabaseId::new(request.database_id))
834 .expect("should exist");
835 if let Some(state) = database_status.state_for_request() {
836 state.transform_to_issued(barrier, request)?;
837 }
838 Ok(())
839 }
840
841 fn remove_partial_graphs(
842 &mut self,
843 database_id: DatabaseId,
844 partial_graph_ids: impl Iterator<Item = PartialGraphId>,
845 ) {
846 let Some(database_status) = self.state.databases.get_mut(&database_id) else {
847 warn!(
848 database_id = database_id.database_id,
849 "database to remove partial graph not exist"
850 );
851 return;
852 };
853 let Some(database_state) = database_status.state_for_request() else {
854 warn!(
855 database_id = database_id.database_id,
856 "ignore remove partial graph request on err database",
857 );
858 return;
859 };
860 for partial_graph_id in partial_graph_ids {
861 if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
862 assert!(
863 graph.is_empty(),
864 "non empty graph to be removed: {}",
865 &graph
866 );
867 } else {
868 warn!(
869 partial_graph_id = partial_graph_id.0,
870 "no partial graph to remove"
871 );
872 }
873 }
874 }
875
876 fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
877 let status = self.state.databases.entry(database_id).or_insert_with(|| {
878 DatabaseStatus::Running(DatabaseManagedBarrierState::new(
879 database_id,
880 self.actor_manager.clone(),
881 LocalBarrierWorker::get_or_insert_database_shared_context(
882 &mut self.state.current_shared_context,
883 database_id,
884 &self.actor_manager,
885 &self.term_id,
886 )
887 .clone(),
888 vec![],
889 ))
890 });
891 if let Some(state) = status.state_for_request() {
892 assert!(
893 state
894 .graph_states
895 .insert(
896 partial_graph_id,
897 PartialGraphManagedBarrierState::new(&self.actor_manager)
898 )
899 .is_none()
900 );
901 }
902 }
903
904 fn reset_database(&mut self, req: ResetDatabaseRequest) {
905 let database_id = DatabaseId::new(req.database_id);
906 if let Some(database_status) = self.state.databases.get_mut(&database_id) {
907 database_status.start_reset(
908 database_id,
909 self.await_epoch_completed_futures.remove(&database_id),
910 req.reset_request_id,
911 );
912 } else {
913 self.ack_database_reset(database_id, None, req.reset_request_id);
914 }
915 }
916
917 fn ack_database_reset(
918 &mut self,
919 database_id: DatabaseId,
920 reset_output: Option<ResetDatabaseOutput>,
921 reset_request_id: u32,
922 ) {
923 info!(
924 database_id = database_id.database_id,
925 "database reset successfully"
926 );
927 if let Some(reset_database) = self.state.databases.remove(&database_id) {
928 match reset_database {
929 DatabaseStatus::Resetting(_) => {}
930 _ => {
931 unreachable!("must be resetting previously")
932 }
933 }
934 }
935 self.state.current_shared_context.remove(&database_id);
936 self.await_epoch_completed_futures.remove(&database_id);
937 self.control_stream_handle.ack_reset_database(
938 database_id,
939 reset_output.and_then(|output| output.root_err),
940 reset_request_id,
941 );
942 }
943
944 fn on_database_failure(
948 &mut self,
949 database_id: DatabaseId,
950 failed_actor: Option<ActorId>,
951 err: StreamError,
952 message: impl Into<String>,
953 ) {
954 let message = message.into();
955 error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
956 let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
957 self.state
958 .databases
959 .get_mut(&database_id)
960 .expect("should exist")
961 .suspend(failed_actor, err, completing_futures);
962 self.control_stream_handle
963 .send_response(Response::ReportDatabaseFailure(
964 ReportDatabaseFailureResponse {
965 database_id: database_id.database_id,
966 },
967 ));
968 }
969}
970
971impl DatabaseManagedBarrierState {
972 async fn try_find_root_actor_failure(
976 &mut self,
977 first_failure: Option<(Option<ActorId>, StreamError)>,
978 ) -> Option<ScoredStreamError> {
979 let mut later_errs = vec![];
980 let _ = tokio::time::timeout(Duration::from_secs(3), async {
982 let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
983 if let Some((Some(failed_actor), _)) = &first_failure {
984 uncollected_actors.remove(failed_actor);
985 }
986 while !uncollected_actors.is_empty()
987 && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
988 {
989 uncollected_actors.remove(&actor_id);
990 later_errs.push(error);
991 }
992 })
993 .await;
994
995 first_failure
996 .into_iter()
997 .map(|(_, err)| err)
998 .chain(later_errs.into_iter())
999 .map(|e| e.with_score())
1000 .max_by_key(|e| e.score)
1001 }
1002}
1003
/// Cloneable handle through which barrier events and actor failures are reported.
#[derive(Clone)]
pub struct LocalBarrierManager {
    /// Sender of barrier events; send failures are ignored (see `send_event`).
    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
    /// Sender of actor failures, tagged with the id of the failed actor.
    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
}
1009
1010impl LocalBarrierWorker {
1011 pub fn spawn(
1013 env: StreamEnvironment,
1014 streaming_metrics: Arc<StreamingMetrics>,
1015 await_tree_reg: Option<await_tree::Registry>,
1016 watermark_epoch: AtomicU64Ref,
1017 actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1018 ) -> JoinHandle<()> {
1019 let runtime = {
1020 let mut builder = tokio::runtime::Builder::new_multi_thread();
1021 if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
1022 builder.worker_threads(worker_threads_num);
1023 }
1024 builder
1025 .thread_name("rw-streaming")
1026 .enable_all()
1027 .build()
1028 .unwrap()
1029 };
1030
1031 let actor_manager = Arc::new(StreamActorManager {
1032 env: env.clone(),
1033 streaming_metrics,
1034 watermark_epoch,
1035 await_tree_reg,
1036 runtime: runtime.into(),
1037 });
1038 let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
1039 tokio::spawn(worker.run(actor_op_rx))
1040 }
1041}
1042
/// Thin wrapper around an unbounded sender of events of type `T`.
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1044
1045impl<T> Clone for EventSender<T> {
1046 fn clone(&self) -> Self {
1047 Self(self.0.clone())
1048 }
1049}
1050
1051impl<T> EventSender<T> {
1052 pub(super) fn send_event(&self, event: T) {
1053 self.0.send(event).expect("should be able to send event")
1054 }
1055
1056 pub(super) async fn send_and_await<RSP>(
1057 &self,
1058 make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1059 ) -> StreamResult<RSP> {
1060 let (tx, rx) = oneshot::channel();
1061 let event = make_event(tx);
1062 self.send_event(event);
1063 rx.await
1064 .map_err(|_| anyhow!("barrier manager maybe reset").into())
1065 }
1066}
1067
1068impl LocalBarrierManager {
1069 pub(super) fn new() -> (
1070 Self,
1071 UnboundedReceiver<LocalBarrierEvent>,
1072 UnboundedReceiver<(ActorId, StreamError)>,
1073 ) {
1074 let (event_tx, event_rx) = unbounded_channel();
1075 let (err_tx, err_rx) = unbounded_channel();
1076 (
1077 Self {
1078 barrier_event_sender: event_tx,
1079 actor_failure_sender: err_tx,
1080 },
1081 event_rx,
1082 err_rx,
1083 )
1084 }
1085
1086 fn send_event(&self, event: LocalBarrierEvent) {
1087 let _ = self.barrier_event_sender.send(event);
1089 }
1090
1091 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
1094 self.send_event(LocalBarrierEvent::ReportActorCollected {
1095 actor_id,
1096 epoch: barrier.epoch,
1097 })
1098 }
1099
1100 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
1103 let _ = self
1104 .actor_failure_sender
1105 .send((actor_id, err.into_unexpected_exit(actor_id)));
1106 }
1107
1108 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
1109 let (tx, rx) = mpsc::unbounded_channel();
1110 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
1111 actor_id,
1112 barrier_sender: tx,
1113 });
1114 rx
1115 }
1116}
1117
/// A [`StreamError`] paired with a score, as produced by `StreamError::with_score`. The error
/// with the highest score is picked as the root failure in `try_find_root_actor_failure`.
type ScoredStreamError = ScoredError<StreamError>;
1120
1121impl StreamError {
1122 fn with_score(self) -> ScoredStreamError {
1124 fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
1128 use crate::executor::error::ErrorKind;
1129 match e.inner() {
1130 ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,
1133
1134 ErrorKind::Uncategorized(_)
1136 | ErrorKind::Storage(_)
1137 | ErrorKind::ArrayError(_)
1138 | ErrorKind::ExprError(_)
1139 | ErrorKind::SerdeError(_)
1140 | ErrorKind::SinkError(_, _)
1141 | ErrorKind::RpcError(_)
1142 | ErrorKind::AlignBarrier(_, _)
1143 | ErrorKind::ConnectorError(_)
1144 | ErrorKind::DmlError(_)
1145 | ErrorKind::NotImplemented(_) => 999,
1146 }
1147 }
1148
1149 fn stream_error_score(e: &StreamError) -> i32 {
1150 use crate::error::ErrorKind;
1151 match e.inner() {
1152 ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
1154
1155 ErrorKind::BarrierSend { .. } => 1,
1157
1158 ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
1160
1161 ErrorKind::Uncategorized(_)
1163 | ErrorKind::Storage(_)
1164 | ErrorKind::Expression(_)
1165 | ErrorKind::Array(_)
1166 | ErrorKind::Secret(_) => 1000,
1167 }
1168 }
1169
1170 let score = Score(stream_error_score(&self));
1171 ScoredStreamError { error: self, score }
1172 }
1173}
1174
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a barrier worker on a test environment and returns the sender of its actor
    /// operations. The join handle of the worker is dropped.
    fn spawn_for_test() -> EventSender<LocalActorOperation> {
        use std::sync::atomic::AtomicU64;
        let (actor_op_tx, actor_op_rx) = unbounded_channel();
        let _join_handle = LocalBarrierWorker::spawn(
            StreamEnvironment::for_test(),
            Arc::new(StreamingMetrics::unused()),
            None,
            Arc::new(AtomicU64::new(0)),
            actor_op_rx,
        );
        EventSender(actor_op_tx)
    }

    /// Creates a manager whose barrier events are drained and discarded by a background task.
    /// The failure receiver is kept alive by that task but never read.
    pub fn for_test() -> Self {
        let (barrier_event_sender, mut barrier_event_rx) = unbounded_channel();
        let (actor_failure_sender, failure_rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _failure_rx = failure_rx;
            while barrier_event_rx.recv().await.is_some() {}
        });
        Self {
            barrier_event_sender,
            actor_failure_sender,
        }
    }
}
1203
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_manager::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, SharedContext, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    /// Test harness bundling a barrier worker spawned for tests with both ends of an
    /// in-memory control stream connected to it.
    pub(crate) struct LocalBarrierTestEnv {
        /// Shared context reported by the worker for `GetCurrentSharedContext`.
        pub shared_context: Arc<SharedContext>,
        /// Barrier manager reported by the worker alongside the shared context.
        pub local_barrier_manager: LocalBarrierManager,
        /// Sender for submitting actor operations to the worker.
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        /// Sending side of the request stream handed to the worker's control stream handle.
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        /// Receiving side of the response channel handed to the worker's control stream handle.
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawns a test worker, installs a fresh control stream initialised with the test
        /// database and partial graph, waits for the `Init` response, and then fetches the
        /// worker's current shared context and barrier manager.
        ///
        /// # Panics
        ///
        /// Panics if the response channel closes, the first response is an error or is not
        /// `Init`, or the shared-context query fails.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            let handle = ControlStreamHandle::new(
                response_tx,
                UnboundedReceiverStream::new(request_rx).boxed(),
            );
            let init_request = InitRequest {
                databases: vec![PbDatabaseInitialPartialGraph {
                    database_id: TEST_DATABASE_ID.database_id,
                    graphs: vec![PbInitialPartialGraph {
                        partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                        subscriptions: vec![],
                        actor_infos: vec![],
                    }],
                }],
                term_id: "for_test".into(),
            };
            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle,
                init_request,
            });

            // The first message on the response channel must be the `Init` response.
            let first_response = response_rx.recv().await.unwrap().unwrap().response.unwrap();
            assert_matches!(
                first_response,
                streaming_control_stream_response::Response::Init(_)
            );

            let (shared_context, local_barrier_manager) = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentSharedContext)
                .await
                .unwrap();

            Self {
                shared_context,
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Sends an `InjectBarrier` request for `barrier` over the control stream, targeting
        /// the test database and partial graph and asking for collection from
        /// `actor_to_collect`. All other list fields are left empty.
        ///
        /// # Panics
        ///
        /// Panics if sending on the request channel fails.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            let inject = InjectBarrierRequest {
                request_id: String::new(),
                barrier: Some(barrier.to_protobuf()),
                database_id: TEST_DATABASE_ID.database_id,
                actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                table_ids_to_sync: Vec::new(),
                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                broadcast_info: Vec::new(),
                actors_to_build: Vec::new(),
                subscriptions_to_add: Vec::new(),
                subscriptions_to_remove: Vec::new(),
            };
            let request = StreamingControlStreamRequest {
                request: Some(streaming_control_stream_request::Request::InjectBarrier(inject)),
            };

            self.request_tx.send(Ok(request)).unwrap();
        }

        /// Sends a `Flush` operation through this environment's sender and waits for it to
        /// be acknowledged.
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        /// Sends `LocalActorOperation::Flush` carrying a oneshot sender and waits on the
        /// matching receiver.
        ///
        /// # Panics
        ///
        /// Panics if the oneshot sender is dropped without sending.
        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (done_tx, done_rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(done_tx));
            done_rx.await.unwrap()
        }
    }
}