risingwave_meta_model/
worker.rs
1use risingwave_pb::common::worker_node::PbState;
16use risingwave_pb::common::{PbWorkerNode, PbWorkerType};
17use sea_orm::ActiveValue::Set;
18use sea_orm::entity::prelude::*;
19use serde::{Deserialize, Serialize};
20
21use crate::{TransactionId, WorkerId};
22
23#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
24#[sea_orm(rs_type = "String", db_type = "string(None)")]
25pub enum WorkerType {
26 #[sea_orm(string_value = "FRONTEND")]
27 Frontend,
28 #[sea_orm(string_value = "COMPUTE_NODE")]
29 ComputeNode,
30 #[sea_orm(string_value = "RISE_CTL")]
31 RiseCtl,
32 #[sea_orm(string_value = "COMPACTOR")]
33 Compactor,
34 #[sea_orm(string_value = "META")]
35 Meta,
36}
37
38impl From<PbWorkerType> for WorkerType {
39 fn from(worker_type: PbWorkerType) -> Self {
40 match worker_type {
41 PbWorkerType::Unspecified => unreachable!("unspecified worker type"),
42 PbWorkerType::Frontend => Self::Frontend,
43 PbWorkerType::ComputeNode => Self::ComputeNode,
44 PbWorkerType::RiseCtl => Self::RiseCtl,
45 PbWorkerType::Compactor => Self::Compactor,
46 PbWorkerType::Meta => Self::Meta,
47 }
48 }
49}
50
51impl From<WorkerType> for PbWorkerType {
52 fn from(worker_type: WorkerType) -> Self {
53 match worker_type {
54 WorkerType::Frontend => Self::Frontend,
55 WorkerType::ComputeNode => Self::ComputeNode,
56 WorkerType::RiseCtl => Self::RiseCtl,
57 WorkerType::Compactor => Self::Compactor,
58 WorkerType::Meta => Self::Meta,
59 }
60 }
61}
62
63#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
64#[sea_orm(rs_type = "String", db_type = "string(None)")]
65pub enum WorkerStatus {
66 #[sea_orm(string_value = "STARTING")]
67 Starting,
68 #[sea_orm(string_value = "RUNNING")]
69 Running,
70}
71
72impl From<PbState> for WorkerStatus {
73 fn from(state: PbState) -> Self {
74 match state {
75 PbState::Unspecified => unreachable!("unspecified worker status"),
76 PbState::Starting => Self::Starting,
77 PbState::Running => Self::Running,
78 }
79 }
80}
81
82impl From<WorkerStatus> for PbState {
83 fn from(status: WorkerStatus) -> Self {
84 match status {
85 WorkerStatus::Starting => Self::Starting,
86 WorkerStatus::Running => Self::Running,
87 }
88 }
89}
90
91impl From<&PbWorkerNode> for ActiveModel {
92 fn from(worker: &PbWorkerNode) -> Self {
93 let host = worker.host.clone().unwrap();
94 Self {
95 worker_id: Set(worker.id as _),
96 worker_type: Set(worker.r#type().into()),
97 host: Set(host.host),
98 port: Set(host.port),
99 status: Set(worker.state().into()),
100 transaction_id: Set(worker.transactional_id.map(|id| id as _)),
101 }
102 }
103}
104
105#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
106#[sea_orm(table_name = "worker")]
107pub struct Model {
108 #[sea_orm(primary_key)]
109 pub worker_id: WorkerId,
110 pub worker_type: WorkerType,
111 pub host: String,
112 pub port: i32,
113 pub status: WorkerStatus,
114 pub transaction_id: Option<TransactionId>,
115}
116
117#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
118pub enum Relation {
119 #[sea_orm(has_many = "super::worker_property::Entity")]
120 WorkerProperty,
121}
122
123impl Related<super::worker_property::Entity> for Entity {
124 fn to() -> RelationDef {
125 Relation::WorkerProperty.def()
126 }
127}
128
129impl ActiveModelBehavior for ActiveModel {}