risingwave_meta/hummock/model/ext/
hummock.rs1use itertools::Itertools;
16use risingwave_hummock_sdk::version::HummockVersionDelta;
17use risingwave_meta_model::compaction_config::CompactionConfig;
18use risingwave_meta_model::compaction_status::LevelHandlers;
19use risingwave_meta_model::compaction_task::CompactionTask;
20use risingwave_meta_model::hummock_version_delta::FullVersionDelta;
21use risingwave_meta_model::hummock_version_stats::TableStats;
22use risingwave_meta_model::{
23 CompactionGroupId, CompactionTaskId, HummockVersionId, WorkerId, compaction_config,
24 compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version,
25 hummock_version_delta, hummock_version_stats,
26};
27use risingwave_pb::hummock::{
28 CompactTaskAssignment, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats,
29};
30use sea_orm::ActiveValue::Set;
31use sea_orm::EntityTrait;
32use sea_orm::sea_query::OnConflict;
33
34use crate::hummock::compaction::CompactStatus;
35use crate::hummock::model::CompactionGroup;
36use crate::model::{MetadataModelError, MetadataModelResult, Transactional};
37use crate::storage::MetaStoreError;
38
/// Transaction handle taken by the `Transactional` implementations in this file; an alias for
/// `sea_orm::DatabaseTransaction`.
39pub type Transaction = sea_orm::DatabaseTransaction;
40
41impl From<sea_orm::DbErr> for MetadataModelError {
42 fn from(err: sea_orm::DbErr) -> Self {
43 MetadataModelError::MetaStoreError(MetaStoreError::Internal(err.into()))
44 }
45}
46
47#[async_trait::async_trait]
50impl Transactional<Transaction> for CompactionGroup {
51 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
52 let m = compaction_config::ActiveModel {
53 compaction_group_id: Set(self.group_id.try_into().unwrap()),
54 config: Set(CompactionConfig::from(&(*self.compaction_config))),
55 };
56 compaction_config::Entity::insert(m)
57 .on_conflict(
58 OnConflict::column(compaction_config::Column::CompactionGroupId)
59 .update_columns([compaction_config::Column::Config])
60 .to_owned(),
61 )
62 .exec(trx)
63 .await?;
64 Ok(())
65 }
66
67 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
68 compaction_config::Entity::delete_by_id(
69 CompactionGroupId::try_from(self.group_id).unwrap(),
70 )
71 .exec(trx)
72 .await?;
73 Ok(())
74 }
75}
76
77#[async_trait::async_trait]
78impl Transactional<Transaction> for CompactStatus {
79 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
80 let m = compaction_status::ActiveModel {
81 compaction_group_id: Set(self.compaction_group_id.try_into().unwrap()),
82 status: Set(LevelHandlers::from(
83 self.level_handlers.iter().map_into().collect_vec(),
84 )),
85 };
86 compaction_status::Entity::insert(m)
87 .on_conflict(
88 OnConflict::column(compaction_status::Column::CompactionGroupId)
89 .update_columns([compaction_status::Column::Status])
90 .to_owned(),
91 )
92 .exec(trx)
93 .await?;
94 Ok(())
95 }
96
97 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
98 compaction_status::Entity::delete_by_id(
99 CompactionGroupId::try_from(self.compaction_group_id).unwrap(),
100 )
101 .exec(trx)
102 .await?;
103 Ok(())
104 }
105}
106
107#[async_trait::async_trait]
108impl Transactional<Transaction> for CompactTaskAssignment {
109 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
110 let task = self.compact_task.to_owned().unwrap();
111 let m = compaction_task::ActiveModel {
112 id: Set(task.task_id.try_into().unwrap()),
113 context_id: Set(self.context_id.try_into().unwrap()),
114 task: Set(CompactionTask::from(&task)),
115 };
116 compaction_task::Entity::insert(m)
117 .on_conflict(
118 OnConflict::column(compaction_task::Column::Id)
119 .update_columns([
120 compaction_task::Column::ContextId,
121 compaction_task::Column::Task,
122 ])
123 .to_owned(),
124 )
125 .exec(trx)
126 .await?;
127 Ok(())
128 }
129
130 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
131 compaction_task::Entity::delete_by_id(
132 CompactionTaskId::try_from(self.compact_task.as_ref().unwrap().task_id).unwrap(),
133 )
134 .exec(trx)
135 .await?;
136 Ok(())
137 }
138}
139
140#[async_trait::async_trait]
141impl Transactional<Transaction> for HummockPinnedVersion {
142 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
143 let m = hummock_pinned_version::ActiveModel {
144 context_id: Set(self.context_id.try_into().unwrap()),
145 min_pinned_id: Set(self.min_pinned_id.try_into().unwrap()),
146 };
147 hummock_pinned_version::Entity::insert(m)
148 .on_conflict(
149 OnConflict::column(hummock_pinned_version::Column::ContextId)
150 .update_columns([hummock_pinned_version::Column::MinPinnedId])
151 .to_owned(),
152 )
153 .exec(trx)
154 .await?;
155 Ok(())
156 }
157
158 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
159 hummock_pinned_version::Entity::delete_by_id(WorkerId::try_from(self.context_id).unwrap())
160 .exec(trx)
161 .await?;
162 Ok(())
163 }
164}
165
166#[async_trait::async_trait]
167impl Transactional<Transaction> for HummockPinnedSnapshot {
168 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
169 let m = hummock_pinned_snapshot::ActiveModel {
170 context_id: Set(self.context_id.try_into().unwrap()),
171 min_pinned_snapshot: Set(self.minimal_pinned_snapshot.try_into().unwrap()),
172 };
173 hummock_pinned_snapshot::Entity::insert(m)
174 .on_conflict(
175 OnConflict::column(hummock_pinned_snapshot::Column::ContextId)
176 .update_columns([hummock_pinned_snapshot::Column::MinPinnedSnapshot])
177 .to_owned(),
178 )
179 .exec(trx)
180 .await?;
181 Ok(())
182 }
183
184 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
185 hummock_pinned_snapshot::Entity::delete_by_id(WorkerId::try_from(self.context_id).unwrap())
186 .exec(trx)
187 .await?;
188 Ok(())
189 }
190}
191
192#[async_trait::async_trait]
193impl Transactional<Transaction> for HummockVersionStats {
194 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
195 let m = hummock_version_stats::ActiveModel {
196 id: Set(self.hummock_version_id.try_into().unwrap()),
197 stats: Set(TableStats(self.table_stats.clone())),
198 };
199 hummock_version_stats::Entity::insert(m)
200 .on_conflict(
201 OnConflict::column(hummock_version_stats::Column::Id)
202 .update_columns([hummock_version_stats::Column::Stats])
203 .to_owned(),
204 )
205 .exec(trx)
206 .await?;
207 Ok(())
208 }
209
210 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
211 hummock_version_stats::Entity::delete_by_id(
212 HummockVersionId::try_from(self.hummock_version_id).unwrap(),
213 )
214 .exec(trx)
215 .await?;
216 Ok(())
217 }
218}
219
220#[async_trait::async_trait]
221impl Transactional<Transaction> for HummockVersionDelta {
222 async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
223 let m = hummock_version_delta::ActiveModel {
224 id: Set(self.id.to_u64().try_into().unwrap()),
225 prev_id: Set(self.prev_id.to_u64().try_into().unwrap()),
226 max_committed_epoch: Set(0.into()),
227 safe_epoch: Set(0.into()),
228 trivial_move: Set(self.trivial_move),
229 full_version_delta: Set(FullVersionDelta::from(&self.into())),
230 };
231 hummock_version_delta::Entity::insert(m)
232 .on_conflict(
233 OnConflict::column(hummock_version_delta::Column::Id)
234 .update_columns([
235 hummock_version_delta::Column::PrevId,
236 hummock_version_delta::Column::MaxCommittedEpoch,
237 hummock_version_delta::Column::SafeEpoch,
238 hummock_version_delta::Column::TrivialMove,
239 hummock_version_delta::Column::FullVersionDelta,
240 ])
241 .to_owned(),
242 )
243 .exec(trx)
244 .await?;
245 Ok(())
246 }
247
248 async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
249 hummock_version_delta::Entity::delete_by_id(
250 HummockVersionId::try_from(self.id.to_u64()).unwrap(),
251 )
252 .exec(trx)
253 .await?;
254 Ok(())
255 }
256}
257
258impl From<compaction_config::Model> for CompactionGroup {
259 fn from(value: compaction_config::Model) -> Self {
260 Self::new(
261 value.compaction_group_id.try_into().unwrap(),
262 value.config.to_protobuf(),
263 )
264 }
265}
266
267impl From<compaction_status::Model> for CompactStatus {
268 fn from(value: compaction_status::Model) -> Self {
269 Self {
270 compaction_group_id: value.compaction_group_id.try_into().unwrap(),
271 level_handlers: value.status.to_protobuf().iter().map_into().collect(),
272 }
273 }
274}