use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};

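// Tunables for the BigQuery Storage Write API path:
// - `DEFAULT_GRPC_CHANNEL_NUMS`, `CONNECT_TIMEOUT` and `CONNECTION_TIMEOUT` configure the gRPC
//   connection pool used by `StorageWriterClient`.
// - `BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE` bounds how many unacknowledged offsets the log sinker
//   keeps in flight before it stops reading new items.
// - `MAX_ROW_SIZE` is the batching threshold in `write_chunk`; serialized rows are split into
//   requests that each stay under 8 MiB, which is assumed to leave headroom below the Storage
//   Write API's per-request payload limit.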
pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)]
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}

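/// Tracks offsets that have been handed to the writer but not yet acknowledged by BigQuery.
/// Each queued entry pairs a [`TruncateOffset`] with the number of `AppendRows` responses that
/// must still arrive on `resp_stream` before the offset can be truncated from the log reader.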
struct BigQueryFutureManager {
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

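    /// Waits until all responses for the front offset have arrived, then pops and returns it so
    /// the caller can truncate the log. If the queue is empty, this pends forever, which keeps
    /// the surrounding `tokio::select!` from busy-looping.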
    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

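// The sink loop interleaves two tasks: draining acknowledged offsets (so the log store can be
// truncated) and reading new items from the log. Reading is gated on the number of pending
// offsets, so at most `future_num` unacknowledged batches are in flight at any time.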
#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account = serde_json::from_str::<ServiceAccountKey>(&auth_json)
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file = CredentialsFile::new_from_str(&auth_json)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        StorageWriterClient::new(credentials_file).await
    }

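    /// Resolves the service-account JSON, trying `bigquery.credentials` first, then
    /// `bigquery.local.path`, then `bigquery.s3.path` (fetched with the AWS credentials in
    /// `aws_auth_props`).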
    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.credentials`, `bigquery.local.path` or `bigquery.s3.path` must be set."
            )))
        }
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String,
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find table in bigquery"
            )));
        }
        if rw_fields_name.len().ne(&big_query_columns_desc.len()) {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of RisingWave columns ({}) must match the number of BigQuery columns ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`. ",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

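    /// Maps a RisingWave [`DataType`] to the BigQuery type string reported by
    /// `INFORMATION_SCHEMA.COLUMNS`, so `check_column_name_and_type` can compare schemas.
    /// Types without a BigQuery equivalent (e.g. `Float32`, `Int256`, `Map`) are rejected here.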
    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "Bigquery cannot support real"
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "Bigquery cannot support Int256"
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "Bigquery cannot support Map"
            ))),
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        }
    }

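    /// Builds a `TableFieldSchema` for a RisingWave field, used when `auto_create` is enabled.
    /// Structs become RECORD fields and lists become REPEATED fields of the element type.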
    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Bigquery cannot support real"
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Bigquery cannot support Interval"
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(dt) => {
                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }
            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Bigquery cannot support Int256"
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Bigquery cannot support Map"
                )));
            }
            DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        };
        Ok(tfs)
    }

    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

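    /// Validation flow: check the license feature, require a primary key for upsert sinks,
    /// optionally auto-create the table if it does not exist, and finally compare the RisingWave
    /// schema against the column types reported by `INFORMATION_SCHEMA.COLUMNS`.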
    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            )
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

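    /// In append-only mode, only `Insert` rows are encoded; all other ops are skipped.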
    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

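    /// In upsert mode, each row carries a `_CHANGE_TYPE` pseudo column understood by BigQuery
    /// change-data-capture: `Insert` and `UpdateInsert` become `UPSERT`, `Delete` becomes
    /// `DELETE`, and `UpdateDelete` is skipped because the matching `UpdateInsert` already
    /// carries the new row image.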
    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

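    /// Encodes the chunk and splits the serialized rows into batches whose total size stays under
    /// `MAX_ROW_SIZE`, sending one `AppendRows` request per batch. Returns the number of requests
    /// sent, i.e. how many responses the future manager must wait for before truncating.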
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

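/// Adapts the gRPC `AppendRows` response streaming into a `Stream<Item = Result<()>>`, yielding
/// one unit item per successful response and surfacing row-level or stream-level errors as
/// `SinkError::BigQuery`.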
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

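/// Thin wrapper around the BigQuery Storage Write API default stream. Requests are pushed onto an
/// unbounded mpsc channel that feeds the long-lived `append_rows` gRPC call, while the response
/// stream is handed back to the caller (via `resp_to_stream`) for acknowledgement tracking.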
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

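/// Maps a RisingWave type to a protobuf field for Storage Write API ingestion: structs become
/// nested `Struct{name}` messages, lists become repeated fields, dates are sent as int32, and
/// temporal/decimal/JSON types are sent as strings. This layout is assumed to line up with the
/// values `ProtoEncoder` produces for the generated message descriptor.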
fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Don't support Float32 and Int256"
            )));
        }
        DataType::Map(_) => todo!(),
        DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ]))));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ])))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}