use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, FragmentId, StreamActor, StreamJobActorsToCreate, SubscriptionId};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

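/// Encodes an optional creating-job id into the `partial_graph_id` carried by control stream
/// requests. `u32::MAX` is reserved for the database-level graph (`None`), so a job id must
/// never take that value.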
fn to_partial_graph_id(job_id: Option<JobId>) -> u32 {
    job_id
        .map(|job_id| {
            assert_ne!(job_id.as_raw_id(), u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX)
}

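/// Inverse of [`to_partial_graph_id`]: `u32::MAX` maps back to `None` (the database-level
/// graph), any other value to the creating job with that id.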
pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<JobId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(JobId::new(partial_graph_id))
    }
}

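/// An established streaming control stream to a single worker node.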
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

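/// Connection state tracked per worker: either a live control stream (with a flag marking
/// whether the worker has been removed from the cluster), or a pending reconnect future.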
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

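/// Manages the streaming control streams between the meta barrier worker and all compute
/// worker nodes, and is responsible for injecting barriers and collecting their responses.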
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

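    /// Connects to a newly added worker node and registers it in `self.workers`.
    ///
    /// The connection is retried a bounded number of times with exponential backoff; on
    /// success the worker is initialized with the currently inflight partial graphs so that
    /// subsequent barriers can be injected to it.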
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

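    /// Handles removal of a worker node from the cluster. A connected worker is only marked as
    /// removed (its entry is dropped once its response stream terminates), while a worker that
    /// is still reconnecting is dropped immediately.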
    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id as _) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

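    /// Returns a future that keeps trying to establish a control stream to `node` with
    /// exponential backoff until it succeeds. Used as the `Reconnecting` state of a worker.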
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

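    /// Rebuilds the control stream manager during recovery by connecting to all known worker
    /// nodes concurrently. Workers that cannot be reached immediately are left in the
    /// `Reconnecting` state and retried in the background.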
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id as _,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

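/// Borrowed view of a freshly (re)connected worker, used to push initialization requests such
/// as re-creating the inflight partial graphs on that worker.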
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

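/// Event yielded by [`ControlStreamManager::next_event`]: either a response received from a
/// worker's control stream, or a notification that a worker has (re)connected.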
pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
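    /// Polls all workers for the next event.
    ///
    /// `this_opt` is taken out of the `Option` when a `Connected` event is returned so that the
    /// borrow of the reconnected worker's handle can be tied to the caller's lifetime `'a`.
    /// When `poll_reconnect` is false, pending reconnect futures are skipped and only responses
    /// from already connected workers are polled. Returns `Poll::Pending` if there is no worker
    /// or no worker is ready.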
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id as _,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id as WorkerId);
                                                }
                                                Ok(resp)
                                            },
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

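    /// Awaits the next event (response or reconnection) from any worker. Instrumented for
    /// await-tree so that stuck control streams are visible in debug dumps.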
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(None),
                database_id: database_id.into(),
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(Some(job_id)),
                    database_id: database_id.into(),
                }),
            )
        })
    }
}

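/// Tracks collection of the initial barrier injected for a single database during recovery.
/// Once all worker nodes and all recovered creating jobs have reported back, it can be turned
/// into a [`DatabaseCheckpointControl`] via [`DatabaseInitialBarrierCollector::finish`].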
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.as_raw_id(), resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
            self.cdc_table_backfill_tracker,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
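    /// Injects the initial barrier for a database during recovery.
    ///
    /// This registers the database-level partial graph, reassembles source splits, CDC
    /// snapshot-split assignments, background job progress, and subscriptions from the
    /// recovered state, separates snapshot-backfill jobs that still lag behind the upstream
    /// into their own creating-job controls, and finally injects an `Add` barrier carrying the
    /// recovered mutation to all involved workers. The returned collector waits until every
    /// worker has reported the barrier as complete.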
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<JobId, String>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment =
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                database_cdc_table_snapshot_split_assignment,
                cdc_table_snapshot_split_assignment.generation,
            );
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    database_cdc_table_snapshot_split_assignment,
                )
                .into(),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
            new_upstream_sinks: Default::default(),
        });

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (subscription_id, SubscriberType::Subscription(retention))
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if let Some(definition) = background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
                } else {
                    database_jobs.insert(job_id, job_fragments);
                    background_mviews.insert(job_id, definition);
                }
            } else {
                database_jobs.insert(job_id, job_fragments);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|job| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, definition)
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, fragment_infos)
                    .expect("non-duplicate");
                continue;
            }
            let info = InflightStreamingJobInfo {
                job_id,
                fragment_infos,
                subscribers: Default::default(),
            };
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                info.fragment_infos()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, fragment_infos)| {
                    (
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                        },
                    )
                })
                .collect()
        };

        let node_to_collect = {
            let node_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_info| {
                        (
                            fragment_info.fragment_id,
                            &fragment_info.nodes,
                            fragment_info.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews.iter().map(|(table_id, definition)| {
                (
                    *table_id,
                    (
                        definition.clone(),
                        &database_jobs[table_id],
                        Default::default(),
                    ),
                )
            }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(|fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                        info.subscribers.keys().copied(),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .map(|job| job.graph_info()),
                )
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                }),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            is_paused,
        );
        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
            cdc_table_backfill_tracker,
        })
    }

    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
        )
    }

    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

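    /// Builds an `InjectBarrierRequest` for the given partial graph and sends it to every
    /// connected worker, bundling any actors that must be built on that worker. Fails if a
    /// worker hosting actors of the pre-applied graph is currently not connected. Returns a map
    /// from worker id to whether that worker has no actors to collect for this barrier.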
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_job_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if let Some((_, worker_state)) = self.workers.get(worker_id)
                && let WorkerNodeState::Connected { .. } = worker_state
            {
            } else {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.as_raw_id())
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_workers()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(&node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.as_raw_id(),
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.as_raw_id(),
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.as_raw_id(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.as_raw_id(),
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

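/// Merges per-worker RPC errors into a single [`MetaError`]. If any error carries a `Score`,
/// only the highest-scored error is kept; otherwise all errors are concatenated into one
/// message.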
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}