use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
    SubscriptionId,
};
use crate::stream::cdc::{
    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

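/// Encode an optional creating-job id into the `partial_graph_id` carried on control stream
/// requests: `None` (the database-level graph) maps to `u32::MAX`, while a creating job maps to
/// its raw job id. For example, `to_partial_graph_id(None)` yields `u32::MAX`, which
/// [`from_partial_graph_id`] maps back to `None`.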
fn to_partial_graph_id(job_id: Option<JobId>) -> u32 {
    job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX)
}

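/// Inverse of [`to_partial_graph_id`]: `u32::MAX` denotes the database-level graph (`None`);
/// any other value is interpreted as the raw id of a creating job.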
pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<JobId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(JobId::new(partial_graph_id))
    }
}

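/// A live control stream to a single streaming worker, together with the worker's identity.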
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

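/// Connection state of a worker tracked by [`ControlStreamManager`]: either connected (with a
/// `removed` flag set when the worker is deleted while still connected), or waiting on a
/// reconnect future that resolves to a fresh [`StreamingControlHandle`].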
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

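/// Manages the bidirectional control streams between the meta barrier worker and all streaming
/// worker nodes, covering (re)connection, barrier injection, and partial graph maintenance.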
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

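    /// Register a newly added worker node and try to establish its control stream, retrying with
    /// exponential backoff up to `MAX_RETRY` times. If the worker is already connected this is a
    /// no-op; a pending reconnect attempt is dropped and replaced by a fresh connection.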
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

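    /// Build a future that keeps trying to establish a control stream to `node` with exponential
    /// backoff (capped at one minute) until it succeeds.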
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

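    /// Rebuild the manager during recovery: connect to all known workers concurrently, and put
    /// workers that cannot be reached immediately into the `Reconnecting` state.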
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

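/// Borrowed view of a worker that has just (re)connected, used to push the initial
/// `CreatePartialGraph` requests for the in-flight databases and creating jobs.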
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

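/// Event yielded by [`ControlStreamManager::next_event`]: either a response received from a
/// connected worker, or a notification that a worker has reconnected.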
pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
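    /// Poll all workers for the next event. Reconnect futures are only polled when
    /// `poll_reconnect` is true. An error on a response stream transitions the worker back to
    /// `Reconnecting`, or drops it entirely if it was removed or reported a shutdown.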
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            },
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(None),
                database_id,
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(Some(job_id)),
                    database_id,
                }),
            )
        })
    }
}

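/// Tracks collection of the initial barrier injected for a database during recovery. Once all
/// expected workers (and all ongoing snapshot backfill jobs) have responded, it can be converted
/// into a [`DatabaseCheckpointControl`] via [`DatabaseInitialBarrierCollector::finish`].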
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    database_info: InflightDatabaseInfo,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            assert!(
                !self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp),
                "unlikely to finish backfill since just recovered"
            );
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(self.node_to_collect.remove(&resp.worker_id).is_some());
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.database_state,
            self.committed_epoch,
            self.database_info,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
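    /// Inject the first barrier of a recovering database: resolve committed epochs, rebuild
    /// source split and CDC snapshot split assignments, separate ongoing snapshot backfill jobs
    /// from regular jobs, and send the initial barrier to every involved worker. Returns a
    /// [`DatabaseInitialBarrierCollector`] that tracks collection of that barrier.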
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        fragment_relations: &FragmentDownstreamRelation,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<JobId, String>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        fn collect_source_splits(
            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        ) -> HashMap<ActorId, Vec<SplitImpl>> {
            fragment_infos
                .flat_map(|info| info.actors.keys())
                .filter_map(|actor_id| {
                    let actor_id = *actor_id as ActorId;
                    source_splits
                        .remove(&actor_id)
                        .map(|splits| (actor_id, splits))
                })
                .collect()
        }
        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            is_paused: bool,
        ) -> Mutation {
            Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause: Default::default(),
                new_upstream_sinks: Default::default(),
            })
        }

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_subscriber_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if let Some(definition) = background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
                } else {
                    database_jobs.insert(job_id, (job_fragments, Some(definition)));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, None));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, Some(definition)))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        definition,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
            HashMap::new();

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, background_job_definition))| {
                    let status = if let Some(definition) = background_job_definition {
                        CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::recover(
                            job_id,
                            definition,
                            &fragment_infos,
                            Default::default(),
                            hummock_version_stats,
                        ))
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    let cdc_table_backfill_tracker =
                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
                            let cdc_fragment = fragment_infos
                                .values()
                                .find(|fragment| {
                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
                                        fragment.fragment_type_mask,
                                        &fragment.nodes,
                                    )
                                    .is_some()
                                })
                                .expect("should have parallel cdc fragment");
                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
                            let mut tracker =
                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
                            cdc_table_snapshot_split_assignment
                                .extend(tracker.reassign_splits(cdc_actors)?);
                            Some(tracker)
                        } else {
                            None
                        };
                    Ok((
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                            cdc_table_backfill_tracker,
                        },
                    ))
                })
                .try_collect::<_, _, MetaError>()
        }?;

        let node_to_collect = {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            let mutation = build_mutation(
                &database_job_source_splits,
                cdc_table_snapshot_split_assignment,
                is_paused,
            );

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation),
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                Some(new_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![],
                )
            }));

            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            assert!(
                !cdc_table_snapshot_splits.contains_key(&job_id),
                "snapshot backfill job {job_id} should not have cdc backfill"
            );
            if is_paused {
                bail!("should not pause when having snapshot backfill job {job_id}");
            }
            let mutation = build_mutation(
                &database_job_source_splits,
                Default::default(),
                false,
            );

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    &barrier_info,
                    info,
                    fragment_relations,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                })
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .flat_map(|job| job.fragment_infos_with_job_id()),
                ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_info = InflightDatabaseInfo::recover(
            database_id,
            database_jobs.into_values(),
            self.env.shared_actor_infos().clone(),
        );
        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            database_info,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

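    /// Iterate over the workers whose control streams are currently connected.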
    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

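    /// Send an `InjectBarrier` request to every connected worker and return which workers the
    /// barrier must be collected from. Fails if any worker hosting the target actors is not
    /// currently connected.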
    pub(super) fn inject_barrier(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_job_id);

        for worker_id in node_actors.keys() {
            if let Some((_, worker_state)) = self.workers.get(worker_id)
                && let WorkerNodeState::Connected { .. } = worker_state
            {
            } else {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let mut node_need_collect = HashMap::new();
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        self.connected_workers()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(&node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch(),
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync.clone(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;

                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

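    /// Broadcast a `CreatePartialGraph` request for the given database (and optionally a
    /// creating job) to all connected workers.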
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

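    /// Broadcast a `RemovePartialGraph` request for the given creating jobs to all connected
    /// workers. A no-op when `creating_job_ids` is empty.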
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

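    /// Broadcast a `ResetDatabase` request to all connected workers and return the ids of the
    /// workers the request was successfully sent to.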
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

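/// Merge per-worker RPC errors into a single [`MetaError`]: a single error is returned with the
/// worker id attached as context; if any error carries a `Score`, the highest-scored one wins;
/// otherwise all errors are concatenated into one message.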
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}