Skip to main content

risingwave_meta/hummock/model/ext/
hummock.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::TableId;
17use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
18use risingwave_hummock_sdk::compact_task::{CompactTask, CompactTaskAssignment};
19use risingwave_hummock_sdk::version::HummockVersionDelta;
20use risingwave_meta_model::compaction_config::CompactionConfig;
21use risingwave_meta_model::compaction_status::LevelHandlers;
22use risingwave_meta_model::compaction_task::CompactionTask;
23use risingwave_meta_model::hummock_table_change_log::ActiveModel as MetaStoreTableChangeLog;
24use risingwave_meta_model::hummock_version_delta::FullVersionDelta;
25use risingwave_meta_model::hummock_version_stats::TableStats;
26use risingwave_meta_model::{
27    CompactionGroupId, CompactionTaskId, Epoch, HummockVersionId, compaction_config,
28    compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version,
29    hummock_version_delta, hummock_version_stats,
30};
31use risingwave_pb::hummock::{
32    HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats, PbSstableInfo,
33};
34use sea_orm::ActiveValue::Set;
35use sea_orm::EntityTrait;
36use sea_orm::sea_query::OnConflict;
37
38use crate::hummock::compaction::CompactStatus;
39use crate::hummock::model::CompactionGroup;
40use crate::model::{MetadataModelError, MetadataModelResult, Transactional};
41
/// Transaction handle taken by the `Transactional` implementations in this file; an alias for
/// [`sea_orm::DatabaseTransaction`].
pub type Transaction = sea_orm::DatabaseTransaction;
43
44pub(crate) fn compaction_task_model_to_assignment(
45    model: compaction_task::Model,
46) -> CompactTaskAssignment {
47    CompactTaskAssignment {
48        compact_task: CompactTask::from(model.task.to_protobuf()),
49        context_id: model.context_id,
50    }
51}
52
53impl From<sea_orm::DbErr> for MetadataModelError {
54    fn from(err: sea_orm::DbErr) -> Self {
55        // TODO: a separate error variant
56        MetadataModelError::InternalError(err.into())
57    }
58}
59
60// TODO: reduce boilerplate code
61
62#[async_trait::async_trait]
63impl Transactional<Transaction> for CompactionGroup {
64    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
65        let m = compaction_config::ActiveModel {
66            compaction_group_id: Set(self.group_id),
67            config: Set(CompactionConfig::from(&(*self.compaction_config))),
68        };
69        compaction_config::Entity::insert(m)
70            .on_conflict(
71                OnConflict::column(compaction_config::Column::CompactionGroupId)
72                    .update_columns([compaction_config::Column::Config])
73                    .to_owned(),
74            )
75            .exec(trx)
76            .await?;
77        Ok(())
78    }
79
80    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
81        compaction_config::Entity::delete_by_id(
82            CompactionGroupId::try_from(self.group_id).unwrap(),
83        )
84        .exec(trx)
85        .await?;
86        Ok(())
87    }
88}
89
90#[async_trait::async_trait]
91impl Transactional<Transaction> for CompactStatus {
92    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
93        let m = compaction_status::ActiveModel {
94            compaction_group_id: Set(self.compaction_group_id),
95            status: Set(LevelHandlers::from(
96                self.level_handlers.iter().map_into().collect_vec(),
97            )),
98        };
99        compaction_status::Entity::insert(m)
100            .on_conflict(
101                OnConflict::column(compaction_status::Column::CompactionGroupId)
102                    .update_columns([compaction_status::Column::Status])
103                    .to_owned(),
104            )
105            .exec(trx)
106            .await?;
107        Ok(())
108    }
109
110    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
111        compaction_status::Entity::delete_by_id(
112            CompactionGroupId::try_from(self.compaction_group_id).unwrap(),
113        )
114        .exec(trx)
115        .await?;
116        Ok(())
117    }
118}
119
120#[async_trait::async_trait]
121impl Transactional<Transaction> for CompactTaskAssignment {
122    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
123        let task: risingwave_pb::hummock::CompactTask = (&self.compact_task).into();
124        let m = compaction_task::ActiveModel {
125            id: Set(self.compact_task.task_id.try_into().unwrap()),
126            context_id: Set(self.context_id),
127            task: Set(CompactionTask::from(&task)),
128        };
129        compaction_task::Entity::insert(m)
130            .on_conflict(
131                OnConflict::column(compaction_task::Column::Id)
132                    .update_columns([
133                        compaction_task::Column::ContextId,
134                        compaction_task::Column::Task,
135                    ])
136                    .to_owned(),
137            )
138            .exec(trx)
139            .await?;
140        Ok(())
141    }
142
143    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
144        compaction_task::Entity::delete_by_id(
145            CompactionTaskId::try_from(self.compact_task.task_id).unwrap(),
146        )
147        .exec(trx)
148        .await?;
149        Ok(())
150    }
151}
152
153#[async_trait::async_trait]
154impl Transactional<Transaction> for HummockPinnedVersion {
155    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
156        let m = hummock_pinned_version::ActiveModel {
157            context_id: Set(self.context_id),
158            min_pinned_id: Set(self.min_pinned_id),
159        };
160        hummock_pinned_version::Entity::insert(m)
161            .on_conflict(
162                OnConflict::column(hummock_pinned_version::Column::ContextId)
163                    .update_columns([hummock_pinned_version::Column::MinPinnedId])
164                    .to_owned(),
165            )
166            .exec(trx)
167            .await?;
168        Ok(())
169    }
170
171    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
172        hummock_pinned_version::Entity::delete_by_id(self.context_id)
173            .exec(trx)
174            .await?;
175        Ok(())
176    }
177}
178
179#[async_trait::async_trait]
180impl Transactional<Transaction> for HummockPinnedSnapshot {
181    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
182        let m = hummock_pinned_snapshot::ActiveModel {
183            context_id: Set(self.context_id),
184            min_pinned_snapshot: Set(self.minimal_pinned_snapshot.try_into().unwrap()),
185        };
186        hummock_pinned_snapshot::Entity::insert(m)
187            .on_conflict(
188                OnConflict::column(hummock_pinned_snapshot::Column::ContextId)
189                    .update_columns([hummock_pinned_snapshot::Column::MinPinnedSnapshot])
190                    .to_owned(),
191            )
192            .exec(trx)
193            .await?;
194        Ok(())
195    }
196
197    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
198        hummock_pinned_snapshot::Entity::delete_by_id(self.context_id)
199            .exec(trx)
200            .await?;
201        Ok(())
202    }
203}
204
205#[async_trait::async_trait]
206impl Transactional<Transaction> for HummockVersionStats {
207    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
208        let m = hummock_version_stats::ActiveModel {
209            id: Set(self.hummock_version_id),
210            stats: Set(TableStats(self.table_stats.clone())),
211        };
212        hummock_version_stats::Entity::insert(m)
213            .on_conflict(
214                OnConflict::column(hummock_version_stats::Column::Id)
215                    .update_columns([hummock_version_stats::Column::Stats])
216                    .to_owned(),
217            )
218            .exec(trx)
219            .await?;
220        Ok(())
221    }
222
223    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
224        hummock_version_stats::Entity::delete_by_id(
225            HummockVersionId::try_from(self.hummock_version_id).unwrap(),
226        )
227        .exec(trx)
228        .await?;
229        Ok(())
230    }
231}
232
233#[async_trait::async_trait]
234impl Transactional<Transaction> for HummockVersionDelta {
235    async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
236        let m = hummock_version_delta::ActiveModel {
237            id: Set(self.id),
238            prev_id: Set(self.prev_id),
239            max_committed_epoch: Set(0.into()),
240            safe_epoch: Set(0.into()),
241            trivial_move: Set(self.trivial_move),
242            full_version_delta: Set(FullVersionDelta::from(&self.into())),
243        };
244        hummock_version_delta::Entity::insert(m)
245            .on_conflict(
246                OnConflict::column(hummock_version_delta::Column::Id)
247                    .update_columns([
248                        hummock_version_delta::Column::PrevId,
249                        hummock_version_delta::Column::MaxCommittedEpoch,
250                        hummock_version_delta::Column::SafeEpoch,
251                        hummock_version_delta::Column::TrivialMove,
252                        hummock_version_delta::Column::FullVersionDelta,
253                    ])
254                    .to_owned(),
255            )
256            .exec(trx)
257            .await?;
258        Ok(())
259    }
260
261    async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
262        hummock_version_delta::Entity::delete_by_id(self.id)
263            .exec(trx)
264            .await?;
265        Ok(())
266    }
267}
268
269impl From<compaction_config::Model> for CompactionGroup {
270    fn from(value: compaction_config::Model) -> Self {
271        Self::new(value.compaction_group_id, value.config.to_protobuf())
272    }
273}
274
275impl From<compaction_status::Model> for CompactStatus {
276    fn from(value: compaction_status::Model) -> Self {
277        Self {
278            compaction_group_id: value.compaction_group_id,
279            level_handlers: value.status.to_protobuf().iter().map_into().collect(),
280        }
281    }
282}
283
284pub fn to_table_change_log_meta_store_model(
285    table_id: TableId,
286    change_log: &EpochNewChangeLog,
287) -> MetaStoreTableChangeLog {
288    MetaStoreTableChangeLog {
289        table_id: Set(table_id),
290        checkpoint_epoch: Set(change_log.checkpoint_epoch as _),
291        non_checkpoint_epochs: Set(change_log
292            .non_checkpoint_epochs
293            .iter()
294            .map(|e| *e as Epoch)
295            .collect::<Vec<_>>()
296            .into()),
297        new_value_sst: Set(change_log
298            .new_value
299            .iter()
300            .map(Into::into)
301            .collect::<Vec<PbSstableInfo>>()
302            .into()),
303        old_value_sst: Set(change_log
304            .old_value
305            .iter()
306            .map(Into::into)
307            .collect::<Vec<PbSstableInfo>>()
308            .into()),
309    }
310}
311
312pub fn to_table_change_log(
313    change_log: risingwave_meta_model::hummock_table_change_log::Model,
314) -> EpochNewChangeLog {
315    EpochNewChangeLog {
316        new_value: change_log
317            .new_value_sst
318            .to_protobuf()
319            .into_iter()
320            .map(Into::into)
321            .collect(),
322        old_value: change_log
323            .old_value_sst
324            .to_protobuf()
325            .into_iter()
326            .map(Into::into)
327            .collect(),
328        non_checkpoint_epochs: change_log
329            .non_checkpoint_epochs
330            .0
331            .iter()
332            .map(|e| *e as _)
333            .collect(),
334        checkpoint_epoch: change_log.checkpoint_epoch as _,
335    }
336}