risingwave_meta/hummock/model/ext/
hummock.rs1use itertools::Itertools;
16use risingwave_common::catalog::TableId;
17use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
18use risingwave_hummock_sdk::version::HummockVersionDelta;
19use risingwave_meta_model::compaction_config::CompactionConfig;
20use risingwave_meta_model::compaction_status::LevelHandlers;
21use risingwave_meta_model::compaction_task::CompactionTask;
22use risingwave_meta_model::hummock_table_change_log::ActiveModel as MetaStoreTableChangeLog;
23use risingwave_meta_model::hummock_version_delta::FullVersionDelta;
24use risingwave_meta_model::hummock_version_stats::TableStats;
25use risingwave_meta_model::{
26 CompactionGroupId, CompactionTaskId, Epoch, HummockVersionId, compaction_config,
27 compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version,
28 hummock_version_delta, hummock_version_stats,
29};
30use risingwave_pb::hummock::{
31 CompactTaskAssignment, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats,
32 PbSstableInfo,
33};
34use sea_orm::ActiveValue::Set;
35use sea_orm::EntityTrait;
36use sea_orm::sea_query::OnConflict;
37
38use crate::hummock::compaction::CompactStatus;
39use crate::hummock::model::CompactionGroup;
40use crate::model::{MetadataModelError, MetadataModelResult, Transactional};
41
42pub type Transaction = sea_orm::DatabaseTransaction;
43
44impl From<sea_orm::DbErr> for MetadataModelError {
45 fn from(err: sea_orm::DbErr) -> Self {
46 MetadataModelError::InternalError(err.into())
48 }
49}
50
51#[async_trait::async_trait]
54impl Transactional<Transaction> for CompactionGroup {
55 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
56 let m = compaction_config::ActiveModel {
57 compaction_group_id: Set(self.group_id),
58 config: Set(CompactionConfig::from(&(*self.compaction_config))),
59 };
60 compaction_config::Entity::insert(m)
61 .on_conflict(
62 OnConflict::column(compaction_config::Column::CompactionGroupId)
63 .update_columns([compaction_config::Column::Config])
64 .to_owned(),
65 )
66 .exec(trx)
67 .await?;
68 Ok(())
69 }
70
71 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
72 compaction_config::Entity::delete_by_id(
73 CompactionGroupId::try_from(self.group_id).unwrap(),
74 )
75 .exec(trx)
76 .await?;
77 Ok(())
78 }
79}
80
81#[async_trait::async_trait]
82impl Transactional<Transaction> for CompactStatus {
83 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
84 let m = compaction_status::ActiveModel {
85 compaction_group_id: Set(self.compaction_group_id),
86 status: Set(LevelHandlers::from(
87 self.level_handlers.iter().map_into().collect_vec(),
88 )),
89 };
90 compaction_status::Entity::insert(m)
91 .on_conflict(
92 OnConflict::column(compaction_status::Column::CompactionGroupId)
93 .update_columns([compaction_status::Column::Status])
94 .to_owned(),
95 )
96 .exec(trx)
97 .await?;
98 Ok(())
99 }
100
101 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
102 compaction_status::Entity::delete_by_id(
103 CompactionGroupId::try_from(self.compaction_group_id).unwrap(),
104 )
105 .exec(trx)
106 .await?;
107 Ok(())
108 }
109}
110
111#[async_trait::async_trait]
112impl Transactional<Transaction> for CompactTaskAssignment {
113 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
114 let task = self.compact_task.clone().unwrap();
115 let m = compaction_task::ActiveModel {
116 id: Set(task.task_id.try_into().unwrap()),
117 context_id: Set(self.context_id),
118 task: Set(CompactionTask::from(&task)),
119 };
120 compaction_task::Entity::insert(m)
121 .on_conflict(
122 OnConflict::column(compaction_task::Column::Id)
123 .update_columns([
124 compaction_task::Column::ContextId,
125 compaction_task::Column::Task,
126 ])
127 .to_owned(),
128 )
129 .exec(trx)
130 .await?;
131 Ok(())
132 }
133
134 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
135 compaction_task::Entity::delete_by_id(
136 CompactionTaskId::try_from(self.compact_task.as_ref().unwrap().task_id).unwrap(),
137 )
138 .exec(trx)
139 .await?;
140 Ok(())
141 }
142}
143
144#[async_trait::async_trait]
145impl Transactional<Transaction> for HummockPinnedVersion {
146 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
147 let m = hummock_pinned_version::ActiveModel {
148 context_id: Set(self.context_id),
149 min_pinned_id: Set(self.min_pinned_id),
150 };
151 hummock_pinned_version::Entity::insert(m)
152 .on_conflict(
153 OnConflict::column(hummock_pinned_version::Column::ContextId)
154 .update_columns([hummock_pinned_version::Column::MinPinnedId])
155 .to_owned(),
156 )
157 .exec(trx)
158 .await?;
159 Ok(())
160 }
161
162 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
163 hummock_pinned_version::Entity::delete_by_id(self.context_id)
164 .exec(trx)
165 .await?;
166 Ok(())
167 }
168}
169
170#[async_trait::async_trait]
171impl Transactional<Transaction> for HummockPinnedSnapshot {
172 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
173 let m = hummock_pinned_snapshot::ActiveModel {
174 context_id: Set(self.context_id),
175 min_pinned_snapshot: Set(self.minimal_pinned_snapshot.try_into().unwrap()),
176 };
177 hummock_pinned_snapshot::Entity::insert(m)
178 .on_conflict(
179 OnConflict::column(hummock_pinned_snapshot::Column::ContextId)
180 .update_columns([hummock_pinned_snapshot::Column::MinPinnedSnapshot])
181 .to_owned(),
182 )
183 .exec(trx)
184 .await?;
185 Ok(())
186 }
187
188 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
189 hummock_pinned_snapshot::Entity::delete_by_id(self.context_id)
190 .exec(trx)
191 .await?;
192 Ok(())
193 }
194}
195
196#[async_trait::async_trait]
197impl Transactional<Transaction> for HummockVersionStats {
198 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
199 let m = hummock_version_stats::ActiveModel {
200 id: Set(self.hummock_version_id),
201 stats: Set(TableStats(self.table_stats.clone())),
202 };
203 hummock_version_stats::Entity::insert(m)
204 .on_conflict(
205 OnConflict::column(hummock_version_stats::Column::Id)
206 .update_columns([hummock_version_stats::Column::Stats])
207 .to_owned(),
208 )
209 .exec(trx)
210 .await?;
211 Ok(())
212 }
213
214 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
215 hummock_version_stats::Entity::delete_by_id(
216 HummockVersionId::try_from(self.hummock_version_id).unwrap(),
217 )
218 .exec(trx)
219 .await?;
220 Ok(())
221 }
222}
223
224#[async_trait::async_trait]
225impl Transactional<Transaction> for HummockVersionDelta {
226 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
227 let m = hummock_version_delta::ActiveModel {
228 id: Set(self.id),
229 prev_id: Set(self.prev_id),
230 max_committed_epoch: Set(0.into()),
231 safe_epoch: Set(0.into()),
232 trivial_move: Set(self.trivial_move),
233 full_version_delta: Set(FullVersionDelta::from(&self.into())),
234 };
235 hummock_version_delta::Entity::insert(m)
236 .on_conflict(
237 OnConflict::column(hummock_version_delta::Column::Id)
238 .update_columns([
239 hummock_version_delta::Column::PrevId,
240 hummock_version_delta::Column::MaxCommittedEpoch,
241 hummock_version_delta::Column::SafeEpoch,
242 hummock_version_delta::Column::TrivialMove,
243 hummock_version_delta::Column::FullVersionDelta,
244 ])
245 .to_owned(),
246 )
247 .exec(trx)
248 .await?;
249 Ok(())
250 }
251
252 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
253 hummock_version_delta::Entity::delete_by_id(self.id)
254 .exec(trx)
255 .await?;
256 Ok(())
257 }
258}
259
260impl From<compaction_config::Model> for CompactionGroup {
261 fn from(value: compaction_config::Model) -> Self {
262 Self::new(value.compaction_group_id, value.config.to_protobuf())
263 }
264}
265
266impl From<compaction_status::Model> for CompactStatus {
267 fn from(value: compaction_status::Model) -> Self {
268 Self {
269 compaction_group_id: value.compaction_group_id,
270 level_handlers: value.status.to_protobuf().iter().map_into().collect(),
271 }
272 }
273}
274
275pub fn to_table_change_log_meta_store_model(
276 table_id: TableId,
277 change_log: &EpochNewChangeLog,
278) -> MetaStoreTableChangeLog {
279 MetaStoreTableChangeLog {
280 table_id: Set(table_id),
281 checkpoint_epoch: Set(change_log.checkpoint_epoch as _),
282 non_checkpoint_epochs: Set(change_log
283 .non_checkpoint_epochs
284 .iter()
285 .map(|e| *e as Epoch)
286 .collect::<Vec<_>>()
287 .into()),
288 new_value_sst: Set(change_log
289 .new_value
290 .iter()
291 .map(Into::into)
292 .collect::<Vec<PbSstableInfo>>()
293 .into()),
294 old_value_sst: Set(change_log
295 .old_value
296 .iter()
297 .map(Into::into)
298 .collect::<Vec<PbSstableInfo>>()
299 .into()),
300 }
301}
302
303pub fn to_table_change_log(
304 change_log: risingwave_meta_model::hummock_table_change_log::Model,
305) -> EpochNewChangeLog {
306 EpochNewChangeLog {
307 new_value: change_log
308 .new_value_sst
309 .to_protobuf()
310 .into_iter()
311 .map(Into::into)
312 .collect(),
313 old_value: change_log
314 .old_value_sst
315 .to_protobuf()
316 .into_iter()
317 .map(Into::into)
318 .collect(),
319 non_checkpoint_epochs: change_log
320 .non_checkpoint_epochs
321 .0
322 .iter()
323 .map(|e| *e as _)
324 .collect(),
325 checkpoint_epoch: change_log.checkpoint_epoch as _,
326 }
327}