use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{
    AddMutation, Barrier, BarrierMutation, FragmentTypeFlag, SubscriptionUpstreamInfo,
};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

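/// Encodes an optional creating job id as a partial graph id. `None` (the database-level graph)
/// maps to `u32::MAX`, so a real job id equal to `u32::MAX` is rejected by the assertion below.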
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

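/// A control stream to a single connected worker node, together with the worker's id and host.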
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

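/// Owns the streaming control streams from the meta node to compute nodes. `workers` tracks all
/// known worker nodes, while `connected_nodes` holds only those with a live control stream.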
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

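    /// Try to re-establish the control stream to a worker that is already registered in `workers`
    /// but currently disconnected. Unlike `add_worker`, this makes a single attempt and only logs
    /// the error on failure instead of retrying.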
    #[await_tree::instrument("try_reconnect_worker({worker_id})")]
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "failed to create control stream to worker node");
            }
        }
    }

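    /// Register a newly discovered worker node and establish its control stream, retrying with
    /// exponential backoff (up to `MAX_RETRY` attempts) before giving up.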
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "failed to create control stream to worker node, will retry");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "failed to create control stream to worker node after retries");
    }

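    /// Drop all existing control streams and reconnect to the given set of workers with a fresh
    /// `term_id`. Returns the ids of the workers that could not be connected.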
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

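    /// Poll all connected control streams for the next response. On any error (including end of
    /// stream, an explicit `Shutdown` response, or an unexpected `Init` response), the
    /// corresponding node is removed from `connected_nodes` before the error is returned.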
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

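    /// Build the `PbInitRequest` used when (re)establishing a control stream: per database, one
    /// partial graph for the already-running jobs (carrying the subscriptions) plus one partial
    /// graph per in-flight creating job.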
    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

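/// Tracks the collection of the initial (recovery) barrier of a single database: which worker
/// nodes still have to report back, plus the state needed to build the
/// `DatabaseCheckpointControl` once everything is collected.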
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
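    /// Inject the initial barrier of a database during recovery. This registers the
    /// database-level partial graph, replays recovered source splits and background jobs,
    /// re-creates controls for snapshot backfill jobs that have not yet caught up with their
    /// upstream, and returns a `DatabaseInitialBarrierCollector` that awaits barrier collection
    /// from all involved workers.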
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
        });

        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has a different committed epoch from {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    (fragment.fragment_type_mask
                        & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32))
                        != 0
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} has no snapshot epoch set for upstream {}", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} of upstream {} differs from snapshot epoch {} of a previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has no snapshot epoch set",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (*table_id, (definition.clone(), stream_job_fragments))
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let mut database = InflightDatabaseInfo::empty();
        database_jobs
            .into_values()
            .for_each(|job| database.extend(job));
        let database_state =
            BarrierWorkerState::recovery(new_epoch, database, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

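    /// Inject a barrier for a regular command on the database-level partial graph, deriving the
    /// mutation and the subscriptions to add or remove from the command itself.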
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

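    /// Send an `InjectBarrierRequest` to every connected worker for the given partial graph.
    /// Every worker that hosts actors to collect must already be connected, otherwise this fails.
    /// Returns a map from worker id to whether that worker has no actors to collect for this
    /// barrier.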
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

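    /// Broadcast a `CreatePartialGraphRequest` for the given database (and optional creating job)
    /// to all connected workers. Send failures are only logged.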
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "failed to add partial graph to worker")
            }
        });
    }

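    /// Broadcast a `RemovePartialGraphRequest` for the partial graphs of the given creating jobs
    /// to all connected workers. A no-op if `creating_job_ids` is empty.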
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

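    /// Send a `ResetDatabaseRequest` to every connected worker and return the ids of the workers
    /// the request was successfully sent to.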
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

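/// Merge per-worker RPC errors into a single `MetaError`: an empty input yields a plain message,
/// a single error is returned with worker context, the error carrying the highest `Score` wins
/// when scores are attached, and otherwise all errors are concatenated into one message.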
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::error::request_value;
    use std::fmt::Write;

    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| request_value::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| request_value::<Score>(e) == Some(max_score))
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}