risingwave_meta/hummock/model/ext/
hummock.rs1use itertools::Itertools;
16use risingwave_common::catalog::TableId;
17use risingwave_hummock_sdk::change_log::EpochNewChangeLog;
18use risingwave_hummock_sdk::compact_task::{CompactTask, CompactTaskAssignment};
19use risingwave_hummock_sdk::version::HummockVersionDelta;
20use risingwave_meta_model::compaction_config::CompactionConfig;
21use risingwave_meta_model::compaction_status::LevelHandlers;
22use risingwave_meta_model::compaction_task::CompactionTask;
23use risingwave_meta_model::hummock_table_change_log::ActiveModel as MetaStoreTableChangeLog;
24use risingwave_meta_model::hummock_version_delta::FullVersionDelta;
25use risingwave_meta_model::hummock_version_stats::TableStats;
26use risingwave_meta_model::{
27 CompactionGroupId, CompactionTaskId, Epoch, HummockVersionId, compaction_config,
28 compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version,
29 hummock_version_delta, hummock_version_stats,
30};
31use risingwave_pb::hummock::{
32 HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats, PbSstableInfo,
33};
34use sea_orm::ActiveValue::Set;
35use sea_orm::EntityTrait;
36use sea_orm::sea_query::OnConflict;
37
38use crate::hummock::compaction::CompactStatus;
39use crate::hummock::model::CompactionGroup;
40use crate::model::{MetadataModelError, MetadataModelResult, Transactional};
41
42pub type Transaction = sea_orm::DatabaseTransaction;
43
44pub(crate) fn compaction_task_model_to_assignment(
45 model: compaction_task::Model,
46) -> CompactTaskAssignment {
47 CompactTaskAssignment {
48 compact_task: CompactTask::from(model.task.to_protobuf()),
49 context_id: model.context_id,
50 }
51}
52
53impl From<sea_orm::DbErr> for MetadataModelError {
54 fn from(err: sea_orm::DbErr) -> Self {
55 MetadataModelError::InternalError(err.into())
57 }
58}
59
60#[async_trait::async_trait]
63impl Transactional<Transaction> for CompactionGroup {
64 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
65 let m = compaction_config::ActiveModel {
66 compaction_group_id: Set(self.group_id),
67 config: Set(CompactionConfig::from(&(*self.compaction_config))),
68 };
69 compaction_config::Entity::insert(m)
70 .on_conflict(
71 OnConflict::column(compaction_config::Column::CompactionGroupId)
72 .update_columns([compaction_config::Column::Config])
73 .to_owned(),
74 )
75 .exec(trx)
76 .await?;
77 Ok(())
78 }
79
80 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
81 compaction_config::Entity::delete_by_id(
82 CompactionGroupId::try_from(self.group_id).unwrap(),
83 )
84 .exec(trx)
85 .await?;
86 Ok(())
87 }
88}
89
90#[async_trait::async_trait]
91impl Transactional<Transaction> for CompactStatus {
92 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
93 let m = compaction_status::ActiveModel {
94 compaction_group_id: Set(self.compaction_group_id),
95 status: Set(LevelHandlers::from(
96 self.level_handlers.iter().map_into().collect_vec(),
97 )),
98 };
99 compaction_status::Entity::insert(m)
100 .on_conflict(
101 OnConflict::column(compaction_status::Column::CompactionGroupId)
102 .update_columns([compaction_status::Column::Status])
103 .to_owned(),
104 )
105 .exec(trx)
106 .await?;
107 Ok(())
108 }
109
110 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
111 compaction_status::Entity::delete_by_id(
112 CompactionGroupId::try_from(self.compaction_group_id).unwrap(),
113 )
114 .exec(trx)
115 .await?;
116 Ok(())
117 }
118}
119
120#[async_trait::async_trait]
121impl Transactional<Transaction> for CompactTaskAssignment {
122 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
123 let task: risingwave_pb::hummock::CompactTask = (&self.compact_task).into();
124 let m = compaction_task::ActiveModel {
125 id: Set(self.compact_task.task_id.try_into().unwrap()),
126 context_id: Set(self.context_id),
127 task: Set(CompactionTask::from(&task)),
128 };
129 compaction_task::Entity::insert(m)
130 .on_conflict(
131 OnConflict::column(compaction_task::Column::Id)
132 .update_columns([
133 compaction_task::Column::ContextId,
134 compaction_task::Column::Task,
135 ])
136 .to_owned(),
137 )
138 .exec(trx)
139 .await?;
140 Ok(())
141 }
142
143 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
144 compaction_task::Entity::delete_by_id(
145 CompactionTaskId::try_from(self.compact_task.task_id).unwrap(),
146 )
147 .exec(trx)
148 .await?;
149 Ok(())
150 }
151}
152
153#[async_trait::async_trait]
154impl Transactional<Transaction> for HummockPinnedVersion {
155 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
156 let m = hummock_pinned_version::ActiveModel {
157 context_id: Set(self.context_id),
158 min_pinned_id: Set(self.min_pinned_id),
159 };
160 hummock_pinned_version::Entity::insert(m)
161 .on_conflict(
162 OnConflict::column(hummock_pinned_version::Column::ContextId)
163 .update_columns([hummock_pinned_version::Column::MinPinnedId])
164 .to_owned(),
165 )
166 .exec(trx)
167 .await?;
168 Ok(())
169 }
170
171 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
172 hummock_pinned_version::Entity::delete_by_id(self.context_id)
173 .exec(trx)
174 .await?;
175 Ok(())
176 }
177}
178
179#[async_trait::async_trait]
180impl Transactional<Transaction> for HummockPinnedSnapshot {
181 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
182 let m = hummock_pinned_snapshot::ActiveModel {
183 context_id: Set(self.context_id),
184 min_pinned_snapshot: Set(self.minimal_pinned_snapshot.try_into().unwrap()),
185 };
186 hummock_pinned_snapshot::Entity::insert(m)
187 .on_conflict(
188 OnConflict::column(hummock_pinned_snapshot::Column::ContextId)
189 .update_columns([hummock_pinned_snapshot::Column::MinPinnedSnapshot])
190 .to_owned(),
191 )
192 .exec(trx)
193 .await?;
194 Ok(())
195 }
196
197 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
198 hummock_pinned_snapshot::Entity::delete_by_id(self.context_id)
199 .exec(trx)
200 .await?;
201 Ok(())
202 }
203}
204
205#[async_trait::async_trait]
206impl Transactional<Transaction> for HummockVersionStats {
207 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
208 let m = hummock_version_stats::ActiveModel {
209 id: Set(self.hummock_version_id),
210 stats: Set(TableStats(self.table_stats.clone())),
211 };
212 hummock_version_stats::Entity::insert(m)
213 .on_conflict(
214 OnConflict::column(hummock_version_stats::Column::Id)
215 .update_columns([hummock_version_stats::Column::Stats])
216 .to_owned(),
217 )
218 .exec(trx)
219 .await?;
220 Ok(())
221 }
222
223 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
224 hummock_version_stats::Entity::delete_by_id(
225 HummockVersionId::try_from(self.hummock_version_id).unwrap(),
226 )
227 .exec(trx)
228 .await?;
229 Ok(())
230 }
231}
232
233#[async_trait::async_trait]
234impl Transactional<Transaction> for HummockVersionDelta {
235 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
236 let m = hummock_version_delta::ActiveModel {
237 id: Set(self.id),
238 prev_id: Set(self.prev_id),
239 max_committed_epoch: Set(0.into()),
240 safe_epoch: Set(0.into()),
241 trivial_move: Set(self.trivial_move),
242 full_version_delta: Set(FullVersionDelta::from(&self.into())),
243 };
244 hummock_version_delta::Entity::insert(m)
245 .on_conflict(
246 OnConflict::column(hummock_version_delta::Column::Id)
247 .update_columns([
248 hummock_version_delta::Column::PrevId,
249 hummock_version_delta::Column::MaxCommittedEpoch,
250 hummock_version_delta::Column::SafeEpoch,
251 hummock_version_delta::Column::TrivialMove,
252 hummock_version_delta::Column::FullVersionDelta,
253 ])
254 .to_owned(),
255 )
256 .exec(trx)
257 .await?;
258 Ok(())
259 }
260
261 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
262 hummock_version_delta::Entity::delete_by_id(self.id)
263 .exec(trx)
264 .await?;
265 Ok(())
266 }
267}
268
269impl From<compaction_config::Model> for CompactionGroup {
270 fn from(value: compaction_config::Model) -> Self {
271 Self::new(value.compaction_group_id, value.config.to_protobuf())
272 }
273}
274
275impl From<compaction_status::Model> for CompactStatus {
276 fn from(value: compaction_status::Model) -> Self {
277 Self {
278 compaction_group_id: value.compaction_group_id,
279 level_handlers: value.status.to_protobuf().iter().map_into().collect(),
280 }
281 }
282}
283
284pub fn to_table_change_log_meta_store_model(
285 table_id: TableId,
286 change_log: &EpochNewChangeLog,
287) -> MetaStoreTableChangeLog {
288 MetaStoreTableChangeLog {
289 table_id: Set(table_id),
290 checkpoint_epoch: Set(change_log.checkpoint_epoch as _),
291 non_checkpoint_epochs: Set(change_log
292 .non_checkpoint_epochs
293 .iter()
294 .map(|e| *e as Epoch)
295 .collect::<Vec<_>>()
296 .into()),
297 new_value_sst: Set(change_log
298 .new_value
299 .iter()
300 .map(Into::into)
301 .collect::<Vec<PbSstableInfo>>()
302 .into()),
303 old_value_sst: Set(change_log
304 .old_value
305 .iter()
306 .map(Into::into)
307 .collect::<Vec<PbSstableInfo>>()
308 .into()),
309 }
310}
311
312pub fn to_table_change_log(
313 change_log: risingwave_meta_model::hummock_table_change_log::Model,
314) -> EpochNewChangeLog {
315 EpochNewChangeLog {
316 new_value: change_log
317 .new_value_sst
318 .to_protobuf()
319 .into_iter()
320 .map(Into::into)
321 .collect(),
322 old_value: change_log
323 .old_value_sst
324 .to_protobuf()
325 .into_iter()
326 .map(Into::into)
327 .collect(),
328 non_checkpoint_epochs: change_log
329 .non_checkpoint_epochs
330 .0
331 .iter()
332 .map(|e| *e as _)
333 .collect(),
334 checkpoint_epoch: change_log.checkpoint_epoch as _,
335 }
336}