use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, FragmentId, StreamActor, StreamJobActorsToCreate, SubscriptionId};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

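/// Encodes an optional creating-job id as the partial graph id sent to compute nodes.
/// The database-level graph (`None`) is represented by the sentinel `u32::MAX`, so a
/// real job id must never collide with it (enforced by the `assert_ne!` below).
/// `from_partial_graph_id(to_partial_graph_id(x))` round-trips back to `x`.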
fn to_partial_graph_id(job_id: Option<JobId>) -> u32 {
    job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX)
}

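/// Inverse of [`to_partial_graph_id`]: `u32::MAX` denotes the database-level graph.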
pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<JobId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(JobId::new(partial_graph_id))
    }
}

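/// A live control stream to a single worker node.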
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

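/// Connection state of a worker: either an established control stream (possibly marked
/// as `removed` while responses are still being drained), or a pending reconnect future.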
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

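/// Manages the control streams to all worker nodes, used by the global barrier worker
/// to inject barriers and collect their completion responses.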
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

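    /// Registers a newly-joined worker and connects its control stream, retrying a few
    /// times with exponential backoff before giving up.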
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

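    /// Returns a future that reconnects to `node`, retrying with exponential backoff
    /// until a control stream is established.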
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

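    /// Rebuilds the manager during recovery by connecting to all known workers in
    /// parallel; workers that cannot be reached are left in the `Reconnecting` state.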
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed = ?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

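/// A borrowed handle to a worker whose control stream has just been (re)established.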
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

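/// An event yielded by [`ControlStreamManager::next_event`]: either a response from a
/// connected worker, or a notification that a worker has (re)connected.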
pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
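    /// Polls all workers for the next event. `this_opt` is taken (set to `None`) when a
    /// reconnection completes, so the freshly connected handle can be re-borrowed for
    /// the full lifetime `'a` and returned to the caller.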
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id = %node.id, host = ?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result
                                        .map_err(|err| -> (bool, MetaError) { (false, err.into()) })
                                        .and_then(|resp| {
                                            match resp
                                                .response
                                                .ok_or_else(|| {
                                                    (false, anyhow!("empty response").into())
                                                })?
                                            {
                                                streaming_control_stream_response::Response::Shutdown(_) => Err((
                                                    true,
                                                    anyhow!("worker node {worker_id} is shutting down").into(),
                                                )),
                                                streaming_control_stream_response::Response::Init(_) => Err((
                                                    false,
                                                    anyhow!("get unexpected init response").into(),
                                                )),
                                                resp => {
                                                    if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                        assert_eq!(worker_id, barrier_resp.worker_id);
                                                    }
                                                    Ok(resp)
                                                }
                                            }
                                        })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

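    /// Awaits the next event from any worker, including completed reconnections.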
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

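    /// Awaits the next response from any connected worker, without polling pending
    /// reconnections.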
    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(None),
                database_id,
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(Some(job_id)),
                    database_id,
                }),
            )
        })
    }
}

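/// Tracks collection of the initial barrier injected into a recovered database, and
/// turns into a [`DatabaseCheckpointControl`] once fully collected.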
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(self.node_to_collect.remove(&resp.worker_id).is_some());
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
            self.cdc_table_backfill_tracker,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
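    /// Injects the very first barrier of a recovered database: reassigns source splits
    /// and CDC snapshot splits, restores subscriptions and background jobs, then sends
    /// an initial `AddMutation` barrier and returns a collector for its responses.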
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<JobId, String>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment =
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                database_cdc_table_snapshot_split_assignment,
                cdc_table_snapshot_split_assignment.generation,
            );
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    database_cdc_table_snapshot_split_assignment,
                )
                .into(),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
            new_upstream_sinks: Default::default(),
        });

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_raw_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if let Some(definition) = background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
                } else {
                    database_jobs.insert(job_id, (job_fragments, Some(definition)));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, None));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();

        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, Some(definition)))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| {
                    anyhow!(
                        "recovered snapshot backfill job {} to upstream {} has not set snapshot epoch",
                        job_id,
                        upstream_table_id
                    )
                })?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!(
                        "snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream",
                        epoch,
                        upstream_table_id,
                        snapshot_epoch
                    )
                    .into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_raw_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        definition,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, background_job_definition))| {
                    let status = if let Some(definition) = background_job_definition {
                        CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::recover(
                            job_id,
                            definition,
                            &fragment_infos,
                            Default::default(),
                            hummock_version_stats,
                        ))
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    (
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                        },
                    )
                })
                .collect()
        };

        let node_to_collect = {
            let node_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_info| {
                        (
                            fragment_info.fragment_id,
                            &fragment_info.nodes,
                            fragment_info.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_info| {
                (
                    fragment_info.fragment_id,
                    &fragment_info.nodes,
                    fragment_info.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![],
                )
            }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                })
                .chain(
                    creating_streaming_job_controls
                        .iter()
                        .flat_map(|(job_id, job)| {
                            job.graph_info()
                                .values()
                                .map(|fragment| (fragment, *job_id))
                        }),
                ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            is_paused,
        );
        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            creating_streaming_job_controls,
            committed_epoch,
            cdc_table_backfill_tracker,
        })
    }

    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
        )
    }

    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

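    /// Injects a barrier for one partial graph into every connected worker that hosts
    /// actors of `pre_applied_graph_info`, optionally building new actors first.
    /// Returns, for each worker, whether it has no actors to collect from.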
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_job_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if let Some((_, worker_state)) = self.workers.get(worker_id)
                && let WorkerNodeState::Connected { .. } = worker_state
            {
            } else {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info).collect();

        let mut node_need_collect = HashMap::new();

        self.connected_workers()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(&node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor
                                                                    .vnode_bitmap
                                                                    .map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor
                                                                    .config_override
                                                                    .to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids
                                                                    .iter()
                                                                    .copied()
                                                                    .collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;

                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

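    /// Creates a partial graph for `database_id` (and optionally a creating job) on all
    /// connected workers; send failures are logged but not propagated.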
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

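    /// Removes the partial graphs of the given creating jobs from all connected workers;
    /// send failures are logged but not propagated.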
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

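    /// Asks every connected worker to reset the given database, returning the ids of
    /// the workers the request was successfully sent to.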
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

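/// Merges per-worker RPC errors into a single [`MetaError`]: a lone error is returned
/// directly, an error carrying the highest `Score` wins if any scores are present, and
/// otherwise all error messages are concatenated.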
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}