risingwave_meta/controller/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use risingwave_common::hash::VnodeCount;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_meta_model::{
22    PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
23    source, subscription, table, view,
24};
25use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
26use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
27use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
28use risingwave_pb::catalog::subscription::PbSubscriptionState;
29use risingwave_pb::catalog::table::{PbEngine, PbOptionalAssociatedSourceId, PbTableType};
30use risingwave_pb::catalog::{
31    PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
32    PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
33    PbView,
34};
35use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};
36
37use crate::{MetaError, MetaResult, MetaStoreBackend};
38
39pub mod catalog;
40pub mod cluster;
41pub mod fragment;
42pub mod id;
43pub mod rename;
44pub mod scale;
45pub mod session_params;
46pub mod streaming_job;
47pub mod system_param;
48pub mod user;
49pub mod utils;
50
51// todo: refine the error transform.
52impl From<sea_orm::DbErr> for MetaError {
53    fn from(err: sea_orm::DbErr) -> Self {
54        if let Some(err) = err.sql_err() {
55            return anyhow!(err).into();
56        }
57        anyhow!(err).into()
58    }
59}
60
/// Handle to the SQL-backed meta store: a database connection together with
/// the endpoint string it was opened with.
#[derive(Clone)]
pub struct SqlMetaStore {
    /// The `sea_orm` connection used for all meta store queries.
    pub conn: DatabaseConnection,
    /// The endpoint the connection was created from. For the in-memory
    /// backend this is `"sqlite::memory:"`.
    pub endpoint: String,
}
66
67impl SqlMetaStore {
68    /// Connect to the SQL meta store based on the given configuration.
69    pub async fn connect(backend: MetaStoreBackend) -> Result<Self, sea_orm::DbErr> {
70        const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);
71
72        #[easy_ext::ext]
73        impl ConnectOptions {
74            /// Apply common settings for `SQLite` connections.
75            fn sqlite_common(&mut self) -> &mut Self {
76                self
77                    // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
78                    // here we forcibly specify the number of connections as 1.
79                    .min_connections(1)
80                    .max_connections(1)
81                    // Workaround for https://github.com/risingwavelabs/risingwave/issues/18966.
82                    // Note: don't quite get the point but `acquire_timeout` and `connect_timeout` maps to the
83                    //       same underlying setting in `sqlx` under current implementation.
84                    .acquire_timeout(MAX_DURATION)
85                    .connect_timeout(MAX_DURATION)
86            }
87        }
88
89        Ok(match backend {
90            MetaStoreBackend::Mem => {
91                const IN_MEMORY_STORE: &str = "sqlite::memory:";
92
93                let mut options = ConnectOptions::new(IN_MEMORY_STORE);
94
95                options
96                    .sqlite_common()
97                    // Releasing the connection to in-memory SQLite database is unacceptable
98                    // because it will clear the database. Set a large enough timeout to prevent it.
99                    // `sqlx` actually supports disabling these timeouts by passing a `None`, but
100                    // `sea-orm` does not expose this option.
101                    .idle_timeout(MAX_DURATION)
102                    .max_lifetime(MAX_DURATION);
103
104                let conn = sea_orm::Database::connect(options).await?;
105                Self {
106                    conn,
107                    endpoint: IN_MEMORY_STORE.to_owned(),
108                }
109            }
110            MetaStoreBackend::Sql { endpoint, config } => {
111                let mut options = ConnectOptions::new(endpoint.clone());
112                options
113                    .max_connections(config.max_connections)
114                    .min_connections(config.min_connections)
115                    .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
116                    .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
117                    .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
118
119                if DbBackend::Sqlite.is_prefix_of(&endpoint) {
120                    options.sqlite_common();
121                }
122
123                let conn = sea_orm::Database::connect(options).await?;
124                Self { conn, endpoint }
125            }
126        })
127    }
128
129    #[cfg(any(test, feature = "test"))]
130    pub async fn for_test() -> Self {
131        let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
132        Migrator::up(&this.conn, None).await.unwrap();
133        this
134    }
135
136    /// Check whether the cluster, which uses SQL as the backend, is a new cluster.
137    /// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied,
138    /// then it is considered an old cluster.
139    ///
140    /// Note: this check should be performed before [`Self::up()`].
141    async fn is_first_launch(&self) -> MetaResult<bool> {
142        let migrations = Migrator::get_applied_migrations(&self.conn)
143            .await
144            .context("failed to get applied migrations")?;
145        for migration in migrations {
146            if migration.name() == "m20230908_072257_init"
147                && migration.status() == MigrationStatus::Applied
148            {
149                return Ok(false);
150            }
151        }
152        Ok(true)
153    }
154
155    /// Apply all the migrations to the meta store before starting the service.
156    ///
157    /// Returns whether the cluster is the first launch.
158    pub async fn up(&self) -> MetaResult<bool> {
159        let cluster_first_launch = self.is_first_launch().await?;
160        // Try to upgrade if any new model changes are added.
161        Migrator::up(&self.conn, None)
162            .await
163            .context("failed to upgrade models in meta store")?;
164
165        Ok(cluster_first_launch)
166    }
167}
168
/// Pairs a catalog model `M` (field `.0`) with its accompanying
/// [`object::Model`] row (field `.1`).
///
/// The `From` impls in this module consume this pair to build the
/// corresponding protobuf catalog structs.
pub struct ObjectModel<M: ModelTrait>(M, object::Model);
170
171impl From<ObjectModel<database::Model>> for PbDatabase {
172    fn from(value: ObjectModel<database::Model>) -> Self {
173        Self {
174            id: value.0.database_id as _,
175            name: value.0.name,
176            owner: value.1.owner_id as _,
177            resource_group: value.0.resource_group.clone(),
178        }
179    }
180}
181
182impl From<ObjectModel<secret::Model>> for PbSecret {
183    fn from(value: ObjectModel<secret::Model>) -> Self {
184        Self {
185            id: value.0.secret_id as _,
186            name: value.0.name,
187            database_id: value.1.database_id.unwrap() as _,
188            value: value.0.value,
189            owner: value.1.owner_id as _,
190            schema_id: value.1.schema_id.unwrap() as _,
191        }
192    }
193}
194
195impl From<ObjectModel<schema::Model>> for PbSchema {
196    fn from(value: ObjectModel<schema::Model>) -> Self {
197        Self {
198            id: value.0.schema_id as _,
199            name: value.0.name,
200            database_id: value.1.database_id.unwrap() as _,
201            owner: value.1.owner_id as _,
202        }
203    }
204}
205
206impl From<ObjectModel<table::Model>> for PbTable {
207    fn from(value: ObjectModel<table::Model>) -> Self {
208        Self {
209            id: value.0.table_id as _,
210            schema_id: value.1.schema_id.unwrap() as _,
211            database_id: value.1.database_id.unwrap() as _,
212            name: value.0.name,
213            columns: value.0.columns.to_protobuf(),
214            pk: value.0.pk.to_protobuf(),
215            dependent_relations: vec![], // todo: deprecate it.
216            table_type: PbTableType::from(value.0.table_type) as _,
217            distribution_key: value.0.distribution_key.0,
218            stream_key: value.0.stream_key.0,
219            append_only: value.0.append_only,
220            owner: value.1.owner_id as _,
221            fragment_id: value.0.fragment_id.unwrap_or_default() as u32,
222            vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
223            row_id_index: value.0.row_id_index.map(|index| index as _),
224            value_indices: value.0.value_indices.0,
225            definition: value.0.definition,
226            handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
227                value.0.handle_pk_conflict_behavior,
228            ) as _,
229            version_column_index: value.0.version_column_index.map(|x| x as u32),
230            read_prefix_len_hint: value.0.read_prefix_len_hint as _,
231            watermark_indices: value.0.watermark_indices.0,
232            dist_key_in_pk: value.0.dist_key_in_pk.0,
233            dml_fragment_id: value.0.dml_fragment_id.map(|id| id as u32),
234            cardinality: value
235                .0
236                .cardinality
237                .map(|cardinality| cardinality.to_protobuf()),
238            initialized_at_epoch: Some(
239                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
240            ),
241            created_at_epoch: Some(
242                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
243            ),
244            cleaned_by_watermark: value.0.cleaned_by_watermark,
245            stream_job_status: PbStreamJobStatus::Created as _,
246            create_type: PbCreateType::Foreground as _,
247            version: value.0.version.map(|v| v.to_protobuf()),
248            optional_associated_source_id: value
249                .0
250                .optional_associated_source_id
251                .map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)),
252            description: value.0.description,
253            incoming_sinks: value.0.incoming_sinks.into_u32_array(),
254            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
255            created_at_cluster_version: value.1.created_at_cluster_version,
256            retention_seconds: value.0.retention_seconds.map(|id| id as u32),
257            cdc_table_id: value.0.cdc_table_id,
258            maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
259            webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
260            job_id: value.0.belongs_to_job_id.map(|id| id as _),
261            engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
262            clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
263        }
264    }
265}
266
267impl From<ObjectModel<source::Model>> for PbSource {
268    fn from(value: ObjectModel<source::Model>) -> Self {
269        let mut secret_ref_map = BTreeMap::new();
270        if let Some(secret_ref) = value.0.secret_ref {
271            secret_ref_map = secret_ref.to_protobuf();
272        }
273        Self {
274            id: value.0.source_id as _,
275            schema_id: value.1.schema_id.unwrap() as _,
276            database_id: value.1.database_id.unwrap() as _,
277            name: value.0.name,
278            row_id_index: value.0.row_id_index.map(|id| id as _),
279            columns: value.0.columns.to_protobuf(),
280            pk_column_ids: value.0.pk_column_ids.0,
281            with_properties: value.0.with_properties.0,
282            owner: value.1.owner_id as _,
283            info: value.0.source_info.map(|info| info.to_protobuf()),
284            watermark_descs: value.0.watermark_descs.to_protobuf(),
285            definition: value.0.definition,
286            connection_id: value.0.connection_id.map(|id| id as _),
287            // todo: using the timestamp from the database directly.
288            initialized_at_epoch: Some(
289                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
290            ),
291            created_at_epoch: Some(
292                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
293            ),
294            version: value.0.version as _,
295            optional_associated_table_id: value
296                .0
297                .optional_associated_table_id
298                .map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id as _)),
299            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
300            created_at_cluster_version: value.1.created_at_cluster_version,
301            secret_refs: secret_ref_map,
302            rate_limit: value.0.rate_limit.map(|v| v as _),
303        }
304    }
305}
306
307impl From<ObjectModel<sink::Model>> for PbSink {
308    fn from(value: ObjectModel<sink::Model>) -> Self {
309        let mut secret_ref_map = BTreeMap::new();
310        if let Some(secret_ref) = value.0.secret_ref {
311            secret_ref_map = secret_ref.to_protobuf();
312        }
313        #[allow(deprecated)] // for `dependent_relations`
314        Self {
315            id: value.0.sink_id as _,
316            schema_id: value.1.schema_id.unwrap() as _,
317            database_id: value.1.database_id.unwrap() as _,
318            name: value.0.name,
319            columns: value.0.columns.to_protobuf(),
320            plan_pk: value.0.plan_pk.to_protobuf(),
321            dependent_relations: vec![],
322            distribution_key: value.0.distribution_key.0,
323            downstream_pk: value.0.downstream_pk.0,
324            sink_type: PbSinkType::from(value.0.sink_type) as _,
325            owner: value.1.owner_id as _,
326            properties: value.0.properties.0,
327            definition: value.0.definition,
328            connection_id: value.0.connection_id.map(|id| id as _),
329            initialized_at_epoch: Some(
330                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
331            ),
332            created_at_epoch: Some(
333                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
334            ),
335            db_name: value.0.db_name,
336            sink_from_name: value.0.sink_from_name,
337            stream_job_status: PbStreamJobStatus::Created as _,
338            format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
339            target_table: value.0.target_table.map(|id| id as _),
340            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
341            created_at_cluster_version: value.1.created_at_cluster_version,
342            create_type: PbCreateType::Foreground as _,
343            secret_refs: secret_ref_map,
344            original_target_columns: value
345                .0
346                .original_target_columns
347                .map(|cols| cols.to_protobuf())
348                .unwrap_or_default(),
349        }
350    }
351}
352
353impl From<ObjectModel<subscription::Model>> for PbSubscription {
354    fn from(value: ObjectModel<subscription::Model>) -> Self {
355        Self {
356            id: value.0.subscription_id as _,
357            schema_id: value.1.schema_id.unwrap() as _,
358            database_id: value.1.database_id.unwrap() as _,
359            name: value.0.name,
360            owner: value.1.owner_id as _,
361            retention_seconds: value.0.retention_seconds as _,
362            definition: value.0.definition,
363            initialized_at_epoch: Some(
364                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
365            ),
366            created_at_epoch: Some(
367                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
368            ),
369            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
370            created_at_cluster_version: value.1.created_at_cluster_version,
371            dependent_table_id: value.0.dependent_table_id as _,
372            subscription_state: PbSubscriptionState::Init as _,
373        }
374    }
375}
376
377impl From<ObjectModel<index::Model>> for PbIndex {
378    fn from(value: ObjectModel<index::Model>) -> Self {
379        Self {
380            id: value.0.index_id as _,
381            schema_id: value.1.schema_id.unwrap() as _,
382            database_id: value.1.database_id.unwrap() as _,
383            name: value.0.name,
384            owner: value.1.owner_id as _,
385            index_table_id: value.0.index_table_id as _,
386            primary_table_id: value.0.primary_table_id as _,
387            index_item: value.0.index_items.to_protobuf(),
388            index_column_properties: value
389                .0
390                .index_column_properties
391                .map(|p| p.to_protobuf())
392                .unwrap_or_default(),
393            index_columns_len: value.0.index_columns_len as _,
394            initialized_at_epoch: Some(
395                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
396            ),
397            created_at_epoch: Some(
398                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
399            ),
400            stream_job_status: PbStreamJobStatus::Created as _,
401            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
402            created_at_cluster_version: value.1.created_at_cluster_version,
403        }
404    }
405}
406
407impl From<ObjectModel<view::Model>> for PbView {
408    fn from(value: ObjectModel<view::Model>) -> Self {
409        Self {
410            id: value.0.view_id as _,
411            schema_id: value.1.schema_id.unwrap() as _,
412            database_id: value.1.database_id.unwrap() as _,
413            name: value.0.name,
414            owner: value.1.owner_id as _,
415            properties: value.0.properties.0,
416            sql: value.0.definition,
417            dependent_relations: vec![], // todo: deprecate it.
418            columns: value.0.columns.to_protobuf(),
419        }
420    }
421}
422
423impl From<ObjectModel<connection::Model>> for PbConnection {
424    fn from(value: ObjectModel<connection::Model>) -> Self {
425        let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
426            PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
427        } else {
428            PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
429        };
430        Self {
431            id: value.1.oid as _,
432            schema_id: value.1.schema_id.unwrap() as _,
433            database_id: value.1.database_id.unwrap() as _,
434            name: value.0.name,
435            owner: value.1.owner_id as _,
436            info: Some(info),
437        }
438    }
439}
440
441impl From<ObjectModel<function::Model>> for PbFunction {
442    fn from(value: ObjectModel<function::Model>) -> Self {
443        Self {
444            id: value.0.function_id as _,
445            schema_id: value.1.schema_id.unwrap() as _,
446            database_id: value.1.database_id.unwrap() as _,
447            name: value.0.name,
448            owner: value.1.owner_id as _,
449            arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
450            arg_types: value.0.arg_types.to_protobuf(),
451            return_type: Some(value.0.return_type.to_protobuf()),
452            language: value.0.language,
453            runtime: value.0.runtime,
454            link: value.0.link,
455            name_in_runtime: value.0.name_in_runtime,
456            body: value.0.body,
457            compressed_binary: value.0.compressed_binary,
458            kind: Some(value.0.kind.into()),
459            always_retry_on_network_error: value.0.always_retry_on_network_error,
460            is_async: value
461                .0
462                .options
463                .as_ref()
464                .and_then(|o| o.0.get("async").map(|v| v == "true")),
465            is_batched: value
466                .0
467                .options
468                .as_ref()
469                .and_then(|o| o.0.get("batch").map(|v| v == "true")),
470        }
471    }
472}