1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::RwLock;
20use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
21use risingwave_common::catalog::CatalogVersion;
22use risingwave_common::hash::WorkerSlotMapping;
23use risingwave_common::license::LicenseManager;
24use risingwave_common::secret::LocalSecretManager;
25use risingwave_common::session_config::SessionConfig;
26use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
27use risingwave_common_service::ObserverState;
28use risingwave_hummock_sdk::FrontendHummockVersion;
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
31use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo};
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
34use risingwave_rpc_client::ComputeClientPoolRef;
35use tokio::sync::watch::Sender;
36
37use crate::catalog::root_catalog::Catalog;
38use crate::catalog::{FragmentId, SecretId};
39use crate::scheduler::HummockSnapshotManagerRef;
40use crate::user::user_manager::UserInfoManager;
41
/// Observer state that applies meta notifications to the frontend's local state: catalog,
/// users, worker nodes and fragment mappings, hummock snapshot, system/session parameters,
/// secrets and the compute client pool.
42pub struct FrontendObserverNode {
    /// Worker-node registry. Refreshed wholesale from the initial snapshot, then kept current
    /// by node add/delete notifications and streaming/serving fragment-mapping notifications.
43 worker_node_manager: WorkerNodeManagerRef,
    /// Catalog version most recently applied by this observer. Set from the snapshot and
    /// asserted to strictly increase on every catalog or user notification.
44 version: CatalogVersion,
    /// Watch-channel sender on which the new catalog version is sent after the snapshot and
    /// after each catalog or user notification has been applied.
45 catalog_updated_tx: Sender<CatalogVersion>,
    /// Shared catalog; mutated under its write lock.
46 catalog: Arc<RwLock<Catalog>>,
    /// Shared user information; mutated under its write lock.
47 user_info_manager: Arc<RwLock<UserInfoManager>>,
    /// Initialized from the snapshot's hummock version and updated with version deltas.
48 hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Receives system-parameter notifications via `try_set_params`.
49 system_params_manager: LocalSystemParamsManagerRef,
    /// Shared session configuration: replaced from the snapshot's JSON-encoded parameters and
    /// updated one parameter at a time by session-parameter notifications.
50 session_params: Arc<RwLock<SessionConfig>>,
    /// Compute client pool; fully invalidated when a recovery notification arrives.
51 compute_client_pool: ComputeClientPoolRef,
52}
53
54impl ObserverState for FrontendObserverNode {
55 fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
56 risingwave_pb::meta::SubscribeType::Frontend
57 }
58
59 fn handle_notification(&mut self, resp: SubscribeResponse) {
60 let Some(info) = resp.info.as_ref() else {
61 return;
62 };
63
64 match info.to_owned() {
66 Info::Database(_)
67 | Info::Schema(_)
68 | Info::ObjectGroup(_)
69 | Info::Function(_)
70 | Info::Connection(_) => {
71 self.handle_catalog_notification(resp);
72 }
73 Info::Secret(_) => {
74 self.handle_catalog_notification(resp.clone());
75 self.handle_secret_notification(resp);
76 }
77 Info::Node(node) => {
78 self.update_worker_node_manager(resp.operation(), node);
79 }
80 Info::User(_) => {
81 self.handle_user_notification(resp);
82 }
83 Info::Snapshot(_) => {
84 panic!(
85 "receiving a snapshot in the middle is unsupported now {:?}",
86 resp
87 )
88 }
89 Info::HummockVersionDeltas(deltas) => {
90 self.handle_hummock_snapshot_notification(deltas);
91 }
92 Info::MetaBackupManifestId(_) => {
93 panic!("frontend node should not receive MetaBackupManifestId");
94 }
95 Info::HummockWriteLimits(_) => {
96 panic!("frontend node should not receive HummockWriteLimits");
97 }
98 Info::SystemParams(p) => {
99 self.system_params_manager.try_set_params(p);
100 }
101 Info::SessionParam(p) => {
102 self.session_params
103 .write()
104 .set(&p.param, p.value().to_owned(), &mut ())
105 .unwrap();
106 }
107 Info::HummockStats(stats) => {
108 self.handle_table_stats_notification(stats);
109 }
110 Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp),
111 Info::ServingWorkerSlotMappings(m) => {
112 self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
113 }
114 Info::Recovery(_) => {
115 self.compute_client_pool.invalidate_all();
116 }
117 Info::ComputeNodeTotalCpuCount(count) => {
118 LicenseManager::get().update_cpu_core_count(count as _);
119 }
120 }
121 }
122
123 fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
124 let mut catalog_guard = self.catalog.write();
125 let mut user_guard = self.user_info_manager.write();
126 catalog_guard.clear();
127 user_guard.clear();
128
129 let Some(Info::Snapshot(snapshot)) = resp.info else {
130 unreachable!();
131 };
132 let MetaSnapshot {
133 databases,
134 schemas,
135 sources,
136 sinks,
137 tables,
138 indexes,
139 views,
140 subscriptions,
141 functions,
142 connections,
143 users,
144 nodes,
145 hummock_version,
146 meta_backup_manifest_id: _,
147 hummock_write_limits: _,
148 streaming_worker_slot_mappings,
149 serving_worker_slot_mappings,
150 session_params,
151 version,
152 secrets,
153 compute_node_total_cpu_count,
154 } = snapshot;
155
156 for db in databases {
157 catalog_guard.create_database(&db)
158 }
159 for schema in schemas {
160 catalog_guard.create_schema(&schema)
161 }
162 for source in sources {
163 catalog_guard.create_source(&source)
164 }
165 for sink in sinks {
166 catalog_guard.create_sink(&sink)
167 }
168 for subscription in subscriptions {
169 catalog_guard.create_subscription(&subscription)
170 }
171 for table in tables {
172 catalog_guard.create_table(&table)
173 }
174 for index in indexes {
175 catalog_guard.create_index(&index)
176 }
177 for view in views {
178 catalog_guard.create_view(&view)
179 }
180 for function in functions {
181 catalog_guard.create_function(&function)
182 }
183 for connection in connections {
184 catalog_guard.create_connection(&connection)
185 }
186 for secret in &secrets {
187 catalog_guard.create_secret(secret)
188 }
189 for user in users {
190 user_guard.create_user(user)
191 }
192
193 self.worker_node_manager.refresh(
194 nodes,
195 convert_worker_slot_mapping(&streaming_worker_slot_mappings),
196 convert_worker_slot_mapping(&serving_worker_slot_mappings),
197 );
198 self.hummock_snapshot_manager
199 .init(FrontendHummockVersion::from_protobuf(
200 hummock_version.unwrap(),
201 ));
202
203 let snapshot_version = version.unwrap();
204 self.version = snapshot_version.catalog_version;
205 self.catalog_updated_tx
206 .send(snapshot_version.catalog_version)
207 .unwrap();
208 *self.session_params.write() =
209 serde_json::from_str(&session_params.unwrap().params).unwrap();
210 LocalSecretManager::global().init_secrets(secrets);
211 LicenseManager::get().update_cpu_core_count(compute_node_total_cpu_count as _);
212 }
213}
214
215impl FrontendObserverNode {
216 pub fn new(
217 worker_node_manager: WorkerNodeManagerRef,
218 catalog: Arc<RwLock<Catalog>>,
219 catalog_updated_tx: Sender<CatalogVersion>,
220 user_info_manager: Arc<RwLock<UserInfoManager>>,
221 hummock_snapshot_manager: HummockSnapshotManagerRef,
222 system_params_manager: LocalSystemParamsManagerRef,
223 session_params: Arc<RwLock<SessionConfig>>,
224 compute_client_pool: ComputeClientPoolRef,
225 ) -> Self {
226 Self {
227 version: 0,
228 worker_node_manager,
229 catalog,
230 catalog_updated_tx,
231 user_info_manager,
232 hummock_snapshot_manager,
233 system_params_manager,
234 session_params,
235 compute_client_pool,
236 }
237 }
238
239 fn handle_table_stats_notification(&mut self, table_stats: HummockVersionStats) {
240 let mut catalog_guard = self.catalog.write();
241 catalog_guard.set_table_stats(table_stats);
242 }
243
244 fn handle_catalog_notification(&mut self, resp: SubscribeResponse) {
245 let Some(info) = resp.info.as_ref() else {
246 return;
247 };
248 tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");
249
250 let mut catalog_guard = self.catalog.write();
251 match info {
252 Info::Database(database) => match resp.operation() {
253 Operation::Add => catalog_guard.create_database(database),
254 Operation::Delete => catalog_guard.drop_database(database.id),
255 Operation::Update => catalog_guard.update_database(database),
256 _ => panic!("receive an unsupported notify {:?}", resp),
257 },
258 Info::Schema(schema) => match resp.operation() {
259 Operation::Add => catalog_guard.create_schema(schema),
260 Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
261 Operation::Update => catalog_guard.update_schema(schema),
262 _ => panic!("receive an unsupported notify {:?}", resp),
263 },
264 Info::ObjectGroup(object_group) => {
265 for object in &object_group.objects {
266 let Some(obj) = object.object_info.as_ref() else {
267 continue;
268 };
269 match obj {
270 ObjectInfo::Database(db) => match resp.operation() {
271 Operation::Add => catalog_guard.create_database(db),
272 Operation::Delete => catalog_guard.drop_database(db.id),
273 Operation::Update => catalog_guard.update_database(db),
274 _ => panic!("receive an unsupported notify {:?}", resp),
275 },
276 ObjectInfo::Schema(schema) => match resp.operation() {
277 Operation::Add => catalog_guard.create_schema(schema),
278 Operation::Delete => {
279 catalog_guard.drop_schema(schema.database_id, schema.id)
280 }
281 Operation::Update => catalog_guard.update_schema(schema),
282 _ => panic!("receive an unsupported notify {:?}", resp),
283 },
284 PbObjectInfo::Table(table) => match resp.operation() {
285 Operation::Add => catalog_guard.create_table(table),
286 Operation::Delete => catalog_guard.drop_table(
287 table.database_id,
288 table.schema_id,
289 table.id.into(),
290 ),
291 Operation::Update => {
292 let old_fragment_id = catalog_guard
293 .get_any_table_by_id(&table.id.into())
294 .unwrap()
295 .fragment_id;
296 catalog_guard.update_table(table);
297 if old_fragment_id != table.fragment_id {
298 self.worker_node_manager
301 .remove_streaming_fragment_mapping(&old_fragment_id);
302 }
303 }
304 _ => panic!("receive an unsupported notify {:?}", resp),
305 },
306 PbObjectInfo::Source(source) => match resp.operation() {
307 Operation::Add => catalog_guard.create_source(source),
308 Operation::Delete => catalog_guard.drop_source(
309 source.database_id,
310 source.schema_id,
311 source.id,
312 ),
313 Operation::Update => catalog_guard.update_source(source),
314 _ => panic!("receive an unsupported notify {:?}", resp),
315 },
316 PbObjectInfo::Sink(sink) => match resp.operation() {
317 Operation::Add => catalog_guard.create_sink(sink),
318 Operation::Delete => {
319 catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
320 }
321 Operation::Update => catalog_guard.update_sink(sink),
322 _ => panic!("receive an unsupported notify {:?}", resp),
323 },
324 PbObjectInfo::Subscription(subscription) => match resp.operation() {
325 Operation::Add => catalog_guard.create_subscription(subscription),
326 Operation::Delete => catalog_guard.drop_subscription(
327 subscription.database_id,
328 subscription.schema_id,
329 subscription.id,
330 ),
331 Operation::Update => catalog_guard.update_subscription(subscription),
332 _ => panic!("receive an unsupported notify {:?}", resp),
333 },
334 PbObjectInfo::Index(index) => match resp.operation() {
335 Operation::Add => catalog_guard.create_index(index),
336 Operation::Delete => catalog_guard.drop_index(
337 index.database_id,
338 index.schema_id,
339 index.id.into(),
340 ),
341 Operation::Update => catalog_guard.update_index(index),
342 _ => panic!("receive an unsupported notify {:?}", resp),
343 },
344 PbObjectInfo::View(view) => match resp.operation() {
345 Operation::Add => catalog_guard.create_view(view),
346 Operation::Delete => {
347 catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
348 }
349 Operation::Update => catalog_guard.update_view(view),
350 _ => panic!("receive an unsupported notify {:?}", resp),
351 },
352 ObjectInfo::Function(function) => match resp.operation() {
353 Operation::Add => catalog_guard.create_function(function),
354 Operation::Delete => catalog_guard.drop_function(
355 function.database_id,
356 function.schema_id,
357 function.id.into(),
358 ),
359 Operation::Update => catalog_guard.update_function(function),
360 _ => panic!("receive an unsupported notify {:?}", resp),
361 },
362 ObjectInfo::Connection(connection) => match resp.operation() {
363 Operation::Add => catalog_guard.create_connection(connection),
364 Operation::Delete => catalog_guard.drop_connection(
365 connection.database_id,
366 connection.schema_id,
367 connection.id,
368 ),
369 Operation::Update => catalog_guard.update_connection(connection),
370 _ => panic!("receive an unsupported notify {:?}", resp),
371 },
372 ObjectInfo::Secret(secret) => {
373 let mut secret = secret.clone();
374 secret.value =
376 "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
377 match resp.operation() {
378 Operation::Add => catalog_guard.create_secret(&secret),
379 Operation::Delete => catalog_guard.drop_secret(
380 secret.database_id,
381 secret.schema_id,
382 SecretId::new(secret.id),
383 ),
384 Operation::Update => catalog_guard.update_secret(&secret),
385 _ => panic!("receive an unsupported notify {:?}", resp),
386 }
387 }
388 }
389 }
390 }
391 Info::Function(function) => match resp.operation() {
392 Operation::Add => catalog_guard.create_function(function),
393 Operation::Delete => catalog_guard.drop_function(
394 function.database_id,
395 function.schema_id,
396 function.id.into(),
397 ),
398 Operation::Update => catalog_guard.update_function(function),
399 _ => panic!("receive an unsupported notify {:?}", resp),
400 },
401 Info::Connection(connection) => match resp.operation() {
402 Operation::Add => catalog_guard.create_connection(connection),
403 Operation::Delete => catalog_guard.drop_connection(
404 connection.database_id,
405 connection.schema_id,
406 connection.id,
407 ),
408 Operation::Update => catalog_guard.update_connection(connection),
409 _ => panic!("receive an unsupported notify {:?}", resp),
410 },
411 Info::Secret(secret) => {
412 let mut secret = secret.clone();
413 secret.value = "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
415 match resp.operation() {
416 Operation::Add => catalog_guard.create_secret(&secret),
417 Operation::Delete => catalog_guard.drop_secret(
418 secret.database_id,
419 secret.schema_id,
420 SecretId::new(secret.id),
421 ),
422 Operation::Update => catalog_guard.update_secret(&secret),
423 _ => panic!("receive an unsupported notify {:?}", resp),
424 }
425 }
426 _ => unreachable!(),
427 }
428 assert!(
429 resp.version > self.version,
430 "resp version={:?}, current version={:?}",
431 resp.version,
432 self.version
433 );
434 self.version = resp.version;
435 self.catalog_updated_tx.send(resp.version).unwrap();
436 }
437
438 fn handle_user_notification(&mut self, resp: SubscribeResponse) {
439 let Some(info) = resp.info.as_ref() else {
440 return;
441 };
442
443 let mut user_guard = self.user_info_manager.write();
444 match info {
445 Info::User(user) => match resp.operation() {
446 Operation::Add => user_guard.create_user(user.clone()),
447 Operation::Delete => user_guard.drop_user(user.id),
448 Operation::Update => user_guard.update_user(user.clone()),
449 _ => panic!("receive an unsupported notify {:?}", resp),
450 },
451 _ => unreachable!(),
452 }
453 assert!(
454 resp.version > self.version,
455 "resp version={:?}, current version={:?}",
456 resp.version,
457 self.version
458 );
459 self.version = resp.version;
460 self.catalog_updated_tx.send(resp.version).unwrap();
461 }
462
463 fn handle_fragment_mapping_notification(&mut self, resp: SubscribeResponse) {
464 let Some(info) = resp.info.as_ref() else {
465 return;
466 };
467 match info {
468 Info::StreamingWorkerSlotMapping(streaming_worker_slot_mapping) => {
469 let fragment_id = streaming_worker_slot_mapping.fragment_id;
470 let mapping = || {
471 WorkerSlotMapping::from_protobuf(
472 streaming_worker_slot_mapping.mapping.as_ref().unwrap(),
473 )
474 };
475
476 match resp.operation() {
477 Operation::Add => {
478 self.worker_node_manager
479 .insert_streaming_fragment_mapping(fragment_id, mapping());
480 }
481 Operation::Delete => {
482 self.worker_node_manager
483 .remove_streaming_fragment_mapping(&fragment_id);
484 }
485 Operation::Update => {
486 self.worker_node_manager
487 .update_streaming_fragment_mapping(fragment_id, mapping());
488 }
489 _ => panic!("receive an unsupported notify {:?}", resp),
490 }
491 }
492 _ => unreachable!(),
493 }
494 }
495
496 fn handle_fragment_serving_mapping_notification(
497 &mut self,
498 mappings: Vec<FragmentWorkerSlotMapping>,
499 op: Operation,
500 ) {
501 match op {
502 Operation::Add | Operation::Update => {
503 self.worker_node_manager
504 .upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
505 }
506 Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
507 &mappings.into_iter().map(|m| m.fragment_id).collect_vec(),
508 ),
509 Operation::Snapshot => {
510 self.worker_node_manager
511 .set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
512 }
513 _ => panic!("receive an unsupported notify {:?}", op),
514 }
515 }
516
517 fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) {
519 self.hummock_snapshot_manager.update(deltas);
520 }
521
522 fn handle_secret_notification(&mut self, resp: SubscribeResponse) {
523 let resp_op = resp.operation();
524 let Some(Info::Secret(secret)) = resp.info else {
525 unreachable!();
526 };
527 match resp_op {
528 Operation::Add => {
529 LocalSecretManager::global().add_secret(secret.id, secret.value);
530 }
531 Operation::Delete => {
532 LocalSecretManager::global().remove_secret(secret.id);
533 }
534 Operation::Update => {
535 LocalSecretManager::global().update_secret(secret.id, secret.value);
536 }
537 _ => {
538 panic!("error type notification");
539 }
540 }
541 }
542
543 fn update_worker_node_manager(&self, operation: Operation, node: WorkerNode) {
546 tracing::debug!(
547 "Update worker nodes, operation: {:?}, node: {:?}",
548 operation,
549 node
550 );
551
552 match operation {
553 Operation::Add => self.worker_node_manager.add_worker_node(node),
554 Operation::Delete => self.worker_node_manager.remove_worker_node(node),
555 _ => (),
556 }
557 }
558}
559
560fn convert_worker_slot_mapping(
561 worker_slot_mappings: &[FragmentWorkerSlotMapping],
562) -> HashMap<FragmentId, WorkerSlotMapping> {
563 worker_slot_mappings
564 .iter()
565 .map(
566 |FragmentWorkerSlotMapping {
567 fragment_id,
568 mapping,
569 }| {
570 let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap());
571 (*fragment_id, mapping)
572 },
573 )
574 .collect()
575}