risingwave_connector/sink/elasticsearch_opensearch/
opensearch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::Schema;
17use risingwave_common::session_config::sink_decouple::SinkDecouple;
18use tonic::async_trait;
19
20use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
21use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
22use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
23use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
24use crate::enforce_secret::EnforceSecret;
25use crate::sink::Result;
26
/// Name of the OpenSearch sink connector; `OpenSearchSink` exposes it as `Sink::SINK_NAME`.
pub const OPENSEARCH_SINK: &str = "opensearch";
28
/// Sink registered under the name [`OPENSEARCH_SINK`].
///
/// Built from a `SinkParam` through `TryFrom`. Its fields are handed to
/// `ElasticSearchOpenSearchSinkWriter::new` whenever a log sinker is created.
#[derive(Debug)]
pub struct OpenSearchSink {
    /// Connector configuration parsed from the sink's properties.
    config: ElasticSearchOpenSearchConfig,
    /// Schema obtained from `SinkParam::schema()`.
    schema: Schema,
    /// Copied from `SinkParam::downstream_pk`; presumably the column indices of the
    /// downstream primary key — TODO confirm against `SinkParam`.
    pk_indices: Vec<usize>,
    /// Result of `is_append_only()` on the sink type at construction time.
    is_append_only: bool,
}
36
37impl EnforceSecret for OpenSearchSink {
38    fn enforce_secret<'a>(
39        prop_iter: impl Iterator<Item = &'a str>,
40    ) -> crate::error::ConnectorResult<()> {
41        for prop in prop_iter {
42            ElasticSearchOpenSearchConfig::enforce_one(prop)?;
43        }
44        Ok(())
45    }
46}
47#[async_trait]
48impl TryFrom<SinkParam> for OpenSearchSink {
49    type Error = SinkError;
50
51    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
52        let schema = param.schema();
53        let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
54        Ok(Self {
55            config,
56            schema,
57            pk_indices: param.downstream_pk,
58            is_append_only: param.sink_type.is_append_only(),
59        })
60    }
61}
62
63impl Sink for OpenSearchSink {
64    type Coordinator = DummySinkCommitCoordinator;
65    type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
66
67    const SINK_NAME: &'static str = OPENSEARCH_SINK;
68
69    async fn validate(&self) -> Result<()> {
70        risingwave_common::license::Feature::OpenSearchSink
71            .check_available()
72            .map_err(|e| anyhow::anyhow!(e))?;
73        self.config.validate_config(&self.schema)?;
74        let client = self.config.build_client(Self::SINK_NAME)?;
75        client.ping().await?;
76        Ok(())
77    }
78
79    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
80        Ok(ElasticSearchOpenSearchSinkWriter::new(
81            self.config.clone(),
82            self.schema.clone(),
83            self.pk_indices.clone(),
84            Self::SINK_NAME,
85            self.is_append_only,
86        )?
87        .into_log_sinker(self.config.concurrent_requests))
88    }
89
90    fn set_default_commit_checkpoint_interval(
91        desc: &mut crate::sink::catalog::desc::SinkDesc,
92        user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
93    ) -> Result<()> {
94        if crate::sink::is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
95            match desc
96                .properties
97                .get(crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL)
98            {
99                Some(commit_checkpoint_interval) => {
100                    let commit_checkpoint_interval = commit_checkpoint_interval
101                        .parse::<u64>()
102                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
103                    if std::matches!(user_specified, SinkDecouple::Disable)
104                        && commit_checkpoint_interval > 1
105                    {
106                        return Err(SinkError::Config(anyhow!(
107                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
108                        )));
109                    }
110                }
111                None => match user_specified {
112                    risingwave_common::session_config::sink_decouple::SinkDecouple::Default
113                    | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => {
114                        desc.properties.insert(
115                            crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
116                            crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
117                        );
118                    }
119                    risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => {
120                        desc.properties.insert(
121                            crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
122                            crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
123                        );
124                    }
125                },
126            }
127        }
128        Ok(())
129    }
130
131    fn is_sink_decouple(
132        user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
133    ) -> Result<bool> {
134        match user_specified {
135            risingwave_common::session_config::sink_decouple::SinkDecouple::Default
136            | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => Ok(true),
137            risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => Ok(false),
138        }
139    }
140}