use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

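/// Encode an optional creating job id as a partial graph id: a concrete job maps to its
/// `table_id`, while `None` (the database-level graph) maps to `u32::MAX`.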
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

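/// Inverse of [`to_partial_graph_id`]: `u32::MAX` denotes the database-level graph (`None`),
/// any other value is the `TableId` of a creating streaming job.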
pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

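/// A live control stream to a single compute node, together with the worker's identity.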
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

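/// Manages the streaming control streams between the meta node and compute nodes.
///
/// `workers` tracks every known worker node, while `connected_nodes` only contains the workers
/// that currently have an established control stream.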
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

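    /// Try to re-establish the control stream to an already-registered worker, e.g. after the
    /// previous stream to it failed. Does nothing if the worker is still connected.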
    #[await_tree::instrument("try_reconnect_worker({worker_id})")]
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }

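    /// Register a newly discovered worker node and try to establish a control stream to it,
    /// retrying with exponential backoff for a bounded number of attempts (the worker's address
    /// may not be resolvable immediately after registration).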
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

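    /// Reset the manager to a fresh set of worker nodes and (re)establish a control stream to
    /// each of them concurrently. Returns the ids of the workers that could not be connected.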
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

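    /// Clear all known workers and connected streams, keeping only the meta environment.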
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

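    /// Poll all connected control streams for the next response. Returns `Poll::Pending` when no
    /// node is connected. On an error (stream ended, RPC error, or an unexpected `Shutdown`/`Init`
    /// response), the corresponding node is removed from `connected_nodes` before the error is
    /// returned to the caller.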
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

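    /// Build the `PbInitRequest` used when (re)establishing a control stream: for each database,
    /// one database-level partial graph carrying the current subscriptions plus one partial graph
    /// per creating streaming job.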
    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

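/// Tracks the collection of the initial (recovery) barrier of a single database: which worker
/// nodes still have to report back, plus the recovered state needed to build the database's
/// checkpoint control once collection finishes.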
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

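    /// Feed a `BarrierCompleteResponse` into the collector. Responses for a creating-job partial
    /// graph are routed to the corresponding `CreatingStreamingJobControl`; responses for the
    /// database-level graph mark the reporting worker as collected.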
    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
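    /// Inject the very first barrier of a database during recovery. This re-adds the
    /// database-level partial graph, rebuilds the `Add` mutation (actor splits, pause state),
    /// recovers background and snapshot-backfill jobs, and returns a
    /// [`DatabaseInitialBarrierCollector`] that waits for all involved workers to respond.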
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
        });

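        // During recovery, all state tables of the given jobs must share the same committed
        // epoch; take it (asserting the invariant) as the previous epoch of the initial barrier.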
        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

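        // A recovered snapshot backfill job whose committed epoch already equals the database's
        // previous epoch has caught up with its upstream and is treated as an ordinary background
        // mview; otherwise it keeps backfilling from its snapshot epoch and is tracked as an
        // ongoing snapshot backfill job below.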
        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

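        // Build the actors to create on each worker and inject the initial barrier into the
        // database-level partial graph.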
        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (
                        *table_id,
                        (definition.clone(), stream_job_fragments, Default::default()),
                    )
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let mut database = InflightDatabaseInfo::empty();
        database_jobs
            .into_values()
            .for_each(|job| database.extend(job));
        let database_state =
            BarrierWorkerState::recovery(new_epoch, database, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

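    /// Inject a barrier for a regular (database-level) command: derive the mutation and any
    /// subscription changes from the command, then delegate to [`Self::inject_barrier`].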
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

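    /// Send an `InjectBarrier` request to every connected worker for the given partial graph.
    /// Actors in `pre_applied_graph_info` determine which workers must collect the barrier,
    /// `applied_graph_info` determines the state tables to sync, and `new_actors` are built on
    /// their workers as part of the same barrier. Returns a map from each worker the barrier was
    /// sent to, to a flag indicating whether that worker has no actors to collect.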
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;

                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

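    /// Ask every connected worker to create a new partial graph for the given database (and,
    /// optionally, creating job). Send failures are logged and otherwise ignored.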
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

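    /// Ask every connected worker to remove the partial graphs of the given creating jobs.
    /// No-op when the list is empty; send failures are logged and otherwise ignored.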
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

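    /// Ask every connected worker to reset the given database. Returns the ids of the workers
    /// the request was successfully sent to; workers whose send failed are only logged.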
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
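    /// Open a new streaming control stream to the given worker via the stream client pool and
    /// start it with the provided init request.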
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

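/// Merge per-worker RPC errors into a single `MetaError`: with no errors, just the message is
/// returned; with one error, it is wrapped with the worker id; with several, the error carrying
/// the highest `Score` wins if any is scored, otherwise all errors are concatenated.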
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}