risingwave_meta/hummock/model/ext/
hummock.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_hummock_sdk::version::HummockVersionDelta;
17use risingwave_meta_model::compaction_config::CompactionConfig;
18use risingwave_meta_model::compaction_status::LevelHandlers;
19use risingwave_meta_model::compaction_task::CompactionTask;
20use risingwave_meta_model::hummock_version_delta::FullVersionDelta;
21use risingwave_meta_model::hummock_version_stats::TableStats;
22use risingwave_meta_model::{
23    CompactionGroupId, CompactionTaskId, HummockVersionId, WorkerId, compaction_config,
24    compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version,
25    hummock_version_delta, hummock_version_stats,
26};
27use risingwave_pb::hummock::{
28    CompactTaskAssignment, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats,
29};
30use sea_orm::ActiveValue::Set;
31use sea_orm::EntityTrait;
32use sea_orm::sea_query::OnConflict;
33
34use crate::hummock::compaction::CompactStatus;
35use crate::hummock::model::CompactionGroup;
36use crate::model::{MetadataModelError, MetadataModelResult, Transactional};
37use crate::storage::MetaStoreError;
38
39pub type Transaction = sea_orm::DatabaseTransaction;
40
41impl From<sea_orm::DbErr> for MetadataModelError {
42    fn from(err: sea_orm::DbErr) -> Self {
43        MetadataModelError::MetaStoreError(MetaStoreError::Internal(err.into()))
44    }
45}
46
47// TODO: reduce boilerplate code
48
49#[async_trait::async_trait]
50impl Transactional<Transaction> for CompactionGroup {
51    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
52        let m = compaction_config::ActiveModel {
53            compaction_group_id: Set(self.group_id.try_into().unwrap()),
54            config: Set(CompactionConfig::from(&(*self.compaction_config))),
55        };
56        compaction_config::Entity::insert(m)
57            .on_conflict(
58                OnConflict::column(compaction_config::Column::CompactionGroupId)
59                    .update_columns([compaction_config::Column::Config])
60                    .to_owned(),
61            )
62            .exec(trx)
63            .await?;
64        Ok(())
65    }
66
67    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
68        compaction_config::Entity::delete_by_id(
69            CompactionGroupId::try_from(self.group_id).unwrap(),
70        )
71        .exec(trx)
72        .await?;
73        Ok(())
74    }
75}
76
77#[async_trait::async_trait]
78impl Transactional<Transaction> for CompactStatus {
79    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
80        let m = compaction_status::ActiveModel {
81            compaction_group_id: Set(self.compaction_group_id.try_into().unwrap()),
82            status: Set(LevelHandlers::from(
83                self.level_handlers.iter().map_into().collect_vec(),
84            )),
85        };
86        compaction_status::Entity::insert(m)
87            .on_conflict(
88                OnConflict::column(compaction_status::Column::CompactionGroupId)
89                    .update_columns([compaction_status::Column::Status])
90                    .to_owned(),
91            )
92            .exec(trx)
93            .await?;
94        Ok(())
95    }
96
97    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
98        compaction_status::Entity::delete_by_id(
99            CompactionGroupId::try_from(self.compaction_group_id).unwrap(),
100        )
101        .exec(trx)
102        .await?;
103        Ok(())
104    }
105}
106
107#[async_trait::async_trait]
108impl Transactional<Transaction> for CompactTaskAssignment {
109    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
110        let task = self.compact_task.to_owned().unwrap();
111        let m = compaction_task::ActiveModel {
112            id: Set(task.task_id.try_into().unwrap()),
113            context_id: Set(self.context_id.try_into().unwrap()),
114            task: Set(CompactionTask::from(&task)),
115        };
116        compaction_task::Entity::insert(m)
117            .on_conflict(
118                OnConflict::column(compaction_task::Column::Id)
119                    .update_columns([
120                        compaction_task::Column::ContextId,
121                        compaction_task::Column::Task,
122                    ])
123                    .to_owned(),
124            )
125            .exec(trx)
126            .await?;
127        Ok(())
128    }
129
130    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
131        compaction_task::Entity::delete_by_id(
132            CompactionTaskId::try_from(self.compact_task.as_ref().unwrap().task_id).unwrap(),
133        )
134        .exec(trx)
135        .await?;
136        Ok(())
137    }
138}
139
140#[async_trait::async_trait]
141impl Transactional<Transaction> for HummockPinnedVersion {
142    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
143        let m = hummock_pinned_version::ActiveModel {
144            context_id: Set(self.context_id.try_into().unwrap()),
145            min_pinned_id: Set(self.min_pinned_id.try_into().unwrap()),
146        };
147        hummock_pinned_version::Entity::insert(m)
148            .on_conflict(
149                OnConflict::column(hummock_pinned_version::Column::ContextId)
150                    .update_columns([hummock_pinned_version::Column::MinPinnedId])
151                    .to_owned(),
152            )
153            .exec(trx)
154            .await?;
155        Ok(())
156    }
157
158    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
159        hummock_pinned_version::Entity::delete_by_id(WorkerId::try_from(self.context_id).unwrap())
160            .exec(trx)
161            .await?;
162        Ok(())
163    }
164}
165
166#[async_trait::async_trait]
167impl Transactional<Transaction> for HummockPinnedSnapshot {
168    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
169        let m = hummock_pinned_snapshot::ActiveModel {
170            context_id: Set(self.context_id.try_into().unwrap()),
171            min_pinned_snapshot: Set(self.minimal_pinned_snapshot.try_into().unwrap()),
172        };
173        hummock_pinned_snapshot::Entity::insert(m)
174            .on_conflict(
175                OnConflict::column(hummock_pinned_snapshot::Column::ContextId)
176                    .update_columns([hummock_pinned_snapshot::Column::MinPinnedSnapshot])
177                    .to_owned(),
178            )
179            .exec(trx)
180            .await?;
181        Ok(())
182    }
183
184    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
185        hummock_pinned_snapshot::Entity::delete_by_id(WorkerId::try_from(self.context_id).unwrap())
186            .exec(trx)
187            .await?;
188        Ok(())
189    }
190}
191
192#[async_trait::async_trait]
193impl Transactional<Transaction> for HummockVersionStats {
194    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
195        let m = hummock_version_stats::ActiveModel {
196            id: Set(self.hummock_version_id.try_into().unwrap()),
197            stats: Set(TableStats(self.table_stats.clone())),
198        };
199        hummock_version_stats::Entity::insert(m)
200            .on_conflict(
201                OnConflict::column(hummock_version_stats::Column::Id)
202                    .update_columns([hummock_version_stats::Column::Stats])
203                    .to_owned(),
204            )
205            .exec(trx)
206            .await?;
207        Ok(())
208    }
209
210    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
211        hummock_version_stats::Entity::delete_by_id(
212            HummockVersionId::try_from(self.hummock_version_id).unwrap(),
213        )
214        .exec(trx)
215        .await?;
216        Ok(())
217    }
218}
219
220#[async_trait::async_trait]
221impl Transactional<Transaction> for HummockVersionDelta {
222    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
223        let m = hummock_version_delta::ActiveModel {
224            id: Set(self.id.to_u64().try_into().unwrap()),
225            prev_id: Set(self.prev_id.to_u64().try_into().unwrap()),
226            max_committed_epoch: Set(0.into()),
227            safe_epoch: Set(0.into()),
228            trivial_move: Set(self.trivial_move),
229            full_version_delta: Set(FullVersionDelta::from(&self.into())),
230        };
231        hummock_version_delta::Entity::insert(m)
232            .on_conflict(
233                OnConflict::column(hummock_version_delta::Column::Id)
234                    .update_columns([
235                        hummock_version_delta::Column::PrevId,
236                        hummock_version_delta::Column::MaxCommittedEpoch,
237                        hummock_version_delta::Column::SafeEpoch,
238                        hummock_version_delta::Column::TrivialMove,
239                        hummock_version_delta::Column::FullVersionDelta,
240                    ])
241                    .to_owned(),
242            )
243            .exec(trx)
244            .await?;
245        Ok(())
246    }
247
248    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
249        hummock_version_delta::Entity::delete_by_id(
250            HummockVersionId::try_from(self.id.to_u64()).unwrap(),
251        )
252        .exec(trx)
253        .await?;
254        Ok(())
255    }
256}
257
258impl From<compaction_config::Model> for CompactionGroup {
259    fn from(value: compaction_config::Model) -> Self {
260        Self::new(
261            value.compaction_group_id.try_into().unwrap(),
262            value.config.to_protobuf(),
263        )
264    }
265}
266
267impl From<compaction_status::Model> for CompactStatus {
268    fn from(value: compaction_status::Model) -> Self {
269        Self {
270            compaction_group_id: value.compaction_group_id.try_into().unwrap(),
271            level_handlers: value.status.to_protobuf().iter().map_into().collect(),
272        }
273    }
274}