use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignment, build_pb_actor_cdc_table_snapshot_splits,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};
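
/// Maps a creating job's `TableId` to the ID of its dedicated partial graph, reserving
/// `u32::MAX` for the database-level partial graph (`job_id == None`). `from_partial_graph_id`
/// below is the inverse mapping.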
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}
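
/// Manages the control streams between the meta node and the compute workers. `workers` holds
/// every known worker node, while `connected_nodes` holds only those with an established
/// control stream; after a worker failure the two may diverge until a reconnect succeeds.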
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }
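
    /// Try to re-establish the control stream to an already-registered worker. Unlike
    /// `add_worker`, this makes a single attempt and only logs the error on failure.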
    #[await_tree::instrument("try_reconnect_worker({worker_id})")]
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }
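
    /// Register a worker node and establish its control stream, retrying up to `MAX_RETRY`
    /// times with exponential backoff (100ms base, factor 5, capped at 3s); as the log message
    /// below suggests, the retry mainly covers a newly registered node whose address cannot be
    /// resolved yet.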
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }
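
    /// Drop all existing control streams and reconnect to the given workers with an empty init
    /// request. Returns the IDs of the workers that could not be connected.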
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
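
    /// Poll all connected control streams for the next response. On end of stream, transport
    /// error, a `Shutdown` response or an unexpected `Init` response, the worker is removed
    /// from `connected_nodes` and the error is returned to the caller. Returns `Poll::Pending`
    /// when no stream is ready, including when no worker is connected.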
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}
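
/// Tracks collection of the initial barrier injected into one database during recovery. Once
/// every worker in `node_to_collect` and every recovered snapshot backfill job has reported
/// back, `finish` turns the collector into a `DatabaseCheckpointControl`.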
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
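    /// Inject the first barrier of a database during recovery: register the database-level
    /// partial graph, hand the recovered source splits and CDC table snapshot splits back to
    /// their actors via an `Add` mutation, resolve the committed epoch shared by the database's
    /// state tables, and recover snapshot backfill jobs that have not caught up with their
    /// upstream as separate `CreatingStreamingJobControl`s.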
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignment,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
                database_cdc_table_snapshot_split_assignment,
            ),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
        });

        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (
                        *table_id,
                        (definition.clone(), stream_job_fragments, Default::default()),
                    )
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs.values().flatten().chain(
                creating_streaming_job_controls
                    .values()
                    .flat_map(|job| job.graph_info().fragment_infos()),
            ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            subscription_info,
            is_paused,
        );
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }
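
    /// Inject a command barrier into the database-level partial graph, deriving the mutation
    /// and the subscriptions/actors to add or remove from the command itself.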
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }
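
    /// Broadcast a barrier for the given partial graph to every connected worker.
    /// `pre_applied_graph_info` determines which actors must collect the barrier, and
    /// `applied_graph_info` determines which state tables to sync. Returns a `NodeToCollect`
    /// covering all connected workers, flagging those with no actors to collect; errors out if
    /// a worker owning actors is not connected or a send fails, recording send failures in the
    /// event log.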
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;

                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }
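
    /// Send a `ResetDatabase` request to every connected worker and return the IDs of the
    /// workers the request was successfully sent to.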
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}
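
/// Merge per-worker RPC errors into a single `MetaError`: with no errors, just `message`; with
/// one error, that error wrapped with the worker context; with several, the error carrying the
/// highest `Score` if any is scored, otherwise a concatenation of all error reports.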
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}