use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::PartialGraphId;
use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetPartialGraphsRequest,
};
use risingwave_pb::stream_service::{
    InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
    streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::BackfillOrderState;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
    DatabaseCheckpointControlMetrics,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::partial_graph::PartialGraphRecoverer;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::NodeToCollect;
use crate::controller::fragment::InflightFragmentInfo;
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
    SubscriptionId,
};
use crate::stream::cdc::{
    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{
    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
    build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};

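/// Packs a `DatabaseId` and an optional creating-job id into a single `PartialGraphId`.
/// The database id occupies the high 32 bits and the raw job id the low 32 bits, with
/// `u32::MAX` in the low bits reserved as the sentinel for "no creating job", i.e. the
/// database-level partial graph.
///
/// A minimal sketch of the encoding (hypothetical values; see the round-trip test at
/// the bottom of this file):
///
/// ```ignore
/// let pg = to_partial_graph_id(1.into(), None);
/// assert_eq!(pg.as_raw_id(), 0x0000_0001_FFFF_FFFFu64);
/// ```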
pub(super) fn to_partial_graph_id(
    database_id: DatabaseId,
    creating_job_id: Option<JobId>,
) -> PartialGraphId {
    let raw_job_id = creating_job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX);
    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
}

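/// Inverse of [`to_partial_graph_id`]: splits a `PartialGraphId` back into the
/// `DatabaseId` in its high 32 bits and the optional creating-job id in its low
/// 32 bits (`u32::MAX` decodes to `None`).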
pub(super) fn from_partial_graph_id(
    partial_graph_id: PartialGraphId,
) -> (DatabaseId, Option<JobId>) {
    let id = partial_graph_id.as_raw_id();
    let database_id = (id >> 32) as u32;
    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
    let creating_job_id = if raw_creating_job_id == u32::MAX {
        None
    } else {
        Some(JobId::new(raw_creating_job_id))
    };
    (database_id.into(), creating_job_id)
}

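/// Scans each fragment's stream node tree and collects the state table ids of its
/// `LocalityProvider` nodes. Fragments without any `LocalityProvider` are omitted
/// from the returned mapping.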
pub(super) fn build_locality_fragment_state_table_mapping(
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> HashMap<FragmentId, Vec<TableId>> {
    let mut mapping = HashMap::new();

    for (fragment_id, fragment_info) in fragment_infos {
        let mut state_table_ids = Vec::new();
        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
            if let Some(NodeBody::LocalityProvider(locality_provider)) =
                stream_node.node_body.as_ref()
            {
                let state_table_id = locality_provider
                    .state_table
                    .as_ref()
                    .expect("must have state table")
                    .id;
                state_table_ids.push(state_table_id);
                false
            } else {
                true
            }
        });
        if !state_table_ids.is_empty() {
            mapping.insert(*fragment_id, state_table_ids);
        }
    }

    mapping
}

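/// Enumerates the partial graphs of a database: one per creating job, followed by the
/// database-level partial graph (the `None` entry).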
pub(super) fn database_partial_graphs<'a>(
    database_id: DatabaseId,
    creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
) -> impl Iterator<Item = PartialGraphId> + 'a {
    creating_jobs
        .map(Some)
        .chain([None])
        .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
}

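/// A live control stream to one worker node.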
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

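/// Connection state of a worker: either connected with a live control stream, or in
/// the middle of a background reconnect loop.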
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

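/// Manages the control streams between the meta node and all streaming worker nodes.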
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

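    /// Establishes a control stream to a newly added worker, retrying a bounded number
    /// of times with exponential backoff, and sends it the partial graphs it must know
    /// about before registering it as connected.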
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        partial_graphs: impl Iterator<Item = PartialGraphId>,
        term_id: &String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(partial_graphs);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

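    /// Marks a connected worker as removed (its entry is dropped once the stream
    /// errors), or drops a reconnecting worker immediately.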
    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

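    /// Returns a future that retries connecting to the worker indefinitely with capped
    /// exponential backoff, resolving only once a control stream is established.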
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

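    /// Rebuilds the manager during recovery by connecting to all known workers in
    /// parallel; workers that cannot be reached immediately are left in the
    /// `Reconnecting` state with a background retry future.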
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

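/// Borrowed view of a worker that has just (re)connected, used to send it the initial
/// set of partial graph creation requests.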
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
        for partial_graph_id in partial_graphs {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(
                        PbCreatePartialGraphRequest { partial_graph_id },
                    ),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

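/// An event yielded by [`ControlStreamManager::next_event`]: either a response from a
/// connected worker, or a notification that a worker has (re)connected.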
pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
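    /// Polls all workers for the next event. Connected workers are polled for
    /// responses; when `poll_reconnect` is set, reconnecting workers are also polled so
    /// that a completed reconnect surfaces as a `Connected` event. On a stream error,
    /// the worker is either dropped (if removed or shutting down) or moved back into
    /// the `Reconnecting` state.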
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }
}

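/// Tracks the partial graphs of a recovering database that still have their initial
/// barrier outstanding; once all are initialized, the database checkpoint control is
/// handed over via [`DatabaseInitialBarrierCollector::finish`].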
pub(super) struct DatabaseInitialBarrierCollector {
    pub(super) database_id: DatabaseId,
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    pub(super) database: DatabaseCheckpointControl,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("initializing_graphs", &self.initializing_partial_graphs)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.initializing_partial_graphs.is_empty()
    }

    pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
    }

    pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
        database_partial_graphs(
            self.database_id,
            self.database
                .creating_streaming_job_controls
                .keys()
                .copied(),
        )
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        self.database
    }

    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        self.database.is_valid_after_worker_err(worker_id)
    }
}

impl PartialGraphRecoverer<'_> {
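    /// Rebuilds the checkpoint control state of one database during recovery: resolves
    /// committed epochs, reassigns source and CDC snapshot splits, reconstructs
    /// backfill progress, and injects the initial barrier into the database-level
    /// partial graph as well as each ongoing snapshot-backfill job's graph.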
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        fragment_relations: &FragmentDownstreamRelation,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashSet<JobId>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
    ) -> MetaResult<DatabaseCheckpointControl> {
        fn collect_source_splits(
            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        ) -> HashMap<ActorId, Vec<SplitImpl>> {
            fragment_infos
                .flat_map(|info| info.actors.keys())
                .filter_map(|actor_id| {
                    let actor_id = *actor_id as ActorId;
                    source_splits
                        .remove(&actor_id)
                        .map(|splits| (actor_id, splits))
                })
                .collect()
        }
        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            backfill_orders: &ExtendedFragmentBackfillOrder,
            is_paused: bool,
        ) -> Mutation {
            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
                .into_iter()
                .collect();
            Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause,
                new_upstream_sinks: Default::default(),
            })
        }

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }
        fn job_backfill_orders(
            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
            job_id: JobId,
        ) -> UserDefinedFragmentBackfillOrder {
            UserDefinedFragmentBackfillOrder::new(
                job_extra_info
                    .get(&job_id)
                    .and_then(|info| info.backfill_orders.clone())
                    .map_or_else(HashMap::new, |orders| orders.0),
            )
        }

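        // Take the subscriptions that depend on this database's MV tables out of the
        // global map, keyed by MV table id and then by subscriber id.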
        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_subscriber_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

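        // Split the recovered jobs: background jobs that contain a snapshot backfill
        // stream scan are recovered as creating jobs with their own partial graph,
        // while everything else joins the database-level graph (the boolean records
        // whether the job is still background-creating).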
        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, job_fragments);
                } else {
                    database_jobs.insert(job_id, (job_fragments, true));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, false));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

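        // For each snapshot backfill job, resolve its snapshot epoch and register it as
        // a subscriber of its upstream tables; jobs that have already caught up with
        // the upstream committed epoch are folded back into the database-level graph.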
        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, fragment_infos) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, true))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

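        // Reconstruct each job's creation status (backfill progress trackers) and
        // restore CDC table backfill trackers, reassigning CDC snapshot splits to the
        // actors of the parallelized CDC scan fragment.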
        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
            HashMap::new();

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, is_background_creating))| {
                    let status = if is_background_creating {
                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
                        let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                            backfill_ordering,
                            fragment_relations,
                            || fragment_infos.iter().map(|(fragment_id, fragment)| {
                                (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                            }));
                        let locality_fragment_state_table_mapping =
                            build_locality_fragment_state_table_mapping(&fragment_infos);
                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                            &backfill_ordering,
                            &fragment_infos,
                            locality_fragment_state_table_mapping,
                        );
                        CreateStreamingJobStatus::Creating {
                            tracker: CreateMviewProgressTracker::recover(
                                job_id,
                                &fragment_infos,
                                backfill_order_state,
                                hummock_version_stats,
                            ),
                        }
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    let cdc_table_backfill_tracker =
                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
                            let cdc_fragment = fragment_infos
                                .values()
                                .find(|fragment| {
                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
                                        fragment.fragment_type_mask,
                                        &fragment.nodes,
                                    )
                                    .is_some()
                                })
                                .expect("should have parallel cdc fragment");
                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
                            let mut tracker =
                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
                            cdc_table_snapshot_split_assignment
                                .extend(tracker.reassign_splits(cdc_actors)?);
                            Some(tracker)
                        } else {
                            None
                        };
                    Ok((
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                            cdc_table_backfill_tracker,
                        },
                    ))
                })
                .try_collect::<_, _, MetaError>()
        }?;
        let control_stream_manager = self.control_stream_manager();
        let mut builder = FragmentEdgeBuilder::new(
            database_jobs
                .values()
                .flat_map(|job| {
                    let partial_graph_id = to_partial_graph_id(database_id, None);
                    job.fragment_infos().map(move |info| {
                        (
                            info.fragment_id,
                            EdgeBuilderFragmentInfo::from_inflight(
                                info,
                                partial_graph_id,
                                control_stream_manager,
                            ),
                        )
                    })
                })
                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
                    |(job_id, (fragments, ..))| {
                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
                        fragments.values().map(move |fragment| {
                            (
                                fragment.fragment_id,
                                EdgeBuilderFragmentInfo::from_inflight(
                                    fragment,
                                    partial_graph_id,
                                    control_stream_manager,
                                ),
                            )
                        })
                    },
                )),
        );
        builder.add_relations(fragment_relations);
        let mut edges = builder.build();

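        // Inject the initial barrier for the database-level partial graph, carrying an
        // add-mutation with the source splits, CDC split assignment, and backfill
        // orders of all its jobs.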
        {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            let database_backfill_orders =
                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
                        job_backfill_orders(job_extra_info, job.job_id)
                    } else {
                        UserDefinedFragmentBackfillOrder::default()
                    }
                }));
            let database_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    database_backfill_orders,
                    fragment_relations,
                    || {
                        database_jobs.values().flat_map(|job_fragments| {
                            job_fragments
                                .fragment_infos
                                .iter()
                                .map(|(fragment_id, fragment)| {
                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                                })
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                cdc_table_snapshot_split_assignment,
                &database_backfill_orders,
                is_paused,
            );

            let partial_graph_id = to_partial_graph_id(database_id, None);
            self.recover_graph(
                partial_graph_id,
                mutation,
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                new_actors,
                DatabaseCheckpointControlMetrics::new(database_id),
            )?;
            debug!(
                %database_id,
                "inject initial barrier"
            );
        };

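        // Recover each ongoing snapshot-backfill job as its own creating-job control
        // with a dedicated partial graph.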
        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![],
                )
            }));

            let job_source_splits = collect_source_splits(info.values(), source_splits);
            assert!(
                !cdc_table_snapshot_splits.contains_key(&job_id),
                "snapshot backfill job {job_id} should not have cdc backfill"
            );
            if is_paused {
                bail!("should not pause when having snapshot backfill job {job_id}");
            }
            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
            let job_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    job_backfill_orders,
                    fragment_relations,
                    || {
                        info.iter().map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                    },
                );
            let mutation = build_mutation(
                &job_source_splits,
                Default::default(),
                &job_backfill_orders,
                false,
            );

            let job = CreatingStreamingJobControl::recover(
                database_id,
                job_id,
                upstream_table_ids,
                &database_job_log_epochs,
                snapshot_epoch,
                committed_epoch,
                &barrier_info,
                info,
                job_backfill_orders,
                fragment_relations,
                hummock_version_stats,
                node_actors,
                mutation.clone(),
                self,
            )?;
            creating_streaming_job_controls.insert(job_id, job);
        }

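        // Refresh the shared actor info cache with every fragment recovered for this
        // database, including those of the creating snapshot-backfill jobs.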
        self.control_stream_manager()
            .env
            .shared_actor_infos()
            .recover_database(
                database_id,
                database_jobs
                    .values()
                    .flat_map(|info| {
                        info.fragment_infos()
                            .map(move |fragment| (fragment, info.job_id))
                    })
                    .chain(
                        creating_streaming_job_controls
                            .values()
                            .flat_map(|job| job.fragment_infos_with_job_id()),
                    ),
            );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_info = InflightDatabaseInfo::recover(
            database_id,
            database_jobs.into_values(),
            self.control_stream_manager()
                .env
                .shared_actor_infos()
                .clone(),
        );
        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
        Ok(DatabaseCheckpointControl::recovery(
            database_id,
            database_state,
            committed_epoch,
            database_info,
            creating_streaming_job_controls,
        ))
    }
}

impl ControlStreamManager {
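    /// Iterates over all workers that currently have a live control stream.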
    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

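    /// Sends an `InjectBarrier` request to every worker that hosts actors of the given
    /// partial graph, attaching the mutation, the tables to sync, and any new actors to
    /// build, and returns the set of workers the barrier must be collected from.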
    pub(super) fn inject_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();

        nodes_to_sync_table.iter().for_each(|worker_id| {
            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
        });

        let mut node_need_collect = NodeToCollect::new();
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        node_actors
            .iter()
            .try_for_each(|(worker_id, actor_ids_to_collect)| {
                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
                    table_ids_to_sync.clone()
                } else {
                    vec![]
                };

                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
                    && let WorkerNodeState::Connected { control_stream, .. } = worker_state
                {
                    control_stream
                } else {
                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
                };

                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch(),
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
                                        table_ids_to_sync,
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(worker_id))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*worker_id);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;

                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

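    /// Asks every connected worker to create the given partial graph; send failures are
    /// logged and left for the reconnect path to repair.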
    pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest { partial_graph_id },
                        ),
                    ),
                })
                .is_err()
            {
                let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

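    /// Asks every connected worker to drop the given partial graphs.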
    pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

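    /// Asks every connected worker to reset the given partial graphs, returning the
    /// workers that accepted the request and must therefore be awaited for the reset.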
    pub(super) fn reset_partial_graphs(
        &mut self,
        partial_graph_ids: Vec<PartialGraphId>,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::ResetPartialGraphs(
                                ResetPartialGraphsRequest {
                                    partial_graph_ids: partial_graph_ids.clone(),
                                },
                            ),
                        ),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset partial graphs request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

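/// Merges per-worker RPC errors into a single `MetaError`: with no errors the bare
/// message is returned; with one error it is wrapped with the worker id; otherwise the
/// highest-scored error wins if scores are attached, else all errors are concatenated.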
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}

#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        assert_eq!(
            (database_id, None),
            from_partial_graph_id(to_partial_graph_id(database_id, None))
        );
        assert_eq!(
            (database_id, Some(job_id)),
            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id)))
        );
    }
}