risingwave_meta/barrier/mod.rs

use std::collections::HashMap;
use anyhow::anyhow;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbRecoveryStatus;
use risingwave_pb::stream_plan::StreamActor;
use tokio::sync::oneshot::Sender;
use self::notifier::Notifier;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamJobFragments};
use crate::{MetaError, MetaResult};
mod checkpoint;
mod command;
mod complete_task;
mod context;
mod info;
mod manager;
mod notifier;
mod progress;
mod rpc;
mod schedule;
mod trace;
mod utils;
mod worker;
pub use self::command::{
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan,
Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::manager::{BarrierManagerRef, GlobalBarrierManager};
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;
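/// The reason why the barrier worker enters recovery: the initial recovery on
/// bootstrap, a failover caused by an error, or an ad-hoc (manually triggered)
/// recovery.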
enum RecoveryReason {
Bootstrap,
Failover(MetaError),
Adhoc,
}
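/// The lifecycle status of the barrier manager: it begins in `Starting`,
/// enters `Recovering` for bootstrap or failure recovery, and serves barriers
/// while `Running`.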
enum BarrierManagerStatus {
Starting,
Recovering(RecoveryReason),
Running,
}
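/// A command scheduled for execution on a database, carrying the notifiers to
/// be notified as the command's barrier is injected and collected, and the
/// tracing span the command was scheduled under.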
struct Scheduled {
database_id: DatabaseId,
command: Command,
notifiers: Vec<Notifier>,
span: tracing::Span,
}
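// Map the internal status to the protobuf recovery status exposed to clients.
// Note that recovery during bootstrap is reported as "starting" rather than
// "recovering".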
impl From<&BarrierManagerStatus> for PbRecoveryStatus {
fn from(status: &BarrierManagerStatus) -> Self {
match status {
BarrierManagerStatus::Starting => Self::StatusStarting,
BarrierManagerStatus::Recovering(reason) => match reason {
RecoveryReason::Bootstrap => Self::StatusStarting,
RecoveryReason::Failover(_) | RecoveryReason::Adhoc => Self::StatusRecovering,
},
BarrierManagerStatus::Running => Self::StatusRunning,
}
}
}
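// A minimal sketch (not part of the original module) illustrating the
// conversion above: bootstrap recovery is reported as `StatusStarting`, while
// failover and ad-hoc recovery are reported as `StatusRecovering`.
#[cfg(test)]
mod recovery_status_tests {
    use super::*;

    #[test]
    fn recovery_reason_maps_to_pb_status() {
        let bootstrap = BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap);
        assert_eq!(
            PbRecoveryStatus::from(&bootstrap),
            PbRecoveryStatus::StatusStarting
        );

        let adhoc = BarrierManagerStatus::Recovering(RecoveryReason::Adhoc);
        assert_eq!(
            PbRecoveryStatus::from(&adhoc),
            PbRecoveryStatus::StatusRecovering
        );

        assert_eq!(
            PbRecoveryStatus::from(&BarrierManagerStatus::Running),
            PbRecoveryStatus::StatusRunning
        );
    }
}

/// Requests handled by the barrier worker at runtime. `GetDdlProgress`
/// replies, through the oneshot sender, with the progress of each in-flight
/// DDL keyed by job id.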
pub(crate) enum BarrierManagerRequest {
GetDdlProgress(Sender<HashMap<u32, DdlProgress>>),
}
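/// A snapshot of the cluster runtime information collected during recovery,
/// from which the barrier worker rebuilds its in-memory state.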
#[derive(Debug)]
struct BarrierWorkerRuntimeInfoSnapshot {
active_streaming_nodes: ActiveStreamingWorkerNodes,
database_fragment_infos: HashMap<DatabaseId, InflightDatabaseInfo>,
state_table_committed_epochs: HashMap<TableId, u64>,
subscription_infos: HashMap<DatabaseId, InflightSubscriptionInfo>,
stream_actors: HashMap<ActorId, StreamActor>,
source_splits: HashMap<ActorId, Vec<SplitImpl>>,
background_jobs: HashMap<TableId, (String, StreamJobFragments)>,
hummock_version_stats: HummockVersionStats,
}
impl BarrierWorkerRuntimeInfoSnapshot {
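    /// Sanity-check the snapshot before it is used for recovery: every actor
    /// must be assigned to an active worker and have a `StreamActor`
    /// definition, every state table must have a committed epoch in hummock,
    /// and all tables of a database must share the same committed epoch.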
fn validate(&self) -> MetaResult<()> {
for (database_id, database_info) in &self.database_fragment_infos {
for fragment in database_info.fragment_infos() {
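                // Each actor must be scheduled on an active worker node and
                // have a corresponding `StreamActor` definition.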
for (actor_id, worker_id) in &fragment.actors {
if !self
.active_streaming_nodes
.current()
.contains_key(worker_id)
{
return Err(anyhow!(
"worker_id {} of actor {} do not exist",
worker_id,
actor_id
)
.into());
}
if !self.stream_actors.contains_key(actor_id) {
return Err(anyhow!("cannot find StreamActor of actor {}", actor_id).into());
}
}
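                // Each state table of the fragment must have been registered
                // to hummock with a committed epoch.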
for state_table_id in &fragment.state_table_ids {
if !self
.state_table_committed_epochs
.contains_key(state_table_id)
{
return Err(anyhow!(
"state table {} is not registered to hummock",
state_table_id
)
.into());
}
}
}
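            // All state tables within a database must have been committed at
            // the same epoch; otherwise the database cannot be recovered to a
            // consistent snapshot.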
let mut committed_epochs = database_info.existing_table_ids().map(|table_id| {
(
table_id,
*self
.state_table_committed_epochs
.get(&table_id)
.expect("checked exist"),
)
});
let (first_table, first_epoch) = committed_epochs.next().ok_or_else(|| {
anyhow!("database {} has no state table after recovery", database_id)
})?;
for (table_id, epoch) in committed_epochs {
if epoch != first_epoch {
return Err(anyhow!(
"database {} has tables with different table ids. {}:{}, {}:{}",
database_id,
first_table,
first_epoch,
table_id,
epoch
)
.into());
}
}
}
Ok(())
}
}