risingwave_meta_model/
worker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::common::worker_node::PbState;
16use risingwave_pb::common::{PbWorkerNode, PbWorkerType};
17use sea_orm::ActiveValue::Set;
18use sea_orm::entity::prelude::*;
19use serde::{Deserialize, Serialize};
20
21use crate::{TransactionId, WorkerId};
22
23#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
24#[sea_orm(rs_type = "String", db_type = "string(None)")]
25pub enum WorkerType {
26    #[sea_orm(string_value = "FRONTEND")]
27    Frontend,
28    #[sea_orm(string_value = "COMPUTE_NODE")]
29    ComputeNode,
30    #[sea_orm(string_value = "RISE_CTL")]
31    RiseCtl,
32    #[sea_orm(string_value = "COMPACTOR")]
33    Compactor,
34    #[sea_orm(string_value = "META")]
35    Meta,
36}
37
38impl From<PbWorkerType> for WorkerType {
39    fn from(worker_type: PbWorkerType) -> Self {
40        match worker_type {
41            PbWorkerType::Unspecified => unreachable!("unspecified worker type"),
42            PbWorkerType::Frontend => Self::Frontend,
43            PbWorkerType::ComputeNode => Self::ComputeNode,
44            PbWorkerType::RiseCtl => Self::RiseCtl,
45            PbWorkerType::Compactor => Self::Compactor,
46            PbWorkerType::Meta => Self::Meta,
47        }
48    }
49}
50
51impl From<WorkerType> for PbWorkerType {
52    fn from(worker_type: WorkerType) -> Self {
53        match worker_type {
54            WorkerType::Frontend => Self::Frontend,
55            WorkerType::ComputeNode => Self::ComputeNode,
56            WorkerType::RiseCtl => Self::RiseCtl,
57            WorkerType::Compactor => Self::Compactor,
58            WorkerType::Meta => Self::Meta,
59        }
60    }
61}
62
63#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
64#[sea_orm(rs_type = "String", db_type = "string(None)")]
65pub enum WorkerStatus {
66    #[sea_orm(string_value = "STARTING")]
67    Starting,
68    #[sea_orm(string_value = "RUNNING")]
69    Running,
70}
71
72impl From<PbState> for WorkerStatus {
73    fn from(state: PbState) -> Self {
74        match state {
75            PbState::Unspecified => unreachable!("unspecified worker status"),
76            PbState::Starting => Self::Starting,
77            PbState::Running => Self::Running,
78        }
79    }
80}
81
82impl From<WorkerStatus> for PbState {
83    fn from(status: WorkerStatus) -> Self {
84        match status {
85            WorkerStatus::Starting => Self::Starting,
86            WorkerStatus::Running => Self::Running,
87        }
88    }
89}
90
91impl From<&PbWorkerNode> for ActiveModel {
92    fn from(worker: &PbWorkerNode) -> Self {
93        let host = worker.host.clone().unwrap();
94        Self {
95            worker_id: Set(worker.id as _),
96            worker_type: Set(worker.r#type().into()),
97            host: Set(host.host),
98            port: Set(host.port),
99            status: Set(worker.state().into()),
100            transaction_id: Set(worker.transactional_id.map(|id| id as _)),
101        }
102    }
103}
104
105#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
106#[sea_orm(table_name = "worker")]
107pub struct Model {
108    #[sea_orm(primary_key)]
109    pub worker_id: WorkerId,
110    pub worker_type: WorkerType,
111    pub host: String,
112    pub port: i32,
113    pub status: WorkerStatus,
114    pub transaction_id: Option<TransactionId>,
115}
116
117#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
118pub enum Relation {
119    #[sea_orm(has_many = "super::worker_property::Entity")]
120    WorkerProperty,
121}
122
123impl Related<super::worker_property::Entity> for Entity {
124    fn to() -> RelationDef {
125        Relation::WorkerProperty.def()
126    }
127}
128
129impl ActiveModelBehavior for ActiveModel {}