use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)]
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}

struct BigQueryFutureManager {
    offset_queue: VecDeque<(TruncateOffset, usize)>,
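    // Each queued entry pairs a `TruncateOffset` with the number of append-rows
    // responses still expected on `resp_stream`; an offset can only be truncated
    // once all of its responses have been received.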
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

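    /// Waits until every expected response for the oldest queued offset has
    /// arrived, then pops and returns that offset for truncation. Pends
    /// forever when the queue is empty.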
    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
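        // Two concerns are driven concurrently: truncate the log as soon as all
        // responses for the oldest in-flight offset have returned, and only poll
        // for new log items while the in-flight queue stays within `future_num`.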
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account = serde_json::from_str::<ServiceAccountKey>(&auth_json)
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file = CredentialsFile::new_from_str(&auth_json)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        StorageWriterClient::new(credentials_file).await
    }

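    /// Resolves the service-account JSON, preferring the inline
    /// `bigquery.credentials` value, then a file at `bigquery.local.path`,
    /// and finally an object at `bigquery.s3.path`.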
    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.credentials`, `bigquery.local.path`, or `bigquery.s3.path` must be set."
            )))
        }
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String,
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find the table in BigQuery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must equal the number of columns in BigQuery ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

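    /// Maps a RisingWave [`DataType`] to the BigQuery type name reported by
    /// `INFORMATION_SCHEMA.COLUMNS`, or returns an error for unsupported types.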
    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "INT256 is not supported for BigQuery sink."
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "MAP is not supported for BigQuery sink."
            ))),
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        }
    }

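    /// Converts a RisingWave [`Field`] into the [`TableFieldSchema`] used when
    /// auto-creating the BigQuery table; list types are expressed by marking
    /// the element field as `REPEATED`.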
    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INTERVAL is not supported for BigQuery sink. Please convert to VARCHAR or other supported types."
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(dt) => {
                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }

            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INT256 is not supported for BigQuery sink."
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "MAP is not supported for BigQuery sink."
                )));
            }
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        Ok(tfs)
    }

    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

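    /// Validates the sink: checks the license feature, requires a primary key
    /// for upsert sinks, and compares the RisingWave schema against the
    /// BigQuery table's `INFORMATION_SCHEMA.COLUMNS`, creating the table first
    /// when `auto_create` is enabled and it does not exist yet.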
    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            )
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

impl BigQuerySinkWriter {
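    /// Builds the Storage Write API client and a protobuf descriptor derived
    /// from the sink schema; for upsert sinks an extra `_CHANGE_TYPE` string
    /// field is appended so each row can carry an `UPSERT` or `DELETE` marker.
    /// Returns the writer together with the stream of append responses.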
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

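    /// Encodes each row and tags it via `_CHANGE_TYPE`: `UPSERT` for inserts
    /// and update-inserts, `DELETE` for deletes. `UpdateDelete` rows are
    /// skipped because the paired `UpdateInsert` carries the new values.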
    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

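    /// Serializes the chunk and splits the rows into batches so that each
    /// append request stays under `MAX_ROW_SIZE` bytes, then sends one request
    /// per batch on the default write stream. Returns the number of requests
    /// sent, i.e. the number of responses to await for this chunk.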
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

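/// Converts the gRPC `AppendRows` response stream into a `Stream` of unit
/// results: every successful response yields `()`, while row errors, an error
/// response, or an unexpectedly closed stream surface as `SinkError::BigQuery`.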
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
            Output = std::result::Result<
                Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
                Status,
            >,
        >
        + 'static
        + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

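/// Thin wrapper over the BigQuery Storage Write API connection: it keeps the
/// gRPC auth environment alive and feeds `AppendRowsRequest`s into the
/// long-lived request channel backing the `append_rows` call.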
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

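/// Wraps the generated `DescriptorProto` in a single-file descriptor set so
/// that `prost_reflect` can resolve the table message dynamically at runtime.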
fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

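/// Builds a protobuf `DescriptorProto` named after the table, with one field
/// per column (numbered from 1); struct columns become nested message types.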
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

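/// Maps a single column to a `FieldDescriptorProto`. Struct columns also
/// return a nested `DescriptorProto` named `Struct<column>`, and list columns
/// reuse the element mapping with the `REPEATED` label applied.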
fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Float32 and Int256 are not supported for the BigQuery sink."
            )));
        }
        DataType::Map(_) => todo!(),
        DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ]))));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ])))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}