test: introduce unstable fuzz create table test (#3788)

* feat: implement unstable_fuzz_create_table_standalone

* chore: use drop database

* docs: update docs

* chore: add ci config

* chore: add feature gate

* fix: fix clippy

* chore: update ci

* Apply suggestions from code review

* feat: reduce num

* Apply suggestions from code review

* chore: apply suggestions from CR

* Apply suggestions from code review

* chore: reduce `wait_timeout` in health check

* Update .env.example

* refactor: use `init_greptime_connections_via_env`

* refactor: use `init_greptime_connections_via_env`

---------

Co-authored-by: tison <wander4096@gmail.com>
This commit is contained in:
Weny Xu
2024-05-01 14:08:49 +08:00
committed by GitHub
parent 3b89b9ddd8
commit f6e2039eb8
20 changed files with 846 additions and 20 deletions

View File

@@ -24,3 +24,7 @@ GT_KAFKA_ENDPOINTS = localhost:9092
# Setting for fuzz tests
GT_MYSQL_ADDR = localhost:4002
# Setting for unstable fuzz tests
GT_FUZZ_BINARY_PATH=/path/to/
GT_FUZZ_INSTANCE_ROOT_DIR=/tmp/unstable_greptime

View File

@@ -3,11 +3,17 @@ description: 'Fuzz test given setup and service'
inputs:
target:
description: "The fuzz target to test"
required: true
max-total-time:
description: "Max total time(secs)"
required: true
unstable:
default: 'false'
description: "Enable unstable feature"
runs:
using: composite
steps:
- name: Run Fuzz Test
shell: bash
run: cargo fuzz run ${{ inputs.target }} --fuzz-dir tests-fuzz -D -s none -- -max_total_time=120
env:
GT_MYSQL_ADDR: 127.0.0.1:4002
run: cargo fuzz run ${{ inputs.target }} --fuzz-dir tests-fuzz -D -s none ${{ inputs.unstable == 'true' && '--features=unstable' || '' }} -- -max_total_time=${{ inputs.max-total-time }}

View File

@@ -171,8 +171,62 @@ jobs:
uses: ./.github/actions/fuzz-test
env:
CUSTOM_LIBFUZZER_PATH: /usr/lib/llvm-14/lib/libFuzzer.a
GT_MYSQL_ADDR: 127.0.0.1:4002
with:
target: ${{ matrix.target }}
max-total-time: 120
unstable-fuzztest:
name: Unstable Fuzz Test
needs: build
runs-on: ubuntu-latest
strategy:
matrix:
target: [ "unstable_fuzz_create_table_standalone" ]
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
sudo apt update && sudo apt install -y libfuzzer-14-dev
cargo install cargo-fuzz
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
name: bins
path: .
- name: Unzip binaries
run: tar -xvf ./bins.tar.gz
- name: Fuzz Test
uses: ./.github/actions/fuzz-test
env:
CUSTOM_LIBFUZZER_PATH: /usr/lib/llvm-14/lib/libFuzzer.a
GT_MYSQL_ADDR: 127.0.0.1:4002
GT_FUZZ_BINARY_PATH: ./bins/greptime
GT_FUZZ_INSTANCE_ROOT_DIR: /tmp/unstable-greptime/
with:
target: ${{ matrix.target }}
max-total-time: 120
unstable: 'true'
- name: Upload unstable fuzz test logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: unstable-fuzz-logs
path: /tmp/unstable-greptime/
retention-days: 3
sqlness:
name: Sqlness Test

3
Cargo.lock generated
View File

@@ -10237,15 +10237,18 @@ dependencies = [
"dotenv",
"lazy_static",
"libfuzzer-sys",
"nix 0.28.0",
"partition",
"rand",
"rand_chacha",
"reqwest",
"serde",
"serde_json",
"snafu",
"sql",
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
"sqlx",
"tinytemplate",
"tokio",
]

View File

@@ -10,6 +10,10 @@ workspace = true
[package.metadata]
cargo-fuzz = true
[features]
default = []
unstable = ["nix"]
[dependencies]
arbitrary = { version = "1.3.0", features = ["derive"] }
async-trait = { workspace = true }
@@ -24,9 +28,11 @@ derive_builder = { workspace = true }
dotenv = "0.15"
lazy_static = { workspace = true }
libfuzzer-sys = "0.4"
nix = { version = "0.28", features = ["process", "signal"], optional = true }
partition = { workspace = true }
rand = { workspace = true }
rand_chacha = "0.3.1"
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
@@ -38,10 +44,11 @@ sqlx = { version = "0.6", features = [
"postgres",
"chrono",
] }
tinytemplate = "1.2"
tokio = { workspace = true }
[dev-dependencies]
dotenv.workspace = true
tokio = { workspace = true }
[[bin]]
name = "fuzz_create_table"
@@ -91,3 +98,11 @@ path = "targets/fuzz_create_database.rs"
test = false
bench = false
doc = false
[[bin]]
name = "unstable_fuzz_create_table_standalone"
path = "targets/unstable/fuzz_create_table_standalone.rs"
test = false
bench = false
doc = false
required-features = ["unstable"]

View File

@@ -9,6 +9,22 @@ cargo install cargo-fuzz
2. Start GreptimeDB
3. Copy the `.env.example`, which is at project root, to `.env` and change the values on need.
### For stable fuzz tests
Set the GreptimeDB MySQL address.
```
GT_MYSQL_ADDR = localhost:4002
```
### For unstable fuzz tests
Set the binary path of the GreptimeDB:
```
GT_FUZZ_BINARY_PATH = /path/to/
```
Change the instance root directory(the default value: `/tmp/unstable_greptime/`)
```
GT_FUZZ_INSTANCE_ROOT_DIR = /path/to/
```
## Run
1. List all fuzz targets
```bash

View File

@@ -0,0 +1,23 @@
mode = 'standalone'
enable_memory_catalog = false
require_lease_before_startup = true
[wal]
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
[storage]
type = 'File'
data_home = '{data_home}'
[grpc_options]
addr = '127.0.0.1:4001'
runtime_size = 8
[procedure]
max_retry_times = 3
retry_delay = "500ms"

View File

@@ -16,6 +16,8 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use crate::ir::create_expr::{CreateDatabaseExprBuilderError, CreateTableExprBuilderError};
#[cfg(feature = "unstable")]
use crate::utils::process::Pid;
pub type Result<T> = std::result::Result<T, Error>;
@@ -23,6 +25,22 @@ pub type Result<T> = std::result::Result<T, Error>;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to create a file: {}", path))]
CreateFile {
path: String,
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to write a file: {}", path))]
WriteFile {
path: String,
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Unexpected, violated: {violated}"))]
Unexpected {
violated: String,
@@ -56,4 +74,23 @@ pub enum Error {
#[snafu(display("Failed to assert: {}", reason))]
Assert { reason: String, location: Location },
#[snafu(display("Child process exited unexpected"))]
UnexpectedExited { location: Location },
#[snafu(display("Failed to spawn a child process"))]
SpawnChild {
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[cfg(feature = "unstable")]
#[snafu(display("Failed to kill a process, pid: {}", pid))]
KillProcess {
location: Location,
#[snafu(source)]
error: nix::Error,
pid: Pid,
},
}

View File

@@ -12,21 +12,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod config;
pub mod health;
#[cfg(feature = "unstable")]
pub mod process;
use std::env;
use common_telemetry::info;
use sqlx::mysql::MySqlPoolOptions;
use sqlx::{MySql, Pool};
/// Database connections
pub struct Connections {
pub mysql: Option<Pool<MySql>>,
}
const GT_MYSQL_ADDR: &str = "GT_MYSQL_ADDR";
pub async fn init_greptime_connections() -> Connections {
/// Connects to GreptimeDB via env variables.
pub async fn init_greptime_connections_via_env() -> Connections {
let _ = dotenv::dotenv();
let mysql = if let Ok(addr) = env::var(GT_MYSQL_ADDR) {
Some(addr)
} else {
info!("GT_MYSQL_ADDR is empty, ignores test");
None
};
init_greptime_connections(mysql).await
}
/// Connects to GreptimeDB.
pub async fn init_greptime_connections(mysql: Option<String>) -> Connections {
let mysql = if let Some(addr) = mysql {
Some(
MySqlPoolOptions::new()
.connect(&format!("mysql://{addr}/public"))
@@ -34,9 +53,33 @@ pub async fn init_greptime_connections() -> Connections {
.unwrap(),
)
} else {
info!("GT_MYSQL_ADDR is empty, ignores test");
None
};
Connections { mysql }
}
const GT_FUZZ_BINARY_PATH: &str = "GT_FUZZ_BINARY_PATH";
const GT_FUZZ_INSTANCE_ROOT_DIR: &str = "GT_FUZZ_INSTANCE_ROOT_DIR";
/// The variables for unstable test
pub struct UnstableTestVariables {
pub binary_path: String,
pub root_dir: Option<String>,
}
/// Loads env variables for unstable test
pub fn load_unstable_test_env_variables() -> UnstableTestVariables {
let _ = dotenv::dotenv();
let binary_path = env::var(GT_FUZZ_BINARY_PATH).expect("GT_FUZZ_BINARY_PATH not found");
let root_dir = if let Ok(root) = env::var(GT_FUZZ_INSTANCE_ROOT_DIR) {
Some(root)
} else {
None
};
UnstableTestVariables {
binary_path,
root_dir,
}
}

View File

@@ -0,0 +1,58 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use common_telemetry::tracing::info;
use serde::Serialize;
use snafu::ResultExt;
use tinytemplate::TinyTemplate;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use crate::error;
use crate::error::Result;
/// Get the path of config dir `tests-fuzz/conf`.
pub fn get_conf_path() -> PathBuf {
let mut root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
root_path.push("conf");
root_path
}
/// Returns rendered config file.
pub fn render_config_file<C: Serialize>(template_path: &str, context: &C) -> String {
let mut tt = TinyTemplate::new();
let template = std::fs::read_to_string(template_path).unwrap();
tt.add_template(template_path, &template).unwrap();
tt.render(template_path, context).unwrap()
}
// Writes config file to `output_path`.
pub async fn write_config_file<C: Serialize>(
template_path: &str,
context: &C,
output_path: &str,
) -> Result<()> {
info!("template_path: {template_path}, output_path: {output_path}");
let content = render_config_file(template_path, context);
let mut config_file = File::create(output_path)
.await
.context(error::CreateFileSnafu { path: output_path })?;
config_file
.write_all(content.as_bytes())
.await
.context(error::WriteFileSnafu { path: output_path })?;
Ok(())
}

View File

@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use crate::utils::info;
/// Check health of the processing.
#[async_trait::async_trait]
pub trait HealthChecker: Send + Sync {
async fn check(&self);
fn wait_timeout(&self) -> Duration;
}
/// Http health checker.
pub struct HttpHealthChecker {
pub url: String,
}
#[async_trait::async_trait]
impl HealthChecker for HttpHealthChecker {
async fn check(&self) {
loop {
match reqwest::get(&self.url).await {
Ok(resp) => {
if resp.status() == 200 {
info!("Health checked!");
return;
}
info!("Failed to check health, status: {}", resp.status());
}
Err(err) => {
info!("Failed to check health, error: {err:?}");
}
}
info!("Checking health later...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
fn wait_timeout(&self) -> Duration {
Duration::from_secs(5)
}
}

View File

@@ -0,0 +1,264 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::process::{ExitStatus, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_telemetry::{info, warn};
use nix::sys::signal::Signal;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use snafu::{ensure, ResultExt};
use tokio::fs::OpenOptions;
use tokio::process::Child;
use crate::error::{self, Result};
use crate::utils::health::HealthChecker;
pub type Pid = u32;
/// The state of a process.
#[derive(Debug, Clone)]
pub struct Process {
pub(crate) exit_status: Option<ExitStatus>,
pub(crate) exited: bool,
}
/// ProcessManager provides the ability to spawn/wait/kill a child process.
#[derive(Debug, Clone)]
pub struct ProcessManager {
processes: Arc<Mutex<HashMap<Pid, Process>>>,
}
/// The callback while the child process exits.
pub type OnChildExitResult = std::result::Result<ExitStatus, std::io::Error>;
impl Default for ProcessManager {
fn default() -> Self {
Self::new()
}
}
impl ProcessManager {
pub fn new() -> Self {
Self {
processes: Arc::new(Default::default()),
}
}
pub fn get(&self, pid: Pid) -> Option<Process> {
self.processes.lock().unwrap().get(&pid).cloned()
}
fn wait<F>(&self, mut child: Child, f: F)
where
F: FnOnce(Pid, OnChildExitResult) + Send + 'static,
{
let processes = self.processes.clone();
tokio::spawn(async move {
// Safety: caller checked
let pid = child.id().unwrap();
let result = child.wait().await;
match result {
Ok(code) => {
warn!("pid: {pid} exited with status: {}", code);
f(pid, Ok(code));
processes.lock().unwrap().entry(pid).and_modify(|process| {
process.exit_status = Some(code);
process.exited = true;
});
}
Err(err) => {
warn!("pid: {pid} exited with error: {}", err);
f(pid, Err(err));
processes.lock().unwrap().entry(pid).and_modify(|process| {
process.exited = true;
});
}
}
});
}
/// Spawns a new process.
pub fn spawn<T: Into<Stdio>, F>(
&self,
binary: &str,
args: &[String],
stdout: T,
stderr: T,
on_exit: F,
) -> Result<Pid>
where
F: FnOnce(Pid, OnChildExitResult) + Send + 'static,
{
info!("starting {} with {:?}", binary, args);
let child = tokio::process::Command::new(binary)
.args(args)
.stdout(stdout)
.stderr(stderr)
.spawn()
.context(error::SpawnChildSnafu)?;
let pid = child.id();
if let Some(pid) = pid {
self.processes.lock().unwrap().insert(
pid,
Process {
exit_status: None,
exited: false,
},
);
self.wait(child, on_exit);
Ok(pid)
} else {
error::UnexpectedExitedSnafu {}.fail()
}
}
/// Kills a process via [Pid].
pub fn kill<T: Into<Option<Signal>>>(pid: Pid, signal: T) -> Result<()> {
let signal: Option<Signal> = signal.into();
info!("kill pid :{} signal: {:?}", pid, signal);
// Safety: checked.
nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid as i32), signal)
.context(error::KillProcessSnafu { pid })?;
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessState {
NotSpawn,
Spawning,
HealthChecking(Pid),
Health(Pid),
Killing(Pid),
Exited(Pid),
}
impl ProcessState {
/// Returns true if it's [ProcessState::Health].
pub fn health(&self) -> bool {
matches!(self, ProcessState::Health(_))
}
}
/// The controller of an unstable process.
pub struct UnstableProcessController {
pub binary_path: String,
pub args: Vec<String>,
pub root_dir: String,
pub seed: u64,
pub process_manager: ProcessManager,
pub health_check: Box<dyn HealthChecker>,
pub sender: tokio::sync::watch::Sender<ProcessState>,
pub running: Arc<AtomicBool>,
}
async fn path_to_stdio(path: &str) -> Result<std::fs::File> {
Ok(OpenOptions::new()
.create(true)
.truncate(true)
.read(true)
.write(true)
.open(path)
.await
.context(error::CreateFileSnafu { path })?
.into_std()
.await)
}
impl UnstableProcessController {
/// Start the unstable processes.
pub async fn start(&self) {
self.running.store(true, Ordering::Relaxed);
let mut rng = ChaChaRng::seed_from_u64(self.seed);
while self.running.load(Ordering::Relaxed) {
let min = rng.gen_range(50..100);
let max = rng.gen_range(300..600);
let ms = rng.gen_range(min..max);
let pid = self
.start_process_with_retry(3)
.await
.expect("Failed to start process");
tokio::time::sleep(Duration::from_millis(ms)).await;
warn!("After {ms}ms, killing pid: {pid}");
self.sender.send(ProcessState::Killing(pid)).unwrap();
ProcessManager::kill(pid, Signal::SIGKILL).expect("Failed to kill");
}
}
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed);
}
async fn start_process_with_retry(&self, max_retry: usize) -> Result<Pid> {
for _ in 0..max_retry {
let pid = self.start_process().await.unwrap();
let wait_timeout = self.health_check.wait_timeout();
let result = tokio::time::timeout(wait_timeout, async {
self.sender.send(ProcessState::HealthChecking(pid)).unwrap();
self.health_check.check().await;
})
.await;
match result {
Ok(_) => {
self.sender.send(ProcessState::Health(pid)).unwrap();
return Ok(pid);
}
Err(_) => {
ensure!(
self.process_manager.get(pid).unwrap().exited,
error::UnexpectedSnafu {
violated: format!("Failed to start process: pid: {pid}")
}
);
self.sender.send(ProcessState::Exited(pid)).unwrap();
// Retry alter
warn!("Wait for health checking timeout, retry later...");
}
}
}
error::UnexpectedSnafu {
violated: "Failed to start process",
}
.fail()
}
async fn start_process(&self) -> Result<Pid> {
let on_exit = move |pid, result| {
info!("The pid: {pid} exited, result: {result:?}");
};
let now = common_time::util::current_time_millis();
let stdout = format!("{}stdout-{}", self.root_dir, now);
let stderr = format!("{}stderr-{}", self.root_dir, now);
let stdout = path_to_stdio(&stdout).await?;
let stderr = path_to_stdio(&stderr).await?;
self.sender.send(ProcessState::Spawning).unwrap();
self.process_manager.spawn(
&self.binary_path,
&self.args.clone(),
stdout,
stderr,
on_exit,
)
}
}

View File

@@ -42,7 +42,7 @@ use tests_fuzz::ir::{
use tests_fuzz::translator::mysql::alter_expr::AlterTableExprTranslator;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections, Connections};
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
use tests_fuzz::validator;
struct FuzzContext {
@@ -229,7 +229,7 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections().await;
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must be succeed"),
};

View File

@@ -39,7 +39,7 @@ use tests_fuzz::ir::{droppable_columns, AlterTableExpr, CreateTableExpr};
use tests_fuzz::translator::mysql::alter_expr::AlterTableExprTranslator;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections, Connections};
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
use tests_fuzz::validator;
struct FuzzContext {
@@ -174,7 +174,7 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections().await;
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must be succeed"),
};

View File

@@ -31,7 +31,7 @@ use tests_fuzz::generator::Generator;
use tests_fuzz::ir::CreateDatabaseExpr;
use tests_fuzz::translator::mysql::create_expr::CreateDatabaseExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections, Connections};
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
struct FuzzContext {
greptime: Pool<MySql>,
@@ -95,7 +95,7 @@ async fn execute_create_database(ctx: FuzzContext, input: FuzzInput) -> Result<(
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections().await;
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must be succeed"),
};

View File

@@ -37,7 +37,7 @@ use tests_fuzz::generator::Generator;
use tests_fuzz::ir::{primary_key_and_not_null_column_options_generator, Column};
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections, Connections};
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
use tests_fuzz::validator;
struct FuzzContext {
@@ -184,7 +184,7 @@ async fn execute_create_logic_table(ctx: FuzzContext, input: FuzzInput) -> Resul
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections().await;
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must be succeed"),
};

View File

@@ -31,7 +31,7 @@ use tests_fuzz::generator::Generator;
use tests_fuzz::ir::CreateTableExpr;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections, Connections};
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
use tests_fuzz::validator;
struct FuzzContext {
@@ -111,7 +111,7 @@ async fn execute_create_table(ctx: FuzzContext, input: FuzzInput) -> Result<()>
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections().await;
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must be succeed"),
};

View File

@@ -36,7 +36,7 @@ use tests_fuzz::ir::{CreateTableExpr, InsertIntoExpr};
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections, Connections};
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
struct FuzzContext {
greptime: Pool<MySql>,
@@ -155,7 +155,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections().await;
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must be succeed"),
};

View File

@@ -38,7 +38,7 @@ use tests_fuzz::ir::{CreateTableExpr, InsertIntoExpr};
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::{init_greptime_connections, Connections};
use tests_fuzz::utils::{init_greptime_connections_via_env, Connections};
struct FuzzContext {
greptime: Pool<MySql>,
@@ -191,7 +191,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections().await;
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must be succeed"),
};

View File

@@ -0,0 +1,246 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![no_main]
use std::collections::HashMap;
use std::fs::create_dir_all;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use libfuzzer_sys::arbitrary::{Arbitrary, Unstructured};
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use serde::Serialize;
use snafu::ensure;
use sqlx::mysql::MySqlPoolOptions;
use sqlx::{MySql, Pool};
use tests_fuzz::context::TableContext;
use tests_fuzz::error::Result;
use tests_fuzz::fake::{
merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map,
MappedGenerator, WordGenerator,
};
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::Generator;
use tests_fuzz::ir::CreateTableExpr;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::config::{get_conf_path, write_config_file};
use tests_fuzz::utils::health::HttpHealthChecker;
use tests_fuzz::utils::load_unstable_test_env_variables;
use tests_fuzz::utils::process::{ProcessManager, ProcessState, UnstableProcessController};
use tests_fuzz::{error, validator};
use tokio::sync::watch;
struct FuzzContext {
greptime: Pool<MySql>,
}
impl FuzzContext {
async fn close(self) {
self.greptime.close().await;
}
}
#[derive(Clone, Debug)]
struct FuzzInput {
seed: u64,
num: usize,
}
impl Arbitrary<'_> for FuzzInput {
fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
let seed = u.int_in_range(u64::MIN..=u64::MAX)?;
let mut rng = ChaChaRng::seed_from_u64(seed);
let num = rng.gen_range(1..500);
Ok(FuzzInput { seed, num })
}
}
const DEFAULT_TEMPLATE: &str = "standalone.template.toml";
const DEFAULT_CONFIG_NAME: &str = "standalone.template.toml";
const DEFAULT_ROOT_DIR: &str = "/tmp/unstable_greptime/";
const DEFAULT_DATA_HOME: &str = "/tmp/unstable_greptime/datahome/";
const DEFAULT_MYSQL_URL: &str = "127.0.0.1:4002";
const DEFAULT_HTTP_HEALTH_URL: &str = "http://127.0.0.1:4000/health";
fn generate_create_table_expr<R: Rng + 'static>(rng: &mut R) -> CreateTableExpr {
let columns = rng.gen_range(2..30);
let create_table_generator = CreateTableExprGeneratorBuilder::default()
.name_generator(Box::new(MappedGenerator::new(
WordGenerator,
merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
)))
.columns(columns)
.engine("mito")
.build()
.unwrap();
create_table_generator.generate(rng).unwrap()
}
async fn connect_mysql(addr: &str) -> Pool<MySql> {
loop {
match MySqlPoolOptions::new()
.connect(&format!("mysql://{addr}/public"))
.await
{
Ok(mysql) => return mysql,
Err(err) => {
warn!("Reconnecting to {addr}, error: {err}")
}
}
}
}
async fn execute_unstable_create_table(
unstable_process_controller: Arc<UnstableProcessController>,
rx: watch::Receiver<ProcessState>,
input: FuzzInput,
) -> Result<()> {
// Starts the unstable process.
let moved_unstable_process_controller = unstable_process_controller.clone();
let handler = tokio::spawn(async move { moved_unstable_process_controller.start().await });
let mut rng = ChaChaRng::seed_from_u64(input.seed);
let mysql = connect_mysql(DEFAULT_MYSQL_URL).await;
let ctx = FuzzContext { greptime: mysql };
let mut table_states = HashMap::new();
for _ in 0..input.num {
let expr = generate_create_table_expr(&mut rng);
let table_ctx = Arc::new(TableContext::from(&expr));
let table_name = expr.table_name.to_string();
if table_states.contains_key(&table_name) {
warn!("ignores same name table: {table_name}");
// ignores.
continue;
}
let translator = CreateTableExprTranslator;
let sql = translator.translate(&expr).unwrap();
let result = sqlx::query(&sql).execute(&ctx.greptime).await;
match result {
Ok(result) => {
let state = *rx.borrow();
table_states.insert(table_name, state);
validate_columns(&ctx.greptime, &table_ctx).await;
info!("Create table: {sql}, result: {result:?}");
}
Err(err) => {
let state = *rx.borrow();
ensure!(
!state.health(),
error::UnexpectedSnafu {
violated: format!("Failed to create table: {sql}, error: {err}")
}
);
table_states.insert(table_name, state);
continue;
}
}
}
loop {
let sql = "DROP DATABASE IF EXISTS public";
match sqlx::query(sql).execute(&ctx.greptime).await {
Ok(result) => {
info!("Drop table: {}, result: {result:?}", sql);
break;
}
Err(err) => warn!("Failed to drop table: {}, error: {err}", sql),
}
}
// Cleans up
ctx.close().await;
unstable_process_controller.stop();
let _ = handler.await;
info!("Finishing test for input: {:?}", input);
Ok(())
}
async fn validate_columns(client: &Pool<MySql>, table_ctx: &TableContext) {
loop {
match validator::column::fetch_columns(client, "public".into(), table_ctx.name.clone())
.await
{
Ok(mut column_entries) => {
column_entries.sort_by(|a, b| a.column_name.cmp(&b.column_name));
let mut columns = table_ctx.columns.clone();
columns.sort_by(|a, b| a.name.value.cmp(&b.name.value));
validator::column::assert_eq(&column_entries, &columns).unwrap();
return;
}
Err(err) => warn!(
"Failed to fetch table '{}' columns, error: {}",
table_ctx.name, err
),
}
}
}
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let variables = load_unstable_test_env_variables();
let root_dir = variables.root_dir.unwrap_or(DEFAULT_ROOT_DIR.to_string());
create_dir_all(&root_dir).unwrap();
let output_config_path = format!("{root_dir}{DEFAULT_CONFIG_NAME}");
let mut conf_path = get_conf_path();
conf_path.push(DEFAULT_TEMPLATE);
let template_path = conf_path.to_str().unwrap().to_string();
// Writes config file.
#[derive(Serialize)]
struct Context {
data_home: String,
}
write_config_file(
&template_path,
&Context {
data_home: DEFAULT_DATA_HOME.to_string(),
},
&output_config_path,
)
.await
.unwrap();
let args = vec![
"standalone".to_string(),
"start".to_string(),
format!("--config-file={output_config_path}"),
];
let process_manager = ProcessManager::new();
let (tx, rx) = watch::channel(ProcessState::NotSpawn);
let unstable_process_controller = Arc::new(UnstableProcessController {
binary_path: variables.binary_path,
args,
root_dir,
seed: input.seed,
process_manager,
health_check: Box::new(HttpHealthChecker {
url: DEFAULT_HTTP_HEALTH_URL.to_string(),
}),
sender: tx,
running: Arc::new(AtomicBool::new(false)),
});
execute_unstable_create_table(unstable_process_controller, rx, input)
.await
.unwrap_or_else(|err| panic!("fuzz test must be succeed: {err:?}"));
})
});