1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::error::Error;
18use std::fmt::{Debug, Formatter};
19use std::future::poll_fn;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use anyhow::anyhow;
25use fail::fail_point;
26use futures::future::{BoxFuture, join_all};
27use futures::{FutureExt, StreamExt};
28use itertools::Itertools;
29use risingwave_common::bail;
30use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
31use risingwave_common::id::JobId;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
34use risingwave_common::util::tracing::TracingContext;
35use risingwave_connector::source::SplitImpl;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::common::{HostAddress, WorkerNode};
38use risingwave_pb::hummock::HummockVersionStats;
39use risingwave_pb::id::PartialGraphId;
40use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::stream_node::NodeBody;
43use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
44use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
45use risingwave_pb::stream_service::inject_barrier_request::{
46 BuildActorInfo, FragmentBuildActorInfo,
47};
48use risingwave_pb::stream_service::streaming_control_stream_request::{
49 CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
50 RemovePartialGraphRequest, ResetPartialGraphsRequest,
51};
52use risingwave_pb::stream_service::{
53 InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
54 streaming_control_stream_response,
55};
56use risingwave_rpc_client::StreamingControlHandle;
57use thiserror_ext::AsReport;
58use tokio::time::{Instant, sleep};
59use tokio_retry::strategy::ExponentialBackoff;
60use tracing::{debug, error, info, warn};
61use uuid::Uuid;
62
63use super::{BarrierKind, TracedEpoch};
64use crate::barrier::BackfillOrderState;
65use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
66use crate::barrier::cdc_progress::CdcTableBackfillTracker;
67use crate::barrier::checkpoint::{
68 BarrierWorkerState, BatchRefreshJobCheckpointControl, BatchRefreshRenderResult,
69 CreatingStreamingJobControl, DatabaseCheckpointControl, DatabaseCheckpointControlMetrics,
70 IndependentCheckpointJobControl,
71};
72use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
73use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
74use crate::barrier::info::{
75 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
76 SubscriberType,
77};
78use crate::barrier::partial_graph::PartialGraphRecoverer;
79use crate::barrier::progress::CreateMviewProgressTracker;
80use crate::barrier::utils::NodeToCollect;
81use crate::controller::fragment::InflightFragmentInfo;
82use crate::controller::utils::StreamingJobExtraInfo;
83use crate::manager::MetaSrvEnv;
84use crate::model::{
85 ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
86 SubscriptionId,
87};
88use crate::stream::cdc::{
89 CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
90};
91use crate::stream::{
92 ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
93 build_actor_connector_splits,
94};
95use crate::{MetaError, MetaResult};
96
97pub(super) fn to_partial_graph_id(
98 database_id: DatabaseId,
99 creating_job_id: Option<JobId>,
100) -> PartialGraphId {
101 let raw_job_id = creating_job_id
102 .map(|job_id| {
103 assert_ne!(job_id, u32::MAX);
104 job_id.as_raw_id()
105 })
106 .unwrap_or(u32::MAX);
107 (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
108}
109
110pub(super) fn from_partial_graph_id(
111 partial_graph_id: PartialGraphId,
112) -> (DatabaseId, Option<JobId>) {
113 let id = partial_graph_id.as_raw_id();
114 let database_id = (id >> 32) as u32;
115 let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
116 let creating_job_id = if raw_creating_job_id == u32::MAX {
117 None
118 } else {
119 Some(JobId::new(raw_creating_job_id))
120 };
121 (database_id.into(), creating_job_id)
122}
123
124pub(super) fn build_locality_fragment_state_table_mapping(
125 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
126) -> HashMap<FragmentId, Vec<TableId>> {
127 let mut mapping = HashMap::new();
128
129 for (fragment_id, fragment_info) in fragment_infos {
130 let mut state_table_ids = Vec::new();
131 visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
132 if let Some(NodeBody::LocalityProvider(locality_provider)) =
133 stream_node.node_body.as_ref()
134 {
135 let state_table_id = locality_provider
136 .state_table
137 .as_ref()
138 .expect("must have state table")
139 .id;
140 state_table_ids.push(state_table_id);
141 false
142 } else {
143 true
144 }
145 });
146 if !state_table_ids.is_empty() {
147 mapping.insert(*fragment_id, state_table_ids);
148 }
149 }
150
151 mapping
152}
153
154pub(super) fn database_partial_graphs<'a>(
155 database_id: DatabaseId,
156 creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
157) -> impl Iterator<Item = PartialGraphId> + 'a {
158 creating_jobs
159 .map(Some)
160 .chain([None])
161 .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
162}
163
/// An established control stream to a single worker node.
struct ControlStreamNode {
    /// Id of the worker this stream is connected to.
    worker_id: WorkerId,
    /// Host address of the worker, copied from its `WorkerNode` when the stream was created.
    host: HostAddress,
    /// Handle used to send requests to the worker and to poll its response stream.
    handle: StreamingControlHandle,
}
169
/// Connection state of a worker tracked by `ControlStreamManager`.
enum WorkerNodeState {
    /// The control stream to the worker is established.
    Connected {
        control_stream: ControlStreamNode,
        /// Set by `remove_worker`. When the response stream of a removed worker fails,
        /// the worker is dropped from the manager instead of being reconnected.
        removed: bool,
    },
    /// The control stream is being (re-)established in the background; the future
    /// resolves to the new handle once a connection attempt succeeds.
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}
177
/// Owns the control streams to all known worker nodes.
pub(super) struct ControlStreamManager {
    /// Every tracked worker, keyed by worker id, together with its connection state.
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    /// Meta service environment; cloned when the manager is reset via `clear`.
    pub env: MetaSrvEnv,
}
182
183impl ControlStreamManager {
184 pub(super) fn new(env: MetaSrvEnv) -> Self {
185 Self {
186 workers: Default::default(),
187 env,
188 }
189 }
190
191 pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
192 self.workers[&worker_id].0.host.clone().unwrap()
193 }
194
195 pub(super) async fn add_worker(
196 &mut self,
197 node: WorkerNode,
198 partial_graphs: impl Iterator<Item = PartialGraphId>,
199 term_id: &String,
200 context: Arc<impl GlobalBarrierWorkerContext>,
201 ) {
202 let node_id = node.id;
203 if let Entry::Occupied(entry) = self.workers.entry(node_id) {
204 let (existing_node, worker_state) = entry.get();
205 assert_eq!(existing_node.host, node.host);
206 warn!(id = %node.id, host = ?node.host, "node already exists");
207 match worker_state {
208 WorkerNodeState::Connected { .. } => {
209 warn!(id = %node.id, host = ?node.host, "new node already connected");
210 return;
211 }
212 WorkerNodeState::Reconnecting(_) => {
213 warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
214 entry.remove();
215 }
216 }
217 }
218 let node_host = node.host.clone().unwrap();
219 let mut backoff = ExponentialBackoff::from_millis(100)
220 .max_delay(Duration::from_secs(3))
221 .factor(5);
222 const MAX_RETRY: usize = 5;
223 for i in 1..=MAX_RETRY {
224 match context
225 .new_control_stream(
226 &node,
227 &PbInitRequest {
228 term_id: term_id.clone(),
229 },
230 )
231 .await
232 {
233 Ok(mut handle) => {
234 WorkerNodeConnected {
235 handle: &mut handle,
236 node: &node,
237 }
238 .initialize(partial_graphs);
239 info!(?node_host, "add control stream worker");
240 assert!(
241 self.workers
242 .insert(
243 node_id,
244 (
245 node,
246 WorkerNodeState::Connected {
247 control_stream: ControlStreamNode {
248 worker_id: node_id as _,
249 host: node_host,
250 handle,
251 },
252 removed: false
253 }
254 )
255 )
256 .is_none()
257 );
258 return;
259 }
260 Err(e) => {
261 let delay = backoff.next().unwrap();
264 error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
265 sleep(delay).await;
266 }
267 }
268 }
269 error!(?node_host, "fail to create worker node after retry");
270 assert!(
271 self.workers
272 .insert(
273 node_id,
274 (
275 node.clone(),
276 WorkerNodeState::Reconnecting(ControlStreamManager::retry_connect(
277 node,
278 term_id.to_owned(),
279 context,
280 ))
281 )
282 )
283 .is_none()
284 );
285 }
286
287 pub(super) fn remove_worker(&mut self, node: WorkerNode) {
288 if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
289 let (_, worker_state) = entry.get_mut();
290 match worker_state {
291 WorkerNodeState::Connected { removed, .. } => {
292 info!(worker_id = %node.id, "mark connected worker as removed");
293 *removed = true;
294 }
295 WorkerNodeState::Reconnecting(_) => {
296 info!(worker_id = %node.id, "remove worker");
297 entry.remove();
298 }
299 }
300 }
301 }
302
303 fn retry_connect(
304 node: WorkerNode,
305 term_id: String,
306 context: Arc<impl GlobalBarrierWorkerContext>,
307 ) -> BoxFuture<'static, StreamingControlHandle> {
308 async move {
309 let mut attempt = 0;
310 let backoff = ExponentialBackoff::from_millis(100)
311 .max_delay(Duration::from_mins(1))
312 .factor(5);
313 let init_request = PbInitRequest { term_id };
314 for delay in backoff {
315 attempt += 1;
316 sleep(delay).await;
317 match context.new_control_stream(&node, &init_request).await {
318 Ok(handle) => {
319 return handle;
320 }
321 Err(e) => {
322 warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
323 }
324 }
325 }
326 unreachable!("end of retry backoff")
327 }.boxed()
328 }
329
330 pub(super) async fn recover(
331 env: MetaSrvEnv,
332 nodes: &HashMap<WorkerId, WorkerNode>,
333 term_id: &str,
334 context: Arc<impl GlobalBarrierWorkerContext>,
335 ) -> Self {
336 let reset_start_time = Instant::now();
337 let init_request = PbInitRequest {
338 term_id: term_id.to_owned(),
339 };
340 let init_request = &init_request;
341 let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
342 let result = context.new_control_stream(node, init_request).await;
343 (*worker_id, node.clone(), result)
344 }))
345 .await;
346 let mut unconnected_workers = HashSet::new();
347 let mut workers = HashMap::new();
348 for (worker_id, node, result) in nodes {
349 match result {
350 Ok(handle) => {
351 let control_stream = ControlStreamNode {
352 worker_id: node.id,
353 host: node.host.clone().unwrap(),
354 handle,
355 };
356 assert!(
357 workers
358 .insert(
359 worker_id,
360 (
361 node,
362 WorkerNodeState::Connected {
363 control_stream,
364 removed: false
365 }
366 )
367 )
368 .is_none()
369 );
370 }
371 Err(e) => {
372 unconnected_workers.insert(worker_id);
373 warn!(
374 e = %e.as_report(),
375 %worker_id,
376 ?node,
377 "failed to connect to node"
378 );
379 assert!(
380 workers
381 .insert(
382 worker_id,
383 (
384 node.clone(),
385 WorkerNodeState::Reconnecting(Self::retry_connect(
386 node,
387 term_id.to_owned(),
388 context.clone()
389 ))
390 )
391 )
392 .is_none()
393 );
394 }
395 }
396 }
397
398 info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");
399
400 Self { workers, env }
401 }
402
403 pub(super) fn clear(&mut self) {
405 *self = Self::new(self.env.clone());
406 }
407}
408
/// Borrowed view of a worker whose control stream has just been established and
/// still has to be initialized with its partial graphs.
pub(super) struct WorkerNodeConnected<'a> {
    /// The worker that got connected.
    node: &'a WorkerNode,
    /// Handle of the newly established control stream.
    handle: &'a mut StreamingControlHandle,
}
413
414impl<'a> WorkerNodeConnected<'a> {
415 pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
416 for partial_graph_id in partial_graphs {
417 if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
418 request: Some(
419 streaming_control_stream_request::Request::CreatePartialGraph(
420 PbCreatePartialGraphRequest { partial_graph_id },
421 ),
422 ),
423 }) {
424 warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
425 }
426 }
427 }
428}
429
/// Event produced by polling the control streams of all workers.
pub(super) enum WorkerNodeEvent<'a> {
    /// A response, or an error, obtained from a connected worker's response stream.
    Response(MetaResult<streaming_control_stream_response::Response>),
    /// A worker that was reconnecting has connected and is awaiting initialization.
    Connected(WorkerNodeConnected<'a>),
}
434
435impl ControlStreamManager {
436 fn poll_next_event<'a>(
437 this_opt: &mut Option<&'a mut Self>,
438 cx: &mut Context<'_>,
439 term_id: &str,
440 context: &Arc<impl GlobalBarrierWorkerContext>,
441 poll_reconnect: bool,
442 ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
443 let this = this_opt.as_mut().expect("Future polled after completion");
444 if this.workers.is_empty() {
445 return Poll::Pending;
446 }
447 {
448 for (&worker_id, (node, worker_state)) in &mut this.workers {
449 let control_stream = match worker_state {
450 WorkerNodeState::Connected { control_stream, .. } => control_stream,
451 WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
452 continue;
453 }
454 WorkerNodeState::Reconnecting(join_handle) => {
455 match join_handle.poll_unpin(cx) {
456 Poll::Ready(handle) => {
457 info!(id=%node.id, host=?node.host, "reconnected to worker");
458 *worker_state = WorkerNodeState::Connected {
459 control_stream: ControlStreamNode {
460 worker_id: node.id,
461 host: node.host.clone().unwrap(),
462 handle,
463 },
464 removed: false,
465 };
466 let this = this_opt.take().expect("should exist");
467 let (node, worker_state) =
468 this.workers.get_mut(&worker_id).expect("should exist");
469 let WorkerNodeState::Connected { control_stream, .. } =
470 worker_state
471 else {
472 unreachable!()
473 };
474 return Poll::Ready((
475 worker_id,
476 WorkerNodeEvent::Connected(WorkerNodeConnected {
477 node,
478 handle: &mut control_stream.handle,
479 }),
480 ));
481 }
482 Poll::Pending => {
483 continue;
484 }
485 }
486 }
487 };
488 match control_stream.handle.response_stream.poll_next_unpin(cx) {
489 Poll::Ready(result) => {
490 {
491 let result = result
492 .ok_or_else(|| (false, anyhow!("end of stream").into()))
493 .and_then(|result| {
494 result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
495 match resp
496 .response
497 .ok_or_else(|| (false, anyhow!("empty response").into()))?
498 {
499 streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
500 "worker node {worker_id} is shutting down"
501 )
502 .into())),
503 streaming_control_stream_response::Response::Init(_) => {
504 Err((false, anyhow!("get unexpected init response").into()))
506 }
507 resp => {
508 if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
509 assert_eq!(worker_id, barrier_resp.worker_id);
510 }
511 Ok(resp)
512 }
513 }
514 })
515 });
516 let result = match result {
517 Ok(resp) => Ok(resp),
518 Err((shutdown, err)) => {
519 warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
520 let WorkerNodeState::Connected { removed, .. } = worker_state
521 else {
522 unreachable!("checked connected")
523 };
524 if *removed || shutdown {
525 this.workers.remove(&worker_id);
526 } else {
527 *worker_state = WorkerNodeState::Reconnecting(
528 ControlStreamManager::retry_connect(
529 node.clone(),
530 term_id.to_owned(),
531 context.clone(),
532 ),
533 );
534 }
535 Err(err)
536 }
537 };
538 return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
539 }
540 }
541 Poll::Pending => {
542 continue;
543 }
544 }
545 }
546 };
547
548 Poll::Pending
549 }
550
551 #[await_tree::instrument("control_stream_next_event")]
552 pub(super) async fn next_event<'a>(
553 &'a mut self,
554 term_id: &str,
555 context: &Arc<impl GlobalBarrierWorkerContext>,
556 ) -> (WorkerId, WorkerNodeEvent<'a>) {
557 let mut this = Some(self);
558 poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
559 }
560
561 #[await_tree::instrument("control_stream_next_response")]
562 pub(super) async fn next_response(
563 &mut self,
564 term_id: &str,
565 context: &Arc<impl GlobalBarrierWorkerContext>,
566 ) -> (
567 WorkerId,
568 MetaResult<streaming_control_stream_response::Response>,
569 ) {
570 let mut this = Some(self);
571 let (worker_id, event) =
572 poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
573 match event {
574 WorkerNodeEvent::Response(result) => (worker_id, result),
575 WorkerNodeEvent::Connected(_) => {
576 unreachable!("set poll_reconnect=false")
577 }
578 }
579 }
580}
581
/// Tracks which partial graphs of a database are still initializing, and holds the
/// database's checkpoint control until all of them have been initialized.
pub(super) struct DatabaseInitialBarrierCollector {
    /// The database being tracked.
    pub(super) database_id: DatabaseId,
    /// Partial graphs that have not been reported as initialized yet.
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    /// Checkpoint control handed out by `finish` once collection completes.
    pub(super) database: DatabaseCheckpointControl,
}
587
588impl Debug for DatabaseInitialBarrierCollector {
589 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
590 f.debug_struct("DatabaseInitialBarrierCollector")
591 .field("database_id", &self.database_id)
592 .field("initializing_graphs", &self.initializing_partial_graphs)
593 .finish()
594 }
595}
596
597impl DatabaseInitialBarrierCollector {
598 pub(super) fn is_collected(&self) -> bool {
599 self.initializing_partial_graphs.is_empty()
600 }
601
602 pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
603 assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
604 }
605
606 pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
607 database_partial_graphs(
608 self.database_id,
609 self.database
610 .independent_checkpoint_job_controls
611 .keys()
612 .copied(),
613 )
614 }
615
616 pub(super) fn finish(self) -> DatabaseCheckpointControl {
617 assert!(self.is_collected());
618 self.database
619 }
620
621 pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
622 self.database.is_valid_after_worker_err(worker_id)
623 }
624}
625
626impl PartialGraphRecoverer<'_> {
627 #[expect(clippy::too_many_arguments)]
629 pub(super) fn inject_database_initial_barrier(
630 &mut self,
631 database_id: DatabaseId,
632 jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
633 job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
634 state_table_committed_epochs: &mut HashMap<TableId, u64>,
635 state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
636 fragment_relations: &FragmentDownstreamRelation,
637 stream_actors: &HashMap<ActorId, StreamActor>,
638 source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
639 background_jobs: &mut HashSet<JobId>,
640 mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
641 is_paused: bool,
642 hummock_version_stats: &HummockVersionStats,
643 cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
644 batch_refresh: HashMap<JobId, BatchRefreshRenderResult>,
645 ) -> MetaResult<DatabaseCheckpointControl> {
646 fn collect_source_splits(
647 fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
648 source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
649 ) -> HashMap<ActorId, Vec<SplitImpl>> {
650 fragment_infos
651 .flat_map(|info| info.actors.keys())
652 .filter_map(|actor_id| {
653 let actor_id = *actor_id as ActorId;
654 source_splits
655 .remove(&actor_id)
656 .map(|splits| (actor_id, splits))
657 })
658 .collect()
659 }
660 fn build_mutation(
661 splits: &HashMap<ActorId, Vec<SplitImpl>>,
662 cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
663 backfill_orders: &ExtendedFragmentBackfillOrder,
664 is_paused: bool,
665 ) -> Mutation {
666 let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
667 .into_iter()
668 .collect();
669 Mutation::Add(AddMutation {
670 actor_dispatchers: Default::default(),
672 added_actors: Default::default(),
673 actor_splits: build_actor_connector_splits(splits),
674 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
675 splits: cdc_table_snapshot_split_assignment,
676 }),
677 pause: is_paused,
678 subscriptions_to_add: Default::default(),
679 backfill_nodes_to_pause,
680 new_upstream_sinks: Default::default(),
681 })
682 }
683
684 fn resolve_jobs_committed_epoch(
685 state_table_committed_epochs: &mut HashMap<TableId, u64>,
686 table_ids: impl Iterator<Item = TableId>,
687 ) -> u64 {
688 let mut epochs = table_ids.map(|table_id| {
689 (
690 table_id,
691 state_table_committed_epochs
692 .remove(&table_id)
693 .expect("should exist"),
694 )
695 });
696 let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
697 for (table_id, epoch) in epochs {
698 assert_eq!(
699 prev_epoch, epoch,
700 "{} has different committed epoch to {}",
701 first_table_id, table_id
702 );
703 }
704 prev_epoch
705 }
706 fn job_backfill_orders(
707 job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
708 job_id: JobId,
709 ) -> UserDefinedFragmentBackfillOrder {
710 UserDefinedFragmentBackfillOrder::new(
711 job_extra_info
712 .get(&job_id)
713 .and_then(|info| info.backfill_orders.clone())
714 .map_or_else(HashMap::new, |orders| orders.0),
715 )
716 }
717
718 let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
719 .keys()
720 .filter_map(|job_id| {
721 mv_depended_subscriptions
722 .remove(&job_id.as_mv_table_id())
723 .map(|subscriptions| {
724 (
725 job_id.as_mv_table_id(),
726 subscriptions
727 .into_iter()
728 .map(|(subscription_id, retention)| {
729 (
730 subscription_id.as_subscriber_id(),
731 SubscriberType::Subscription(retention),
732 )
733 })
734 .collect(),
735 )
736 })
737 })
738 .collect();
739
740 for (job_id, render_result) in &batch_refresh {
743 let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
744 render_result
745 .fragment_infos
746 .values()
747 .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
748 )?
749 .0
750 .ok_or_else(|| anyhow!("batch refresh job {} has no snapshot backfill info", job_id))?;
751
752 for upstream_table_id in snapshot_backfill_info
753 .upstream_mv_table_id_to_backfill_epoch
754 .keys()
755 {
756 subscribers
757 .entry(*upstream_table_id)
758 .or_default()
759 .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
760 .expect("non-duplicate");
761 }
762 }
763
764 let mut database_jobs = HashMap::new();
765 let mut snapshot_backfill_jobs = HashMap::new();
766
767 for (job_id, job_fragments) in jobs {
768 if background_jobs.remove(&job_id) {
769 if job_fragments.values().any(|fragment| {
770 fragment
771 .fragment_type_mask
772 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
773 }) {
774 debug!(%job_id, "recovered snapshot backfill job");
775 snapshot_backfill_jobs.insert(job_id, job_fragments);
776 } else {
777 database_jobs.insert(job_id, (job_fragments, true));
778 }
779 } else {
780 database_jobs.insert(job_id, (job_fragments, false));
781 }
782 }
783
784 let database_job_log_epochs: HashMap<_, _> = database_jobs
785 .keys()
786 .filter_map(|job_id| {
787 state_table_log_epochs
788 .remove(&job_id.as_mv_table_id())
789 .map(|epochs| (job_id.as_mv_table_id(), epochs))
790 })
791 .collect();
792
793 let prev_epoch = resolve_jobs_committed_epoch(
794 state_table_committed_epochs,
795 InflightFragmentInfo::existing_table_ids(
796 database_jobs.values().flat_map(|(job, _)| job.values()),
797 ),
798 );
799 let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
800 let curr_epoch = prev_epoch.next();
802 let barrier_info = BarrierInfo {
803 prev_epoch,
804 curr_epoch,
805 kind: BarrierKind::Initial,
806 };
807
808 let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
809 for (job_id, fragment_infos) in snapshot_backfill_jobs {
810 let committed_epoch = resolve_jobs_committed_epoch(
811 state_table_committed_epochs,
812 InflightFragmentInfo::existing_table_ids(fragment_infos.values()),
813 );
814 if committed_epoch == barrier_info.prev_epoch() {
815 info!(
816 "recovered creating snapshot backfill job {} catch up with upstream already",
817 job_id
818 );
819 database_jobs
820 .try_insert(job_id, (fragment_infos, true))
821 .expect("non-duplicate");
822 continue;
823 }
824 let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
825 fragment_infos
826 .values()
827 .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
828 )?
829 .0
830 .ok_or_else(|| {
831 anyhow!(
832 "recovered snapshot backfill job {} has no snapshot backfill info",
833 job_id
834 )
835 })?;
836 let mut snapshot_epoch = None;
837 let upstream_table_ids: HashSet<_> = snapshot_backfill_info
838 .upstream_mv_table_id_to_backfill_epoch
839 .keys()
840 .cloned()
841 .collect();
842 for (upstream_table_id, epoch) in
843 snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
844 {
845 let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
846 let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
847 if *snapshot_epoch != epoch {
848 return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
849 }
850 }
851 let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
852 anyhow!(
853 "snapshot backfill job {} has not set snapshot epoch",
854 job_id
855 )
856 })?;
857 for upstream_table_id in &upstream_table_ids {
858 subscribers
859 .entry(*upstream_table_id)
860 .or_default()
861 .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
862 .expect("non-duplicate");
863 }
864 ongoing_snapshot_backfill_jobs
865 .try_insert(
866 job_id,
867 (
868 fragment_infos,
869 upstream_table_ids,
870 committed_epoch,
871 snapshot_epoch,
872 ),
873 )
874 .expect("non-duplicated");
875 }
876
877 let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
878 HashMap::new();
879
880 let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
881 database_jobs
882 .into_iter()
883 .map(|(job_id, (fragment_infos, is_background_creating))| {
884 let status = if is_background_creating {
885 let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
886 let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
887 backfill_ordering,
888 fragment_relations,
889 || fragment_infos.iter().map(|(fragment_id, fragment)| {
890 (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
891 }));
892 let locality_fragment_state_table_mapping =
893 build_locality_fragment_state_table_mapping(&fragment_infos);
894 let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
895 &backfill_ordering,
896 &fragment_infos,
897 locality_fragment_state_table_mapping,
898 );
899 CreateStreamingJobStatus::Creating {
900 tracker: CreateMviewProgressTracker::recover(
901 job_id,
902 &fragment_infos,
903 backfill_order_state,
904 hummock_version_stats,
905 ),
906 }
907 } else {
908 CreateStreamingJobStatus::Created
909 };
910 let cdc_table_backfill_tracker =
911 if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
912 let cdc_fragment = fragment_infos
913 .values()
914 .find(|fragment| {
915 is_parallelized_backfill_enabled_cdc_scan_fragment(
916 fragment.fragment_type_mask,
917 &fragment.nodes,
918 )
919 .is_some()
920 })
921 .expect("should have parallel cdc fragment");
922 let cdc_actors = cdc_fragment.actors.keys().copied().collect();
923 let mut tracker =
924 CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
925 cdc_table_snapshot_split_assignment
926 .extend(tracker.reassign_splits(cdc_actors)?);
927 Some(tracker)
928 } else {
929 None
930 };
931 Ok((
932 job_id,
933 InflightStreamingJobInfo {
934 job_id,
935 fragment_infos,
936 subscribers: subscribers
937 .remove(&job_id.as_mv_table_id())
938 .unwrap_or_default(),
939 status,
940 cdc_table_backfill_tracker,
941 },
942 ))
943 })
944 .try_collect::<_, _, MetaError>()
945 }?;
946
947 let control_stream_manager = self.control_stream_manager();
948 let mut builder = FragmentEdgeBuilder::new(
949 database_jobs
950 .values()
951 .flat_map(|job| {
952 let partial_graph_id = to_partial_graph_id(database_id, None);
953 job.fragment_infos().map(move |info| {
954 (
955 info.fragment_id,
956 EdgeBuilderFragmentInfo::from_inflight(
957 info,
958 partial_graph_id,
959 control_stream_manager,
960 ),
961 )
962 })
963 })
964 .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
965 |(job_id, (fragments, ..))| {
966 let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
967 fragments.values().map(move |fragment| {
968 (
969 fragment.fragment_id,
970 EdgeBuilderFragmentInfo::from_inflight(
971 fragment,
972 partial_graph_id,
973 control_stream_manager,
974 ),
975 )
976 })
977 },
978 )),
979 );
980 builder.add_relations(fragment_relations);
981 let mut edges = builder.build();
982
983 {
984 let new_actors =
985 edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
986 job.fragment_infos.values().map(move |fragment_infos| {
987 (
988 fragment_infos.fragment_id,
989 &fragment_infos.nodes,
990 fragment_infos.actors.iter().map(move |(actor_id, actor)| {
991 (
992 stream_actors.get(actor_id).expect("should exist"),
993 actor.worker_id,
994 )
995 }),
996 job.subscribers.keys().copied(),
997 )
998 })
999 }));
1000
1001 let nodes_actors =
1002 InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
1003 let database_job_source_splits =
1004 collect_source_splits(database_jobs.values().flatten(), source_splits);
1005 let database_backfill_orders =
1006 UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
1007 if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
1008 job_backfill_orders(job_extra_info, job.job_id)
1009 } else {
1010 UserDefinedFragmentBackfillOrder::default()
1011 }
1012 }));
1013 let database_backfill_orders =
1014 StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1015 database_backfill_orders,
1016 fragment_relations,
1017 || {
1018 database_jobs.values().flat_map(|job_fragments| {
1019 job_fragments
1020 .fragment_infos
1021 .iter()
1022 .map(|(fragment_id, fragment)| {
1023 (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
1024 })
1025 })
1026 },
1027 );
1028 let mutation = build_mutation(
1029 &database_job_source_splits,
1030 cdc_table_snapshot_split_assignment,
1031 &database_backfill_orders,
1032 is_paused,
1033 );
1034
1035 let partial_graph_id = to_partial_graph_id(database_id, None);
1036 self.recover_graph(
1037 partial_graph_id,
1038 mutation,
1039 &barrier_info,
1040 &nodes_actors,
1041 InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
1042 new_actors,
1043 DatabaseCheckpointControlMetrics::new(database_id),
1044 )?;
1045 debug!(
1046 %database_id,
1047 "inject initial barrier"
1048 );
1049 };
1050
1051 let mut independent_checkpoint_job_controls: HashMap<
1052 JobId,
1053 IndependentCheckpointJobControl,
1054 > = HashMap::new();
1055 for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
1056 ongoing_snapshot_backfill_jobs
1057 {
1058 let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
1059 (
1060 fragment_infos.fragment_id,
1061 &fragment_infos.nodes,
1062 fragment_infos.actors.iter().map(move |(actor_id, actor)| {
1063 (
1064 stream_actors.get(actor_id).expect("should exist"),
1065 actor.worker_id,
1066 )
1067 }),
1068 vec![], )
1070 }));
1071
1072 let database_job_source_splits =
1073 collect_source_splits(database_jobs.values().flatten(), source_splits);
1074 assert!(
1075 !cdc_table_snapshot_splits.contains_key(&job_id),
1076 "snapshot backfill job {job_id} should not have cdc backfill"
1077 );
1078 if is_paused {
1079 bail!("should not pause when having snapshot backfill job {job_id}");
1080 }
1081 let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1082 let job_backfill_orders =
1083 StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1084 job_backfill_orders,
1085 fragment_relations,
1086 || {
1087 info.iter().map(|(fragment_id, fragment)| {
1088 (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
1089 })
1090 },
1091 );
1092 let mutation = build_mutation(
1093 &database_job_source_splits,
1094 Default::default(), &job_backfill_orders,
1096 false,
1097 );
1098
1099 let job = CreatingStreamingJobControl::recover(
1100 database_id,
1101 job_id,
1102 upstream_table_ids,
1103 &database_job_log_epochs,
1104 snapshot_epoch,
1105 committed_epoch,
1106 &barrier_info,
1107 info,
1108 job_backfill_orders,
1109 fragment_relations,
1110 hummock_version_stats,
1111 node_actors,
1112 mutation.clone(),
1113 self,
1114 )?;
1115 independent_checkpoint_job_controls.insert(
1116 job_id,
1117 IndependentCheckpointJobControl::CreatingStreamingJob(job),
1118 );
1119 }
1120
1121 for (job_id, render_result) in batch_refresh {
1124 background_jobs.remove(&job_id);
1125 debug!(%job_id, "recovered batch refresh job");
1126
1127 let committed_epoch = resolve_jobs_committed_epoch(
1129 state_table_committed_epochs,
1130 InflightFragmentInfo::existing_table_ids(render_result.fragment_infos.values()),
1131 );
1132
1133 let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
1134 render_result
1135 .fragment_infos
1136 .values()
1137 .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
1138 )?
1139 .0
1140 .ok_or_else(|| anyhow!("batch refresh job {} has no snapshot backfill info", job_id))?;
1141
1142 let upstream_table_ids: HashSet<TableId> = snapshot_backfill_info
1143 .upstream_mv_table_id_to_backfill_epoch
1144 .keys()
1145 .copied()
1146 .collect();
1147 let snapshot_epoch = snapshot_backfill_info
1148 .upstream_mv_table_id_to_backfill_epoch
1149 .values()
1150 .find_map(|e| *e)
1151 .unwrap_or(committed_epoch);
1152
1153 let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1154 let job_backfill_orders =
1155 StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1156 job_backfill_orders,
1157 fragment_relations,
1158 || {
1159 render_result
1160 .fragment_infos
1161 .iter()
1162 .map(|(fid, f)| (*fid, f.fragment_type_mask, &f.nodes))
1163 },
1164 );
1165 let mutation = build_mutation(
1166 &Default::default(), Default::default(),
1168 &job_backfill_orders,
1169 false,
1170 );
1171
1172 let refresh_interval_sec = job_extra_info
1173 .get(&job_id)
1174 .and_then(|info| info.refresh_interval_sec)
1175 .expect("batch refresh job should have refresh_interval_sec in job extra info");
1176
1177 let job = BatchRefreshJobCheckpointControl::recover(
1178 database_id,
1179 job_id,
1180 upstream_table_ids,
1181 snapshot_epoch,
1182 committed_epoch,
1183 job_backfill_orders,
1184 hummock_version_stats,
1185 mutation,
1186 render_result,
1187 self,
1188 refresh_interval_sec,
1189 )?;
1190 independent_checkpoint_job_controls
1191 .insert(job_id, IndependentCheckpointJobControl::BatchRefresh(job));
1192 }
1193
1194 self.control_stream_manager()
1195 .env
1196 .shared_actor_infos()
1197 .recover_database(
1198 database_id,
1199 database_jobs
1200 .values()
1201 .flat_map(|info| {
1202 info.fragment_infos()
1203 .map(move |fragment| (fragment, info.job_id))
1204 })
1205 .chain(
1206 independent_checkpoint_job_controls
1207 .iter()
1208 .flat_map(|(job_id, job)| {
1209 let job_id = *job_id;
1210 job.fragment_infos()
1211 .into_iter()
1212 .flat_map(move |infos| infos.values().map(move |f| (f, job_id)))
1213 }),
1214 ),
1215 );
1216
1217 let committed_epoch = barrier_info.prev_epoch();
1218 let new_epoch = barrier_info.curr_epoch;
1219 let database_info = InflightDatabaseInfo::recover(
1220 database_id,
1221 database_jobs.into_values(),
1222 self.control_stream_manager()
1223 .env
1224 .shared_actor_infos()
1225 .clone(),
1226 );
1227 let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
1228 Ok(DatabaseCheckpointControl::recovery(
1229 database_id,
1230 database_state,
1231 committed_epoch,
1232 database_info,
1233 independent_checkpoint_job_controls,
1234 ))
1235 }
1236}
1237
1238impl ControlStreamManager {
1239 fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
1240 self.workers
1241 .iter()
1242 .filter_map(|(worker_id, (_, worker_state))| match worker_state {
1243 WorkerNodeState::Connected { control_stream, .. } => {
1244 Some((*worker_id, control_stream))
1245 }
1246 WorkerNodeState::Reconnecting(_) => None,
1247 })
1248 }
1249
1250 pub(super) fn inject_barrier(
1251 &mut self,
1252 partial_graph_id: PartialGraphId,
1253 mutation: Option<Mutation>,
1254 barrier_info: &BarrierInfo,
1255 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
1256 table_ids_to_sync: impl Iterator<Item = TableId>,
1257 nodes_to_sync_table: impl Iterator<Item = WorkerId>,
1258 mut new_actors: Option<StreamJobActorsToCreate>,
1259 ) -> MetaResult<NodeToCollect> {
1260 fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
1261 "inject_barrier_err"
1262 ));
1263
1264 let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();
1265
1266 nodes_to_sync_table.iter().for_each(|worker_id| {
1267 assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
1268 });
1269
1270 let mut node_need_collect = NodeToCollect::new();
1271 let table_ids_to_sync = table_ids_to_sync.collect_vec();
1272
1273 node_actors.iter()
1274 .try_for_each(|(worker_id, actor_ids_to_collect)| {
1275 assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
1276 let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
1277 table_ids_to_sync.clone()
1278 } else {
1279 vec![]
1280 };
1281
1282 let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
1283 &&
1284 let WorkerNodeState::Connected { control_stream, .. } = worker_state
1285 {
1286 control_stream
1287 } else {
1288 return Err(anyhow!("unconnected worker node {}", worker_id).into());
1289 };
1290
1291 {
1292 let mutation = mutation.clone();
1293 let barrier = Barrier {
1294 epoch: Some(risingwave_pb::data::Epoch {
1295 curr: barrier_info.curr_epoch(),
1296 prev: barrier_info.prev_epoch(),
1297 }),
1298 mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1299 tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1300 .to_protobuf(),
1301 kind: barrier_info.kind.to_protobuf() as i32,
1302 };
1303
1304 node.handle
1305 .request_sender
1306 .send(StreamingControlStreamRequest {
1307 request: Some(
1308 streaming_control_stream_request::Request::InjectBarrier(
1309 InjectBarrierRequest {
1310 request_id: Uuid::new_v4().to_string(),
1311 barrier: Some(barrier),
1312 actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
1313 table_ids_to_sync,
1314 partial_graph_id,
1315 actors_to_build: new_actors
1316 .as_mut()
1317 .map(|new_actors| new_actors.remove(worker_id))
1318 .into_iter()
1319 .flatten()
1320 .flatten()
1321 .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
1322 FragmentBuildActorInfo {
1323 fragment_id,
1324 node: Some(node),
1325 actors: actors
1326 .into_iter()
1327 .map(|(actor, upstreams, dispatchers)| {
1328 BuildActorInfo {
1329 actor_id: actor.actor_id,
1330 fragment_upstreams: upstreams
1331 .into_iter()
1332 .map(|(fragment_id, upstreams)| {
1333 (
1334 fragment_id,
1335 UpstreamActors {
1336 actors: upstreams
1337 .into_values()
1338 .collect(),
1339 },
1340 )
1341 })
1342 .collect(),
1343 dispatchers,
1344 vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
1345 mview_definition: actor.mview_definition,
1346 expr_context: actor.expr_context,
1347 config_override: actor.config_override.to_string(),
1348 initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
1349 }
1350 })
1351 .collect(),
1352 }
1353 })
1354 .collect(),
1355 },
1356 ),
1357 ),
1358 })
1359 .map_err(|_| {
1360 MetaError::from(anyhow!(
1361 "failed to send request to {} {:?}",
1362 node.worker_id,
1363 node.host
1364 ))
1365 })?;
1366
1367 node_need_collect.insert(*worker_id);
1368 Result::<_, MetaError>::Ok(())
1369 }
1370 })
1371 .inspect_err(|e| {
1372 use risingwave_pb::meta::event_log;
1374 let event = event_log::EventInjectBarrierFail {
1375 prev_epoch: barrier_info.prev_epoch(),
1376 cur_epoch: barrier_info.curr_epoch(),
1377 error: e.to_report_string(),
1378 };
1379 self.env
1380 .event_log_manager_ref()
1381 .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
1382 })?;
1383 Ok(node_need_collect)
1384 }
1385
1386 pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
1387 self.connected_workers().for_each(|(_, node)| {
1388 if node
1389 .handle
1390 .request_sender
1391 .send(StreamingControlStreamRequest {
1392 request: Some(
1393 streaming_control_stream_request::Request::CreatePartialGraph(
1394 CreatePartialGraphRequest {
1395 partial_graph_id,
1396 },
1397 ),
1398 ),
1399 }).is_err() {
1400 let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
1401 warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
1402 }
1403 });
1404 }
1405
1406 pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
1407 self.connected_workers().for_each(|(_, node)| {
1408 if node.handle
1409 .request_sender
1410 .send(StreamingControlStreamRequest {
1411 request: Some(
1412 streaming_control_stream_request::Request::RemovePartialGraph(
1413 RemovePartialGraphRequest {
1414 partial_graph_ids: partial_graph_ids.clone(),
1415 },
1416 ),
1417 ),
1418 })
1419 .is_err()
1420 {
1421 warn!(worker_id = %node.worker_id,node = ?node.host,"failed to send remove partial graph request");
1422 }
1423 })
1424 }
1425
1426 pub(super) fn reset_partial_graphs(
1427 &mut self,
1428 partial_graph_ids: Vec<PartialGraphId>,
1429 ) -> HashSet<WorkerId> {
1430 self.connected_workers()
1431 .filter_map(|(worker_id, node)| {
1432 if node
1433 .handle
1434 .request_sender
1435 .send(StreamingControlStreamRequest {
1436 request: Some(
1437 streaming_control_stream_request::Request::ResetPartialGraphs(
1438 ResetPartialGraphsRequest {
1439 partial_graph_ids: partial_graph_ids.clone(),
1440 },
1441 ),
1442 ),
1443 })
1444 .is_err()
1445 {
1446 warn!(%worker_id, node = ?node.host,"failed to send reset database request");
1447 None
1448 } else {
1449 Some(worker_id)
1450 }
1451 })
1452 .collect()
1453 }
1454}
1455
1456impl GlobalBarrierWorkerContextImpl {
1457 pub(super) async fn new_control_stream_impl(
1458 &self,
1459 node: &WorkerNode,
1460 init_request: &PbInitRequest,
1461 ) -> MetaResult<StreamingControlHandle> {
1462 let handle = self
1463 .env
1464 .stream_client_pool()
1465 .get(node)
1466 .await?
1467 .start_streaming_control(init_request.clone())
1468 .await?;
1469 Ok(handle)
1470 }
1471}
1472
1473pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
1474 message: &str,
1475 errors: impl IntoIterator<Item = (WorkerId, E)>,
1476) -> MetaError {
1477 use std::fmt::Write;
1478
1479 use risingwave_common::error::error_request_copy;
1480 use risingwave_common::error::tonic::extra::Score;
1481
1482 let errors = errors.into_iter().collect_vec();
1483
1484 if errors.is_empty() {
1485 return anyhow!(message.to_owned()).into();
1486 }
1487
1488 let single_error = |(worker_id, e)| {
1490 anyhow::Error::from(e)
1491 .context(format!("{message}, in worker node {worker_id}"))
1492 .into()
1493 };
1494
1495 if errors.len() == 1 {
1496 return single_error(errors.into_iter().next().unwrap());
1497 }
1498
1499 let max_score = errors
1501 .iter()
1502 .filter_map(|(_, e)| error_request_copy::<Score>(e))
1503 .max();
1504
1505 if let Some(max_score) = max_score {
1506 let mut errors = errors;
1507 let max_scored = errors
1508 .extract_if(.., |(_, e)| {
1509 error_request_copy::<Score>(e) == Some(max_score)
1510 })
1511 .next()
1512 .unwrap();
1513
1514 return single_error(max_scored);
1515 }
1516
1517 let concat: String = errors
1519 .into_iter()
1520 .fold(format!("{message}: "), |mut s, (w, e)| {
1521 write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
1522 s
1523 });
1524 anyhow!(concat).into()
1525}
1526
#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    /// Converting `(database_id, creating_job_id)` to a partial graph id and back must
    /// return the original pair, both without and with a creating job id.
    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        for creating_job_id in [None, Some(job_id)] {
            assert_eq!(
                (database_id, creating_job_id),
                from_partial_graph_id(to_partial_graph_id(database_id, creating_job_id))
            );
        }
    }
}