// risingwave_connector/sink/elasticsearch_opensearch/elasticsearch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::catalog::Schema;
16use tonic::async_trait;
17
18use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
19use super::super::{Sink, SinkError, SinkParam, SinkWriterParam};
20use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
21use super::elasticsearch_opensearch_config::{ElasticSearchConfig, ElasticSearchOpenSearchConfig};
22use crate::enforce_secret::EnforceSecret;
23use crate::sink::Result;
24
/// Connector name of the Elasticsearch sink.
///
/// Used as `Sink::SINK_NAME` for `ElasticSearchSink`, which in turn passes it
/// to the client and writer constructors.
pub const ES_SINK: &str = "elasticsearch";
26
27#[derive(Debug)]
28pub struct ElasticSearchSink {
29    config: ElasticSearchOpenSearchConfig,
30    schema: Schema,
31    pk_indices: Vec<usize>,
32    is_append_only: bool,
33}
34
35impl EnforceSecret for ElasticSearchSink {
36    fn enforce_secret<'a>(
37        prop_iter: impl Iterator<Item = &'a str>,
38    ) -> crate::error::ConnectorResult<()> {
39        for prop in prop_iter {
40            ElasticSearchOpenSearchConfig::enforce_one(prop)?;
41        }
42        Ok(())
43    }
44}
45
46#[async_trait]
47impl TryFrom<SinkParam> for ElasticSearchSink {
48    type Error = SinkError;
49
50    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
51        let schema = param.schema();
52        let pk_indices = param.downstream_pk_or_empty();
53        let config = ElasticSearchConfig::from_btreemap(param.properties)?.inner;
54        Ok(Self {
55            config,
56            schema,
57            pk_indices,
58            is_append_only: param.sink_type.is_append_only(),
59        })
60    }
61}
62
63impl Sink for ElasticSearchSink {
64    type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
65
66    const SINK_NAME: &'static str = ES_SINK;
67
68    fn support_schema_change() -> bool {
69        true
70    }
71
72    async fn validate(&self) -> Result<()> {
73        self.config.validate_config(&self.schema)?;
74        let client = self.config.build_client(Self::SINK_NAME)?;
75        client.ping().await?;
76        Ok(())
77    }
78
79    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
80        Ok(ElasticSearchOpenSearchSinkWriter::new(
81            self.config.clone(),
82            self.schema.clone(),
83            self.pk_indices.clone(),
84            Self::SINK_NAME,
85            self.is_append_only,
86        )?
87        .into_log_sinker(self.config.concurrent_requests))
88    }
89}