use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::PartialGraphId;
use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetPartialGraphsRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::BackfillOrderState;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
    SubscriptionId,
};
use crate::stream::cdc::{
    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

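/// Encodes a database id and an optional creating job id into a [`PartialGraphId`]: the database
/// id occupies the upper 32 bits and the job id the lower 32 bits, with `u32::MAX` reserved as
/// the sentinel for "no creating job". [`from_partial_graph_id`] is the inverse.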
pub(super) fn to_partial_graph_id(
    database_id: DatabaseId,
    creating_job_id: Option<JobId>,
) -> PartialGraphId {
    let raw_job_id = creating_job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX);
    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
}

pub(super) fn from_partial_graph_id(
    partial_graph_id: PartialGraphId,
) -> (DatabaseId, Option<JobId>) {
    let id = partial_graph_id.as_raw_id();
    let database_id = (id >> 32) as u32;
    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
    let creating_job_id = if raw_creating_job_id == u32::MAX {
        None
    } else {
        Some(JobId::new(raw_creating_job_id))
    };
    (database_id.into(), creating_job_id)
}

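/// Scans each fragment's stream node tree for `LocalityProvider` nodes and collects their state
/// table ids, keyed by fragment id. Fragments without any locality provider are omitted.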
fn build_locality_fragment_state_table_mapping(
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> HashMap<FragmentId, Vec<TableId>> {
    let mut mapping = HashMap::new();

    for (fragment_id, fragment_info) in fragment_infos {
        let mut state_table_ids = Vec::new();
        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
            if let Some(NodeBody::LocalityProvider(locality_provider)) =
                stream_node.node_body.as_ref()
            {
                let state_table_id = locality_provider
                    .state_table
                    .as_ref()
                    .expect("must have state table")
                    .id;
                state_table_ids.push(state_table_id);
                false
            } else {
                true
            }
        });
        if !state_table_ids.is_empty() {
            mapping.insert(*fragment_id, state_table_ids);
        }
    }

    mapping
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

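/// Connection state of a worker's streaming control stream: either an established stream (with a
/// flag marking workers that have been removed from the cluster but still hold a live stream), or
/// a pending reconnect attempt driven by a boxed retry future.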
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

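/// Manages the streaming control streams between the meta node and all compute workers, tracking
/// per-worker connection state and serving as the transport for barrier injection and
/// partial-graph management requests.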
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

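    /// Registers a newly added worker node and establishes its control stream, retrying a bounded
    /// number of times with exponential backoff. On success, the in-flight partial graphs are
    /// re-created on the worker; if the worker is already connected, the call is a no-op.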
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

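    /// Returns a future that keeps trying to establish a control stream to `node` with
    /// exponential backoff (100ms up to one minute) until it succeeds, yielding the new handle.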
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

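    /// Rebuilds the manager during global recovery: connects to all known workers concurrently
    /// under the new `term_id`, keeping unreachable workers in the `Reconnecting` state so they
    /// are retried in the background.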
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

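/// Handle to a worker whose control stream has just been (re)established, used to replay the
/// `CreatePartialGraph` requests for all in-flight databases and creating jobs.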
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
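    /// Polls every worker for the next event. Reconnect futures are driven (and surfaced as
    /// `WorkerNodeEvent::Connected`) only when `poll_reconnect` is set; response-stream errors
    /// either drop the worker (if it was removed or is shutting down) or schedule a reconnect.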
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(database_id, None),
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(database_id, Some(job_id)),
                }),
            )
        })
    }
}

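/// Tracks collection of the initial barrier injected for a single database during recovery,
/// together with the recovered creating (snapshot backfill) jobs. Once every worker and creating
/// job has collected the barrier, [`Self::finish`] converts it into a `DatabaseCheckpointControl`.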
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    database_info: InflightDatabaseInfo,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn creating_job_ids(&self) -> impl Iterator<Item = JobId> {
        self.creating_streaming_job_controls.keys().copied()
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        assert_eq!(self.database_id, database_id);
        if let Some(creating_job_id) = creating_job_id {
            assert!(
                !self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp),
                "unlikely to finish backfill since just recovered"
            );
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(self.node_to_collect.remove(&resp.worker_id));
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.database_state,
            self.committed_epoch,
            self.database_info,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
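    /// Injects the first barrier for one database during recovery. Recovered jobs are split into
    /// regular jobs and still-creating snapshot backfill jobs; actor, split and subscriber
    /// assignments are rebuilt, the recovery `Add` barrier is injected into the database's partial
    /// graph, controls for the snapshot backfill jobs are recovered, and a collector is returned
    /// that waits for all workers to report barrier completion.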
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        backfill_orders: HashMap<JobId, HashMap<FragmentId, Vec<FragmentId>>>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        fragment_relations: &FragmentDownstreamRelation,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashSet<JobId>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
        injected_creating_jobs: &mut HashSet<JobId>,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        fn collect_source_splits(
            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        ) -> HashMap<ActorId, Vec<SplitImpl>> {
            fragment_infos
                .flat_map(|info| info.actors.keys())
                .filter_map(|actor_id| {
                    let actor_id = *actor_id as ActorId;
                    source_splits
                        .remove(&actor_id)
                        .map(|splits| (actor_id, splits))
                })
                .collect()
        }
        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
            is_paused: bool,
        ) -> Mutation {
            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
                .into_iter()
                .collect();
            Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause,
                new_upstream_sinks: Default::default(),
            })
        }

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_subscriber_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, job_fragments);
                } else {
                    database_jobs.insert(job_id, (job_fragments, true));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, false));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, fragment_infos) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, true))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
            HashMap::new();

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, is_background_creating))| {
                    let status = if is_background_creating {
                        let backfill_ordering =
                            backfill_orders.get(&job_id).cloned().unwrap_or_default();
                        let locality_fragment_state_table_mapping =
                            build_locality_fragment_state_table_mapping(&fragment_infos);
                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                            &backfill_ordering,
                            &fragment_infos,
                            locality_fragment_state_table_mapping,
                        );
                        CreateStreamingJobStatus::Creating {
                            tracker: CreateMviewProgressTracker::recover(
                                job_id,
                                &fragment_infos,
                                backfill_order_state,
                                hummock_version_stats,
                            ),
                        }
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    let cdc_table_backfill_tracker =
                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
                            let cdc_fragment = fragment_infos
                                .values()
                                .find(|fragment| {
                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
                                        fragment.fragment_type_mask,
                                        &fragment.nodes,
                                    )
                                    .is_some()
                                })
                                .expect("should have parallel cdc fragment");
                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
                            let mut tracker =
                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
                            cdc_table_snapshot_split_assignment
                                .extend(tracker.reassign_splits(cdc_actors)?);
                            Some(tracker)
                        } else {
                            None
                        };
                    Ok((
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                            cdc_table_backfill_tracker,
                        },
                    ))
                })
                .try_collect::<_, _, MetaError>()
        }?;

        let mut builder = FragmentEdgeBuilder::new(
            database_jobs
                .values()
                .flat_map(|job| {
                    job.fragment_infos()
                        .map(|info| (info, to_partial_graph_id(database_id, None)))
                })
                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
                    |(job_id, (fragments, ..))| {
                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
                        fragments
                            .values()
                            .map(move |fragment| (fragment, partial_graph_id))
                    },
                )),
            self,
        );
        builder.add_relations(fragment_relations);
        let mut edges = builder.build();

        let node_to_collect = {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            let database_backfill_orders = database_jobs
                .values()
                .flat_map(|job| {
                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
                        backfill_orders
                            .get(&job.job_id)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        HashMap::new()
                    }
                })
                .collect();
            let mutation = build_mutation(
                &database_job_source_splits,
                cdc_table_snapshot_split_assignment,
                &database_backfill_orders,
                is_paused,
            );

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation),
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                nodes_actors.keys().copied(),
                Some(new_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![],
                )
            }));

            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            assert!(
                !cdc_table_snapshot_splits.contains_key(&job_id),
                "snapshot backfill job {job_id} should not have cdc backfill"
            );
            if is_paused {
                bail!("should not pause when having snapshot backfill job {job_id}");
            }
            let job_backfill_orders = backfill_orders.get(&job_id).cloned().unwrap_or_default();
            let mutation = build_mutation(
                &database_job_source_splits,
                Default::default(),
                &job_backfill_orders,
                false,
            );

            injected_creating_jobs.insert(job_id);
            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    &barrier_info,
                    info,
                    fragment_relations,
                    hummock_version_stats,
                    node_actors,
                    mutation,
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                })
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .flat_map(|job| job.fragment_infos_with_job_id()),
                ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_info = InflightDatabaseInfo::recover(
            database_id,
            database_jobs.into_values(),
            self.env.shared_actor_infos().clone(),
        );
        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            database_info,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

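    /// Broadcasts a barrier to every worker in `node_actors` for the given partial graph. Workers
    /// listed in `nodes_to_sync_table` receive the full `table_ids_to_sync` so they sync state
    /// tables on this barrier; newly built actors are shipped along with the injection request.
    /// Returns the set of workers that must report collection of this barrier.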
    pub(super) fn inject_barrier(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();

        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);

        nodes_to_sync_table.iter().for_each(|worker_id| {
            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
        });

        let mut node_need_collect = NodeToCollect::new();
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        node_actors.iter()
            .try_for_each(|(worker_id, actor_ids_to_collect)| {
                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
                    table_ids_to_sync.clone()
                } else {
                    vec![]
                };

                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
                    && let WorkerNodeState::Connected { control_stream, .. } = worker_state
                {
                    control_stream
                } else {
                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
                };

                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch(),
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
                                        table_ids_to_sync,
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(worker_id))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*worker_id);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

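    /// Asks every connected worker to create a partial graph for the given database (and creating
    /// job, if any). Send failures are only logged; the worker will be reinitialized on reconnect.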
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

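    /// Asks every connected worker to drop the partial graphs of the given creating jobs. A no-op
    /// when `creating_job_ids` is empty; send failures are only logged.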
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(database_id, Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

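    /// Asks every connected worker to reset the given partial graphs and returns the set of
    /// workers to which the request was successfully sent.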
    pub(super) fn reset_partial_graphs(
        &mut self,
        partial_graph_ids: Vec<PartialGraphId>,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::ResetPartialGraphs(
                                ResetPartialGraphsRequest {
                                    partial_graph_ids: partial_graph_ids.clone(),
                                },
                            ),
                        ),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

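/// Merges per-worker RPC errors into a single [`MetaError`]. With no errors, the bare `message`
/// is returned; with a single error, or one carrying the highest attached `Score`, that error is
/// wrapped with its worker id; otherwise all errors are concatenated into one message.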
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}

#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        assert_eq!(
            (database_id, None),
            from_partial_graph_id(to_partial_graph_id(database_id, None))
        );
        assert_eq!(
            (database_id, Some(job_id)),
            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id)))
        );
    }
}