Introduction

This guide is intended for contributors to learn how to develop RisingWave. The instructions for submitting code changes are included in the contribution guidelines.

If you have questions, you can search for existing discussions or start a new discussion in the Discussions forum of RisingWave, or ask in the RisingWave Community channel on Slack. Please use the invitation link to join the channel.

To report bugs, create a GitHub issue.

Note: the URL previously pointed to the crate rustdocs, which have now moved to the /risingwave/rustdoc path.

Read the design docs

Before you start to make code changes, ensure that you understand the design and implementation of RisingWave. We recommend that you read the design docs listed in docs/README.md first.

You can also read the crate level documentation for implementation details, or run ./risedev doc to read it locally.

Learn about the code structure

  • The src folder contains all of the kernel components. Refer to src/README.md for more details, including the design patterns used in RisingWave.
  • The docker folder contains Docker files to build and start RisingWave.
  • The e2e_test folder contains the latest end-to-end test cases.
  • The docs folder contains the design docs. If you want to learn about how RisingWave is designed and implemented, check out the design docs here.
  • The dashboard folder contains RisingWave dashboard.

Contribution guidelines

Thanks for your interest in contributing to RisingWave! We welcome and appreciate contributions.

This document describes how to submit your code changes. To learn about the development process, see other chapters of the book. To understand the design and implementation of RisingWave, refer to the design docs listed in docs/README.md.

If you have questions, you can search for existing discussions or start a new discussion in the Discussions forum of RisingWave, or ask in the RisingWave Community channel on Slack. Please use the invitation link to join the channel.

To report bugs, create a GitHub issue.

Tests and miscellaneous checks

Before submitting your code changes, ensure you fully test them and perform necessary checks. The testing instructions and necessary checks are detailed in other sections of the book.

Submit a PR

Pull Request title

As described here, a valid PR title should begin with one of the following prefixes:

  • feat: A new feature
  • fix: A bug fix
  • doc: Documentation only changes
  • refactor: A code change that neither fixes a bug nor adds a feature
  • style: A refactoring that improves code style
  • perf: A code change that improves performance
  • test: Adding missing tests or correcting existing tests
  • build: Changes that affect the build system or external dependencies (example scopes: .config, .cargo, Cargo.toml)
  • ci: Changes to RisingWave CI configuration files and scripts (example scopes: .github, ci (Buildkite))
  • chore: Other changes that don’t modify src or test files
  • revert: Reverts a previous commit

For example, a PR title could be:

  • refactor: modify executor protobuf package path
  • feat(execution): enable comparison between nullable data arrays, where (execution) means that this PR mainly focuses on the execution component.

You may also check out previous PRs in the PR list.

Pull Request description

  • If your PR is small (such as a typo fix), you can keep the description brief.
  • If it is large and changes a lot, it’s better to describe the changes in more detail.

Sign the CLA

Contributors will need to sign RisingWave Labs’ CLA.

Cherry pick the commit to release candidate branch

We have a GitHub Action to help cherry-pick commits from main branch to a release candidate branch, such as v*.*.*-rc where * is a number.

Check out the details at: https://github.com/risingwavelabs/risingwave/blob/main/.github/workflows/cherry-pick-to-release-branch.yml

To trigger the action, add the corresponding label to the PR on the main branch: https://github.com/risingwavelabs/risingwave/blob/main/.github/workflows/cherry-pick-to-release-branch.yml#L10

It acts when the PR on the main branch is merged:

  • If git cherry-pick does not find any conflicts, it will open a PR to the release candidate branch, and assign the original author as the reviewer.

  • If there is a conflict, it will open an issue and make the original author the assignee.

How to build and run RisingWave

Set up the development environment

macOS

To install the dependencies on macOS, run:

brew install postgresql cmake protobuf tmux cyrus-sasl lld openssl@3
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Debian-based Linux

To install the dependencies on Debian-based Linux systems, run:

sudo apt install make build-essential cmake protobuf-compiler curl postgresql-client tmux lld pkg-config libssl-dev libsasl2-dev
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

nix shell

If you use nix, you can also enter the nix shell via:

nix develop ./develop/nix

All dependencies will be automatically downloaded and configured.

You can also use direnv to automatically enter the nix shell:

direnv allow

Check out flake.nix to read more information!

Then you’ll be able to compile and start RisingWave!

.cargo/config.toml contains rustflags configurations like -Clink-arg and -Ctarget-feature. Since it will be merged with $HOME/.cargo/config.toml, check both files and make sure they don’t conflict if you have global rustflags configurations there, e.g., for the linker.
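
For example, a hypothetical global config like the following would be merged with the project-level settings and could end up passing conflicting linker flags:

# $HOME/.cargo/config.toml -- a hypothetical global config, for illustration only
[target.x86_64-unknown-linux-gnu]
rustflags = ["-C", "link-arg=-fuse-ld=mold"]   # merged with the project's -Clink-arg flags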

If you want to build RisingWave with embedded-python-udf feature, you need to install Python 3.12.

To install Python 3.12 on macOS, run:

brew install python@3.12

To install Python 3.12 on Debian-based Linux systems, run:

sudo apt install software-properties-common
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get update
sudo apt-get install python3.12 python3.12-dev

If the default python3 version is not 3.12, please set the PYO3_PYTHON environment variable:

export PYO3_PYTHON=python3.12

Start and monitor a dev cluster

RiseDev is the RisingWave developers’ tool. You can now use RiseDev to start a dev cluster. It is as simple as:

./risedev d                        # shortcut for ./risedev dev
psql -h localhost -p 4566 -d dev -U root

The default dev cluster includes meta-node, compute-node and frontend-node processes, and an embedded volatile in-memory state storage. No data will be persisted. This configuration is intended to make it easier to develop and debug RisingWave.

To stop the cluster:

./risedev k # shortcut for ./risedev kill

To view the logs:

./risedev l # shortcut for ./risedev logs

To clean local data and logs:

./risedev clean-data

Tips for compilation

If you detect memory bottlenecks while compiling, either allocate some disk space on your computer as swap memory, or lower the compilation parallelism with CARGO_BUILD_JOBS, e.g. CARGO_BUILD_JOBS=2.
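
For example, to cap the build at two parallel jobs while starting the dev cluster:

CARGO_BUILD_JOBS=2 ./risedev d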

Configure additional components

There are a few additional components supported by RiseDev.

Use the ./risedev configure command to enable and disable components.

  • Hummock (MinIO + MinIO-CLI): Enable this component to persist state data.
  • Prometheus and Grafana: Enable this component to view RisingWave metrics. You can view the metrics through a built-in Grafana dashboard.
  • Etcd: Enable this component if you want to persist metadata node data.
  • Kafka: Enable this component if you want to create a streaming source from a Kafka topic.
  • Grafana Tempo: Use this component for tracing.

Enabling a component with the ./risedev configure command will only download the component to your environment. To allow it to function, you must revise the corresponding configuration setting in risedev.yml and restart the dev cluster.

For example, you can modify the default section to:

  default:
    - use: minio
    - use: meta-node
    - use: compute-node
    - use: frontend
    - use: prometheus
    - use: grafana
    - use: kafka
      persist-data: true

Now you can run ./risedev d to start a new dev cluster. The new dev cluster will contain components as configured in the yaml file. RiseDev will automatically configure the components to use the available storage service and to monitor the target.

You may also add multiple compute nodes in the cluster. The ci-3cn-1fe config is an example.

Configure system variables

You can check src/common/src/config.rs to see all the configurable variables. If additional variables are needed, include them in the correct sections (such as [server] or [storage]) in src/config/risingwave.toml.
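
For example, a minimal sketch of src/config/risingwave.toml might look like the following; the field names here are only illustrative, so check src/common/src/config.rs for the exact names and defaults:

[server]
heartbeat_interval_ms = 1000

[storage]
shared_buffer_capacity_mb = 1024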

Start the playground

If you do not need to start a full cluster to develop, you can issue ./risedev p to start the playground, where the metadata node, compute nodes and frontend nodes are running in the same process. Logs are printed to stdout instead of separate log files.

./risedev p # shortcut for ./risedev playground

For more information, refer to README.md under src/risedevtool.

You can also start the playground with cargo directly:

cargo run --bin risingwave -- playground

Then, connect to the playground instance via:

psql -h localhost -p 4566 -d dev -U root

Build Profiles

RisingWave uses Cargo profiles to manage build settings. To briefly introduce Cargo profiles, here is a snippet from the Cargo reference:

Profiles provide a way to alter the compiler settings, influencing things like optimizations and debugging symbols.

Cargo has 4 built-in profiles: dev, release, test, and bench. The profile is automatically chosen based on which command is being run if a profile is not specified on the command-line. In addition to the built-in profiles, custom user-defined profiles can also be specified.

All profiles discussed in this document are defined in the Cargo.toml file at the root of the project. Please always refer to it for the most up-to-date information.
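
As a rough, illustrative sketch only (the authoritative settings live in the root Cargo.toml), a tweaked built-in profile and a custom profile look like this:

# Illustrative sketch; refer to the root Cargo.toml for the actual settings.
[profile.release]
debug = true     # embed full debug information
lto = "off"      # disable LTO to speed up the build

[profile.production]
inherits = "release"
lto = "fat"      # aggressive whole-program optimization for maximum performance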

Built-in Profiles

RisingWave tweaks some settings of the built-in profiles to better fit its needs, in the sections of [profile.<built-in-profile>] in Cargo.toml. For example,

  • dev: for local development and testing

    • completely disables LTO to speed up the build time
  • release: for local testing with near-production performance

    • completely disables LTO to speed up the build time
    • embeds full debug information to help with debugging in production

Custom Profiles

RisingWave also defines some custom profiles that inherit from the built-in profiles, in the sections of [profile.<custom-profile>] in Cargo.toml. For example,

  • production: for distribution and production deployment

    • inherits from release
    • enables aggressive code optimizations (like LTO) for maximum performance, at the cost of significantly increased build time
  • ci-dev: for pull-request pipelines in CI

    • inherits from dev
    • tweaks some settings to reduce the build time and binary size
    • enables code optimizations for 3rd-party dependencies to improve CI performance
  • ci-release: for main and main-cron pipelines in CI

    • inherits from release
    • tweaks some settings to reduce the build time and binary size
    • enables more runtime checks (like debug assertions and overflow checks) to catch potential bugs
  • ci-sim: for madsim simulation tests in CI

    • similar to ci-dev
    • enables slight code optimizations for all crates to improve CI performance under single-threaded madsim execution

Comparisons

To give a better idea of the differences between the profiles, here is a matrix comparing the profiles:

Profile      Debug Info       cfg(debug_assertions)   Performance   Build Time
dev          Full             true                    Bad           Fastest
release      Full             false                   Good          Slow
production   Full             false                   Best          Slowest
ci-dev       Backtrace only   true                    Medium        Fast
ci-release   Backtrace only   true                    Good          Slow
ci-sim       Backtrace only   true                    Medium        Medium

Some miscellaneous notes:

  • Compared to “Backtrace only”, “Full” debug information additionally includes the capability to attach a debugger at runtime or on core dumps, to inspect variables and stack traces.
  • There are also other subtle differences like incremental compilation settings, overflow checks, and more. They are not listed here for brevity.
  • cfg(debug_assertions) can be roughly used to determine whether it’s a production environment or not. Note that even though ci-release contains release in its name, the debug_assertions are still enabled.

Choose a Profile

By default, RisingWave (and RiseDev) uses the dev profile for local development and testing. To use the release profile instead, you can set the corresponding configuration entry by running risedev configure. Other profiles are for their specific use cases and are not meant to be used directly by developers, and thus are not available through RiseDev.
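
For reference, when invoking Cargo directly (outside RiseDev), any of these profiles, including the custom ones, can be selected with the --profile flag:

cargo build --profile production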

Testing

Before you submit a PR, fully test the code changes and perform necessary checks.

The RisingWave project enforces several checks in CI. Every time the code is modified, you need to perform the checks and ensure they pass.

Lint

RisingWave requires all code to pass fmt, clippy, sort and hakari checks. Run the following commands to install test tools and perform these checks.

./risedev install-tools # Install required tools for running unit tests
./risedev c             # Run all checks. Shortcut for ./risedev check

There are also some miscellaneous checks. See ci/scripts/misc-check.sh.

Unit and integration tests

RiseDev runs unit tests with cargo-nextest. To run unit tests:

./risedev test          # Run unit tests

Some ideas and caveats for writing tests:

  • Use expect_test to write data-driven tests that can automatically update results (see the sketch after this list).

  • It’s recommended to write new tests as integration tests (i.e. in tests/ directory) instead of unit tests (i.e. in src/ directory).

    Besides, put integration tests under tests/integration_tests/*.rs, instead of tests/*.rs. See Delete Cargo Integration Tests and #9878 for more details.
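
As a minimal sketch of the expect_test workflow (the test body is made up for illustration):

use expect_test::expect;

#[test]
fn debug_output_snapshot() {
    let actual = format!("{:?}", vec![1, 2, 3]);
    // If the expected literal becomes outdated, rerun the test with
    // `UPDATE_EXPECT=1` to rewrite it in place.
    expect![["[1, 2, 3]"]].assert_eq(&actual);
}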

You might want to read How to Test for more good ideas on testing.

Planner tests

RisingWave’s SQL frontend has SQL planner tests.

End-to-end tests

We use sqllogictest-rs to run RisingWave e2e tests.

Refer to Sqllogictest .slt Test File Format Cookbook for the syntax.

Before running end-to-end tests, you will need to start a full cluster first:

./risedev d

Then to run the end-to-end tests, you can use one of the following commands according to which component you are developing:

# run all streaming tests
./risedev slt-streaming -p 4566 -d dev -j 1
# run all batch tests
./risedev slt-batch -p 4566 -d dev -j 1
# run both
./risedev slt-all -p 4566 -d dev -j 1

Use -j 1 to create a separate database for each test case, which ensures that a previous test case’s failure won’t affect other tests due to incomplete table cleanups.

Alternatively, you can also run some specific tests:

# run a single test
./risedev slt -p 4566 -d dev './e2e_test/path/to/file.slt'
# run all tests under a directory (including subdirectories)
./risedev slt -p 4566 -d dev './e2e_test/path/to/directory/**/*.slt'

After running e2e tests, you may kill the cluster and clean data.

./risedev k  # shortcut for ./risedev kill
./risedev clean-data

RisingWave’s codebase is constantly changing. The persistent data might not be stable. In case of unexpected decode errors, try ./risedev clean-data first.

Fuzzing tests

SqlSmith

Currently, SqlSmith supports e2e and frontend fuzzing. Take a look at Fuzzing tests for more details on running it locally.

DocSlt tests

As introduced in #5117, the DocSlt tool allows you to write SQL examples in sqllogictest syntax in Rust doc comments. After adding or modifying any such SQL examples, run the following commands to generate and run e2e tests for them.

# generate e2e tests from doc comments for all default packages
./risedev docslt
# or, generate for only modified package
./risedev docslt -p risingwave_expr

# run all generated e2e tests
./risedev slt-generated -p 4566 -d dev
# or, run only some of them
./risedev slt -p 4566 -d dev './e2e_test/generated/docslt/risingwave_expr/**/*.slt'

These will be run on CI as well.

Deterministic simulation tests

Deterministic simulation is a powerful tool to efficiently search for bugs and reliably reproduce them. In case you are not familiar with this technique, here is a talk and a blog post for a brief introduction.

See also the blog posts for a detailed writeup:

Unit and e2e tests

You can run normal unit tests and end-to-end tests in deterministic simulation mode.

# run deterministic unit test
./risedev stest
# run deterministic end-to-end test
./risedev sslt -- './e2e_test/path/to/directory/**/*.slt'

When your program panics, the simulator will print the random seed of this run:

thread '<unnamed>' panicked at '...',
note: run with `MADSIM_TEST_SEED=1` environment variable to reproduce this error

Then you can reproduce the bug with the given seed:

# set the random seed to reproduce a run
MADSIM_TEST_SEED=1 RUST_LOG=info ./risedev sslt -- './e2e_test/path/to/directory/**/*.slt'

More advanced usages are listed below:

# run multiple times with different seeds to test reliability
# it's recommended to build in release mode for a fast run
MADSIM_TEST_NUM=100 ./risedev sslt --release -- './e2e_test/path/to/directory/**/*.slt'

# configure cluster nodes (by default: 2fe+3cn)
./risedev sslt -- --compute-nodes 2 './e2e_test/path/to/directory/**/*.slt'

# inject failures to test fault recovery
./risedev sslt -- --kill-meta --etcd-timeout-rate=0.01 './e2e_test/path/to/directory/**/*.slt'

# see more usages
./risedev sslt -- --help

Deterministic tests are included in CI as well. See the CI script for details.

Integration tests

src/tests/simulation contains some special integration tests that are designed to be run in deterministic simulation mode. In these tests, we have more fine-grained control over the cluster and the execution environment to test some complex cases that are hard to test in normal environments.

To run these tests:

./risedev sit-test <test_name>

Sometimes in CI you may see a backtrace, followed by an error message with a MADSIM_TEST_SEED:

 161: madsim::sim::task::Executor::block_on
             at /risingwave/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/task/mod.rs:238:13
 162: madsim::sim::runtime::Runtime::block_on
             at /risingwave/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/mod.rs:126:9
 163: madsim::sim::runtime::builder::Builder::run::{{closure}}::{{closure}}::{{closure}}
             at /risingwave/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/builder.rs:128:35
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
context: node=6 "compute-1", task=2237 (spawned at /risingwave/src/stream/src/task/stream_manager.rs:689:34)
note: run with `MADSIM_TEST_SEED=2` environment variable to reproduce this error

You may use that to reproduce it in your local environment. For example:

MADSIM_TEST_SEED=4 ./risedev sit-test test_backfill_with_upstream_and_snapshot_read

Backwards compatibility tests

This tests backwards compatibility between the earliest and the latest minor versions of RisingWave (e.g., 1.0.0 vs. 1.1.0).

You can run it locally with:

./risedev backwards-compat-test

In CI, you can make sure the PR runs it by adding the label ci/run-backwards-compat-tests.

Debugging

Debug playground using vscode

To step through risingwave locally with a debugger, you can use the launch.json and tasks.json provided in vscode_suggestions. After adding these files to your local .vscode folder, you can debug and set breakpoints by launching Launch 'risingwave p' debug.

Observability

RiseDev supports several observability components.

Cluster Control

risectl is the tool for providing internal access to the RisingWave cluster. See

cargo run --bin risectl -- --help

… or

./risedev ctl --help

for more information.

Monitoring

Uncomment grafana and prometheus lines in risedev.yml to enable Grafana and Prometheus services.

Tracing

Compute nodes support streaming tracing. Tracing is not enabled by default. You need to use ./risedev configure to download the tracing components first. After that, you will need to uncomment the tempo service in risedev.yml and start a new dev cluster to allow the components to work.

Traces are visualized in Grafana. You may also want to uncomment grafana service in risedev.yml to enable it.

Dashboard

You may use RisingWave Dashboard to see actors in the system. It will be started along with meta node, and available at http://127.0.0.1:5691/ .

The development instructions for dashboard are available here.

Logging

The Rust components use tokio-tracing to handle both logging and tracing. The default log level is set as:

  • Third-party libraries: warn
  • Other libraries: debug

To configure log levels, launch RisingWave with the environment variable RUST_LOG set as described here.

There are also some logs designated for debugging purposes, with target names starting with events::. For example, by setting RUST_LOG=events::stream::message::chunk=trace, all chunk messages will be logged as they pass through the executors in the streaming engine. Search the codebase to find more of them.
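
For example, to enable chunk-level message tracing while keeping other logs at info, you could start the cluster like this:

RUST_LOG="info,events::stream::message::chunk=trace" ./risedev d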

Metrics

The contents of this document may be subject to frequent change. It covers what each metric measures, and what information we may derive from it.

Barrier Latency

Prerequisite: Checkpoint

This metric measures the duration from when a barrier is injected into all sources in the stream graph to when it has flowed through all executors in the graph.

What can we understand from it?

Usually when examining barrier latency, we look at high barrier latency.

There are two contributing factors to it:

  1. Time taken to actually process the streaming messages.
  2. Buffer capacity for streaming messages.

Processing costs

When injecting a new barrier, there will usually be streaming messages in the stream graph (unless it’s the initial barrier). Since we keep total order for streaming messages, this means that all streaming messages currently in the stream graph have to be processed before the barrier can pass through. If barrier latency is high, it could mean a long time is taken to process these streaming messages. Concretely, here are some costs of processing streaming messages:

  1. CPU cost of evaluating expressions.
  2. I/O remote exchange between fragments.
  3. Stateful executor cache misses (for instance, hash-join and hash-agg). These result in extra costs to access state on S3.

Buffer capacities

Next, high barrier latency could also be caused by buffers in the graph. If some downstream buffer is congested, we will be unable to queue and continue processing upstream messages.

For instance, if the channel in the exchange executor is full, upstream messages cannot be sent through this channel. This means the upstream executor will be unable to continue processing new stream messages, until some space on the buffer is freed.

The various buffer sizes can currently be adjusted via options in the developer configuration file. For instance, options to configure buffer size of the exchange executor can be found here.
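
As a rough sketch of what such options look like in the developer configuration, note that the section and field names below are assumptions for illustration; check src/common/src/config.rs for the exact names:

# Field names are illustrative assumptions; see src/common/src/config.rs.
[streaming.developer]
stream_exchange_initial_permits = 2048   # capacity of the exchange channel
stream_exchange_batched_permits = 256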

Another subtle cause is that a large buffer size can also worsen barrier latency. Suppose stream message processing is at its limit and there’s high latency as a result. Typically, backpressure kicks in and the source is throttled. If buffer sizes are too large, or if there are many buffers, backpressure will not be applied to the source immediately. During this delay, we will continue to see high barrier latency. A heuristic algorithm is on the way to deal with this: https://github.com/risingwavelabs/risingwave/issues/8654.

CPU Profiling Guide

Profiling on host

Here is an easy-to-use profiler and flamegraph tool: https://github.com/koute/not-perf.git

Record samples:

nperf record -p `pidof compute-node` -o perf.data

Generate flamegraph:

nperf flamegraph --merge-threads perf.data > perf.svg

Profiling remote compute nodes

You can profile remote compute nodes from a local machine by simply typing the following command.

./risedev ctl profile cpu --sleep [seconds]

All compute nodes will be profiled for the given number of seconds, and the generated flame graphs will be transferred to .risingwave/profiling/ on your local machine.

Note: To profile compute nodes remotely, please make sure all remote nodes have a public IP address accessible from your local machine (where you are running risedev).

CPU Profiling on OSX

Get the pid of the node you want to profile.

pgrep compute-node

Use cargo flamegraph:

cargo install flamegraph
sudo flamegraph -o flamegraph.svg --pid [pid]

When you’re satisfied, you can Ctrl+C to stop the profiler.

open flamegraph.svg

Memory (Heap) Profiling Guide

Note that the content below is Linux-exclusive.

What is Heap Profile?

A heap profiler records the stack trace of the allocation of each live object. It is therefore possible that function A allocates something and then hands it over to struct B; in this case, the allocation is still attributed to A.

Internals

RisingWave uses tikv-jemallocator on Linux, which is a Rust wrapper of jemalloc, as its memory allocator. On other platforms, RisingWave uses the default allocator.

Luckily, jemalloc provides built-in profiling support (official wiki). jemallocator exposes the feature via a cargo feature ‘profiling’. Here is a simple guide to profiling with jemallocator.

For RisingWave, feat: support heap profiling from risedev by fuyufjh · Pull Request #4871 added everything needed. Just follow the steps below.

Step 1 - Collect Memory Profiling Dump

Depending on the deployment, click the corresponding section to read the instructions.

1.1. Profile RisingWave (locally) with risedev

Run a local cluster (e.g., on an EC2 instance) with the additional environment variable RISEDEV_ENABLE_HEAP_PROFILE.

RISEDEV_ENABLE_HEAP_PROFILE=1 ./risedev d full

Here we use full instead of compose-3node-deploy because compose-3node-deploy uses Docker containers to run RisingWave processes, which makes profiling and analysis more difficult.

Under the hood, risedev sets the environment variable MALLOC_CONF for the RisingWave processes. Here is the implementation.

By default, the profiler outputs a profile result on every 4GB of memory allocation. After running a query and waiting for a while, lots of .heap files will be generated in the current folder:

...
compactor.266308.15.i15.heap
compactor.266308.16.i16.heap
compactor.266308.17.i17.heap
compactor.266308.18.i18.heap
...
compute-node.266187.116.i116.heap
compute-node.266187.117.i117.heap
compute-node.266187.118.i118.heap
compute-node.266187.119.i119.heap
...
1.2. Profile RisingWave in testing pipelines

Currently, some testing pipelines, such as longevity tests, have memory profiling enabled by default, while others, such as performance benchmarks, do not.

To enable heap profiling of compute nodes in benchmark pipelines, set the environment variable when starting a job:

ENABLE_MEMORY_PROFILING=true

Under the hood, the pipeline script passes the value to kube-bench’s parameter benchmark.risingwave.compute.memory_profiling.enable (code here), and then kube-bench sets the environment variable on the RisingWave Pods (code here).

Note that this is only for compute nodes. If you need to run profiling on other nodes, or need to tune the profiling parameters, you may modify the parameters in risingwave-test’s env.override.toml manually and run the job with that branch. (Example)

1.3. Profile RisingWave in Kubernetes/EKS

If you run into an OOM issue in Kubernetes, you will need to enable memory profiling first and then reproduce the problem.

To enable memory profiling, set the environment variable MALLOC_CONF on the Pods.

# Example: `statefulsets` for CN and Meta
kubectl edit statefulsets/benchmark-risingwave-compute-c
# Example: `deployments` for other nodes
kubectl edit deployments/benchmark-risingwave-connector-c

Add the MALLOC_CONF env var. Note that prof_prefix is used to specify the path and file names of the dump. By default, /risingwave/cache/ is mounted to a HostPath and will persist after Pod restarts, so we use it as the dump path here.

env:
- name: MALLOC_CONF
  value: prof:true,lg_prof_interval:38,lg_prof_sample:19,prof_prefix:/risingwave/cache/cn

The suggested values of lg_prof_interval are different for different nodes. See risedev code: compactor_service, compute_node_service.rs, meta_node_service.rs.

Afterwards, the memory dump should be output to the specified folder. Use kubectl cp to download it to your local machine.

1.4. Dump memory profile with risectl

You can manually dump a heap profile with risectl for a compute node with Jemalloc profiling enabled (MALLOC_CONF=prof:true).

./risedev ctl profile heap --dir [dumped_file_dir]

The dumped files will be saved in the directory you specified.

Note: To profile compute nodes remotely, please make sure all remote nodes have a public IP address accessible from your local machine (where you are running risedev).

Step 2 - Analyze with jeprof

Note that each of the .heap files is a full snapshot rather than an increment. Hence, simply pick the latest file (or any historical snapshot).

jeprof is a utility provided by jemalloc to analyze heap dump files. It reads both the executable binary and the heap dump to get a full heap profiling.

Note that the heap profiler dump file must be analyzed along with exactly the same binary that it was generated from. If the memory dump is collected from Kubernetes, please refer to 2.2.

2.1. Use jeprof locally

jeprof is shipped with jemallocator and is built by cargo; use it as follows:

# find jeprof binary
find . -name 'jeprof'

# set execution permission
chmod +x ./target/release/build/tikv-jemalloc-sys-22f0d47d5c562226/out/build/bin/jeprof

Faster jeprof (recommended)

On some platforms jeprof runs very slowly. The bottleneck is addr2line; if you want to speed it up from 30 minutes to 3 seconds, use:

git clone https://github.com/gimli-rs/addr2line
cd addr2line
cargo b --examples -r
cp ./target/release/examples/addr2line <your-path>
2.2. Use jeprof in Docker images

jeprof is included in RisingWave image v1.0.0 or later. For earlier versions, please copy a jeprof binary manually into the container.

Find a Linux machine and use the docker command to start an environment with the specific RisingWave version. Here, -v $(pwd):/dumps mounts the current directory to the /dumps folder inside the container, so that you don’t need to copy files in and out.

docker run -it --rm --entrypoint /bin/bash -v $(pwd):/dumps  ghcr.io/risingwavelabs/risingwave:latest

Generate collapsed file.

jeprof --collapsed binary_file heap_file > heap_file.collapsed

For example:

jeprof --collapsed /risingwave/bin/risingwave jeprof.198272.123.i123.heap > jeprof.198272.123.i123.heap.collapsed

Step 3 - Visualize Flame Graph

We recommend analyzing the collapsed file with speedscope. Just drop the .collapsed file into it. Click Left Heavy in the top-left corner to merge shared calling stacks.

Alternative: Generate flame graph locally

Download and unarchive FlameGraph utility.

Run

./flamegraph.pl --color=mem --countname=bytes heap_file.collapsed > flamegraph.svg

Example:

./flamegraph.pl --color=mem --countname=bytes jeprof.198272.4741.i4741.collapsed > flamegraph.svg

By the way, steps 2 and 3 can be combined into one line with a pipe:

jeprof --collapsed target/release/risingwave compute-node.10404.2466.i2466.heap | ~/FlameGraph/flamegraph.pl --color=mem --countname=bytes > flamegraph.svg

Micro Benchmarks

We have micro benchmarks for various components such as storage, stream and batch executors.

Running Micro Benchmarks

You can run them by specifying their name. For instance, to run the json_parser micro benchmark:

cargo bench json_parser

Generating Flamegraph for Micro Benchmarks

Note: Flamegraph generation depends on perf. You will need a Linux box to run it.

  1. Install cargo-flamegraph

    cargo install flamegraph
    
  2. Install perf. If on ubuntu:

    sudo apt install linux-tools
    

    If using EC2, you may need this instead:

    sudo apt install linux-tools-aws
    

    On an EC2 instance you may also need to set paranoid level to 1, to give the profiler necessary permissions.

    sudo sh -c  "echo 1 >/proc/sys/kernel/perf_event_paranoid"
    
  3. Run flamegraph + benchmark (change json_parser to whichever benchmark you want to run.)

    cargo flamegraph --bench json_parser -- --bench
    

    Within a benchmark, there are also typically multiple benchmark groups. For instance, within the json_parser bench, there are json_parser and debezium_json_parser_(create/read/update/delete). To filter, you can append a regex. For instance, to bench only json_parser:

    cargo flamegraph --bench json_parser -- --bench ^json_parser
    

    You can take a look at Criterion Docs for more information.

Develop Connectors

This page describes the development workflow for connectors. For the design, see the design docs listed in docs/README.md.

RisingWave supports a lot of connectors (sources and sinks). However, developing connectors is tricky because it involves external systems:

  • Before developing and testing, it’s troublesome to set up the development environment
  • During testing, we need to seed the external system with test data (perhaps with some scripts)
  • The test relies on the configuration of the setup, e.g., the test needs to know the port of your Kafka in order to connect to it
  • We need to do the setup for both CI and local development.

Our solution is: we resort to RiseDev, our all-in-one development tool, to help manage external systems and solve these problems.

Before going to the specific methods described in the sections below, the principles we should follow are:

  • environment-independent: It should be easy to start the cluster and run tests on your own machine, on other developers’ machines, and in CI.
    • Don’t use hard-coded configurations (e.g., localhost:9092 for Kafka).
    • Don’t put too much logic in ci/scripts. Let CI scripts be thin wrappers.
  • self-contained tests: It should be straightforward to run one test case, without worrying about where the script to prepare the test lives.
    • Don’t put the setup logic, running logic, and verification logic of a test in different places.

Reference: for the full explanation of the difficulties and the design of our solution, see here.

The following sections first walk you through the development workflow for existing connectors, and then explain how to extend the development framework to support a new connector.

Set up the development environment

RiseDev supports starting external connector systems (e.g., Kafka, MySQL) just like how it starts the RisingWave cluster, and other standard systems used as part of the RisingWave Cluster (e.g., MinIO, etcd, Grafana).

You write the profile in risedev.yml (or risedev-profiles.user.yml). For example, the following config includes Kafka and MySQL, which will be used to test sources.

  my-cool-profile:
    steps:
      # RisingWave cluster
      - use: minio
      - use: sqlite
      - use: meta-node
        meta-backend: sqlite
      - use: compute-node
      - use: frontend
      - use: compactor
      # Connectors
      - use: kafka
        address: message_queue
        port: 29092
      - use: mysql
        port: 3306
        address: mysql
        user: root
        password: 123456

Then

# will start the cluster along with Kafka and MySQL for you
risedev d my-cool-profile

For all config options of supported systems, check the comments in the template section of risedev.yml.

Escape hatch: user-managed mode

user-managed is a special config. When set to true, you will need to start the system yourself. You may wonder why bother adding it to the RiseDev profile if you start it yourself. In this case, the config will still be loaded by RiseDev, which is useful in tests. See the chapters below for more details.

The user-managed mode can be used as a workaround to start a system that is not yet supported by RiseDev, or that is buggy. It’s also used to configure the CI profile. (In CI, all services are pre-started by ci/docker-compose.yml.)

Example of the config:

      - use: kafka
        user-managed: true
        address: message_queue
        port: 29092

End-to-end tests

The e2e tests are written in slt files. There are 2 main points to note:

  1. Use system ok to run bash commands to interact with external systems. Use this to prepare the test data, and verify the results. The whole lifecycle of a test case should be written in the same slt file.
  2. Use control substitution on and then use environment variables to specify the config of the external systems, e.g., the port of Kafka.

Refer to the sqllogictest-rs documentation for the details of system and substitution.


Take Kafka as an example of how the tests are written:

When you use risedev d to start the external services, related environment variables for Kafka will be available when you run risedev slt:

RISEDEV_KAFKA_BOOTSTRAP_SERVERS="127.0.0.1:9092"
RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='127.0.0.1:9092'"
RPK_BROKERS="127.0.0.1:9092"

The slt test case looks like this:

control substitution on

# Note: you can also use envvars in `system` commands, but usually it's not necessary since the CLI tools can load envvars themselves.
system ok
rpk topic create my_source -p 4

# Prepared test topic above, and produce test data now
system ok
cat << EOF | rpk topic produce my_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
EOF

# Create the source, connecting to the Kafka started by RiseDev
statement ok
create source s0 (v1 int, v2 varchar) with (
  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
  topic = 'my_source',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

See src/risedevtool/src/risedev_env.rs for variables supported for each service.

Note again: you need to use risedev d to start the cluster, and then use risedev slt to run the tests. It doesn’t work if you start the cluster yourself without telling RiseDev, or if you use the raw sqllogictest binary directly.

How it works: risedev d will write env vars to .risingwave/config/risedev-env, and risedev slt will load env vars from this file.

Tips for writing system commands

Refer to the sqllogictest-rs documentation for the syntax.

For simple cases, you can directly write a bash command, e.g.,

system ok
mysql -e "
    DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1;
    USE testdb1;
    CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
    INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');
"

system ok
cat << EOF | rpk topic produce my_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
EOF

For more complex cases, you can write a test script, and invoke it in slt. Scripts can be written in any language you like, but kindly write a README.md to help other developers get started more easily.

  • For ad-hoc scripts (only used for one test), it’s better to put them next to the test file.

    e.g., e2e_test/source_inline/kafka/consumer_group.mjs, which is invoked by consumer_group.slt next to it.

  • For general scripts that can be used in many situations, put them in e2e_test/commands/. This directory is added to PATH by risedev slt, so the scripts there function as “built-in” commands.

    A common scenario is when a CLI tool does not accept envvars as arguments. In such cases, instead of manually specifying the arguments every time you invoke it in slt, you can create a wrapper to handle this implicitly, making the tests more concise. e2e_test/commands/mysql is a good demonstration (a hypothetical sketch of such a wrapper is shown below).
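
    A hypothetical sketch of such a wrapper (not the actual script in the repository) could forward connection settings from environment variables so that slt files can simply call mysql -e "..."; the variable names and client path below are assumptions:

    #!/usr/bin/env bash
    # Hypothetical wrapper sketch; variable names and the client path are assumptions.
    set -euo pipefail
    exec /usr/bin/mysql \
      --host="${MYSQL_HOST:-127.0.0.1}" \
      --port="${MYSQL_PORT:-3306}" \
      --user="${MYSQL_USER:-root}" \
      --password="${MYSQL_PWD:-}" \
      "$@"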


Tips for debugging:

  • Use echo to check whether the environment is correctly set.

    system ok
    echo $PGPORT
    ----
    placeholder
    

    Then running risedev slt will return a “result mismatch” error and show the output of the echo command, i.e., the value of PGPORT.

  • Use risedev show-risedev-env to see the environment variables available to risedev slt, after you have started the cluster with risedev d.

Adding a new connector to the development framework

Refer to #16449 (user-managed-only MySQL) and #16514 (Docker-based MySQL) as examples.

  1. Add a new service in the template section of risedev.yml, and add the corresponding config in src/risedevtool/src/service_config.rs.
  2. Implement the new service task, and add it to src/risedevtool/src/bin/risedev-dev.rs.
  3. Add environment variables you want to use in the slt tests in src/risedevtool/src/risedev_env.rs.
  4. Write tests according to the style explained in the previous section.

Source

This page describes RisingWave’s Data Source API and the architecture behind it. This may help if you are interested in how data sources work, or if you want to implement a new Data Source.

For the workflow of developing connectors, see Develop Connectors.

Components

RisingWave’s data source covers four parts: connectors, enumerators, ConnectorSource and SourceExecutor.

data source arch

Connectors

The connector serves as an interface to the upstream data pipeline, including message queues and file systems. In the current design, it can only have limited concurrency: one connector instance reads from one split of the upstream. For example, if the upstream is a Kafka topic with three partitions, then there should be three connectors in RisingWave.

All connectors need to implement the following trait, which exposes two methods to the upper layer.

// src/connector/src/base.rs
pub trait SplitReader: Sized {
    type Properties;

    async fn new(
        properties: Self::Properties,
        state: ConnectorState,
        columns: Option<Vec<Column>>,
    ) -> Result<Self>;
    async fn next(&mut self) -> Result<Option<Vec<SourceMessage>>>;
}
  • new: creates a new connector with some properties. This method should support restoring from a specific state (with partitions and offsets).
  • next: returns a batch of new messages and their offsets in the split.

Enumerators

The enumerator periodically requests the upstream to discover changes in splits, and in most cases the number of splits only increases. The enumerator is a separate task that runs on the meta service. If the upstream splits change, the enumerator notifies the connectors by means of a config change to change the subscription relationship.

All enumerators need to implement the following trait.

// src/connector/src/base.rs
pub trait SplitEnumerator: Sized {
    type Split: SplitMetaData + Send + Sync;
    type Properties;

    async fn new(properties: Self::Properties) -> Result<Self>;
    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
}
  • new: creates an enumerator with some properties.
  • list_splits: requests the upstream and returns all partitions.

ConnectorSource

ConnectorSource unites all connectors via the SourceReader trait. A parser is also held here, which parses raw data into stream chunks according to the column description. A ConnectorSource can handle multiple splits by spawning a new thread for each split. If the source is assigned no split, it will start a dummy reader whose next method never returns, as a placeholder.

SourceExecutor

SourceExecutor is initialized with the splits assigned by the enumerator to subscribe to. The data chunk channel and the barrier channel are combined at this level, and the SourceExecutor needs to prioritize and correctly handle the barriers.

How It Works

  1. When a source is defined, the meta service registers its schema and broadcasts it to compute nodes. The compute node extracts properties from the frontend, builds the corresponding components, and stores them as a SourceDesc in source_manager, identified by table_id. Note that at this stage, the source instance is only built but not running.
  2. No SourceExecutor will be built until a subsequent materialized view is created. The SourceExecutor fetches the specific source instance from source_manager identified by table_id, holds a copy of it, and initializes the corresponding state store at this stage.
  3. When receiving a barrier, the SourceExecutor checks whether it contains an assign_split mutation. If the partition assignment in the assign_split mutation differs from the current situation, the SourceExecutor needs to rebuild the ConnectorSource and other underlying services based on the information in the mutation, and then start reading from the new split and offset.
  4. Whenever receiving a barrier, the state handler always takes a snapshot of the ConnectorSource and then labels the snapshot with an epoch number. When an error occurs, the SourceExecutor takes a specific state and applies it.

Continuous Integration

CI Labels Guide

  • [ci/run-xxx ...]: Run additional steps in the PR workflow indicated by ci/run-xxx in your PR.
  • ci/pr/run-selected + [ci/run-xxx ...]: Only run selected steps indicated by ci/run-xxx in your DRAFT PR.
  • ci/main-cron/run-all: Run full main-cron workflow for your PR.
  • ci/main-cron/run-selected + [ci/run-xxx …]: Run specific steps indicated by ci/run-xxx from the main-cron workflow in your PR. This can be used to verify that a main-cron fix works as expected.
  • To reference [ci/run-xxx ...] labels, you may look at steps from pull-request.yml and main-cron.yml.

Example

https://github.com/risingwavelabs/risingwave/pull/17197

To run e2e-test and e2e-source-test for main-cron in your pull request:

  1. Add ci/run-e2e-test.
  2. Add ci/run-e2e-source-tests.
  3. Add ci/main-cron/run-selected to skip all other steps which were not selected with ci/run-xxx.

Main Cron Bisect Guide

  1. Create a new build via buildkite
  2. Add the following environment variables:
    • GOOD_COMMIT: The good commit hash.
    • BAD_COMMIT: The bad commit hash.
    • BISECT_BRANCH: The branch name where the bisect will be performed.
    • CI_STEPS: The CI_STEPS to run during the bisect. Separate multiple steps with a comma.
      • You can check the labels for this in main-cron.yml, under the conditions for each step.

Example you can try on buildkite:

  • Environment variables:
    GOOD_COMMIT=29791ddf16fdf2c2e83ad3a58215f434e610f89a
    BAD_COMMIT=7f36bf17c1d19a1e6b2cdb90491d3c08ae8b0004
    BISECT_BRANCH=kwannoel/test-bisect
    CI_STEPS="test-bisect,disable-build"
    

Architecture Design

Motivation

This document serves as one of the materials for newcomers to learn the high-level architecture and the functionalities of each component.

Architecture

There are currently 4 types of nodes in the cluster:

  • Frontend: The frontend is a stateless proxy that accepts user queries through the Postgres protocol. It is responsible for parsing, validating, optimizing, and returning the results of each individual query.
  • ComputeNode: ComputeNode is responsible for executing the optimized query plan.
  • Compactor: Compactor is a stateless worker node responsible for executing the compaction tasks for our storage engine.
  • MetaServer: The central metadata management service. It also acts as a failure detector that periodically sends heartbeats to frontends and compute-nodes in the cluster. There are multiple sub-components running in MetaServer:
    • ClusterManager: Manages the cluster information, such as the address and status of nodes.
    • StreamManager: Manages the stream graph of RisingWave.
    • CatalogManager: Manages the table catalog in RisingWave. DDL goes through the catalog manager, and catalog updates are propagated to all frontend nodes asynchronously.
    • BarrierManager: Manages barrier injection and collection. Checkpoints are initiated by the barrier manager regularly.
    • HummockManager: Manages the SST file manifest and meta-info of Hummock storage.
    • CompactionManager: Manages the compaction status and task assignment of Hummock storage.

Architecture

The topmost component is the Postgres client. It issues queries through TCP-based Postgres wire protocol.

The leftmost component is the streaming data source. Kafka is the most representative system for streaming sources. Alternatively, Redpanda, Apache Pulsar, AWS Kinesis, Google Pub/Sub are also widely-used. Streams from Kafka will be consumed and processed through the pipeline in the database.

The bottom-most component is AWS S3, or MinIO (an open-source S3-compatible system). We employ a disaggregated architecture in order to elastically scale the compute nodes without migrating the storage.

Execution Mode

There are 2 execution modes in our system serving different analytics purposes.

Batch-Query Mode

The first is the batch-query mode. Users issue such a query via a SELECT statement and the system answers immediately. This is the most typical RDBMS use case.

Let’s begin with a simple SELECT and see how it is executed.

SELECT SUM(t.quantity) FROM t group by t.company;

Batch-Query

The query will be sliced into multiple plan fragments, each being an independent scheduling unit and probably with different parallelism. For simplicity, parallelism is usually set to the number of CPU cores in the cluster. For example, if there are 3 compute-nodes in the cluster, each with 4 CPU cores, then the parallelism will be set to 12 by default.

Each parallel unit is called a task. Specifically, PlanFragment 2 will be distributed as 4 tasks to 4 CPU cores.

Plan-Fragments

Behind the TableScan operator, there’s a storage engine called Hummock that stores the internal states, materialized views, and the tables. Note that only the materialized views and tables are queryable. The internal states are invisible to users.

To know more about Hummock, you can check out “An Overview of RisingWave State Store”.

Streaming Mode

The other execution mode is the streaming mode. Users build streaming pipelines via CREATE MATERIALIZED VIEW statement. For example:

CREATE MATERIALIZED VIEW mv1 AS SELECT SUM(t.quantity) as q FROM t group by t.company;

Stream-Pipeline

When the data source (Kafka, e.g.) propagates a bunch of records into the system, the materialized view will refresh automatically.

Assume that we have a sequence [(2, "AMERICA"), (3, "ASIA"), (4, "AMERICA"), (5, "ASIA")]. After the sequence flows through the DAG, the MV will be updated to:

A    B
6    AMERICA
8    ASIA

When another sequence [(6, "EUROPE"), (7, "EUROPE")] comes, the MV will soon become:

A     B
6     AMERICA
8     ASIA
13    EUROPE

mv1 can also act as the source of other MVs. For example, mv2 and mv3 can reuse the processing results of mv1, thus deduplicating the computation.

The durability of materialized views in RisingWave is built upon a snapshot-based mechanism. Every time a snapshot is triggered, the internal states of each operator will be flushed to S3. Upon failover, the operator recovers from the latest S3 checkpoint.

The streaming states can be extremely large, so large that they cannot (or can only inefficiently) be held entirely in memory; we have therefore designed Hummock to be highly scalable. Compared to Flink’s RocksDB-based state store, Hummock is cloud-native and provides superior elasticity.

For more details of our streaming engine, please refer to “An Overview of RisingWave Streaming Engine”.

An Overview of the RisingWave Streaming Engine

Overview

RisingWave provides real-time analytics to serve users’ needs. This is done by defining materialized views (MVs). All materialized views are automatically refreshed with recent updates, so that querying a materialized view reflects real-time analytical results. Such refreshing is carried out by the RisingWave streaming engine.

The core design principles of the RisingWave streaming engine are summarized as follows.

  • Actor model based execution engine. We create a set of actors such that each actor reacts to its own input message, including both data update and control signal. In this way we build a highly concurrent and efficient streaming engine.
  • Shared storage for states. The backbone of the state storage is based on shared cloud object storage (currently AWS S3), which gives us computational elasticity, cheap and infinite storage capacity, and simplicity during configuration change.
  • Everything is a table, everything is a state. We treat every object in our internal storage as both a logical table and an internal state. Therefore, they can be effectively managed by catalog, and be updated in a unified streaming engine with consistency guarantee.

In this document we give an overview of the RisingWave streaming engine.

Architecture

streaming-architecture

The overall architecture of RisingWave is depicted in the figure above. In brief, the RisingWave streaming engine consists of three sets of nodes: frontend, compute nodes, and meta service. The frontend node consists of the serving layer, handling users’ SQL requests concurrently. Underlying it is the processing layer. Each compute node hosts a collection of long-running actors for stream processing. All actors access a shared persistence layer of storage (currently AWS S3) as their state storage. The meta service maintains all meta-information and coordinates the whole cluster.

When receiving a create materialized view statement at the frontend, a materialized view and the corresponding streaming pipeline are built in the following steps.

  1. Building a stream plan. Here a stream plan is a logical plan which consists of logical operators encoding the dataflow. This is carried out by the streaming planner at the frontend.
  2. Fragmentation. The stream fragmenter at the meta service breaks the generated logical stream plan into stream fragments, and duplicates such fragments when necessary. Here a stream fragment holds partial nodes from the stream plan, and each fragment can be parallelized by building multiple actors for data parallelization.
  3. Scheduling plan fragments. The meta service distributes different fragments to different compute nodes and lets all compute nodes build their local actors.
  4. Initializing the job at the backend. The meta service notifies all compute nodes to start serving streaming pipelines.

Actors, executors, and states

streaming-executor

Actors

Actors are the minimal unit to be scheduled in the RisingWave streaming engine, such that there is no parallelism inside each actor. The typical structure of an actor is depicted on the right of the figure above. An actor consists of three parts.

  • Merger (optional). Each merger merges the messages from different upstream actors into one channel, such that the executors can handle messages sequentially. The merger is also in charge of aligning barriers to support checkpoints (details described later).
  • A chain of executors. Each executor is the basic unit of delta computation (details described later).
  • Dispatcher (optional). Each dispatcher will send its received messages to different downstream actors according to certain strategies, e.g. hash shuffling or round-robin.

The execution of actors is carried out by tokio async runtime. After an actor starts running, it runs an infinite loop in which it continuously runs async functions to generate outputs, until it receives a stop message.

Messages between two local actors are transferred via channels. For two actors located on different compute nodes, messages are re-directed to an exchange service. The exchange service will continuously exchange messages with each other via RPC requests.

Executors

Executors are the basic computational units in the streaming engine. Each executor responds to its received messages and computes an output message atomically, i.e., the computation inside each executor will not be broken down.

The underlying algorithmic framework of the RisingWave streaming system is the traditional change propagation framework. Given a materialized view to be maintained, we build a set of executors where each executor corresponds to a relational operator (including base table). When any of the base tables receive an update, the streaming engine computes the changes to each of the materialized views by recursively computing the update from the leaf to the root. Each node receives an update from one of its children, computes the local update, and propagates the update to its parents. By guaranteeing the correctness of every single executor, we get a composable framework for maintaining arbitrary SQL queries.

Checkpoint, Consistency, and Fault tolerance

We use the term consistency to denote the model of the completeness and correctness of querying materialized views. We follow the consistency model introduced in Materialize. More specifically, the system ensures that the query result is always a consistent snapshot at some timestamp t no later than the time the query was issued. Also, later queries always get consistent snapshots from later timestamps. A consistent snapshot at t requires that all messages no later than t are reflected in the snapshot exactly once, while all messages after t are not reflected.

Barrier based checkpoint

To guarantee consistency, RisingWave introduces a Chandy-Lamport style consistent snapshot algorithm as its checkpoint scheme.

This procedure guarantees that every state to be flushed into the storage is consistent (matching a certain barrier at the source). Therefore, when querying materialized views, consistency is naturally guaranteed when the batch engine reads a consistent snapshot (of views and tables) on the storage. We also call each barrier an epoch and sometimes use both terms interchangeably as data streams are cut into epochs. In other words, the write to the database is visible only after it has been committed to the storage via the checkpoint.

To improve the efficiency, all dirty states on the same compute node are gathered to a shared buffer, and the compute node asynchronously flushes the whole shared buffer into a single SST file in the storage, such that the checkpoint procedure shall not block stream processing.

See more detailed descriptions on Checkpoint.

Fault tolerance

When the streaming engine crashes, the system must globally roll back to a previous consistent snapshot. To achieve this, whenever the meta service detects the failure of a compute node or of an ongoing checkpoint procedure, it triggers a recovery process. After rebuilding the streaming pipeline, each executor will reset its local state from a consistent snapshot on the storage and recover its computation.

Checkpoint

Revisit: Consistency Model

Similar to other relational databases, RisingWave provides consistent snapshot reads on both tables and materialized views. Specifically,

  1. Consistency. Given a query Q and the latest event timestamp t, the system returns a result Q(t') that is consistent with a timestamp t' ≤ t, i.e. the result of evaluating Q over a snapshot S(t') from a previous timestamp t' ≤ t, where S(t') is the set of all tuples present before timestamp t'. That is, the system delivers a query result that is consistent with a previous timestamp t'.
  2. Monotonicity. For any two timestamps t1 and t2 such that t1 < t2, assume the results for Q at t1 and t2 are consistent with snapshots at t1' and t2' respectively; then t1' < t2'. That is, later queries should not return results that correspond to earlier snapshots.

Note that RisingWave does not guarantee that a write must be visible to subsequent reads, a.k.a. read-after-write consistency. Users may use the FLUSH command to make sure the changes have taken effect before reads.

Internally, upcoming changes may take a while to propagate from sources to materialized views, and at least one barrier event is required to flush the changes. These two kinds of latency together determine the latency between a write and when it becomes readable.

Streaming Checkpoint

The consistent checkpoints play 2 roles in our system.

  1. Fault-tolerance. To recover the cluster from an unexpected failure, every stateful streaming operator needs to recover its state from a consistent checkpoint.
  2. Consistent snapshot. The snapshot to be read is actually the latest completed checkpoint. As discussed in the previous section, it’s required to guarantee that the data of tables & materialized views are consistent within one snapshot.

RisingWave performs checkpointing via the Chandy–Lamport algorithm. A special kind of message, the checkpoint barrier, is generated at the streaming sources and propagates across the streaming graph to the materialized views (or sinks).

To guarantee consistency, RisingWave adopts the Chandy–Lamport algorithm as its checkpoint scheme. In particular, RisingWave periodically (every barrier_interval_ms) repeats the following procedure:

  1. The meta service initializes a barrier and broadcasts it to all source actors across the streaming engine.
  2. The barrier messages go through every streaming operator (actors) in the streaming graph.
    • For fan-out operators like Dispatch, the barrier messages are copied to every downstream.
    • For fan-in operators like Merge or Join, a barrier is emitted downstream only once barriers have been collected from all upstreams (see the sketch after this list).
    • For other operators, the barrier messages flow through the operators and trigger a checkpoint operation on them. Namely, flush the changes into storage.
  3. When all dirty states from a compute node are flushed to storage, the compute node sends a finish signal to the meta service.
  4. After receiving the finish signal from all compute nodes, the meta service tells the storage manager to commit the checkpoint and finishes the checkpoint procedure.
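
The following is a minimal sketch of the barrier-alignment rule used by fan-in operators, with hypothetical types; the real Merge executor also buffers data messages from upstreams that have already delivered their barrier.

// A minimal sketch of barrier alignment in a fan-in operator, with
// hypothetical types; the real Merge executor lives in the streaming engine.
use std::collections::HashSet;

struct BarrierAligner {
    upstreams: usize,
    received_from: HashSet<usize>,
}

impl BarrierAligner {
    fn new(upstreams: usize) -> Self {
        Self { upstreams, received_from: HashSet::new() }
    }

    /// Called when upstream `id` delivers the barrier of the current epoch.
    /// Returns true once barriers from all upstreams have been collected,
    /// i.e. the barrier can be emitted downstream and local state flushed.
    fn on_barrier(&mut self, id: usize) -> bool {
        self.received_from.insert(id);
        if self.received_from.len() == self.upstreams {
            self.received_from.clear();
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut aligner = BarrierAligner::new(2);
    assert!(!aligner.on_barrier(0)); // still waiting for upstream 1
    assert!(aligner.on_barrier(1));  // aligned: flush state, emit the barrier
}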

Checkpoint on Storage

As is mentioned before, during checkpointing, every operator writes their changes of this epoch into storage. For the storage layer, these data are still uncommitted, i.e. not persisted to the shared storage. However, the data still need to be visible to that operator locally.

A local shared buffer is introduced to stage these uncommitted write batches. Once the checkpoint barriers have passed through all actors, the storage manager can notify all compute nodes to ‘commit’ their buffered write batches into the shared storage.

shared buffer

Another benefit of shared buffer is that the write batches in a compute node can be compacted into a single SSTable file before uploading, which significantly reduces the number of SSTable files in Layer 0.

Aggregations

We will cover internal implementation of common aggregations in this document.

Frontend

TODO

Expression Framework

TODO

HashAggExecutor

aggregation components

Within the HashAggExecutor, there are 4 main components:

  1. AggCalls.
  2. AggState.
  3. AggGroups.
  4. Persisted State.

AggCalls are the aggregation calls for the query. For instance, SUM(v1), COUNT(v2) has the AggCalls SUM(v1) and COUNT(v2).

AggState is the state we use to compute the result (output) of the aggregation call. Within each aggregation group, there will be an AggState for each AggCall.

AggGroups are created per aggregation group. For instance with GROUP BY x1, x2, there will be a group for each unique combination of x1 and x2.

Whenever stream chunks come in, the executor will update the aggregation state for each group, per agg call.

On barrier, we will persist the in-memory states. For value type aggregations, we will persist the state to the intermediate state table. This state table will store all value aggregations per group on a single row.

For MaterializedInput type aggregations, these require tracking input state. For example, non-append-only min/max. For each of these aggregations, they have 1 state table (AggStateStorage::MaterializedInput) each. Within the state table, it will store the input state for each group.
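
A much simplified sketch of how these components relate is shown below; the AggCall, AggState and AggGroup types here are hypothetical stand-ins, and persistence to state tables is omitted.

use std::collections::HashMap;

#[derive(Clone, Copy)]
enum AggCall { Sum, Count } // e.g. SUM(v1), COUNT(v2)

#[derive(Default, Clone)]
struct AggState { acc: i64 } // one state per (group, agg call)

struct AggGroup { states: Vec<AggState> } // one AggState per AggCall

// Apply a chunk of (group key, value) rows to the per-group states.
fn apply_chunk(
    calls: &[AggCall],
    groups: &mut HashMap<Vec<i64>, AggGroup>, // keyed by the GROUP BY columns
    rows: &[(Vec<i64>, i64)],
) {
    for (key, value) in rows {
        let group = groups.entry(key.clone()).or_insert_with(|| AggGroup {
            states: vec![AggState::default(); calls.len()],
        });
        for (call, state) in calls.iter().zip(group.states.iter_mut()) {
            match call {
                AggCall::Sum => state.acc += *value,
                AggCall::Count => state.acc += 1,
            }
        }
    }
}

fn main() {
    let calls = [AggCall::Sum, AggCall::Count];
    let mut groups = HashMap::new();
    apply_chunk(&calls, &mut groups, &[(vec![1], 10), (vec![1], 20), (vec![2], 5)]);
    assert_eq!(groups[&vec![1]].states[0].acc, 30); // SUM for group (1)
    assert_eq!(groups[&vec![1]].states[1].acc, 2);  // COUNT for group (1)
}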

Initialization of AggGroups

init-agg-group

AggGroups are initialized when the corresponding aggregation groups are not found in AggGroupCache. This could be either because the group was evicted from the AggGroupCache, or because it’s a new group key.

It could take a while to initialize agg groups, hence we cache them in AggGroupCache.

MView on Top of MView

Background

RisingWave supports creating a new materialized view (abbreviated as mview) based on the source and another mview, so users can split their data into multiple layers and use mviews’ chains to connect them.

In detail, we will support the creation of a materialized view whose source is some other mview(s). Please note that there should not be a circular dependency on mviews.

create table t1 (v1 int, deleted boolean);
create materialized view mv1 as select * from t1 where deleted = false;
create materialized view mv2 as select sum(v1) as sum_v1 from mv1;
create materialized view mv3 as select count(v1) as count_v1 from mv1;

Design

Broadcast operator

In physical representation, we introduce a dispatcher operator type, Broadcast. Broadcast dispatcher, as its name indicates, will dispatch every message to multiple downstreams. To simplify our design, we can assume that every MViewOperator has a Broadcast output, with zero or more downstreams.

fig1

Create new mview online

Assume that we already have a materialized view mv1, and we want to create a new materialized view mv2 based on mv1. This is equivalent to a configuration change to Broadcast dispatcher. Before the real change happens, we have to apply the snapshot of mv1 to mv2 first. Therefore, we introduce another operator named Chain.

The Chain operator has two inputs. The first one will be a batch query, denoted by the blue patterns in the figure below, which is a finite append-only stream (the snapshot of historical data in the base mview). The second one is its original input, an infinite stream, denoted by the red patterns.

fig2
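
Conceptually, the Chain operator is simply “snapshot first, then upstream”. The sketch below shows that ordering with plain in-memory vectors as hypothetical stand-ins for the two inputs; barrier handling and epoch alignment are omitted.

// The two inputs of Chain, modeled as plain vectors: first drain the finite
// snapshot, then consume the (buffered) upstream stream.
fn chain<T>(snapshot: Vec<T>, upstream: Vec<T>) -> impl Iterator<Item = T> {
    snapshot.into_iter().chain(upstream)
}

fn main() {
    let merged: Vec<_> = chain(vec!["snapshot row"], vec!["upstream delta"]).collect();
    assert_eq!(merged, vec!["snapshot row", "upstream delta"]);
}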

The full process of creation is:

  1. The frontend parses the query and sends the plan to StreamManager.
  2. StreamManager creates the new actors.
  3. StreamManager chooses a change epoch e1, pins a snapshot of mv1 at e1, and sends a barrier with e1 and change info.
  4. The broadcast operator receives the barrier, then creates a SnapshotStream of mv1 with e1, and creates a Chain operator, then connects them all. (only changes in the memory).
  5. The broadcast operator sends a normal barrier e1 to all downstreams, and continues.
  6. The Chain operator consumes all messages from snapshot and receives EOF, then consumes buffered messages from upstream.
  7. StreamManager discovers that mv2 has almost caught up with the progress of mv1, and the creation succeeds.

Drop mview online

Assume that we already have three materialized views mv1, mv2, and mv3. mv2 and mv3 are on top of mv1, so mv1 is not allowed to be dropped.

The full process of drop mv3 is:

  1. The frontend parses the query and sends the plan to StreamManager.
  2. StreamManager chooses a change epoch e1, and sends a barrier with e1 and change info.
  3. The broadcast operator sends a normal barrier e1 to all downstreams.
  4. The broadcast operator removes the dropped output from its outputs, and continues.
  5. StreamManager discovers that mv3 has reached the epoch e1, then drops the extra fragments physically.

Backfill

Backfill is used by various components of our system to merge historical data and the realtime data stream.

There are many variants to it, and we will discuss them in the following sections.

Backfilling 101

Motivation

When creating a Materialized View on top of another one, we need to fetch all the historical data first; only then can we start processing the realtime updates and applying them to the stream.

However, in the process of doing so, we need to either:

  1. Buffer all the updates.
  2. Block the upstream from sending updates (i.e. pause the entire stream graph).

For the first option, it is not feasible to buffer all the updates when the historical data takes a while to process; buffering them all can cause OOM.

For the second option, it is also not feasible, as we are blocking the entire stream graph whenever we create a new materialized view, until processing historical data is done.

So we need a way to merge historical data and realtime updates, without blocking the upstream. This can be done by Backfill.

How it works

Backfilling is the process of merging historical data and update stream into one.

Consider the following example.

We have the following table with 1M records:

CREATE TABLE t (
  id INT PRIMARY KEY,
  name VARCHAR
);

Then we create a materialized view from that table:

CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;

In one epoch B, we read the historical data from t up to row 2, from data in the previous epoch A:

op | id | name
---|----|-----
+  | 1  | a
+  | 2  | b

Note that the op column is the operation type, and + means insert.

We use insert since all the historical data are just inserts to the downstream materialized view.

In that same epoch B, suppose there are some updates to the table t due to DML statements being run:

op  | id  | name
----|-----|------
+   | 4   | d
-   | 1   | a
... | ... | ...
-   | 99  | zzz
+   | 100 | zzzz

The same updates will then be propagated to mv in epoch B.

Since we backfilled the historical data up to row 2, we only need to apply the updates up to row 2.

So downstream will just receive:

  1. The historical data up to row 2.

    op | id | name
    ---|----|-----
    +  | 1  | a
    +  | 2  | b
  2. The realtime delta stream up to row 2:

    op | id | name
    ---|----|-----
    -  | 1  | a

So we don’t need to buffer all the updates until the historical data is completely processed. Instead, at each epoch, we just read some historical data and apply any relevant updates on it.

To ensure we always make progress, we will keep track of the last row we backfilled to, and continue from after that row in the next epoch.
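
The core rule can be sketched as follows, assuming rows are keyed by an integer primary key (a simplification of the real composite pk): in one epoch we emit historical rows up to the current backfill position, and only apply upstream updates whose pk is within that range.

#[derive(Debug, PartialEq)]
enum Op { Insert(i64), Delete(i64) } // upstream updates keyed by pk

// Keep only the upstream updates that touch rows we have already backfilled.
fn filter_upstream(updates: &[Op], current_pos: i64) -> Vec<&Op> {
    updates
        .iter()
        .filter(|op| {
            let pk = match op {
                Op::Insert(pk) | Op::Delete(pk) => *pk,
            };
            pk <= current_pos // rows beyond the backfilled position are dropped
        })
        .collect()
}

fn main() {
    // Backfilled up to row 2 in this epoch: only the delete of row 1 is relevant.
    let updates = [Op::Insert(4), Op::Delete(1), Op::Insert(100)];
    assert_eq!(filter_upstream(&updates, 2), vec![&Op::Delete(1)]);
}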

In the following sections, we will delve into specific types of backfill.

References

RFC: Use Backfill To Let Mv On Mv Stream Again

NoShuffle Backfill

This section will mainly discuss the implementation of NoShuffle Backfill, as the concept is as described above.

This backfill executor is the precursor to arrangement backfill.

It is used to backfill RisingWave Materialized Views, Tables and Indexes.

NoShuffleBackfill executor receives upstream updates via a NoShuffleDispatcher. For historical data, it uses batch scan to read snapshot data using the StorageTable interface.

Using the NoShuffleDispatcher means that the actors in the scan fragment need to be scheduled to the same parallel unit as the actors in the dispatcher.

This also means that the parallelism of NoShuffleBackfill is coupled with its upstream fragment.

When scaling, we can’t scale backfill independently as a result, only together with the upstream fragment.

Another issue with NoShuffleBackfill is the way backfill state is stored. Backfill executor stores a single latest state across all vnodes it has:

vnode | pk_offset
------|----------
0     | 5
1     | 5
2     | 5

This means that if we scale in or out, the state may not be accurate. This is because the state, if partitioned per vnode, could be:

vnode | pk_offset
------|----------
0     | 1
1     | 2
2     | 5

If we ran some scaling operations, and got:

vnode | pk_offset
------|----------
3     | 0
1     | 5
2     | 5

It would be unclear which pk_offset to resume from.

In the next iteration of backfill, we have ArrangementBackfill which solves these issues.

Arrangement Backfill

ArrangementBackfill is the next iteration of NoShuffleBackfill executor.

Similarly, it is used to backfill RisingWave Materialized Views, Tables and Indexes.

The main goal of ArrangementBackfill is to scale its parallelism independently of the upstream fragment. This is done with replication.

Differences with NoShuffleBackfill

First, let’s discuss the key differences in components.

Side       | NoShuffleBackfill    | ArrangementBackfill
-----------|----------------------|------------------------------
Upstream   | NoShuffleDispatcher  | HashDispatcher
Historical | Scan on StorageTable | Scan on Replicated StateTable

For the upstream part, it’s pretty straightforward. We use a HashDispatcher to dispatch updates to the backfill executor, since ArrangementBackfill can be on a different parallel unit than its upstream fragment.

For the historical part, we use a Replicated StateTable to read historical data, and replicate the shared buffer.

Arrangement Backfill Frontend

Arrangement Backfill is constructed in the following phases in the optimizer:

(1) LogicalScan -> (2) StreamTableScan -> (3) PbStreamScan (MergeNode, ..)

From 2 to 3, we will compute the output_indices (A) from upstream to backfill, and the output_indices (B) from backfill to downstream.

(B) will always be a subset of (A). The full PK is needed for backfilling, but it is not always needed after that.

For example, consider the following queries.

create table t1(id bigint primary key, i bigint);
create materialized view mv1 as select id, i from t1 order by id, i;
create materialized view mv2 as select id from mv1;

mv1 has the following plan:

 StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [id, i], pk_conflict: NoCheck }
 └─StreamTableScan { table: t1, columns: [id, i] }

mv2 has the following plan:

 StreamMaterialize { columns: [id], stream_key: [id], pk_columns: [id], pk_conflict: NoCheck }
 └─StreamTableScan { table: mv1, columns: [mv1.id], stream_scan_type: ArrangementBackfill, pk: [mv1.id], dist: UpstreamHashShard(mv1.id) }
(2 rows)

Notice how mv2 only needs the id column from mv1, and not the full pk with i.

Backfill logic

Overview

backfill sides

For ArrangementBackfill, we have 2 streams which we merge: the upstream and historical streams. Upstream is given precedence, to make sure barriers can flow through the stream graph. Additionally, at every epoch we refresh the historical stream, because upstream data gets checkpointed and our snapshot would otherwise be stale.

polling

We will poll from this stream in backfill to get upstream and historical data chunks for processing, as well as barriers for checkpointing the backfill state.

For each chunk (DataChunk / StreamChunk), we may also need to do some further processing based on their schemas.

Schemas

schema

There are 3 schemas to consider when processing the backfill data:

  1. The state table schema of upstream.
  2. The output schema from upstream to arrangement backfill.
  3. The output schema from arrangement backfill to its downstream.

For chunks coming from upstream (whether historical or directly from the dispatcher), we will need to transform it from (2) to (3) before yielding the chunks downstream.

For chunks being replicated to the replicated state table, we need to transform them from (2) to (1), so they match the upstream state table schema. Otherwise, deserialization for these replicated records will fail, due to a schema mismatch.

For chunks being read from the replicated state table, it must contain logic to transform them from (1) to (2), to ensure the historical side and the upstream side have a consistent schema.
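
Conceptually, each of these transformations is just a column projection driven by output indices. The sketch below uses made-up column names and indices matching the replication example later in this document; the real executor works on typed chunks rather than strings.

// Project a row from one schema to another by output indices. Column names
// and index values below are illustrative only.
fn project(row: &[&str], output_indices: &[usize]) -> Vec<String> {
    output_indices.iter().map(|&i| row[i].to_string()).collect()
}

fn main() {
    // Schema (1), the upstream state table: [id, name, age, drivers_license_id]
    let upstream_row = ["29", "Jack", "NULL", "1"];
    // Schema (2), the output from upstream to backfill: [drivers_license_id, name, id]
    let to_backfill = project(&upstream_row, &[3, 1, 0]);
    assert_eq!(to_backfill, ["1", "Jack", "29"]);
    // Going from (2) back to (1) needs the inverse mapping plus NULL padding
    // for columns absent from (2), e.g. `age` above.
}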

Polling loop

handle_poll

If we poll a chunk from the historical side, we will yield it to the downstream, and update the primary key (pk) we have backfilled to in the backfill state.

If we poll a chunk from the upstream side, we will buffer it. This is because we only want to yield updates that correspond to historical data we have already backfilled. We can do that at the end of the epoch, which is when we receive a barrier. We will also replicate it by writing it to the ReplicatedStateTable.

If we poll a barrier from the upstream side, we will need to flush the upstream chunk buffer. First, we transform the schema of the upstream chunk buffer from (2) to (3). Next, we flush the records whose pk is lower than or equal to the pk we have backfilled to. Finally, we build a new snapshot stream to read historical data in the next epoch.

Then the polling loop will continue.
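
A condensed sketch of the barrier branch of this loop is shown below, with hypothetical types: buffered upstream rows are filtered against the backfilled position and flushed; committing the backfill state and rebuilding the snapshot stream are only indicated in comments.

struct BackfillLoopState {
    current_pos: i64,                    // pk we have backfilled up to
    upstream_buffer: Vec<(i64, String)>, // buffered upstream rows: (pk, payload)
}

impl BackfillLoopState {
    // On barrier: flush only the buffered rows whose pk is within the
    // backfilled range. The real executor would also commit the backfill
    // state and rebuild the snapshot stream for the next epoch here.
    fn on_barrier(&mut self) -> Vec<(i64, String)> {
        let pos = self.current_pos;
        self.upstream_buffer
            .drain(..)
            .filter(|(pk, _)| *pk <= pos)
            .collect()
    }
}

fn main() {
    let mut state = BackfillLoopState {
        current_pos: 2,
        upstream_buffer: vec![(1, "delete".into()), (4, "insert".into())],
    };
    assert_eq!(state.on_barrier(), vec![(1, "delete".to_string())]);
}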

Replication

replication_simple

Previously, when doing snapshot reads of historical data, the backfill executor was able to read from the shared buffer for the previous epoch. This is because it was scheduled to the same parallel unit as the upstream.

However, with ArrangementBackfill, we can’t rely on the shared buffer of upstream, since it can be on a different parallel unit.

replication_replicated

So we need to make sure for the previous epoch, we buffer its updates somewhere to replicate the shared buffer.

Then we can merge the shared buffer, with the current checkpointed data to get the historical data for that epoch.

To replicate the shared buffer, we simply create a ReplicatedStateTable. This will just store the ReadVersions but never upload them to the Object Store. Then the StateTable’s logic will take care of merging the shared buffer and the committed data in the Object Store for us.

Example: Read / Write Paths Replicated Chunk

Recall from the above section on schemas:

For chunks being replicated to the replicated state table, we need to transform them from (2) to (1), so they match the upstream state table schema.

For chunks being read from the replicated state table, it must contain logic to transform them from (1) to (2), to ensure the historical side and the upstream side have a consistent schema.

Where (1) refers to the state table schema of upstream, and (2) refers to the output schema from upstream to arrangement backfill.

replication_example

Now let’s consider an instance where (1) has the schema:

id | name | age | drivers_license_id

And (2) has the schema:

drivers_license_id | name | id

Consider if we have the following chunk being replicated to the ReplicatedStateTable:

drivers_license_id | name   | id
-------------------|--------|---
1                  | 'Jack' | 29

We will transform it to the schema of (1), and insert it into the ReplicatedStateTable:

id | name   | age  | drivers_license_id
---|--------|------|-------------------
29 | 'Jack' | NULL | 1

This will then be serialized into kv pairs and written to the state store.

Subsequently, when reading from the state store to get historical data, we deserialize the kv pairs, merging the shared buffer with the committed data in the Object Store.

Let’s say we got this back:

id | name   | age  | drivers_license_id
---|--------|------|-------------------
29 | 'Jack' | NULL | 1
30 | 'Jill' | 30   | 2

Then we will transform it to the schema of (2), and arrangement backfill will consume this historical data snapshot:

drivers_license_id | name   | id
-------------------|--------|---
1                  | 'Jack' | 29
2                  | 'Jill' | 30

Initialization

Something to note is that for the first snapshot, upstream may not have finished committing data in that epoch to s3.

Additionally, we have not replicated any upstream records during that epoch, only in the subsequent ones.

As such, we must wait for that first checkpoint to be committed, before reading, or we risk missing the uncommitted data in our backfill.

This is supported internally inside init_epoch for replicated state table.

        upstream_table.init_epoch(first_epoch).await?;

Recovery

TODO

Scaling

TODO

Further improvements

  • Make backfill vnode level within each partition: https://github.com/risingwavelabs/risingwave/issues/14905

Cdc Backfill

TODO

Source Backfill

TODO

An Overview of RisingWave State Store

Overview

In RisingWave, all streaming executors store their data into a state store. This KV state store is backed by a service called Hummock, a cloud-native LSM-Tree-based storage engine. Hummock provides a key-value API and stores all data on an S3-compatible service. However, it is not a general-purpose KV store, but a storage engine co-designed with the RisingWave streaming engine and optimized for streaming workloads.

Architecture

Reading this document requires prior knowledge of LSM-Tree-based KV storage engines, like RocksDB, LevelDB, etc.

Overview of Architecture

Hummock consists of a manager service on the meta node, clients on worker nodes (including compute nodes, frontend nodes, and compactor nodes), and a shared storage to store files (SSTs). Every time a new write batch is produced, the Hummock client will upload those files to shared storage, and notify the Hummock manager of the new data. With compaction going on, new files will be added and unused files will be vacuumed. The Hummock manager will take care of the lifecycle of a file — is a file being used? can we delete a file? etc.

The streaming state store has distinguished workload characteristics.

  • Every streaming executor will only read and write its own portion of data.
  • Data (generally) won’t be shared across nodes, so every worker node will only read and write its own data. Therefore, every Hummock API, like get or scan, only guarantees that writes on one node can be immediately read from the same node. In some cases, if we want to read data written from other nodes, we will need to wait for the epoch.
  • Streaming data are committed in serial. Based on the barrier-based checkpoint algorithm, the states are persisted epoch by epoch. We can tailor the write path specifically for the epoch-based checkpoint workload.

This leads to the design of Hummock, the cloud-native KV-based streaming state store. We’ll explain concepts like “epoch” and “barrier” in the following chapters.

The Hummock User API

source code

In this part, we will introduce how users can use Hummock as a KV store.

Hummock itself provides 3 simple APIs: ingest_batch, get, and scan. Hummock provides MVCC write and read on KV pairs. Every key stored in Hummock has an epoch (aka. timestamp). Developers should specify an epoch when calling Hummock APIs.

Hummock doesn’t support writing a single key. To write data into Hummock, users should provide a sorted, unique list of keys and the corresponding operations (put value, delete), with an epoch, and call the ingest_batch API. Therefore, within one epoch, users can only have one operation for a key. For example,

[a => put 1, b => put 2] epoch = 1 is a valid write batch
[a => put 1, a => delete, b => put 2] epoch = 1 is an invalid write batch
[b => put 1, a => put 2] epoch = 1 is an invalid write batch
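
The check below is a small illustration of the batch requirement, not the real ingest_batch implementation: within one epoch, keys must be sorted and unique, so a valid batch has strictly increasing keys.

enum Operation {
    Put(Vec<u8>),
    Delete,
}

// A write batch is valid for one epoch only if its keys are sorted and
// unique, i.e. strictly increasing.
fn is_valid_batch(batch: &[(Vec<u8>, Operation)]) -> bool {
    batch.windows(2).all(|w| w[0].0 < w[1].0)
}

fn main() {
    let valid = vec![
        (b"a".to_vec(), Operation::Put(b"1".to_vec())),
        (b"b".to_vec(), Operation::Put(b"2".to_vec())),
    ];
    let duplicate_key = vec![
        (b"a".to_vec(), Operation::Put(b"1".to_vec())),
        (b"a".to_vec(), Operation::Delete),
        (b"b".to_vec(), Operation::Put(b"2".to_vec())),
    ];
    let unsorted = vec![
        (b"b".to_vec(), Operation::Put(b"1".to_vec())),
        (b"a".to_vec(), Operation::Put(b"2".to_vec())),
    ];
    assert!(is_valid_batch(&valid));
    assert!(!is_valid_batch(&duplicate_key));
    assert!(!is_valid_batch(&unsorted));
}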

For reads, we can call the scan and get API on the Hummock client. Developers need to specify a read epoch for read APIs. Hummock only guarantees that writes on one node can be immediately read from the same node. Let’s take a look at the following example:

Node 1: write a => 1, b => 2 at epoch 1
Node 1: write a => 3, b => 4 at epoch 2
Node 2: write c => 5, d => 6 at epoch 2

After all operations have been done,

Read at epoch 2 on Node 1: a => 3, b => 4, (c => 5, d => 6 may be read)
Read at epoch 1 on Node 1: a => 1, b => 2
Read at epoch 2 on Node 2 with `wait_epoch 2`: a => 3, b => 4, c => 5, d => 6

Hummock Internals

In this part, we will discuss how data are stored and organized in Hummock internally. If you plan to develop Hummock, you should learn some basic concepts, like SST, key encoding, read/write path, and consistency, from the following sections.

Storage Format

SST encoding source code

All key-value pairs are stored in block-based SSTables. Each user key is associated with an epoch. In SSTs, key-value pairs are sorted first by user key (lexicographical order), and then by epoch (largest to smallest).

For example, if users write two batches in sequence:

write a => 1, b => 2 at epoch 1
write a => delete, b => 3 at epoch 2

After compaction (w/ min watermark = 0), there will eventually be an SST with the following content:

(a, 2) => delete
(a, 1) => 1
(b, 2) => 3
(b, 1) => 2

The final written key (aka. full key) is encoded by appending the 8-byte epoch after the user key. When doing full key comparison in Hummock, we should always compare full keys using the KeyComparator to get the correct result.
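
The following sketch illustrates why a dedicated comparator is needed: the user key is compared in ascending order and the epoch in descending order, so the newest version of a key sorts first. The exact byte encoding used by Hummock may differ from this simplified version.

use std::cmp::Ordering;

// Append the 8-byte epoch to the user key to form the full key.
fn encode_full_key(user_key: &[u8], epoch: u64) -> Vec<u8> {
    let mut buf = user_key.to_vec();
    buf.extend_from_slice(&epoch.to_be_bytes());
    buf
}

// Compare by user key ascending, then by epoch descending.
fn compare_full_key(a: &[u8], b: &[u8]) -> Ordering {
    let (ka, ea) = a.split_at(a.len() - 8);
    let (kb, eb) = b.split_at(b.len() - 8);
    ka.cmp(kb).then(eb.cmp(ea))
}

fn main() {
    let a2 = encode_full_key(b"a", 2);
    let a1 = encode_full_key(b"a", 1);
    let b2 = encode_full_key(b"b", 2);
    assert_eq!(compare_full_key(&a2, &a1), Ordering::Less); // (a, 2) sorts before (a, 1)
    assert_eq!(compare_full_key(&a1, &b2), Ordering::Less); // all "a" versions sort before "b"
}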

Write Path

The Hummock client will batch writes and generate SSTs to sync to the underlying S3-compatible service. An SST consists of two files:

  • .data: Data file composed of ~64KB blocks, each of which contains the actual key-value pairs.
  • .meta: Meta file containing large metadata including min-max index, Bloom filter as well as data block metadata.

After the SST is uploaded to an S3-compatible service, the Hummock client will let the Hummock manager know there’s a new table. The list of all SSTs along with some metadata forms a version. When the Hummock client adds new SSTs to the Hummock manager, a new version will be generated with the new set of SST files.

Write Path

Read Path

To read from Hummock, we need a version (a consistent list of SSTs we can read) and an epoch to generate a consistent read snapshot. To avoid an RPC with the Hummock manager on every user read, the Hummock client will cache the most recent version locally. The local version will be updated when 1) the client initiates a write batch and 2) the background refresher triggers.

For every read operation (scan, get), we will first select SSTs that might contain the required keys.

For scan, we simply select by overlapping key range. For point get, we will filter SSTs further by Bloom filter. After that, we will compose a single MergeIterator over all SSTs. The MergeIterator will return all keys in range along with their epochs. Then, we will create UserIterator over MergeIterator, and for all user keys, the user iterator will pick the first full key whose epoch <= read epoch. Therefore, users can perform a snapshot read from Hummock based on the given epoch. The snapshot should be acquired beforehand and released afterwards.
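
The snapshot-read rule can be sketched as follows, on a small in-memory list of (user key, epoch, value) entries sorted the same way as SSTs; the entries reuse the compaction example above, and tombstones are modeled as None.

// Among all versions of a user key, pick the newest one with epoch <= read_epoch;
// a None value is a delete tombstone and hides the key.
fn snapshot_read<'a>(
    entries: &[(&'a str, u64, Option<&'a str>)], // sorted by user key, then epoch descending
    read_epoch: u64,
) -> Vec<(&'a str, &'a str)> {
    let mut out = Vec::new();
    let mut last_key = None;
    for (key, epoch, value) in entries {
        if Some(key) == last_key || *epoch > read_epoch {
            continue; // an older version of an already-decided key, or too new
        }
        last_key = Some(key);
        if let Some(v) = value {
            out.push((*key, *v));
        }
    }
    out
}

fn main() {
    // Same content as the compacted SST above: `a` was deleted at epoch 2.
    let entries = [("a", 2, None), ("a", 1, Some("1")), ("b", 2, Some("3")), ("b", 1, Some("2"))];
    assert_eq!(snapshot_read(&entries, 2), vec![("b", "3")]);
    assert_eq!(snapshot_read(&entries, 1), vec![("a", "1"), ("b", "2")]);
}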

Read Path

Hummock implements the following iterators:

  • BlockIterator: iterates a block of an SSTable.
  • SSTableIterator: iterates an SSTable.
  • ConcatIterator: iterates SSTables with non-overlapping key ranges.
  • MergeIterator: iterates SSTables with overlapping key ranges.
  • UserIterator: wraps internal iterators and outputs user key-value with epoch <= read epoch.

iterators source code

Compaction

Currently, Hummock is using a compaction strategy similar to leveled compaction in RocksDB. It will compact data using consistent hash (docs and implementation TBD), so that data on shared storage are distributed in the same way as stream executors use them.

Compaction is done on a special worker node called compactor node. The standalone compactor listens for compaction jobs from the meta node, compacts one or more SSTs into new ones, and reports completion to the meta node. (In Hummock in-memory mode, compactor will be running as a thread inside compute node.)

To support MVCC read without affecting compaction, we track the epoch low watermark in Hummock snapshots. A user key-value pair will be retained if (1) it is the latest, or (2) it belongs to an epoch above the low watermark.

Transaction Management with Hummock Manager

source code of Hummock manager on meta service

In this part, we discuss how Hummock coordinates between multiple compute nodes. We will introduce key concepts like “snapshot”, “version”, and give examples on how Hummock manages them.

Every operation on the LSM-tree yields a new version on the Hummock manager, e.g., adding new L0 SSTs and compactions. In streaming, each stream barrier is associated with an epoch. When the barrier flows across the system and is collected by the stream manager, we can start a checkpoint on this epoch. SSTs produced in a single checkpoint are associated with an uncommitted epoch. After all compute nodes flush shared buffers to shared storage, the Hummock manager considers the epoch committed. Therefore, apart from the list of files in the LSM, a version also contains the committed epoch number max_committed_epoch and the SSTs in uncommitted epochs. As a result, both an operation on the LSM and a streaming checkpoint will yield a new version in the Hummock manager.

Currently, there is only one checkpoint happening in the system at the same time. In the future, we might support more checkpoint optimizations including concurrent checkpointing.

As mentioned in Read Path, reads are performed on a version based on a given epoch. During the whole read process, data from the specified read epoch cannot be removed by compaction, which is guaranteed by pinning a snapshot; SSTs within a version cannot be vacuumed by compaction, which is guaranteed by pinning a version.

The SQL frontend will get the latest epoch from the meta service. Then, it will embed the epoch number into SQL plans, so that all compute nodes will read from that epoch. In theory, both SQL frontend and compute nodes will pin the snapshot, to handle the case that frontend goes down and the compute nodes are still reading from Hummock (#622). However, to simplify the process, currently we only pin on the frontend side.

Hummock Service

Hummock only guarantees that writes on one node can be immediately read from the same node. However, the worker nodes running batch queries might have a slightly outdated version when a batch query plan is received (due to the local version caching). Therefore, we have a wait_epoch interface to wait until the local cached version contains full data of one epoch.

When there is no reference to a version, all file deletions in this version can be actually applied. There is a background vacuum task dealing with the actual deletion.

Checkpointing in Streaming

related PR

Now we discuss how streaming executors and the streaming manager use Hummock as a state store.

From the perspective of the streaming executors, when they receive a barrier, they will be “in the new epoch”. Data received in epoch 1 will be persisted (as a write batch) with epoch 1. Receiving the barrier also causes the read and write epoch to be set to the new one.

Here we have two cases: Agg executors always persist and produce new write batches when receiving a barrier; Join executors (in the future when async flush gets implemented) will produce write batches within an epoch.

Checkpoint in Streaming

Streaming executors cannot control when data will be persisted — they can only write to Hummock’s shared buffer. When a barrier flows across the system and is collected by the meta service, we can ensure that all executors have written their states of the previous epoch to the shared buffer, so we can initiate checkpoint process on all worker nodes, and upload SSTs to persistent remote storage.

For example, the barrier manager sends barrier epoch = 2. When the epoch 2 barrier is collected on meta service, we can ensure that data prior to epoch 2 have been fully flushed to the Hummock shared buffer. (Note that epoch number in streaming is generated by machine time + serial number, so we cannot simply use +1 -1 to determine the epoch of the previous / next barrier.) Assuming the previous barrier is of epoch 1, we can start checkpointing data from epoch 1 after barrier of epoch 2 has been collected.

The Hummock Shared Buffer

Introduction

Note: L0 and Shared buffer are totally different. They both exist independently.

The Hummock Shared Buffer serves 3 purposes:

  • Batch writes on worker node level, so as to reduce SST number.

    • Currently, a single epoch might produce hundreds of SSTs, which makes them hard for the meta service to handle.
  • Support async checkpoint.

    • The shared buffer will generate an SST based on the epoch, and provide a consistent view of an epoch, by merging the snapshot of the storage SSTs and the immutable in-memory buffers.
  • Support read-after-write (so-called async flush), so as to make executor logic simpler.

    Currently, if executors need to compute the “maximum” value, there are only two ways:

    1. Produce a write batch (i.e. write directly to object store), and read from the storage (like ExtremeState i.e. the state of MAX()/MIN()).
    2. Write to in-memory flush buffer, and merge data from flush buffer and storage (like TopN, HashJoin).

Part 1: Async Checkpoint

Previously, when an executor was processing a barrier, it would flush its content to Hummock using the write_batch interface. But it turns out that we have so many executors that a single epoch might produce 200∼300 write batches, which yields 200∼300 SSTs. Such an SST count is excessive for an LSM engine.

The first idea is to batch writes. For example, we can collect all write batches from a single epoch, and flush them as one SST to the storage engine. However, it is not as simple as it sounds — the downstream executor might rely on the upstream barrier to process data.

See the following example:

┌─────────────┐      ┌─────────────┐
│             │      │             │
│   HashAgg   ├──────► XXXExecutor │
│             │      │             │
└─────────────┘      └─────────────┘

When HashAgg is processing ExtremeState, it will need to “Produce a write batch, and read from the storage”. Therefore, it cannot produce data for the current epoch until the data are fully checkpointed. However, we want to batch writes and checkpoint at once. This is a contradiction. Therefore, the only way to solve this is to add a memtable and async checkpoint.

The async checkpoint logic will be implemented with StreamManager (on meta) and HummockLocalManager (on CN), and we will only have immutable memtables in this part.

Write Path

Assume there is a new barrier flowing across our system.

When each executor consumes a barrier, it will produce a write batch (which should complete immediately) and forward the barrier.

After all executors have completed processing the barrier, a checkpoint will be initiated, and a new SST of the previous epoch will be produced. As our system only has one barrier flowing at a time, we can simply checkpoint epoch 3 after the stream manager collects barrier 4.

Read Path

As each executor owns its own keyspace, the read path doesn’t need to do snapshot isolation. We can simply view all committed SSTs and memtables as a “HummockVersion”.

When the barrier is being processed (but not yet checkpointed), the read set is simply all memtables on the current worker node and all SSTs on shared storage.

When a checkpoint is being processed, things become a little more complex. The HummockVersion might contain both committed SSTs and SSTs being checkpointed. In this case, the read set is:

  • All SSTs (including those uncommitted), but with the read epoch set to the previous checkpoint (in this case, < epoch 3, or simply <= epoch 2). Therefore, all epochs being checkpointed will be ignored on shared storage.
  • All memtables of the checkpoint epoch (in this case, epoch 3).

These two parts combined provide a consistent view of the KV storage.

After the checkpoint has been finished (SSTs have been committed), we can either:

  • Evict epoch 3 from Hummock memtable, and serve all data from shared storage.
  • Retain latest N epochs in Hummock memtable, and serve all data where memtable epoch >= E, and shared storage epoch < E.

Having implemented async checkpoint, we can now batch writes and significantly reduce SST number.

Part 2: Write Anytime / Async Flush

As said in the introduction, previously all RisingWave streaming executors would do either of the following to maintain operator states:

  • merge-on-write: Produce a write batch, and read from the storage (like ExtremeState).
  • merge-on-read: Write to in-memory flush buffer, and merge data from flush buffer and storage (like TopN, HashJoin).

We have established earlier that the merge-on-write way is not scalable, as it will produce too many SSTs. So we will explore the second way.

When executors use the second way, they always need to “merge data from flush buffer and storage”. This “merge iterator” has been implemented in various ways in different executors, which makes the ManagedState very hard to read.

Therefore, to standardize this, we support “async flush” in shared buffer, which means that streaming executors can write to the state store at any time, and the state store will provide “read after write” semantics within epoch.

Currently, all streaming executors will only read their own keys, since they are partitioned by state_table_id and vnode.

Therefore, we can leverage this property to provide a mutable memtable to each executor, and unify the “merge” logic across all “merge-on-read” executors.

A New Merge Iterator

Apart from the MergeIterator in Hummock, which merges SSTs from various levels in the LSMTree, we now need a “unified” merge iterator above the state store:

The MutableMemTable will store data in its in-memory-representation (e.g., i32, i32). The special MergeIterator will merge encoded data from Hummock and memory-representation data from MutableMemTable.

Therefore, we can unify all logic across TopN executor, HashJoin executor, etc.

Every executor only has one active MutableMemTable. When one epoch ends, the MutableMemTable should be converted to an immutable memtable in Hummock, and everything stays the same as The Hummock Shared Buffer — Part 1 (Async Checkpoint).

Considerations

For all data a, b of the same type, we must ensure that:

in-memory representation of a < in-memory representation of b,
iff memcomparable(a) < memcomparable(b)

Storing State Using Relational Table

Row-based Encoding

RisingWave adopts a relational data model. Relational tables, including tables and materialized views, consist of a list of named, strong-typed columns. All streaming executors store their data into a KV state store, which is backed by a service called Hummock. There are two choices for saving a relational row into key-value pairs: cell-based format and row-based format. We choose the row-based format because internal states always read and write the whole row, and don’t need to partially update some fields in a row. Row-based encoding has better performance than cell-based encoding, as it reduces the number of KV pairs to read and write.

We implement a relational table layer as the bridge between executors and KV state store, which provides the interfaces accessing KV data in relational semantics. As the executor state’s encoding is very similar to a row-based table, each kind of state is stored as a row-based relational table first. In short, one row is stored as a key-value pair. For example, encoding of some stateful executors in row-based format is as follows:

  • mv: key = table_id | sort key | pk, value = materialized value
  • top n: key = table_id | sort key | pk, value = materialized value
  • join: key = table_id | join_key | pk, value = materialized value
  • agg: key = table_id | group_key, value = agg_value

Relational Table Layer

source code

In this part, we will introduce how stateful executors interact with KV state store through the relational table layer.

The relational table layer consists of State Table, Mem Table and Storage Table. The State Table and Mem Table are used in streaming mode, and Storage Table is used in batch mode.

State Table provides table operations through these APIs: get_row, scan, insert_row, delete_row and update_row, which are the read and write interfaces for streaming executors. The Mem Table is an in-memory buffer for caching table operations during one epoch. The Storage Table is read only, and will output only the partial columns that the upper level needs.

Overview of Relational Table

Write Path

To write into the KV state store, executors first perform operations on State Table, and these operations will be cached in Mem Table. Once a barrier flows through one executor, the executor will flush the cached operations into the state store. At this moment, State Table will convert these operations into KV pairs and write them to the state store with a specific epoch.

For example, when an executor performs insert(a, b, c) and delete(d, e, f) through the State Table APIs, Mem Table first caches these two operations in memory. After receiving a new barrier, State Table converts these two operations into KV operations in row-based format, and writes these KV operations into the state store (Hummock).

write example
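
A sketch of the mem-table idea is shown below, with hypothetical types: operations are buffered per primary key, later operations on the same key overwrite earlier ones, and the buffer is only converted into a write batch for a specific epoch when a barrier arrives.

use std::collections::BTreeMap;

enum RowOp {
    Insert(Vec<i64>), // the full row to insert
    Delete,
}

#[derive(Default)]
struct MemTable {
    buffer: BTreeMap<i64, RowOp>, // keyed by the primary key (simplified)
}

impl MemTable {
    fn insert(&mut self, pk: i64, row: Vec<i64>) {
        self.buffer.insert(pk, RowOp::Insert(row));
    }
    fn delete(&mut self, pk: i64) {
        self.buffer.insert(pk, RowOp::Delete);
    }
    // On barrier: drain the buffered operations into (pk, epoch, value) entries
    // for the state store; None stands for a delete tombstone.
    fn flush(&mut self, epoch: u64) -> Vec<(i64, u64, Option<Vec<i64>>)> {
        std::mem::take(&mut self.buffer)
            .into_iter()
            .map(|(pk, op)| match op {
                RowOp::Insert(row) => (pk, epoch, Some(row)),
                RowOp::Delete => (pk, epoch, None),
            })
            .collect()
    }
}

fn main() {
    let mut mem_table = MemTable::default();
    mem_table.insert(1, vec![1, 11, 111]);
    mem_table.insert(2, vec![2, 22, 222]);
    mem_table.delete(2);
    // The insert and the delete of pk = 2 collapse into a single tombstone.
    let batch = mem_table.flush(42);
    assert_eq!(batch.len(), 2);
}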

Read Path

In streaming mode, executors should be able to read the latest written data, which means uncommitted data is visible. The data in Mem Table (memory) is fresher than that in shared storage (state store). State Table provides both point-get and scan to read from state store by merging data from Mem Table and Storage Table.

Get

For example, let’s assume that the first column is the pk of relational table, and the following operations are performed.

insert [1, 11, 111]
insert [2, 22, 222]
delete [2, 22, 222]
insert [3, 33, 333]

commit

insert [3, 3333, 3333]

After commit, a new record is inserted again. Then the Get results with corresponding pk are:

Get(pk = 1): [1, 11, 111]
Get(pk = 2):  None
Get(pk = 3): [3, 3333, 3333]

Scan

Scan on relational table is implemented by StateTableIter, which is a merge iterator of MemTableIter and StorageIter. If a pk exists in both KV state store (shared storage) and memory (MemTable), result of MemTableIter is returned. For example, in the following figure, StateTableIter will generate 1->4->5->6 in order.

Scan example

Example: HashAgg

In this doc, we will take HashAgg with extreme state (max, min) or value state (sum, count) for example, and introduce a more detailed design for the internal table schema.

Code

Table id

table_id is a globally unique id allocated in meta for each relational table object. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls.

Value State (Sum, Count)

Query example:

select sum(v2), count(v3) from t group by v1

This query will need to initiate 2 Relational Tables. The schema is table_id/group_key.

Extreme State (Max, Min)

Query example:

select max(v2), min(v3) from t group by v1

This query will need to initiate 2 Relational Tables. If the upstream is not append-only, the schema becomes table_id/group_key/sort_key/upstream_pk.

The order of sort_key depends on the agg call kind. For example, if it’s max(), sort_key will be ordered ascending; if it’s min(), sort_key will be ordered descending. The upstream_pk is also appended to ensure the uniqueness of the key. This design allows the streaming executor not to read all the data from the storage when the cache misses, but only a part of it. The streaming executor will try to write all streaming data to storage, because there may be update or delete operations in the stream, and it’s impossible to always guarantee correct results without storing all data.

If t is created with the append-only flag, the schema becomes table_id/group_key, the same as for Value State. This is because in append-only mode, there are no update or delete operations, so the cache will never miss. Therefore, we only need to write one value to the storage.

Build RisingWave with Multiple Object Storage Backends

Overview

As a cloud-neutral database, RisingWave supports running on different (object) storage backends. Currently, these storage products include S3 and S3-compatible object stores (e.g. COS and Lyvecloud Storage), GCS, OSS, Azure Blob Storage, and HDFS/WebHDFS.

This doc first briefly introduces how RisingWave supports these storage products, then gives guidance on how to build RisingWave with these object stores quickly and easily through risedev.

How RisingWave supports multiple object storage backends

The first supported object storage was S3. Other object storage backends are supported in one of two ways: via S3-compatible mode or via OpenDAL.

S3 and other S3 compatible object store

If an object store declares that it is S3-compatible, it can be accessed directly through the S3 APIs. As RisingWave has already implemented S3ObjectStore, we can reuse the S3 interfaces to access this kind of object storage.

Currently, for COS and Lyvecloud Storage, we use the S3-compatible mode. To use these two object storage products, you need to overwrite the S3 environment variables with the corresponding access_key, secret_key, region and bucket_name, and configure the endpoint as well.

OpenDAL object store

For those (object) storage products that are not compatible with S3 (or compatible but with some unstable interfaces), we use OpenDAL to access them. OpenDAL is the Open Data Access Layer to freely access data, which supports several different storage backends. We implemented an OpenDALObjectStore to support the interface for accessing object stores in RisingWave.

All of these object stores are supported in risedev; you can use the risedev command to start RisingWave on these storage backends.

How to build RisingWave with multiple object stores

COS & Lyvecloud Storage

To use COS or Lyvecloud Storage, you need to overwrite the AWS default access_key, secret_key and region, and configure the endpoint in the environment variables:

export AWS_REGION=your_region
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export RW_S3_ENDPOINT=your_endpoint

Then, in risedev.yml, set the bucket name and start RisingWave with risedev. You can then run RisingWave on these two storage backends.

GCS

To use GCS, you need to enable OpenDAL in risedev.yml, and set engine = gcs, bucket_name and root as well. For authentication, you just need to configure credentials with gcloud, and then the OpenDAL gcs backend will automatically find the application credentials to complete the verification.

Once these configurations are set, run ./risedev d gcs and then you can run RisingWave on GCS.

OSS

To use OSS, you need to enable OpenDAL in risedev.yml, set engine = oss, bucket_name and root as well.

For authentication, set the identity information in the environment variable:

export OSS_ENDPOINT="endpoint"
export OSS_ACCESS_KEY_ID="oss_access_key"
export OSS_ACCESS_KEY_SECRET="oss_secret_key"

Once these configurations are set, run ./risedev d oss and then you can run RisingWave on OSS.

Azure Blob

To use Azure Blob, you need to enable OpenDAL in risedev.yml, set engine = azblob, bucket_name and root as well. For azure blob storage, bucket_name is actually the container_name.

For authentication, set the identity information in the environment variable:

export AZBLOB_ENDPOINT="endpoint"
export AZBLOB_ACCOUNT_NAME="your_account_name"
export AZBLOB_ACCOUNT_KEY="your_account_key"

Once these configurations are set, run ./risedev d azblob and then you can run RisingWave on Azure Blob Storage.

HDFS

HDFS requires a complete Hadoop environment and a Java environment, which are very heavy. Thus, RisingWave does not enable the hdfs feature by default. To compile RisingWave with the hdfs backend, turn on this feature first, and enable hdfs for the risedev tools: run ./risedev configure, and enable [Component] Hummock: Hdfs Backend.

After that, you need to enable OpenDAL in risedev.yml, set engine = hdfs, namenode and root as well.

You can also use WebHDFS as a lightweight alternative to HDFS. The hdfs backend is powered by HDFS’s native Java client, and users need to set up the HDFS services correctly, while webhdfs is accessed via an HTTP API and needs no extra setup. The way to start WebHDFS is basically the same as hdfs, but its default name_node is 127.0.0.1:9870.

Once these configurations are set, run ./risedev d hdfs or ./risedev d webhdfs, and then you can run RisingWave on HDFS (or WebHDFS).

Meta Service

Background

RisingWave provides both real-time analytical queries and highly concurrent access to Materialized Views. Therefore, both the frontend and compute nodes are designed to be scalable, and they may or may not share the same set of host machines, depending on the cluster size and whether the user cares about resource isolation.

Meanwhile, components such as the metadata provider, scheduler, and monitoring are more suitable for a centralized design. For example, a typical on-premise deployment may look like the figure below, where the dotted boxes represent the minimal units of deployment (VMs or containers).

Cluster Deployment

Meta Store

Metadata should be regarded as transactional data. In particular, we need guarantees like atomicity, consistency (e.g. read-after-write) and transaction support. Besides, it’s not necessary to scale it out to multiple nodes.

We choose etcd for production deployments, which is a replicated key-value store based on a B-Tree and the Raft protocol.

To fit into the key-value data model, metadata entries are serialized with Protobuf as the value, and the key is their ID or version, depending on the kind of metadata.

Types of Metadata

Catalog

Catalog is the metadata of relational tables in databases.

  • Database & Schema: Namespace of database objects like in PostgreSQL.
  • Table & Materialized Views: Definition of tables & materialized views along with the columns on them.
  • Source: User-defined external data sources.

To execute a DDL statement like CREATE or DROP TABLE, the frontend sends an RPC to the meta node and waits for the updated catalog to take effect.

Storage

Hummock, an LSM-Tree-based storage engine, stores the mapping from version to the set of SSTable files in Meta Service. See more details in the overview of State Store.

Push on Updates

There are 2 choices on how to distribute information across multiple nodes.

  • Push: When metadata changes, the meta node tells all nodes to update, and the master node must wait for others to acknowledge before continuing.
  • Pull: When data changes, the master node does nothing. Other nodes may not have the latest information, so they need to ask the master node every time.

Currently, for simplicity, we choose the push-style approach for all kinds of metadata. This is implemented as NotificationService on meta service and ObserverManager on frontend and compute nodes.

Notification

ObserverManager will register itself to the meta service on bootstrap and subscribe to the metadata it needs. Afterwards, once the metadata is changed, the meta node streams the changes to it, expecting all subscribers to acknowledge.

Data Model and Encoding

Data Model

Source files: common/src/types

RisingWave adopts a relational data model with extensive support for semi-structured data. Relational tables, including tables and materialized views, consist of a list of named, strong-typed columns.

Tables created by users have an implicit, auto-generated row-id column as their primary key; while for materialized views, the primary key is derived from queries. For example, the primary key of an aggregation (group-by) materialized view is the specified group keys.

NULL values mean missing or unknown fields. Currently, all columns are implicitly nullable.

Primitive data types:

  • Booleans: BOOLEAN
  • Integers: SMALLINT (16-bit), INT (32-bit), BIGINT (64-bit)
  • Decimals: NUMERIC
  • Floating-point numbers: REAL, DOUBLE
  • Strings: VARCHAR
  • Temporals: DATE, TIMESTAMP, TIMESTAMP WITH TIME ZONE, TIME, INTERVAL

Composite data types:

  • Struct: A structure with a list of named, strong-typed fields.
  • List: A variable-length list of values with same data type.

In-Memory Encoding

Source files: common/src/array

In-memory data is encoded in arrays for vectorized execution. For variable-length data like strings, generally we use another offset array to mark the start of encoded values in a byte buffer.

A Data Chunk consists of multiple columns and a visibility array, as is shown in the left subgraph below. The visibility array marks each row as visible or not. This helps filtering some rows while keeping other data arrays unchanged.

A Stream Chunk consists of columns, a visibility array, and an additional ops column, as is shown in the right subgraph below. The ops column marks the operation of each row, which can be one of Delete, Insert, UpdateDelete and UpdateInsert.

chunk

On-Disk Encoding

Source files: utils/memcomparable, utils/value-encoding

RisingWave stores user data in shared key-value storage called ‘Hummock’. Tables, materialized views and checkpoints of internal streaming operators are encoded into key-value entries. Every field of a row, a.k.a. cell, is encoded as a key-value entry, except that NULL values are omitted.

row-format

Considering that ordering matters in some cases, e.g. result set of an order-by query, fields of keys must preserve the order of original values after being encoded into bytes. This is what memcomparable is used for. For example, integers must be encoded in big-endian and the sign bit must be flipped to preserve order. In contrast, the encoding of values does not need to preserve order.
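
As a small illustration of the integer case described above (big-endian with the sign bit flipped), the sketch below encodes a few i32 values and checks that byte-wise order matches numeric order; the real memcomparable encoding covers many more types.

// Encode an i32 so that byte-wise order equals numeric order:
// big-endian bytes with the sign bit flipped.
fn encode_i32_memcomparable(v: i32) -> [u8; 4] {
    ((v as u32) ^ 0x8000_0000).to_be_bytes()
}

fn main() {
    let values = [-5_i32, -1, 0, 1, 42];
    let encoded: Vec<[u8; 4]> = values.iter().map(|&v| encode_i32_memcomparable(v)).collect();
    // The order of the encodings preserves the numeric order of the values.
    assert!(encoded.windows(2).all(|w| w[0] < w[1]));
}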

Design: Local execution mode for online serving queries

Background

The query engine of RisingWave supports two types of queries: highly concurrent point queries and ad-hoc queries. The characteristics of these two different kinds of queries are summarized as follows:

              | Point Queries  | Adhoc Queries
--------------|----------------|----------------------
Latency       | several ms     | ms to minutes
QPS           | 1000 ~ 100000  | 100 ~ 1000
SQL           | Simple         | Arbitrarily complex
Result Set    | Small          | Small, Medium, Large
Use Scenarios | Dashboard      | Adhoc analysis

Our distributed query processing engine is designed for complex ad-hoc queries, but it can’t meet the latency/QPS requirements of point queries, so in this article we introduce the local execution mode for point queries.

Design

Frontend Flow

Example 1: select a from t where b in (1, 2, 3, 4)

Let’s use the above SQL as an example:

Example 1

The key changes from the distributed mode:

  1. The exchange executor will be executed directly by the local query execution, not by the distributed scheduler. This means that we no longer have async execution/monitoring, etc.
  2. The RPC is issued by the exchange executor directly, not by the scheduler.

Example 2: SELECT pk, t1.a, t1.fk, t2.b FROM t1, t2 WHERE t1.fk = t2.pk AND t1.pk = 114514

The following is the plan and execution of the above SQL in local mode:

Example 2

As explained above, the lookup join/exchange phase will be executed directly on the frontend. The pushdown (filter/table scan, on both the build and probe side) will be issued by the executors rather than the scheduler.

Optimization/Scheduling

The overall process will be quite similar to distributed processing, but with a few differences:

  1. We only use the heuristic optimizer, and only a limited set of rules will be applied.
  2. The scheduler will not be involved, and the physical plan is executed in the current thread (coroutine) immediately.

Monitoring/Management

To reduce latency as much as possible, local execution mode will not go through the query management mentioned in the batch query manager.

How to switch between local/distributed execution modes?

As mentioned in the first paragraph, the main use case for local execution mode is well determined (dashboard/reporting), so currently we just expose a session configuration (query_mode) to the user. In the future, we may let the optimizer determine the mode if required.

RPC execution in local mode

In the distributed mode, we have several steps to execute a computing task and fetch its results.

There are some problems with the above process in local mode:

  1. We need at least two RPCs to fetch the task execution result, which increases query overhead.
  2. We have task lifecycle management APIs, which are unnecessary for local mode.
  3. We may need to add several new APIs for task monitoring/failure detection.

For the local mode, we will add a new RPC API:

rpc Execute(ExecuteRequest) returns (ExecuteResponse)

message ExecuteRequest {
  batch_plan.PlanFragment plan = 2;
  uint64 epoch = 3;
}

message ExecuteResponse {
  common.Status status = 1;
  data.DataChunk record_batch = 2;
}

This is quite similar to distributed execution APIs, but with some differences:

  1. The response is returned directly in the RPC call, without the need for another call (see the sketch after this list).
  2. No task lifecycle management/monitoring is involved; if the task fails, we just remove it and return the error in the response directly.
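To show how the frontend side might drive this API, here is a hedged Rust sketch. `ComputeClient`, `run_local_query` and the plain structs are simplified, hypothetical stand-ins for the proto-generated types and the real gRPC client:

```rust
// Hypothetical, simplified stand-ins for the proto-generated types.
struct ExecuteRequest {
    plan: Vec<u8>, // serialized batch_plan.PlanFragment
    epoch: u64,
}
struct ExecuteResponse {
    record_batch: Vec<u8>, // serialized data.DataChunk
}

trait ComputeClient {
    // One RPC both runs the plan and returns the result: no task creation,
    // no status polling, no separate result-fetch call.
    fn execute(&self, req: ExecuteRequest) -> Result<ExecuteResponse, String>;
}

fn run_local_query(client: &impl ComputeClient, plan: Vec<u8>, epoch: u64) -> Result<Vec<u8>, String> {
    // A failure simply surfaces as an error here; no task cleanup API is needed.
    Ok(client.execute(ExecuteRequest { plan, epoch })?.record_batch)
}

// A trivial mock so the sketch compiles and runs on its own.
struct MockClient;
impl ComputeClient for MockClient {
    fn execute(&self, _req: ExecuteRequest) -> Result<ExecuteResponse, String> {
        Ok(ExecuteResponse { record_batch: vec![1, 2, 3] })
    }
}

fn main() {
    let chunk = run_local_query(&MockClient, vec![], 42).unwrap();
    println!("got {} result bytes", chunk.len());
}
```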

Consistent Hash

Background

Scaling could occur for multiple reasons in RisingWave. For example, when the workload of some streaming operator is heavier than expected, we need to scale out. During scaling out or scaling in, physical resources will be allocated or freed accordingly. That is to say, new actors will be created on scaling out, while some old actors will be dropped on scaling in. As a result of the actor change, redistributing data (i.e. the states of the actors) is inevitably required. That yields a question: how can we efficiently determine the distribution of data and minimize data movement on scaling?

On the other hand, we need to parallelize the scan on tables or materialized views in batch query mode. Therefore, we need to partition the data in a way that boosts the performance most. So what is the most beneficial way of partitioning data for tables and materialized views?

In RisingWave, we adopt a consistent-hash-based strategy to solve the two problems above. This document will elaborate on our design.

Design

Meta

Actor Scheduling

First, we need to introduce a little about how we schedule the actors. Each worker node in the RisingWave cluster has a number of parallel units. A parallel unit is the minimal scheduling unit in RisingWave, as well as the physical location of an actor. Each actor will be scheduled to exactly one parallel unit.

Data Distribution

Here comes the main part, where we will construct a mapping that determines data distribution.

For all data \( k \in U_k \), where \( U_k \) is an unbounded set, we apply a hash function \( v = H(k) \), where \( v \) falls into a limited range. The hash function \( H \) ensures that all data are hashed uniformly into that range. We call \( v \) the vnode, namely virtual node, shown as the squares in the figure below.

initial data distribution

Then we have the vnode mapping, which ensures that vnodes are mapped evenly to the parallel units in the cluster. In other words, the numbers of vnodes mapped to the different parallel units should be as close as possible. This is denoted by the different colors in the figure above. As depicted, we have 3 parallel units (shown as circles), each taking \( \frac{1}{3} \) of the total vnodes. The vnode mapping is constructed and maintained by meta.

As long as the hash function \( H \) can ensure uniformity, the data distribution determined by this strategy will be even across physical resources. The evenness will be retained even if data in \( U_k \) are skewed to a certain range, say, most students scoring over 60 in a hundred-mark system.
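The following Rust sketch illustrates the two-level scheme (key → vnode → parallel unit) under some simplifying assumptions: 12 vnodes, Rust's default hasher standing in for \( H \), and a hard-coded mapping instead of the one maintained by meta:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const VNODE_COUNT: u64 = 12;

/// v = H(k): hash the key uniformly into the bounded range of vnodes.
fn vnode_of(key: impl Hash) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % VNODE_COUNT
}

fn main() {
    // Vnode mapping maintained by meta: vnode index -> parallel unit id.
    // 12 vnodes are spread evenly over 3 parallel units (4 vnodes each).
    let vnode_mapping: [u32; 12] = [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];

    let key = "some user key";
    let vnode = vnode_of(key);
    let parallel_unit = vnode_mapping[vnode as usize];
    println!("key {key:?} -> vnode {vnode} -> parallel unit {parallel_unit}");
}
```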

Data Redistribution

Since \( v = H(k) \), the way that data are mapped to vnodes will be invariant. Therefore, when scaling occurs, we only need to modify vnode mapping (the way that vnodes are mapped to parallel units), so as to redistribute the data.

Let’s take scaling out as an example. Assume that we have one more parallel unit after scaling out, depicted as the orange circle in the figure below. Using the optimal strategy, we modify the vnode mapping in such a way that only \( \frac{1}{4} \) of the data have to be moved: with 12 vnodes and a new fourth parallel unit, each of the original three parallel units hands over exactly one vnode to the new one, so only 3 of the 12 vnodes change their parallel unit. The vnodes whose data are required to be moved are highlighted with bold borders in the figure.

optimal data redistribution

To minimize data movement when scaling occurs, we should be careful when we modify the vnode mapping. Below is a counterexample: modifying the vnode mapping like this will result in \( \frac{1}{2} \) of the data being moved.

worst data redistribution

Streaming

We know that a fragment has several actors as its different parallelisms, and that upstream actors will send data to downstream actors via dispatcher. The figure below illustrates how actors distribute data based on consistent hash by example.

actor data distribution

In the figure, we can see that one upstream actor dispatches data to three downstream actors. The downstream actors are scheduled on the parallel units mentioned in the previous example, respectively.

Based on our consistent hash design, the dispatcher is informed of the latest vnode mapping by the meta node. It then decides how to send data by the following steps (see the sketch after this list):

  1. Compute vnode of the data via the hash function \( H \). Let the vnode be \( v_k \).
  2. Look up vnode mapping and find out parallel unit \( p_n \) that vnode \( v_k \) maps to.
  3. Send data to the downstream actor that is scheduled on parallel unit \( p_n \) (remember that one actor will be scheduled on exactly one parallel unit).
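Below is a minimal Rust sketch of these three steps. The vnode mapping, the actor placement and the pre-computed vnode are made-up values for illustration; in reality step 1 would use the hash function \( H \), as in the earlier sketch:

```rust
use std::collections::HashMap;

fn main() {
    // Provided by meta: vnode index -> parallel unit id.
    let vnode_mapping: [u32; 12] = [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
    // Known to the dispatcher: parallel unit id -> downstream actor scheduled on it.
    let actor_on_parallel_unit: HashMap<u32, u32> =
        HashMap::from([(0, 1001), (1, 1002), (2, 1003)]);

    // 1. Compute the vnode of the data via H (result assumed to be 6 here).
    let v_k: usize = 6;
    // 2. Look up the parallel unit p_n that vnode v_k maps to.
    let p_n = vnode_mapping[v_k];
    // 3. Send the data to the downstream actor scheduled on p_n.
    let target_actor = actor_on_parallel_unit[&p_n];
    println!("row with vnode {v_k} goes to actor {target_actor} on parallel unit {p_n}");
}
```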

In this way, all actors’ data (i.e. actors’ states) will be distributed according to the vnode mapping constructed by meta.

When scaling occurs, actors will be re-scheduled accordingly. By modifying the vnode mapping in meta and making streaming act on the new vnode mapping, we can minimize data movement in the following aspects:

  • The data of existing actors will not be displaced too much.
  • The block cache of a compute node will not be invalidated too much.

Batch

When we perform parallel batch read, we should partition the data for each parallelism in some way. Now that we have vnodes corresponding to disjoint sets of data, this naturally forms a data partition pattern: one vnode could be viewed as a minimal data partition unit, and we could aggregate several vnodes together to get a larger data partition.

In the vnode mapping introduced above, one parallel unit corresponds to several vnodes, so we can view the vnodes that are mapped to one parallel unit as one partition group. In the figure above, namely, a partition group is the set of vnodes with the same color (or the data that are hashed to these vnodes).
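A rough sketch of deriving such partition groups by inverting the vnode mapping is shown below (made-up mapping and simplified types, rather than the actual representation used in the code):

```rust
use std::collections::BTreeMap;

fn main() {
    // vnode index -> parallel unit, as maintained by meta.
    let vnode_mapping: [u32; 12] = [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];

    // Invert the mapping: each parallel unit's vnodes form one partition group.
    let mut partitions: BTreeMap<u32, Vec<usize>> = BTreeMap::new();
    for (vnode, &unit) in vnode_mapping.iter().enumerate() {
        partitions.entry(unit).or_default().push(vnode);
    }

    // Each partition (a disjoint set of vnodes) can be scanned by one parallelism.
    for (unit, vnodes) in &partitions {
        println!("partition for parallel unit {unit}: vnodes {vnodes:?}");
    }
}
```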

This is better than range partitioning in that this approach is more stable when the primary key of a materialized view is distributed non-randomly, for example, monotonically increasing.

Storage

If we look into the read-write pattern of streaming actors, we’ll find that in most cases, an actor only needs to read the data written by itself (i.e. the actor’s internal states). Namely, it reads data with the same vnodes as it previously wrote.

Therefore, an instinctive way to place data in storage is to group data by vnodes. In this way, when actors perform read operation, they could touch as few SST blocks as possible and thus trigger less I/O.

We know that Hummock, our LSM-Tree-based storage engine, sorts key-value pairs by the order of the key. Hence, in order to group data by vnode on the basis of Hummock, we encode vnode into the storage key. The storage key will look like

table_id | vnode | ...

where table_id denotes the state table, and vnode is computed via \( H \) on the key of the data.
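A minimal sketch of composing such a key prefix, assuming hypothetical widths for table_id and vnode; the remaining part of the key (the `...`) would contain the encoded key of the data:

```rust
fn storage_key_prefix(table_id: u32, vnode: u16) -> Vec<u8> {
    let mut key = Vec::new();
    // table_id comes first so that all data of a state table is contiguous.
    key.extend_from_slice(&table_id.to_be_bytes());
    // vnode comes next so that, within a table, data is grouped by vnode.
    key.extend_from_slice(&vnode.to_be_bytes());
    key
}

fn main() {
    let prefix = storage_key_prefix(42, 7);
    // The rest of the key would be the encoded key of the data itself.
    println!("key prefix: {prefix:?}");
}
```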

To illustrate this, let’s revisit the previous example. Executors of an operator will share the same logical state table, just as is shown in the figure below:

actor state table

Now that we have 12 vnodes in total in the example, the data layout in storage will accordingly look like this:

storage data layout

Note that we only show the logical sequence and aggregation of data in this illustration. The actual data may be separated into different SSTs in Hummock.

Since the way that certain data are hashed to vnode is invariant, the encoding of the data will also be invariant. How we schedule the fragment (e.g. parallelism of the fragment) will not affect data encoding. In other words, storage will not care about vnode mapping, which is determined by meta and used only by streaming. This is actually a way of decoupling the storage layer from the compute layer.

Keys

This section documents the different keys in RisingWave.

Stream Key

The key that can identify records in the RisingWave stream.

For example, given the following stream chunk, where the stream key is (k1, k2):

| op | k1 | k2 | v1 | v2 |
|----|----|----|----|----|
| -  | 1  | 2  | 1  | 1  |
| +  | 1  | 2  | 3  | 4  |
| +  | 0  | 1  | 2  | 3  |

We can tell that the record corresponding to the key (1, 2) has been updated from (1, 2, 1, 1) to (1, 2, 3, 4).

The record corresponding to key (0, 1) has been inserted with (0, 1, 2, 3).

It may not be the minimal set of columns required to identify a record; for instance, the group key could be part of the stream key so as to specify the distribution of records.

Primary Key (Storage)

This discusses the internal primary key (pk) which we often see in streaming operators. It is different from the primary key in SQL.

A more appropriate name for this would be Storage Primary Key.

Besides uniquely identifying a record in storage, this key may also be used to provide ordering properties.

Let’s use the following query as an example:

create table t1(id bigint primary key, i bigint);
create materialized view mv1 as select id, i from t1 order by i, id;

mv1 has the following plan:

 StreamMaterialize {
   columns: [id, i],
   stream_key: [id],
   pk_columns: [i, id], -- notice the pk_columns
   pk_conflict: NoCheck
 }
 └─StreamTableScan { table: t1, columns: [id, i] }

You can see that the pk_columns are [i, id], although the upstream SQL primary key is just id. In the storage layer, key-value pairs are sorted by their keys.

Because the materialized view contains an order by i, id, the storage primary key is [i, id] to ensure they are ordered in storage. Importantly, i will be a prefix of the key.

Then when iterating over the keys from storage, the records are returned in the correct order per partition.

When the update stream comes, we can just use id to identify the record that needs to be updated. We can get the whole record corresponding to that id and read i from there. Then we can use (i, id) to locate and update the materialized state accordingly.
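As a rough sketch of that flow, the following Rust snippet mimics the materialized state with plain in-memory maps (hypothetical structures, not the actual state table): it looks up the old record by the stream key id, removes the old entry under the old storage pk (i, id), and inserts the new one.

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    // Old record for id = 1: (id, i) = (1, 10).
    // Index from the stream key `id` to the full record, so we can recover `i`.
    let mut by_id: HashMap<i64, (i64, i64)> = HashMap::from([(1, (1, 10))]);
    // "Storage": ordered by the storage primary key (i, id).
    let mut storage: BTreeMap<(i64, i64), (i64, i64)> = BTreeMap::from([((10, 1), (1, 10))]);

    // An update arrives for id = 1 with new i = 5.
    let (id, new_i) = (1, 5);

    // Use the stream key `id` to find the old record and its old `i`.
    let (_, old_i) = by_id[&id];
    // Delete the old entry under the old storage pk (old_i, id) ...
    storage.remove(&(old_i, id));
    // ... and insert the new entry under the new storage pk (new_i, id).
    storage.insert((new_i, id), (id, new_i));
    by_id.insert(id, (id, new_i));

    // Iterating over the storage yields records ordered by (i, id).
    for ((i, id), _) in &storage {
        println!("i = {i}, id = {id}");
    }
}
```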