test(remote_wal): add sqlness with kafka wal (#3027)

* feat(sqlness): add kafka wal config

* chore: add sqlness with kafka wal ci config

* fix: fix config

* chore: apply suggestions from CR

* fix: add metasrv config to sqlness with kafka

* fix: replay memtable should from flushed_entry_id + 1

* fix: should set append flag to fopen

* feat: start wal allocator in standalone meta mode

* feat: append a noop record after kafka topic initialization

* test: ignore tests temporally

* test: change sqlness kafka wal config
This commit is contained in:
Weny Xu
2023-12-29 17:17:22 +09:00
committed by GitHub
parent 301ffc1d91
commit e16f093282
14 changed files with 176 additions and 122 deletions

View File

@@ -126,102 +126,22 @@ SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
| 128 | 128 | 10000 | 1280000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
Affected Rows: 128
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 256 | 256 | 10000 | 2560000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
Affected Rows: 256
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 512 | 512 | 10000 | 5120000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
Affected Rows: 512
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 1024 | 1024 | 10000 | 10240000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
Affected Rows: 1024
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 2048 | 2048 | 10000 | 20480000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
Affected Rows: 2048
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 4096 | 4096 | 10000 | 40960000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
Affected Rows: 4096
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 8192 | 8192 | 10000 | 81920000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
Affected Rows: 8192
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 16384 | 16384 | 10000 | 163840000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
Affected Rows: 16384
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 32768 | 32768 | 10000 | 327680000 |
+----------+-------------------+-----------------------------------+-----------------------------------+
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
DROP TABLE test;
Affected Rows: 0

View File

@@ -51,38 +51,38 @@ INSERT INTO bigtable SELECT a, to_unixtime(ts) * 51 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
DROP TABLE test;

View File

@@ -6,12 +6,18 @@ rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8
[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
linger = "5ms"
{{ endif }}
[storage]
type = 'File'

View File

@@ -0,0 +1,10 @@
[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
num_topics = 64
selector_type = "round_robin"
topic_name_prefix = "distributed_test_greptimedb_wal_topic"
{{ endif }}

View File

@@ -3,12 +3,18 @@ enable_memory_catalog = false
require_lease_before_startup = true
[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
linger = "5ms"
{{ endif }}
[storage]
type = 'File'

View File

@@ -15,6 +15,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlness = { version = "0.5" }
tinytemplate = "1.2"
tokio.workspace = true

View File

@@ -41,10 +41,17 @@ const METASRV_ADDR: &str = "127.0.0.1:3002";
const SERVER_ADDR: &str = "127.0.0.1:4001";
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
#[derive(Clone)]
pub enum WalConfig {
RaftEngine,
Kafka { broker_endpoints: Vec<String> },
}
#[derive(Clone)]
pub struct Env {
data_home: PathBuf,
server_addr: Option<String>,
wal: WalConfig,
}
#[allow(clippy::print_stdout)]
@@ -68,10 +75,11 @@ impl EnvController for Env {
#[allow(clippy::print_stdout)]
impl Env {
pub fn new(data_home: PathBuf, server_addr: Option<String>) -> Self {
pub fn new(data_home: PathBuf, server_addr: Option<String>, wal: WalConfig) -> Self {
Self {
data_home,
server_addr,
wal,
}
}
@@ -81,7 +89,7 @@ impl Env {
} else {
Self::build_db().await;
let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new(self.wal.clone());
let server_process = self.start_server("standalone", &db_ctx, true).await;
@@ -106,7 +114,7 @@ impl Env {
} else {
Self::build_db().await;
let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new(self.wal.clone());
// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
@@ -145,6 +153,7 @@ impl Env {
ctx: GreptimeDBContext {
time: 0,
datanode_id: Default::default(),
wal: self.wal.clone(),
},
is_standalone: false,
env: self.clone(),
@@ -178,6 +187,7 @@ impl Env {
.create(true)
.write(true)
.truncate(truncate_log)
.append(!truncate_log)
.open(log_file_name)
.unwrap();
@@ -214,6 +224,8 @@ impl Env {
"--enable-region-failover".to_string(),
"false".to_string(),
"--http-addr=127.0.0.1:5002".to_string(),
"-c".to_string(),
self.generate_config_file(subcommand, db_ctx),
];
(args, METASRV_ADDR.to_string())
}
@@ -321,6 +333,8 @@ impl Env {
wal_dir: String,
data_home: String,
procedure_dir: String,
is_raft_engine: bool,
kafka_wal_broker_endpoints: String,
}
let data_home = self
@@ -334,6 +348,8 @@ impl Env {
wal_dir,
data_home: data_home.display().to_string(),
procedure_dir,
is_raft_engine: db_ctx.is_raft_engine(),
kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
};
let rendered = tt.render(subcommand, &ctx).unwrap();
@@ -447,13 +463,28 @@ struct GreptimeDBContext {
/// Start time in millisecond
time: i64,
datanode_id: AtomicU32,
wal: WalConfig,
}
impl GreptimeDBContext {
pub fn new() -> Self {
pub fn new(wal: WalConfig) -> Self {
Self {
time: common_time::util::current_time_millis(),
datanode_id: AtomicU32::new(0),
wal,
}
}
fn is_raft_engine(&self) -> bool {
matches!(self.wal, WalConfig::RaftEngine)
}
fn kafka_wal_broker_endpoints(&self) -> String {
match &self.wal {
WalConfig::RaftEngine => String::new(),
WalConfig::Kafka { broker_endpoints } => {
serde_json::to_string(&broker_endpoints).unwrap()
}
}
}

View File

@@ -14,13 +14,20 @@
use std::path::PathBuf;
use clap::Parser;
use env::Env;
use clap::{Parser, ValueEnum};
use env::{Env, WalConfig};
use sqlness::{ConfigBuilder, Runner};
mod env;
mod util;
#[derive(ValueEnum, Debug, Clone)]
#[clap(rename_all = "snake_case")]
enum Wal {
RaftEngine,
Kafka,
}
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
/// SQL Harness for GrepTimeDB
@@ -41,9 +48,17 @@ struct Args {
#[clap(short, long, default_value = ".*")]
test_filter: String,
/// Address of the server
/// Address of the server.
#[clap(short, long)]
server_addr: Option<String>,
/// The type of Wal.
#[clap(short, long, default_value = "raft_engine")]
wal: Wal,
/// The kafka wal broker endpoints.
#[clap(short, long, default_value = "127.0.0.1:9092")]
kafka_wal_broker_endpoints: String,
}
#[tokio::main]
@@ -63,6 +78,18 @@ async fn main() {
.env_config_file(args.env_config_file)
.build()
.unwrap();
let runner = Runner::new(config, Env::new(data_home, args.server_addr));
let wal = match args.wal {
Wal::RaftEngine => WalConfig::RaftEngine,
Wal::Kafka => WalConfig::Kafka {
broker_endpoints: args
.kafka_wal_broker_endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect(),
},
};
let runner = Runner::new(config, Env::new(data_home, args.server_addr, wal));
runner.run().await.unwrap();
}