use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, FragmentId, StreamActor, StreamJobActorsToCreate, SubscriptionId};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

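/// Encodes an optional creating-job id into a partial graph id. `None`, i.e. the database-level
/// partial graph, is represented by the sentinel `u32::MAX`, so a job id must never be `u32::MAX`.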
fn to_partial_graph_id(job_id: Option<JobId>) -> u32 {
    job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX)
}

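/// Decodes a partial graph id back into an optional creating-job id; the sentinel `u32::MAX`
/// maps to `None`, i.e. the database-level partial graph.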
pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<JobId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(JobId::new(partial_graph_id))
    }
}

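/// An established control stream to a single streaming worker node.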
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

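/// Connection state of a tracked worker: either a connected control stream (possibly already
/// marked as removed from the cluster), or a pending reconnect attempt.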
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

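/// Manages the control streams from the meta barrier worker to all streaming worker nodes:
/// connecting and reconnecting to workers, injecting barriers, and collecting their responses.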
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

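    /// Registers a newly joined worker node and establishes a control stream to it, initializing
    /// the existing inflight partial graphs on the new connection. Retries a bounded number of
    /// times with exponential backoff before giving up.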
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "failed to connect to worker node");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "failed to create control stream to worker node after retries");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

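    /// Returns a future that repeatedly tries to establish a control stream to `node` with
    /// exponential backoff, resolving once a connection succeeds.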
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

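    /// Rebuilds the manager during recovery: connects to every known worker node concurrently
    /// and registers a reconnecting future for any worker that cannot be reached immediately.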
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed = ?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
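    /// Polls all tracked workers for the next event: a response on a connected control stream,
    /// or, when `poll_reconnect` is true, the completion of a pending reconnect. On a stream
    /// error the worker is dropped if it was removed or is shutting down, and otherwise moved
    /// back to the reconnecting state.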
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id = %node.id, host = ?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result
                                        .map_err(|err| -> (bool, MetaError) { (false, err.into()) })
                                        .and_then(|resp| {
                                            match resp
                                                .response
                                                .ok_or_else(|| {
                                                    (false, anyhow!("empty response").into())
                                                })?
                                            {
                                                streaming_control_stream_response::Response::Shutdown(_) => Err((
                                                    true,
                                                    anyhow!("worker node {worker_id} is shutting down").into(),
                                                )),
                                                streaming_control_stream_response::Response::Init(_) => {
                                                    Err((false, anyhow!("get unexpected init response").into()))
                                                }
                                                resp => {
                                                    if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                        assert_eq!(worker_id, barrier_resp.worker_id);
                                                    }
                                                    Ok(resp)
                                                }
                                            }
                                        })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

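    /// Waits for the next worker event, including reconnect completions. See
    /// [`Self::poll_next_event`].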
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

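    /// Waits for the next response from a connected worker, without polling pending reconnects.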
    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(None),
                database_id,
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(Some(job_id)),
                    database_id,
                }),
            )
        })
    }
}

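/// Tracks the collection of the initial barrier injected for a single database during recovery.
/// Once the barrier has been collected from all worker nodes and all recovered snapshot-backfill
/// jobs, [`Self::finish`] converts it into a [`DatabaseCheckpointControl`].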
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(self.node_to_collect.remove(&resp.worker_id).is_some());
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
            self.cdc_table_backfill_tracker,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
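    /// Injects the initial barrier for a single database during recovery. Resolves the
    /// database's committed epoch, rebuilds source and CDC snapshot split assignments, recovers
    /// background and snapshot-backfill jobs, and injects the first barrier to all connected
    /// workers. Returns a [`DatabaseInitialBarrierCollector`] tracking collection of that barrier.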
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<JobId, String>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment =
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                database_cdc_table_snapshot_split_assignment,
                cdc_table_snapshot_split_assignment.generation,
            );
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    database_cdc_table_snapshot_split_assignment,
                )
                .into(),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
            new_upstream_sinks: Default::default(),
        });

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_raw_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if let Some(definition) = background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
                } else {
                    database_jobs.insert(job_id, (job_fragments, Some(definition)));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, None));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, Some(definition)))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| {
                    anyhow!(
                        "recovered snapshot backfill job {} to upstream {} has not set snapshot epoch",
                        job_id,
                        upstream_table_id
                    )
                })?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!(
                        "snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream",
                        epoch,
                        upstream_table_id,
                        snapshot_epoch
                    )
                    .into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_raw_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        definition,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, background_job_definition))| {
                    let status = if let Some(definition) = background_job_definition {
                        CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::recover(
                            job_id,
                            definition,
                            &fragment_infos,
                            Default::default(),
                            hummock_version_stats,
                        ))
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    (
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                        },
                    )
                })
                .collect()
        };

        let node_to_collect = {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                Some(new_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![],
                )
            }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                })
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .flat_map(|job| job.fragment_infos_with_job_id()),
                ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            is_paused,
        );
        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            creating_streaming_job_controls,
            committed_epoch,
            cdc_table_backfill_tracker,
        })
    }

    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

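    /// Sends an `InjectBarrier` request for the given partial graph to every connected worker,
    /// together with any actors that need to be built on that worker. Returns a map from worker
    /// id to whether the worker has no actors to collect for this barrier. Fails if any worker
    /// that owns actors to collect is currently not connected.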
    pub(super) fn inject_barrier(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_job_id);

        for worker_id in node_actors.keys() {
            if let Some((_, worker_state)) = self.workers.get(worker_id)
                && let WorkerNodeState::Connected { .. } = worker_state
            {
            } else {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let mut node_need_collect = HashMap::new();
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        self.connected_workers()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(&node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync.clone(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor
                                                                    .vnode_bitmap
                                                                    .map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor
                                                                    .config_override
                                                                    .to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids
                                                                    .iter()
                                                                    .copied()
                                                                    .collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

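    /// Requests every connected worker to create a partial graph for the given database and
    /// optional creating job; send failures are only logged.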
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker");
            }
        });
    }

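    /// Requests every connected worker to remove the partial graphs of the given creating jobs
    /// in the database. A no-op when `creating_job_ids` is empty; send failures are only logged.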
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

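    /// Sends a `ResetDatabase` request to every connected worker and returns the ids of the
    /// workers that the request was successfully sent to.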
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

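/// Merges per-worker RPC errors into a single [`MetaError`]. With no errors, only `message` is
/// returned; with a single error, it is wrapped with `message`; otherwise the error carrying the
/// highest score is preferred, falling back to concatenating all error reports.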
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}