risingwave_connector/source/nats/enumerator/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use risingwave_common::bail;
19
20use super::NatsProperties;
21use super::source::{NatsOffset, NatsSplit};
22use crate::error::ConnectorResult;
23use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};
24
25#[derive(Debug, Clone)]
26pub struct NatsSplitEnumerator {
27    subject: String,
28    #[expect(dead_code)]
29    split_id: SplitId,
30    client: async_nats::Client,
31}
32
33#[async_trait]
34impl SplitEnumerator for NatsSplitEnumerator {
35    type Properties = NatsProperties;
36    type Split = NatsSplit;
37
38    async fn new(
39        properties: Self::Properties,
40        _context: SourceEnumeratorContextRef,
41    ) -> ConnectorResult<NatsSplitEnumerator> {
42        let client = properties.common.build_client().await?;
43        Ok(Self {
44            subject: properties.common.subject,
45            split_id: Arc::from("0"),
46            client,
47        })
48    }
49
50    async fn list_splits(&mut self) -> ConnectorResult<Vec<NatsSplit>> {
51        // Nats currently does not support list_splits API, if we simple return the default 0 without checking the client status, will result executor crash
52        let state = self.client.connection_state();
53        if state != async_nats::connection::State::Connected {
54            bail!(
55                "Nats connection status is not connected, current status is {:?}",
56                state
57            );
58        }
59        // TODO: to simplify the logic, return 1 split for first version
60        let nats_split = NatsSplit {
61            subject: self.subject.clone(),
62            split_id: Arc::from("0"), // be the same as `from_nats_jetstream_message`
63            start_sequence: NatsOffset::None,
64        };
65
66        Ok(vec![nats_split])
67    }
68}