1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::error::Error;
18use std::fmt::{Debug, Formatter};
19use std::future::poll_fn;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use anyhow::anyhow;
25use fail::fail_point;
26use futures::future::{BoxFuture, join_all};
27use futures::{FutureExt, StreamExt};
28use itertools::Itertools;
29use risingwave_common::bail;
30use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
31use risingwave_common::id::JobId;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
34use risingwave_common::util::tracing::TracingContext;
35use risingwave_connector::source::SplitImpl;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::common::{HostAddress, WorkerNode};
38use risingwave_pb::hummock::HummockVersionStats;
39use risingwave_pb::id::PartialGraphId;
40use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::stream_node::NodeBody;
43use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
44use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
45use risingwave_pb::stream_service::inject_barrier_request::{
46 BuildActorInfo, FragmentBuildActorInfo,
47};
48use risingwave_pb::stream_service::streaming_control_stream_request::{
49 CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
50 RemovePartialGraphRequest, ResetPartialGraphsRequest,
51};
52use risingwave_pb::stream_service::{
53 InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
54 streaming_control_stream_response,
55};
56use risingwave_rpc_client::StreamingControlHandle;
57use thiserror_ext::AsReport;
58use tokio::time::{Instant, sleep};
59use tokio_retry::strategy::ExponentialBackoff;
60use tracing::{debug, error, info, warn};
61use uuid::Uuid;
62
63use super::{BarrierKind, TracedEpoch};
64use crate::barrier::BackfillOrderState;
65use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
66use crate::barrier::cdc_progress::CdcTableBackfillTracker;
67use crate::barrier::checkpoint::{
68 BarrierWorkerState, BatchRefreshJobCheckpointControl, BatchRefreshRenderResult,
69 CreatingStreamingJobControl, DatabaseCheckpointControl, DatabaseCheckpointControlMetrics,
70 IndependentCheckpointJobControl,
71};
72use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
73use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
74use crate::barrier::info::{
75 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
76 SubscriberType,
77};
78use crate::barrier::partial_graph::PartialGraphRecoverer;
79use crate::barrier::progress::CreateMviewProgressTracker;
80use crate::barrier::utils::NodeToCollect;
81use crate::controller::fragment::InflightFragmentInfo;
82use crate::controller::utils::StreamingJobExtraInfo;
83use crate::manager::MetaSrvEnv;
84use crate::model::{
85 ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
86 SubscriptionId,
87};
88use crate::stream::cdc::{
89 CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
90};
91use crate::stream::{
92 ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
93 build_actor_connector_splits,
94};
95use crate::{MetaError, MetaResult};
96
97pub(super) fn to_partial_graph_id(
98 database_id: DatabaseId,
99 creating_job_id: Option<JobId>,
100) -> PartialGraphId {
101 let raw_job_id = creating_job_id
102 .map(|job_id| {
103 assert_ne!(job_id, u32::MAX);
104 job_id.as_raw_id()
105 })
106 .unwrap_or(u32::MAX);
107 (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
108}
109
110pub(super) fn from_partial_graph_id(
111 partial_graph_id: PartialGraphId,
112) -> (DatabaseId, Option<JobId>) {
113 let id = partial_graph_id.as_raw_id();
114 let database_id = (id >> 32) as u32;
115 let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
116 let creating_job_id = if raw_creating_job_id == u32::MAX {
117 None
118 } else {
119 Some(JobId::new(raw_creating_job_id))
120 };
121 (database_id.into(), creating_job_id)
122}
123
124pub(super) fn build_locality_fragment_state_table_mapping(
125 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
126) -> HashMap<FragmentId, Vec<TableId>> {
127 let mut mapping = HashMap::new();
128
129 for (fragment_id, fragment_info) in fragment_infos {
130 let mut state_table_ids = Vec::new();
131 visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
132 if let Some(NodeBody::LocalityProvider(locality_provider)) =
133 stream_node.node_body.as_ref()
134 {
135 let state_table_id = locality_provider
136 .state_table
137 .as_ref()
138 .expect("must have state table")
139 .id;
140 state_table_ids.push(state_table_id);
141 false
142 } else {
143 true
144 }
145 });
146 if !state_table_ids.is_empty() {
147 mapping.insert(*fragment_id, state_table_ids);
148 }
149 }
150
151 mapping
152}
153
154pub(super) fn database_partial_graphs<'a>(
155 database_id: DatabaseId,
156 creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
157) -> impl Iterator<Item = PartialGraphId> + 'a {
158 creating_jobs
159 .map(Some)
160 .chain([None])
161 .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
162}
163
/// An established control stream to a single worker.
struct ControlStreamNode {
    /// Id of the worker this stream is connected to.
    worker_id: WorkerId,
    /// Host address of the worker, copied from its `WorkerNode` when the stream was created.
    host: HostAddress,
    /// Handle used to send requests on, and poll responses from, the stream.
    handle: StreamingControlHandle,
}
169
/// Connection state of a worker tracked by `ControlStreamManager`.
enum WorkerNodeState {
    /// The control stream is established.
    Connected {
        control_stream: ControlStreamNode,
        /// Set by `remove_worker` when the worker is removed while its stream is still connected.
        /// On the next stream error such a worker is dropped instead of being reconnected.
        removed: bool,
    },
    /// A boxed future that keeps retrying the connection and resolves to the new handle once a
    /// control stream is successfully created.
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}
177
/// Owns the control streams from the meta node to the tracked workers.
pub(super) struct ControlStreamManager {
    /// Per-worker node metadata together with its current connection state.
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}
182
183impl ControlStreamManager {
184 pub(super) fn new(env: MetaSrvEnv) -> Self {
185 Self {
186 workers: Default::default(),
187 env,
188 }
189 }
190
191 pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
192 self.workers[&worker_id].0.host.clone().unwrap()
193 }
194
195 pub(super) async fn add_worker(
196 &mut self,
197 node: WorkerNode,
198 partial_graphs: impl Iterator<Item = PartialGraphId>,
199 term_id: &String,
200 context: &impl GlobalBarrierWorkerContext,
201 ) {
202 let node_id = node.id;
203 if let Entry::Occupied(entry) = self.workers.entry(node_id) {
204 let (existing_node, worker_state) = entry.get();
205 assert_eq!(existing_node.host, node.host);
206 warn!(id = %node.id, host = ?node.host, "node already exists");
207 match worker_state {
208 WorkerNodeState::Connected { .. } => {
209 warn!(id = %node.id, host = ?node.host, "new node already connected");
210 return;
211 }
212 WorkerNodeState::Reconnecting(_) => {
213 warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
214 entry.remove();
215 }
216 }
217 }
218 let node_host = node.host.clone().unwrap();
219 let mut backoff = ExponentialBackoff::from_millis(100)
220 .max_delay(Duration::from_secs(3))
221 .factor(5);
222 const MAX_RETRY: usize = 5;
223 for i in 1..=MAX_RETRY {
224 match context
225 .new_control_stream(
226 &node,
227 &PbInitRequest {
228 term_id: term_id.clone(),
229 },
230 )
231 .await
232 {
233 Ok(mut handle) => {
234 WorkerNodeConnected {
235 handle: &mut handle,
236 node: &node,
237 }
238 .initialize(partial_graphs);
239 info!(?node_host, "add control stream worker");
240 assert!(
241 self.workers
242 .insert(
243 node_id,
244 (
245 node,
246 WorkerNodeState::Connected {
247 control_stream: ControlStreamNode {
248 worker_id: node_id as _,
249 host: node_host,
250 handle,
251 },
252 removed: false
253 }
254 )
255 )
256 .is_none()
257 );
258 return;
259 }
260 Err(e) => {
261 let delay = backoff.next().unwrap();
264 error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
265 sleep(delay).await;
266 }
267 }
268 }
269 error!(?node_host, "fail to create worker node after retry");
270 }
271
272 pub(super) fn remove_worker(&mut self, node: WorkerNode) {
273 if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
274 let (_, worker_state) = entry.get_mut();
275 match worker_state {
276 WorkerNodeState::Connected { removed, .. } => {
277 info!(worker_id = %node.id, "mark connected worker as removed");
278 *removed = true;
279 }
280 WorkerNodeState::Reconnecting(_) => {
281 info!(worker_id = %node.id, "remove worker");
282 entry.remove();
283 }
284 }
285 }
286 }
287
288 fn retry_connect(
289 node: WorkerNode,
290 term_id: String,
291 context: Arc<impl GlobalBarrierWorkerContext>,
292 ) -> BoxFuture<'static, StreamingControlHandle> {
293 async move {
294 let mut attempt = 0;
295 let backoff = ExponentialBackoff::from_millis(100)
296 .max_delay(Duration::from_mins(1))
297 .factor(5);
298 let init_request = PbInitRequest { term_id };
299 for delay in backoff {
300 attempt += 1;
301 sleep(delay).await;
302 match context.new_control_stream(&node, &init_request).await {
303 Ok(handle) => {
304 return handle;
305 }
306 Err(e) => {
307 warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
308 }
309 }
310 }
311 unreachable!("end of retry backoff")
312 }.boxed()
313 }
314
315 pub(super) async fn recover(
316 env: MetaSrvEnv,
317 nodes: &HashMap<WorkerId, WorkerNode>,
318 term_id: &str,
319 context: Arc<impl GlobalBarrierWorkerContext>,
320 ) -> Self {
321 let reset_start_time = Instant::now();
322 let init_request = PbInitRequest {
323 term_id: term_id.to_owned(),
324 };
325 let init_request = &init_request;
326 let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
327 let result = context.new_control_stream(node, init_request).await;
328 (*worker_id, node.clone(), result)
329 }))
330 .await;
331 let mut unconnected_workers = HashSet::new();
332 let mut workers = HashMap::new();
333 for (worker_id, node, result) in nodes {
334 match result {
335 Ok(handle) => {
336 let control_stream = ControlStreamNode {
337 worker_id: node.id,
338 host: node.host.clone().unwrap(),
339 handle,
340 };
341 assert!(
342 workers
343 .insert(
344 worker_id,
345 (
346 node,
347 WorkerNodeState::Connected {
348 control_stream,
349 removed: false
350 }
351 )
352 )
353 .is_none()
354 );
355 }
356 Err(e) => {
357 unconnected_workers.insert(worker_id);
358 warn!(
359 e = %e.as_report(),
360 %worker_id,
361 ?node,
362 "failed to connect to node"
363 );
364 assert!(
365 workers
366 .insert(
367 worker_id,
368 (
369 node.clone(),
370 WorkerNodeState::Reconnecting(Self::retry_connect(
371 node,
372 term_id.to_owned(),
373 context.clone()
374 ))
375 )
376 )
377 .is_none()
378 );
379 }
380 }
381 }
382
383 info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");
384
385 Self { workers, env }
386 }
387
388 pub(super) fn clear(&mut self) {
390 *self = Self::new(self.env.clone());
391 }
392}
393
/// Borrowed view of a worker whose control stream has just been established, used to send its
/// initial `CreatePartialGraph` requests through `initialize`.
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}
398
399impl<'a> WorkerNodeConnected<'a> {
400 pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
401 for partial_graph_id in partial_graphs {
402 if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
403 request: Some(
404 streaming_control_stream_request::Request::CreatePartialGraph(
405 PbCreatePartialGraphRequest { partial_graph_id },
406 ),
407 ),
408 }) {
409 warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
410 }
411 }
412 }
413}
414
/// An event produced while polling the workers of a `ControlStreamManager`.
pub(super) enum WorkerNodeEvent<'a> {
    /// A response, or an error, read from a worker's control stream.
    Response(MetaResult<streaming_control_stream_response::Response>),
    /// A pending reconnect to a worker has completed and its stream is now connected.
    Connected(WorkerNodeConnected<'a>),
}
419
impl ControlStreamManager {
    /// Polls every tracked worker once and returns the first event that is ready.
    ///
    /// `this_opt` holds the manager for the lifetime of the enclosing `poll_fn` future. It is
    /// only `take()`n when a `Connected` event is returned, because that event mutably borrows
    /// a worker's handle for `'a`. Polling again after that panics with
    /// "Future polled after completion".
    ///
    /// Workers are visited in (unspecified) `HashMap` iteration order:
    /// - `Reconnecting`: skipped when `poll_reconnect` is false. Otherwise the reconnect future
    ///   is polled; once it resolves, the worker becomes `Connected` and a
    ///   `WorkerNodeEvent::Connected` is returned.
    /// - `Connected`: the response stream is polled. A regular response is returned as
    ///   `Ok` in a `WorkerNodeEvent::Response`. End of stream, a stream error, an empty
    ///   response, an unexpected `Init` response, or a `Shutdown` response is returned as
    ///   `Err`. In that case the worker is dropped if it was marked removed or is shutting
    ///   down; otherwise a background `retry_connect` is started for it.
    ///
    /// Returns `Poll::Pending` when no worker is tracked or nothing is ready. The empty-map case
    /// returns before any stream is polled, so no waker is registered for it.
    ///
    /// # Panics
    ///
    /// Panics if a `CompleteBarrier` response carries a `worker_id` different from the worker
    /// whose stream produced it.
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                // The loop only borrows `this` for the duration of this poll, but
                                // the returned event must borrow the handle for `'a`. So take the
                                // `&'a mut Self` out of `this_opt` and look the entry up again
                                // through it.
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            // Normalize into `Result<Response, (is_shutdown, error)>`. Only an
                            // explicit `Shutdown` response sets `is_shutdown = true`.
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                // Sanity check: a barrier completion must be
                                                // reported by the worker that owns this stream.
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    // Removing from the map while iterating it is fine here
                                    // because this branch returns right below without touching
                                    // the iterator again.
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    /// Waits for the next event from any worker: either a stream response or a completed
    /// reconnect (reconnect futures are polled).
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    /// Waits for the next response from a connected worker. Reconnect futures are not polled
    /// here, so a `Connected` event cannot be produced.
    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }
}
566
/// Tracks which partial graphs of one database have not yet been marked initialized, and holds
/// the database's checkpoint control until they all have been.
pub(super) struct DatabaseInitialBarrierCollector {
    pub(super) database_id: DatabaseId,
    /// Partial graphs not yet reported through `partial_graph_initialized`.
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    /// Handed back by `finish` once `initializing_partial_graphs` is empty.
    pub(super) database: DatabaseCheckpointControl,
}
572
573impl Debug for DatabaseInitialBarrierCollector {
574 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
575 f.debug_struct("DatabaseInitialBarrierCollector")
576 .field("database_id", &self.database_id)
577 .field("initializing_graphs", &self.initializing_partial_graphs)
578 .finish()
579 }
580}
581
582impl DatabaseInitialBarrierCollector {
583 pub(super) fn is_collected(&self) -> bool {
584 self.initializing_partial_graphs.is_empty()
585 }
586
587 pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
588 assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
589 }
590
591 pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
592 database_partial_graphs(
593 self.database_id,
594 self.database
595 .independent_checkpoint_job_controls
596 .keys()
597 .copied(),
598 )
599 }
600
601 pub(super) fn finish(self) -> DatabaseCheckpointControl {
602 assert!(self.is_collected());
603 self.database
604 }
605
606 pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
607 self.database.is_valid_after_worker_err(worker_id)
608 }
609}
610
611impl PartialGraphRecoverer<'_> {
612 #[expect(clippy::too_many_arguments)]
614 pub(super) fn inject_database_initial_barrier(
615 &mut self,
616 database_id: DatabaseId,
617 jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
618 job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
619 state_table_committed_epochs: &mut HashMap<TableId, u64>,
620 state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
621 fragment_relations: &FragmentDownstreamRelation,
622 stream_actors: &HashMap<ActorId, StreamActor>,
623 source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
624 background_jobs: &mut HashSet<JobId>,
625 mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
626 is_paused: bool,
627 hummock_version_stats: &HummockVersionStats,
628 cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
629 batch_refresh: HashMap<JobId, BatchRefreshRenderResult>,
630 ) -> MetaResult<DatabaseCheckpointControl> {
631 fn collect_source_splits(
632 fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
633 source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
634 ) -> HashMap<ActorId, Vec<SplitImpl>> {
635 fragment_infos
636 .flat_map(|info| info.actors.keys())
637 .filter_map(|actor_id| {
638 let actor_id = *actor_id as ActorId;
639 source_splits
640 .remove(&actor_id)
641 .map(|splits| (actor_id, splits))
642 })
643 .collect()
644 }
645 fn build_mutation(
646 splits: &HashMap<ActorId, Vec<SplitImpl>>,
647 cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
648 backfill_orders: &ExtendedFragmentBackfillOrder,
649 is_paused: bool,
650 ) -> Mutation {
651 let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
652 .into_iter()
653 .collect();
654 Mutation::Add(AddMutation {
655 actor_dispatchers: Default::default(),
657 added_actors: Default::default(),
658 actor_splits: build_actor_connector_splits(splits),
659 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
660 splits: cdc_table_snapshot_split_assignment,
661 }),
662 pause: is_paused,
663 subscriptions_to_add: Default::default(),
664 backfill_nodes_to_pause,
665 new_upstream_sinks: Default::default(),
666 })
667 }
668
669 fn resolve_jobs_committed_epoch(
670 state_table_committed_epochs: &mut HashMap<TableId, u64>,
671 table_ids: impl Iterator<Item = TableId>,
672 ) -> u64 {
673 let mut epochs = table_ids.map(|table_id| {
674 (
675 table_id,
676 state_table_committed_epochs
677 .remove(&table_id)
678 .expect("should exist"),
679 )
680 });
681 let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
682 for (table_id, epoch) in epochs {
683 assert_eq!(
684 prev_epoch, epoch,
685 "{} has different committed epoch to {}",
686 first_table_id, table_id
687 );
688 }
689 prev_epoch
690 }
691 fn job_backfill_orders(
692 job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
693 job_id: JobId,
694 ) -> UserDefinedFragmentBackfillOrder {
695 UserDefinedFragmentBackfillOrder::new(
696 job_extra_info
697 .get(&job_id)
698 .and_then(|info| info.backfill_orders.clone())
699 .map_or_else(HashMap::new, |orders| orders.0),
700 )
701 }
702
703 let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
704 .keys()
705 .filter_map(|job_id| {
706 mv_depended_subscriptions
707 .remove(&job_id.as_mv_table_id())
708 .map(|subscriptions| {
709 (
710 job_id.as_mv_table_id(),
711 subscriptions
712 .into_iter()
713 .map(|(subscription_id, retention)| {
714 (
715 subscription_id.as_subscriber_id(),
716 SubscriberType::Subscription(retention),
717 )
718 })
719 .collect(),
720 )
721 })
722 })
723 .collect();
724
725 let mut database_jobs = HashMap::new();
726 let mut snapshot_backfill_jobs = HashMap::new();
727
728 for (job_id, job_fragments) in jobs {
729 if background_jobs.remove(&job_id) {
730 if job_fragments.values().any(|fragment| {
731 fragment
732 .fragment_type_mask
733 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
734 }) {
735 debug!(%job_id, "recovered snapshot backfill job");
736 snapshot_backfill_jobs.insert(job_id, job_fragments);
737 } else {
738 database_jobs.insert(job_id, (job_fragments, true));
739 }
740 } else {
741 database_jobs.insert(job_id, (job_fragments, false));
742 }
743 }
744
745 let database_job_log_epochs: HashMap<_, _> = database_jobs
746 .keys()
747 .filter_map(|job_id| {
748 state_table_log_epochs
749 .remove(&job_id.as_mv_table_id())
750 .map(|epochs| (job_id.as_mv_table_id(), epochs))
751 })
752 .collect();
753
754 let prev_epoch = resolve_jobs_committed_epoch(
755 state_table_committed_epochs,
756 InflightFragmentInfo::existing_table_ids(
757 database_jobs.values().flat_map(|(job, _)| job.values()),
758 ),
759 );
760 let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
761 let curr_epoch = prev_epoch.next();
763 let barrier_info = BarrierInfo {
764 prev_epoch,
765 curr_epoch,
766 kind: BarrierKind::Initial,
767 };
768
769 let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
770 for (job_id, fragment_infos) in snapshot_backfill_jobs {
771 let committed_epoch = resolve_jobs_committed_epoch(
772 state_table_committed_epochs,
773 InflightFragmentInfo::existing_table_ids(fragment_infos.values()),
774 );
775 if committed_epoch == barrier_info.prev_epoch() {
776 info!(
777 "recovered creating snapshot backfill job {} catch up with upstream already",
778 job_id
779 );
780 database_jobs
781 .try_insert(job_id, (fragment_infos, true))
782 .expect("non-duplicate");
783 continue;
784 }
785 let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
786 fragment_infos
787 .values()
788 .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
789 )?
790 .0
791 .ok_or_else(|| {
792 anyhow!(
793 "recovered snapshot backfill job {} has no snapshot backfill info",
794 job_id
795 )
796 })?;
797 let mut snapshot_epoch = None;
798 let upstream_table_ids: HashSet<_> = snapshot_backfill_info
799 .upstream_mv_table_id_to_backfill_epoch
800 .keys()
801 .cloned()
802 .collect();
803 for (upstream_table_id, epoch) in
804 snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
805 {
806 let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
807 let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
808 if *snapshot_epoch != epoch {
809 return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
810 }
811 }
812 let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
813 anyhow!(
814 "snapshot backfill job {} has not set snapshot epoch",
815 job_id
816 )
817 })?;
818 for upstream_table_id in &upstream_table_ids {
819 subscribers
820 .entry(*upstream_table_id)
821 .or_default()
822 .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
823 .expect("non-duplicate");
824 }
825 ongoing_snapshot_backfill_jobs
826 .try_insert(
827 job_id,
828 (
829 fragment_infos,
830 upstream_table_ids,
831 committed_epoch,
832 snapshot_epoch,
833 ),
834 )
835 .expect("non-duplicated");
836 }
837
838 let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
839 HashMap::new();
840
841 let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
842 database_jobs
843 .into_iter()
844 .map(|(job_id, (fragment_infos, is_background_creating))| {
845 let status = if is_background_creating {
846 let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
847 let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
848 backfill_ordering,
849 fragment_relations,
850 || fragment_infos.iter().map(|(fragment_id, fragment)| {
851 (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
852 }));
853 let locality_fragment_state_table_mapping =
854 build_locality_fragment_state_table_mapping(&fragment_infos);
855 let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
856 &backfill_ordering,
857 &fragment_infos,
858 locality_fragment_state_table_mapping,
859 );
860 CreateStreamingJobStatus::Creating {
861 tracker: CreateMviewProgressTracker::recover(
862 job_id,
863 &fragment_infos,
864 backfill_order_state,
865 hummock_version_stats,
866 ),
867 }
868 } else {
869 CreateStreamingJobStatus::Created
870 };
871 let cdc_table_backfill_tracker =
872 if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
873 let cdc_fragment = fragment_infos
874 .values()
875 .find(|fragment| {
876 is_parallelized_backfill_enabled_cdc_scan_fragment(
877 fragment.fragment_type_mask,
878 &fragment.nodes,
879 )
880 .is_some()
881 })
882 .expect("should have parallel cdc fragment");
883 let cdc_actors = cdc_fragment.actors.keys().copied().collect();
884 let mut tracker =
885 CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
886 cdc_table_snapshot_split_assignment
887 .extend(tracker.reassign_splits(cdc_actors)?);
888 Some(tracker)
889 } else {
890 None
891 };
892 Ok((
893 job_id,
894 InflightStreamingJobInfo {
895 job_id,
896 fragment_infos,
897 subscribers: subscribers
898 .remove(&job_id.as_mv_table_id())
899 .unwrap_or_default(),
900 status,
901 cdc_table_backfill_tracker,
902 },
903 ))
904 })
905 .try_collect::<_, _, MetaError>()
906 }?;
907
908 let control_stream_manager = self.control_stream_manager();
909 let mut builder = FragmentEdgeBuilder::new(
910 database_jobs
911 .values()
912 .flat_map(|job| {
913 let partial_graph_id = to_partial_graph_id(database_id, None);
914 job.fragment_infos().map(move |info| {
915 (
916 info.fragment_id,
917 EdgeBuilderFragmentInfo::from_inflight(
918 info,
919 partial_graph_id,
920 control_stream_manager,
921 ),
922 )
923 })
924 })
925 .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
926 |(job_id, (fragments, ..))| {
927 let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
928 fragments.values().map(move |fragment| {
929 (
930 fragment.fragment_id,
931 EdgeBuilderFragmentInfo::from_inflight(
932 fragment,
933 partial_graph_id,
934 control_stream_manager,
935 ),
936 )
937 })
938 },
939 )),
940 );
941 builder.add_relations(fragment_relations);
942 let mut edges = builder.build();
943
944 {
945 let new_actors =
946 edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
947 job.fragment_infos.values().map(move |fragment_infos| {
948 (
949 fragment_infos.fragment_id,
950 &fragment_infos.nodes,
951 fragment_infos.actors.iter().map(move |(actor_id, actor)| {
952 (
953 stream_actors.get(actor_id).expect("should exist"),
954 actor.worker_id,
955 )
956 }),
957 job.subscribers.keys().copied(),
958 )
959 })
960 }));
961
962 let nodes_actors =
963 InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
964 let database_job_source_splits =
965 collect_source_splits(database_jobs.values().flatten(), source_splits);
966 let database_backfill_orders =
967 UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
968 if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
969 job_backfill_orders(job_extra_info, job.job_id)
970 } else {
971 UserDefinedFragmentBackfillOrder::default()
972 }
973 }));
974 let database_backfill_orders =
975 StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
976 database_backfill_orders,
977 fragment_relations,
978 || {
979 database_jobs.values().flat_map(|job_fragments| {
980 job_fragments
981 .fragment_infos
982 .iter()
983 .map(|(fragment_id, fragment)| {
984 (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
985 })
986 })
987 },
988 );
989 let mutation = build_mutation(
990 &database_job_source_splits,
991 cdc_table_snapshot_split_assignment,
992 &database_backfill_orders,
993 is_paused,
994 );
995
996 let partial_graph_id = to_partial_graph_id(database_id, None);
997 self.recover_graph(
998 partial_graph_id,
999 mutation,
1000 &barrier_info,
1001 &nodes_actors,
1002 InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
1003 new_actors,
1004 DatabaseCheckpointControlMetrics::new(database_id),
1005 )?;
1006 debug!(
1007 %database_id,
1008 "inject initial barrier"
1009 );
1010 };
1011
1012 let mut independent_checkpoint_job_controls: HashMap<
1013 JobId,
1014 IndependentCheckpointJobControl,
1015 > = HashMap::new();
1016 for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
1017 ongoing_snapshot_backfill_jobs
1018 {
1019 let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
1020 (
1021 fragment_infos.fragment_id,
1022 &fragment_infos.nodes,
1023 fragment_infos.actors.iter().map(move |(actor_id, actor)| {
1024 (
1025 stream_actors.get(actor_id).expect("should exist"),
1026 actor.worker_id,
1027 )
1028 }),
1029 vec![], )
1031 }));
1032
1033 let database_job_source_splits =
1034 collect_source_splits(database_jobs.values().flatten(), source_splits);
1035 assert!(
1036 !cdc_table_snapshot_splits.contains_key(&job_id),
1037 "snapshot backfill job {job_id} should not have cdc backfill"
1038 );
1039 if is_paused {
1040 bail!("should not pause when having snapshot backfill job {job_id}");
1041 }
1042 let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1043 let job_backfill_orders =
1044 StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1045 job_backfill_orders,
1046 fragment_relations,
1047 || {
1048 info.iter().map(|(fragment_id, fragment)| {
1049 (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
1050 })
1051 },
1052 );
1053 let mutation = build_mutation(
1054 &database_job_source_splits,
1055 Default::default(), &job_backfill_orders,
1057 false,
1058 );
1059
1060 let job = CreatingStreamingJobControl::recover(
1061 database_id,
1062 job_id,
1063 upstream_table_ids,
1064 &database_job_log_epochs,
1065 snapshot_epoch,
1066 committed_epoch,
1067 &barrier_info,
1068 info,
1069 job_backfill_orders,
1070 fragment_relations,
1071 hummock_version_stats,
1072 node_actors,
1073 mutation.clone(),
1074 self,
1075 )?;
1076 independent_checkpoint_job_controls.insert(
1077 job_id,
1078 IndependentCheckpointJobControl::CreatingStreamingJob(job),
1079 );
1080 }
1081
1082 for (job_id, render_result) in batch_refresh {
1085 background_jobs.remove(&job_id);
1086 debug!(%job_id, "recovered batch refresh job");
1087
1088 let committed_epoch = resolve_jobs_committed_epoch(
1090 state_table_committed_epochs,
1091 InflightFragmentInfo::existing_table_ids(render_result.fragment_infos.values()),
1092 );
1093
1094 let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
1095 render_result
1096 .fragment_infos
1097 .values()
1098 .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
1099 )?
1100 .0
1101 .ok_or_else(|| anyhow!("batch refresh job {} has no snapshot backfill info", job_id))?;
1102
1103 let upstream_table_ids: HashSet<TableId> = snapshot_backfill_info
1104 .upstream_mv_table_id_to_backfill_epoch
1105 .keys()
1106 .copied()
1107 .collect();
1108 let snapshot_epoch = snapshot_backfill_info
1109 .upstream_mv_table_id_to_backfill_epoch
1110 .values()
1111 .find_map(|e| *e)
1112 .unwrap_or(committed_epoch);
1113
1114 for upstream_table_id in &upstream_table_ids {
1116 subscribers
1117 .entry(*upstream_table_id)
1118 .or_default()
1119 .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
1120 .expect("non-duplicate");
1121 }
1122
1123 let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1124 let job_backfill_orders =
1125 StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1126 job_backfill_orders,
1127 fragment_relations,
1128 || {
1129 render_result
1130 .fragment_infos
1131 .iter()
1132 .map(|(fid, f)| (*fid, f.fragment_type_mask, &f.nodes))
1133 },
1134 );
1135 let mutation = build_mutation(
1136 &Default::default(), Default::default(),
1138 &job_backfill_orders,
1139 false,
1140 );
1141
1142 let job = BatchRefreshJobCheckpointControl::recover(
1143 database_id,
1144 job_id,
1145 upstream_table_ids,
1146 snapshot_epoch,
1147 committed_epoch,
1148 job_backfill_orders,
1149 hummock_version_stats,
1150 mutation,
1151 render_result,
1152 self,
1153 )?;
1154 independent_checkpoint_job_controls
1155 .insert(job_id, IndependentCheckpointJobControl::BatchRefresh(job));
1156 }
1157
1158 self.control_stream_manager()
1159 .env
1160 .shared_actor_infos()
1161 .recover_database(
1162 database_id,
1163 database_jobs
1164 .values()
1165 .flat_map(|info| {
1166 info.fragment_infos()
1167 .map(move |fragment| (fragment, info.job_id))
1168 })
1169 .chain(
1170 independent_checkpoint_job_controls
1171 .iter()
1172 .flat_map(|(job_id, job)| {
1173 let job_id = *job_id;
1174 job.fragment_infos()
1175 .into_iter()
1176 .flat_map(move |infos| infos.values().map(move |f| (f, job_id)))
1177 }),
1178 ),
1179 );
1180
1181 let committed_epoch = barrier_info.prev_epoch();
1182 let new_epoch = barrier_info.curr_epoch;
1183 let database_info = InflightDatabaseInfo::recover(
1184 database_id,
1185 database_jobs.into_values(),
1186 self.control_stream_manager()
1187 .env
1188 .shared_actor_infos()
1189 .clone(),
1190 );
1191 let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
1192 Ok(DatabaseCheckpointControl::recovery(
1193 database_id,
1194 database_state,
1195 committed_epoch,
1196 database_info,
1197 independent_checkpoint_job_controls,
1198 ))
1199 }
1200}
1201
1202impl ControlStreamManager {
1203 fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
1204 self.workers
1205 .iter()
1206 .filter_map(|(worker_id, (_, worker_state))| match worker_state {
1207 WorkerNodeState::Connected { control_stream, .. } => {
1208 Some((*worker_id, control_stream))
1209 }
1210 WorkerNodeState::Reconnecting(_) => None,
1211 })
1212 }
1213
1214 pub(super) fn inject_barrier(
1215 &mut self,
1216 partial_graph_id: PartialGraphId,
1217 mutation: Option<Mutation>,
1218 barrier_info: &BarrierInfo,
1219 node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
1220 table_ids_to_sync: impl Iterator<Item = TableId>,
1221 nodes_to_sync_table: impl Iterator<Item = WorkerId>,
1222 mut new_actors: Option<StreamJobActorsToCreate>,
1223 ) -> MetaResult<NodeToCollect> {
1224 fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
1225 "inject_barrier_err"
1226 ));
1227
1228 let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();
1229
1230 nodes_to_sync_table.iter().for_each(|worker_id| {
1231 assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
1232 });
1233
1234 let mut node_need_collect = NodeToCollect::new();
1235 let table_ids_to_sync = table_ids_to_sync.collect_vec();
1236
1237 node_actors.iter()
1238 .try_for_each(|(worker_id, actor_ids_to_collect)| {
1239 assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
1240 let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
1241 table_ids_to_sync.clone()
1242 } else {
1243 vec![]
1244 };
1245
1246 let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
1247 &&
1248 let WorkerNodeState::Connected { control_stream, .. } = worker_state
1249 {
1250 control_stream
1251 } else {
1252 return Err(anyhow!("unconnected worker node {}", worker_id).into());
1253 };
1254
1255 {
1256 let mutation = mutation.clone();
1257 let barrier = Barrier {
1258 epoch: Some(risingwave_pb::data::Epoch {
1259 curr: barrier_info.curr_epoch(),
1260 prev: barrier_info.prev_epoch(),
1261 }),
1262 mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1263 tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1264 .to_protobuf(),
1265 kind: barrier_info.kind.to_protobuf() as i32,
1266 };
1267
1268 node.handle
1269 .request_sender
1270 .send(StreamingControlStreamRequest {
1271 request: Some(
1272 streaming_control_stream_request::Request::InjectBarrier(
1273 InjectBarrierRequest {
1274 request_id: Uuid::new_v4().to_string(),
1275 barrier: Some(barrier),
1276 actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
1277 table_ids_to_sync,
1278 partial_graph_id,
1279 actors_to_build: new_actors
1280 .as_mut()
1281 .map(|new_actors| new_actors.remove(worker_id))
1282 .into_iter()
1283 .flatten()
1284 .flatten()
1285 .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
1286 FragmentBuildActorInfo {
1287 fragment_id,
1288 node: Some(node),
1289 actors: actors
1290 .into_iter()
1291 .map(|(actor, upstreams, dispatchers)| {
1292 BuildActorInfo {
1293 actor_id: actor.actor_id,
1294 fragment_upstreams: upstreams
1295 .into_iter()
1296 .map(|(fragment_id, upstreams)| {
1297 (
1298 fragment_id,
1299 UpstreamActors {
1300 actors: upstreams
1301 .into_values()
1302 .collect(),
1303 },
1304 )
1305 })
1306 .collect(),
1307 dispatchers,
1308 vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
1309 mview_definition: actor.mview_definition,
1310 expr_context: actor.expr_context,
1311 config_override: actor.config_override.to_string(),
1312 initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
1313 }
1314 })
1315 .collect(),
1316 }
1317 })
1318 .collect(),
1319 },
1320 ),
1321 ),
1322 })
1323 .map_err(|_| {
1324 MetaError::from(anyhow!(
1325 "failed to send request to {} {:?}",
1326 node.worker_id,
1327 node.host
1328 ))
1329 })?;
1330
1331 node_need_collect.insert(*worker_id);
1332 Result::<_, MetaError>::Ok(())
1333 }
1334 })
1335 .inspect_err(|e| {
1336 use risingwave_pb::meta::event_log;
1338 let event = event_log::EventInjectBarrierFail {
1339 prev_epoch: barrier_info.prev_epoch(),
1340 cur_epoch: barrier_info.curr_epoch(),
1341 error: e.to_report_string(),
1342 };
1343 self.env
1344 .event_log_manager_ref()
1345 .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
1346 })?;
1347 Ok(node_need_collect)
1348 }
1349
1350 pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
1351 self.connected_workers().for_each(|(_, node)| {
1352 if node
1353 .handle
1354 .request_sender
1355 .send(StreamingControlStreamRequest {
1356 request: Some(
1357 streaming_control_stream_request::Request::CreatePartialGraph(
1358 CreatePartialGraphRequest {
1359 partial_graph_id,
1360 },
1361 ),
1362 ),
1363 }).is_err() {
1364 let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
1365 warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
1366 }
1367 });
1368 }
1369
1370 pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
1371 self.connected_workers().for_each(|(_, node)| {
1372 if node.handle
1373 .request_sender
1374 .send(StreamingControlStreamRequest {
1375 request: Some(
1376 streaming_control_stream_request::Request::RemovePartialGraph(
1377 RemovePartialGraphRequest {
1378 partial_graph_ids: partial_graph_ids.clone(),
1379 },
1380 ),
1381 ),
1382 })
1383 .is_err()
1384 {
1385 warn!(worker_id = %node.worker_id,node = ?node.host,"failed to send remove partial graph request");
1386 }
1387 })
1388 }
1389
1390 pub(super) fn reset_partial_graphs(
1391 &mut self,
1392 partial_graph_ids: Vec<PartialGraphId>,
1393 ) -> HashSet<WorkerId> {
1394 self.connected_workers()
1395 .filter_map(|(worker_id, node)| {
1396 if node
1397 .handle
1398 .request_sender
1399 .send(StreamingControlStreamRequest {
1400 request: Some(
1401 streaming_control_stream_request::Request::ResetPartialGraphs(
1402 ResetPartialGraphsRequest {
1403 partial_graph_ids: partial_graph_ids.clone(),
1404 },
1405 ),
1406 ),
1407 })
1408 .is_err()
1409 {
1410 warn!(%worker_id, node = ?node.host,"failed to send reset database request");
1411 None
1412 } else {
1413 Some(worker_id)
1414 }
1415 })
1416 .collect()
1417 }
1418}
1419
1420impl GlobalBarrierWorkerContextImpl {
1421 pub(super) async fn new_control_stream_impl(
1422 &self,
1423 node: &WorkerNode,
1424 init_request: &PbInitRequest,
1425 ) -> MetaResult<StreamingControlHandle> {
1426 let handle = self
1427 .env
1428 .stream_client_pool()
1429 .get(node)
1430 .await?
1431 .start_streaming_control(init_request.clone())
1432 .await?;
1433 Ok(handle)
1434 }
1435}
1436
1437pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
1438 message: &str,
1439 errors: impl IntoIterator<Item = (WorkerId, E)>,
1440) -> MetaError {
1441 use std::fmt::Write;
1442
1443 use risingwave_common::error::error_request_copy;
1444 use risingwave_common::error::tonic::extra::Score;
1445
1446 let errors = errors.into_iter().collect_vec();
1447
1448 if errors.is_empty() {
1449 return anyhow!(message.to_owned()).into();
1450 }
1451
1452 let single_error = |(worker_id, e)| {
1454 anyhow::Error::from(e)
1455 .context(format!("{message}, in worker node {worker_id}"))
1456 .into()
1457 };
1458
1459 if errors.len() == 1 {
1460 return single_error(errors.into_iter().next().unwrap());
1461 }
1462
1463 let max_score = errors
1465 .iter()
1466 .filter_map(|(_, e)| error_request_copy::<Score>(e))
1467 .max();
1468
1469 if let Some(max_score) = max_score {
1470 let mut errors = errors;
1471 let max_scored = errors
1472 .extract_if(.., |(_, e)| {
1473 error_request_copy::<Score>(e) == Some(max_score)
1474 })
1475 .next()
1476 .unwrap();
1477
1478 return single_error(max_scored);
1479 }
1480
1481 let concat: String = errors
1483 .into_iter()
1484 .fold(format!("{message}: "), |mut s, (w, e)| {
1485 write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
1486 s
1487 });
1488 anyhow!(concat).into()
1489}
1490
#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    /// Round-trips `(database_id, job_id)` through `to_partial_graph_id` and
    /// `from_partial_graph_id`. Checks that the original pair comes back both
    /// without and with a creating job id.
    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        for creating_job_id in [None, Some(job_id)] {
            let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
            assert_eq!(
                (database_id, creating_job_id),
                from_partial_graph_id(partial_graph_id)
            );
        }
    }
}