use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, LazyLock};

use derive_builder::Builder;
use iceberg::{Catalog, TableIdent};
use iceberg_compaction_core::compaction::{
    Compaction, CompactionType, RewriteDataFilesCommitManagerRetryConfig,
};
use iceberg_compaction_core::config::CompactionConfigBuilder;
use iceberg_compaction_core::executor::RewriteFilesStat;
use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::HummockResult;
use crate::hummock::HummockError;
use crate::monitor::CompactorMetrics;

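/// Metrics registry handed to the compaction engine, wrapping the process-wide
/// Prometheus registry so engine metrics share RisingWave's exporter.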
static BERGLOOM_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});

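/// Runs a single Iceberg compaction task. The catalog and table identifier are
/// resolved from the task's connector properties at construction time.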
pub struct IcebergCompactorRunner {
    pub task_id: u64,
    pub catalog: Arc<dyn Catalog>,
    pub table_ident: TableIdent,
    pub iceberg_config: IcebergConfig,

    config: IcebergCompactorRunnerConfig,
    metrics: Arc<CompactorMetrics>,
}

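/// Default Parquet writer properties for compaction output: SNAPPY compression,
/// with `created_by` stamped with the current RisingWave version.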
pub fn default_writer_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_created_by(concat!("risingwave version ", env!("CARGO_PKG_VERSION")).to_owned())
        .build()
}

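/// Knobs for sizing and executing an Iceberg compaction task. Every field has a
/// builder default, so callers only override what they need. A minimal sketch
/// using the `derive_builder`-generated builder (the values shown are
/// illustrative, not recommendations):
///
/// ```ignore
/// let config = IcebergCompactorRunnerConfigBuilder::default()
///     .max_parallelism(8)
///     .target_file_size_bytes(512 * 1024 * 1024) // 512 MiB output files
///     .build()
///     .unwrap();
/// ```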
#[derive(Builder, Debug, Clone)]
pub struct IcebergCompactorRunnerConfig {
    #[builder(default = "4")]
    pub max_parallelism: u32,
    #[builder(default = "1024 * 1024 * 1024")]
    pub min_size_per_partition: u64,
    #[builder(default = "32")]
    pub max_file_count_per_partition: u32,
    #[builder(default = "1024 * 1024 * 1024")]
    pub target_file_size_bytes: u64,
    #[builder(default = "false")]
    pub enable_validate_compaction: bool,
    #[builder(default = "1024")]
    pub max_record_batch_rows: usize,
    #[builder(default = "default_writer_properties()")]
    pub write_parquet_properties: WriterProperties,
}

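/// Shared bookkeeping for the compactor's parallelism budget: tracks how many
/// units are in use by currently running tasks.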
#[derive(Debug, Clone)]
pub(crate) struct RunnerContext {
    pub max_available_parallelism: u32,
    pub running_task_parallelism: Arc<AtomicU32>,
}

impl RunnerContext {
    pub fn new(max_available_parallelism: u32, running_task_parallelism: Arc<AtomicU32>) -> Self {
        Self {
            max_available_parallelism,
            running_task_parallelism,
        }
    }

    pub fn is_available_parallelism_sufficient(&self, input_parallelism: u32) -> bool {
        (self.max_available_parallelism - self.running_task_parallelism.load(Ordering::SeqCst))
            >= input_parallelism
    }

    pub fn incr_running_task_parallelism(&self, increment: u32) {
        self.running_task_parallelism
            .fetch_add(increment, Ordering::SeqCst);
    }

    pub fn decr_running_task_parallelism(&self, decrement: u32) {
        self.running_task_parallelism
            .fetch_sub(decrement, Ordering::SeqCst);
    }
}

impl IcebergCompactorRunner {
    pub async fn new(
        iceberg_compaction_task: IcebergCompactionTask,
        config: IcebergCompactorRunnerConfig,
        metrics: Arc<CompactorMetrics>,
    ) -> HummockResult<Self> {
        let IcebergCompactionTask { task_id, props } = iceberg_compaction_task;
        let iceberg_config = IcebergConfig::from_btreemap(BTreeMap::from_iter(props.into_iter()))
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let catalog = iceberg_config
            .create_catalog()
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let table_ident = iceberg_config
            .full_table_name()
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

        Ok(Self {
            task_id,
            catalog,
            table_ident,
            iceberg_config,
            config,
            metrics,
        })
    }

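    /// Derives `(input_parallelism, output_parallelism)` from task statistics.
    ///
    /// Input parallelism is the larger of a by-size estimate
    /// (`total bytes / min_size_per_partition`) and a by-count estimate
    /// (`total files / max_file_count_per_partition`), capped at
    /// `max_parallelism`. Output parallelism is additionally bounded by the
    /// by-size estimate and forced to 1 when all data fits within a single
    /// target-size file.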
    pub fn calculate_task_parallelism_impl(
        task_id_for_logging: u64,
        table_ident_for_logging: &TableIdent,
        task_statistics: &IcebergCompactionTaskStatistics,
        max_parallelism: u32,
        min_size_per_partition: u64,
        max_file_count_per_partition: u32,
        target_file_size_bytes: u64,
    ) -> HummockResult<(u32, u32)> {
        if max_parallelism == 0 {
            return Err(HummockError::compaction_executor(
                "Max parallelism cannot be 0".to_owned(),
            ));
        }

        let total_file_size_for_partitioning = task_statistics.total_data_file_size
            + task_statistics.total_pos_del_file_size
            + task_statistics.total_eq_del_file_size;
        if total_file_size_for_partitioning == 0 {
            tracing::warn!(
                task_id = task_id_for_logging,
                table = ?table_ident_for_logging,
                "Total input file size is 0; skipping parallelism calculation."
            );

            return Err(HummockError::compaction_executor(
                "No files to calculate_task_parallelism".to_owned(),
            ));
        }

        let partition_by_size = total_file_size_for_partitioning
            .div_ceil(min_size_per_partition)
            .max(1) as u32;

        let total_files_count_for_partitioning = task_statistics.total_data_file_count
            + task_statistics.total_pos_del_file_count
            + task_statistics.total_eq_del_file_count;

        let partition_by_count = total_files_count_for_partitioning
            .div_ceil(max_file_count_per_partition)
            .max(1);

        let input_parallelism = partition_by_size
            .max(partition_by_count)
            .min(max_parallelism);

        let mut output_parallelism = partition_by_size
            .min(input_parallelism)
            .min(max_parallelism);
        // If all the data would fit into a single target-size file, use one
        // writer to avoid emitting many small files.
        if task_statistics.total_data_file_size > 0
            && task_statistics.total_data_file_size < target_file_size_bytes
            && output_parallelism > 1
        {
            tracing::debug!(
                task_id = task_id_for_logging,
                table = ?table_ident_for_logging,
                total_data_file_size = task_statistics.total_data_file_size,
                target_file_size_bytes = target_file_size_bytes,
                original_output_parallelism = output_parallelism,
                "Total data size is less than target file size, forcing output_parallelism to 1."
            );
            output_parallelism = 1;
        }

        tracing::debug!(
            task_id = task_id_for_logging,
            table = ?table_ident_for_logging,
            stats = ?task_statistics,
            config_max_parallelism = max_parallelism,
            config_min_size_per_partition = min_size_per_partition,
            config_max_file_count_per_partition = max_file_count_per_partition,
            config_target_file_size_bytes = target_file_size_bytes,
            calculated_partition_by_size = partition_by_size,
            calculated_partition_by_count = partition_by_count,
            final_input_parallelism = input_parallelism,
            final_output_parallelism = output_parallelism,
            "Calculated task parallelism"
        );

        Ok((input_parallelism, output_parallelism))
    }

    fn calculate_task_parallelism(
        &self,
        task_statistics: &IcebergCompactionTaskStatistics,
        max_parallelism: u32,
        min_size_per_partition: u64,
        max_file_count_per_partition: u32,
        target_file_size_bytes: u64,
    ) -> HummockResult<(u32, u32)> {
        if max_parallelism == 0 {
            return Err(HummockError::compaction_executor(
                "Max parallelism cannot be 0".to_owned(),
            ));
        }

        Self::calculate_task_parallelism_impl(
            self.task_id,
            &self.table_ident,
            task_statistics,
            max_parallelism,
            min_size_per_partition,
            max_file_count_per_partition,
            target_file_size_bytes,
        )
    }

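    /// Runs the compaction task end to end: analyzes table statistics, sizes the
    /// job, reserves parallelism against the shared budget, and executes a full
    /// rewrite through `iceberg-compaction-core`. Resolving `shutdown_rx`
    /// cancels the task.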
    pub async fn compact(
        self,
        context: RunnerContext,
        shutdown_rx: Receiver<()>,
    ) -> HummockResult<()> {
        let task_id = self.task_id;
        let now = std::time::Instant::now();

        let compact = async move {
            let retry_config = RewriteDataFilesCommitManagerRetryConfig::default();
            let statistics = self
                .analyze_task_statistics()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            let (input_parallelism, output_parallelism) = self
                .calculate_task_parallelism(
                    &statistics,
                    self.config.max_parallelism,
                    self.config.min_size_per_partition,
                    self.config.max_file_count_per_partition,
                    self.config.target_file_size_bytes,
                )
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            if !context.is_available_parallelism_sufficient(input_parallelism) {
                tracing::warn!(
                    "Available parallelism is less than input parallelism {}; task {} will not run",
                    input_parallelism,
                    task_id
                );
                return Err(HummockError::compaction_executor(
                    "Available parallelism is less than input parallelism",
                ));
            }

            let compaction_config = Arc::new(
                CompactionConfigBuilder::default()
                    .output_parallelism(output_parallelism as usize)
                    .executor_parallelism(input_parallelism as usize)
                    .target_file_size(self.config.target_file_size_bytes)
                    .enable_validate_compaction(self.config.enable_validate_compaction)
                    .max_record_batch_rows(self.config.max_record_batch_rows)
                    .write_parquet_properties(self.config.write_parquet_properties.clone())
                    .build()
                    .unwrap_or_else(|e| {
                        panic!(
                            "Failed to build iceberg compaction config: {:?}",
                            e.as_report()
                        );
                    }),
            );

            tracing::info!(
                task_id = task_id,
                table = ?self.table_ident,
                input_parallelism,
                output_parallelism,
                statistics = ?statistics,
                compaction_config = ?compaction_config,
                "Iceberg compaction task started",
            );

            let compaction = Compaction::builder()
                .with_catalog(self.catalog.clone())
                .with_catalog_name(self.iceberg_config.catalog_name())
                .with_compaction_type(CompactionType::Full)
                .with_config(compaction_config)
                .with_table_ident(self.table_ident.clone())
                .with_executor_type(iceberg_compaction_core::executor::ExecutorType::DataFusion)
                .with_registry(BERGLOOM_METRICS_REGISTRY.clone())
                .with_retry_config(retry_config)
                .build()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

            context.incr_running_task_parallelism(input_parallelism);
            self.metrics.compact_task_pending_num.inc();
            self.metrics
                .compact_task_pending_parallelism
                .add(input_parallelism as _);

            // Release the reserved parallelism and pending-task metrics however
            // this future exits (success, error, or cancellation).
            let _release_guard = scopeguard::guard(
                (input_parallelism, context.clone(), self.metrics.clone()),
                |(val, ctx, metrics_guard)| {
                    ctx.decr_running_task_parallelism(val);
                    metrics_guard.compact_task_pending_num.dec();
                    metrics_guard.compact_task_pending_parallelism.sub(val as _);
                },
            );

            let stat = compaction
                .compact()
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
            Ok::<RewriteFilesStat, HummockError>(stat)
        };

        tokio::select! {
            _ = shutdown_rx => {
                tracing::info!(task_id = task_id, "Iceberg compaction task cancelled");
            }
            stat = compact => {
                match stat {
                    Ok(stat) => {
                        tracing::info!(
                            task_id = task_id,
                            elapsed_millis = now.elapsed().as_millis(),
                            stat = ?stat,
                            "Iceberg compaction task finished",
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            task_id = task_id,
                            "Iceberg compaction task failed with error",
                        );
                    }
                }
            }
        }

        Ok(())
    }

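    /// Loads the table's current snapshot and tallies file sizes and counts per
    /// Iceberg content type (data, position deletes, equality deletes).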
    async fn analyze_task_statistics(&self) -> HummockResult<IcebergCompactionTaskStatistics> {
        let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
        let manifest_list = table
            .metadata()
            .current_snapshot()
            .ok_or_else(|| HummockError::compaction_executor("Table has no current snapshot"))?
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .map_err(|e| HummockError::compaction_executor(e.as_report()))?;

        let mut total_data_file_size: u64 = 0;
        let mut total_data_file_count = 0;
        let mut total_pos_del_file_size: u64 = 0;
        let mut total_pos_del_file_count = 0;
        let mut total_eq_del_file_size: u64 = 0;
        let mut total_eq_del_file_count = 0;

        for manifest_file in manifest_list.entries() {
            let manifest = manifest_file
                .load_manifest(table.file_io())
                .await
                .map_err(|e| HummockError::compaction_executor(e.as_report()))?;
            let (entries, _) = manifest.into_parts();
            for entry in entries {
                match entry.content_type() {
                    iceberg::spec::DataContentType::Data => {
                        total_data_file_size += entry.data_file().file_size_in_bytes();
                        total_data_file_count += 1;
                    }
                    iceberg::spec::DataContentType::EqualityDeletes => {
                        total_eq_del_file_size += entry.data_file().file_size_in_bytes();
                        total_eq_del_file_count += 1;
                    }
                    iceberg::spec::DataContentType::PositionDeletes => {
                        total_pos_del_file_size += entry.data_file().file_size_in_bytes();
                        total_pos_del_file_count += 1;
                    }
                }
            }
        }

        Ok(IcebergCompactionTaskStatistics {
            total_data_file_size,
            total_data_file_count: total_data_file_count as u32,
            total_pos_del_file_size,
            total_pos_del_file_count: total_pos_del_file_count as u32,
            total_eq_del_file_size,
            total_eq_del_file_count: total_eq_del_file_count as u32,
        })
    }
}

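/// Aggregated file sizes and counts for a compaction task, grouped by Iceberg
/// content type.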
pub struct IcebergCompactionTaskStatistics {
    pub total_data_file_size: u64,
    pub total_data_file_count: u32,
    pub total_pos_del_file_size: u64,
    pub total_pos_del_file_count: u32,
    pub total_eq_del_file_size: u64,
    pub total_eq_del_file_count: u32,
}

impl Debug for IcebergCompactionTaskStatistics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IcebergCompactionTaskStatistics")
            .field("total_data_file_size", &self.total_data_file_size)
            .field("total_data_file_count", &self.total_data_file_count)
            .field("total_pos_del_file_size", &self.total_pos_del_file_size)
            .field("total_pos_del_file_count", &self.total_pos_del_file_count)
            .field("total_eq_del_file_size", &self.total_eq_del_file_size)
            .field("total_eq_del_file_count", &self.total_eq_del_file_count)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use iceberg::TableIdent;

    use super::*;

    fn default_test_config_params() -> IcebergCompactorRunnerConfig {
        IcebergCompactorRunnerConfigBuilder::default()
            .max_parallelism(4)
            .min_size_per_partition(1024 * 1024 * 100) // 100 MiB per partition
            .max_file_count_per_partition(10)
            .target_file_size_bytes(1024 * 1024 * 128) // 128 MiB target file size
            .build()
            .unwrap()
    }

    #[test]
    fn test_calculate_parallelism_no_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 0,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );

        assert!(result.is_err());
        if let Err(e) = result {
            assert!(
                e.to_string()
                    .contains("No files to calculate_task_parallelism")
            );
        }
    }

    #[test]
    fn test_calculate_parallelism_size_dominant() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 500, // 500 MiB
            total_data_file_count: 5,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
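        // partition_by_size = ceil(500 MiB / 100 MiB) = 5 dominates
        // partition_by_count = ceil(5 / 10).max(1) = 1; input and output are
        // then capped at max_parallelism = 4.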
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 4);
    }

    #[test]
    fn test_calculate_parallelism_count_dominant() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 50, // 50 MiB
            total_data_file_count: 35,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
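        // partition_by_size = ceil(50 MiB / 100 MiB).max(1) = 1, while
        // partition_by_count = ceil(35 / 10) = 4 drives input parallelism to 4;
        // output stays bounded by partition_by_size at 1.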
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_max_parallelism_cap() {
        let mut config = default_test_config_params();
        config.max_parallelism = 2;
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 500, // 500 MiB
            total_data_file_count: 35,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
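        // partition_by_size = 5 and partition_by_count = 4 both exceed the
        // configured max_parallelism of 2, so input and output are capped at 2.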
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 2);
        assert_eq!(output_p, 2);
    }

    #[test]
    fn test_calculate_parallelism_small_data_heuristic() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats_for_heuristic = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 60, // 60 MiB, under the 128 MiB target
            total_data_file_count: 5,
            total_pos_del_file_size: 1024 * 1024 * 250, // 250 MiB
            total_pos_del_file_count: 2,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
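        // Total input is 310 MiB => partition_by_size = 4 = input parallelism,
        // but the data files alone (60 MiB) fit under the 128 MiB target, so
        // the small-data heuristic forces output parallelism down to 1.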
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats_for_heuristic,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 4);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_all_empty_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        // Five zero-byte data files: the total size is 0, so sizing bails out
        // with the "no files" error even though the file count is non-zero.
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 5,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(
                e.to_string()
                    .contains("No files to calculate_task_parallelism")
            );
        }
    }

    #[test]
    fn test_calculate_parallelism_max_parallelism_zero() {
        let mut config = default_test_config_params();
        config.max_parallelism = 0;
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 50, // 50 MiB
            total_data_file_count: 5,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
        let result = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        );
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.to_string().contains("Max parallelism cannot be 0"));
        }
    }

    #[test]
    fn test_calculate_parallelism_only_delete_files() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 0,
            total_data_file_count: 0,
            total_pos_del_file_size: 1024 * 1024 * 20, // 20 MiB
            total_pos_del_file_count: 2,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
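        // 20 MiB of position deletes alone: partition_by_size and
        // partition_by_count both bottom out at 1, so the task runs with
        // parallelism (1, 1).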
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 1);
        assert_eq!(output_p, 1);
    }

    #[test]
    fn test_calculate_parallelism_zero_file_count_non_zero_size() {
        let config = default_test_config_params();
        let dummy_table_ident = TableIdent::from_strs(["db", "schema", "table"]).unwrap();
        let stats = IcebergCompactionTaskStatistics {
            total_data_file_size: 1024 * 1024 * 200, // 200 MiB
            total_data_file_count: 0,
            total_pos_del_file_size: 0,
            total_pos_del_file_count: 0,
            total_eq_del_file_size: 0,
            total_eq_del_file_count: 0,
        };
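        // 200 MiB with a zero recorded file count: partition_by_size = 2 while
        // partition_by_count bottoms out at 1, giving parallelism (2, 2).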
        let (input_p, output_p) = IcebergCompactorRunner::calculate_task_parallelism_impl(
            1,
            &dummy_table_ident,
            &stats,
            config.max_parallelism,
            config.min_size_per_partition,
            config.max_file_count_per_partition,
            config.target_file_size_bytes,
        )
        .unwrap();
        assert_eq!(input_p, 2);
        assert_eq!(output_p, 2);
    }
}