1use std::collections::BTreeMap;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use risingwave_common::bail;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_meta_model::{
23 PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
24 source, subscription, table, view,
25};
26use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
27use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
28use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
29use risingwave_pb::catalog::table::{
30 CdcTableType as PbCdcTableType, PbEngine, PbOptionalAssociatedSourceId, PbTableType,
31};
32use risingwave_pb::catalog::{
33 PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
34 PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
35 PbView,
36};
37use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};
38
39use crate::{MetaError, MetaResult, MetaStoreBackend};
40
41pub mod catalog;
42pub mod cluster;
43pub mod fragment;
44pub mod id;
45pub mod rename;
46pub mod scale;
47pub mod session_params;
48pub mod streaming_job;
49pub mod system_param;
50pub mod user;
51pub mod utils;
52
53impl From<sea_orm::DbErr> for MetaError {
55 fn from(err: sea_orm::DbErr) -> Self {
56 if let Some(err) = err.sql_err() {
57 return anyhow!(err).into();
58 }
59 anyhow!(err).into()
60 }
61}
62
63#[derive(Clone)]
64pub struct SqlMetaStore {
65 pub conn: DatabaseConnection,
66 pub endpoint: String,
67}
68
69impl SqlMetaStore {
70 pub async fn connect(backend: MetaStoreBackend) -> MetaResult<Self> {
72 const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);
73
74 #[easy_ext::ext]
75 impl ConnectOptions {
76 fn sqlite_common(&mut self) -> &mut Self {
78 self
79 .min_connections(1)
82 .max_connections(1)
83 .acquire_timeout(MAX_DURATION)
87 .connect_timeout(MAX_DURATION)
88 }
89 }
90
91 Ok(match backend {
92 MetaStoreBackend::Mem => {
93 const IN_MEMORY_STORE: &str = "sqlite::memory:";
94
95 let mut options = ConnectOptions::new(IN_MEMORY_STORE);
96
97 options
98 .sqlite_common()
99 .idle_timeout(MAX_DURATION)
104 .max_lifetime(MAX_DURATION);
105
106 let conn = sea_orm::Database::connect(options).await?;
107 Self {
108 conn,
109 endpoint: IN_MEMORY_STORE.to_owned(),
110 }
111 }
112 MetaStoreBackend::Sql { endpoint, config } => {
113 let mut options = ConnectOptions::new(endpoint.clone());
114 options
115 .max_connections(config.max_connections)
116 .min_connections(config.min_connections)
117 .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
118 .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
119 .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
120
121 if DbBackend::Sqlite.is_prefix_of(&endpoint) {
122 if endpoint.contains(":memory:") || endpoint.contains("mode=memory") {
123 bail!(
124 "use the `mem` backend instead of specifying a URL of in-memory SQLite"
125 );
126 }
127 options.sqlite_common();
128 }
129
130 let conn = sea_orm::Database::connect(options).await?;
131 Self { conn, endpoint }
132 }
133 })
134 }
135
136 #[cfg(any(test, feature = "test"))]
137 pub async fn for_test() -> Self {
138 let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
139 Migrator::up(&this.conn, None).await.unwrap();
140 this
141 }
142
143 async fn is_first_launch(&self) -> MetaResult<bool> {
149 let migrations = Migrator::get_applied_migrations(&self.conn)
150 .await
151 .context("failed to get applied migrations")?;
152 for migration in migrations {
153 if migration.name() == "m20230908_072257_init"
154 && migration.status() == MigrationStatus::Applied
155 {
156 return Ok(false);
157 }
158 }
159 Ok(true)
160 }
161
162 pub async fn up(&self) -> MetaResult<bool> {
166 let cluster_first_launch = self.is_first_launch().await?;
167 Migrator::up(&self.conn, None)
169 .await
170 .context("failed to upgrade models in meta store")?;
171
172 Ok(cluster_first_launch)
173 }
174}
175
176pub struct ObjectModel<M: ModelTrait>(M, object::Model);
177
178impl From<ObjectModel<database::Model>> for PbDatabase {
179 fn from(value: ObjectModel<database::Model>) -> Self {
180 Self {
181 id: value.0.database_id,
182 name: value.0.name,
183 owner: value.1.owner_id as _,
184 resource_group: value.0.resource_group.clone(),
185 barrier_interval_ms: value.0.barrier_interval_ms.map(|v| v as u32),
186 checkpoint_frequency: value.0.checkpoint_frequency.map(|v| v as u64),
187 }
188 }
189}
190
191impl From<ObjectModel<secret::Model>> for PbSecret {
192 fn from(value: ObjectModel<secret::Model>) -> Self {
193 Self {
194 id: value.0.secret_id as _,
195 name: value.0.name,
196 database_id: value.1.database_id.unwrap(),
197 value: value.0.value,
198 owner: value.1.owner_id as _,
199 schema_id: value.1.schema_id.unwrap(),
200 }
201 }
202}
203
204impl From<ObjectModel<schema::Model>> for PbSchema {
205 fn from(value: ObjectModel<schema::Model>) -> Self {
206 Self {
207 id: value.0.schema_id,
208 name: value.0.name,
209 database_id: value.1.database_id.unwrap(),
210 owner: value.1.owner_id as _,
211 }
212 }
213}
214
215impl From<ObjectModel<table::Model>> for PbTable {
216 fn from(value: ObjectModel<table::Model>) -> Self {
217 Self {
218 id: value.0.table_id,
219 schema_id: value.1.schema_id.unwrap(),
220 database_id: value.1.database_id.unwrap(),
221 name: value.0.name,
222 columns: value.0.columns.to_protobuf(),
223 pk: value.0.pk.to_protobuf(),
224 table_type: PbTableType::from(value.0.table_type) as _,
225 distribution_key: value.0.distribution_key.0,
226 stream_key: value.0.stream_key.0,
227 append_only: value.0.append_only,
228 owner: value.1.owner_id as _,
229 fragment_id: value.0.fragment_id.unwrap_or_default() as u32,
230 vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
231 row_id_index: value.0.row_id_index.map(|index| index as _),
232 value_indices: value.0.value_indices.0,
233 definition: value.0.definition,
234 handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
235 value.0.handle_pk_conflict_behavior,
236 ) as _,
237 version_column_indices: value
238 .0
239 .version_column_indices
240 .unwrap_or_default()
241 .0
242 .iter()
243 .map(|&idx| idx as u32)
244 .collect(),
245 read_prefix_len_hint: value.0.read_prefix_len_hint as _,
246 watermark_indices: value.0.watermark_indices.0,
247 dist_key_in_pk: value.0.dist_key_in_pk.0,
248 dml_fragment_id: value.0.dml_fragment_id.map(|id| id as u32),
249 cardinality: value
250 .0
251 .cardinality
252 .map(|cardinality| cardinality.to_protobuf()),
253 initialized_at_epoch: Some(
254 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
255 ),
256 created_at_epoch: Some(
257 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
258 ),
259 cleaned_by_watermark: value.0.cleaned_by_watermark,
260 stream_job_status: PbStreamJobStatus::Created as _,
261 create_type: PbCreateType::Foreground as _,
262 version: value.0.version.map(|v| v.to_protobuf()),
263 optional_associated_source_id: value
264 .0
265 .optional_associated_source_id
266 .map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)),
267 description: value.0.description,
268 #[expect(deprecated)]
269 incoming_sinks: vec![],
270 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
271 created_at_cluster_version: value.1.created_at_cluster_version,
272 retention_seconds: value.0.retention_seconds.map(|id| id as u32),
273 cdc_table_id: value.0.cdc_table_id,
274 maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
275 webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
276 job_id: value.0.belongs_to_job_id,
277 engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
278 clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
279 refreshable: value.0.refreshable,
280 vector_index_info: value.0.vector_index_info.map(|index| index.to_protobuf()),
281 cdc_table_type: value
282 .0
283 .cdc_table_type
284 .map(|cdc_type| PbCdcTableType::from(cdc_type) as i32),
285 refresh_state: Some(risingwave_pb::catalog::RefreshState::from(
286 value
287 .0
288 .refresh_state
289 .unwrap_or(risingwave_meta_model::table::RefreshState::Idle),
290 ) as i32),
291 }
292 }
293}
294
295impl From<ObjectModel<source::Model>> for PbSource {
296 fn from(value: ObjectModel<source::Model>) -> Self {
297 let mut secret_ref_map = BTreeMap::new();
298 if let Some(secret_ref) = value.0.secret_ref {
299 secret_ref_map = secret_ref.to_protobuf();
300 }
301 Self {
302 id: value.0.source_id as _,
303 schema_id: value.1.schema_id.unwrap(),
304 database_id: value.1.database_id.unwrap(),
305 name: value.0.name,
306 row_id_index: value.0.row_id_index.map(|id| id as _),
307 columns: value.0.columns.to_protobuf(),
308 pk_column_ids: value.0.pk_column_ids.0,
309 with_properties: value.0.with_properties.0,
310 owner: value.1.owner_id as _,
311 info: value.0.source_info.map(|info| info.to_protobuf()),
312 watermark_descs: value.0.watermark_descs.to_protobuf(),
313 definition: value.0.definition,
314 connection_id: value.0.connection_id.map(|id| id as _),
315 initialized_at_epoch: Some(
317 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
318 ),
319 created_at_epoch: Some(
320 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
321 ),
322 version: value.0.version as _,
323 optional_associated_table_id: value
324 .0
325 .optional_associated_table_id
326 .map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id.as_raw_id())),
327 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
328 created_at_cluster_version: value.1.created_at_cluster_version,
329 secret_refs: secret_ref_map,
330 rate_limit: value.0.rate_limit.map(|v| v as _),
331 refresh_mode: value
332 .0
333 .refresh_mode
334 .map(|refresh_mode| refresh_mode.to_protobuf()),
335 }
336 }
337}
338
339impl From<ObjectModel<sink::Model>> for PbSink {
340 fn from(value: ObjectModel<sink::Model>) -> Self {
341 let mut secret_ref_map = BTreeMap::new();
342 if let Some(secret_ref) = value.0.secret_ref {
343 secret_ref_map = secret_ref.to_protobuf();
344 }
345 Self {
346 id: value.0.sink_id as _,
347 schema_id: value.1.schema_id.unwrap(),
348 database_id: value.1.database_id.unwrap(),
349 name: value.0.name,
350 columns: value.0.columns.to_protobuf(),
351 plan_pk: value.0.plan_pk.to_protobuf(),
352 distribution_key: value.0.distribution_key.0,
353 downstream_pk: value.0.downstream_pk.0,
354 sink_type: PbSinkType::from(value.0.sink_type) as _,
355 owner: value.1.owner_id as _,
356 properties: value.0.properties.0,
357 definition: value.0.definition,
358 connection_id: value.0.connection_id.map(|id| id as _),
359 initialized_at_epoch: Some(
360 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
361 ),
362 created_at_epoch: Some(
363 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
364 ),
365 db_name: value.0.db_name,
366 sink_from_name: value.0.sink_from_name,
367 stream_job_status: PbStreamJobStatus::Created as _,
368 format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
369 target_table: value.0.target_table,
370 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
371 created_at_cluster_version: value.1.created_at_cluster_version,
372 create_type: PbCreateType::Foreground as _,
373 secret_refs: secret_ref_map,
374 original_target_columns: value
375 .0
376 .original_target_columns
377 .map(|cols| cols.to_protobuf())
378 .unwrap_or_default(),
379 auto_refresh_schema_from_table: value.0.auto_refresh_schema_from_table,
380 }
381 }
382}
383
384impl From<ObjectModel<subscription::Model>> for PbSubscription {
385 fn from(value: ObjectModel<subscription::Model>) -> Self {
386 Self {
387 id: value.0.subscription_id as _,
388 schema_id: value.1.schema_id.unwrap(),
389 database_id: value.1.database_id.unwrap(),
390 name: value.0.name,
391 owner: value.1.owner_id as _,
392 retention_seconds: value.0.retention_seconds as _,
393 definition: value.0.definition,
394 initialized_at_epoch: Some(
395 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
396 ),
397 created_at_epoch: Some(
398 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
399 ),
400 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
401 created_at_cluster_version: value.1.created_at_cluster_version,
402 dependent_table_id: value.0.dependent_table_id as _,
403 subscription_state: value.0.subscription_state as _,
404 }
405 }
406}
407
408impl From<ObjectModel<index::Model>> for PbIndex {
409 fn from(value: ObjectModel<index::Model>) -> Self {
410 Self {
411 id: value.0.index_id as _,
412 schema_id: value.1.schema_id.unwrap(),
413 database_id: value.1.database_id.unwrap(),
414 name: value.0.name,
415 owner: value.1.owner_id as _,
416 index_table_id: value.0.index_table_id,
417 primary_table_id: value.0.primary_table_id,
418 index_item: value.0.index_items.to_protobuf(),
419 index_column_properties: value
420 .0
421 .index_column_properties
422 .map(|p| p.to_protobuf())
423 .unwrap_or_default(),
424 index_columns_len: value.0.index_columns_len as _,
425 initialized_at_epoch: Some(
426 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
427 ),
428 created_at_epoch: Some(
429 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
430 ),
431 stream_job_status: PbStreamJobStatus::Created as _,
432 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
433 created_at_cluster_version: value.1.created_at_cluster_version,
434 create_type: risingwave_pb::catalog::CreateType::Foreground.into(), }
436 }
437}
438
439impl From<ObjectModel<view::Model>> for PbView {
440 fn from(value: ObjectModel<view::Model>) -> Self {
441 Self {
442 id: value.0.view_id as _,
443 schema_id: value.1.schema_id.unwrap(),
444 database_id: value.1.database_id.unwrap(),
445 name: value.0.name,
446 owner: value.1.owner_id as _,
447 properties: value.0.properties.0,
448 sql: value.0.definition,
449 columns: value.0.columns.to_protobuf(),
450 }
451 }
452}
453
454impl From<ObjectModel<connection::Model>> for PbConnection {
455 fn from(value: ObjectModel<connection::Model>) -> Self {
456 let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
457 PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
458 } else {
459 PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
460 };
461 Self {
462 id: value.1.oid as _,
463 schema_id: value.1.schema_id.unwrap(),
464 database_id: value.1.database_id.unwrap(),
465 name: value.0.name,
466 owner: value.1.owner_id as _,
467 info: Some(info),
468 }
469 }
470}
471
472impl From<ObjectModel<function::Model>> for PbFunction {
473 fn from(value: ObjectModel<function::Model>) -> Self {
474 Self {
475 id: value.0.function_id as _,
476 schema_id: value.1.schema_id.unwrap(),
477 database_id: value.1.database_id.unwrap(),
478 name: value.0.name,
479 owner: value.1.owner_id as _,
480 arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
481 arg_types: value.0.arg_types.to_protobuf(),
482 return_type: Some(value.0.return_type.to_protobuf()),
483 language: value.0.language,
484 runtime: value.0.runtime,
485 link: value.0.link,
486 name_in_runtime: value.0.name_in_runtime,
487 body: value.0.body,
488 compressed_binary: value.0.compressed_binary,
489 kind: Some(value.0.kind.into()),
490 always_retry_on_network_error: value.0.always_retry_on_network_error,
491 is_async: value
492 .0
493 .options
494 .as_ref()
495 .and_then(|o| o.0.get("async").map(|v| v == "true")),
496 is_batched: value
497 .0
498 .options
499 .as_ref()
500 .and_then(|o| o.0.get("batch").map(|v| v == "true")),
501 created_at_epoch: Some(
502 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
503 ),
504 created_at_cluster_version: value.1.created_at_cluster_version,
505 }
506 }
507}