risingwave_frontend/observer/
observer_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::RwLock;
20use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
21use risingwave_common::catalog::CatalogVersion;
22use risingwave_common::hash::WorkerSlotMapping;
23use risingwave_common::license::LicenseManager;
24use risingwave_common::secret::LocalSecretManager;
25use risingwave_common::session_config::SessionConfig;
26use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
27use risingwave_common_service::ObserverState;
28use risingwave_hummock_sdk::FrontendHummockVersion;
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
31use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo};
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
34use risingwave_rpc_client::ComputeClientPoolRef;
35use tokio::sync::watch::Sender;
36
37use crate::catalog::root_catalog::Catalog;
38use crate::catalog::{FragmentId, SecretId};
39use crate::scheduler::HummockSnapshotManagerRef;
40use crate::user::UserInfoVersion;
41use crate::user::user_manager::UserInfoManager;
42
/// Observer state for a frontend node: it applies notifications received from the
/// meta service to the frontend-local managers held in its fields.
pub struct FrontendObserverNode {
    /// Receives worker node additions/removals and the streaming/serving fragment mappings.
    worker_node_manager: WorkerNodeManagerRef,
    /// Shared catalog; rebuilt from the initial snapshot and patched by catalog notifications.
    catalog: Arc<RwLock<Catalog>>,
    /// Publishes the catalog version after the catalog has been changed.
    catalog_updated_tx: Sender<CatalogVersion>,
    /// Shared user info; rebuilt from the initial snapshot and patched by user notifications.
    user_info_manager: Arc<RwLock<UserInfoManager>>,
    /// Publishes the user info version after the user info has been changed.
    user_info_updated_tx: Sender<UserInfoVersion>,
    /// Initialized from the snapshot's hummock version and fed with later version deltas.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Receives system parameter updates.
    system_params_manager: LocalSystemParamsManagerRef,
    /// Session parameters; replaced by the snapshot and updated one parameter at a time.
    session_params: Arc<RwLock<SessionConfig>>,
    /// Compute client pool; fully invalidated when a recovery notification arrives.
    compute_client_pool: ComputeClientPoolRef,
}
54
55impl ObserverState for FrontendObserverNode {
56    fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
57        risingwave_pb::meta::SubscribeType::Frontend
58    }
59
60    fn handle_notification(&mut self, resp: SubscribeResponse) {
61        let Some(info) = resp.info.as_ref() else {
62            return;
63        };
64
65        // TODO: this clone can be avoided
66        match info.to_owned() {
67            Info::Database(_)
68            | Info::Schema(_)
69            | Info::ObjectGroup(_)
70            | Info::Function(_)
71            | Info::Connection(_) => {
72                self.handle_catalog_notification(resp);
73            }
74            Info::Secret(_) => {
75                self.handle_catalog_notification(resp.clone());
76                self.handle_secret_notification(resp);
77            }
78            Info::Node(node) => {
79                self.update_worker_node_manager(resp.operation(), node);
80            }
81            Info::User(_) => {
82                self.handle_user_notification(resp);
83            }
84            Info::Snapshot(_) => {
85                panic!(
86                    "receiving a snapshot in the middle is unsupported now {:?}",
87                    resp
88                )
89            }
90            Info::HummockVersionDeltas(deltas) => {
91                self.handle_hummock_snapshot_notification(deltas);
92            }
93            Info::MetaBackupManifestId(_) => {
94                panic!("frontend node should not receive MetaBackupManifestId");
95            }
96            Info::HummockWriteLimits(_) => {
97                panic!("frontend node should not receive HummockWriteLimits");
98            }
99            Info::SystemParams(p) => {
100                self.system_params_manager.try_set_params(p);
101            }
102            Info::SessionParam(p) => {
103                self.session_params
104                    .write()
105                    .set(&p.param, p.value().to_owned(), &mut ())
106                    .unwrap();
107            }
108            Info::HummockStats(stats) => {
109                self.handle_table_stats_notification(stats);
110            }
111            Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp),
112            Info::ServingWorkerSlotMappings(m) => {
113                self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
114            }
115            Info::Recovery(_) => {
116                self.compute_client_pool.invalidate_all();
117            }
118            Info::ComputeNodeTotalCpuCount(count) => {
119                LicenseManager::get().update_cpu_core_count(count as _);
120            }
121        }
122    }
123
124    fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
125        let mut catalog_guard = self.catalog.write();
126        let mut user_guard = self.user_info_manager.write();
127        catalog_guard.clear();
128        user_guard.clear();
129
130        let Some(Info::Snapshot(snapshot)) = resp.info else {
131            unreachable!();
132        };
133        let MetaSnapshot {
134            databases,
135            schemas,
136            sources,
137            sinks,
138            tables,
139            indexes,
140            views,
141            subscriptions,
142            functions,
143            connections,
144            users,
145            nodes,
146            hummock_version,
147            meta_backup_manifest_id: _,
148            hummock_write_limits: _,
149            streaming_worker_slot_mappings,
150            serving_worker_slot_mappings,
151            session_params,
152            version,
153            secrets,
154            compute_node_total_cpu_count,
155        } = snapshot;
156
157        for db in databases {
158            catalog_guard.create_database(&db)
159        }
160        for schema in schemas {
161            catalog_guard.create_schema(&schema)
162        }
163        for source in sources {
164            catalog_guard.create_source(&source)
165        }
166        for sink in sinks {
167            catalog_guard.create_sink(&sink)
168        }
169        for subscription in subscriptions {
170            catalog_guard.create_subscription(&subscription)
171        }
172        for table in tables {
173            catalog_guard.create_table(&table)
174        }
175        for index in indexes {
176            catalog_guard.create_index(&index)
177        }
178        for view in views {
179            catalog_guard.create_view(&view)
180        }
181        for function in functions {
182            catalog_guard.create_function(&function)
183        }
184        for connection in connections {
185            catalog_guard.create_connection(&connection)
186        }
187        for secret in &secrets {
188            catalog_guard.create_secret(secret)
189        }
190        for user in users {
191            user_guard.create_user(user)
192        }
193
194        self.worker_node_manager.refresh(
195            nodes,
196            convert_worker_slot_mapping(&streaming_worker_slot_mappings),
197            convert_worker_slot_mapping(&serving_worker_slot_mappings),
198        );
199        self.hummock_snapshot_manager
200            .init(FrontendHummockVersion::from_protobuf(
201                hummock_version.unwrap(),
202            ));
203
204        let snapshot_version = version.unwrap();
205        catalog_guard.set_version(snapshot_version.catalog_version);
206        self.catalog_updated_tx
207            .send(snapshot_version.catalog_version)
208            .unwrap();
209        user_guard.set_version(snapshot_version.catalog_version);
210        self.user_info_updated_tx
211            .send(snapshot_version.catalog_version)
212            .unwrap();
213        *self.session_params.write() =
214            serde_json::from_str(&session_params.unwrap().params).unwrap();
215        LocalSecretManager::global().init_secrets(secrets);
216        LicenseManager::get().update_cpu_core_count(compute_node_total_cpu_count as _);
217    }
218}
219
220impl FrontendObserverNode {
221    pub fn new(
222        worker_node_manager: WorkerNodeManagerRef,
223        catalog: Arc<RwLock<Catalog>>,
224        catalog_updated_tx: Sender<CatalogVersion>,
225        user_info_manager: Arc<RwLock<UserInfoManager>>,
226        user_info_updated_tx: Sender<UserInfoVersion>,
227        hummock_snapshot_manager: HummockSnapshotManagerRef,
228        system_params_manager: LocalSystemParamsManagerRef,
229        session_params: Arc<RwLock<SessionConfig>>,
230        compute_client_pool: ComputeClientPoolRef,
231    ) -> Self {
232        Self {
233            worker_node_manager,
234            catalog,
235            catalog_updated_tx,
236            user_info_manager,
237            user_info_updated_tx,
238            hummock_snapshot_manager,
239            system_params_manager,
240            session_params,
241            compute_client_pool,
242        }
243    }
244
245    fn handle_table_stats_notification(&mut self, table_stats: HummockVersionStats) {
246        let mut catalog_guard = self.catalog.write();
247        catalog_guard.set_table_stats(table_stats);
248    }
249
250    fn handle_catalog_notification(&mut self, resp: SubscribeResponse) {
251        let Some(info) = resp.info.as_ref() else {
252            return;
253        };
254        tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");
255
256        let mut catalog_guard = self.catalog.write();
257        match info {
258            Info::Database(database) => match resp.operation() {
259                Operation::Add => catalog_guard.create_database(database),
260                Operation::Delete => catalog_guard.drop_database(database.id),
261                Operation::Update => catalog_guard.update_database(database),
262                _ => panic!("receive an unsupported notify {:?}", resp),
263            },
264            Info::Schema(schema) => match resp.operation() {
265                Operation::Add => catalog_guard.create_schema(schema),
266                Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
267                Operation::Update => catalog_guard.update_schema(schema),
268                _ => panic!("receive an unsupported notify {:?}", resp),
269            },
270            Info::ObjectGroup(object_group) => {
271                for object in &object_group.objects {
272                    let Some(obj) = object.object_info.as_ref() else {
273                        continue;
274                    };
275                    match obj {
276                        ObjectInfo::Database(db) => match resp.operation() {
277                            Operation::Add => catalog_guard.create_database(db),
278                            Operation::Delete => catalog_guard.drop_database(db.id),
279                            Operation::Update => catalog_guard.update_database(db),
280                            _ => panic!("receive an unsupported notify {:?}", resp),
281                        },
282                        ObjectInfo::Schema(schema) => match resp.operation() {
283                            Operation::Add => catalog_guard.create_schema(schema),
284                            Operation::Delete => {
285                                catalog_guard.drop_schema(schema.database_id, schema.id)
286                            }
287                            Operation::Update => catalog_guard.update_schema(schema),
288                            _ => panic!("receive an unsupported notify {:?}", resp),
289                        },
290                        PbObjectInfo::Table(table) => match resp.operation() {
291                            Operation::Add => catalog_guard.create_table(table),
292                            Operation::Delete => catalog_guard.drop_table(
293                                table.database_id,
294                                table.schema_id,
295                                table.id.into(),
296                            ),
297                            Operation::Update => {
298                                let old_fragment_id = catalog_guard
299                                    .get_any_table_by_id(&table.id.into())
300                                    .unwrap()
301                                    .fragment_id;
302                                catalog_guard.update_table(table);
303                                if old_fragment_id != table.fragment_id {
304                                    // FIXME: the frontend node delete its fragment for the update
305                                    // operation by itself.
306                                    self.worker_node_manager
307                                        .remove_streaming_fragment_mapping(&old_fragment_id);
308                                }
309                            }
310                            _ => panic!("receive an unsupported notify {:?}", resp),
311                        },
312                        PbObjectInfo::Source(source) => match resp.operation() {
313                            Operation::Add => catalog_guard.create_source(source),
314                            Operation::Delete => catalog_guard.drop_source(
315                                source.database_id,
316                                source.schema_id,
317                                source.id,
318                            ),
319                            Operation::Update => catalog_guard.update_source(source),
320                            _ => panic!("receive an unsupported notify {:?}", resp),
321                        },
322                        PbObjectInfo::Sink(sink) => match resp.operation() {
323                            Operation::Add => catalog_guard.create_sink(sink),
324                            Operation::Delete => {
325                                catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
326                            }
327                            Operation::Update => catalog_guard.update_sink(sink),
328                            _ => panic!("receive an unsupported notify {:?}", resp),
329                        },
330                        PbObjectInfo::Subscription(subscription) => match resp.operation() {
331                            Operation::Add => catalog_guard.create_subscription(subscription),
332                            Operation::Delete => catalog_guard.drop_subscription(
333                                subscription.database_id,
334                                subscription.schema_id,
335                                subscription.id,
336                            ),
337                            Operation::Update => catalog_guard.update_subscription(subscription),
338                            _ => panic!("receive an unsupported notify {:?}", resp),
339                        },
340                        PbObjectInfo::Index(index) => match resp.operation() {
341                            Operation::Add => catalog_guard.create_index(index),
342                            Operation::Delete => catalog_guard.drop_index(
343                                index.database_id,
344                                index.schema_id,
345                                index.id.into(),
346                            ),
347                            Operation::Update => catalog_guard.update_index(index),
348                            _ => panic!("receive an unsupported notify {:?}", resp),
349                        },
350                        PbObjectInfo::View(view) => match resp.operation() {
351                            Operation::Add => catalog_guard.create_view(view),
352                            Operation::Delete => {
353                                catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
354                            }
355                            Operation::Update => catalog_guard.update_view(view),
356                            _ => panic!("receive an unsupported notify {:?}", resp),
357                        },
358                        ObjectInfo::Function(function) => match resp.operation() {
359                            Operation::Add => catalog_guard.create_function(function),
360                            Operation::Delete => catalog_guard.drop_function(
361                                function.database_id,
362                                function.schema_id,
363                                function.id.into(),
364                            ),
365                            Operation::Update => catalog_guard.update_function(function),
366                            _ => panic!("receive an unsupported notify {:?}", resp),
367                        },
368                        ObjectInfo::Connection(connection) => match resp.operation() {
369                            Operation::Add => catalog_guard.create_connection(connection),
370                            Operation::Delete => catalog_guard.drop_connection(
371                                connection.database_id,
372                                connection.schema_id,
373                                connection.id,
374                            ),
375                            Operation::Update => catalog_guard.update_connection(connection),
376                            _ => panic!("receive an unsupported notify {:?}", resp),
377                        },
378                        ObjectInfo::Secret(secret) => {
379                            let mut secret = secret.clone();
380                            // The secret value should not be revealed to users. So mask it in the frontend catalog.
381                            secret.value =
382                                "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
383                            match resp.operation() {
384                                Operation::Add => catalog_guard.create_secret(&secret),
385                                Operation::Delete => catalog_guard.drop_secret(
386                                    secret.database_id,
387                                    secret.schema_id,
388                                    SecretId::new(secret.id),
389                                ),
390                                Operation::Update => catalog_guard.update_secret(&secret),
391                                _ => panic!("receive an unsupported notify {:?}", resp),
392                            }
393                        }
394                    }
395                }
396            }
397            Info::Function(function) => match resp.operation() {
398                Operation::Add => catalog_guard.create_function(function),
399                Operation::Delete => catalog_guard.drop_function(
400                    function.database_id,
401                    function.schema_id,
402                    function.id.into(),
403                ),
404                Operation::Update => catalog_guard.update_function(function),
405                _ => panic!("receive an unsupported notify {:?}", resp),
406            },
407            Info::Connection(connection) => match resp.operation() {
408                Operation::Add => catalog_guard.create_connection(connection),
409                Operation::Delete => catalog_guard.drop_connection(
410                    connection.database_id,
411                    connection.schema_id,
412                    connection.id,
413                ),
414                Operation::Update => catalog_guard.update_connection(connection),
415                _ => panic!("receive an unsupported notify {:?}", resp),
416            },
417            Info::Secret(secret) => {
418                let mut secret = secret.clone();
419                // The secret value should not be revealed to users. So mask it in the frontend catalog.
420                secret.value = "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
421                match resp.operation() {
422                    Operation::Add => catalog_guard.create_secret(&secret),
423                    Operation::Delete => catalog_guard.drop_secret(
424                        secret.database_id,
425                        secret.schema_id,
426                        SecretId::new(secret.id),
427                    ),
428                    Operation::Update => catalog_guard.update_secret(&secret),
429                    _ => panic!("receive an unsupported notify {:?}", resp),
430                }
431            }
432            _ => unreachable!(),
433        }
434        assert!(
435            resp.version > catalog_guard.version(),
436            "resp version={:?}, current version={:?}",
437            resp.version,
438            catalog_guard.version()
439        );
440        catalog_guard.set_version(resp.version);
441        self.catalog_updated_tx.send(resp.version).unwrap();
442    }
443
444    fn handle_user_notification(&mut self, resp: SubscribeResponse) {
445        let Some(info) = resp.info.as_ref() else {
446            return;
447        };
448
449        let mut user_guard = self.user_info_manager.write();
450        match info {
451            Info::User(user) => match resp.operation() {
452                Operation::Add => user_guard.create_user(user.clone()),
453                Operation::Delete => user_guard.drop_user(user.id),
454                Operation::Update => user_guard.update_user(user.clone()),
455                _ => panic!("receive an unsupported notify {:?}", resp),
456            },
457            _ => unreachable!(),
458        }
459        assert!(
460            resp.version > user_guard.version(),
461            "resp version={:?}, current version={:?}",
462            resp.version,
463            user_guard.version()
464        );
465        user_guard.set_version(resp.version);
466        self.user_info_updated_tx.send(resp.version).unwrap();
467    }
468
469    fn handle_fragment_mapping_notification(&mut self, resp: SubscribeResponse) {
470        let Some(info) = resp.info.as_ref() else {
471            return;
472        };
473        match info {
474            Info::StreamingWorkerSlotMapping(streaming_worker_slot_mapping) => {
475                let fragment_id = streaming_worker_slot_mapping.fragment_id;
476                let mapping = || {
477                    WorkerSlotMapping::from_protobuf(
478                        streaming_worker_slot_mapping.mapping.as_ref().unwrap(),
479                    )
480                };
481
482                match resp.operation() {
483                    Operation::Add => {
484                        self.worker_node_manager
485                            .insert_streaming_fragment_mapping(fragment_id, mapping());
486                    }
487                    Operation::Delete => {
488                        self.worker_node_manager
489                            .remove_streaming_fragment_mapping(&fragment_id);
490                    }
491                    Operation::Update => {
492                        self.worker_node_manager
493                            .update_streaming_fragment_mapping(fragment_id, mapping());
494                    }
495                    _ => panic!("receive an unsupported notify {:?}", resp),
496                }
497            }
498            _ => unreachable!(),
499        }
500    }
501
502    fn handle_fragment_serving_mapping_notification(
503        &mut self,
504        mappings: Vec<FragmentWorkerSlotMapping>,
505        op: Operation,
506    ) {
507        match op {
508            Operation::Add | Operation::Update => {
509                self.worker_node_manager
510                    .upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
511            }
512            Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
513                &mappings.into_iter().map(|m| m.fragment_id).collect_vec(),
514            ),
515            Operation::Snapshot => {
516                self.worker_node_manager
517                    .set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
518            }
519            _ => panic!("receive an unsupported notify {:?}", op),
520        }
521    }
522
523    /// Update max committed epoch in `HummockSnapshotManager`.
524    fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) {
525        self.hummock_snapshot_manager.update(deltas);
526    }
527
528    fn handle_secret_notification(&mut self, resp: SubscribeResponse) {
529        let resp_op = resp.operation();
530        let Some(Info::Secret(secret)) = resp.info else {
531            unreachable!();
532        };
533        match resp_op {
534            Operation::Add => {
535                LocalSecretManager::global().add_secret(secret.id, secret.value);
536            }
537            Operation::Delete => {
538                LocalSecretManager::global().remove_secret(secret.id);
539            }
540            Operation::Update => {
541                LocalSecretManager::global().update_secret(secret.id, secret.value);
542            }
543            _ => {
544                panic!("error type notification");
545            }
546        }
547    }
548
549    /// `update_worker_node_manager` is called in `start` method.
550    /// It calls `add_worker_node` and `remove_worker_node` of `WorkerNodeManager`.
551    fn update_worker_node_manager(&self, operation: Operation, node: WorkerNode) {
552        tracing::debug!(
553            "Update worker nodes, operation: {:?}, node: {:?}",
554            operation,
555            node
556        );
557
558        match operation {
559            Operation::Add => self.worker_node_manager.add_worker_node(node),
560            Operation::Delete => self.worker_node_manager.remove_worker_node(node),
561            _ => (),
562        }
563    }
564}
565
566fn convert_worker_slot_mapping(
567    worker_slot_mappings: &[FragmentWorkerSlotMapping],
568) -> HashMap<FragmentId, WorkerSlotMapping> {
569    worker_slot_mappings
570        .iter()
571        .map(
572            |FragmentWorkerSlotMapping {
573                 fragment_id,
574                 mapping,
575             }| {
576                let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap());
577                (*fragment_id, mapping)
578            },
579        )
580        .collect()
581}