risingwave_connector/sink/elasticsearch_opensearch/
opensearch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::Schema;
17use risingwave_common::session_config::sink_decouple::SinkDecouple;
18use tonic::async_trait;
19
20use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
21use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
22use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
23use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
24use crate::sink::Result;
25
/// Name under which the OpenSearch sink is registered; used as
/// `<OpenSearchSink as Sink>::SINK_NAME`.
pub const OPENSEARCH_SINK: &str = "opensearch";
27
28#[derive(Debug)]
29pub struct OpenSearchSink {
30    config: ElasticSearchOpenSearchConfig,
31    schema: Schema,
32    pk_indices: Vec<usize>,
33    is_append_only: bool,
34}
35
36#[async_trait]
37impl TryFrom<SinkParam> for OpenSearchSink {
38    type Error = SinkError;
39
40    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
41        let schema = param.schema();
42        let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
43        Ok(Self {
44            config,
45            schema,
46            pk_indices: param.downstream_pk,
47            is_append_only: param.sink_type.is_append_only(),
48        })
49    }
50}
51
52impl Sink for OpenSearchSink {
53    type Coordinator = DummySinkCommitCoordinator;
54    type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
55
56    const SINK_NAME: &'static str = OPENSEARCH_SINK;
57
58    async fn validate(&self) -> Result<()> {
59        risingwave_common::license::Feature::OpenSearchSink
60            .check_available()
61            .map_err(|e| anyhow::anyhow!(e))?;
62        self.config.validate_config(&self.schema)?;
63        let client = self.config.build_client(Self::SINK_NAME)?;
64        client.ping().await?;
65        Ok(())
66    }
67
68    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
69        Ok(ElasticSearchOpenSearchSinkWriter::new(
70            self.config.clone(),
71            self.schema.clone(),
72            self.pk_indices.clone(),
73            Self::SINK_NAME,
74            self.is_append_only,
75        )?
76        .into_log_sinker(self.config.concurrent_requests))
77    }
78
79    fn set_default_commit_checkpoint_interval(
80        desc: &mut crate::sink::catalog::desc::SinkDesc,
81        user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
82    ) -> Result<()> {
83        if crate::sink::is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
84            match desc
85                .properties
86                .get(crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL)
87            {
88                Some(commit_checkpoint_interval) => {
89                    let commit_checkpoint_interval = commit_checkpoint_interval
90                        .parse::<u64>()
91                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
92                    if std::matches!(user_specified, SinkDecouple::Disable)
93                        && commit_checkpoint_interval > 1
94                    {
95                        return Err(SinkError::Config(anyhow!(
96                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
97                        )));
98                    }
99                }
100                None => match user_specified {
101                    risingwave_common::session_config::sink_decouple::SinkDecouple::Default
102                    | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => {
103                        desc.properties.insert(
104                            crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
105                            crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
106                        );
107                    }
108                    risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => {
109                        desc.properties.insert(
110                            crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
111                            crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
112                        );
113                    }
114                },
115            }
116        }
117        Ok(())
118    }
119
120    fn is_sink_decouple(
121        user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
122    ) -> Result<bool> {
123        match user_specified {
124            risingwave_common::session_config::sink_decouple::SinkDecouple::Default
125            | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => Ok(true),
126            risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => Ok(false),
127        }
128    }
129}