1use std::collections::BTreeMap;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use risingwave_common::hash::VnodeCount;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::{
22 PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
23 source, subscription, table, view,
24};
25use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
26use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
27use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
28use risingwave_pb::catalog::subscription::PbSubscriptionState;
29use risingwave_pb::catalog::table::{PbEngine, PbOptionalAssociatedSourceId, PbTableType};
30use risingwave_pb::catalog::{
31 PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
32 PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
33 PbView,
34};
35use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};
36
37use crate::{MetaError, MetaResult, MetaStoreBackend};
38
39pub mod catalog;
40pub mod cluster;
41pub mod fragment;
42pub mod id;
43pub mod rename;
44pub mod scale;
45pub mod session_params;
46pub mod streaming_job;
47pub mod system_param;
48pub mod user;
49pub mod utils;
50
51impl From<sea_orm::DbErr> for MetaError {
53 fn from(err: sea_orm::DbErr) -> Self {
54 if let Some(err) = err.sql_err() {
55 return anyhow!(err).into();
56 }
57 anyhow!(err).into()
58 }
59}
60
61#[derive(Clone)]
62pub struct SqlMetaStore {
63 pub conn: DatabaseConnection,
64 pub endpoint: String,
65}
66
67impl SqlMetaStore {
68 pub async fn connect(backend: MetaStoreBackend) -> Result<Self, sea_orm::DbErr> {
70 const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);
71
72 #[easy_ext::ext]
73 impl ConnectOptions {
74 fn sqlite_common(&mut self) -> &mut Self {
76 self
77 .min_connections(1)
80 .max_connections(1)
81 .acquire_timeout(MAX_DURATION)
85 .connect_timeout(MAX_DURATION)
86 }
87 }
88
89 Ok(match backend {
90 MetaStoreBackend::Mem => {
91 const IN_MEMORY_STORE: &str = "sqlite::memory:";
92
93 let mut options = ConnectOptions::new(IN_MEMORY_STORE);
94
95 options
96 .sqlite_common()
97 .idle_timeout(MAX_DURATION)
102 .max_lifetime(MAX_DURATION);
103
104 let conn = sea_orm::Database::connect(options).await?;
105 Self {
106 conn,
107 endpoint: IN_MEMORY_STORE.to_owned(),
108 }
109 }
110 MetaStoreBackend::Sql { endpoint, config } => {
111 let mut options = ConnectOptions::new(endpoint.clone());
112 options
113 .max_connections(config.max_connections)
114 .min_connections(config.min_connections)
115 .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
116 .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
117 .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
118
119 if DbBackend::Sqlite.is_prefix_of(&endpoint) {
120 options.sqlite_common();
121 }
122
123 let conn = sea_orm::Database::connect(options).await?;
124 Self { conn, endpoint }
125 }
126 })
127 }
128
129 #[cfg(any(test, feature = "test"))]
130 pub async fn for_test() -> Self {
131 let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
132 Migrator::up(&this.conn, None).await.unwrap();
133 this
134 }
135
136 async fn is_first_launch(&self) -> MetaResult<bool> {
142 let migrations = Migrator::get_applied_migrations(&self.conn)
143 .await
144 .context("failed to get applied migrations")?;
145 for migration in migrations {
146 if migration.name() == "m20230908_072257_init"
147 && migration.status() == MigrationStatus::Applied
148 {
149 return Ok(false);
150 }
151 }
152 Ok(true)
153 }
154
155 pub async fn up(&self) -> MetaResult<bool> {
159 let cluster_first_launch = self.is_first_launch().await?;
160 Migrator::up(&self.conn, None)
162 .await
163 .context("failed to upgrade models in meta store")?;
164
165 Ok(cluster_first_launch)
166 }
167}
168
169pub struct ObjectModel<M: ModelTrait>(M, object::Model);
170
171impl From<ObjectModel<database::Model>> for PbDatabase {
172 fn from(value: ObjectModel<database::Model>) -> Self {
173 Self {
174 id: value.0.database_id as _,
175 name: value.0.name,
176 owner: value.1.owner_id as _,
177 resource_group: value.0.resource_group.clone(),
178 }
179 }
180}
181
182impl From<ObjectModel<secret::Model>> for PbSecret {
183 fn from(value: ObjectModel<secret::Model>) -> Self {
184 Self {
185 id: value.0.secret_id as _,
186 name: value.0.name,
187 database_id: value.1.database_id.unwrap() as _,
188 value: value.0.value,
189 owner: value.1.owner_id as _,
190 schema_id: value.1.schema_id.unwrap() as _,
191 }
192 }
193}
194
195impl From<ObjectModel<schema::Model>> for PbSchema {
196 fn from(value: ObjectModel<schema::Model>) -> Self {
197 Self {
198 id: value.0.schema_id as _,
199 name: value.0.name,
200 database_id: value.1.database_id.unwrap() as _,
201 owner: value.1.owner_id as _,
202 }
203 }
204}
205
206impl From<ObjectModel<table::Model>> for PbTable {
207 fn from(value: ObjectModel<table::Model>) -> Self {
208 Self {
209 id: value.0.table_id as _,
210 schema_id: value.1.schema_id.unwrap() as _,
211 database_id: value.1.database_id.unwrap() as _,
212 name: value.0.name,
213 columns: value.0.columns.to_protobuf(),
214 pk: value.0.pk.to_protobuf(),
215 dependent_relations: vec![], table_type: PbTableType::from(value.0.table_type) as _,
217 distribution_key: value.0.distribution_key.0,
218 stream_key: value.0.stream_key.0,
219 append_only: value.0.append_only,
220 owner: value.1.owner_id as _,
221 fragment_id: value.0.fragment_id.unwrap_or_default() as u32,
222 vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
223 row_id_index: value.0.row_id_index.map(|index| index as _),
224 value_indices: value.0.value_indices.0,
225 definition: value.0.definition,
226 handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
227 value.0.handle_pk_conflict_behavior,
228 ) as _,
229 version_column_index: value.0.version_column_index.map(|x| x as u32),
230 read_prefix_len_hint: value.0.read_prefix_len_hint as _,
231 watermark_indices: value.0.watermark_indices.0,
232 dist_key_in_pk: value.0.dist_key_in_pk.0,
233 dml_fragment_id: value.0.dml_fragment_id.map(|id| id as u32),
234 cardinality: value
235 .0
236 .cardinality
237 .map(|cardinality| cardinality.to_protobuf()),
238 initialized_at_epoch: Some(
239 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
240 ),
241 created_at_epoch: Some(
242 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
243 ),
244 cleaned_by_watermark: value.0.cleaned_by_watermark,
245 stream_job_status: PbStreamJobStatus::Created as _,
246 create_type: PbCreateType::Foreground as _,
247 version: value.0.version.map(|v| v.to_protobuf()),
248 optional_associated_source_id: value
249 .0
250 .optional_associated_source_id
251 .map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)),
252 description: value.0.description,
253 incoming_sinks: value.0.incoming_sinks.into_u32_array(),
254 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
255 created_at_cluster_version: value.1.created_at_cluster_version,
256 retention_seconds: value.0.retention_seconds.map(|id| id as u32),
257 cdc_table_id: value.0.cdc_table_id,
258 maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
259 webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
260 job_id: value.0.belongs_to_job_id.map(|id| id as _),
261 engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
262 clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
263 }
264 }
265}
266
267impl From<ObjectModel<source::Model>> for PbSource {
268 fn from(value: ObjectModel<source::Model>) -> Self {
269 let mut secret_ref_map = BTreeMap::new();
270 if let Some(secret_ref) = value.0.secret_ref {
271 secret_ref_map = secret_ref.to_protobuf();
272 }
273 Self {
274 id: value.0.source_id as _,
275 schema_id: value.1.schema_id.unwrap() as _,
276 database_id: value.1.database_id.unwrap() as _,
277 name: value.0.name,
278 row_id_index: value.0.row_id_index.map(|id| id as _),
279 columns: value.0.columns.to_protobuf(),
280 pk_column_ids: value.0.pk_column_ids.0,
281 with_properties: value.0.with_properties.0,
282 owner: value.1.owner_id as _,
283 info: value.0.source_info.map(|info| info.to_protobuf()),
284 watermark_descs: value.0.watermark_descs.to_protobuf(),
285 definition: value.0.definition,
286 connection_id: value.0.connection_id.map(|id| id as _),
287 initialized_at_epoch: Some(
289 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
290 ),
291 created_at_epoch: Some(
292 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
293 ),
294 version: value.0.version as _,
295 optional_associated_table_id: value
296 .0
297 .optional_associated_table_id
298 .map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id as _)),
299 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
300 created_at_cluster_version: value.1.created_at_cluster_version,
301 secret_refs: secret_ref_map,
302 rate_limit: value.0.rate_limit.map(|v| v as _),
303 }
304 }
305}
306
307impl From<ObjectModel<sink::Model>> for PbSink {
308 fn from(value: ObjectModel<sink::Model>) -> Self {
309 let mut secret_ref_map = BTreeMap::new();
310 if let Some(secret_ref) = value.0.secret_ref {
311 secret_ref_map = secret_ref.to_protobuf();
312 }
313 #[allow(deprecated)] Self {
315 id: value.0.sink_id as _,
316 schema_id: value.1.schema_id.unwrap() as _,
317 database_id: value.1.database_id.unwrap() as _,
318 name: value.0.name,
319 columns: value.0.columns.to_protobuf(),
320 plan_pk: value.0.plan_pk.to_protobuf(),
321 dependent_relations: vec![],
322 distribution_key: value.0.distribution_key.0,
323 downstream_pk: value.0.downstream_pk.0,
324 sink_type: PbSinkType::from(value.0.sink_type) as _,
325 owner: value.1.owner_id as _,
326 properties: value.0.properties.0,
327 definition: value.0.definition,
328 connection_id: value.0.connection_id.map(|id| id as _),
329 initialized_at_epoch: Some(
330 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
331 ),
332 created_at_epoch: Some(
333 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
334 ),
335 db_name: value.0.db_name,
336 sink_from_name: value.0.sink_from_name,
337 stream_job_status: PbStreamJobStatus::Created as _,
338 format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
339 target_table: value.0.target_table.map(|id| id as _),
340 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
341 created_at_cluster_version: value.1.created_at_cluster_version,
342 create_type: PbCreateType::Foreground as _,
343 secret_refs: secret_ref_map,
344 original_target_columns: value
345 .0
346 .original_target_columns
347 .map(|cols| cols.to_protobuf())
348 .unwrap_or_default(),
349 }
350 }
351}
352
353impl From<ObjectModel<subscription::Model>> for PbSubscription {
354 fn from(value: ObjectModel<subscription::Model>) -> Self {
355 Self {
356 id: value.0.subscription_id as _,
357 schema_id: value.1.schema_id.unwrap() as _,
358 database_id: value.1.database_id.unwrap() as _,
359 name: value.0.name,
360 owner: value.1.owner_id as _,
361 retention_seconds: value.0.retention_seconds as _,
362 definition: value.0.definition,
363 initialized_at_epoch: Some(
364 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
365 ),
366 created_at_epoch: Some(
367 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
368 ),
369 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
370 created_at_cluster_version: value.1.created_at_cluster_version,
371 dependent_table_id: value.0.dependent_table_id as _,
372 subscription_state: PbSubscriptionState::Init as _,
373 }
374 }
375}
376
377impl From<ObjectModel<index::Model>> for PbIndex {
378 fn from(value: ObjectModel<index::Model>) -> Self {
379 Self {
380 id: value.0.index_id as _,
381 schema_id: value.1.schema_id.unwrap() as _,
382 database_id: value.1.database_id.unwrap() as _,
383 name: value.0.name,
384 owner: value.1.owner_id as _,
385 index_table_id: value.0.index_table_id as _,
386 primary_table_id: value.0.primary_table_id as _,
387 index_item: value.0.index_items.to_protobuf(),
388 index_column_properties: value
389 .0
390 .index_column_properties
391 .map(|p| p.to_protobuf())
392 .unwrap_or_default(),
393 index_columns_len: value.0.index_columns_len as _,
394 initialized_at_epoch: Some(
395 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
396 ),
397 created_at_epoch: Some(
398 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
399 ),
400 stream_job_status: PbStreamJobStatus::Created as _,
401 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
402 created_at_cluster_version: value.1.created_at_cluster_version,
403 }
404 }
405}
406
407impl From<ObjectModel<view::Model>> for PbView {
408 fn from(value: ObjectModel<view::Model>) -> Self {
409 Self {
410 id: value.0.view_id as _,
411 schema_id: value.1.schema_id.unwrap() as _,
412 database_id: value.1.database_id.unwrap() as _,
413 name: value.0.name,
414 owner: value.1.owner_id as _,
415 properties: value.0.properties.0,
416 sql: value.0.definition,
417 dependent_relations: vec![], columns: value.0.columns.to_protobuf(),
419 }
420 }
421}
422
423impl From<ObjectModel<connection::Model>> for PbConnection {
424 fn from(value: ObjectModel<connection::Model>) -> Self {
425 let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
426 PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
427 } else {
428 PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
429 };
430 Self {
431 id: value.1.oid as _,
432 schema_id: value.1.schema_id.unwrap() as _,
433 database_id: value.1.database_id.unwrap() as _,
434 name: value.0.name,
435 owner: value.1.owner_id as _,
436 info: Some(info),
437 }
438 }
439}
440
441impl From<ObjectModel<function::Model>> for PbFunction {
442 fn from(value: ObjectModel<function::Model>) -> Self {
443 Self {
444 id: value.0.function_id as _,
445 schema_id: value.1.schema_id.unwrap() as _,
446 database_id: value.1.database_id.unwrap() as _,
447 name: value.0.name,
448 owner: value.1.owner_id as _,
449 arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
450 arg_types: value.0.arg_types.to_protobuf(),
451 return_type: Some(value.0.return_type.to_protobuf()),
452 language: value.0.language,
453 runtime: value.0.runtime,
454 link: value.0.link,
455 name_in_runtime: value.0.name_in_runtime,
456 body: value.0.body,
457 compressed_binary: value.0.compressed_binary,
458 kind: Some(value.0.kind.into()),
459 always_retry_on_network_error: value.0.always_retry_on_network_error,
460 is_async: value
461 .0
462 .options
463 .as_ref()
464 .and_then(|o| o.0.get("async").map(|v| v == "true")),
465 is_batched: value
466 .0
467 .options
468 .as_ref()
469 .and_then(|o| o.0.get("batch").map(|v| v == "true")),
470 }
471 }
472}