1use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use sea_orm::{
20 ConnectionTrait, DatabaseBackend, DatabaseConnection, FromQueryResult, Statement,
21 TransactionTrait, Value,
22};
23use thiserror_ext::AsReport;
24use tokio::sync::watch;
25use tokio::sync::watch::Receiver;
26use tokio::time;
27
28use crate::rpc::election::META_ELECTION_KEY;
29use crate::{ElectionClient, ElectionMember, MetaResult};
30
31pub struct SqlBackendElectionClient<T: SqlDriver> {
32 id: String,
33 driver: Arc<T>,
34 is_leader_sender: watch::Sender<bool>,
35}
36
37impl<T: SqlDriver> SqlBackendElectionClient<T> {
38 pub fn new(id: String, driver: Arc<T>) -> Self {
39 let (sender, _) = watch::channel(false);
40 Self {
41 id,
42 driver,
43 is_leader_sender: sender,
44 }
45 }
46}
47
48#[derive(Debug, FromQueryResult)]
49pub struct ElectionRow {
50 service: String,
51 id: String,
52}
53
54#[async_trait::async_trait]
55pub trait SqlDriver: Send + Sync + 'static {
56 async fn init_database(&self) -> MetaResult<()>;
57
58 async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()>;
59
60 async fn try_campaign(&self, service_name: &str, id: &str, ttl: i64)
61 -> MetaResult<ElectionRow>;
62 async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>>;
63
64 async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>>;
65
66 async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()>;
67
68 async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()>;
69}
70
71pub trait SqlDriverCommon {
72 const ELECTION_LEADER_TABLE_NAME: &'static str = "election_leader";
73 const ELECTION_MEMBER_TABLE_NAME: &'static str = "election_member";
74
75 fn election_table_name() -> &'static str {
76 Self::ELECTION_LEADER_TABLE_NAME
77 }
78 fn member_table_name() -> &'static str {
79 Self::ELECTION_MEMBER_TABLE_NAME
80 }
81}
82
83impl SqlDriverCommon for MySqlDriver {}
84
85impl SqlDriverCommon for PostgresDriver {}
86
87impl SqlDriverCommon for SqliteDriver {}
88
89pub struct MySqlDriver {
90 pub(crate) conn: DatabaseConnection,
91}
92
93impl MySqlDriver {
94 pub fn new(conn: DatabaseConnection) -> Arc<Self> {
95 Arc::new(Self { conn })
96 }
97}
98
99pub struct PostgresDriver {
100 pub(crate) conn: DatabaseConnection,
101}
102
103impl PostgresDriver {
104 pub fn new(conn: DatabaseConnection) -> Arc<Self> {
105 Arc::new(Self { conn })
106 }
107}
108
109pub struct SqliteDriver {
110 pub(crate) conn: DatabaseConnection,
111}
112
113impl SqliteDriver {
114 pub fn new(conn: DatabaseConnection) -> Arc<Self> {
115 Arc::new(Self { conn })
116 }
117}
118
119#[async_trait::async_trait]
120impl SqlDriver for SqliteDriver {
121 async fn init_database(&self) -> MetaResult<()> {
122 self.conn.execute(
123 Statement::from_string(DatabaseBackend::Sqlite, format!(
124 r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id));"#,
125 table = Self::member_table_name()
126 ))).await?;
127
128 self.conn.execute(
129 Statement::from_string(DatabaseBackend::Sqlite, format!(
130 r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service));"#,
131 table = Self::election_table_name()
132 ))).await?;
133
134 Ok(())
135 }
136
137 async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> {
138 self.conn
139 .execute(Statement::from_sql_and_values(
140 DatabaseBackend::Sqlite,
141 format!(
142 r#"INSERT INTO {table} (id, service, last_heartbeat)
143VALUES($1, $2, CURRENT_TIMESTAMP)
144ON CONFLICT (id, service)
145DO
146 UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat;
147"#,
148 table = Self::member_table_name()
149 ),
150 vec![Value::from(id), Value::from(service_name)],
151 ))
152 .await?;
153 Ok(())
154 }
155
156 async fn try_campaign(
157 &self,
158 service_name: &str,
159 id: &str,
160 ttl: i64,
161 ) -> MetaResult<ElectionRow> {
162 let query_result = self.conn
163 .query_one(Statement::from_sql_and_values(
164 DatabaseBackend::Sqlite,
165 format!(
166 r#"INSERT INTO {table} (service, id, last_heartbeat)
167 VALUES ($1, $2, CURRENT_TIMESTAMP)
168 ON CONFLICT (service)
169 DO UPDATE
170 SET id = CASE
171 WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' second') < CURRENT_TIMESTAMP THEN EXCLUDED.id
172 ELSE {table}.id
173 END,
174 last_heartbeat = CASE
175 WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' seconds') < CURRENT_TIMESTAMP THEN EXCLUDED.last_heartbeat
176 WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat
177 ELSE {table}.last_heartbeat
178 END
179 RETURNING service, id, last_heartbeat;
180 "#,
181 table = Self::election_table_name()
182 ),
183 vec![Value::from(service_name), Value::from(id), Value::from(ttl)],
184 ))
185 .await?;
186
187 let row = query_result
188 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
189 .transpose()?;
190
191 let row = row.ok_or_else(|| anyhow!("bad result from sqlite"))?;
192
193 Ok(row)
194 }
195
196 async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>> {
197 let query_result = self
198 .conn
199 .query_one(Statement::from_sql_and_values(
200 DatabaseBackend::Sqlite,
201 format!(
202 r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
203 table = Self::election_table_name()
204 ),
205 vec![Value::from(service_name)],
206 ))
207 .await?;
208
209 let row = query_result
210 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
211 .transpose()?;
212
213 Ok(row)
214 }
215
216 async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>> {
217 let all = self
218 .conn
219 .query_all(Statement::from_sql_and_values(
220 DatabaseBackend::Sqlite,
221 format!(
222 r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
223 table = Self::member_table_name()
224 ),
225 vec![Value::from(service_name)],
226 ))
227 .await?;
228
229 let rows = all
230 .into_iter()
231 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
232 .collect::<Result<_, sea_orm::DbErr>>()?;
233
234 Ok(rows)
235 }
236
237 async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> {
238 let txn = self.conn.begin().await?;
239
240 txn.execute(Statement::from_sql_and_values(
241 DatabaseBackend::Sqlite,
242 format!(
243 r#"
244 DELETE FROM {table} WHERE service = $1 AND id = $2;
245 "#,
246 table = Self::election_table_name()
247 ),
248 vec![Value::from(service_name), Value::from(id)],
249 ))
250 .await?;
251
252 txn.execute(Statement::from_sql_and_values(
253 DatabaseBackend::Sqlite,
254 format!(
255 r#"
256 DELETE FROM {table} WHERE service = $1 AND id = $2;
257 "#,
258 table = Self::member_table_name()
259 ),
260 vec![Value::from(service_name), Value::from(id)],
261 ))
262 .await?;
263
264 txn.commit().await?;
265
266 Ok(())
267 }
268
269 async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
270 self.conn
271 .execute(Statement::from_sql_and_values(
272 DatabaseBackend::Sqlite,
273 format!(
274 r#"
275 DELETE FROM {table} WHERE service = $1 AND DATETIME({table}.last_heartbeat, '+' || $2 || ' second') < CURRENT_TIMESTAMP;
276 "#,
277 table = Self::member_table_name()
278 ),
279 vec![Value::from(service_name), Value::from(timeout)],
280 ))
281 .await?;
282
283 Ok(())
284 }
285}
286
287#[async_trait::async_trait]
288impl SqlDriver for MySqlDriver {
289 async fn init_database(&self) -> MetaResult<()> {
290 self.conn.execute(
291 Statement::from_string(DatabaseBackend::MySql, format!(
292 r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id));"#,
293 table = Self::member_table_name()
294 ))).await?;
295
296 self.conn.execute(
297 Statement::from_string(DatabaseBackend::MySql, format!(
298 r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service));"#,
299 table = Self::election_table_name()
300 ))).await?;
301
302 Ok(())
303 }
304
305 async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> {
306 self.conn
307 .execute(Statement::from_sql_and_values(
308 DatabaseBackend::MySql,
309 format!(
310 r#"INSERT INTO {table} (id, service, last_heartbeat)
311 VALUES(?, ?, NOW())
312 ON duplicate KEY
313 UPDATE last_heartbeat = VALUES(last_heartbeat);
314 "#,
315 table = Self::member_table_name()
316 ),
317 vec![Value::from(id), Value::from(service_name)],
318 ))
319 .await?;
320
321 Ok(())
322 }
323
324 async fn try_campaign(
325 &self,
326 service_name: &str,
327 id: &str,
328 ttl: i64,
329 ) -> MetaResult<ElectionRow> {
330 self.conn
331 .execute(Statement::from_sql_and_values(
332 DatabaseBackend::MySql,
333 format!(
334 r#"INSERT
335 IGNORE
336 INTO {table} (service, id, last_heartbeat)
337 VALUES (?, ?, NOW())
338 ON duplicate KEY
339 UPDATE id = if(last_heartbeat < NOW() - INTERVAL ? SECOND,
340 VALUES(id), id),
341 last_heartbeat = if(id =
342 VALUES(id),
343 VALUES(last_heartbeat), last_heartbeat);"#,
344 table = Self::election_table_name()
345 ),
346 vec![Value::from(service_name), Value::from(id), Value::from(ttl)],
347 ))
348 .await?;
349
350 let query_result = self
351 .conn
352 .query_one(Statement::from_sql_and_values(
353 DatabaseBackend::MySql,
354 format!(
355 r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
356 table = Self::election_table_name(),
357 ),
358 vec![Value::from(service_name)],
359 ))
360 .await?;
361
362 let row = query_result
363 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
364 .transpose()?;
365
366 let row = row.ok_or_else(|| anyhow!("bad result from mysql"))?;
367
368 Ok(row)
369 }
370
371 async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>> {
372 let query_result = self
373 .conn
374 .query_one(Statement::from_sql_and_values(
375 DatabaseBackend::MySql,
376 format!(
377 r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
378 table = Self::election_table_name()
379 ),
380 vec![Value::from(service_name)],
381 ))
382 .await?;
383
384 let row = query_result
385 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
386 .transpose()?;
387
388 Ok(row)
389 }
390
391 async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>> {
392 let all = self
393 .conn
394 .query_all(Statement::from_sql_and_values(
395 DatabaseBackend::MySql,
396 format!(
397 r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
398 table = Self::member_table_name()
399 ),
400 vec![Value::from(service_name)],
401 ))
402 .await?;
403
404 let rows = all
405 .into_iter()
406 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
407 .collect::<Result<_, sea_orm::DbErr>>()?;
408
409 Ok(rows)
410 }
411
412 async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> {
413 let txn = self.conn.begin().await?;
414
415 txn.execute(Statement::from_sql_and_values(
416 DatabaseBackend::MySql,
417 format!(
418 r#"
419 DELETE FROM {table} WHERE service = ? AND id = ?;
420 "#,
421 table = Self::election_table_name()
422 ),
423 vec![Value::from(service_name), Value::from(id)],
424 ))
425 .await?;
426
427 txn.execute(Statement::from_sql_and_values(
428 DatabaseBackend::MySql,
429 format!(
430 r#"
431 DELETE FROM {table} WHERE service = ? AND id = ?;
432 "#,
433 table = Self::member_table_name()
434 ),
435 vec![Value::from(service_name), Value::from(id)],
436 ))
437 .await?;
438
439 txn.commit().await?;
440
441 Ok(())
442 }
443
444 async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
445 self.conn
446 .execute(Statement::from_sql_and_values(
447 DatabaseBackend::MySql,
448 format!(
449 r#"
450 DELETE FROM {table} WHERE service = ? AND last_heartbeat < NOW() - INTERVAL ? SECOND;
451 "#,
452 table = Self::member_table_name()
453 ),
454 vec![Value::from(service_name), Value::from(timeout)],
455 ))
456 .await?;
457
458 Ok(())
459 }
460}
461
462#[async_trait::async_trait]
463impl SqlDriver for PostgresDriver {
464 async fn init_database(&self) -> MetaResult<()> {
465 self.conn.execute(
466 Statement::from_string(DatabaseBackend::Postgres, format!(
467 r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR, id VARCHAR, last_heartbeat TIMESTAMPTZ, PRIMARY KEY (service, id));"#,
468 table = Self::member_table_name()
469 ))).await?;
470
471 self.conn.execute(
472 Statement::from_string(DatabaseBackend::Postgres, format!(
473 r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR, id VARCHAR, last_heartbeat TIMESTAMPTZ, PRIMARY KEY (service));"#,
474 table = Self::election_table_name()
475 ))).await?;
476
477 Ok(())
478 }
479
480 async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> {
481 self.conn
482 .execute(Statement::from_sql_and_values(
483 DatabaseBackend::Postgres,
484 format!(
485 r#"INSERT INTO {table} (id, service, last_heartbeat)
486 VALUES($1, $2, NOW())
487 ON CONFLICT (id, service)
488 DO
489 UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat;
490 "#,
491 table = Self::member_table_name()
492 ),
493 vec![Value::from(id), Value::from(service_name)],
494 ))
495 .await?;
496
497 Ok(())
498 }
499
500 async fn try_campaign(
501 &self,
502 service_name: &str,
503 id: &str,
504 ttl: i64,
505 ) -> MetaResult<ElectionRow> {
506 let query_result = self
507 .conn
508 .query_one(Statement::from_sql_and_values(
509 DatabaseBackend::Postgres,
510 format!(
511 r#"INSERT INTO {table} (service, id, last_heartbeat)
512 VALUES ($1, $2, NOW())
513 ON CONFLICT (service)
514 DO UPDATE
515 SET id = CASE
516 WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.id
517 ELSE {table}.id
518 END,
519 last_heartbeat = CASE
520 WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.last_heartbeat
521 WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat
522 ELSE {table}.last_heartbeat
523 END
524 RETURNING service, id, last_heartbeat;
525 "#,
526 table = Self::election_table_name()
527 ),
528 vec![
529 Value::from(service_name),
530 Value::from(id),
531 Value::from(ttl.to_string()),
533 ],
534 ))
535 .await?;
536
537 let row = query_result
538 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
539 .transpose()?;
540
541 let row = row.ok_or_else(|| anyhow!("bad result from postgres"))?;
542
543 Ok(row)
544 }
545
546 async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>> {
547 let query_result = self
548 .conn
549 .query_one(Statement::from_sql_and_values(
550 DatabaseBackend::Postgres,
551 format!(
552 r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
553 table = Self::election_table_name()
554 ),
555 vec![Value::from(service_name)],
556 ))
557 .await?;
558
559 let row = query_result
560 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
561 .transpose()?;
562
563 Ok(row)
564 }
565
566 async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>> {
567 let all = self
568 .conn
569 .query_all(Statement::from_sql_and_values(
570 DatabaseBackend::Postgres,
571 format!(
572 r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
573 table = Self::member_table_name()
574 ),
575 vec![Value::from(service_name)],
576 ))
577 .await?;
578
579 let rows = all
580 .into_iter()
581 .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
582 .collect::<Result<_, sea_orm::DbErr>>()?;
583
584 Ok(rows)
585 }
586
587 async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> {
588 let txn = self.conn.begin().await?;
589
590 txn.execute(Statement::from_sql_and_values(
591 DatabaseBackend::Postgres,
592 format!(
593 r#"
594 DELETE FROM {table} WHERE service = $1 AND id = $2;
595 "#,
596 table = Self::election_table_name()
597 ),
598 vec![Value::from(service_name), Value::from(id)],
599 ))
600 .await?;
601
602 txn.execute(Statement::from_sql_and_values(
603 DatabaseBackend::Postgres,
604 format!(
605 r#"
606 DELETE FROM {table} WHERE service = $1 AND id = $2;
607 "#,
608 table = Self::member_table_name()
609 ),
610 vec![Value::from(service_name), Value::from(id)],
611 ))
612 .await?;
613
614 txn.commit().await?;
615
616 Ok(())
617 }
618
619 async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
620 self.conn
621 .execute(Statement::from_sql_and_values(
622 DatabaseBackend::Postgres,
623 format!(
624 r#"
625 DELETE FROM {table} WHERE {table}.service = $1 AND {table}.last_heartbeat < NOW() - $2::INTERVAL;
626 "#,
627 table = Self::member_table_name()
628 ),
629 vec![Value::from(service_name), Value::from(timeout.to_string())],
630 ))
631 .await?;
632
633 Ok(())
634 }
635}
636
637#[async_trait::async_trait]
638impl<T> ElectionClient for SqlBackendElectionClient<T>
639where
640 T: SqlDriver + Send + Sync + 'static,
641{
642 async fn init(&self) -> MetaResult<()> {
643 tracing::info!("initializing database for Sql backend election client");
644 self.driver.init_database().await
645 }
646
647 fn id(&self) -> MetaResult<String> {
648 Ok(self.id.clone())
649 }
650
651 async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()> {
652 let stop = stop.clone();
653
654 let member_refresh_driver = self.driver.clone();
655
656 let id = self.id.clone();
657
658 let mut member_refresh_stop = stop.clone();
659
660 let handle = tokio::spawn(async move {
661 let mut ticker = tokio::time::interval(Duration::from_secs(1));
662
663 loop {
664 tokio::select! {
665 _ = ticker.tick() => {
666
667 if let Err(e) = member_refresh_driver
668 .update_heartbeat(META_ELECTION_KEY, id.as_str())
669 .await {
670
671 tracing::debug!(error = %e.as_report(), "keep alive for member {} failed", id);
672 continue
673 }
674 }
675 _ = member_refresh_stop.changed() => {
676 return;
677 }
678 }
679 }
680 });
681
682 let _guard = scopeguard::guard(handle, |handle| handle.abort());
683
684 self.is_leader_sender.send_replace(false);
685
686 let mut timeout_ticker = time::interval(Duration::from_secs_f64(ttl as f64 / 2.0));
687 timeout_ticker.reset();
688 let mut stop = stop.clone();
689
690 let mut is_leader = false;
691
692 let mut election_ticker = time::interval(Duration::from_secs(1));
693
694 let mut prev_leader = "".to_owned();
695
696 loop {
697 tokio::select! {
698 _ = election_ticker.tick() => {
699 let election_row = self
700 .driver
701 .try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl)
702 .await?;
703
704 assert_eq!(election_row.service, META_ELECTION_KEY);
705
706 if election_row.id.eq_ignore_ascii_case(self.id.as_str()) {
707 if !is_leader{
708 self.is_leader_sender.send_replace(true);
709 is_leader = true;
710 } else {
711 self.is_leader_sender.send_replace(false);
712 }
713 } else if is_leader {
714 tracing::warn!("leader has been changed to {}", election_row.id);
715 break;
716 } else if prev_leader != election_row.id {
717 tracing::info!("leader is {}", election_row.id);
718 prev_leader.clone_from(&election_row.id)
719 }
720
721 timeout_ticker.reset();
722
723 if is_leader {
724 if let Err(e) = self.driver.trim_candidates(META_ELECTION_KEY, ttl * 2).await {
725 tracing::warn!(error = %e.as_report(), "trim candidates failed");
726 }
727 }
728 }
729 _ = timeout_ticker.tick() => {
730 tracing::error!("member {} election timeout", self.id);
731 break;
732 }
733 _ = stop.changed() => {
734 tracing::info!("stop signal received when observing");
735
736 if is_leader {
737 tracing::info!("leader {} resigning", self.id);
738 if let Err(e) = self.driver.resign(META_ELECTION_KEY, self.id.as_str()).await {
739 tracing::warn!(error = %e.as_report(), "resign failed");
740 }
741 }
742
743 return Ok(());
744 }
745 }
746 }
747 self.is_leader_sender.send_replace(false);
748
749 return Ok(());
750 }
751
752 fn subscribe(&self) -> Receiver<bool> {
753 self.is_leader_sender.subscribe()
754 }
755
756 async fn leader(&self) -> MetaResult<Option<ElectionMember>> {
757 let row = self.driver.leader(META_ELECTION_KEY).await?;
758 Ok(row.map(|row| ElectionMember {
759 id: row.id,
760 is_leader: true,
761 }))
762 }
763
764 async fn get_members(&self) -> MetaResult<Vec<ElectionMember>> {
765 let leader = self.leader().await?;
766 let members = self.driver.candidates(META_ELECTION_KEY).await?;
767
768 Ok(members
769 .into_iter()
770 .map(|row| {
771 let is_leader = leader
772 .as_ref()
773 .map(|leader| leader.id.eq_ignore_ascii_case(row.id.as_str()))
774 .unwrap_or(false);
775
776 ElectionMember {
777 id: row.id,
778 is_leader,
779 }
780 })
781 .collect())
782 }
783
784 fn is_leader(&self) -> bool {
785 *self.is_leader_sender.borrow()
786 }
787}
788
789#[cfg(not(madsim))]
790#[cfg(test)]
791mod tests {
792 use std::sync::Arc;
793
794 use sea_orm::{ConnectionTrait, Database, DatabaseConnection, DbBackend, Statement};
795 use tokio::sync::watch;
796
797 use crate::rpc::election::sql::{SqlBackendElectionClient, SqlDriverCommon, SqliteDriver};
798 use crate::{ElectionClient, MetaResult};
799
800 async fn prepare_sqlite_env() -> MetaResult<DatabaseConnection> {
801 let db: DatabaseConnection = Database::connect("sqlite::memory:").await?;
802
803 db.execute(Statement::from_sql_and_values(
804 DbBackend::Sqlite,
805 format!("CREATE TABLE {table} (service VARCHAR(256) PRIMARY KEY, id VARCHAR(256), last_heartbeat DATETIME)",
806 table = SqliteDriver::election_table_name()),
807 vec![],
808 ))
809 .await?;
810
811 db.execute(Statement::from_sql_and_values(
812 DbBackend::Sqlite,
813 format!("CREATE TABLE {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id))",
814 table = SqliteDriver::member_table_name()),
815 vec![],
816 ))
817 .await?;
818
819 Ok(db)
820 }
821
822 #[tokio::test]
823 async fn test_sql_election() {
824 let id = "test_id".to_owned();
825 let conn = prepare_sqlite_env().await.unwrap();
826
827 let provider = SqliteDriver { conn };
828 let (sender, _) = watch::channel(false);
829 let sql_election_client: Arc<dyn ElectionClient> = Arc::new(SqlBackendElectionClient {
830 id,
831 driver: Arc::new(provider),
832 is_leader_sender: sender,
833 });
834 let (stop_sender, _) = watch::channel(());
835
836 let stop_receiver = stop_sender.subscribe();
837
838 let mut receiver = sql_election_client.subscribe();
839 let client_ = sql_election_client.clone();
840 tokio::spawn(async move { client_.run_once(10, stop_receiver).await.unwrap() });
841
842 loop {
843 receiver.changed().await.unwrap();
844 if *receiver.borrow() {
845 assert!(sql_election_client.is_leader());
846 break;
847 }
848 }
849 }
850
851 #[tokio::test]
852 async fn test_sql_election_multi() {
853 let (stop_sender, _) = watch::channel(());
854
855 let mut clients = vec![];
856
857 let conn = prepare_sqlite_env().await.unwrap();
858 for i in 1..3 {
859 let id = format!("test_id_{}", i);
860 let provider = SqliteDriver { conn: conn.clone() };
861 let (sender, _) = watch::channel(false);
862 let sql_election_client: Arc<dyn ElectionClient> = Arc::new(SqlBackendElectionClient {
863 id,
864 driver: Arc::new(provider),
865 is_leader_sender: sender,
866 });
867
868 let stop_receiver = stop_sender.subscribe();
869 let client_ = sql_election_client.clone();
870 tokio::spawn(async move { client_.run_once(10, stop_receiver).await.unwrap() });
871 clients.push(sql_election_client);
872 }
873
874 let mut is_leaders = vec![];
875
876 for client in clients {
877 is_leaders.push(client.is_leader());
878 }
879
880 assert!(is_leaders.iter().filter(|&x| *x).count() <= 1);
881 }
882}