risingwave_connector/sink/elasticsearch_opensearch/elasticsearch_converter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use risingwave_common::array::{
19    ArrayBuilder, ArrayImpl, JsonbArrayBuilder, StreamChunk, Utf8ArrayBuilder,
20};
21use risingwave_common::catalog::Schema;
22use risingwave_common::types::{JsonbVal, Scalar};
23use serde_json::Value;
24
25use super::elasticsearch_opensearch_config::{
26    ES_OPTION_DELIMITER, ES_OPTION_INDEX, ES_OPTION_INDEX_COLUMN, ES_OPTION_ROUTING_COLUMN,
27};
28use super::elasticsearch_opensearch_formatter::{BuildBulkPara, ElasticSearchOpenSearchFormatter};
29use crate::sink::Result;
30
31#[expect(clippy::large_enum_variant)]
32pub enum StreamChunkConverter {
33    Es(EsStreamChunkConverter),
34    Other,
35}
36impl StreamChunkConverter {
37    pub fn new(
38        sink_name: &str,
39        schema: Schema,
40        pk_indices: &Vec<usize>,
41        properties: &BTreeMap<String, String>,
42        is_append_only: bool,
43    ) -> Result<Self> {
44        if is_remote_es_sink(sink_name) {
45            let index_column = properties
46                .get(ES_OPTION_INDEX_COLUMN)
47                .cloned()
48                .map(|n| {
49                    schema
50                        .fields()
51                        .iter()
52                        .position(|s| s.name == n)
53                        .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
54                })
55                .transpose()?;
56            let index = properties.get(ES_OPTION_INDEX).cloned();
57            let routing_column = properties
58                .get(ES_OPTION_ROUTING_COLUMN)
59                .cloned()
60                .map(|n| {
61                    schema
62                        .fields()
63                        .iter()
64                        .position(|s| s.name == n)
65                        .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
66                })
67                .transpose()?;
68            Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new(
69                schema,
70                pk_indices.clone(),
71                properties.get(ES_OPTION_DELIMITER).cloned(),
72                index_column,
73                index,
74                routing_column,
75                is_append_only,
76            )?))
77        } else {
78            Ok(StreamChunkConverter::Other)
79        }
80    }
81
82    pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> {
83        match self {
84            StreamChunkConverter::Es(es) => es.convert_chunk(chunk, es.is_append_only),
85            StreamChunkConverter::Other => Ok(chunk),
86        }
87    }
88}
89pub struct EsStreamChunkConverter {
90    formatter: ElasticSearchOpenSearchFormatter,
91    is_append_only: bool,
92}
93impl EsStreamChunkConverter {
94    pub fn new(
95        schema: Schema,
96        pk_indices: Vec<usize>,
97        delimiter: Option<String>,
98        index_column: Option<usize>,
99        index: Option<String>,
100        routing_column: Option<usize>,
101        is_append_only: bool,
102    ) -> Result<Self> {
103        let formatter = ElasticSearchOpenSearchFormatter::new(
104            pk_indices,
105            &schema,
106            delimiter,
107            index_column,
108            index,
109            routing_column,
110        )?;
111        Ok(Self {
112            formatter,
113            is_append_only,
114        })
115    }
116
117    fn convert_chunk(&self, chunk: StreamChunk, is_append_only: bool) -> Result<StreamChunk> {
118        let mut ops = Vec::with_capacity(chunk.capacity());
119        let mut id_string_builder =
120            <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
121        let mut json_builder =
122            <JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
123        let mut index_builder =
124            <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
125        let mut routing_builder =
126            <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
127        for build_bulk_para in self.formatter.convert_chunk(chunk, is_append_only)? {
128            let BuildBulkPara {
129                key,
130                value,
131                index,
132                routing_column,
133                ..
134            } = build_bulk_para;
135
136            id_string_builder.append(Some(&key));
137            index_builder.append(Some(&index));
138            routing_builder.append(routing_column.as_deref());
139            if value.is_some() {
140                ops.push(risingwave_common::array::Op::Insert);
141            } else {
142                ops.push(risingwave_common::array::Op::Delete);
143            }
144            let value = value.map(|json| JsonbVal::from(Value::Object(json)));
145            json_builder.append(value.as_ref().map(|json| json.as_scalar_ref()));
146        }
147        let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder);
148        let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder);
149        let index_string_array = risingwave_common::array::ArrayBuilder::finish(index_builder);
150        let routing_string_array = risingwave_common::array::ArrayBuilder::finish(routing_builder);
151        Ok(StreamChunk::new(
152            ops,
153            vec![
154                std::sync::Arc::new(ArrayImpl::Utf8(index_string_array)),
155                std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)),
156                std::sync::Arc::new(ArrayImpl::Jsonb(json_array)),
157                std::sync::Arc::new(ArrayImpl::Utf8(routing_string_array)),
158            ],
159        ))
160    }
161}
162
/// Returns whether `sink_name` identifies a remote Elasticsearch/OpenSearch sink.
///
/// The name check against `ElasticSearchJavaSink::SINK_NAME` and
/// `OpenSearchJavaSink::SINK_NAME` is currently disabled, so this always returns `false`
/// and `StreamChunkConverter::new` therefore always yields `StreamChunkConverter::Other`.
pub fn is_remote_es_sink(_sink_name: &str) -> bool {
    false
}
166}