feat: Prototype of the storage engine (#107)

* feat: memtable flush (#63)

* wip: memtable flush

* optimize schema conversion

* remove unnecessary import

* add parquet file verification

* add backtrace to error

* chore: upgrade opendal to 0.9 and fixed some problems

* rename error

* fix: error description

Co-authored-by: Dennis Zhuang <killme2008@gmail.com>

* feat: region manifest service (#57)

* feat: adds Manifest API

* feat: impl region manifest service

* refactor: by CR comments

* fix: storage error mod test

* fix: tweak storage cargo

* fix: tweak storage cargo

* refactor: by CR comments

* refactor: rename current_version

* feat: add wal writer (#60)

* feat: add Wal

* upgrade engine for wal

* fix: unit test for wal

* feat: wal into region

* fix: unit test

* fix clippy

* chore: by cr

* chore: by cr

* chore: prevent test data pollution

* chore: by cr

* minor fix

* chore: by cr

* feat: Implement flush (#65)

* feat: Flush framework

- feat: Add id to memtable
- refactor: Rename MemtableSet/MutableMemtables to MemtableVersion/MemtableSet
- feat: Freeze memtable
- feat: Trigger flush
- feat: Background job pool
- feat: flush job
- feat: Sst access layer
- feat: Custom Deserialize for StringBytes
- feat: Use RegionWriter to apply file metas
- feat: Apply version edit
- chore: Remove unused imports

refactor: Use ParquetWriter to replace FlushTask

refactor: FsAccessLayer takes object store as param

chore: Remove todo from doc comments

feat: Move wal to WriterContext

chore: Fix clippy

chore: Add backtrace to WriteWal error

* feat: adds manifest to region and refactor sst/manifest dir config (#72)

* feat: adds manifest to region and refactor sst/manifest dir with EngineConfig

* refactor: ensure path ends with '/' in ManifestLogStorage

* fix: style

* refactor: normalize storage directory path and minor changes by CR

* refactor: doesn't need slash any more

* feat: Implement apply_edit() and add timestamp index to schema (#73)

* feat: Implement VersionControl::apply_edit()

* feat: Add timestamp index to schema

* feat: Implement Schema::timestamp_column()

* feat: persist region metadata to manifest (#74)

* feat: persist metadata when creating region or sst files

* fix: revert FileMeta comment

* feat: resolve todo

* fix: clippy warning

* fix: revert files_to_remove type in RegionEdit

* feat: impl SizeBasedStrategy for flush (#76)

* feat: impl SizeBasedStrategy for flush

* doc: get_mutable_limitation

* fix: code style and comment

* feat: align timestamp (#75)

* feat: align timestamps in write batch

* fix cr comments

* fix timestamp overflow

* simplify overflow check

* fix cr comments

* fix clippy issues

* test: Fix region tests (comment out some unsupported tests) (#82)

* feat: flush job (#80)

* feat: flush job

* fix cr comments

* move file name instead of clone

* comment log file test (#84)

* feat: improve MemtableVersion (#78)

* feat: improve MemtableVersion

* feat: remove flushed immutable memtables and test MemtableVersion

* refactor: by CR comments

* refactor: clone kv in iterator

* fix: clippy warning

* refactor: Make BatchIterator supertrait of Iterator (#85)

* refactor: rename Version to ManifestVersion and move out manifest from ShareData (#83)

* feat: Insert multiple memtables by time range (#77)

* feat: memtable::Inserter supports insert multiple memtables by time range

* chore: Update timestamp comment

* test: Add tests for Inserter

* test: Fix region tests (comment out some unsupported tests)

* refactor: align_timestamp() use TimestampMillis::aligned_by_bucket()

* chore: rename aligned_by_bucket to align_by_bucket

* fix: Fix compile errors

* fix: sst and manifest dir (#86)

* Set RowKeyDescriptor::enable_version_column to false by default

* feat: Implement write stall (#90)

* feat: Implement write stall

* chore: Update comments

* feat: Support reading multiple memtables (#93)

* feat: Support reading multiple memtables

* test: uncomment tests that rely on snapshot read

* feat: wal format (#70)

* feat: wal codec

* chore: minor fix

* chore: comment

* chore: by cr

* chore: write_batch_codec mod

* chore: by cr

* chore: upgrade proto

* chore: by cr

* fix failing test

* fix failing test

* feat: manifest to wal (#100)

* feat: write manifest to wal

* chore: sequence into wal

* chore: by cr

* chore: by cr

* refactor: create log store (#104)

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: fariygirl <clickmetoday@163.com>
Co-authored-by: Jiachun Feng <jiachun_feng@proton.me>
Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: Fix clippy

Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Jiachun Feng <jiachun_feng@proton.me>
Co-authored-by: fariygirl <clickmetoday@163.com>
Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
evenyag
2022-07-25 15:26:00 +08:00
committed by GitHub
parent 2b064265bf
commit bf5975ca3e
95 changed files with 5675 additions and 543 deletions

View File

@@ -15,6 +15,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
datatypes = { path = "../datatypes"}
hyper = { version = "0.14", features = ["full"] }
log-store = { path = "../log-store" }
metrics = "0.18"
query = { path = "../query" }
serde = "1.0"
@@ -34,6 +35,7 @@ tower-http = { version ="0.3", features = ["full"]}
[dev-dependencies]
axum-test-helper = "0.1"
common-query = { path = "../common/query" }
tempdir = "0.3"
[dev-dependencies.arrow]
package = "arrow2"

View File

@@ -8,11 +8,23 @@ use crate::error::{NewCatalogSnafu, Result};
use crate::instance::{Instance, InstanceRef};
use crate::server::Services;
#[derive(Debug)]
#[derive(Clone, Debug)]
/// Options passed to `Datanode::new` when constructing a datanode.
pub struct DatanodeOptions {
/// Address for the HTTP service (presumably the listen address; confirm against the server code).
pub http_addr: String,
/// Address for the RPC service (presumably the listen address; confirm against the server code).
pub rpc_addr: String,
/// Directory holding the write-ahead log files; it is created on startup if missing.
/// The `Default` implementation sets it to `/tmp/wal`.
pub wal_dir: String,
}
impl Default for DatanodeOptions {
    /// Builds options with empty HTTP/RPC addresses and the WAL directory set to `/tmp/wal`.
    fn default() -> Self {
        let wal_dir = String::from("/tmp/wal");

        Self {
            http_addr: String::new(),
            rpc_addr: String::new(),
            wal_dir,
        }
    }
}
/// Datanode service.
pub struct Datanode {
opts: DatanodeOptions,
@@ -22,9 +34,9 @@ pub struct Datanode {
}
impl Datanode {
pub fn new(opts: DatanodeOptions) -> Result<Datanode> {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?;
let instance = Arc::new(Instance::new(catalog_list.clone()));
let instance = Arc::new(Instance::new(&opts, catalog_list.clone()).await?);
Ok(Self {
opts,

View File

@@ -3,6 +3,7 @@ use std::any::Any;
use common_error::ext::BoxedError;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
use storage::error::Error as StorageError;
use table::error::Error as TableError;
use table_engine::error::Error as TableEngineError;
@@ -92,6 +93,15 @@ pub enum Error {
#[snafu(display("Fail to start gRPC server, source: {}", source))]
StartGrpc { source: tonic::transport::Error },
#[snafu(display("Failed to create directory {}, source: {}", dir, source))]
CreateDir { dir: String, source: std::io::Error },
#[snafu(display("Failed to open log store, source: {}", source))]
OpenLogStore { source: log_store::error::Error },
/// Opening the storage engine failed; `source` carries the underlying storage error.
#[snafu(display("Failed to open storage engine, source: {}", source))]
OpenStorageEngine { source: StorageError },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -112,7 +122,10 @@ impl ErrorExt for Error {
Error::StartHttp { .. }
| Error::ParseAddr { .. }
| Error::TcpBind { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
| Error::StartGrpc { .. }
| Error::CreateDir { .. } => StatusCode::Internal,
Error::OpenLogStore { source } => source.status_code(),
Error::OpenStorageEngine { source } => source.status_code(),
}
}

View File

@@ -1,21 +1,24 @@
use std::sync::Arc;
use std::{fs, path, sync::Arc};
use common_telemetry::logging::info;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use query::catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
use snafu::ResultExt;
use sql::statements::statement::Statement;
use storage::EngineImpl;
use storage::{config::EngineConfig, EngineImpl};
use table::engine::EngineContext;
use table::engine::TableEngine;
use table::requests::CreateTableRequest;
use table_engine::engine::MitoEngine;
use crate::error::{CreateTableSnafu, ExecuteSqlSnafu, Result};
use crate::datanode::DatanodeOptions;
use crate::error::{self, CreateTableSnafu, ExecuteSqlSnafu, Result};
use crate::sql::SqlHandler;
type DefaultEngine = MitoEngine<EngineImpl>;
type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
// An abstraction to read/write services.
pub struct Instance {
@@ -30,17 +33,22 @@ pub struct Instance {
pub type InstanceRef = Arc<Instance>;
impl Instance {
pub fn new(catalog_list: CatalogListRef) -> Self {
pub async fn new(opts: &DatanodeOptions, catalog_list: CatalogListRef) -> Result<Self> {
let log_store = create_local_file_log_store(opts).await?;
let factory = QueryEngineFactory::new(catalog_list.clone());
let query_engine = factory.query_engine().clone();
let table_engine = DefaultEngine::new(EngineImpl::new());
let table_engine = DefaultEngine::new(
EngineImpl::new(EngineConfig::default(), Arc::new(log_store))
.await
.context(error::OpenStorageEngineSnafu)?,
);
Self {
Ok(Self {
query_engine,
sql_handler: SqlHandler::new(table_engine.clone()),
table_engine,
catalog_list,
}
})
}
pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
@@ -95,7 +103,10 @@ impl Instance {
CreateTableRequest {
name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: Arc::new(Schema::new(column_schemas)),
schema: Arc::new(
Schema::with_timestamp_index(column_schemas, 3)
.expect("ts is expected to be timestamp column"),
),
},
)
.await
@@ -116,6 +127,25 @@ impl Instance {
}
}
async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result<LocalFileLogStore> {
// create WAL directory
fs::create_dir_all(path::Path::new(&opts.wal_dir))
.context(error::CreateDirSnafu { dir: &opts.wal_dir })?;
info!("The WAL directory is: {}", &opts.wal_dir);
let log_config = LogConfig {
log_file_dir: opts.wal_dir.clone(),
..Default::default()
};
let log_store = LocalFileLogStore::open(&log_config)
.await
.context(error::OpenLogStoreSnafu)?;
Ok(log_store)
}
#[cfg(test)]
mod tests {
use arrow::array::UInt64Array;
@@ -123,12 +153,13 @@ mod tests {
use query::catalog::memory;
use super::*;
use crate::test_util;
#[tokio::test]
async fn test_execute_insert() {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let instance = Instance::new(catalog_list);
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts, catalog_list).await.unwrap();
instance.start().await.unwrap();
let output = instance
@@ -147,8 +178,8 @@ mod tests {
#[tokio::test]
async fn test_execute_query() {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let instance = Instance::new(catalog_list);
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts, catalog_list).await.unwrap();
let output = instance
.execute_sql("select sum(number) from numbers limit 20")

View File

@@ -6,5 +6,7 @@ mod metric;
pub mod server;
mod sql;
pub use crate::datanode::Datanode;
pub use crate::datanode::DatanodeOptions;
#[cfg(test)]
pub mod test_util;
#[cfg(test)]
mod tests;

View File

@@ -48,6 +48,7 @@ mod tests {
use super::*;
use crate::instance::Instance;
use crate::server::http::JsonOutput;
use crate::test_util;
fn create_params() -> Query<HashMap<String, String>> {
let mut map = HashMap::new();
@@ -58,15 +59,16 @@ mod tests {
Query(map)
}
fn create_extension() -> Extension<InstanceRef> {
async fn create_extension() -> Extension<InstanceRef> {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let instance = Arc::new(Instance::new(catalog_list));
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap());
Extension(instance)
}
#[tokio::test]
async fn test_sql_not_provided() {
let extension = create_extension();
let extension = create_extension().await;
let json = sql(extension, Query(HashMap::default())).await;
match json {
@@ -82,7 +84,7 @@ mod tests {
#[tokio::test]
async fn test_sql_output_rows() {
let query = create_params();
let extension = create_extension();
let extension = create_extension().await;
let json = sql(extension, query).await;
@@ -110,7 +112,7 @@ mod tests {
counter!("test_metrics", 1);
let query = create_params();
let extension = create_extension();
let extension = create_extension().await;
let text = metrics(extension, query).await;
match text {

View File

@@ -63,14 +63,17 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use log_store::fs::noop::NoopLogStore;
use query::catalog::memory;
use query::catalog::schema::SchemaProvider;
use query::error::Result as QueryResult;
use query::QueryEngineFactory;
use storage::config::EngineConfig;
use storage::EngineImpl;
use table::error::Result as TableResult;
use table::{Table, TableRef};
use table_engine::engine::MitoEngine;
use tempdir::TempDir;
use super::*;
@@ -90,7 +93,7 @@ mod tests {
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
];
Arc::new(Schema::new(column_schemas))
Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap())
}
async fn scan(
&self,
@@ -129,8 +132,11 @@ mod tests {
}
}
#[test]
fn test_statement_to_request() {
#[tokio::test]
async fn test_statement_to_request() {
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
let store_dir = dir.path().to_string_lossy();
let catalog_list = memory::new_memory_catalog_list().unwrap();
let factory = QueryEngineFactory::new(catalog_list);
let query_engine = factory.query_engine().clone();
@@ -140,7 +146,14 @@ mod tests {
('host2', 88.8, 333.3, 1655276558000)
"#;
let table_engine = MitoEngine::<EngineImpl>::new(EngineImpl::new());
let table_engine = MitoEngine::<EngineImpl<NoopLogStore>>::new(
EngineImpl::new(
EngineConfig::with_store_dir(&store_dir),
Arc::new(NoopLogStore::default()),
)
.await
.unwrap(),
);
let sql_handler = SqlHandler::new(table_engine);
let stmt = query_engine.sql_to_statement(sql).unwrap();

View File

@@ -0,0 +1,17 @@
use tempdir::TempDir;
use crate::datanode::DatanodeOptions;
/// Creates a temporary directory and a default `DatanodeOptions` whose `wal_dir`
/// points at that directory. Only for tests.
///
/// The returned `TempDir` guard must be kept alive for as long as the options are in
/// use: the directory is deleted once the guard goes out of scope.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created or if its path is not valid
/// UTF-8.
///
/// TODO: Add a test feature
pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TempDir) {
    // `TempDir::new` expects a name *prefix* that is placed under the system temp
    // directory, not a path; an absolute path here would bypass the platform temp dir.
    let tmp_dir = TempDir::new("greptimedb_test").unwrap();
    let opts = DatanodeOptions {
        wal_dir: tmp_dir.path().to_str().unwrap().to_string(),
        ..Default::default()
    };

    (opts, tmp_dir)
}

View File

@@ -0,0 +1 @@
mod http_test;

View File

@@ -5,12 +5,16 @@ use std::sync::Arc;
use axum::http::StatusCode;
use axum::Router;
use axum_test_helper::TestClient;
use datanode::{instance::Instance, server::http::HttpServer};
use query::catalog::memory;
fn make_test_app() -> Router {
use crate::instance::Instance;
use crate::server::http::HttpServer;
use crate::test_util;
async fn make_test_app() -> Router {
let catalog_list = memory::new_memory_catalog_list().unwrap();
let instance = Arc::new(Instance::new(catalog_list));
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts, catalog_list).await.unwrap());
let http_server = HttpServer::new(instance);
http_server.make_app()
}
@@ -18,7 +22,7 @@ fn make_test_app() -> Router {
#[tokio::test]
async fn test_sql_api() {
common_telemetry::init_default_ut_logging();
let app = make_test_app();
let app = make_test_app().await;
let client = TestClient::new(app);
let res = client.get("/sql").send().await;
assert_eq!(res.status(), StatusCode::OK);
@@ -46,7 +50,7 @@ async fn test_sql_api() {
async fn test_metrics_api() {
common_telemetry::init_default_ut_logging();
common_telemetry::init_default_metrics_recorder();
let app = make_test_app();
let app = make_test_app().await;
let client = TestClient::new(app);
// Send a sql