use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

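/// A partial graph is identified by the `TableId` of the creating streaming job it belongs to,
/// with `u32::MAX` reserved as the sentinel for the database-level graph (no creating job).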
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

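/// A control stream to a single connected worker node, together with the worker's identity.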
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

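/// Manages the streaming control streams between the meta node and all known worker nodes,
/// used to inject barriers and collect their responses.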
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

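    /// Try to re-establish the control stream to an already-known worker. Unlike
    /// [`Self::add_worker`], this makes a single attempt and only logs the error on failure.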
    #[await_tree::instrument("try_reconnect_worker({worker_id})")]
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }

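    /// Register a (possibly new) worker node and establish its control stream, retrying up to
    /// `MAX_RETRY` times with exponential backoff before giving up.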
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

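    /// Drop all existing control streams and reconnect to the given set of worker nodes,
    /// returning the ids of the workers that could not be connected.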
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

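    /// Poll all connected control streams for the next response. On an error (end of stream,
    /// an RPC error, or a `Shutdown` response), the corresponding worker is removed from the
    /// connected set before the error is returned.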
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

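    /// Build the `PbInitRequest` describing, per database, the initial partial graphs: the
    /// database-level graph with its subscriptions plus one empty graph per creating job.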
    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

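/// Tracks collection of the initial barrier injected for a single database during recovery.
/// Once all expected responses have been collected, it is converted into a
/// `DatabaseCheckpointControl` via [`Self::finish`].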
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
            self.cdc_table_backfill_tracker,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
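    /// Inject the first barrier of a database during recovery: create the database-level partial
    /// graph, resolve committed epochs, source splits and CDC snapshot splits, rebuild actors,
    /// and return a [`DatabaseInitialBarrierCollector`] that waits for the barrier to be collected.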
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment =
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                database_cdc_table_snapshot_split_assignment,
                cdc_table_snapshot_split_assignment.generation,
            );
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    database_cdc_table_snapshot_split_assignment,
                )
                .into(),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
            new_upstream_sinks: Default::default(),
        });

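        // All state tables of the given jobs are expected to share the same committed epoch;
        // take them out of `state_table_committed_epochs` and assert the invariant.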
        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

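        // Split the recovered jobs: background jobs that use snapshot backfill are handled
        // separately as creating streaming jobs, while other background jobs stay in the database
        // graph and are tracked as background mviews.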
        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

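        // For each recovered snapshot backfill job: if it has already caught up with the database
        // committed epoch, fold it back into the database graph as a background mview; otherwise
        // resolve its snapshot epoch and register it in `mv_depended_subscriptions` of each
        // upstream table so that backfill can resume as a creating streaming job.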
        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

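        // Collect the actors to create for the database graph and inject the initial barrier to
        // all involved workers; the returned `node_to_collect` records the workers whose
        // responses to this barrier are awaited.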
        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (
                        *table_id,
                        (definition.clone(), stream_job_fragments, Default::default()),
                    )
                }),
            hummock_version_stats,
        );

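        // Recover a `CreatingStreamingJobControl` for every snapshot backfill job that is still in
        // progress, passing the rebuilt actors and the initial mutation to
        // `CreatingStreamingJobControl::recover`.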
        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs.values().flatten().chain(
                creating_streaming_job_controls
                    .values()
                    .flat_map(|job| job.graph_info().fragment_infos()),
            ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            subscription_info,
            is_paused,
        );
        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
            cdc_table_backfill_tracker,
        })
    }

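    /// Inject a barrier for a command on the database-level partial graph, deriving the barrier
    /// mutation and the subscriptions to add or remove from the command itself.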
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

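    /// Send an `InjectBarrierRequest` for the given partial graph to every connected worker,
    /// including the actors that must be built first. Fails if any worker that hosts actors to
    /// collect is not connected. Returns a map from worker id to whether that worker has no
    /// actors to collect for this barrier.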
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

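    /// Ask every connected worker to create the partial graph for the given database and optional
    /// creating job; a failed send is only logged here.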
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

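    /// Ask every connected worker to remove the partial graphs of the given creating jobs.
    /// A no-op when the list is empty; failed sends are only logged.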
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

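    /// Ask every connected worker to reset the given database, returning the ids of the workers
    /// the request was successfully sent to.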
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

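/// Merge per-worker RPC errors into a single `MetaError`: a lone error is returned with context,
/// the highest-`Score` error wins when scores are attached, and otherwise all errors are
/// concatenated into one message.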
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}