use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;

use anyhow::anyhow;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::{BoxFuture, join_all};
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource, PbLoadFinishedSource,
    PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::{debug, error, info, warn};

use self::managed_state::ManagedBarrierState;
use crate::error::{ScoredStreamError, StreamError, StreamResult};
#[cfg(test)]
use crate::task::LocalBarrierManager;
use crate::task::managed_state::BarrierToComplete;
use crate::task::{
    ActorId, AtomicU64Ref, CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY, ConfigOverrideCache,
    PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
};
pub mod managed_state;
#[cfg(test)]
mod tests;

use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::Barrier;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};

pub const ENABLE_BARRIER_AGGREGATION: bool = false;

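/// Result of collecting one barrier on this worker node, later reported back to the meta
/// service as part of a `BarrierCompleteResponse`.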
#[derive(Debug)]
pub struct BarrierCompleteResult {
    pub sync_result: Option<SyncResult>,

    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    pub list_finished_source_ids: Vec<PbListFinishedSource>,

    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,

    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    pub truncate_tables: Vec<TableId>,
    pub refresh_finished_tables: Vec<TableId>,
}

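/// Handle of the bidirectional control stream to the meta service: a sender for responses
/// paired with the stream of incoming requests. `pair` is `None` when no control stream is
/// currently connected.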
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

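    /// Send a `Shutdown` response to the meta service and wait for it to close the control
    /// stream from its side.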
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
199 warn!("fail to send response. control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e))
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}

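/// Operations handled by the [`LocalBarrierWorker`] event loop, such as attaching a new
/// control stream, taking an exchange receiver, inspecting state, or shutting down.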
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

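/// Per-compute-node worker that drives barrier handling: it tracks the managed barrier state
/// of every database, completes epochs (syncing the state store on checkpoint barriers), and
/// reports the results back to the meta service over the control stream.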
pub(super) struct LocalBarrierWorker {
    pub(super) state: ManagedBarrierState,

    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(actor_manager: Arc<StreamActorManager>, term_id: String) -> Self {
        Self {
            state: Default::default(),
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

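    /// Poll the per-database `FuturesOrdered` of in-flight barrier completions and return the
    /// next completed epoch together with its database and partial graph.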
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

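    /// Main event loop of the worker. The `biased` select below prioritizes, in order: events
    /// from the managed barrier state, completed epochs, local actor operations, and new
    /// requests from the control stream.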
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError {
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                        DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = req.database_id;
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    req.database_id,
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(req.database_id, PartialGraphId::new(req.partial_graph_id));
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    warn!(
                        ?ids,
                        term_id,
                        current_term_id = self.term_id,
                        "take receiver on unmatched term_id"
                    );
                    anyhow!(
                        "take receiver {:?} on unmatched term_id {} to current term_id {}",
                        ids,
                        term_id,
                        self.term_id
                    )
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

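/// Helper module that defines the opaque `AwaitEpochCompletedFuture` type and the function
/// that builds it, optionally instrumenting the future with an await-tree span.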
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_common::id::TableId;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource,
        PbLoadFinishedSource,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

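/// Build a boxed future that syncs the state store for `prev_epoch` and the given tables,
/// recording the sync latency in the streaming metrics.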
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        {
            let Some(database_state) = self
                .state
                .databases
                .get_mut(&database_id)
                .expect("should exist")
                .state_for_request()
            else {
                return;
            };
            let BarrierToComplete {
                barrier,
                table_ids,
                create_mview_progress,
                list_finished_source_ids,
                load_finished_source_ids,
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            } = database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

            let complete_barrier_future = match &barrier.kind {
                BarrierKind::Unspecified => unreachable!(),
                BarrierKind::Initial => {
                    tracing::info!(
                        epoch = prev_epoch,
                        "ignore sealing data for the first barrier"
                    );
                    None
                }
                BarrierKind::Barrier => None,
                BarrierKind::Checkpoint => Some(sync_epoch(
                    &self.actor_manager.env.state_store(),
                    &self.actor_manager.streaming_metrics,
                    prev_epoch,
                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
                )),
            };

            self.await_epoch_completed_futures
                .entry(database_id)
                .or_default()
                .push_back({
                    instrument_complete_barrier_future(
                        partial_graph_id,
                        complete_barrier_future,
                        barrier,
                        self.actor_manager.await_tree_reg.as_ref(),
                        create_mview_progress,
                        list_finished_source_ids,
                        load_finished_source_ids,
                        cdc_table_backfill_progress,
                        truncate_tables,
                        refresh_finished_tables,
                    )
                });
        }
    }

    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                    sync_result.vector_index_adds,
                )
            })
            .unwrap_or_default();

        let result = streaming_control_stream_response::Response::CompleteBarrier(
            BarrierCompleteResponse {
                request_id: "todo".to_owned(),
                partial_graph_id: partial_graph_id.into(),
                epoch,
                status: None,
                create_mview_progress,
                synced_sstables: synced_sstables
                    .into_iter()
                    .map(
                        |LocalSstableInfo {
                             sst_info,
                             table_stats,
                             created_at,
                         }| PbLocalSstableInfo {
                            sst: Some(sst_info.into()),
                            table_stats_map: to_prost_table_stats_map(table_stats),
                            created_at,
                        },
                    )
                    .collect_vec(),
                worker_id: self.actor_manager.env.worker_id(),
                table_watermarks: table_watermarks
                    .into_iter()
                    .map(|(key, value)| (key, value.into()))
                    .collect(),
                old_value_sstables: old_value_ssts
                    .into_iter()
                    .map(|sst| sst.sst_info.into())
                    .collect(),
                database_id,
                list_finished_sources: list_finished_source_ids,
                load_finished_sources: load_finished_source_ids,
                vector_index_adds: vector_index_adds
                    .into_iter()
                    .map(|(table_id, adds)| {
                        (
                            table_id,
                            PbVectorIndexAdds {
                                adds: adds.into_iter().map(|add| add.into()).collect(),
                            },
                        )
                    })
                    .collect(),
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            },
        );

        self.control_stream_handle.send_response(result);
    }

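    /// Handle an `InjectBarrier` request: forward the barrier and the request to the managed
    /// state of the target database so the barrier can be issued to its actors and tracked
    /// for collection.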
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&request.database_id)
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                %database_id,
911 "database to remove partial graph not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                %database_id,
                "ignore remove partial graph request on err database",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = req.database_id;
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            %database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must be resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

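    /// Suspend a database after an actor failure or a failed barrier injection, then report the
    /// failure to the meta service over the control stream.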
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(%database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse { database_id },
            ));
    }

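    /// Reset the whole worker when a new control stream is established: abort all databases,
    /// clear the await-tree registry, the shared buffer and the DML state, and start over with
    /// the term id from the init request.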
    async fn reset(&mut self, init_request: InitRequest) {
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(self.actor_manager.clone(), init_request.term_id);
        self.actor_manager.env.client_pool().invalidate_all();
    }

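    /// Spawn the [`LocalBarrierWorker`] event loop. A dedicated multi-thread tokio runtime named
    /// `rw-streaming` is created here and handed to the `StreamActorManager`.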
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.global_config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env,
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
            config_override_cache: ConfigOverrideCache::new(CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY),
        });
        let worker = LocalBarrierWorker::new(actor_manager, "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager maybe reset").into())
    }
}

pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbCreatePartialGraphRequest,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, PbStreamingControlStreamRequest, StreamingControlStreamRequest,
        StreamingControlStreamResponse, streaming_control_stream_request,
        streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            request_tx
                .send(Ok(PbStreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            PbCreatePartialGraphRequest {
                                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                                database_id: TEST_DATABASE_ID,
                            },
                        ),
                    ),
                }))
                .unwrap();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}