use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::PartialGraphId;
use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetPartialGraphsRequest,
};
use risingwave_pb::stream_service::{
    InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
    streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::BackfillOrderState;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::partial_graph::PartialGraphRecoverer;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::NodeToCollect;
use crate::controller::fragment::InflightFragmentInfo;
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
    SubscriptionId,
};
use crate::stream::cdc::{
    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{
    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
    build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};

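/// Packs a database id and an optional creating job id into a single
/// [`PartialGraphId`]: the database id takes the high 32 bits and the job id
/// the low 32 bits, with `u32::MAX` reserved as the sentinel for "no creating
/// job" (hence the `assert_ne!` below).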
pub(super) fn to_partial_graph_id(
    database_id: DatabaseId,
    creating_job_id: Option<JobId>,
) -> PartialGraphId {
    let raw_job_id = creating_job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX);
    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
}

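/// Inverse of [`to_partial_graph_id`]: splits a [`PartialGraphId`] back into
/// the database id and the optional creating job id.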
pub(super) fn from_partial_graph_id(
    partial_graph_id: PartialGraphId,
) -> (DatabaseId, Option<JobId>) {
    let id = partial_graph_id.as_raw_id();
    let database_id = (id >> 32) as u32;
    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
    let creating_job_id = if raw_creating_job_id == u32::MAX {
        None
    } else {
        Some(JobId::new(raw_creating_job_id))
    };
    (database_id.into(), creating_job_id)
}

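/// Collects, for each fragment, the state table ids of its `LocalityProvider`
/// nodes. Fragments without any `LocalityProvider` node are omitted from the
/// returned mapping.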
pub(super) fn build_locality_fragment_state_table_mapping(
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> HashMap<FragmentId, Vec<TableId>> {
    let mut mapping = HashMap::new();

    for (fragment_id, fragment_info) in fragment_infos {
        let mut state_table_ids = Vec::new();
        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
            if let Some(NodeBody::LocalityProvider(locality_provider)) =
                stream_node.node_body.as_ref()
            {
                let state_table_id = locality_provider
                    .state_table
                    .as_ref()
                    .expect("must have state table")
                    .id;
                state_table_ids.push(state_table_id);
                false
            } else {
                true
            }
        });
        if !state_table_ids.is_empty() {
            mapping.insert(*fragment_id, state_table_ids);
        }
    }

    mapping
}

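/// Enumerates all partial graph ids belonging to a database: one per creating
/// job, plus the graph of the database itself (`creating_job_id == None`).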
pub(super) fn database_partial_graphs<'a>(
    database_id: DatabaseId,
    creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
) -> impl Iterator<Item = PartialGraphId> + 'a {
    creating_jobs
        .map(Some)
        .chain([None])
        .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

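/// Connection state of a worker: either an established control stream, with a
/// `removed` flag set once the worker is scheduled for removal, or a pending
/// reconnect future resolving to a fresh control stream handle.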
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

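    /// Registers a worker and establishes a control stream to it, retrying up
    /// to `MAX_RETRY` times with exponential backoff. On success, the new
    /// stream is initialized with a `CreatePartialGraph` request for each of
    /// the given partial graphs. An already-connected worker is left as is,
    /// while a pending reconnect attempt is replaced by a fresh connection.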
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        partial_graphs: impl Iterator<Item = PartialGraphId>,
        term_id: &String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(partial_graphs);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "failed to connect to worker node");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "failed to connect to worker node after all retries");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

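    /// Returns a future that keeps reconnecting to `node` with exponential
    /// backoff (capped at one minute per attempt) until a control stream is
    /// established.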
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "failed to create control stream to worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

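    /// Rebuilds the manager during recovery by connecting to all known workers
    /// concurrently. Workers that cannot be reached right away are kept in the
    /// `Reconnecting` state with a background retry future.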
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed = ?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

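/// A borrowed view of a freshly connected worker, used to send the initial
/// `CreatePartialGraph` requests over its control stream.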
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
        for partial_graph_id in partial_graphs {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(
                        PbCreatePartialGraphRequest { partial_graph_id },
                    ),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
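    /// Polls all workers for the next event. Reconnect futures are only polled
    /// when `poll_reconnect` is set. `this_opt` is taken (set to `None`) when
    /// a `Connected` event is returned, so that the handle borrowed from the
    /// re-borrowed manager can be handed out with the full lifetime `'a`.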
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        for (&worker_id, (node, worker_state)) in &mut this.workers {
            let control_stream = match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => control_stream,
                WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                    continue;
                }
                WorkerNodeState::Reconnecting(join_handle) => {
                    match join_handle.poll_unpin(cx) {
                        Poll::Ready(handle) => {
                            info!(id = %node.id, host = ?node.host, "reconnected to worker");
                            *worker_state = WorkerNodeState::Connected {
                                control_stream: ControlStreamNode {
                                    worker_id: node.id,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                },
                                removed: false,
                            };
                            // Re-borrow through `this_opt` so that the returned
                            // handle carries the full lifetime `'a`.
                            let this = this_opt.take().expect("should exist");
                            let (node, worker_state) =
                                this.workers.get_mut(&worker_id).expect("should exist");
                            let WorkerNodeState::Connected { control_stream, .. } = worker_state
                            else {
                                unreachable!()
                            };
                            return Poll::Ready((
                                worker_id,
                                WorkerNodeEvent::Connected(WorkerNodeConnected {
                                    node,
                                    handle: &mut control_stream.handle,
                                }),
                            ));
                        }
                        Poll::Pending => {
                            continue;
                        }
                    }
                }
            };
            match control_stream.handle.response_stream.poll_next_unpin(cx) {
                Poll::Ready(result) => {
                    let result = result
                        .ok_or_else(|| (false, anyhow!("end of stream").into()))
                        .and_then(|result| {
                            result
                                .map_err(|err| -> (bool, MetaError) { (false, err.into()) })
                                .and_then(|resp| {
                                    match resp
                                        .response
                                        .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                    {
                                        streaming_control_stream_response::Response::Shutdown(_) => {
                                            Err((
                                                true,
                                                anyhow!("worker node {worker_id} is shutting down")
                                                    .into(),
                                            ))
                                        }
                                        streaming_control_stream_response::Response::Init(_) => {
                                            Err((
                                                false,
                                                anyhow!("get unexpected init response").into(),
                                            ))
                                        }
                                        resp => {
                                            if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                assert_eq!(worker_id, barrier_resp.worker_id);
                                            }
                                            Ok(resp)
                                        }
                                    }
                                })
                        });
                    let result = match result {
                        Ok(resp) => Ok(resp),
                        Err((shutdown, err)) => {
                            warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                            let WorkerNodeState::Connected { removed, .. } = worker_state else {
                                unreachable!("checked connected")
                            };
                            if *removed || shutdown {
                                this.workers.remove(&worker_id);
                            } else {
                                *worker_state = WorkerNodeState::Reconnecting(
                                    ControlStreamManager::retry_connect(
                                        node.clone(),
                                        term_id.to_owned(),
                                        context.clone(),
                                    ),
                                );
                            }
                            Err(err)
                        }
                    };
                    return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                }
                Poll::Pending => {
                    continue;
                }
            }
        }

        Poll::Pending
    }

    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }
}

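/// Tracks the initial barrier of a recovering database. Collection is complete
/// once every initializing partial graph has reported in, after which
/// [`Self::finish`] yields the underlying [`DatabaseCheckpointControl`].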
pub(super) struct DatabaseInitialBarrierCollector {
    pub(super) database_id: DatabaseId,
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    pub(super) database: DatabaseCheckpointControl,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("initializing_graphs", &self.initializing_partial_graphs)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.initializing_partial_graphs.is_empty()
    }

    pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
    }

    pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
        database_partial_graphs(
            self.database_id,
            self.database
                .creating_streaming_job_controls
                .keys()
                .copied(),
        )
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        self.database
    }

    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        self.database.is_valid_after_worker_err(worker_id)
    }
}

impl PartialGraphRecoverer<'_> {
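    /// Injects the initial barrier for a recovering database: splits the
    /// recovered jobs into regular jobs and ongoing snapshot-backfill jobs,
    /// resolves their committed epochs, rebuilds source split and CDC snapshot
    /// split assignments, and emits an `Initial` barrier carrying an `Add`
    /// mutation for the database graph and for each creating job graph.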
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        fragment_relations: &FragmentDownstreamRelation,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashSet<JobId>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
    ) -> MetaResult<DatabaseCheckpointControl> {
        fn collect_source_splits(
            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        ) -> HashMap<ActorId, Vec<SplitImpl>> {
            fragment_infos
                .flat_map(|info| info.actors.keys())
                .filter_map(|actor_id| {
                    let actor_id = *actor_id as ActorId;
                    source_splits
                        .remove(&actor_id)
                        .map(|splits| (actor_id, splits))
                })
                .collect()
        }

        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            backfill_orders: &ExtendedFragmentBackfillOrder,
            is_paused: bool,
        ) -> Mutation {
            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
                .into_iter()
                .collect();
            Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause,
                new_upstream_sinks: Default::default(),
            })
        }

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        fn job_backfill_orders(
            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
            job_id: JobId,
        ) -> UserDefinedFragmentBackfillOrder {
            UserDefinedFragmentBackfillOrder::new(
                job_extra_info
                    .get(&job_id)
                    .and_then(|info| info.backfill_orders.clone())
                    .map_or_else(HashMap::new, |orders| orders.0),
            )
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_subscriber_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, job_fragments);
                } else {
                    database_jobs.insert(job_id, (job_fragments, true));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, false));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, fragment_infos) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, true))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| {
                    anyhow!(
                        "recovered snapshot backfill job {} to upstream {} has not set snapshot epoch",
                        job_id,
                        upstream_table_id
                    )
                })?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!(
                        "snapshot epoch {} to upstream {} differs from snapshot epoch {} to previous upstream",
                        epoch,
                        upstream_table_id,
                        snapshot_epoch
                    )
                    .into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicate");
        }

        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
            HashMap::new();

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, is_background_creating))| {
                    let status = if is_background_creating {
                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
                        let backfill_ordering =
                            StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                                backfill_ordering,
                                fragment_relations,
                                || {
                                    fragment_infos.iter().map(|(fragment_id, fragment)| {
                                        (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                                    })
                                },
                            );
                        let locality_fragment_state_table_mapping =
                            build_locality_fragment_state_table_mapping(&fragment_infos);
                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                            &backfill_ordering,
                            &fragment_infos,
                            locality_fragment_state_table_mapping,
                        );
                        CreateStreamingJobStatus::Creating {
                            tracker: CreateMviewProgressTracker::recover(
                                job_id,
                                &fragment_infos,
                                backfill_order_state,
                                hummock_version_stats,
                            ),
                        }
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    let cdc_table_backfill_tracker =
                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
                            let cdc_fragment = fragment_infos
                                .values()
                                .find(|fragment| {
                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
                                        fragment.fragment_type_mask,
                                        &fragment.nodes,
                                    )
                                    .is_some()
                                })
                                .expect("should have parallel cdc fragment");
                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
                            let mut tracker =
                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
                            cdc_table_snapshot_split_assignment
                                .extend(tracker.reassign_splits(cdc_actors)?);
                            Some(tracker)
                        } else {
                            None
                        };
                    Ok((
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                            cdc_table_backfill_tracker,
                        },
                    ))
                })
                .try_collect::<_, _, MetaError>()
        }?;

        let mut builder = FragmentEdgeBuilder::new(
            database_jobs
                .values()
                .flat_map(|job| {
                    job.fragment_infos()
                        .map(|info| (info, to_partial_graph_id(database_id, None)))
                })
                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
                    |(job_id, (fragments, ..))| {
                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
                        fragments
                            .values()
                            .map(move |fragment| (fragment, partial_graph_id))
                    },
                )),
            self.control_stream_manager(),
        );
        builder.add_relations(fragment_relations);
        let mut edges = builder.build();

        {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            let database_backfill_orders =
                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
                        job_backfill_orders(job_extra_info, job.job_id)
                    } else {
                        UserDefinedFragmentBackfillOrder::default()
                    }
                }));
            let database_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    database_backfill_orders,
                    fragment_relations,
                    || {
                        database_jobs.values().flat_map(|job_fragments| {
                            job_fragments
                                .fragment_infos
                                .iter()
                                .map(|(fragment_id, fragment)| {
                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                                })
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                cdc_table_snapshot_split_assignment,
                &database_backfill_orders,
                is_paused,
            );

            let partial_graph_id = to_partial_graph_id(database_id, None);
            self.recover_graph(
                partial_graph_id,
                mutation,
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                new_actors,
            )?;
            debug!(%database_id, "inject initial barrier");
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![],
                )
            }));

            // Collect the creating job's own source splits; the database
            // graph's splits have already been drained above.
            let job_source_splits = collect_source_splits(info.values(), source_splits);
            assert!(
                !cdc_table_snapshot_splits.contains_key(&job_id),
                "snapshot backfill job {job_id} should not have cdc backfill"
            );
            if is_paused {
                bail!("should not pause when having snapshot backfill job {job_id}");
            }
            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
            let job_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    job_backfill_orders,
                    fragment_relations,
                    || {
                        info.iter().map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                    },
                );
            let mutation = build_mutation(
                &job_source_splits,
                Default::default(),
                &job_backfill_orders,
                false,
            );

            let job = CreatingStreamingJobControl::recover(
                database_id,
                job_id,
                upstream_table_ids,
                &database_job_log_epochs,
                snapshot_epoch,
                committed_epoch,
                &barrier_info,
                info,
                job_backfill_orders,
                fragment_relations,
                hummock_version_stats,
                node_actors,
                mutation.clone(),
                self,
            )?;
            creating_streaming_job_controls.insert(job_id, job);
        }

        self.control_stream_manager()
            .env
            .shared_actor_infos()
            .recover_database(
                database_id,
                database_jobs
                    .values()
                    .flat_map(|info| {
                        info.fragment_infos()
                            .map(move |fragment| (fragment, info.job_id))
                    })
                    .chain(
                        creating_streaming_job_controls
                            .values()
                            .flat_map(|job| job.fragment_infos_with_job_id()),
                    ),
            );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_info = InflightDatabaseInfo::recover(
            database_id,
            database_jobs.into_values(),
            self.control_stream_manager()
                .env
                .shared_actor_infos()
                .clone(),
        );
        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
        Ok(DatabaseCheckpointControl::recovery(
            database_id,
            database_state,
            committed_epoch,
            database_info,
            creating_streaming_job_controls,
        ))
    }
}

impl ControlStreamManager {
    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

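    /// Sends the barrier to every worker listed in `node_actors` over its
    /// control stream, attaching the actors to build and, where requested, the
    /// state tables to sync. Returns the set of workers the barrier must be
    /// collected from; fails if any target worker is not connected.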
    pub(super) fn inject_barrier(
        &mut self,
        partial_graph_id: PartialGraphId,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();

        nodes_to_sync_table.iter().for_each(|worker_id| {
            assert!(
                node_actors.contains_key(worker_id),
                "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}"
            );
        });

        let mut node_need_collect = NodeToCollect::new();
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        node_actors
            .iter()
            .try_for_each(|(worker_id, actor_ids_to_collect)| {
                assert!(
                    !actor_ids_to_collect.is_empty(),
                    "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}"
                );
                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
                    table_ids_to_sync.clone()
                } else {
                    vec![]
                };

                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
                    && let WorkerNodeState::Connected { control_stream, .. } = worker_state
                {
                    control_stream
                } else {
                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
                };

                let mutation = mutation.clone();
                let barrier = Barrier {
                    epoch: Some(risingwave_pb::data::Epoch {
                        curr: barrier_info.curr_epoch(),
                        prev: barrier_info.prev_epoch(),
                    }),
                    mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                    tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                        .to_protobuf(),
                    kind: barrier_info.kind.to_protobuf() as i32,
                };

                node.handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::InjectBarrier(
                                InjectBarrierRequest {
                                    request_id: Uuid::new_v4().to_string(),
                                    barrier: Some(barrier),
                                    actor_ids_to_collect: actor_ids_to_collect
                                        .iter()
                                        .copied()
                                        .collect(),
                                    table_ids_to_sync,
                                    partial_graph_id,
                                    actors_to_build: new_actors
                                        .as_mut()
                                        .map(|new_actors| new_actors.remove(worker_id))
                                        .into_iter()
                                        .flatten()
                                        .flatten()
                                        .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                            FragmentBuildActorInfo {
                                                fragment_id,
                                                node: Some(node),
                                                actors: actors
                                                    .into_iter()
                                                    .map(|(actor, upstreams, dispatchers)| {
                                                        BuildActorInfo {
                                                            actor_id: actor.actor_id,
                                                            fragment_upstreams: upstreams
                                                                .into_iter()
                                                                .map(|(fragment_id, upstreams)| {
                                                                    (
                                                                        fragment_id,
                                                                        UpstreamActors {
                                                                            actors: upstreams
                                                                                .into_values()
                                                                                .collect(),
                                                                        },
                                                                    )
                                                                })
                                                                .collect(),
                                                            dispatchers,
                                                            vnode_bitmap: actor
                                                                .vnode_bitmap
                                                                .map(|bitmap| bitmap.to_protobuf()),
                                                            mview_definition: actor.mview_definition,
                                                            expr_context: actor.expr_context,
                                                            config_override: actor
                                                                .config_override
                                                                .to_string(),
                                                            initial_subscriber_ids: initial_subscriber_ids
                                                                .iter()
                                                                .copied()
                                                                .collect(),
                                                        }
                                                    })
                                                    .collect(),
                                            }
                                        })
                                        .collect(),
                                },
                            ),
                        ),
                    })
                    .map_err(|_| {
                        MetaError::from(anyhow!(
                            "failed to send request to {} {:?}",
                            node.worker_id,
                            node.host
                        ))
                    })?;

                node_need_collect.insert(*worker_id);
                Result::<_, MetaError>::Ok(())
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

    pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest { partial_graph_id },
                        ),
                    ),
                })
                .is_err()
            {
                let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "failed to add partial graph to worker");
            }
        });
    }

    pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

    pub(super) fn reset_partial_graphs(
        &mut self,
        partial_graph_ids: Vec<PartialGraphId>,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::ResetPartialGraphs(
                                ResetPartialGraphsRequest {
                                    partial_graph_ids: partial_graph_ids.clone(),
                                },
                            ),
                        ),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset partial graphs request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

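/// Merges per-worker RPC errors into a single [`MetaError`]. A single error is
/// returned with the worker id attached as context; when several errors carry
/// a `Score`, the highest-scored one wins; otherwise all errors are
/// concatenated into one message.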
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}

#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        assert_eq!(
            (database_id, None),
            from_partial_graph_id(to_partial_graph_id(database_id, None))
        );
        assert_eq!(
            (database_id, Some(job_id)),
            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id)))
        );
    }
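
    // An illustrative check of the bit layout encoded by `to_partial_graph_id`
    // (database id in the high 32 bits, job id or the `u32::MAX` sentinel in
    // the low 32 bits), written against the helpers in this module.
    #[test]
    fn test_partial_graph_id_layout() {
        let database_id = 1.into();
        let graph_id = to_partial_graph_id(database_id, None);
        assert_eq!(graph_id.as_raw_id(), (1u64 << 32) | u64::from(u32::MAX));
        let graph_id = to_partial_graph_id(database_id, Some(2.into()));
        assert_eq!(graph_id.as_raw_id(), (1u64 << 32) | 2);
    }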
}