1use std::collections::hash_map::Entry;
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::future::{pending, poll_fn};
18use std::sync::Arc;
19use std::task::Poll;
20
21use anyhow::anyhow;
22use await_tree::{InstrumentAwait, SpanExt};
23use futures::future::{BoxFuture, join_all};
24use futures::stream::{BoxStream, FuturesOrdered};
25use futures::{FutureExt, StreamExt, TryFutureExt};
26use itertools::Itertools;
27use risingwave_pb::stream_plan::barrier::BarrierKind;
28use risingwave_pb::stream_service::barrier_complete_response::{
29 PbCdcTableBackfillProgress, PbCreateMviewProgress, PbLocalSstableInfo,
30};
31use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
32use risingwave_storage::store_impl::AsHummock;
33use thiserror_ext::AsReport;
34use tokio::select;
35use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
36use tokio::sync::oneshot;
37use tokio::task::JoinHandle;
38use tonic::{Code, Status};
39use tracing::warn;
40
41use self::managed_state::ManagedBarrierState;
42use crate::error::{ScoredStreamError, StreamError, StreamResult};
43#[cfg(test)]
44use crate::task::LocalBarrierManager;
45use crate::task::managed_state::BarrierToComplete;
46use crate::task::{
47 ActorId, AtomicU64Ref, PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
48};
49pub mod managed_state;
50#[cfg(test)]
51mod tests;
52
53use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
54use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
55use risingwave_pb::stream_service::streaming_control_stream_request::{
56 DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
57};
58use risingwave_pb::stream_service::streaming_control_stream_response::{
59 InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
60};
61use risingwave_pb::stream_service::{
62 BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
63 StreamingControlStreamResponse, streaming_control_stream_response,
64};
65
66use crate::executor::Barrier;
67use crate::executor::exchange::permit::Receiver;
68use crate::executor::monitor::StreamingMetrics;
69use crate::task::barrier_worker::managed_state::{
70 DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
71 ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
72};
73
// Feature flag for barrier aggregation; currently hard-disabled (no reader visible in
// this file — presumably consulted elsewhere in the crate; confirm before removing).
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
77
/// Everything collected for a single barrier once it has been completed locally; the
/// contents are forwarded to meta in a `BarrierCompleteResponse` (see `on_epoch_completed`).
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// Result of syncing the state store; `None` for non-checkpoint barriers, which
    /// have nothing to sync (see `complete_barrier`).
    pub sync_result: Option<SyncResult>,

    /// Progress reports of in-flight materialized-view creation.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// Ids of sources whose load has finished at this barrier.
    pub load_finished_source_ids: Vec<u32>,

    /// Progress reports of CDC table backfill.
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// Ids of tables to truncate.
    pub truncate_tables: Vec<u32>,
    /// Ids of tables whose refresh has finished at this barrier.
    pub refresh_finished_tables: Vec<u32>,
}
97
/// Handle of the bidirectional control stream to the meta service: the sender half for
/// responses plus the incoming request stream. `pair == None` means the stream is
/// disconnected (never established, reset on error, or shut down).
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
108
impl ControlStreamHandle {
    /// A disconnected handle; used before the first `NewControlStream` arrives.
    fn empty() -> Self {
        Self { pair: None }
    }

    /// Wraps an established response sender / request stream pair.
    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    /// Whether a control stream is currently attached.
    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    /// Drops the stream and makes a best-effort attempt to forward `err` to the peer.
    /// No-op if already disconnected.
    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Wrap for structured error reporting before logging.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Sends a `Shutdown` response and then waits for the peer (meta service) to close
    /// the stream, so the shutdown notification is not lost in flight.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Resolves when the receiving side of the channel is dropped.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    /// Acknowledges a database reset, optionally reporting the root error that caused it.
    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id: database_id.database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    /// Sends a response; on send failure the stream is considered dead and dropped.
    /// Silently ignores the response when disconnected.
    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("fail to send response. control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    /// Returns the next request from the stream. On stream error or end-of-stream the
    /// handle resets itself and then stays pending forever — the caller's `select!`
    /// loop is expected to be woken by another branch (e.g. a new control stream).
    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e))
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}
218
/// Operations handled by the [`LocalBarrierWorker`] event loop, sent from elsewhere on
/// this worker node (as opposed to requests arriving over the control stream).
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// A new control stream from meta has been established; the worker resets itself
    /// with `init_request` and replaces its current handle.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Take the exchange receiver for an upstream->downstream actor pair. `term_id`
    /// must match the worker's current term, otherwise an error is returned.
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Render the worker's debug info as a string.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Gracefully shut down the control stream and acknowledge.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
247
/// Point-in-time debug snapshot of a [`LocalBarrierWorker`], rendered via `Display` for
/// the `InspectState` operation. Maps each database to a status string plus, for running
/// databases only, the detailed managed-state debug info.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}
252
253impl Display for LocalBarrierWorkerDebugInfo<'_> {
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 writeln!(
256 f,
257 "\nhas_control_stream_connected: {}",
258 self.has_control_stream_connected
259 )?;
260
261 for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
262 writeln!(
263 f,
264 "database {} status: {} managed_barrier_state:\n{}",
265 database_id.database_id,
266 status,
267 managed_barrier_state
268 .as_ref()
269 .map(ToString::to_string)
270 .unwrap_or_default()
271 )?;
272 }
273 Ok(())
274 }
275}
276
/// The per-worker event loop state that drives barrier injection, collection, and
/// completion for all databases on this node. See [`LocalBarrierWorker::run`].
pub(super) struct LocalBarrierWorker {
    /// Per-database managed barrier state (injection/collection bookkeeping).
    pub(super) state: ManagedBarrierState,

    /// Per-database FIFO queues of in-flight barrier-completion futures; barriers of a
    /// database complete in order, hence `FuturesOrdered`.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Handle of the control stream to the meta service (may be disconnected).
    control_stream_handle: ControlStreamHandle,

    /// Shared manager for building and running stream actors.
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Current term id; exchange requests with a stale term are rejected.
    pub(super) term_id: String,
}
297
impl LocalBarrierWorker {
    /// Creates a worker with a fresh [`ManagedBarrierState`] built from the initial
    /// partial graphs, no in-flight completion futures, and a disconnected control stream.
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    /// Snapshots a status string per database (plus detailed state for running ones)
    /// for the `InspectState` operation.
    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

    /// Resolves with the first completed barrier epoch across all databases' in-flight
    /// completion futures; stays pending when none is ready.
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                // Poll each database's FIFO queue; ordering within a database is
                // preserved by `FuturesOrdered`.
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    /// Main event loop. `biased` select: managed-state events are drained first, then
    /// completed epochs, then local actor operations, then control-stream requests.
    /// Returns when the actor-op channel closes.
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            // A single actor failure suspends the whole database.
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // Completion failure is unrecoverable for this stream:
                            // reset the control stream and let meta re-initialize us.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                // Replace any existing stream: reset all state first,
                                // then ack with an Init response on the new stream.
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                        DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        // All senders dropped: the worker is done.
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    /// Dispatches a control-stream request. Only `InjectBarrier` can fail, in which
    /// case the failing database id is returned alongside the error.
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                // Init is consumed during stream establishment, never here.
                unreachable!()
            }
        }
    }

    /// Handles local actor operations that do not need the async context
    /// (`NewControlStream`/`Shutdown` are handled directly in `run`).
    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                // Successful paths `return` early; falling through to the tail sends
                // the constructed error to the requester.
                let err = if self.term_id != term_id {
                    {
                        warn!(
                            ?ids,
                            term_id,
                            current_term_id = self.term_id,
                            "take receiver on unmatched term_id"
                        );
                        anyhow!(
                            "take receiver {:?} on unmatched term_id {} to current term_id {}",
                            ids,
                            term_id,
                            self.term_id
                        )
                    }
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                // Database not created yet: park the request until
                                // `add_partial_graph` promotes it to Running.
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            // First contact for this database: remember the request.
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                // Drain all immediately-available control requests and state events
                // without awaiting, then ack the flush.
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}
620
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcTableBackfillProgress, PbCreateMviewProgress,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    /// Opaque future type for one barrier's completion; resolves with the graph id,
    /// the barrier, and the assembled [`BarrierCompleteResult`].
    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    /// Wraps the (optional) state-store sync future together with the progress payloads
    /// into an [`AwaitEpochCompletedFuture`], optionally instrumented in the await-tree
    /// registry under a `SyncEpoch(prev_epoch)` span.
    ///
    /// `complete_barrier_future == None` (non-checkpoint barriers) yields
    /// `sync_result: None` without awaiting anything.
    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        load_finished_source_ids: Vec<u32>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<u32>,
        refresh_finished_tables: Vec<u32>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        // left/right futures keep a single concrete type for the opaque alias whether
        // or not await-tree instrumentation is enabled.
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}
686
687use await_epoch_completed_future::*;
688use risingwave_common::catalog::{DatabaseId, TableId};
689use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
690use risingwave_storage::{StateStoreImpl, dispatch_state_store};
691
692use crate::executor::exchange::permit;
693
694fn sync_epoch(
695 state_store: &StateStoreImpl,
696 streaming_metrics: &StreamingMetrics,
697 prev_epoch: u64,
698 table_ids: HashSet<TableId>,
699) -> BoxFuture<'static, StreamResult<SyncResult>> {
700 let timer = streaming_metrics.barrier_sync_latency.start_timer();
701
702 let state_store = state_store.clone();
703 let future = async move {
704 dispatch_state_store!(state_store, hummock, {
705 hummock.sync(vec![(prev_epoch, table_ids)]).await
706 })
707 };
708
709 future
710 .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
711 .inspect_ok(move |_| {
712 timer.observe_duration();
713 })
714 .map_err(move |e| {
715 tracing::error!(
716 prev_epoch,
717 error = %e.as_report(),
718 "Failed to sync state store",
719 );
720 e.into()
721 })
722 .boxed()
723}
724
725impl LocalBarrierWorker {
726 fn complete_barrier(
727 &mut self,
728 database_id: DatabaseId,
729 partial_graph_id: PartialGraphId,
730 prev_epoch: u64,
731 ) {
732 {
733 let Some(database_state) = self
734 .state
735 .databases
736 .get_mut(&database_id)
737 .expect("should exist")
738 .state_for_request()
739 else {
740 return;
741 };
742 let BarrierToComplete {
743 barrier,
744 table_ids,
745 create_mview_progress,
746 load_finished_source_ids,
747 cdc_table_backfill_progress,
748 truncate_tables,
749 refresh_finished_tables,
750 } = database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);
751
752 let complete_barrier_future = match &barrier.kind {
753 BarrierKind::Unspecified => unreachable!(),
754 BarrierKind::Initial => {
755 tracing::info!(
756 epoch = prev_epoch,
757 "ignore sealing data for the first barrier"
758 );
759 tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
760 None
761 }
762 BarrierKind::Barrier => None,
763 BarrierKind::Checkpoint => Some(sync_epoch(
764 &self.actor_manager.env.state_store(),
765 &self.actor_manager.streaming_metrics,
766 prev_epoch,
767 table_ids.expect("should be Some on BarrierKind::Checkpoint"),
768 )),
769 };
770
771 self.await_epoch_completed_futures
772 .entry(database_id)
773 .or_default()
774 .push_back({
775 instrument_complete_barrier_future(
776 partial_graph_id,
777 complete_barrier_future,
778 barrier,
779 self.actor_manager.await_tree_reg.as_ref(),
780 create_mview_progress,
781 load_finished_source_ids,
782 cdc_table_backfill_progress,
783 truncate_tables,
784 refresh_finished_tables,
785 )
786 });
787 }
788 }
789
790 fn on_epoch_completed(
791 &mut self,
792 database_id: DatabaseId,
793 partial_graph_id: PartialGraphId,
794 epoch: u64,
795 result: BarrierCompleteResult,
796 ) {
797 let BarrierCompleteResult {
798 create_mview_progress,
799 sync_result,
800 load_finished_source_ids,
801 cdc_table_backfill_progress,
802 truncate_tables,
803 refresh_finished_tables,
804 } = result;
805
806 let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
807 .map(|sync_result| {
808 (
809 sync_result.uncommitted_ssts,
810 sync_result.table_watermarks,
811 sync_result.old_value_ssts,
812 sync_result.vector_index_adds,
813 )
814 })
815 .unwrap_or_default();
816
817 let result = {
818 {
819 streaming_control_stream_response::Response::CompleteBarrier(
820 BarrierCompleteResponse {
821 request_id: "todo".to_owned(),
822 partial_graph_id: partial_graph_id.into(),
823 epoch,
824 status: None,
825 create_mview_progress,
826 synced_sstables: synced_sstables
827 .into_iter()
828 .map(
829 |LocalSstableInfo {
830 sst_info,
831 table_stats,
832 created_at,
833 }| PbLocalSstableInfo {
834 sst: Some(sst_info.into()),
835 table_stats_map: to_prost_table_stats_map(table_stats),
836 created_at,
837 },
838 )
839 .collect_vec(),
840 worker_id: self.actor_manager.env.worker_id(),
841 table_watermarks: table_watermarks
842 .into_iter()
843 .map(|(key, value)| (key.table_id, value.into()))
844 .collect(),
845 old_value_sstables: old_value_ssts
846 .into_iter()
847 .map(|sst| sst.sst_info.into())
848 .collect(),
849 database_id: database_id.database_id,
850 load_finished_source_ids,
851 vector_index_adds: vector_index_adds
852 .into_iter()
853 .map(|(table_id, adds)| {
854 (
855 table_id.table_id,
856 PbVectorIndexAdds {
857 adds: adds.into_iter().map(|add| add.into()).collect(),
858 },
859 )
860 })
861 .collect(),
862 cdc_table_backfill_progress,
863 truncate_tables,
864 refresh_finished_tables,
865 },
866 )
867 }
868 };
869
870 self.control_stream_handle.send_response(result);
871 }
872
873 fn send_barrier(
880 &mut self,
881 barrier: &Barrier,
882 request: InjectBarrierRequest,
883 ) -> StreamResult<()> {
884 debug!(
885 target: "events::stream::barrier::manager::send",
886 "send barrier {:?}, actor_ids_to_collect = {:?}",
887 barrier,
888 request.actor_ids_to_collect
889 );
890
891 let database_status = self
892 .state
893 .databases
894 .get_mut(&DatabaseId::new(request.database_id))
895 .expect("should exist");
896 if let Some(state) = database_status.state_for_request() {
897 state.transform_to_issued(barrier, request)?;
898 }
899 Ok(())
900 }
901
902 fn remove_partial_graphs(
903 &mut self,
904 database_id: DatabaseId,
905 partial_graph_ids: impl Iterator<Item = PartialGraphId>,
906 ) {
907 let Some(database_status) = self.state.databases.get_mut(&database_id) else {
908 warn!(
909 database_id = database_id.database_id,
910 "database to remove partial graph not exist"
911 );
912 return;
913 };
914 let Some(database_state) = database_status.state_for_request() else {
915 warn!(
916 database_id = database_id.database_id,
917 "ignore remove partial graph request on err database",
918 );
919 return;
920 };
921 for partial_graph_id in partial_graph_ids {
922 if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
923 assert!(
924 graph.is_empty(),
925 "non empty graph to be removed: {}",
926 &graph
927 );
928 } else {
929 warn!(
930 partial_graph_id = partial_graph_id.0,
931 "no partial graph to remove"
932 );
933 }
934 }
935 }
936
937 fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
938 let status = match self.state.databases.entry(database_id) {
939 Entry::Occupied(entry) => {
940 let status = entry.into_mut();
941 if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
942 let mut database = DatabaseManagedBarrierState::new(
943 database_id,
944 self.term_id.clone(),
945 self.actor_manager.clone(),
946 vec![],
947 );
948 for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
949 {
950 database.new_actor_remote_output_request(
951 actor_id,
952 upstream_actor_id,
953 result_sender,
954 );
955 }
956 *status = DatabaseStatus::Running(database);
957 }
958
959 status
960 }
961 Entry::Vacant(entry) => {
962 entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
963 database_id,
964 self.term_id.clone(),
965 self.actor_manager.clone(),
966 vec![],
967 )))
968 }
969 };
970 if let Some(state) = status.state_for_request() {
971 assert!(
972 state
973 .graph_states
974 .insert(
975 partial_graph_id,
976 PartialGraphManagedBarrierState::new(&self.actor_manager)
977 )
978 .is_none()
979 );
980 }
981 }
982
983 fn reset_database(&mut self, req: ResetDatabaseRequest) {
984 let database_id = DatabaseId::new(req.database_id);
985 if let Some(database_status) = self.state.databases.get_mut(&database_id) {
986 database_status.start_reset(
987 database_id,
988 self.await_epoch_completed_futures.remove(&database_id),
989 req.reset_request_id,
990 );
991 } else {
992 self.ack_database_reset(database_id, None, req.reset_request_id);
993 }
994 }
995
996 fn ack_database_reset(
997 &mut self,
998 database_id: DatabaseId,
999 reset_output: Option<ResetDatabaseOutput>,
1000 reset_request_id: u32,
1001 ) {
1002 info!(
1003 database_id = database_id.database_id,
1004 "database reset successfully"
1005 );
1006 if let Some(reset_database) = self.state.databases.remove(&database_id) {
1007 match reset_database {
1008 DatabaseStatus::Resetting(_) => {}
1009 _ => {
1010 unreachable!("must be resetting previously")
1011 }
1012 }
1013 }
1014 self.await_epoch_completed_futures.remove(&database_id);
1015 self.control_stream_handle.ack_reset_database(
1016 database_id,
1017 reset_output.and_then(|output| output.root_err),
1018 reset_request_id,
1019 );
1020 }
1021
1022 fn on_database_failure(
1026 &mut self,
1027 database_id: DatabaseId,
1028 failed_actor: Option<ActorId>,
1029 err: StreamError,
1030 message: impl Into<String>,
1031 ) {
1032 let message = message.into();
1033 error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
1034 let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
1035 self.state
1036 .databases
1037 .get_mut(&database_id)
1038 .expect("should exist")
1039 .suspend(failed_actor, err, completing_futures);
1040 self.control_stream_handle
1041 .send_response(Response::ReportDatabaseFailure(
1042 ReportDatabaseFailureResponse {
1043 database_id: database_id.database_id,
1044 },
1045 ));
1046 }
1047
1048 async fn reset(&mut self, init_request: InitRequest) {
1050 join_all(
1051 self.state
1052 .databases
1053 .values_mut()
1054 .map(|database| database.abort()),
1055 )
1056 .await;
1057 if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
1058 m.clear();
1059 }
1060
1061 if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
1062 hummock
1063 .clear_shared_buffer()
1064 .instrument_await("store_clear_shared_buffer".verbose())
1065 .await
1066 }
1067 self.actor_manager.env.dml_manager_ref().clear();
1068 *self = Self::new(
1069 self.actor_manager.clone(),
1070 init_request.databases,
1071 init_request.term_id,
1072 );
1073 self.actor_manager.env.client_pool().invalidate_all();
1074 }
1075
1076 pub fn spawn(
1078 env: StreamEnvironment,
1079 streaming_metrics: Arc<StreamingMetrics>,
1080 await_tree_reg: Option<await_tree::Registry>,
1081 watermark_epoch: AtomicU64Ref,
1082 actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1083 ) -> JoinHandle<()> {
1084 let runtime = {
1085 let mut builder = tokio::runtime::Builder::new_multi_thread();
1086 if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
1087 builder.worker_threads(worker_threads_num);
1088 }
1089 builder
1090 .thread_name("rw-streaming")
1091 .enable_all()
1092 .build()
1093 .unwrap()
1094 };
1095
1096 let actor_manager = Arc::new(StreamActorManager {
1097 env: env.clone(),
1098 streaming_metrics,
1099 watermark_epoch,
1100 await_tree_reg,
1101 runtime: runtime.into(),
1102 });
1103 let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
1104 tokio::spawn(worker.run(actor_op_rx))
1105 }
1106}
1107
/// Thin wrapper over an unbounded channel sender that panics on send failure.
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1109
1110impl<T> Clone for EventSender<T> {
1111 fn clone(&self) -> Self {
1112 Self(self.0.clone())
1113 }
1114}
1115
1116impl<T> EventSender<T> {
1117 pub(super) fn send_event(&self, event: T) {
1118 self.0.send(event).expect("should be able to send event")
1119 }
1120
1121 pub(super) async fn send_and_await<RSP>(
1122 &self,
1123 make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1124 ) -> StreamResult<RSP> {
1125 let (tx, rx) = oneshot::channel();
1126 let event = make_event(tx);
1127 self.send_event(event);
1128 rx.await
1129 .map_err(|_| anyhow!("barrier manager maybe reset").into())
1130 }
1131}
1132
/// A request for a new output channel to a downstream actor; both variants carry a
/// permit-based sender. Presumably `Local` means the downstream actor runs in this
/// process while `Remote` serves an exchange from another worker — confirm at call sites.
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}
1137
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    /// In-process test harness: a spawned barrier worker plus the two ends of a fake
    /// control stream, so tests can inject requests and observe responses directly.
    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        // Feeds requests into the worker, as the meta service would.
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        // Receives the worker's responses.
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawns a worker, establishes a control stream initialized with
        /// `TEST_DATABASE_ID`/`TEST_PARTIAL_GRAPH_ID`, waits for the `Init` response,
        /// and fetches the running database's `LocalBarrierManager`.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    databases: vec![PbDatabaseInitialPartialGraph {
                        database_id: TEST_DATABASE_ID.database_id,
                        graphs: vec![PbInitialPartialGraph {
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            subscriptions: vec![],
                        }],
                    }],
                    term_id: "for_test".into(),
                },
            });

            // The worker must ack the new stream with Init before anything else.
            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Injects `barrier` into the test database/partial graph, to be collected
        /// from the given actors.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID.database_id,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                            subscriptions_to_add: vec![],
                            subscriptions_to_remove: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        /// Drains all pending control requests and state events in the worker.
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        /// Takes the pending new-output requests buffered for a not-yet-built actor.
        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}