1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::RwLock;
20use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
21use risingwave_common::catalog::CatalogVersion;
22use risingwave_common::hash::WorkerSlotMapping;
23use risingwave_common::license::LicenseManager;
24use risingwave_common::secret::LocalSecretManager;
25use risingwave_common::session_config::SessionConfig;
26use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
27use risingwave_common_service::ObserverState;
28use risingwave_hummock_sdk::FrontendHummockVersion;
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
31use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo};
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
34use risingwave_rpc_client::ComputeClientPoolRef;
35use tokio::sync::watch::Sender;
36
37use crate::catalog::root_catalog::Catalog;
38use crate::catalog::{FragmentId, SecretId};
39use crate::scheduler::HummockSnapshotManagerRef;
40use crate::user::UserInfoVersion;
41use crate::user::user_manager::UserInfoManager;
42
/// Observer state that applies meta-service notifications to the frontend's local components.
///
/// It is driven through its `ObserverState` implementation: once with the initial snapshot and
/// afterwards with incremental notifications.
pub struct FrontendObserverNode {
    /// Receives worker node additions/removals as well as streaming and serving fragment
    /// mapping changes.
    worker_node_manager: WorkerNodeManagerRef,
    /// Shared catalog; rebuilt from the initial snapshot and mutated by catalog notifications.
    catalog: Arc<RwLock<Catalog>>,
    /// Watch-channel sender on which the catalog version is sent after each applied catalog
    /// change.
    catalog_updated_tx: Sender<CatalogVersion>,
    /// Shared user info; rebuilt from the initial snapshot and mutated by user notifications.
    user_info_manager: Arc<RwLock<UserInfoManager>>,
    /// Watch-channel sender on which the user info version is sent after each applied user
    /// change.
    user_info_updated_tx: Sender<UserInfoVersion>,
    /// Initialized from the snapshot's hummock version and then fed hummock version deltas.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Receives system parameter updates through `try_set_params`.
    system_params_manager: LocalSystemParamsManagerRef,
    /// Session configuration; replaced wholesale from the snapshot and updated one parameter
    /// at a time by `SessionParam` notifications.
    session_params: Arc<RwLock<SessionConfig>>,
    /// Compute client pool; `invalidate_all` is called on it when a `Recovery` notification
    /// arrives.
    compute_client_pool: ComputeClientPoolRef,
}
54
55impl ObserverState for FrontendObserverNode {
56 fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
57 risingwave_pb::meta::SubscribeType::Frontend
58 }
59
60 fn handle_notification(&mut self, resp: SubscribeResponse) {
61 let Some(info) = resp.info.as_ref() else {
62 return;
63 };
64
65 match info.to_owned() {
67 Info::Database(_)
68 | Info::Schema(_)
69 | Info::ObjectGroup(_)
70 | Info::Function(_)
71 | Info::Connection(_) => {
72 self.handle_catalog_notification(resp);
73 }
74 Info::Secret(_) => {
75 self.handle_catalog_notification(resp.clone());
76 self.handle_secret_notification(resp);
77 }
78 Info::Node(node) => {
79 self.update_worker_node_manager(resp.operation(), node);
80 }
81 Info::User(_) => {
82 self.handle_user_notification(resp);
83 }
84 Info::Snapshot(_) => {
85 panic!(
86 "receiving a snapshot in the middle is unsupported now {:?}",
87 resp
88 )
89 }
90 Info::HummockVersionDeltas(deltas) => {
91 self.handle_hummock_snapshot_notification(deltas);
92 }
93 Info::MetaBackupManifestId(_) => {
94 panic!("frontend node should not receive MetaBackupManifestId");
95 }
96 Info::HummockWriteLimits(_) => {
97 panic!("frontend node should not receive HummockWriteLimits");
98 }
99 Info::SystemParams(p) => {
100 self.system_params_manager.try_set_params(p);
101 }
102 Info::SessionParam(p) => {
103 self.session_params
104 .write()
105 .set(&p.param, p.value().to_owned(), &mut ())
106 .unwrap();
107 }
108 Info::HummockStats(stats) => {
109 self.handle_table_stats_notification(stats);
110 }
111 Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp),
112 Info::ServingWorkerSlotMappings(m) => {
113 self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
114 }
115 Info::Recovery(_) => {
116 self.compute_client_pool.invalidate_all();
117 }
118 Info::ComputeNodeTotalCpuCount(count) => {
119 LicenseManager::get().update_cpu_core_count(count as _);
120 }
121 }
122 }
123
124 fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
125 let mut catalog_guard = self.catalog.write();
126 let mut user_guard = self.user_info_manager.write();
127 catalog_guard.clear();
128 user_guard.clear();
129
130 let Some(Info::Snapshot(snapshot)) = resp.info else {
131 unreachable!();
132 };
133 let MetaSnapshot {
134 databases,
135 schemas,
136 sources,
137 sinks,
138 tables,
139 indexes,
140 views,
141 subscriptions,
142 functions,
143 connections,
144 users,
145 nodes,
146 hummock_version,
147 meta_backup_manifest_id: _,
148 hummock_write_limits: _,
149 streaming_worker_slot_mappings,
150 serving_worker_slot_mappings,
151 session_params,
152 version,
153 secrets,
154 compute_node_total_cpu_count,
155 } = snapshot;
156
157 for db in databases {
158 catalog_guard.create_database(&db)
159 }
160 for schema in schemas {
161 catalog_guard.create_schema(&schema)
162 }
163 for source in sources {
164 catalog_guard.create_source(&source)
165 }
166 for sink in sinks {
167 catalog_guard.create_sink(&sink)
168 }
169 for subscription in subscriptions {
170 catalog_guard.create_subscription(&subscription)
171 }
172 for table in tables {
173 catalog_guard.create_table(&table)
174 }
175 for index in indexes {
176 catalog_guard.create_index(&index)
177 }
178 for view in views {
179 catalog_guard.create_view(&view)
180 }
181 for function in functions {
182 catalog_guard.create_function(&function)
183 }
184 for connection in connections {
185 catalog_guard.create_connection(&connection)
186 }
187 for secret in &secrets {
188 catalog_guard.create_secret(secret)
189 }
190 for user in users {
191 user_guard.create_user(user)
192 }
193
194 self.worker_node_manager.refresh(
195 nodes,
196 convert_worker_slot_mapping(&streaming_worker_slot_mappings),
197 convert_worker_slot_mapping(&serving_worker_slot_mappings),
198 );
199 self.hummock_snapshot_manager
200 .init(FrontendHummockVersion::from_protobuf(
201 hummock_version.unwrap(),
202 ));
203
204 let snapshot_version = version.unwrap();
205 catalog_guard.set_version(snapshot_version.catalog_version);
206 self.catalog_updated_tx
207 .send(snapshot_version.catalog_version)
208 .unwrap();
209 user_guard.set_version(snapshot_version.catalog_version);
210 self.user_info_updated_tx
211 .send(snapshot_version.catalog_version)
212 .unwrap();
213 *self.session_params.write() =
214 serde_json::from_str(&session_params.unwrap().params).unwrap();
215 LocalSecretManager::global().init_secrets(secrets);
216 LicenseManager::get().update_cpu_core_count(compute_node_total_cpu_count as _);
217 }
218}
219
220impl FrontendObserverNode {
221 pub fn new(
222 worker_node_manager: WorkerNodeManagerRef,
223 catalog: Arc<RwLock<Catalog>>,
224 catalog_updated_tx: Sender<CatalogVersion>,
225 user_info_manager: Arc<RwLock<UserInfoManager>>,
226 user_info_updated_tx: Sender<UserInfoVersion>,
227 hummock_snapshot_manager: HummockSnapshotManagerRef,
228 system_params_manager: LocalSystemParamsManagerRef,
229 session_params: Arc<RwLock<SessionConfig>>,
230 compute_client_pool: ComputeClientPoolRef,
231 ) -> Self {
232 Self {
233 worker_node_manager,
234 catalog,
235 catalog_updated_tx,
236 user_info_manager,
237 user_info_updated_tx,
238 hummock_snapshot_manager,
239 system_params_manager,
240 session_params,
241 compute_client_pool,
242 }
243 }
244
245 fn handle_table_stats_notification(&mut self, table_stats: HummockVersionStats) {
246 let mut catalog_guard = self.catalog.write();
247 catalog_guard.set_table_stats(table_stats);
248 }
249
250 fn handle_catalog_notification(&mut self, resp: SubscribeResponse) {
251 let Some(info) = resp.info.as_ref() else {
252 return;
253 };
254 tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");
255
256 let mut catalog_guard = self.catalog.write();
257 match info {
258 Info::Database(database) => match resp.operation() {
259 Operation::Add => catalog_guard.create_database(database),
260 Operation::Delete => catalog_guard.drop_database(database.id),
261 Operation::Update => catalog_guard.update_database(database),
262 _ => panic!("receive an unsupported notify {:?}", resp),
263 },
264 Info::Schema(schema) => match resp.operation() {
265 Operation::Add => catalog_guard.create_schema(schema),
266 Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
267 Operation::Update => catalog_guard.update_schema(schema),
268 _ => panic!("receive an unsupported notify {:?}", resp),
269 },
270 Info::ObjectGroup(object_group) => {
271 for object in &object_group.objects {
272 let Some(obj) = object.object_info.as_ref() else {
273 continue;
274 };
275 match obj {
276 ObjectInfo::Database(db) => match resp.operation() {
277 Operation::Add => catalog_guard.create_database(db),
278 Operation::Delete => catalog_guard.drop_database(db.id),
279 Operation::Update => catalog_guard.update_database(db),
280 _ => panic!("receive an unsupported notify {:?}", resp),
281 },
282 ObjectInfo::Schema(schema) => match resp.operation() {
283 Operation::Add => catalog_guard.create_schema(schema),
284 Operation::Delete => {
285 catalog_guard.drop_schema(schema.database_id, schema.id)
286 }
287 Operation::Update => catalog_guard.update_schema(schema),
288 _ => panic!("receive an unsupported notify {:?}", resp),
289 },
290 PbObjectInfo::Table(table) => match resp.operation() {
291 Operation::Add => catalog_guard.create_table(table),
292 Operation::Delete => catalog_guard.drop_table(
293 table.database_id,
294 table.schema_id,
295 table.id.into(),
296 ),
297 Operation::Update => {
298 let old_fragment_id = catalog_guard
299 .get_any_table_by_id(&table.id.into())
300 .unwrap()
301 .fragment_id;
302 catalog_guard.update_table(table);
303 if old_fragment_id != table.fragment_id {
304 self.worker_node_manager
307 .remove_streaming_fragment_mapping(&old_fragment_id);
308 }
309 }
310 _ => panic!("receive an unsupported notify {:?}", resp),
311 },
312 PbObjectInfo::Source(source) => match resp.operation() {
313 Operation::Add => catalog_guard.create_source(source),
314 Operation::Delete => catalog_guard.drop_source(
315 source.database_id,
316 source.schema_id,
317 source.id,
318 ),
319 Operation::Update => catalog_guard.update_source(source),
320 _ => panic!("receive an unsupported notify {:?}", resp),
321 },
322 PbObjectInfo::Sink(sink) => match resp.operation() {
323 Operation::Add => catalog_guard.create_sink(sink),
324 Operation::Delete => {
325 catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
326 }
327 Operation::Update => catalog_guard.update_sink(sink),
328 _ => panic!("receive an unsupported notify {:?}", resp),
329 },
330 PbObjectInfo::Subscription(subscription) => match resp.operation() {
331 Operation::Add => catalog_guard.create_subscription(subscription),
332 Operation::Delete => catalog_guard.drop_subscription(
333 subscription.database_id,
334 subscription.schema_id,
335 subscription.id,
336 ),
337 Operation::Update => catalog_guard.update_subscription(subscription),
338 _ => panic!("receive an unsupported notify {:?}", resp),
339 },
340 PbObjectInfo::Index(index) => match resp.operation() {
341 Operation::Add => catalog_guard.create_index(index),
342 Operation::Delete => catalog_guard.drop_index(
343 index.database_id,
344 index.schema_id,
345 index.id.into(),
346 ),
347 Operation::Update => catalog_guard.update_index(index),
348 _ => panic!("receive an unsupported notify {:?}", resp),
349 },
350 PbObjectInfo::View(view) => match resp.operation() {
351 Operation::Add => catalog_guard.create_view(view),
352 Operation::Delete => {
353 catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
354 }
355 Operation::Update => catalog_guard.update_view(view),
356 _ => panic!("receive an unsupported notify {:?}", resp),
357 },
358 ObjectInfo::Function(function) => match resp.operation() {
359 Operation::Add => catalog_guard.create_function(function),
360 Operation::Delete => catalog_guard.drop_function(
361 function.database_id,
362 function.schema_id,
363 function.id.into(),
364 ),
365 Operation::Update => catalog_guard.update_function(function),
366 _ => panic!("receive an unsupported notify {:?}", resp),
367 },
368 ObjectInfo::Connection(connection) => match resp.operation() {
369 Operation::Add => catalog_guard.create_connection(connection),
370 Operation::Delete => catalog_guard.drop_connection(
371 connection.database_id,
372 connection.schema_id,
373 connection.id,
374 ),
375 Operation::Update => catalog_guard.update_connection(connection),
376 _ => panic!("receive an unsupported notify {:?}", resp),
377 },
378 ObjectInfo::Secret(secret) => {
379 let mut secret = secret.clone();
380 secret.value =
382 "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
383 match resp.operation() {
384 Operation::Add => catalog_guard.create_secret(&secret),
385 Operation::Delete => catalog_guard.drop_secret(
386 secret.database_id,
387 secret.schema_id,
388 SecretId::new(secret.id),
389 ),
390 Operation::Update => catalog_guard.update_secret(&secret),
391 _ => panic!("receive an unsupported notify {:?}", resp),
392 }
393 }
394 }
395 }
396 }
397 Info::Function(function) => match resp.operation() {
398 Operation::Add => catalog_guard.create_function(function),
399 Operation::Delete => catalog_guard.drop_function(
400 function.database_id,
401 function.schema_id,
402 function.id.into(),
403 ),
404 Operation::Update => catalog_guard.update_function(function),
405 _ => panic!("receive an unsupported notify {:?}", resp),
406 },
407 Info::Connection(connection) => match resp.operation() {
408 Operation::Add => catalog_guard.create_connection(connection),
409 Operation::Delete => catalog_guard.drop_connection(
410 connection.database_id,
411 connection.schema_id,
412 connection.id,
413 ),
414 Operation::Update => catalog_guard.update_connection(connection),
415 _ => panic!("receive an unsupported notify {:?}", resp),
416 },
417 Info::Secret(secret) => {
418 let mut secret = secret.clone();
419 secret.value = "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
421 match resp.operation() {
422 Operation::Add => catalog_guard.create_secret(&secret),
423 Operation::Delete => catalog_guard.drop_secret(
424 secret.database_id,
425 secret.schema_id,
426 SecretId::new(secret.id),
427 ),
428 Operation::Update => catalog_guard.update_secret(&secret),
429 _ => panic!("receive an unsupported notify {:?}", resp),
430 }
431 }
432 _ => unreachable!(),
433 }
434 assert!(
435 resp.version > catalog_guard.version(),
436 "resp version={:?}, current version={:?}",
437 resp.version,
438 catalog_guard.version()
439 );
440 catalog_guard.set_version(resp.version);
441 self.catalog_updated_tx.send(resp.version).unwrap();
442 }
443
444 fn handle_user_notification(&mut self, resp: SubscribeResponse) {
445 let Some(info) = resp.info.as_ref() else {
446 return;
447 };
448
449 let mut user_guard = self.user_info_manager.write();
450 match info {
451 Info::User(user) => match resp.operation() {
452 Operation::Add => user_guard.create_user(user.clone()),
453 Operation::Delete => user_guard.drop_user(user.id),
454 Operation::Update => user_guard.update_user(user.clone()),
455 _ => panic!("receive an unsupported notify {:?}", resp),
456 },
457 _ => unreachable!(),
458 }
459 assert!(
460 resp.version > user_guard.version(),
461 "resp version={:?}, current version={:?}",
462 resp.version,
463 user_guard.version()
464 );
465 user_guard.set_version(resp.version);
466 self.user_info_updated_tx.send(resp.version).unwrap();
467 }
468
469 fn handle_fragment_mapping_notification(&mut self, resp: SubscribeResponse) {
470 let Some(info) = resp.info.as_ref() else {
471 return;
472 };
473 match info {
474 Info::StreamingWorkerSlotMapping(streaming_worker_slot_mapping) => {
475 let fragment_id = streaming_worker_slot_mapping.fragment_id;
476 let mapping = || {
477 WorkerSlotMapping::from_protobuf(
478 streaming_worker_slot_mapping.mapping.as_ref().unwrap(),
479 )
480 };
481
482 match resp.operation() {
483 Operation::Add => {
484 self.worker_node_manager
485 .insert_streaming_fragment_mapping(fragment_id, mapping());
486 }
487 Operation::Delete => {
488 self.worker_node_manager
489 .remove_streaming_fragment_mapping(&fragment_id);
490 }
491 Operation::Update => {
492 self.worker_node_manager
493 .update_streaming_fragment_mapping(fragment_id, mapping());
494 }
495 _ => panic!("receive an unsupported notify {:?}", resp),
496 }
497 }
498 _ => unreachable!(),
499 }
500 }
501
502 fn handle_fragment_serving_mapping_notification(
503 &mut self,
504 mappings: Vec<FragmentWorkerSlotMapping>,
505 op: Operation,
506 ) {
507 match op {
508 Operation::Add | Operation::Update => {
509 self.worker_node_manager
510 .upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
511 }
512 Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
513 &mappings.into_iter().map(|m| m.fragment_id).collect_vec(),
514 ),
515 Operation::Snapshot => {
516 self.worker_node_manager
517 .set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
518 }
519 _ => panic!("receive an unsupported notify {:?}", op),
520 }
521 }
522
523 fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) {
525 self.hummock_snapshot_manager.update(deltas);
526 }
527
528 fn handle_secret_notification(&mut self, resp: SubscribeResponse) {
529 let resp_op = resp.operation();
530 let Some(Info::Secret(secret)) = resp.info else {
531 unreachable!();
532 };
533 match resp_op {
534 Operation::Add => {
535 LocalSecretManager::global().add_secret(secret.id, secret.value);
536 }
537 Operation::Delete => {
538 LocalSecretManager::global().remove_secret(secret.id);
539 }
540 Operation::Update => {
541 LocalSecretManager::global().update_secret(secret.id, secret.value);
542 }
543 _ => {
544 panic!("error type notification");
545 }
546 }
547 }
548
549 fn update_worker_node_manager(&self, operation: Operation, node: WorkerNode) {
552 tracing::debug!(
553 "Update worker nodes, operation: {:?}, node: {:?}",
554 operation,
555 node
556 );
557
558 match operation {
559 Operation::Add => self.worker_node_manager.add_worker_node(node),
560 Operation::Delete => self.worker_node_manager.remove_worker_node(node),
561 _ => (),
562 }
563 }
564}
565
566fn convert_worker_slot_mapping(
567 worker_slot_mappings: &[FragmentWorkerSlotMapping],
568) -> HashMap<FragmentId, WorkerSlotMapping> {
569 worker_slot_mappings
570 .iter()
571 .map(
572 |FragmentWorkerSlotMapping {
573 fragment_id,
574 mapping,
575 }| {
576 let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap());
577 (*fragment_id, mapping)
578 },
579 )
580 .collect()
581}