risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::Schema;
16use tonic::async_trait;
17
18use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
19use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
20use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
21use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
22use crate::sink::Result;
23
/// Name of the Elasticsearch sink connector.
///
/// Exposed as `ElasticSearchSink::SINK_NAME` and passed along whenever the
/// shared Elasticsearch/OpenSearch client or writer is constructed.
pub const ES_SINK: &str = "elasticsearch";
25
26#[derive(Debug)]
27pub struct ElasticSearchSink {
28    config: ElasticSearchOpenSearchConfig,
29    schema: Schema,
30    pk_indices: Vec<usize>,
31    is_append_only: bool,
32}
33
34#[async_trait]
35impl TryFrom<SinkParam> for ElasticSearchSink {
36    type Error = SinkError;
37
38    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
39        let schema = param.schema();
40        let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
41        Ok(Self {
42            config,
43            schema,
44            pk_indices: param.downstream_pk,
45            is_append_only: param.sink_type.is_append_only(),
46        })
47    }
48}
49
50impl Sink for ElasticSearchSink {
51    type Coordinator = DummySinkCommitCoordinator;
52    type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
53
54    const SINK_NAME: &'static str = ES_SINK;
55
56    async fn validate(&self) -> Result<()> {
57        self.config.validate_config(&self.schema)?;
58        let client = self.config.build_client(Self::SINK_NAME)?;
59        client.ping().await?;
60        Ok(())
61    }
62
63    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
64        Ok(ElasticSearchOpenSearchSinkWriter::new(
65            self.config.clone(),
66            self.schema.clone(),
67            self.pk_indices.clone(),
68            Self::SINK_NAME,
69            self.is_append_only,
70        )?
71        .into_log_sinker(self.config.concurrent_requests))
72    }
73}