risingwave_meta/rpc/election/
sql.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use sea_orm::{
20    ConnectionTrait, DatabaseBackend, DatabaseConnection, FromQueryResult, Statement,
21    TransactionTrait, Value,
22};
23use thiserror_ext::AsReport;
24use tokio::sync::watch;
25use tokio::sync::watch::Receiver;
26use tokio::time;
27
28use crate::rpc::election::META_ELECTION_KEY;
29use crate::{ElectionClient, ElectionMember, MetaResult};
30
31pub struct SqlBackendElectionClient<T: SqlDriver> {
32    id: String,
33    driver: Arc<T>,
34    is_leader_sender: watch::Sender<bool>,
35}
36
37impl<T: SqlDriver> SqlBackendElectionClient<T> {
38    pub fn new(id: String, driver: Arc<T>) -> Self {
39        let (sender, _) = watch::channel(false);
40        Self {
41            id,
42            driver,
43            is_leader_sender: sender,
44        }
45    }
46}
47
48#[derive(Debug, FromQueryResult)]
49pub struct ElectionRow {
50    service: String,
51    id: String,
52}
53
54#[async_trait::async_trait]
55pub trait SqlDriver: Send + Sync + 'static {
56    async fn init_database(&self) -> MetaResult<()>;
57
58    async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()>;
59
60    async fn try_campaign(&self, service_name: &str, id: &str, ttl: i64)
61    -> MetaResult<ElectionRow>;
62    async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>>;
63
64    async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>>;
65
66    async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()>;
67
68    async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()>;
69}
70
/// Table names shared by every SQL election driver.
pub trait SqlDriverCommon {
    /// Table keyed by `service`, holding at most one leader row per service.
    const ELECTION_LEADER_TABLE_NAME: &'static str = "election_leader";
    /// Table keyed by `(service, id)`, holding one heartbeat row per member.
    const ELECTION_MEMBER_TABLE_NAME: &'static str = "election_member";

    /// Name of the leader table.
    fn election_table_name() -> &'static str {
        let name = Self::ELECTION_LEADER_TABLE_NAME;
        name
    }

    /// Name of the member table.
    fn member_table_name() -> &'static str {
        let name = Self::ELECTION_MEMBER_TABLE_NAME;
        name
    }
}
82
83impl SqlDriverCommon for MySqlDriver {}
84
85impl SqlDriverCommon for PostgresDriver {}
86
87impl SqlDriverCommon for SqliteDriver {}
88
89pub struct MySqlDriver {
90    pub(crate) conn: DatabaseConnection,
91}
92
93impl MySqlDriver {
94    pub fn new(conn: DatabaseConnection) -> Arc<Self> {
95        Arc::new(Self { conn })
96    }
97}
98
99pub struct PostgresDriver {
100    pub(crate) conn: DatabaseConnection,
101}
102
103impl PostgresDriver {
104    pub fn new(conn: DatabaseConnection) -> Arc<Self> {
105        Arc::new(Self { conn })
106    }
107}
108
109pub struct SqliteDriver {
110    pub(crate) conn: DatabaseConnection,
111}
112
113impl SqliteDriver {
114    pub fn new(conn: DatabaseConnection) -> Arc<Self> {
115        Arc::new(Self { conn })
116    }
117}
118
119#[async_trait::async_trait]
120impl SqlDriver for SqliteDriver {
121    async fn init_database(&self) -> MetaResult<()> {
122        self.conn.execute(
123            Statement::from_string(DatabaseBackend::Sqlite, format!(
124                r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id));"#,
125                table = Self::member_table_name()
126            ))).await?;
127
128        self.conn.execute(
129            Statement::from_string(DatabaseBackend::Sqlite, format!(
130                r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service));"#,
131                table = Self::election_table_name()
132            ))).await?;
133
134        Ok(())
135    }
136
137    async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> {
138        self.conn
139            .execute(Statement::from_sql_and_values(
140                DatabaseBackend::Sqlite,
141                format!(
142                    r#"INSERT INTO {table} (id, service, last_heartbeat)
143VALUES($1, $2, CURRENT_TIMESTAMP)
144ON CONFLICT (id, service)
145DO
146   UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat;
147"#,
148                    table = Self::member_table_name()
149                ),
150                vec![Value::from(id), Value::from(service_name)],
151            ))
152            .await?;
153        Ok(())
154    }
155
156    async fn try_campaign(
157        &self,
158        service_name: &str,
159        id: &str,
160        ttl: i64,
161    ) -> MetaResult<ElectionRow> {
162        let query_result = self.conn
163            .query_one(Statement::from_sql_and_values(
164                DatabaseBackend::Sqlite,
165                format!(
166                    r#"INSERT INTO {table} (service, id, last_heartbeat)
167        VALUES ($1, $2, CURRENT_TIMESTAMP)
168        ON CONFLICT (service)
169            DO UPDATE
170            SET id             = CASE
171                                     WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' second') < CURRENT_TIMESTAMP THEN EXCLUDED.id
172                                     ELSE {table}.id
173                END,
174                last_heartbeat = CASE
175                                     WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' seconds') < CURRENT_TIMESTAMP THEN EXCLUDED.last_heartbeat
176                                     WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat
177                                     ELSE {table}.last_heartbeat
178                    END
179        RETURNING service, id, last_heartbeat;
180        "#,
181                    table = Self::election_table_name()
182                ),
183                vec![Value::from(service_name), Value::from(id), Value::from(ttl)],
184            ))
185            .await?;
186
187        let row = query_result
188            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
189            .transpose()?;
190
191        let row = row.ok_or_else(|| anyhow!("bad result from sqlite"))?;
192
193        Ok(row)
194    }
195
196    async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>> {
197        let query_result = self
198            .conn
199            .query_one(Statement::from_sql_and_values(
200                DatabaseBackend::Sqlite,
201                format!(
202                    r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
203                    table = Self::election_table_name()
204                ),
205                vec![Value::from(service_name)],
206            ))
207            .await?;
208
209        let row = query_result
210            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
211            .transpose()?;
212
213        Ok(row)
214    }
215
216    async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>> {
217        let all = self
218            .conn
219            .query_all(Statement::from_sql_and_values(
220                DatabaseBackend::Sqlite,
221                format!(
222                    r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
223                    table = Self::member_table_name()
224                ),
225                vec![Value::from(service_name)],
226            ))
227            .await?;
228
229        let rows = all
230            .into_iter()
231            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
232            .collect::<Result<_, sea_orm::DbErr>>()?;
233
234        Ok(rows)
235    }
236
237    async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> {
238        let txn = self.conn.begin().await?;
239
240        txn.execute(Statement::from_sql_and_values(
241            DatabaseBackend::Sqlite,
242            format!(
243                r#"
244            DELETE FROM {table} WHERE service = $1 AND id = $2;
245            "#,
246                table = Self::election_table_name()
247            ),
248            vec![Value::from(service_name), Value::from(id)],
249        ))
250        .await?;
251
252        txn.execute(Statement::from_sql_and_values(
253            DatabaseBackend::Sqlite,
254            format!(
255                r#"
256            DELETE FROM {table} WHERE service = $1 AND id = $2;
257            "#,
258                table = Self::member_table_name()
259            ),
260            vec![Value::from(service_name), Value::from(id)],
261        ))
262        .await?;
263
264        txn.commit().await?;
265
266        Ok(())
267    }
268
269    async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
270        self.conn
271            .execute(Statement::from_sql_and_values(
272                DatabaseBackend::Sqlite,
273                format!(
274                    r#"
275                    DELETE FROM {table} WHERE service = $1 AND DATETIME({table}.last_heartbeat, '+' || $2 || ' second') < CURRENT_TIMESTAMP;
276                    "#,
277                    table = Self::member_table_name()
278                ),
279                vec![Value::from(service_name), Value::from(timeout)],
280            ))
281            .await?;
282
283        Ok(())
284    }
285}
286
287#[async_trait::async_trait]
288impl SqlDriver for MySqlDriver {
289    async fn init_database(&self) -> MetaResult<()> {
290        self.conn.execute(
291            Statement::from_string(DatabaseBackend::MySql, format!(
292                r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id));"#,
293                table = Self::member_table_name()
294            ))).await?;
295
296        self.conn.execute(
297            Statement::from_string(DatabaseBackend::MySql, format!(
298                r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service));"#,
299                table = Self::election_table_name()
300            ))).await?;
301
302        Ok(())
303    }
304
305    async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> {
306        self.conn
307            .execute(Statement::from_sql_and_values(
308                DatabaseBackend::MySql,
309                format!(
310                    r#"INSERT INTO {table} (id, service, last_heartbeat)
311        VALUES(?, ?, NOW())
312        ON duplicate KEY
313           UPDATE last_heartbeat = VALUES(last_heartbeat);
314        "#,
315                    table = Self::member_table_name()
316                ),
317                vec![Value::from(id), Value::from(service_name)],
318            ))
319            .await?;
320
321        Ok(())
322    }
323
324    async fn try_campaign(
325        &self,
326        service_name: &str,
327        id: &str,
328        ttl: i64,
329    ) -> MetaResult<ElectionRow> {
330        self.conn
331            .execute(Statement::from_sql_and_values(
332                DatabaseBackend::MySql,
333                format!(
334                    r#"INSERT
335            IGNORE
336        INTO {table} (service, id, last_heartbeat)
337        VALUES (?, ?, NOW())
338        ON duplicate KEY
339            UPDATE id             = if(last_heartbeat < NOW() - INTERVAL ? SECOND,
340                                       VALUES(id), id),
341                   last_heartbeat = if(id =
342                                       VALUES(id),
343                                       VALUES(last_heartbeat), last_heartbeat);"#,
344                    table = Self::election_table_name()
345                ),
346                vec![Value::from(service_name), Value::from(id), Value::from(ttl)],
347            ))
348            .await?;
349
350        let query_result = self
351            .conn
352            .query_one(Statement::from_sql_and_values(
353                DatabaseBackend::MySql,
354                format!(
355                    r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
356                    table = Self::election_table_name(),
357                ),
358                vec![Value::from(service_name)],
359            ))
360            .await?;
361
362        let row = query_result
363            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
364            .transpose()?;
365
366        let row = row.ok_or_else(|| anyhow!("bad result from mysql"))?;
367
368        Ok(row)
369    }
370
371    async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>> {
372        let query_result = self
373            .conn
374            .query_one(Statement::from_sql_and_values(
375                DatabaseBackend::MySql,
376                format!(
377                    r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
378                    table = Self::election_table_name()
379                ),
380                vec![Value::from(service_name)],
381            ))
382            .await?;
383
384        let row = query_result
385            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
386            .transpose()?;
387
388        Ok(row)
389    }
390
391    async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>> {
392        let all = self
393            .conn
394            .query_all(Statement::from_sql_and_values(
395                DatabaseBackend::MySql,
396                format!(
397                    r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
398                    table = Self::member_table_name()
399                ),
400                vec![Value::from(service_name)],
401            ))
402            .await?;
403
404        let rows = all
405            .into_iter()
406            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
407            .collect::<Result<_, sea_orm::DbErr>>()?;
408
409        Ok(rows)
410    }
411
412    async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> {
413        let txn = self.conn.begin().await?;
414
415        txn.execute(Statement::from_sql_and_values(
416            DatabaseBackend::MySql,
417            format!(
418                r#"
419            DELETE FROM {table} WHERE service = ? AND id = ?;
420            "#,
421                table = Self::election_table_name()
422            ),
423            vec![Value::from(service_name), Value::from(id)],
424        ))
425        .await?;
426
427        txn.execute(Statement::from_sql_and_values(
428            DatabaseBackend::MySql,
429            format!(
430                r#"
431            DELETE FROM {table} WHERE service = ? AND id = ?;
432            "#,
433                table = Self::member_table_name()
434            ),
435            vec![Value::from(service_name), Value::from(id)],
436        ))
437        .await?;
438
439        txn.commit().await?;
440
441        Ok(())
442    }
443
444    async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
445        self.conn
446            .execute(Statement::from_sql_and_values(
447                DatabaseBackend::MySql,
448                format!(
449                    r#"
450                    DELETE FROM {table} WHERE service = ? AND last_heartbeat < NOW() - INTERVAL ? SECOND;
451                    "#,
452                    table = Self::member_table_name()
453                ),
454                vec![Value::from(service_name), Value::from(timeout)],
455            ))
456            .await?;
457
458        Ok(())
459    }
460}
461
462#[async_trait::async_trait]
463impl SqlDriver for PostgresDriver {
464    async fn init_database(&self) -> MetaResult<()> {
465        self.conn.execute(
466            Statement::from_string(DatabaseBackend::Postgres, format!(
467                r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR, id VARCHAR, last_heartbeat TIMESTAMPTZ, PRIMARY KEY (service, id));"#,
468                table = Self::member_table_name()
469            ))).await?;
470
471        self.conn.execute(
472            Statement::from_string(DatabaseBackend::Postgres, format!(
473                r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR, id VARCHAR, last_heartbeat TIMESTAMPTZ, PRIMARY KEY (service));"#,
474                table = Self::election_table_name()
475            ))).await?;
476
477        Ok(())
478    }
479
480    async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> {
481        self.conn
482            .execute(Statement::from_sql_and_values(
483                DatabaseBackend::Postgres,
484                format!(
485                    r#"INSERT INTO {table} (id, service, last_heartbeat)
486        VALUES($1, $2, NOW())
487        ON CONFLICT (id, service)
488        DO
489           UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat;
490        "#,
491                    table = Self::member_table_name()
492                ),
493                vec![Value::from(id), Value::from(service_name)],
494            ))
495            .await?;
496
497        Ok(())
498    }
499
500    async fn try_campaign(
501        &self,
502        service_name: &str,
503        id: &str,
504        ttl: i64,
505    ) -> MetaResult<ElectionRow> {
506        let query_result = self
507            .conn
508            .query_one(Statement::from_sql_and_values(
509                DatabaseBackend::Postgres,
510                format!(
511                    r#"INSERT INTO {table} (service, id, last_heartbeat)
512        VALUES ($1, $2, NOW())
513        ON CONFLICT (service)
514            DO UPDATE
515            SET id             = CASE
516                                     WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.id
517                                     ELSE {table}.id
518                END,
519                last_heartbeat = CASE
520                                     WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.last_heartbeat
521                                     WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat
522                                     ELSE {table}.last_heartbeat
523                    END
524        RETURNING service, id, last_heartbeat;
525        "#,
526                    table = Self::election_table_name()
527                ),
528                vec![
529                    Value::from(service_name),
530                    Value::from(id),
531                    // special handling for interval
532                    Value::from(ttl.to_string()),
533                ],
534            ))
535            .await?;
536
537        let row = query_result
538            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
539            .transpose()?;
540
541        let row = row.ok_or_else(|| anyhow!("bad result from postgres"))?;
542
543        Ok(row)
544    }
545
546    async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>> {
547        let query_result = self
548            .conn
549            .query_one(Statement::from_sql_and_values(
550                DatabaseBackend::Postgres,
551                format!(
552                    r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
553                    table = Self::election_table_name()
554                ),
555                vec![Value::from(service_name)],
556            ))
557            .await?;
558
559        let row = query_result
560            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
561            .transpose()?;
562
563        Ok(row)
564    }
565
566    async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>> {
567        let all = self
568            .conn
569            .query_all(Statement::from_sql_and_values(
570                DatabaseBackend::Postgres,
571                format!(
572                    r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
573                    table = Self::member_table_name()
574                ),
575                vec![Value::from(service_name)],
576            ))
577            .await?;
578
579        let rows = all
580            .into_iter()
581            .map(|query_result| ElectionRow::from_query_result(&query_result, ""))
582            .collect::<Result<_, sea_orm::DbErr>>()?;
583
584        Ok(rows)
585    }
586
587    async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> {
588        let txn = self.conn.begin().await?;
589
590        txn.execute(Statement::from_sql_and_values(
591            DatabaseBackend::Postgres,
592            format!(
593                r#"
594            DELETE FROM {table} WHERE service = $1 AND id = $2;
595            "#,
596                table = Self::election_table_name()
597            ),
598            vec![Value::from(service_name), Value::from(id)],
599        ))
600        .await?;
601
602        txn.execute(Statement::from_sql_and_values(
603            DatabaseBackend::Postgres,
604            format!(
605                r#"
606            DELETE FROM {table} WHERE service = $1 AND id = $2;
607            "#,
608                table = Self::member_table_name()
609            ),
610            vec![Value::from(service_name), Value::from(id)],
611        ))
612        .await?;
613
614        txn.commit().await?;
615
616        Ok(())
617    }
618
619    async fn trim_candidates(&self, service_name: &str, timeout: i64) -> MetaResult<()> {
620        self.conn
621            .execute(Statement::from_sql_and_values(
622                DatabaseBackend::Postgres,
623                format!(
624                    r#"
625                    DELETE FROM {table} WHERE {table}.service = $1 AND {table}.last_heartbeat < NOW() - $2::INTERVAL;
626                    "#,
627                    table = Self::member_table_name()
628                ),
629                vec![Value::from(service_name), Value::from(timeout.to_string())],
630            ))
631            .await?;
632
633        Ok(())
634    }
635}
636
637#[async_trait::async_trait]
638impl<T> ElectionClient for SqlBackendElectionClient<T>
639where
640    T: SqlDriver + Send + Sync + 'static,
641{
642    async fn init(&self) -> MetaResult<()> {
643        tracing::info!("initializing database for Sql backend election client");
644        self.driver.init_database().await
645    }
646
647    fn id(&self) -> MetaResult<String> {
648        Ok(self.id.clone())
649    }
650
651    async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()> {
652        let stop = stop.clone();
653
654        let member_refresh_driver = self.driver.clone();
655
656        let id = self.id.clone();
657
658        let mut member_refresh_stop = stop.clone();
659
660        let handle = tokio::spawn(async move {
661            let mut ticker = tokio::time::interval(Duration::from_secs(1));
662
663            loop {
664                tokio::select! {
665                    _ = ticker.tick() => {
666
667                        if let Err(e) = member_refresh_driver
668                            .update_heartbeat(META_ELECTION_KEY, id.as_str())
669                            .await {
670
671                            tracing::debug!(error = %e.as_report(), "keep alive for member {} failed", id);
672                            continue
673                        }
674                    }
675                    _ = member_refresh_stop.changed() => {
676                        return;
677                    }
678                }
679            }
680        });
681
682        let _guard = scopeguard::guard(handle, |handle| handle.abort());
683
684        self.is_leader_sender.send_replace(false);
685
686        let mut timeout_ticker = time::interval(Duration::from_secs_f64(ttl as f64 / 2.0));
687        timeout_ticker.reset();
688        let mut stop = stop.clone();
689
690        let mut is_leader = false;
691
692        let mut election_ticker = time::interval(Duration::from_secs(1));
693
694        let mut prev_leader = "".to_owned();
695
696        loop {
697            tokio::select! {
698                    _ = election_ticker.tick() => {
699                        let election_row = self
700                            .driver
701                            .try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl)
702                            .await?;
703
704                        assert_eq!(election_row.service, META_ELECTION_KEY);
705
706                        if election_row.id.eq_ignore_ascii_case(self.id.as_str()) {
707                            if !is_leader{
708                                self.is_leader_sender.send_replace(true);
709                                is_leader = true;
710                            } else {
711                                self.is_leader_sender.send_replace(false);
712                            }
713                        } else if is_leader {
714                            tracing::warn!("leader has been changed to {}", election_row.id);
715                            break;
716                        } else if prev_leader != election_row.id {
717                            tracing::info!("leader is {}", election_row.id);
718                            prev_leader.clone_from(&election_row.id)
719                        }
720
721                        timeout_ticker.reset();
722
723                        if is_leader {
724                            if let Err(e) = self.driver.trim_candidates(META_ELECTION_KEY, ttl * 2).await {
725                                tracing::warn!(error = %e.as_report(), "trim candidates failed");
726                            }
727                        }
728                    }
729                _ = timeout_ticker.tick() => {
730                    tracing::error!("member {} election timeout", self.id);
731                    break;
732                }
733                _ = stop.changed() => {
734                    tracing::info!("stop signal received when observing");
735
736                    if is_leader {
737                        tracing::info!("leader {} resigning", self.id);
738                        if let Err(e) = self.driver.resign(META_ELECTION_KEY, self.id.as_str()).await {
739                            tracing::warn!(error = %e.as_report(), "resign failed");
740                        }
741                    }
742
743                    return Ok(());
744                }
745            }
746        }
747        self.is_leader_sender.send_replace(false);
748
749        return Ok(());
750    }
751
752    fn subscribe(&self) -> Receiver<bool> {
753        self.is_leader_sender.subscribe()
754    }
755
756    async fn leader(&self) -> MetaResult<Option<ElectionMember>> {
757        let row = self.driver.leader(META_ELECTION_KEY).await?;
758        Ok(row.map(|row| ElectionMember {
759            id: row.id,
760            is_leader: true,
761        }))
762    }
763
764    async fn get_members(&self) -> MetaResult<Vec<ElectionMember>> {
765        let leader = self.leader().await?;
766        let members = self.driver.candidates(META_ELECTION_KEY).await?;
767
768        Ok(members
769            .into_iter()
770            .map(|row| {
771                let is_leader = leader
772                    .as_ref()
773                    .map(|leader| leader.id.eq_ignore_ascii_case(row.id.as_str()))
774                    .unwrap_or(false);
775
776                ElectionMember {
777                    id: row.id,
778                    is_leader,
779                }
780            })
781            .collect())
782    }
783
784    fn is_leader(&self) -> bool {
785        *self.is_leader_sender.borrow()
786    }
787}
788
#[cfg(not(madsim))]
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use sea_orm::{ConnectionTrait, Database, DatabaseConnection, DbBackend, Statement};
    use tokio::sync::watch;

    use crate::rpc::election::sql::{SqlBackendElectionClient, SqlDriverCommon, SqliteDriver};
    use crate::{ElectionClient, MetaResult};

    /// Opens an in-memory SQLite database and creates the leader and member tables.
    async fn prepare_sqlite_env() -> MetaResult<DatabaseConnection> {
        let db: DatabaseConnection = Database::connect("sqlite::memory:").await?;

        db.execute(Statement::from_sql_and_values(
            DbBackend::Sqlite,
            format!("CREATE TABLE {table} (service VARCHAR(256) PRIMARY KEY, id VARCHAR(256), last_heartbeat DATETIME)",
                    table = SqliteDriver::election_table_name()),
            vec![],
        ))
            .await?;

        db.execute(Statement::from_sql_and_values(
            DbBackend::Sqlite,
            format!("CREATE TABLE {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id))",
                    table = SqliteDriver::member_table_name()),
            vec![],
        ))
            .await?;

        Ok(db)
    }

    /// A single candidate eventually publishes leadership.
    #[tokio::test]
    async fn test_sql_election() {
        let id = "test_id".to_owned();
        let conn = prepare_sqlite_env().await.unwrap();

        let provider = SqliteDriver { conn };
        let (sender, _) = watch::channel(false);
        let sql_election_client: Arc<dyn ElectionClient> = Arc::new(SqlBackendElectionClient {
            id,
            driver: Arc::new(provider),
            is_leader_sender: sender,
        });
        let (stop_sender, _) = watch::channel(());

        let stop_receiver = stop_sender.subscribe();

        let mut receiver = sql_election_client.subscribe();
        let client_ = sql_election_client.clone();
        tokio::spawn(async move { client_.run_once(10, stop_receiver).await.unwrap() });

        loop {
            receiver.changed().await.unwrap();
            if *receiver.borrow() {
                assert!(sql_election_client.is_leader());
                break;
            }
        }
    }

    /// Two candidates share one database; exactly one of them becomes leader.
    #[tokio::test]
    async fn test_sql_election_multi() {
        let (stop_sender, _) = watch::channel(());

        let mut clients = vec![];

        let conn = prepare_sqlite_env().await.unwrap();
        for i in 1..3 {
            let id = format!("test_id_{}", i);
            let provider = SqliteDriver { conn: conn.clone() };
            let (sender, _) = watch::channel(false);
            let sql_election_client: Arc<dyn ElectionClient> = Arc::new(SqlBackendElectionClient {
                id,
                driver: Arc::new(provider),
                is_leader_sender: sender,
            });

            let stop_receiver = stop_sender.subscribe();
            let client_ = sql_election_client.clone();
            tokio::spawn(async move { client_.run_once(10, stop_receiver).await.unwrap() });
            clients.push(sql_election_client);
        }

        // Campaigns run in the spawned tasks. Checking right after spawning would observe
        // no leader at all and pass trivially, so poll until a leader has been published.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            let leaders = clients.iter().filter(|client| client.is_leader()).count();
            assert!(leaders <= 1, "expected at most one leader, found {}", leaders);
            if leaders == 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "no leader was elected before the deadline"
            );
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }
}