1use std::collections::hash_map::Entry;
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::future::{pending, poll_fn};
18use std::sync::Arc;
19use std::task::Poll;
20
21use anyhow::anyhow;
22use await_tree::{InstrumentAwait, SpanExt};
23use futures::future::{BoxFuture, join_all};
24use futures::stream::{BoxStream, FuturesOrdered};
25use futures::{FutureExt, StreamExt, TryFutureExt};
26use itertools::Itertools;
27use risingwave_pb::stream_plan::barrier::BarrierKind;
28use risingwave_pb::stream_service::barrier_complete_response::{
29 PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource, PbLoadFinishedSource,
30 PbLocalSstableInfo,
31};
32use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
33use risingwave_storage::store_impl::AsHummock;
34use thiserror_ext::AsReport;
35use tokio::select;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot;
38use tokio::task::JoinHandle;
39use tonic::{Code, Status};
40use tracing::warn;
41
42use self::managed_state::ManagedBarrierState;
43use crate::error::{ScoredStreamError, StreamError, StreamResult};
44#[cfg(test)]
45use crate::task::LocalBarrierManager;
46use crate::task::managed_state::BarrierToComplete;
47use crate::task::{
48 ActorId, AtomicU64Ref, PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
49};
50pub mod managed_state;
51#[cfg(test)]
52mod tests;
53
54use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
55use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
56use risingwave_pb::stream_service::streaming_control_stream_request::{
57 InitRequest, Request, ResetDatabaseRequest,
58};
59use risingwave_pb::stream_service::streaming_control_stream_response::{
60 InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
61};
62use risingwave_pb::stream_service::{
63 BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
64 StreamingControlStreamResponse, streaming_control_stream_response,
65};
66
67use crate::executor::Barrier;
68use crate::executor::exchange::permit::Receiver;
69use crate::executor::monitor::StreamingMetrics;
70use crate::task::barrier_worker::managed_state::{
71 DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
72 ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
73};
74
/// Compile-time switch for barrier aggregation; currently hard-coded to `false`.
///
/// NOTE(review): nothing in the visible part of this module reads this flag — confirm its
/// consumers before changing the value.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
78
/// Everything collected for one barrier once it has been completed on this worker.
///
/// Produced by `instrument_complete_barrier_future` and unpacked into a
/// `BarrierCompleteResponse` by `LocalBarrierWorker::on_epoch_completed`.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// Result of syncing the state store for the epoch. `None` when no sync future was attached
    /// to the barrier; in `complete_barrier` only `BarrierKind::Checkpoint` attaches one.
    pub sync_result: Option<SyncResult>,

    /// Forwarded as `create_mview_progress` of the `BarrierCompleteResponse`.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// Forwarded as `list_finished_sources` of the `BarrierCompleteResponse`.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,

    /// Forwarded as `load_finished_sources` of the `BarrierCompleteResponse`.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,

    /// Forwarded as `cdc_table_backfill_progress` of the `BarrierCompleteResponse`.
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// Forwarded as `truncate_tables` of the `BarrierCompleteResponse`.
    pub truncate_tables: Vec<TableId>,
    /// Forwarded as `refresh_finished_tables` of the `BarrierCompleteResponse`.
    pub refresh_finished_tables: Vec<TableId>,
}
101
/// Both halves of the streaming control stream held by the barrier worker.
///
/// `pair` is `None` while no control stream is attached (see `connected`); it is cleared when
/// the stream is reset, shut down, or when sending a response fails.
pub(super) struct ControlStreamHandle {
    /// `(response sender, request stream)` of the currently attached control stream.
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
112
113impl ControlStreamHandle {
114 fn empty() -> Self {
115 Self { pair: None }
116 }
117
118 pub(super) fn new(
119 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
120 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
121 ) -> Self {
122 Self {
123 pair: Some((sender, request_stream)),
124 }
125 }
126
127 pub(super) fn connected(&self) -> bool {
128 self.pair.is_some()
129 }
130
131 fn reset_stream_with_err(&mut self, err: Status) {
132 if let Some((sender, _)) = self.pair.take() {
133 let err = TonicStatusWrapper::new(err);
135 warn!(error = %err.as_report(), "control stream reset with error");
136
137 let err = err.into_inner();
138 if sender.send(Err(err)).is_err() {
139 warn!("failed to notify reset of control stream");
140 }
141 }
142 }
143
144 async fn shutdown_stream(&mut self) {
147 if let Some((sender, _)) = self.pair.take() {
148 if sender
149 .send(Ok(StreamingControlStreamResponse {
150 response: Some(streaming_control_stream_response::Response::Shutdown(
151 ShutdownResponse::default(),
152 )),
153 }))
154 .is_err()
155 {
156 warn!("failed to notify shutdown of control stream");
157 } else {
158 tracing::info!("waiting for meta service to close control stream...");
159
160 sender.closed().await;
167 }
168 } else {
169 debug!("control stream has been reset, ignore shutdown");
170 }
171 }
172
173 pub(super) fn ack_reset_database(
174 &mut self,
175 database_id: DatabaseId,
176 root_err: Option<ScoredStreamError>,
177 reset_request_id: u32,
178 ) {
179 self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
180 database_id,
181 root_err: root_err.map(|err| PbScoredError {
182 err_msg: err.error.to_report_string(),
183 score: err.score.0,
184 }),
185 reset_request_id,
186 }));
187 }
188
189 fn send_response(&mut self, response: streaming_control_stream_response::Response) {
190 if let Some((sender, _)) = self.pair.as_ref() {
191 if sender
192 .send(Ok(StreamingControlStreamResponse {
193 response: Some(response),
194 }))
195 .is_err()
196 {
197 self.pair = None;
198 warn!("fail to send response. control stream reset");
199 }
200 } else {
201 debug!(?response, "control stream has been reset. ignore response");
202 }
203 }
204
205 async fn next_request(&mut self) -> StreamingControlStreamRequest {
206 if let Some((_, stream)) = &mut self.pair {
207 match stream.next().await {
208 Some(Ok(request)) => {
209 return request;
210 }
211 Some(Err(e)) => self.reset_stream_with_err(
212 anyhow!(TonicStatusWrapper::new(e)) .context("failed to get request")
214 .to_status_unnamed(Code::Internal),
215 ),
216 None => self.reset_stream_with_err(Status::internal("end of stream")),
217 }
218 }
219 pending().await
220 }
221}
222
/// Operations delivered to the barrier worker through its `actor_op_rx` channel.
///
/// `NewControlStream` and `Shutdown` are handled directly inside `LocalBarrierWorker::run`
/// because they need to await; every other variant goes through `handle_actor_op`.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Replace the current control stream with `handle` and reset the worker using
    /// `init_request`.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Request the receiver for the `(upstream, downstream)` actor pair `ids` in `database_id`.
    /// Rejected when `term_id` differs from the worker's current term.
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    /// Test only: fetch the `LocalBarrierManager` of the test database.
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    /// Test only: take the pending new-output requests recorded for an actor.
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    /// Test only: drain all immediately available control requests and state events, then
    /// notify the sender.
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Render the worker's debug info as a string and send it back.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Shut down the control stream; `result_sender` is notified afterwards.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
251
/// Snapshot of the worker's state for diagnostics, rendered through its `Display` impl.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    /// Per database: a status label and, only for running databases, the detailed barrier
    /// state debug info.
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    /// Whether a control stream was attached when the snapshot was taken.
    has_control_stream_connected: bool,
}
256
257impl Display for LocalBarrierWorkerDebugInfo<'_> {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 writeln!(
260 f,
261 "\nhas_control_stream_connected: {}",
262 self.has_control_stream_connected
263 )?;
264
265 for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
266 writeln!(
267 f,
268 "database {} status: {} managed_barrier_state:\n{}",
269 database_id,
270 status,
271 managed_barrier_state
272 .as_ref()
273 .map(ToString::to_string)
274 .unwrap_or_default()
275 )?;
276 }
277 Ok(())
278 }
279}
280
/// Worker that owns the per-database barrier state and drives it from a single event loop
/// (see `run`).
pub(super) struct LocalBarrierWorker {
    /// Barrier state of every database known to this worker.
    pub(super) state: ManagedBarrierState,

    /// Per database, the in-flight "epoch completed" futures, polled in order by
    /// `next_completed_epoch`.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// The currently attached control stream, if any.
    control_stream_handle: ControlStreamHandle,

    /// Shared actor manager (environment, metrics, await-tree registry, runtime).
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Term of the current control stream; `TakeReceiver` requests with another term are
    /// rejected.
    pub(super) term_id: String,
}
301
302impl LocalBarrierWorker {
303 pub(super) fn new(actor_manager: Arc<StreamActorManager>, term_id: String) -> Self {
304 Self {
305 state: Default::default(),
306 await_epoch_completed_futures: Default::default(),
307 control_stream_handle: ControlStreamHandle::empty(),
308 actor_manager,
309 term_id,
310 }
311 }
312
313 fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
314 LocalBarrierWorkerDebugInfo {
315 managed_barrier_state: self
316 .state
317 .databases
318 .iter()
319 .map(|(database_id, status)| {
320 (*database_id, {
321 match status {
322 DatabaseStatus::ReceivedExchangeRequest(_) => {
323 ("ReceivedExchangeRequest".to_owned(), None)
324 }
325 DatabaseStatus::Running(state) => {
326 ("running".to_owned(), Some(state.to_debug_info()))
327 }
328 DatabaseStatus::Suspended(state) => {
329 (format!("suspended: {:?}", state.suspend_time), None)
330 }
331 DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
332 DatabaseStatus::Unspecified => {
333 unreachable!()
334 }
335 }
336 })
337 })
338 .collect(),
339 has_control_stream_connected: self.control_stream_handle.connected(),
340 }
341 }
342
343 async fn next_completed_epoch(
344 futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
345 ) -> (
346 DatabaseId,
347 PartialGraphId,
348 Barrier,
349 StreamResult<BarrierCompleteResult>,
350 ) {
351 poll_fn(|cx| {
352 for (database_id, futures) in &mut *futures {
353 if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
354 futures.poll_next_unpin(cx)
355 {
356 return Poll::Ready((*database_id, partial_graph_id, barrier, result));
357 }
358 }
359 Poll::Pending
360 })
361 .await
362 }
363
    /// Main event loop of the worker; returns once `actor_op_rx` is closed.
    ///
    /// Every iteration waits on four sources. Because of `biased;` they are polled in the order
    /// written:
    /// 1. events from the managed barrier state,
    /// 2. completed epochs,
    /// 3. actor operations from `actor_op_rx`,
    /// 4. requests from the control stream.
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            // The barrier has been collected; start completing its previous epoch.
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // A failed epoch completion resets the whole control stream rather
                            // than only the affected database.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                // Tell the old stream (if any) that it is being replaced, reset
                                // the worker, then install the new stream and acknowledge it.
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                // Only running databases can still own actors; warn if any does.
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                        DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                // The requester may already be gone; ignore a failed notification.
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        // All senders of `actor_op_rx` are dropped: stop the worker.
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }
451
452 fn handle_streaming_control_request(
453 &mut self,
454 request: Request,
455 ) -> Result<(), (DatabaseId, StreamError)> {
456 match request {
457 Request::InjectBarrier(req) => {
458 let database_id = req.database_id;
459 let result: StreamResult<()> = try {
460 let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
461 self.send_barrier(&barrier, req)?;
462 };
463 result.map_err(|e| (database_id, e))?;
464 Ok(())
465 }
466 Request::RemovePartialGraph(req) => {
467 self.remove_partial_graphs(
468 req.database_id,
469 req.partial_graph_ids.into_iter().map(PartialGraphId::new),
470 );
471 Ok(())
472 }
473 Request::CreatePartialGraph(req) => {
474 self.add_partial_graph(req.database_id, PartialGraphId::new(req.partial_graph_id));
475 Ok(())
476 }
477 Request::ResetDatabase(req) => {
478 self.reset_database(req);
479 Ok(())
480 }
481 Request::Init(_) => {
482 unreachable!()
483 }
484 }
485 }
486
487 fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
488 match actor_op {
489 LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
490 unreachable!("event {actor_op} should be handled separately in async context")
491 }
492 LocalActorOperation::TakeReceiver {
493 database_id,
494 term_id,
495 ids,
496 result_sender,
497 } => {
498 let err = if self.term_id != term_id {
499 {
500 warn!(
501 ?ids,
502 term_id,
503 current_term_id = self.term_id,
504 "take receiver on unmatched term_id"
505 );
506 anyhow!(
507 "take receiver {:?} on unmatched term_id {} to current term_id {}",
508 ids,
509 term_id,
510 self.term_id
511 )
512 }
513 } else {
514 match self.state.databases.entry(database_id) {
515 Entry::Occupied(mut entry) => match entry.get_mut() {
516 DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
517 pending_requests.push((ids, result_sender));
518 return;
519 }
520 DatabaseStatus::Running(database) => {
521 let (upstream_actor_id, actor_id) = ids;
522 database.new_actor_remote_output_request(
523 actor_id,
524 upstream_actor_id,
525 result_sender,
526 );
527 return;
528 }
529 DatabaseStatus::Suspended(_) => {
530 anyhow!("database suspended")
531 }
532 DatabaseStatus::Resetting(_) => {
533 anyhow!("database resetting")
534 }
535 DatabaseStatus::Unspecified => {
536 unreachable!()
537 }
538 },
539 Entry::Vacant(entry) => {
540 entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
541 ids,
542 result_sender,
543 )]));
544 return;
545 }
546 }
547 };
548 let _ = result_sender.send(Err(err.into()));
549 }
550 #[cfg(test)]
551 LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
552 let database_status = self
553 .state
554 .databases
555 .get(&crate::task::TEST_DATABASE_ID)
556 .unwrap();
557 let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
558 let _ = sender.send(database_state.local_barrier_manager.clone());
559 }
560 #[cfg(test)]
561 LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
562 let database_status = self
563 .state
564 .databases
565 .get_mut(&crate::task::TEST_DATABASE_ID)
566 .unwrap();
567
568 let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
569 assert!(!database_state.actor_states.contains_key(&actor_id));
570 let requests = database_state
571 .actor_pending_new_output_requests
572 .remove(&actor_id)
573 .unwrap();
574 let _ = sender.send(requests);
575 }
576 #[cfg(test)]
577 LocalActorOperation::Flush(sender) => {
578 use futures::FutureExt;
579 while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
580 self.handle_streaming_control_request(
581 request.request.expect("should not be empty"),
582 )
583 .unwrap();
584 }
585 while let Some((database_id, event)) = self.state.next_event().now_or_never() {
586 match event {
587 ManagedBarrierStateEvent::BarrierCollected {
588 partial_graph_id,
589 barrier,
590 } => {
591 self.complete_barrier(
592 database_id,
593 partial_graph_id,
594 barrier.epoch.prev,
595 );
596 }
597 ManagedBarrierStateEvent::ActorError { .. }
598 | ManagedBarrierStateEvent::DatabaseReset(..) => {
599 unreachable!()
600 }
601 }
602 }
603 sender.send(()).unwrap()
604 }
605 LocalActorOperation::InspectState { result_sender } => {
606 let debug_info = self.to_debug_info();
607 let _ = result_sender.send(debug_info.to_string());
608 }
609 }
610 }
611}
612
/// Private module that scopes the opaque `AwaitEpochCompletedFuture` type together with the
/// single function that defines it.
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_common::id::TableId;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource,
        PbLoadFinishedSource,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    /// Future that resolves to the partial graph id, the barrier and the completion result of
    /// one epoch. The concrete type is defined by `instrument_complete_barrier_future`.
    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    /// Builds the future that completes one barrier.
    ///
    /// When `complete_barrier_future` is `Some`, it is awaited and its output becomes
    /// `sync_result`; otherwise `sync_result` is `None`. The remaining arguments are moved
    /// unchanged into the resulting `BarrierCompleteResult`. When `barrier_await_tree_reg` is
    /// provided, the future is registered under a `BarrierAwait` key for the barrier's previous
    /// epoch and instrumented with a `SyncEpoch(<prev_epoch>)` span.
    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    ) -> AwaitEpochCompletedFuture {
        // Copied out before `barrier` is moved into the closure below.
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        // `left_future` / `right_future` give both branches the same concrete type, which the
        // opaque alias requires.
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}
682
683use await_epoch_completed_future::*;
684use risingwave_common::catalog::{DatabaseId, TableId};
685use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
686use risingwave_storage::{StateStoreImpl, dispatch_state_store};
687
688use crate::executor::exchange::permit;
689
690fn sync_epoch(
691 state_store: &StateStoreImpl,
692 streaming_metrics: &StreamingMetrics,
693 prev_epoch: u64,
694 table_ids: HashSet<TableId>,
695) -> BoxFuture<'static, StreamResult<SyncResult>> {
696 let timer = streaming_metrics.barrier_sync_latency.start_timer();
697
698 let state_store = state_store.clone();
699 let future = async move {
700 dispatch_state_store!(state_store, hummock, {
701 hummock.sync(vec![(prev_epoch, table_ids)]).await
702 })
703 };
704
705 future
706 .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
707 .inspect_ok(move |_| {
708 timer.observe_duration();
709 })
710 .map_err(move |e| {
711 tracing::error!(
712 prev_epoch,
713 error = %e.as_report(),
714 "Failed to sync state store",
715 );
716 e.into()
717 })
718 .boxed()
719}
720
721impl LocalBarrierWorker {
722 fn complete_barrier(
723 &mut self,
724 database_id: DatabaseId,
725 partial_graph_id: PartialGraphId,
726 prev_epoch: u64,
727 ) {
728 {
729 let Some(database_state) = self
730 .state
731 .databases
732 .get_mut(&database_id)
733 .expect("should exist")
734 .state_for_request()
735 else {
736 return;
737 };
738 let BarrierToComplete {
739 barrier,
740 table_ids,
741 create_mview_progress,
742 list_finished_source_ids,
743 load_finished_source_ids,
744 cdc_table_backfill_progress,
745 truncate_tables,
746 refresh_finished_tables,
747 } = database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);
748
749 let complete_barrier_future = match &barrier.kind {
750 BarrierKind::Unspecified => unreachable!(),
751 BarrierKind::Initial => {
752 tracing::info!(
753 epoch = prev_epoch,
754 "ignore sealing data for the first barrier"
755 );
756 tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
757 None
758 }
759 BarrierKind::Barrier => None,
760 BarrierKind::Checkpoint => Some(sync_epoch(
761 &self.actor_manager.env.state_store(),
762 &self.actor_manager.streaming_metrics,
763 prev_epoch,
764 table_ids.expect("should be Some on BarrierKind::Checkpoint"),
765 )),
766 };
767
768 self.await_epoch_completed_futures
769 .entry(database_id)
770 .or_default()
771 .push_back({
772 instrument_complete_barrier_future(
773 partial_graph_id,
774 complete_barrier_future,
775 barrier,
776 self.actor_manager.await_tree_reg.as_ref(),
777 create_mview_progress,
778 list_finished_source_ids,
779 load_finished_source_ids,
780 cdc_table_backfill_progress,
781 truncate_tables,
782 refresh_finished_tables,
783 )
784 });
785 }
786 }
787
788 fn on_epoch_completed(
789 &mut self,
790 database_id: DatabaseId,
791 partial_graph_id: PartialGraphId,
792 epoch: u64,
793 result: BarrierCompleteResult,
794 ) {
795 let BarrierCompleteResult {
796 create_mview_progress,
797 sync_result,
798 list_finished_source_ids,
799 load_finished_source_ids,
800 cdc_table_backfill_progress,
801 truncate_tables,
802 refresh_finished_tables,
803 } = result;
804
805 let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
806 .map(|sync_result| {
807 (
808 sync_result.uncommitted_ssts,
809 sync_result.table_watermarks,
810 sync_result.old_value_ssts,
811 sync_result.vector_index_adds,
812 )
813 })
814 .unwrap_or_default();
815
816 let result = {
817 {
818 streaming_control_stream_response::Response::CompleteBarrier(
819 BarrierCompleteResponse {
820 request_id: "todo".to_owned(),
821 partial_graph_id: partial_graph_id.into(),
822 epoch,
823 status: None,
824 create_mview_progress,
825 synced_sstables: synced_sstables
826 .into_iter()
827 .map(
828 |LocalSstableInfo {
829 sst_info,
830 table_stats,
831 created_at,
832 }| PbLocalSstableInfo {
833 sst: Some(sst_info.into()),
834 table_stats_map: to_prost_table_stats_map(table_stats),
835 created_at,
836 },
837 )
838 .collect_vec(),
839 worker_id: self.actor_manager.env.worker_id(),
840 table_watermarks: table_watermarks
841 .into_iter()
842 .map(|(key, value)| (key, value.into()))
843 .collect(),
844 old_value_sstables: old_value_ssts
845 .into_iter()
846 .map(|sst| sst.sst_info.into())
847 .collect(),
848 database_id,
849 list_finished_sources: list_finished_source_ids,
850 load_finished_sources: load_finished_source_ids,
851 vector_index_adds: vector_index_adds
852 .into_iter()
853 .map(|(table_id, adds)| {
854 (
855 table_id,
856 PbVectorIndexAdds {
857 adds: adds.into_iter().map(|add| add.into()).collect(),
858 },
859 )
860 })
861 .collect(),
862 cdc_table_backfill_progress,
863 truncate_tables,
864 refresh_finished_tables,
865 },
866 )
867 }
868 };
869
870 self.control_stream_handle.send_response(result);
871 }
872
873 fn send_barrier(
880 &mut self,
881 barrier: &Barrier,
882 request: InjectBarrierRequest,
883 ) -> StreamResult<()> {
884 debug!(
885 target: "events::stream::barrier::manager::send",
886 "send barrier {:?}, actor_ids_to_collect = {:?}",
887 barrier,
888 request.actor_ids_to_collect
889 );
890
891 let database_status = self
892 .state
893 .databases
894 .get_mut(&request.database_id)
895 .expect("should exist");
896 if let Some(state) = database_status.state_for_request() {
897 state.transform_to_issued(barrier, request)?;
898 }
899 Ok(())
900 }
901
902 fn remove_partial_graphs(
903 &mut self,
904 database_id: DatabaseId,
905 partial_graph_ids: impl Iterator<Item = PartialGraphId>,
906 ) {
907 let Some(database_status) = self.state.databases.get_mut(&database_id) else {
908 warn!(
909 %database_id,
910 "database to remove partial graph not exist"
911 );
912 return;
913 };
914 let Some(database_state) = database_status.state_for_request() else {
915 warn!(
916 %database_id,
917 "ignore remove partial graph request on err database",
918 );
919 return;
920 };
921 for partial_graph_id in partial_graph_ids {
922 if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
923 assert!(
924 graph.is_empty(),
925 "non empty graph to be removed: {}",
926 &graph
927 );
928 } else {
929 warn!(
930 partial_graph_id = partial_graph_id.0,
931 "no partial graph to remove"
932 );
933 }
934 }
935 }
936
937 fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
938 let status = match self.state.databases.entry(database_id) {
939 Entry::Occupied(entry) => {
940 let status = entry.into_mut();
941 if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
942 let mut database = DatabaseManagedBarrierState::new(
943 database_id,
944 self.term_id.clone(),
945 self.actor_manager.clone(),
946 );
947 for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
948 {
949 database.new_actor_remote_output_request(
950 actor_id,
951 upstream_actor_id,
952 result_sender,
953 );
954 }
955 *status = DatabaseStatus::Running(database);
956 }
957
958 status
959 }
960 Entry::Vacant(entry) => {
961 entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
962 database_id,
963 self.term_id.clone(),
964 self.actor_manager.clone(),
965 )))
966 }
967 };
968 if let Some(state) = status.state_for_request() {
969 assert!(
970 state
971 .graph_states
972 .insert(
973 partial_graph_id,
974 PartialGraphManagedBarrierState::new(&self.actor_manager)
975 )
976 .is_none()
977 );
978 }
979 }
980
981 fn reset_database(&mut self, req: ResetDatabaseRequest) {
982 let database_id = req.database_id;
983 if let Some(database_status) = self.state.databases.get_mut(&database_id) {
984 database_status.start_reset(
985 database_id,
986 self.await_epoch_completed_futures.remove(&database_id),
987 req.reset_request_id,
988 );
989 } else {
990 self.ack_database_reset(database_id, None, req.reset_request_id);
991 }
992 }
993
994 fn ack_database_reset(
995 &mut self,
996 database_id: DatabaseId,
997 reset_output: Option<ResetDatabaseOutput>,
998 reset_request_id: u32,
999 ) {
1000 info!(
1001 %database_id,
1002 "database reset successfully"
1003 );
1004 if let Some(reset_database) = self.state.databases.remove(&database_id) {
1005 match reset_database {
1006 DatabaseStatus::Resetting(_) => {}
1007 _ => {
1008 unreachable!("must be resetting previously")
1009 }
1010 }
1011 }
1012 self.await_epoch_completed_futures.remove(&database_id);
1013 self.control_stream_handle.ack_reset_database(
1014 database_id,
1015 reset_output.and_then(|output| output.root_err),
1016 reset_request_id,
1017 );
1018 }
1019
1020 fn on_database_failure(
1024 &mut self,
1025 database_id: DatabaseId,
1026 failed_actor: Option<ActorId>,
1027 err: StreamError,
1028 message: impl Into<String>,
1029 ) {
1030 let message = message.into();
1031 error!(%database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
1032 let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
1033 self.state
1034 .databases
1035 .get_mut(&database_id)
1036 .expect("should exist")
1037 .suspend(failed_actor, err, completing_futures);
1038 self.control_stream_handle
1039 .send_response(Response::ReportDatabaseFailure(
1040 ReportDatabaseFailureResponse { database_id },
1041 ));
1042 }
1043
    /// Tears down all local state and re-creates the worker for the term in `init_request`.
    ///
    /// The steps run strictly in this order: abort every database, clear the await-tree
    /// registry, clear the Hummock shared buffer (when the state store is Hummock), clear the
    /// DML manager, replace `self` with a fresh worker, and invalidate the client pool.
    async fn reset(&mut self, init_request: InitRequest) {
        // Abort all databases concurrently and wait for every abort to finish.
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        // The new worker starts without a control stream; the caller installs one afterwards.
        *self = Self::new(self.actor_manager.clone(), init_request.term_id);
        self.actor_manager.env.client_pool().invalidate_all();
    }
1067
1068 pub fn spawn(
1070 env: StreamEnvironment,
1071 streaming_metrics: Arc<StreamingMetrics>,
1072 await_tree_reg: Option<await_tree::Registry>,
1073 watermark_epoch: AtomicU64Ref,
1074 actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1075 ) -> JoinHandle<()> {
1076 let runtime = {
1077 let mut builder = tokio::runtime::Builder::new_multi_thread();
1078 if let Some(worker_threads_num) = env.global_config().actor_runtime_worker_threads_num {
1079 builder.worker_threads(worker_threads_num);
1080 }
1081 builder
1082 .thread_name("rw-streaming")
1083 .enable_all()
1084 .build()
1085 .unwrap()
1086 };
1087
1088 let actor_manager = Arc::new(StreamActorManager {
1089 env,
1090 streaming_metrics,
1091 watermark_epoch,
1092 await_tree_reg,
1093 runtime: runtime.into(),
1094 });
1095 let worker = LocalBarrierWorker::new(actor_manager, "uninitialized".into());
1096 tokio::spawn(worker.run(actor_op_rx))
1097 }
1098}
1099
/// Thin wrapper around an unbounded channel sender used to deliver events of type `T`.
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1101
1102impl<T> Clone for EventSender<T> {
1103 fn clone(&self) -> Self {
1104 Self(self.0.clone())
1105 }
1106}
1107
1108impl<T> EventSender<T> {
1109 pub(super) fn send_event(&self, event: T) {
1110 self.0.send(event).expect("should be able to send event")
1111 }
1112
1113 pub(super) async fn send_and_await<RSP>(
1114 &self,
1115 make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1116 ) -> StreamResult<RSP> {
1117 let (tx, rx) = oneshot::channel();
1118 let event = make_event(tx);
1119 self.send_event(event);
1120 rx.await
1121 .map_err(|_| anyhow!("barrier manager maybe reset").into())
1122 }
1123}
1124
/// Request to attach a new output to an actor, carrying the permit-channel sender to use.
///
/// NOTE(review): the variants presumably distinguish a downstream on the same node from one on
/// a remote node — confirm against the code that constructs them.
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}
1129
/// Test-only helpers for driving a barrier worker through an in-memory control stream.
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbCreatePartialGraphRequest,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, PbStreamingControlStreamRequest, StreamingControlStreamRequest,
        StreamingControlStreamResponse, streaming_control_stream_request,
        streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    /// A spawned barrier worker together with the test-side ends of its channels.
    pub(crate) struct LocalBarrierTestEnv {
        /// Barrier manager of the test database, fetched from the worker.
        pub local_barrier_manager: LocalBarrierManager,
        /// Channel for sending actor operations to the worker.
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        /// Test-side sender feeding the worker's control-stream requests.
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        /// Test-side receiver of the worker's control-stream responses.
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawns a worker, attaches an in-memory control stream with term `"for_test"`, and
        /// waits for the `Init` response.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            // Queued before the control stream is registered, so it is the first request the
            // worker reads from the new stream.
            request_tx
                .send(Ok(PbStreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            PbCreatePartialGraphRequest {
                                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                                database_id: TEST_DATABASE_ID,
                            },
                        ),
                    ),
                }))
                .unwrap();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    term_id: "for_test".into(),
                },
            });

            // The worker acknowledges a new control stream with an `Init` response.
            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Sends an `InjectBarrier` request for the test database and test partial graph,
        /// collecting from `actor_to_collect`.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        /// Waits until the worker has processed a `Flush` operation.
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        /// Sends `Flush` through `actor_op_tx` and waits for the worker's notification.
        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        /// Takes the pending new-output requests the worker recorded for `actor_id`.
        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}