risingwave_meta/hummock/model/ext/
hummock.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::TableId;
17use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
18use risingwave_hummock_sdk::version::HummockVersionDelta;
19use risingwave_meta_model::compaction_config::CompactionConfig;
20use risingwave_meta_model::compaction_status::LevelHandlers;
21use risingwave_meta_model::compaction_task::CompactionTask;
22use risingwave_meta_model::hummock_table_change_log::ActiveModel as MetaStoreTableChangeLog;
23use risingwave_meta_model::hummock_version_delta::FullVersionDelta;
24use risingwave_meta_model::hummock_version_stats::TableStats;
25use risingwave_meta_model::{
26    CompactionGroupId, CompactionTaskId, Epoch, HummockVersionId, compaction_config,
27    compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version,
28    hummock_version_delta, hummock_version_stats,
29};
30use risingwave_pb::hummock::{
31    CompactTaskAssignment, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats,
32    PbSstableInfo,
33};
34use sea_orm::ActiveValue::Set;
35use sea_orm::EntityTrait;
36use sea_orm::sea_query::OnConflict;
37
38use crate::hummock::compaction::CompactStatus;
39use crate::hummock::model::CompactionGroup;
40use crate::model::{MetadataModelError, MetadataModelResult, Transactional};
41
42pub type Transaction = sea_orm::DatabaseTransaction;
43
44impl From<sea_orm::DbErr> for MetadataModelError {
45    fn from(err: sea_orm::DbErr) -> Self {
46        // TODO: a separate error variant
47        MetadataModelError::InternalError(err.into())
48    }
49}
50
51// TODO: reduce boilerplate code
52
53#[async_trait::async_trait]
54impl Transactional<Transaction> for CompactionGroup {
55    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
56        let m = compaction_config::ActiveModel {
57            compaction_group_id: Set(self.group_id),
58            config: Set(CompactionConfig::from(&(*self.compaction_config))),
59        };
60        compaction_config::Entity::insert(m)
61            .on_conflict(
62                OnConflict::column(compaction_config::Column::CompactionGroupId)
63                    .update_columns([compaction_config::Column::Config])
64                    .to_owned(),
65            )
66            .exec(trx)
67            .await?;
68        Ok(())
69    }
70
71    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
72        compaction_config::Entity::delete_by_id(
73            CompactionGroupId::try_from(self.group_id).unwrap(),
74        )
75        .exec(trx)
76        .await?;
77        Ok(())
78    }
79}
80
81#[async_trait::async_trait]
82impl Transactional<Transaction> for CompactStatus {
83    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
84        let m = compaction_status::ActiveModel {
85            compaction_group_id: Set(self.compaction_group_id),
86            status: Set(LevelHandlers::from(
87                self.level_handlers.iter().map_into().collect_vec(),
88            )),
89        };
90        compaction_status::Entity::insert(m)
91            .on_conflict(
92                OnConflict::column(compaction_status::Column::CompactionGroupId)
93                    .update_columns([compaction_status::Column::Status])
94                    .to_owned(),
95            )
96            .exec(trx)
97            .await?;
98        Ok(())
99    }
100
101    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
102        compaction_status::Entity::delete_by_id(
103            CompactionGroupId::try_from(self.compaction_group_id).unwrap(),
104        )
105        .exec(trx)
106        .await?;
107        Ok(())
108    }
109}
110
111#[async_trait::async_trait]
112impl Transactional<Transaction> for CompactTaskAssignment {
113    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
114        let task = self.compact_task.clone().unwrap();
115        let m = compaction_task::ActiveModel {
116            id: Set(task.task_id.try_into().unwrap()),
117            context_id: Set(self.context_id),
118            task: Set(CompactionTask::from(&task)),
119        };
120        compaction_task::Entity::insert(m)
121            .on_conflict(
122                OnConflict::column(compaction_task::Column::Id)
123                    .update_columns([
124                        compaction_task::Column::ContextId,
125                        compaction_task::Column::Task,
126                    ])
127                    .to_owned(),
128            )
129            .exec(trx)
130            .await?;
131        Ok(())
132    }
133
134    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
135        compaction_task::Entity::delete_by_id(
136            CompactionTaskId::try_from(self.compact_task.as_ref().unwrap().task_id).unwrap(),
137        )
138        .exec(trx)
139        .await?;
140        Ok(())
141    }
142}
143
144#[async_trait::async_trait]
145impl Transactional<Transaction> for HummockPinnedVersion {
146    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
147        let m = hummock_pinned_version::ActiveModel {
148            context_id: Set(self.context_id),
149            min_pinned_id: Set(self.min_pinned_id),
150        };
151        hummock_pinned_version::Entity::insert(m)
152            .on_conflict(
153                OnConflict::column(hummock_pinned_version::Column::ContextId)
154                    .update_columns([hummock_pinned_version::Column::MinPinnedId])
155                    .to_owned(),
156            )
157            .exec(trx)
158            .await?;
159        Ok(())
160    }
161
162    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
163        hummock_pinned_version::Entity::delete_by_id(self.context_id)
164            .exec(trx)
165            .await?;
166        Ok(())
167    }
168}
169
170#[async_trait::async_trait]
171impl Transactional<Transaction> for HummockPinnedSnapshot {
172    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
173        let m = hummock_pinned_snapshot::ActiveModel {
174            context_id: Set(self.context_id),
175            min_pinned_snapshot: Set(self.minimal_pinned_snapshot.try_into().unwrap()),
176        };
177        hummock_pinned_snapshot::Entity::insert(m)
178            .on_conflict(
179                OnConflict::column(hummock_pinned_snapshot::Column::ContextId)
180                    .update_columns([hummock_pinned_snapshot::Column::MinPinnedSnapshot])
181                    .to_owned(),
182            )
183            .exec(trx)
184            .await?;
185        Ok(())
186    }
187
188    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
189        hummock_pinned_snapshot::Entity::delete_by_id(self.context_id)
190            .exec(trx)
191            .await?;
192        Ok(())
193    }
194}
195
196#[async_trait::async_trait]
197impl Transactional<Transaction> for HummockVersionStats {
198    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
199        let m = hummock_version_stats::ActiveModel {
200            id: Set(self.hummock_version_id),
201            stats: Set(TableStats(self.table_stats.clone())),
202        };
203        hummock_version_stats::Entity::insert(m)
204            .on_conflict(
205                OnConflict::column(hummock_version_stats::Column::Id)
206                    .update_columns([hummock_version_stats::Column::Stats])
207                    .to_owned(),
208            )
209            .exec(trx)
210            .await?;
211        Ok(())
212    }
213
214    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
215        hummock_version_stats::Entity::delete_by_id(
216            HummockVersionId::try_from(self.hummock_version_id).unwrap(),
217        )
218        .exec(trx)
219        .await?;
220        Ok(())
221    }
222}
223
224#[async_trait::async_trait]
225impl Transactional<Transaction> for HummockVersionDelta {
226    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
227        let m = hummock_version_delta::ActiveModel {
228            id: Set(self.id),
229            prev_id: Set(self.prev_id),
230            max_committed_epoch: Set(0.into()),
231            safe_epoch: Set(0.into()),
232            trivial_move: Set(self.trivial_move),
233            full_version_delta: Set(FullVersionDelta::from(&self.into())),
234        };
235        hummock_version_delta::Entity::insert(m)
236            .on_conflict(
237                OnConflict::column(hummock_version_delta::Column::Id)
238                    .update_columns([
239                        hummock_version_delta::Column::PrevId,
240                        hummock_version_delta::Column::MaxCommittedEpoch,
241                        hummock_version_delta::Column::SafeEpoch,
242                        hummock_version_delta::Column::TrivialMove,
243                        hummock_version_delta::Column::FullVersionDelta,
244                    ])
245                    .to_owned(),
246            )
247            .exec(trx)
248            .await?;
249        Ok(())
250    }
251
252    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
253        hummock_version_delta::Entity::delete_by_id(self.id)
254            .exec(trx)
255            .await?;
256        Ok(())
257    }
258}
259
260impl From<compaction_config::Model> for CompactionGroup {
261    fn from(value: compaction_config::Model) -> Self {
262        Self::new(value.compaction_group_id, value.config.to_protobuf())
263    }
264}
265
266impl From<compaction_status::Model> for CompactStatus {
267    fn from(value: compaction_status::Model) -> Self {
268        Self {
269            compaction_group_id: value.compaction_group_id,
270            level_handlers: value.status.to_protobuf().iter().map_into().collect(),
271        }
272    }
273}
274
275pub fn to_table_change_log_meta_store_model(
276    table_id: TableId,
277    change_log: &EpochNewChangeLog,
278) -> MetaStoreTableChangeLog {
279    MetaStoreTableChangeLog {
280        table_id: Set(table_id),
281        checkpoint_epoch: Set(change_log.checkpoint_epoch as _),
282        non_checkpoint_epochs: Set(change_log
283            .non_checkpoint_epochs
284            .iter()
285            .map(|e| *e as Epoch)
286            .collect::<Vec<_>>()
287            .into()),
288        new_value_sst: Set(change_log
289            .new_value
290            .iter()
291            .map(Into::into)
292            .collect::<Vec<PbSstableInfo>>()
293            .into()),
294        old_value_sst: Set(change_log
295            .old_value
296            .iter()
297            .map(Into::into)
298            .collect::<Vec<PbSstableInfo>>()
299            .into()),
300    }
301}
302
303pub fn to_table_change_log(
304    change_log: risingwave_meta_model::hummock_table_change_log::Model,
305) -> EpochNewChangeLog {
306    EpochNewChangeLog {
307        new_value: change_log
308            .new_value_sst
309            .to_protobuf()
310            .into_iter()
311            .map(Into::into)
312            .collect(),
313        old_value: change_log
314            .old_value_sst
315            .to_protobuf()
316            .into_iter()
317            .map(Into::into)
318            .collect(),
319        non_checkpoint_epochs: change_log
320            .non_checkpoint_epochs
321            .0
322            .iter()
323            .map(|e| *e as _)
324            .collect(),
325        checkpoint_epoch: change_log.checkpoint_epoch as _,
326    }
327}