risingwave_frontend/observer/
observer_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::license::LicenseManager;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::ObserverState;
use risingwave_hummock_sdk::FrontendHummockVersion;
use risingwave_pb::catalog::{PbConnection, PbDatabase, PbFunction, PbSchema, PbSecret};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
use risingwave_rpc_client::ComputeClientPoolRef;
use tokio::sync::watch::Sender;

use crate::catalog::root_catalog::Catalog;
use crate::catalog::{FragmentId, SecretId};
use crate::scheduler::HummockSnapshotManagerRef;
use crate::user::user_manager::UserInfoManager;
41
42pub struct FrontendObserverNode {
43    worker_node_manager: WorkerNodeManagerRef,
44    version: CatalogVersion,
45    catalog_updated_tx: Sender<CatalogVersion>,
46    catalog: Arc<RwLock<Catalog>>,
47    user_info_manager: Arc<RwLock<UserInfoManager>>,
48    hummock_snapshot_manager: HummockSnapshotManagerRef,
49    system_params_manager: LocalSystemParamsManagerRef,
50    session_params: Arc<RwLock<SessionConfig>>,
51    compute_client_pool: ComputeClientPoolRef,
52}
53
54impl ObserverState for FrontendObserverNode {
55    fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
56        risingwave_pb::meta::SubscribeType::Frontend
57    }
58
59    fn handle_notification(&mut self, resp: SubscribeResponse) {
60        let Some(info) = resp.info.as_ref() else {
61            return;
62        };
63
64        // TODO: this clone can be avoided
65        match info.to_owned() {
66            Info::Database(_)
67            | Info::Schema(_)
68            | Info::ObjectGroup(_)
69            | Info::Function(_)
70            | Info::Connection(_) => {
71                self.handle_catalog_notification(resp);
72            }
73            Info::Secret(_) => {
74                self.handle_catalog_notification(resp.clone());
75                self.handle_secret_notification(resp);
76            }
77            Info::Node(node) => {
78                self.update_worker_node_manager(resp.operation(), node);
79            }
80            Info::User(_) => {
81                self.handle_user_notification(resp);
82            }
83            Info::Snapshot(_) => {
84                panic!(
85                    "receiving a snapshot in the middle is unsupported now {:?}",
86                    resp
87                )
88            }
89            Info::HummockVersionDeltas(deltas) => {
90                self.handle_hummock_snapshot_notification(deltas);
91            }
92            Info::MetaBackupManifestId(_) => {
93                panic!("frontend node should not receive MetaBackupManifestId");
94            }
95            Info::HummockWriteLimits(_) => {
96                panic!("frontend node should not receive HummockWriteLimits");
97            }
98            Info::SystemParams(p) => {
99                self.system_params_manager.try_set_params(p);
100            }
101            Info::SessionParam(p) => {
102                self.session_params
103                    .write()
104                    .set(&p.param, p.value().to_owned(), &mut ())
105                    .unwrap();
106            }
107            Info::HummockStats(stats) => {
108                self.handle_table_stats_notification(stats);
109            }
110            Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp),
111            Info::ServingWorkerSlotMappings(m) => {
112                self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
113            }
114            Info::Recovery(_) => {
115                self.compute_client_pool.invalidate_all();
116            }
117            Info::ComputeNodeTotalCpuCount(count) => {
118                LicenseManager::get().update_cpu_core_count(count as _);
119            }
120        }
121    }
122
123    fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
124        let mut catalog_guard = self.catalog.write();
125        let mut user_guard = self.user_info_manager.write();
126        catalog_guard.clear();
127        user_guard.clear();
128
129        let Some(Info::Snapshot(snapshot)) = resp.info else {
130            unreachable!();
131        };
132        let MetaSnapshot {
133            databases,
134            schemas,
135            sources,
136            sinks,
137            tables,
138            indexes,
139            views,
140            subscriptions,
141            functions,
142            connections,
143            users,
144            nodes,
145            hummock_version,
146            meta_backup_manifest_id: _,
147            hummock_write_limits: _,
148            streaming_worker_slot_mappings,
149            serving_worker_slot_mappings,
150            session_params,
151            version,
152            secrets,
153            compute_node_total_cpu_count,
154        } = snapshot;
155
156        for db in databases {
157            catalog_guard.create_database(&db)
158        }
159        for schema in schemas {
160            catalog_guard.create_schema(&schema)
161        }
162        for source in sources {
163            catalog_guard.create_source(&source)
164        }
165        for sink in sinks {
166            catalog_guard.create_sink(&sink)
167        }
168        for subscription in subscriptions {
169            catalog_guard.create_subscription(&subscription)
170        }
171        for table in tables {
172            catalog_guard.create_table(&table)
173        }
174        for index in indexes {
175            catalog_guard.create_index(&index)
176        }
177        for view in views {
178            catalog_guard.create_view(&view)
179        }
180        for function in functions {
181            catalog_guard.create_function(&function)
182        }
183        for connection in connections {
184            catalog_guard.create_connection(&connection)
185        }
186        for secret in &secrets {
187            catalog_guard.create_secret(secret)
188        }
189        for user in users {
190            user_guard.create_user(user)
191        }
192
193        self.worker_node_manager.refresh(
194            nodes,
195            convert_worker_slot_mapping(&streaming_worker_slot_mappings),
196            convert_worker_slot_mapping(&serving_worker_slot_mappings),
197        );
198        self.hummock_snapshot_manager
199            .init(FrontendHummockVersion::from_protobuf(
200                hummock_version.unwrap(),
201            ));
202
203        let snapshot_version = version.unwrap();
204        self.version = snapshot_version.catalog_version;
205        self.catalog_updated_tx
206            .send(snapshot_version.catalog_version)
207            .unwrap();
208        *self.session_params.write() =
209            serde_json::from_str(&session_params.unwrap().params).unwrap();
210        LocalSecretManager::global().init_secrets(secrets);
211        LicenseManager::get().update_cpu_core_count(compute_node_total_cpu_count as _);
212    }
213}
214
215impl FrontendObserverNode {
216    pub fn new(
217        worker_node_manager: WorkerNodeManagerRef,
218        catalog: Arc<RwLock<Catalog>>,
219        catalog_updated_tx: Sender<CatalogVersion>,
220        user_info_manager: Arc<RwLock<UserInfoManager>>,
221        hummock_snapshot_manager: HummockSnapshotManagerRef,
222        system_params_manager: LocalSystemParamsManagerRef,
223        session_params: Arc<RwLock<SessionConfig>>,
224        compute_client_pool: ComputeClientPoolRef,
225    ) -> Self {
226        Self {
227            version: 0,
228            worker_node_manager,
229            catalog,
230            catalog_updated_tx,
231            user_info_manager,
232            hummock_snapshot_manager,
233            system_params_manager,
234            session_params,
235            compute_client_pool,
236        }
237    }
238
239    fn handle_table_stats_notification(&mut self, table_stats: HummockVersionStats) {
240        let mut catalog_guard = self.catalog.write();
241        catalog_guard.set_table_stats(table_stats);
242    }
243
244    fn handle_catalog_notification(&mut self, resp: SubscribeResponse) {
245        let Some(info) = resp.info.as_ref() else {
246            return;
247        };
248        tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");
249
250        let mut catalog_guard = self.catalog.write();
251        match info {
252            Info::Database(database) => match resp.operation() {
253                Operation::Add => catalog_guard.create_database(database),
254                Operation::Delete => catalog_guard.drop_database(database.id),
255                Operation::Update => catalog_guard.update_database(database),
256                _ => panic!("receive an unsupported notify {:?}", resp),
257            },
258            Info::Schema(schema) => match resp.operation() {
259                Operation::Add => catalog_guard.create_schema(schema),
260                Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
261                Operation::Update => catalog_guard.update_schema(schema),
262                _ => panic!("receive an unsupported notify {:?}", resp),
263            },
264            Info::ObjectGroup(object_group) => {
265                for object in &object_group.objects {
266                    let Some(obj) = object.object_info.as_ref() else {
267                        continue;
268                    };
269                    match obj {
270                        ObjectInfo::Database(db) => match resp.operation() {
271                            Operation::Add => catalog_guard.create_database(db),
272                            Operation::Delete => catalog_guard.drop_database(db.id),
273                            Operation::Update => catalog_guard.update_database(db),
274                            _ => panic!("receive an unsupported notify {:?}", resp),
275                        },
276                        ObjectInfo::Schema(schema) => match resp.operation() {
277                            Operation::Add => catalog_guard.create_schema(schema),
278                            Operation::Delete => {
279                                catalog_guard.drop_schema(schema.database_id, schema.id)
280                            }
281                            Operation::Update => catalog_guard.update_schema(schema),
282                            _ => panic!("receive an unsupported notify {:?}", resp),
283                        },
284                        PbObjectInfo::Table(table) => match resp.operation() {
285                            Operation::Add => catalog_guard.create_table(table),
286                            Operation::Delete => catalog_guard.drop_table(
287                                table.database_id,
288                                table.schema_id,
289                                table.id.into(),
290                            ),
291                            Operation::Update => {
292                                let old_fragment_id = catalog_guard
293                                    .get_any_table_by_id(&table.id.into())
294                                    .unwrap()
295                                    .fragment_id;
296                                catalog_guard.update_table(table);
297                                if old_fragment_id != table.fragment_id {
298                                    // FIXME: the frontend node delete its fragment for the update
299                                    // operation by itself.
300                                    self.worker_node_manager
301                                        .remove_streaming_fragment_mapping(&old_fragment_id);
302                                }
303                            }
304                            _ => panic!("receive an unsupported notify {:?}", resp),
305                        },
306                        PbObjectInfo::Source(source) => match resp.operation() {
307                            Operation::Add => catalog_guard.create_source(source),
308                            Operation::Delete => catalog_guard.drop_source(
309                                source.database_id,
310                                source.schema_id,
311                                source.id,
312                            ),
313                            Operation::Update => catalog_guard.update_source(source),
314                            _ => panic!("receive an unsupported notify {:?}", resp),
315                        },
316                        PbObjectInfo::Sink(sink) => match resp.operation() {
317                            Operation::Add => catalog_guard.create_sink(sink),
318                            Operation::Delete => {
319                                catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
320                            }
321                            Operation::Update => catalog_guard.update_sink(sink),
322                            _ => panic!("receive an unsupported notify {:?}", resp),
323                        },
324                        PbObjectInfo::Subscription(subscription) => match resp.operation() {
325                            Operation::Add => catalog_guard.create_subscription(subscription),
326                            Operation::Delete => catalog_guard.drop_subscription(
327                                subscription.database_id,
328                                subscription.schema_id,
329                                subscription.id,
330                            ),
331                            Operation::Update => catalog_guard.update_subscription(subscription),
332                            _ => panic!("receive an unsupported notify {:?}", resp),
333                        },
334                        PbObjectInfo::Index(index) => match resp.operation() {
335                            Operation::Add => catalog_guard.create_index(index),
336                            Operation::Delete => catalog_guard.drop_index(
337                                index.database_id,
338                                index.schema_id,
339                                index.id.into(),
340                            ),
341                            Operation::Update => catalog_guard.update_index(index),
342                            _ => panic!("receive an unsupported notify {:?}", resp),
343                        },
344                        PbObjectInfo::View(view) => match resp.operation() {
345                            Operation::Add => catalog_guard.create_view(view),
346                            Operation::Delete => {
347                                catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
348                            }
349                            Operation::Update => catalog_guard.update_view(view),
350                            _ => panic!("receive an unsupported notify {:?}", resp),
351                        },
352                        ObjectInfo::Function(function) => match resp.operation() {
353                            Operation::Add => catalog_guard.create_function(function),
354                            Operation::Delete => catalog_guard.drop_function(
355                                function.database_id,
356                                function.schema_id,
357                                function.id.into(),
358                            ),
359                            Operation::Update => catalog_guard.update_function(function),
360                            _ => panic!("receive an unsupported notify {:?}", resp),
361                        },
362                        ObjectInfo::Connection(connection) => match resp.operation() {
363                            Operation::Add => catalog_guard.create_connection(connection),
364                            Operation::Delete => catalog_guard.drop_connection(
365                                connection.database_id,
366                                connection.schema_id,
367                                connection.id,
368                            ),
369                            Operation::Update => catalog_guard.update_connection(connection),
370                            _ => panic!("receive an unsupported notify {:?}", resp),
371                        },
372                        ObjectInfo::Secret(secret) => {
373                            let mut secret = secret.clone();
374                            // The secret value should not be revealed to users. So mask it in the frontend catalog.
375                            secret.value =
376                                "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
377                            match resp.operation() {
378                                Operation::Add => catalog_guard.create_secret(&secret),
379                                Operation::Delete => catalog_guard.drop_secret(
380                                    secret.database_id,
381                                    secret.schema_id,
382                                    SecretId::new(secret.id),
383                                ),
384                                Operation::Update => catalog_guard.update_secret(&secret),
385                                _ => panic!("receive an unsupported notify {:?}", resp),
386                            }
387                        }
388                    }
389                }
390            }
391            Info::Function(function) => match resp.operation() {
392                Operation::Add => catalog_guard.create_function(function),
393                Operation::Delete => catalog_guard.drop_function(
394                    function.database_id,
395                    function.schema_id,
396                    function.id.into(),
397                ),
398                Operation::Update => catalog_guard.update_function(function),
399                _ => panic!("receive an unsupported notify {:?}", resp),
400            },
401            Info::Connection(connection) => match resp.operation() {
402                Operation::Add => catalog_guard.create_connection(connection),
403                Operation::Delete => catalog_guard.drop_connection(
404                    connection.database_id,
405                    connection.schema_id,
406                    connection.id,
407                ),
408                Operation::Update => catalog_guard.update_connection(connection),
409                _ => panic!("receive an unsupported notify {:?}", resp),
410            },
411            Info::Secret(secret) => {
412                let mut secret = secret.clone();
413                // The secret value should not be revealed to users. So mask it in the frontend catalog.
414                secret.value = "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
415                match resp.operation() {
416                    Operation::Add => catalog_guard.create_secret(&secret),
417                    Operation::Delete => catalog_guard.drop_secret(
418                        secret.database_id,
419                        secret.schema_id,
420                        SecretId::new(secret.id),
421                    ),
422                    Operation::Update => catalog_guard.update_secret(&secret),
423                    _ => panic!("receive an unsupported notify {:?}", resp),
424                }
425            }
426            _ => unreachable!(),
427        }
428        assert!(
429            resp.version > self.version,
430            "resp version={:?}, current version={:?}",
431            resp.version,
432            self.version
433        );
434        self.version = resp.version;
435        self.catalog_updated_tx.send(resp.version).unwrap();
436    }
437
438    fn handle_user_notification(&mut self, resp: SubscribeResponse) {
439        let Some(info) = resp.info.as_ref() else {
440            return;
441        };
442
443        let mut user_guard = self.user_info_manager.write();
444        match info {
445            Info::User(user) => match resp.operation() {
446                Operation::Add => user_guard.create_user(user.clone()),
447                Operation::Delete => user_guard.drop_user(user.id),
448                Operation::Update => user_guard.update_user(user.clone()),
449                _ => panic!("receive an unsupported notify {:?}", resp),
450            },
451            _ => unreachable!(),
452        }
453        assert!(
454            resp.version > self.version,
455            "resp version={:?}, current version={:?}",
456            resp.version,
457            self.version
458        );
459        self.version = resp.version;
460        self.catalog_updated_tx.send(resp.version).unwrap();
461    }
462
463    fn handle_fragment_mapping_notification(&mut self, resp: SubscribeResponse) {
464        let Some(info) = resp.info.as_ref() else {
465            return;
466        };
467        match info {
468            Info::StreamingWorkerSlotMapping(streaming_worker_slot_mapping) => {
469                let fragment_id = streaming_worker_slot_mapping.fragment_id;
470                let mapping = || {
471                    WorkerSlotMapping::from_protobuf(
472                        streaming_worker_slot_mapping.mapping.as_ref().unwrap(),
473                    )
474                };
475
476                match resp.operation() {
477                    Operation::Add => {
478                        self.worker_node_manager
479                            .insert_streaming_fragment_mapping(fragment_id, mapping());
480                    }
481                    Operation::Delete => {
482                        self.worker_node_manager
483                            .remove_streaming_fragment_mapping(&fragment_id);
484                    }
485                    Operation::Update => {
486                        self.worker_node_manager
487                            .update_streaming_fragment_mapping(fragment_id, mapping());
488                    }
489                    _ => panic!("receive an unsupported notify {:?}", resp),
490                }
491            }
492            _ => unreachable!(),
493        }
494    }
495
496    fn handle_fragment_serving_mapping_notification(
497        &mut self,
498        mappings: Vec<FragmentWorkerSlotMapping>,
499        op: Operation,
500    ) {
501        match op {
502            Operation::Add | Operation::Update => {
503                self.worker_node_manager
504                    .upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
505            }
506            Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
507                &mappings.into_iter().map(|m| m.fragment_id).collect_vec(),
508            ),
509            Operation::Snapshot => {
510                self.worker_node_manager
511                    .set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
512            }
513            _ => panic!("receive an unsupported notify {:?}", op),
514        }
515    }
516
517    /// Update max committed epoch in `HummockSnapshotManager`.
518    fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) {
519        self.hummock_snapshot_manager.update(deltas);
520    }
521
522    fn handle_secret_notification(&mut self, resp: SubscribeResponse) {
523        let resp_op = resp.operation();
524        let Some(Info::Secret(secret)) = resp.info else {
525            unreachable!();
526        };
527        match resp_op {
528            Operation::Add => {
529                LocalSecretManager::global().add_secret(secret.id, secret.value);
530            }
531            Operation::Delete => {
532                LocalSecretManager::global().remove_secret(secret.id);
533            }
534            Operation::Update => {
535                LocalSecretManager::global().update_secret(secret.id, secret.value);
536            }
537            _ => {
538                panic!("error type notification");
539            }
540        }
541    }
542
543    /// `update_worker_node_manager` is called in `start` method.
544    /// It calls `add_worker_node` and `remove_worker_node` of `WorkerNodeManager`.
545    fn update_worker_node_manager(&self, operation: Operation, node: WorkerNode) {
546        tracing::debug!(
547            "Update worker nodes, operation: {:?}, node: {:?}",
548            operation,
549            node
550        );
551
552        match operation {
553            Operation::Add => self.worker_node_manager.add_worker_node(node),
554            Operation::Delete => self.worker_node_manager.remove_worker_node(node),
555            _ => (),
556        }
557    }
558}
559
560fn convert_worker_slot_mapping(
561    worker_slot_mappings: &[FragmentWorkerSlotMapping],
562) -> HashMap<FragmentId, WorkerSlotMapping> {
563    worker_slot_mappings
564        .iter()
565        .map(
566            |FragmentWorkerSlotMapping {
567                 fragment_id,
568                 mapping,
569             }| {
570                let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap());
571                (*fragment_id, mapping)
572            },
573        )
574        .collect()
575}