use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{
    AddMutation, Barrier, BarrierMutation, FragmentTypeFlag, SubscriptionUpstreamInfo,
};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

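/// Encode an optional creating-job id into the partial graph id carried on the
/// control stream. `None` (the database-level graph) is encoded as `u32::MAX`,
/// which is why a real job's `TableId` must never equal `u32::MAX`;
/// `from_partial_graph_id` below is the inverse mapping.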
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

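/// Manages the per-worker streaming control streams used by the global barrier
/// worker: it tracks the known workers, the subset with a live control stream,
/// and sends barrier-injection and partial-graph requests over those streams.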
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

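    /// Try to (re)establish the control stream to an already-known worker.
    /// A single attempt is made; on failure the error is only logged and the
    /// worker stays disconnected until the next reconnect attempt.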
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "failed to create control stream to worker node");
            }
        }
    }

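    /// Register a newly joined worker and establish its control stream, retrying
    /// up to `MAX_RETRY` times with exponential backoff (starting at 100ms,
    /// factor 5, capped at 3s) before giving up.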
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "failed to create control stream to worker node, retrying");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "failed to create control stream to worker node after retries");
    }

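    /// Drop all existing control streams and connect to the given workers once
    /// each, in parallel. Returns the ids of the workers that could not be
    /// connected.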
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

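    /// Poll all connected control streams for the next response. Returns
    /// `Poll::Pending` when no node is connected or nothing is ready; when a
    /// stream yields an error (including shutdown or end of stream), the node is
    /// removed from the connected set before the error is returned.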
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when receiving error from response stream");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "received error from response stream");
        }

        poll_result
    }

    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

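    /// Build the `PbInitRequest` sent when a control stream is (re)established:
    /// one initial partial graph per database for the database-level graph
    /// (carrying its subscriptions) plus one per in-flight creating job.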
    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

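/// Tracks collection of the initial barrier injected into one database during
/// recovery; once every involved worker and every recovered creating job has
/// reported back, it is converted into a `DatabaseCheckpointControl` via
/// `finish`.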
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
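    /// Inject the first barrier of a database during recovery. This creates the
    /// database-level partial graph, rebuilds the recovered actors, re-attaches
    /// background mviews and snapshot-backfill jobs, and returns a collector
    /// that waits for every involved worker to report the barrier as collected.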
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<Vec<u64>>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
        });

        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    (fragment.fragment_type_mask
                        & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32))
                        != 0
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

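        // For each recovered snapshot backfill job: if its state tables have
        // already reached the database's committed epoch, it is treated as a
        // plain background mview below; otherwise it stays a creating job, a
        // subscription on each upstream table is registered at
        // max(snapshot_epoch, committed_epoch), and it is later handed to
        // `CreatingStreamingJobControl::recover` together with its rebuilt actors.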
        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

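        // Inject the initial barrier for the database-level partial graph,
        // shipping the rebuilt actors of all recovered database jobs along with it.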
        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (*table_id, (definition.clone(), stream_job_fragments))
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let mut database = InflightDatabaseInfo::empty();
        database_jobs
            .into_values()
            .for_each(|job| database.extend(job));
        let database_state =
            BarrierWorkerState::recovery(new_epoch, database, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

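    /// Inject a barrier carrying the mutation derived from `command` (if any)
    /// into the database-level partial graph, forwarding subscription changes
    /// extracted from `Add` / `DropSubscriptions` mutations.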
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

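    /// Send an `InjectBarrier` request to every connected worker for the given
    /// partial graph. Returns a map over all connected workers, flagging for
    /// each whether it has no actors to collect for this barrier; any worker
    /// that hosts actors but has no control stream causes an early error.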
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;

                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

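    /// Ask every connected worker to create the partial graph for the given
    /// database (and creating job, if any). A failed send is only logged here,
    /// since a broken control stream will surface as an error in
    /// `poll_next_response`.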
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "failed to add partial graph to worker")
            }
        });
    }

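    /// Ask every connected worker to drop the partial graphs of the given
    /// creating jobs; a no-op when the list is empty.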
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

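    /// Ask every connected worker to reset the given database, returning the ids
    /// of the workers the reset request was successfully sent to.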
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

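/// Merge per-worker RPC errors into a single `MetaError`: with no errors, the
/// plain message is returned; with one error, it is wrapped with its worker id;
/// with several, the error carrying the highest `Score` (if any) is preferred,
/// otherwise all errors are concatenated into one message.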
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::error::request_value;
    use std::fmt::Write;

    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| request_value::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| request_value::<Score>(e) == Some(max_score))
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}