1use std::collections::BTreeMap;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use risingwave_common::bail;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_meta_model::{
23 PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
24 source, subscription, table, view,
25};
26use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
27use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
28use risingwave_pb::catalog::table::{CdcTableType as PbCdcTableType, PbEngine, PbTableType};
29use risingwave_pb::catalog::{
30 PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
31 PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
32 PbView,
33};
34use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};
35
36use crate::{MetaError, MetaResult, MetaStoreBackend};
37
38pub mod catalog;
39pub mod cluster;
40pub mod fragment;
41pub mod id;
42pub mod rename;
43pub mod scale;
44pub mod session_params;
45pub mod streaming_job;
46pub mod system_param;
47pub mod user;
48pub mod utils;
49
50impl From<sea_orm::DbErr> for MetaError {
52 fn from(err: sea_orm::DbErr) -> Self {
53 if let Some(err) = err.sql_err() {
54 return anyhow!(err).into();
55 }
56 anyhow!(err).into()
57 }
58}
59
/// A SQL-backed meta store: a database connection plus the endpoint it was opened on.
#[derive(Clone)]
pub struct SqlMetaStore {
    /// The `sea_orm` connection used for all meta store queries.
    pub conn: DatabaseConnection,
    /// The endpoint string the connection was created from. For the in-memory backend this is
    /// `sqlite::memory:`.
    pub endpoint: String,
}
65
66impl SqlMetaStore {
67 pub async fn connect(backend: MetaStoreBackend) -> MetaResult<Self> {
69 const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);
70
71 #[easy_ext::ext]
72 impl ConnectOptions {
73 fn sqlite_common(&mut self) -> &mut Self {
75 self
76 .min_connections(1)
79 .max_connections(1)
80 .acquire_timeout(MAX_DURATION)
84 .connect_timeout(MAX_DURATION)
85 }
86 }
87
88 Ok(match backend {
89 MetaStoreBackend::Mem => {
90 const IN_MEMORY_STORE: &str = "sqlite::memory:";
91
92 let mut options = ConnectOptions::new(IN_MEMORY_STORE);
93
94 options
95 .sqlite_common()
96 .idle_timeout(MAX_DURATION)
101 .max_lifetime(MAX_DURATION);
102
103 let conn = sea_orm::Database::connect(options).await?;
104 Self {
105 conn,
106 endpoint: IN_MEMORY_STORE.to_owned(),
107 }
108 }
109 MetaStoreBackend::Sql { endpoint, config } => {
110 let mut options = ConnectOptions::new(endpoint.clone());
111 options
112 .max_connections(config.max_connections)
113 .min_connections(config.min_connections)
114 .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
115 .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
116 .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
117
118 if DbBackend::Sqlite.is_prefix_of(&endpoint) {
119 if endpoint.contains(":memory:") || endpoint.contains("mode=memory") {
120 bail!(
121 "use the `mem` backend instead of specifying a URL of in-memory SQLite"
122 );
123 }
124 options.sqlite_common();
125 }
126
127 let conn = sea_orm::Database::connect(options).await?;
128 Self { conn, endpoint }
129 }
130 })
131 }
132
133 #[cfg(any(test, feature = "test"))]
134 pub async fn for_test() -> Self {
135 let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
136 Migrator::up(&this.conn, None).await.unwrap();
137 this
138 }
139
140 async fn is_first_launch(&self) -> MetaResult<bool> {
146 let migrations = Migrator::get_applied_migrations(&self.conn)
147 .await
148 .context("failed to get applied migrations")?;
149 for migration in migrations {
150 if migration.name() == "m20230908_072257_init"
151 && migration.status() == MigrationStatus::Applied
152 {
153 return Ok(false);
154 }
155 }
156 Ok(true)
157 }
158
159 pub async fn up(&self) -> MetaResult<bool> {
163 let cluster_first_launch = self.is_first_launch().await?;
164 Migrator::up(&self.conn, None)
166 .await
167 .context("failed to upgrade models in meta store")?;
168
169 Ok(cluster_first_launch)
170 }
171}
172
/// Pairs a catalog model `M` (field `0`) with an `object::Model` row (field `1`).
///
/// The `From` impls in this file turn such a pair into the matching protobuf catalog type,
/// reading type-specific fields from `M` and shared fields (owner, schema/database id,
/// timestamps, cluster versions) from the `object::Model`.
pub struct ObjectModel<M: ModelTrait>(M, object::Model);
174
175impl From<ObjectModel<database::Model>> for PbDatabase {
176 fn from(value: ObjectModel<database::Model>) -> Self {
177 Self {
178 id: value.0.database_id,
179 name: value.0.name,
180 owner: value.1.owner_id as _,
181 resource_group: value.0.resource_group.clone(),
182 barrier_interval_ms: value.0.barrier_interval_ms.map(|v| v as u32),
183 checkpoint_frequency: value.0.checkpoint_frequency.map(|v| v as u64),
184 }
185 }
186}
187
188impl From<ObjectModel<secret::Model>> for PbSecret {
189 fn from(value: ObjectModel<secret::Model>) -> Self {
190 Self {
191 id: value.0.secret_id,
192 name: value.0.name,
193 database_id: value.1.database_id.unwrap(),
194 value: value.0.value,
195 owner: value.1.owner_id as _,
196 schema_id: value.1.schema_id.unwrap(),
197 }
198 }
199}
200
201impl From<ObjectModel<schema::Model>> for PbSchema {
202 fn from(value: ObjectModel<schema::Model>) -> Self {
203 Self {
204 id: value.0.schema_id,
205 name: value.0.name,
206 database_id: value.1.database_id.unwrap(),
207 owner: value.1.owner_id as _,
208 }
209 }
210}
211
212impl From<ObjectModel<table::Model>> for PbTable {
213 fn from(value: ObjectModel<table::Model>) -> Self {
214 Self {
215 id: value.0.table_id,
216 schema_id: value.1.schema_id.unwrap(),
217 database_id: value.1.database_id.unwrap(),
218 name: value.0.name,
219 columns: value.0.columns.to_protobuf(),
220 pk: value.0.pk.to_protobuf(),
221 table_type: PbTableType::from(value.0.table_type) as _,
222 distribution_key: value.0.distribution_key.0,
223 stream_key: value.0.stream_key.0,
224 append_only: value.0.append_only,
225 owner: value.1.owner_id as _,
226 fragment_id: value.0.fragment_id.unwrap_or_default(),
227 vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
228 row_id_index: value.0.row_id_index.map(|index| index as _),
229 value_indices: value.0.value_indices.0,
230 definition: value.0.definition,
231 handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
232 value.0.handle_pk_conflict_behavior,
233 ) as _,
234 version_column_indices: value
235 .0
236 .version_column_indices
237 .unwrap_or_default()
238 .0
239 .iter()
240 .map(|&idx| idx as u32)
241 .collect(),
242 read_prefix_len_hint: value.0.read_prefix_len_hint as _,
243 watermark_indices: value.0.watermark_indices.0,
244 dist_key_in_pk: value.0.dist_key_in_pk.0,
245 dml_fragment_id: value.0.dml_fragment_id,
246 cardinality: value
247 .0
248 .cardinality
249 .map(|cardinality| cardinality.to_protobuf()),
250 initialized_at_epoch: Some(
251 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
252 ),
253 created_at_epoch: Some(
254 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
255 ),
256 cleaned_by_watermark: value.0.cleaned_by_watermark,
257 stream_job_status: PbStreamJobStatus::Created as _,
258 create_type: PbCreateType::Foreground as _,
259 version: value.0.version.map(|v| v.to_protobuf()),
260 optional_associated_source_id: value.0.optional_associated_source_id.map(Into::into),
261 description: value.0.description,
262 #[expect(deprecated)]
263 incoming_sinks: vec![],
264 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
265 created_at_cluster_version: value.1.created_at_cluster_version,
266 retention_seconds: value.0.retention_seconds.map(|id| id as u32),
267 cdc_table_id: value.0.cdc_table_id,
268 maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
269 webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
270 job_id: value.0.belongs_to_job_id,
271 engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
272 #[expect(deprecated)]
273 clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
274 clean_watermark_indices: value
275 .0
276 .clean_watermark_indices
277 .map(|indices| indices.0.iter().map(|&x| x as u32).collect())
278 .unwrap_or_default(),
279 refreshable: value.0.refreshable,
280 vector_index_info: value.0.vector_index_info.map(|index| index.to_protobuf()),
281 cdc_table_type: value
282 .0
283 .cdc_table_type
284 .map(|cdc_type| PbCdcTableType::from(cdc_type) as i32),
285 }
286 }
287}
288
289impl From<ObjectModel<source::Model>> for PbSource {
290 fn from(value: ObjectModel<source::Model>) -> Self {
291 let mut secret_ref_map = BTreeMap::new();
292 if let Some(secret_ref) = value.0.secret_ref {
293 secret_ref_map = secret_ref.to_protobuf();
294 }
295 Self {
296 id: value.0.source_id as _,
297 schema_id: value.1.schema_id.unwrap(),
298 database_id: value.1.database_id.unwrap(),
299 name: value.0.name,
300 row_id_index: value.0.row_id_index.map(|id| id as _),
301 columns: value.0.columns.to_protobuf(),
302 pk_column_ids: value.0.pk_column_ids.0,
303 with_properties: value.0.with_properties.0,
304 owner: value.1.owner_id as _,
305 info: value.0.source_info.map(|info| info.to_protobuf()),
306 watermark_descs: value.0.watermark_descs.to_protobuf(),
307 definition: value.0.definition,
308 connection_id: value.0.connection_id,
309 initialized_at_epoch: Some(
311 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
312 ),
313 created_at_epoch: Some(
314 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
315 ),
316 version: value.0.version as _,
317 optional_associated_table_id: value.0.optional_associated_table_id.map(Into::into),
318 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
319 created_at_cluster_version: value.1.created_at_cluster_version,
320 secret_refs: secret_ref_map,
321 rate_limit: value.0.rate_limit.map(|v| v as _),
322 refresh_mode: value
323 .0
324 .refresh_mode
325 .map(|refresh_mode| refresh_mode.to_protobuf()),
326 }
327 }
328}
329
330impl From<ObjectModel<sink::Model>> for PbSink {
331 fn from(value: ObjectModel<sink::Model>) -> Self {
332 let mut secret_ref_map = BTreeMap::new();
333 if let Some(secret_ref) = value.0.secret_ref {
334 secret_ref_map = secret_ref.to_protobuf();
335 }
336 Self {
337 id: value.0.sink_id as _,
338 schema_id: value.1.schema_id.unwrap(),
339 database_id: value.1.database_id.unwrap(),
340 name: value.0.name,
341 columns: value.0.columns.to_protobuf(),
342 plan_pk: value.0.plan_pk.to_protobuf(),
343 distribution_key: value.0.distribution_key.0,
344 downstream_pk: value.0.downstream_pk.0,
345 sink_type: PbSinkType::from(value.0.sink_type) as _,
346 owner: value.1.owner_id as _,
347 properties: value.0.properties.0,
348 definition: value.0.definition,
349 connection_id: value.0.connection_id,
350 initialized_at_epoch: Some(
351 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
352 ),
353 created_at_epoch: Some(
354 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
355 ),
356 db_name: value.0.db_name,
357 sink_from_name: value.0.sink_from_name,
358 stream_job_status: PbStreamJobStatus::Created as _,
359 format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
360 target_table: value.0.target_table,
361 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
362 created_at_cluster_version: value.1.created_at_cluster_version,
363 create_type: PbCreateType::Foreground as _,
364 secret_refs: secret_ref_map,
365 original_target_columns: value
366 .0
367 .original_target_columns
368 .map(|cols| cols.to_protobuf())
369 .unwrap_or_default(),
370 auto_refresh_schema_from_table: value.0.auto_refresh_schema_from_table,
371 }
372 }
373}
374
375impl From<ObjectModel<subscription::Model>> for PbSubscription {
376 fn from(value: ObjectModel<subscription::Model>) -> Self {
377 Self {
378 id: value.0.subscription_id as _,
379 schema_id: value.1.schema_id.unwrap(),
380 database_id: value.1.database_id.unwrap(),
381 name: value.0.name,
382 owner: value.1.owner_id as _,
383 retention_seconds: value.0.retention_seconds as _,
384 definition: value.0.definition,
385 initialized_at_epoch: Some(
386 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
387 ),
388 created_at_epoch: Some(
389 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
390 ),
391 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
392 created_at_cluster_version: value.1.created_at_cluster_version,
393 dependent_table_id: value.0.dependent_table_id,
394 subscription_state: value.0.subscription_state as _,
395 }
396 }
397}
398
399impl From<ObjectModel<index::Model>> for PbIndex {
400 fn from(value: ObjectModel<index::Model>) -> Self {
401 Self {
402 id: value.0.index_id as _,
403 schema_id: value.1.schema_id.unwrap(),
404 database_id: value.1.database_id.unwrap(),
405 name: value.0.name,
406 owner: value.1.owner_id as _,
407 index_table_id: value.0.index_table_id,
408 primary_table_id: value.0.primary_table_id,
409 index_item: value.0.index_items.to_protobuf(),
410 index_column_properties: value
411 .0
412 .index_column_properties
413 .map(|p| p.to_protobuf())
414 .unwrap_or_default(),
415 index_columns_len: value.0.index_columns_len as _,
416 initialized_at_epoch: Some(
417 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
418 ),
419 created_at_epoch: Some(
420 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
421 ),
422 stream_job_status: PbStreamJobStatus::Created as _,
423 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
424 created_at_cluster_version: value.1.created_at_cluster_version,
425 create_type: risingwave_pb::catalog::CreateType::Foreground.into(), }
427 }
428}
429
430impl From<ObjectModel<view::Model>> for PbView {
431 fn from(value: ObjectModel<view::Model>) -> Self {
432 Self {
433 id: value.0.view_id as _,
434 schema_id: value.1.schema_id.unwrap(),
435 database_id: value.1.database_id.unwrap(),
436 name: value.0.name,
437 owner: value.1.owner_id as _,
438 properties: value.0.properties.0,
439 sql: value.0.definition,
440 columns: value.0.columns.to_protobuf(),
441 created_at_epoch: Some(
442 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
443 ),
444 created_at_cluster_version: value.1.created_at_cluster_version,
445 }
446 }
447}
448
449impl From<ObjectModel<connection::Model>> for PbConnection {
450 fn from(value: ObjectModel<connection::Model>) -> Self {
451 let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
452 PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
453 } else {
454 PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
455 };
456 Self {
457 id: value.1.oid.as_connection_id(),
458 schema_id: value.1.schema_id.unwrap(),
459 database_id: value.1.database_id.unwrap(),
460 name: value.0.name,
461 owner: value.1.owner_id as _,
462 info: Some(info),
463 }
464 }
465}
466
467impl From<ObjectModel<function::Model>> for PbFunction {
468 fn from(value: ObjectModel<function::Model>) -> Self {
469 Self {
470 id: value.0.function_id as _,
471 schema_id: value.1.schema_id.unwrap(),
472 database_id: value.1.database_id.unwrap(),
473 name: value.0.name,
474 owner: value.1.owner_id as _,
475 arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
476 arg_types: value.0.arg_types.to_protobuf(),
477 return_type: Some(value.0.return_type.to_protobuf()),
478 language: value.0.language,
479 runtime: value.0.runtime,
480 link: value.0.link,
481 name_in_runtime: value.0.name_in_runtime,
482 body: value.0.body,
483 compressed_binary: value.0.compressed_binary,
484 kind: Some(value.0.kind.into()),
485 always_retry_on_network_error: value.0.always_retry_on_network_error,
486 is_async: value
487 .0
488 .options
489 .as_ref()
490 .and_then(|o| o.0.get("async").map(|v| v == "true")),
491 is_batched: value
492 .0
493 .options
494 .as_ref()
495 .and_then(|o| o.0.get("batch").map(|v| v == "true")),
496 created_at_epoch: Some(
497 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
498 ),
499 created_at_cluster_version: value.1.created_at_cluster_version,
500 }
501 }
502}