mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-28 10:50:39 +00:00
feat: impl TableManifest and refactor table engine, object store etc.
This commit is contained in:
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -1182,6 +1182,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"log-store",
|
||||
"metrics 0.18.1",
|
||||
"object-store",
|
||||
"query",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -3757,6 +3758,7 @@ dependencies = [
|
||||
name = "table-engine"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
@@ -3768,6 +3770,9 @@ dependencies = [
|
||||
"datatypes",
|
||||
"futures",
|
||||
"log-store",
|
||||
"object-store",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"storage",
|
||||
"store-api",
|
||||
|
||||
@@ -8,7 +8,7 @@ use common_telemetry::{self, logging::error, logging::info};
|
||||
#[derive(Parser)]
|
||||
#[clap(name = "greptimedb")]
|
||||
struct Command {
|
||||
#[clap(long, default_value = "/tmp/greptime/logs")]
|
||||
#[clap(long, default_value = "/tmp/greptimedb/logs")]
|
||||
log_dir: String,
|
||||
#[clap(long, default_value = "info")]
|
||||
log_level: String,
|
||||
|
||||
@@ -18,6 +18,7 @@ datatypes = { path = "../datatypes"}
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
log-store = { path = "../log-store" }
|
||||
metrics = "0.18"
|
||||
object-store = { path = "../object-store" }
|
||||
query = { path = "../query" }
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
|
||||
@@ -4,11 +4,37 @@ use crate::error::Result;
|
||||
use crate::instance::{Instance, InstanceRef};
|
||||
use crate::server::Services;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FileStoreConfig {
|
||||
/// Storage path
|
||||
pub store_dir: String,
|
||||
}
|
||||
|
||||
impl Default for FileStoreConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
store_dir: "/tmp/greptimedb/data/".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ObjectStoreConfig {
|
||||
File(FileStoreConfig),
|
||||
}
|
||||
|
||||
impl Default for ObjectStoreConfig {
|
||||
fn default() -> Self {
|
||||
ObjectStoreConfig::File(FileStoreConfig::default())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DatanodeOptions {
|
||||
pub http_addr: String,
|
||||
pub rpc_addr: String,
|
||||
pub wal_dir: String,
|
||||
pub store_config: ObjectStoreConfig,
|
||||
}
|
||||
|
||||
impl Default for DatanodeOptions {
|
||||
@@ -17,6 +43,7 @@ impl Default for DatanodeOptions {
|
||||
http_addr: Default::default(),
|
||||
rpc_addr: Default::default(),
|
||||
wal_dir: "/tmp/wal".to_string(),
|
||||
store_config: ObjectStoreConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,6 +108,13 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Failed to storage engine, source: {}", source))]
|
||||
OpenStorageEngine { source: StorageError },
|
||||
|
||||
#[snafu(display("Failed to init backend, source: {}", source))]
|
||||
InitBackend {
|
||||
dir: String,
|
||||
source: std::io::Error,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -133,6 +140,7 @@ impl ErrorExt for Error {
|
||||
| Error::TcpBind { .. }
|
||||
| Error::StartGrpc { .. }
|
||||
| Error::CreateDir { .. } => StatusCode::Internal,
|
||||
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
|
||||
Error::OpenLogStore { source } => source.status_code(),
|
||||
Error::OpenStorageEngine { source } => source.status_code(),
|
||||
}
|
||||
|
||||
@@ -6,16 +6,18 @@ use common_telemetry::logging::info;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
|
||||
use object_store::{backend::fs::Backend, util, ObjectStore};
|
||||
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use sql::statements::statement::Statement;
|
||||
use storage::{config::EngineConfig, EngineImpl};
|
||||
use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
|
||||
use table::engine::EngineContext;
|
||||
use table::engine::TableEngine;
|
||||
use table::requests::CreateTableRequest;
|
||||
use table_engine::config::EngineConfig as TableEngineConfig;
|
||||
use table_engine::engine::MitoEngine;
|
||||
|
||||
use crate::datanode::DatanodeOptions;
|
||||
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use crate::error::{
|
||||
self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result,
|
||||
TableNotFoundSnafu,
|
||||
@@ -38,12 +40,18 @@ pub struct Instance {
|
||||
pub type InstanceRef = Arc<Instance>;
|
||||
|
||||
impl Instance {
|
||||
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
|
||||
pub async fn new(opts: &DatanodeOptions, catalog_list: CatalogListRef) -> Result<Self> {
|
||||
let object_store = new_object_store(&opts.store_config).await?;
|
||||
let log_store = create_local_file_log_store(opts).await?;
|
||||
|
||||
let table_engine = DefaultEngine::new(
|
||||
EngineImpl::new(EngineConfig::default(), Arc::new(log_store))
|
||||
.await
|
||||
.context(error::OpenStorageEngineSnafu)?,
|
||||
TableEngineConfig::default(),
|
||||
EngineImpl::new(
|
||||
StorageEngineConfig::default(),
|
||||
Arc::new(log_store),
|
||||
object_store.clone(),
|
||||
),
|
||||
object_store,
|
||||
);
|
||||
let catalog_manager = Arc::new(
|
||||
catalog::LocalCatalogManager::try_new(Arc::new(table_engine.clone()))
|
||||
@@ -166,6 +174,26 @@ impl Instance {
|
||||
}
|
||||
}
|
||||
|
||||
async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
|
||||
// TODO(dennis): supports other backend
|
||||
let store_dir = util::normalize_dir(match store_config {
|
||||
ObjectStoreConfig::File(file) => &file.store_dir,
|
||||
});
|
||||
|
||||
fs::create_dir_all(path::Path::new(&store_dir))
|
||||
.context(error::CreateDirSnafu { dir: &store_dir })?;
|
||||
|
||||
info!("The storage directory is: {}", &store_dir);
|
||||
|
||||
let accessor = Backend::build()
|
||||
.root(&store_dir)
|
||||
.finish()
|
||||
.await
|
||||
.context(error::InitBackendSnafu { dir: &store_dir })?;
|
||||
|
||||
Ok(ObjectStore::new(accessor))
|
||||
}
|
||||
|
||||
async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result<LocalFileLogStore> {
|
||||
// create WAL directory
|
||||
fs::create_dir_all(path::Path::new(&opts.wal_dir))
|
||||
|
||||
@@ -65,11 +65,13 @@ mod tests {
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::value::Value;
|
||||
use log_store::fs::noop::NoopLogStore;
|
||||
use object_store::{backend::fs::Backend, ObjectStore};
|
||||
use query::QueryEngineFactory;
|
||||
use storage::config::EngineConfig;
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use table::error::Result as TableResult;
|
||||
use table::{Table, TableRef};
|
||||
use table_engine::config::EngineConfig as TableEngineConfig;
|
||||
use table_engine::engine::MitoEngine;
|
||||
use tempdir::TempDir;
|
||||
|
||||
@@ -138,6 +140,8 @@ mod tests {
|
||||
async fn test_statement_to_request() {
|
||||
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
|
||||
let store_dir = dir.path().to_string_lossy();
|
||||
let accessor = Backend::build().root(&store_dir).finish().await.unwrap();
|
||||
let object_store = ObjectStore::new(accessor);
|
||||
|
||||
let catalog_list = catalog::memory::new_memory_catalog_list().unwrap();
|
||||
let factory = QueryEngineFactory::new(catalog_list);
|
||||
@@ -149,12 +153,13 @@ mod tests {
|
||||
"#;
|
||||
|
||||
let table_engine = MitoEngine::<EngineImpl<NoopLogStore>>::new(
|
||||
TableEngineConfig::default(),
|
||||
EngineImpl::new(
|
||||
EngineConfig::with_store_dir(&store_dir),
|
||||
StorageEngineConfig::default(),
|
||||
Arc::new(NoopLogStore::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
object_store.clone(),
|
||||
),
|
||||
object_store,
|
||||
);
|
||||
let sql_handler = SqlHandler::new(table_engine);
|
||||
|
||||
|
||||
@@ -1,56 +1,4 @@
|
||||
//! Engine config
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FileStoreConfig {
|
||||
/// Storage path
|
||||
pub store_dir: String,
|
||||
}
|
||||
//! storage engine config
|
||||
|
||||
impl Default for FileStoreConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
store_dir: "/tmp/greptimedb/".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ObjectStoreConfig {
|
||||
File(FileStoreConfig),
|
||||
}
|
||||
|
||||
impl Default for ObjectStoreConfig {
|
||||
fn default() -> Self {
|
||||
ObjectStoreConfig::File(FileStoreConfig::default())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct EngineConfig {
|
||||
pub store_config: ObjectStoreConfig,
|
||||
}
|
||||
|
||||
impl EngineConfig {
|
||||
pub fn with_store_dir(store_dir: &str) -> Self {
|
||||
Self {
|
||||
store_config: ObjectStoreConfig::File(FileStoreConfig {
|
||||
store_dir: store_dir.to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_default_engine_config() {
|
||||
let engine_config = EngineConfig::default();
|
||||
|
||||
let store_dir = match &engine_config.store_config {
|
||||
ObjectStoreConfig::File(file) => &file.store_dir,
|
||||
};
|
||||
|
||||
assert_eq!("/tmp/greptimedb/", store_dir);
|
||||
}
|
||||
}
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct EngineConfig {}
|
||||
|
||||
@@ -3,15 +3,15 @@ use std::sync::{Arc, RwLock};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging::info;
|
||||
use object_store::{backend::fs::Backend, util, ObjectStore};
|
||||
use object_store::{util, ObjectStore};
|
||||
use snafu::ResultExt;
|
||||
use store_api::{
|
||||
logstore::LogStore,
|
||||
storage::{EngineContext, OpenOptions, RegionDescriptor, StorageEngine},
|
||||
storage::{CreateOptions, EngineContext, OpenOptions, RegionDescriptor, StorageEngine},
|
||||
};
|
||||
|
||||
use crate::background::JobPoolImpl;
|
||||
use crate::config::{EngineConfig, ObjectStoreConfig};
|
||||
use crate::config::EngineConfig;
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
|
||||
use crate::manifest::region::RegionManifest;
|
||||
@@ -55,8 +55,9 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
descriptor: RegionDescriptor,
|
||||
opts: &CreateOptions,
|
||||
) -> Result<Self::Region> {
|
||||
self.inner.create_region(descriptor).await
|
||||
self.inner.create_region(descriptor, opts).await
|
||||
}
|
||||
|
||||
async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
|
||||
@@ -69,36 +70,21 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
|
||||
}
|
||||
|
||||
impl<S: LogStore> EngineImpl<S> {
|
||||
pub async fn new(config: EngineConfig, log_store: Arc<S>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
inner: Arc::new(EngineInner::new(config, log_store).await?),
|
||||
})
|
||||
pub fn new(config: EngineConfig, log_store: Arc<S>, object_store: ObjectStore) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(EngineInner::new(config, log_store, object_store)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
|
||||
// TODO(dennis): supports other backend
|
||||
let store_dir = util::normalize_dir(match store_config {
|
||||
ObjectStoreConfig::File(file) => &file.store_dir,
|
||||
});
|
||||
|
||||
let accessor = Backend::build()
|
||||
.root(&store_dir)
|
||||
.finish()
|
||||
.await
|
||||
.context(error::InitBackendSnafu { dir: &store_dir })?;
|
||||
|
||||
Ok(ObjectStore::new(accessor))
|
||||
#[inline]
|
||||
pub fn region_sst_dir(parent_dir: &str, region_name: &str) -> String {
|
||||
format!("{}{}/", parent_dir, region_name)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn region_sst_dir(region_name: &str) -> String {
|
||||
format!("{}/", region_name)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn region_manifest_dir(region_name: &str) -> String {
|
||||
format!("{}/manifest/", region_name)
|
||||
pub fn region_manifest_dir(parent_dir: &str, region_name: &str) -> String {
|
||||
format!("{}{}/manifest/", parent_dir, region_name)
|
||||
}
|
||||
|
||||
/// A slot for region in the engine.
|
||||
@@ -209,19 +195,18 @@ struct EngineInner<S: LogStore> {
|
||||
}
|
||||
|
||||
impl<S: LogStore> EngineInner<S> {
|
||||
pub async fn new(config: EngineConfig, log_store: Arc<S>) -> Result<Self> {
|
||||
pub fn new(_config: EngineConfig, log_store: Arc<S>, object_store: ObjectStore) -> Self {
|
||||
let job_pool = Arc::new(JobPoolImpl {});
|
||||
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
|
||||
let object_store = new_object_store(&config.store_config).await?;
|
||||
|
||||
Ok(Self {
|
||||
Self {
|
||||
object_store,
|
||||
log_store,
|
||||
regions: RwLock::new(Default::default()),
|
||||
memtable_builder: Arc::new(DefaultMemtableBuilder {}),
|
||||
flush_scheduler,
|
||||
flush_strategy: Arc::new(SizeBasedStrategy::default()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `Some(slot)` if there is existing slot with given `name`, or insert
|
||||
@@ -257,7 +242,7 @@ impl<S: LogStore> EngineInner<S> {
|
||||
let mut guard = SlotGuard::new(name, &self.regions);
|
||||
|
||||
// FIXME(yingwen): Get region id or remove dependency of region id.
|
||||
let store_config = self.region_store_config(name);
|
||||
let store_config = self.region_store_config(&opts.parent_dir, name);
|
||||
|
||||
let region = match RegionImpl::open(name.to_string(), store_config, opts).await? {
|
||||
None => return Ok(None),
|
||||
@@ -268,7 +253,11 @@ impl<S: LogStore> EngineInner<S> {
|
||||
Ok(Some(region))
|
||||
}
|
||||
|
||||
async fn create_region(&self, descriptor: RegionDescriptor) -> Result<RegionImpl<S>> {
|
||||
async fn create_region(
|
||||
&self,
|
||||
descriptor: RegionDescriptor,
|
||||
opts: &CreateOptions,
|
||||
) -> Result<RegionImpl<S>> {
|
||||
if let Some(slot) = self.get_or_occupy_slot(&descriptor.name, RegionSlot::Creating) {
|
||||
return slot.try_get_ready_region();
|
||||
}
|
||||
@@ -283,7 +272,7 @@ impl<S: LogStore> EngineInner<S> {
|
||||
.context(error::InvalidRegionDescSnafu {
|
||||
region: ®ion_name,
|
||||
})?;
|
||||
let store_config = self.region_store_config(®ion_name);
|
||||
let store_config = self.region_store_config(&opts.parent_dir, ®ion_name);
|
||||
|
||||
let region = RegionImpl::create(metadata, store_config).await?;
|
||||
|
||||
@@ -299,10 +288,12 @@ impl<S: LogStore> EngineInner<S> {
|
||||
slot.get_ready_region()
|
||||
}
|
||||
|
||||
fn region_store_config(&self, region_name: &str) -> StoreConfig<S> {
|
||||
let sst_dir = ®ion_sst_dir(region_name);
|
||||
fn region_store_config(&self, parent_dir: &str, region_name: &str) -> StoreConfig<S> {
|
||||
let parent_dir = util::normalize_dir(parent_dir);
|
||||
|
||||
let sst_dir = ®ion_sst_dir(&parent_dir, region_name);
|
||||
let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone()));
|
||||
let manifest_dir = region_manifest_dir(region_name);
|
||||
let manifest_dir = region_manifest_dir(&parent_dir, region_name);
|
||||
let manifest = RegionManifest::new(&manifest_dir, self.object_store.clone());
|
||||
|
||||
StoreConfig {
|
||||
@@ -320,6 +311,7 @@ impl<S: LogStore> EngineInner<S> {
|
||||
mod tests {
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use log_store::test_util::log_store_util;
|
||||
use object_store::backend::fs::Backend;
|
||||
use store_api::storage::Region;
|
||||
use tempdir::TempDir;
|
||||
|
||||
@@ -332,9 +324,12 @@ mod tests {
|
||||
log_store_util::create_tmp_local_file_log_store("test_engine_wal").await;
|
||||
let dir = TempDir::new("test_create_new_region").unwrap();
|
||||
let store_dir = dir.path().to_string_lossy();
|
||||
let config = EngineConfig::with_store_dir(&store_dir);
|
||||
let accessor = Backend::build().root(&store_dir).finish().await.unwrap();
|
||||
let object_store = ObjectStore::new(accessor);
|
||||
|
||||
let engine = EngineImpl::new(config, Arc::new(log_store)).await.unwrap();
|
||||
let config = EngineConfig::default();
|
||||
|
||||
let engine = EngineImpl::new(config, Arc::new(log_store), object_store);
|
||||
|
||||
let region_name = "region-0";
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
@@ -342,7 +337,10 @@ mod tests {
|
||||
.push_value_column(("v1", LogicalTypeId::Float32, true))
|
||||
.build();
|
||||
let ctx = EngineContext::default();
|
||||
let region = engine.create_region(&ctx, desc).await.unwrap();
|
||||
let region = engine
|
||||
.create_region(&ctx, desc, &CreateOptions::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(region_name, region.name());
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ use store_api::storage::SequenceNumber;
|
||||
use crate::metadata::Error as MetadataError;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub(crate)))]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
#[snafu(display("Invalid region descriptor, region: {}, source: {}", region, source))]
|
||||
InvalidRegionDesc {
|
||||
@@ -43,13 +43,6 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to init backend, source: {}", source))]
|
||||
InitBackend {
|
||||
dir: String,
|
||||
source: std::io::Error,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to write parquet file, source: {}", source))]
|
||||
WriteParquet {
|
||||
source: arrow::error::ArrowError,
|
||||
@@ -162,8 +155,8 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode region action list, {}", msg))]
|
||||
DecodeRegionMetaActionList { msg: String, backtrace: Backtrace },
|
||||
#[snafu(display("Failed to decode action list, {}", msg))]
|
||||
DecodeMetaActionList { msg: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to read line, err: {}", source))]
|
||||
Readline { source: IoError },
|
||||
@@ -249,7 +242,7 @@ impl ErrorExt for Error {
|
||||
| DecodeJson { .. }
|
||||
| JoinTask { .. }
|
||||
| Cancelled { .. }
|
||||
| DecodeRegionMetaActionList { .. }
|
||||
| DecodeMetaActionList { .. }
|
||||
| Readline { .. }
|
||||
| InvalidParquetSchema { .. }
|
||||
| SequenceColumnNotFound { .. }
|
||||
@@ -258,7 +251,6 @@ impl ErrorExt for Error {
|
||||
| SequenceNotMonotonic { .. } => StatusCode::Unexpected,
|
||||
|
||||
FlushIo { .. }
|
||||
| InitBackend { .. }
|
||||
| WriteParquet { .. }
|
||||
| ReadObject { .. }
|
||||
| WriteObject { .. }
|
||||
|
||||
@@ -8,7 +8,7 @@ pub mod config;
|
||||
mod engine;
|
||||
pub mod error;
|
||||
mod flush;
|
||||
mod manifest;
|
||||
pub mod manifest;
|
||||
pub mod memtable;
|
||||
pub mod metadata;
|
||||
mod proto;
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
//! manifest storage
|
||||
pub(crate) mod action;
|
||||
pub(crate) mod checkpoint;
|
||||
pub mod helper;
|
||||
mod impl_;
|
||||
pub mod region;
|
||||
pub(crate) mod storage;
|
||||
#[cfg(test)]
|
||||
pub mod test_utils;
|
||||
|
||||
pub use self::impl_::*;
|
||||
|
||||
@@ -1,20 +1,19 @@
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
use std::io::{BufRead, BufReader};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json as json;
|
||||
use serde_json::ser::to_writer;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::manifest::action::ProtocolAction;
|
||||
use store_api::manifest::action::ProtocolVersion;
|
||||
use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader};
|
||||
use store_api::manifest::ManifestVersion;
|
||||
use store_api::manifest::MetaAction;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::SequenceNumber;
|
||||
|
||||
use crate::error::{
|
||||
DecodeJsonSnafu, DecodeRegionMetaActionListSnafu, EncodeJsonSnafu,
|
||||
ManifestProtocolForbidReadSnafu, ReadlineSnafu, Result,
|
||||
self, DecodeJsonSnafu, DecodeMetaActionListSnafu, ManifestProtocolForbidReadSnafu,
|
||||
ReadlineSnafu, Result,
|
||||
};
|
||||
use crate::manifest::helper;
|
||||
use crate::metadata::{RegionMetadataRef, VersionNumber};
|
||||
use crate::sst::FileMeta;
|
||||
|
||||
@@ -50,13 +49,6 @@ pub enum RegionMetaAction {
|
||||
Edit(RegionEdit),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
struct VersionHeader {
|
||||
prev_version: ManifestVersion,
|
||||
}
|
||||
|
||||
const NEWLINE: &[u8] = b"\n";
|
||||
|
||||
impl RegionMetaActionList {
|
||||
pub fn with_action(action: RegionMetaAction) -> Self {
|
||||
Self {
|
||||
@@ -71,31 +63,21 @@ impl RegionMetaActionList {
|
||||
prev_version: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode self into json in the form of string lines, starts with prev_version and then action json list.
|
||||
pub(crate) fn encode(&self) -> Result<Vec<u8>> {
|
||||
let mut bytes = Vec::default();
|
||||
impl MetaAction for RegionMetaActionList {
|
||||
type Error = error::Error;
|
||||
|
||||
{
|
||||
// Encode prev_version
|
||||
let v = VersionHeader {
|
||||
prev_version: self.prev_version,
|
||||
};
|
||||
|
||||
to_writer(&mut bytes, &v).context(EncodeJsonSnafu)?;
|
||||
// unwrap is fine here, because we write into a buffer.
|
||||
bytes.write_all(NEWLINE).unwrap();
|
||||
}
|
||||
|
||||
for action in &self.actions {
|
||||
to_writer(&mut bytes, action).context(EncodeJsonSnafu)?;
|
||||
bytes.write_all(NEWLINE).unwrap();
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
fn set_prev_version(&mut self, version: ManifestVersion) {
|
||||
self.prev_version = version;
|
||||
}
|
||||
|
||||
pub(crate) fn decode(
|
||||
/// Encode self into json in the form of string lines, starts with prev_version and then action json list.
|
||||
fn encode(&self) -> Result<Vec<u8>> {
|
||||
helper::encode_actions(self.prev_version, &self.actions)
|
||||
}
|
||||
|
||||
fn decode(
|
||||
bs: &[u8],
|
||||
reader_version: ProtocolVersion,
|
||||
) -> Result<(Self, Option<ProtocolAction>)> {
|
||||
@@ -109,7 +91,7 @@ impl RegionMetaActionList {
|
||||
{
|
||||
let first_line = lines
|
||||
.next()
|
||||
.with_context(|| DecodeRegionMetaActionListSnafu {
|
||||
.with_context(|| DecodeMetaActionListSnafu {
|
||||
msg: format!(
|
||||
"Invalid content in manifest: {}",
|
||||
std::str::from_utf8(bs).unwrap_or("**invalid bytes**")
|
||||
@@ -148,12 +130,6 @@ impl RegionMetaActionList {
|
||||
}
|
||||
}
|
||||
|
||||
impl MetaAction for RegionMetaActionList {
|
||||
fn set_prev_version(&mut self, version: ManifestVersion) {
|
||||
self.prev_version = version;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_telemetry::logging;
|
||||
|
||||
33
src/storage/src/manifest/helper.rs
Normal file
33
src/storage/src/manifest/helper.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use std::io::Write;
|
||||
|
||||
use serde::Serialize;
|
||||
use serde_json::to_writer;
|
||||
use snafu::ResultExt;
|
||||
use store_api::manifest::action::VersionHeader;
|
||||
use store_api::manifest::ManifestVersion;
|
||||
|
||||
use crate::error::{EncodeJsonSnafu, Result};
|
||||
|
||||
pub const NEWLINE: &[u8] = b"\n";
|
||||
|
||||
pub fn encode_actions<T: Serialize>(
|
||||
prev_version: ManifestVersion,
|
||||
actions: &[T],
|
||||
) -> Result<Vec<u8>> {
|
||||
let mut bytes = Vec::default();
|
||||
{
|
||||
// Encode prev_version
|
||||
let v = VersionHeader { prev_version };
|
||||
|
||||
to_writer(&mut bytes, &v).context(EncodeJsonSnafu)?;
|
||||
// unwrap is fine here, because we write into a buffer.
|
||||
bytes.write_all(NEWLINE).unwrap();
|
||||
}
|
||||
|
||||
for action in actions {
|
||||
to_writer(&mut bytes, action).context(EncodeJsonSnafu)?;
|
||||
bytes.write_all(NEWLINE).unwrap();
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
174
src/storage/src/manifest/impl.rs
Normal file
174
src/storage/src/manifest/impl.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ensure;
|
||||
use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion};
|
||||
use store_api::manifest::*;
|
||||
|
||||
use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result};
|
||||
use crate::manifest::action::*;
|
||||
use crate::manifest::storage::ManifestObjectStore;
|
||||
use crate::manifest::storage::ObjectStoreLogIterator;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RegionManifest {
|
||||
inner: Arc<RegionManifestInner>,
|
||||
}
|
||||
|
||||
impl RegionManifest {
|
||||
pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
|
||||
RegionManifest {
|
||||
inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update inner state.
|
||||
pub fn update_state(&self, version: ManifestVersion, protocol: Option<ProtocolAction>) {
|
||||
self.inner.update_state(version, protocol);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Manifest for RegionManifest {
|
||||
type Error = Error;
|
||||
type MetaAction = RegionMetaActionList;
|
||||
type MetaActionIterator = RegionMetaActionListIterator;
|
||||
|
||||
async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
|
||||
self.inner.save(action_list).await
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
start: ManifestVersion,
|
||||
end: ManifestVersion,
|
||||
) -> Result<RegionMetaActionListIterator> {
|
||||
self.inner.scan(start, end).await
|
||||
}
|
||||
|
||||
async fn checkpoint(&self) -> Result<ManifestVersion> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn last_version(&self) -> ManifestVersion {
|
||||
self.inner.last_version()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RegionManifestInner {
|
||||
store: Arc<ManifestObjectStore>,
|
||||
version: AtomicU64,
|
||||
/// Current using protocol
|
||||
protocol: ArcSwap<ProtocolAction>,
|
||||
/// Current node supported protocols (reader_version, writer_version)
|
||||
supported_reader_version: ProtocolVersion,
|
||||
supported_writer_version: ProtocolVersion,
|
||||
}
|
||||
|
||||
pub struct RegionMetaActionListIterator {
|
||||
log_iter: ObjectStoreLogIterator,
|
||||
reader_version: ProtocolVersion,
|
||||
last_protocol: Option<ProtocolAction>,
|
||||
}
|
||||
|
||||
impl RegionMetaActionListIterator {
|
||||
pub fn last_protocol(&self) -> &Option<ProtocolAction> {
|
||||
&self.last_protocol
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MetaActionIterator for RegionMetaActionListIterator {
|
||||
type Error = Error;
|
||||
type MetaAction = RegionMetaActionList;
|
||||
|
||||
async fn next_action(&mut self) -> Result<Option<(ManifestVersion, RegionMetaActionList)>> {
|
||||
match self.log_iter.next_log().await? {
|
||||
Some((v, bytes)) => {
|
||||
let (action_list, protocol) =
|
||||
RegionMetaActionList::decode(&bytes, self.reader_version)?;
|
||||
|
||||
if protocol.is_some() {
|
||||
self.last_protocol = protocol;
|
||||
}
|
||||
|
||||
Ok(Some((v, action_list)))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RegionManifestInner {
|
||||
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
|
||||
let (reader_version, writer_version) = action::supported_protocol_version();
|
||||
|
||||
Self {
|
||||
store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)),
|
||||
version: AtomicU64::new(0),
|
||||
protocol: ArcSwap::new(Arc::new(ProtocolAction::new())),
|
||||
supported_reader_version: reader_version,
|
||||
supported_writer_version: writer_version,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn inc_version(&self) -> ManifestVersion {
|
||||
self.version.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn update_state(&self, version: ManifestVersion, protocol: Option<ProtocolAction>) {
|
||||
self.version.store(version, Ordering::Relaxed);
|
||||
if let Some(p) = protocol {
|
||||
self.protocol.store(Arc::new(p));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn last_version(&self) -> ManifestVersion {
|
||||
self.version.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
async fn save(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
|
||||
let protocol = self.protocol.load();
|
||||
|
||||
ensure!(
|
||||
protocol.is_writable(self.supported_writer_version),
|
||||
ManifestProtocolForbidWriteSnafu {
|
||||
min_version: protocol.min_writer_version,
|
||||
supported_version: self.supported_writer_version,
|
||||
}
|
||||
);
|
||||
|
||||
let version = self.inc_version();
|
||||
|
||||
logging::debug!(
|
||||
"Save region metadata action: {:?}, version: {}",
|
||||
action_list,
|
||||
version
|
||||
);
|
||||
|
||||
self.store.save(version, &action_list.encode()?).await?;
|
||||
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
start: ManifestVersion,
|
||||
end: ManifestVersion,
|
||||
) -> Result<RegionMetaActionListIterator> {
|
||||
Ok(RegionMetaActionListIterator {
|
||||
log_iter: self.store.scan(start, end).await?,
|
||||
reader_version: self.supported_reader_version,
|
||||
last_protocol: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
177
src/storage/src/manifest/impl_.rs
Normal file
177
src/storage/src/manifest/impl_.rs
Normal file
@@ -0,0 +1,177 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ensure;
|
||||
use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion};
|
||||
use store_api::manifest::*;
|
||||
|
||||
use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result};
|
||||
use crate::manifest::storage::ManifestObjectStore;
|
||||
use crate::manifest::storage::ObjectStoreLogIterator;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ManifestImpl<M: MetaAction<Error = Error>> {
|
||||
inner: Arc<ManifestImplInner<M>>,
|
||||
}
|
||||
|
||||
impl<M: MetaAction<Error = Error>> ManifestImpl<M> {
|
||||
pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
|
||||
ManifestImpl {
|
||||
inner: Arc::new(ManifestImplInner::new(manifest_dir, object_store)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update inner state.
|
||||
pub fn update_state(&self, version: ManifestVersion, protocol: Option<ProtocolAction>) {
|
||||
self.inner.update_state(version, protocol);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<M: 'static + MetaAction<Error = Error>> Manifest for ManifestImpl<M> {
|
||||
type Error = Error;
|
||||
type MetaAction = M;
|
||||
type MetaActionIterator = MetaActionIteratorImpl<M>;
|
||||
|
||||
async fn update(&self, action_list: M) -> Result<ManifestVersion> {
|
||||
self.inner.save(action_list).await
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
start: ManifestVersion,
|
||||
end: ManifestVersion,
|
||||
) -> Result<Self::MetaActionIterator> {
|
||||
self.inner.scan(start, end).await
|
||||
}
|
||||
|
||||
async fn checkpoint(&self) -> Result<ManifestVersion> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn last_version(&self) -> ManifestVersion {
|
||||
self.inner.last_version()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ManifestImplInner<M: MetaAction<Error = Error>> {
|
||||
store: Arc<ManifestObjectStore>,
|
||||
version: AtomicU64,
|
||||
/// Current using protocol
|
||||
protocol: ArcSwap<ProtocolAction>,
|
||||
/// Current node supported protocols (reader_version, writer_version)
|
||||
supported_reader_version: ProtocolVersion,
|
||||
supported_writer_version: ProtocolVersion,
|
||||
_phantom: PhantomData<M>,
|
||||
}
|
||||
|
||||
pub struct MetaActionIteratorImpl<M: MetaAction<Error = Error>> {
|
||||
log_iter: ObjectStoreLogIterator,
|
||||
reader_version: ProtocolVersion,
|
||||
last_protocol: Option<ProtocolAction>,
|
||||
_phantom: PhantomData<M>,
|
||||
}
|
||||
|
||||
impl<M: MetaAction<Error = Error>> MetaActionIteratorImpl<M> {
|
||||
pub fn last_protocol(&self) -> &Option<ProtocolAction> {
|
||||
&self.last_protocol
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<M: MetaAction<Error = Error>> MetaActionIterator for MetaActionIteratorImpl<M> {
|
||||
type Error = Error;
|
||||
type MetaAction = M;
|
||||
|
||||
async fn next_action(&mut self) -> Result<Option<(ManifestVersion, M)>> {
|
||||
match self.log_iter.next_log().await? {
|
||||
Some((v, bytes)) => {
|
||||
let (action_list, protocol) = M::decode(&bytes, self.reader_version)?;
|
||||
|
||||
if protocol.is_some() {
|
||||
self.last_protocol = protocol;
|
||||
}
|
||||
|
||||
Ok(Some((v, action_list)))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: MetaAction<Error = Error>> ManifestImplInner<M> {
|
||||
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
|
||||
let (reader_version, writer_version) = action::supported_protocol_version();
|
||||
|
||||
Self {
|
||||
store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)),
|
||||
version: AtomicU64::new(0),
|
||||
protocol: ArcSwap::new(Arc::new(ProtocolAction::new())),
|
||||
supported_reader_version: reader_version,
|
||||
supported_writer_version: writer_version,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn inc_version(&self) -> ManifestVersion {
|
||||
self.version.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn update_state(&self, version: ManifestVersion, protocol: Option<ProtocolAction>) {
|
||||
self.version.store(version, Ordering::Relaxed);
|
||||
if let Some(p) = protocol {
|
||||
self.protocol.store(Arc::new(p));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn last_version(&self) -> ManifestVersion {
|
||||
self.version.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
async fn save(&self, action_list: M) -> Result<ManifestVersion> {
|
||||
let protocol = self.protocol.load();
|
||||
|
||||
ensure!(
|
||||
protocol.is_writable(self.supported_writer_version),
|
||||
ManifestProtocolForbidWriteSnafu {
|
||||
min_version: protocol.min_writer_version,
|
||||
supported_version: self.supported_writer_version,
|
||||
}
|
||||
);
|
||||
|
||||
let version = self.inc_version();
|
||||
|
||||
logging::debug!(
|
||||
"Save region metadata action: {:?}, version: {}",
|
||||
action_list,
|
||||
version
|
||||
);
|
||||
|
||||
self.store.save(version, &action_list.encode()?).await?;
|
||||
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
start: ManifestVersion,
|
||||
end: ManifestVersion,
|
||||
) -> Result<MetaActionIteratorImpl<M>> {
|
||||
Ok(MetaActionIteratorImpl {
|
||||
log_iter: self.store.scan(start, end).await?,
|
||||
reader_version: self.supported_reader_version,
|
||||
last_protocol: None,
|
||||
_phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,182 +1,15 @@
|
||||
//! Region manifest impl
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ensure;
|
||||
use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion};
|
||||
use store_api::manifest::*;
|
||||
|
||||
use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result};
|
||||
use crate::manifest::action::*;
|
||||
use crate::manifest::storage::ManifestObjectStore;
|
||||
use crate::manifest::storage::ObjectStoreLogIterator;
|
||||
use crate::manifest::ManifestImpl;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RegionManifest {
|
||||
inner: Arc<RegionManifestInner>,
|
||||
}
|
||||
|
||||
impl RegionManifest {
|
||||
pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
|
||||
RegionManifest {
|
||||
inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update inner state.
|
||||
pub fn update_state(&self, version: ManifestVersion, protocol: Option<ProtocolAction>) {
|
||||
self.inner.update_state(version, protocol);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Manifest for RegionManifest {
|
||||
type Error = Error;
|
||||
type MetaAction = RegionMetaActionList;
|
||||
type MetaActionIterator = RegionMetaActionListIterator;
|
||||
|
||||
async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
|
||||
self.inner.save(action_list).await
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
start: ManifestVersion,
|
||||
end: ManifestVersion,
|
||||
) -> Result<RegionMetaActionListIterator> {
|
||||
self.inner.scan(start, end).await
|
||||
}
|
||||
|
||||
async fn checkpoint(&self) -> Result<ManifestVersion> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn last_version(&self) -> ManifestVersion {
|
||||
self.inner.last_version()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RegionManifestInner {
|
||||
store: Arc<ManifestObjectStore>,
|
||||
version: AtomicU64,
|
||||
/// Current using protocol
|
||||
protocol: ArcSwap<ProtocolAction>,
|
||||
/// Current node supported protocols (reader_version, writer_version)
|
||||
supported_reader_version: ProtocolVersion,
|
||||
supported_writer_version: ProtocolVersion,
|
||||
}
|
||||
|
||||
pub struct RegionMetaActionListIterator {
|
||||
log_iter: ObjectStoreLogIterator,
|
||||
reader_version: ProtocolVersion,
|
||||
last_protocol: Option<ProtocolAction>,
|
||||
}
|
||||
|
||||
impl RegionMetaActionListIterator {
|
||||
pub fn last_protocol(&self) -> &Option<ProtocolAction> {
|
||||
&self.last_protocol
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MetaActionIterator for RegionMetaActionListIterator {
|
||||
type Error = Error;
|
||||
type MetaAction = RegionMetaActionList;
|
||||
|
||||
async fn next_action(&mut self) -> Result<Option<(ManifestVersion, RegionMetaActionList)>> {
|
||||
match self.log_iter.next_log().await? {
|
||||
Some((v, bytes)) => {
|
||||
let (action_list, protocol) =
|
||||
RegionMetaActionList::decode(&bytes, self.reader_version)?;
|
||||
|
||||
if protocol.is_some() {
|
||||
self.last_protocol = protocol;
|
||||
}
|
||||
|
||||
Ok(Some((v, action_list)))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RegionManifestInner {
|
||||
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
|
||||
let (reader_version, writer_version) = action::supported_protocol_version();
|
||||
|
||||
Self {
|
||||
store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)),
|
||||
version: AtomicU64::new(0),
|
||||
protocol: ArcSwap::new(Arc::new(ProtocolAction::new())),
|
||||
supported_reader_version: reader_version,
|
||||
supported_writer_version: writer_version,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn inc_version(&self) -> ManifestVersion {
|
||||
self.version.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn update_state(&self, version: ManifestVersion, protocol: Option<ProtocolAction>) {
|
||||
self.version.store(version, Ordering::Relaxed);
|
||||
if let Some(p) = protocol {
|
||||
self.protocol.store(Arc::new(p));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn last_version(&self) -> ManifestVersion {
|
||||
self.version.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
async fn save(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
|
||||
let protocol = self.protocol.load();
|
||||
|
||||
ensure!(
|
||||
protocol.is_writable(self.supported_writer_version),
|
||||
ManifestProtocolForbidWriteSnafu {
|
||||
min_version: protocol.min_writer_version,
|
||||
supported_version: self.supported_writer_version,
|
||||
}
|
||||
);
|
||||
|
||||
let version = self.inc_version();
|
||||
|
||||
logging::debug!(
|
||||
"Save region metadata action: {:?}, version: {}",
|
||||
action_list,
|
||||
version
|
||||
);
|
||||
|
||||
self.store.save(version, &action_list.encode()?).await?;
|
||||
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
start: ManifestVersion,
|
||||
end: ManifestVersion,
|
||||
) -> Result<RegionMetaActionListIterator> {
|
||||
Ok(RegionMetaActionListIterator {
|
||||
log_iter: self.store.scan(start, end).await?,
|
||||
reader_version: self.supported_reader_version,
|
||||
last_protocol: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
pub type RegionManifest = ManifestImpl<RegionMetaActionList>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use object_store::{backend::fs, ObjectStore};
|
||||
use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION};
|
||||
use tempdir::TempDir;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -103,7 +103,7 @@ async fn test_flush_and_stall() {
|
||||
tester.put(&data).await;
|
||||
|
||||
// Check parquet files.
|
||||
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir(REGION_NAME));
|
||||
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
|
||||
let mut has_parquet_file = false;
|
||||
for entry in std::fs::read_dir(sst_dir).unwrap() {
|
||||
let entry = entry.unwrap();
|
||||
|
||||
@@ -20,8 +20,9 @@ pub async fn new_store_config(
|
||||
region_name: &str,
|
||||
store_dir: &str,
|
||||
) -> StoreConfig<LocalFileLogStore> {
|
||||
let sst_dir = engine::region_sst_dir(region_name);
|
||||
let manifest_dir = engine::region_manifest_dir(region_name);
|
||||
let parent_dir = "";
|
||||
let sst_dir = engine::region_sst_dir(parent_dir, region_name);
|
||||
let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
|
||||
|
||||
let accessor = Backend::build().root(store_dir).finish().await.unwrap();
|
||||
let object_store = ObjectStore::new(accessor);
|
||||
|
||||
@@ -6,14 +6,29 @@ use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
use crate::manifest::action::ProtocolAction;
|
||||
use crate::manifest::action::ProtocolVersion;
|
||||
pub use crate::manifest::storage::*;
|
||||
|
||||
pub type ManifestVersion = u64;
|
||||
pub const MIN_VERSION: u64 = 0;
|
||||
pub const MAX_VERSION: u64 = u64::MAX;
|
||||
|
||||
pub trait MetaAction: Serialize + DeserializeOwned {
|
||||
pub trait MetaAction: Serialize + DeserializeOwned + Send + Sync + Clone + std::fmt::Debug {
|
||||
type Error: ErrorExt + Send + Sync;
|
||||
|
||||
/// Set previous valid manifest version.
|
||||
fn set_prev_version(&mut self, version: ManifestVersion);
|
||||
|
||||
/// Encode this action into a byte vector
|
||||
fn encode(&self) -> Result<Vec<u8>, Self::Error>;
|
||||
|
||||
/// Decode self from byte slice with reader protocol version,
|
||||
/// return error when reader version is not supported.
|
||||
fn decode(
|
||||
bs: &[u8],
|
||||
reader_version: ProtocolVersion,
|
||||
) -> Result<(Self, Option<ProtocolAction>), Self::Error>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
///! Common actions for manifest
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::manifest::ManifestVersion;
|
||||
|
||||
pub type ProtocolVersion = u16;
|
||||
|
||||
/// Current reader and writer versions
|
||||
@@ -23,6 +25,11 @@ pub struct ProtocolAction {
|
||||
pub min_writer_version: ProtocolVersion,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct VersionHeader {
|
||||
pub prev_version: ManifestVersion,
|
||||
}
|
||||
|
||||
impl Default for ProtocolAction {
|
||||
fn default() -> Self {
|
||||
let (min_reader_version, min_writer_version) = supported_protocol_version();
|
||||
|
||||
@@ -16,7 +16,7 @@ pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
|
||||
pub use self::chunk::{Chunk, ChunkReader};
|
||||
pub use self::descriptors::*;
|
||||
pub use self::engine::{EngineContext, OpenOptions, StorageEngine};
|
||||
pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine};
|
||||
pub use self::metadata::RegionMeta;
|
||||
pub use self::region::{Region, WriteContext};
|
||||
pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest};
|
||||
|
||||
@@ -39,6 +39,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
descriptor: RegionDescriptor,
|
||||
opts: &CreateOptions,
|
||||
) -> Result<Self::Region, Self::Error>;
|
||||
|
||||
/// Drops given region.
|
||||
@@ -62,6 +63,16 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct EngineContext {}
|
||||
|
||||
/// Options to create a region.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct CreateOptions {
|
||||
/// Region parent directory
|
||||
pub parent_dir: String,
|
||||
}
|
||||
|
||||
/// Options to open a region.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct OpenOptions {}
|
||||
pub struct OpenOptions {
|
||||
/// Region parent directory
|
||||
pub parent_dir: String,
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
arc-swap = "1.0"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
@@ -15,6 +16,9 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b
|
||||
datatypes = { path = "../datatypes" }
|
||||
futures = "0.3"
|
||||
log-store = { path = "../log-store" }
|
||||
object-store = { path = "../object-store" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
storage ={ path = "../storage" }
|
||||
store-api ={ path = "../store-api" }
|
||||
|
||||
4
src/table-engine/src/config.rs
Normal file
4
src/table-engine/src/config.rs
Normal file
@@ -0,0 +1,4 @@
|
||||
//! Table Engine config
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct EngineConfig {}
|
||||
@@ -6,11 +6,12 @@ use std::sync::RwLock;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::logging;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::{
|
||||
self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
|
||||
OpenOptions, Region, RegionDescriptorBuilder, RegionMeta, RowKeyDescriptor,
|
||||
RowKeyDescriptorBuilder, StorageEngine,
|
||||
CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta,
|
||||
RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
|
||||
};
|
||||
use table::engine::{EngineContext, TableEngine};
|
||||
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
|
||||
@@ -21,15 +22,32 @@ use table::{
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::error::{
|
||||
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
|
||||
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result,
|
||||
};
|
||||
use crate::manifest::TableManifest;
|
||||
use crate::table::MitoTable;
|
||||
|
||||
pub const MITO_ENGINE: &str = "mito";
|
||||
const INIT_COLUMN_ID: ColumnId = 0;
|
||||
|
||||
#[inline]
|
||||
fn region_name(id: RegionId) -> String {
|
||||
format!("{:010}", id)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn table_dir(table_name: &str) -> String {
|
||||
format!("{}/", table_name)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn table_manifest_dir(table_name: &str) -> String {
|
||||
format!("{}/manifest/", table_name)
|
||||
}
|
||||
|
||||
/// [TableEngine] implementation.
|
||||
///
|
||||
/// About mito <https://en.wikipedia.org/wiki/Alfa_Romeo_MiTo>.
|
||||
@@ -40,9 +58,9 @@ pub struct MitoEngine<S: StorageEngine> {
|
||||
}
|
||||
|
||||
impl<S: StorageEngine> MitoEngine<S> {
|
||||
pub fn new(storage_engine: S) -> Self {
|
||||
pub fn new(config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(MitoEngineInner::new(storage_engine)),
|
||||
inner: Arc::new(MitoEngineInner::new(config, storage_engine, object_store)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -100,6 +118,7 @@ struct MitoEngineInner<S: StorageEngine> {
|
||||
///
|
||||
/// Writing to `tables` should also hold the `table_mutex`.
|
||||
tables: RwLock<HashMap<String, TableRef>>,
|
||||
object_store: ObjectStore,
|
||||
storage_engine: S,
|
||||
// FIXME(yingwen): Remove `next_table_id`. Table id should be assigned by other module (maybe catalog).
|
||||
next_table_id: AtomicU64,
|
||||
@@ -109,10 +128,11 @@ struct MitoEngineInner<S: StorageEngine> {
|
||||
}
|
||||
|
||||
impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
fn new(storage_engine: S) -> Self {
|
||||
fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
|
||||
Self {
|
||||
tables: RwLock::new(HashMap::default()),
|
||||
storage_engine,
|
||||
object_store,
|
||||
next_table_id: AtomicU64::new(0),
|
||||
table_mutex: Mutex::new(()),
|
||||
}
|
||||
@@ -246,10 +266,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
build_column_family_from_request(INIT_COLUMN_ID, &request)?;
|
||||
let (next_column_id, row_key) = build_row_key_desc_from_schema(next_column_id, &request)?;
|
||||
|
||||
// Now we just use table name as region name. TODO(yingwen): Naming pattern of region.
|
||||
let region_name = table_name.clone();
|
||||
// TODO(dennis): supports multi regions;
|
||||
let region_id = 0;
|
||||
let region_name = region_name(region_id);
|
||||
let region_descriptor = RegionDescriptorBuilder::default()
|
||||
.id(0)
|
||||
.id(region_id)
|
||||
.name(®ion_name)
|
||||
.row_key(row_key)
|
||||
.default_cf(default_cf)
|
||||
@@ -265,9 +286,13 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
return Ok(table);
|
||||
}
|
||||
|
||||
let opts = CreateOptions {
|
||||
parent_dir: table_dir(table_name),
|
||||
};
|
||||
|
||||
let region = self
|
||||
.storage_engine
|
||||
.create_region(&storage::EngineContext::default(), region_descriptor)
|
||||
.create_region(&storage::EngineContext::default(), region_descriptor, &opts)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::CreateRegionSnafu)?;
|
||||
@@ -289,10 +314,13 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
.build()
|
||||
.context(error::BuildTableInfoSnafu { table_name })?;
|
||||
|
||||
let manifest =
|
||||
TableManifest::new(&table_manifest_dir(table_name), self.object_store.clone());
|
||||
|
||||
//TODO(dennis): persist table info to table manifest service.
|
||||
logging::info!("Mito engine created table: {:?}.", table_info);
|
||||
|
||||
let table = Arc::new(MitoTable::new(table_info, region));
|
||||
let table = Arc::new(MitoTable::new(table_info, region, manifest));
|
||||
|
||||
self.tables
|
||||
.write()
|
||||
@@ -323,13 +351,17 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
}
|
||||
|
||||
let engine_ctx = storage::EngineContext::default();
|
||||
let opts = OpenOptions::default();
|
||||
let region_name = table_name;
|
||||
// Now we just use table name as region name. TODO(yingwen): Naming pattern of region.
|
||||
let opts = OpenOptions {
|
||||
parent_dir: table_dir(table_name),
|
||||
};
|
||||
|
||||
// TODO(dennis): supports multi regions
|
||||
let region_id = 0;
|
||||
let region_name = region_name(region_id);
|
||||
|
||||
let region = match self
|
||||
.storage_engine
|
||||
.open_region(&engine_ctx, region_name, &opts)
|
||||
.open_region(&engine_ctx, ®ion_name, &opts)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::OpenRegionSnafu { region_name })?
|
||||
@@ -353,8 +385,10 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
.table_type(TableType::Base)
|
||||
.build()
|
||||
.context(error::BuildTableInfoSnafu { table_name })?;
|
||||
let manifest =
|
||||
TableManifest::new(&table_manifest_dir(table_name), self.object_store.clone());
|
||||
|
||||
let table = Arc::new(MitoTable::new(table_info, region));
|
||||
let table = Arc::new(MitoTable::new(table_info, region, manifest));
|
||||
|
||||
self.tables
|
||||
.write()
|
||||
@@ -447,8 +481,9 @@ mod tests {
|
||||
table_id: 0,
|
||||
};
|
||||
|
||||
let (engine, table) = {
|
||||
let (engine, table_engine, table) = test_util::setup_mock_engine_and_table().await;
|
||||
let (engine, table, object_store) = {
|
||||
let (engine, table_engine, table, object_store) =
|
||||
test_util::setup_mock_engine_and_table().await;
|
||||
assert_eq!(MITO_ENGINE, table_engine.name());
|
||||
// Now try to open the table again.
|
||||
let reopened = table_engine
|
||||
@@ -458,11 +493,11 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(table.schema(), reopened.schema());
|
||||
|
||||
(engine, table)
|
||||
(engine, table, object_store)
|
||||
};
|
||||
|
||||
// Construct a new table engine, and try to open the table.
|
||||
let table_engine = MitoEngine::new(engine);
|
||||
let table_engine = MitoEngine::new(EngineConfig::default(), engine, object_store);
|
||||
let reopened = table_engine
|
||||
.open_table(&ctx, open_req.clone())
|
||||
.await
|
||||
|
||||
@@ -111,7 +111,6 @@ impl ErrorExt for Error {
|
||||
|
||||
match self {
|
||||
CreateRegion { source, .. } | OpenRegion { source, .. } => source.status_code(),
|
||||
|
||||
BuildRowKeyDescriptor { .. }
|
||||
| BuildColumnDescriptor { .. }
|
||||
| BuildColumnFamilyDescriptor { .. }
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
pub mod config;
|
||||
pub mod engine;
|
||||
pub mod error;
|
||||
mod manifest;
|
||||
pub mod table;
|
||||
|
||||
11
src/table-engine/src/manifest.rs
Normal file
11
src/table-engine/src/manifest.rs
Normal file
@@ -0,0 +1,11 @@
|
||||
//! Table manifest service
|
||||
mod action;
|
||||
|
||||
use storage::manifest::ManifestImpl;
|
||||
|
||||
use crate::manifest::action::*;
|
||||
|
||||
pub type TableManifest = ManifestImpl<TableMetaActionList>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
||||
106
src/table-engine/src/manifest/action.rs
Normal file
106
src/table-engine/src/manifest/action.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
use std::io::{BufRead, BufReader};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json as json;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use storage::error::{
|
||||
DecodeJsonSnafu, DecodeMetaActionListSnafu, Error as StorageError,
|
||||
ManifestProtocolForbidReadSnafu, ReadlineSnafu,
|
||||
};
|
||||
use storage::manifest::helper;
|
||||
use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader};
|
||||
use store_api::manifest::ManifestVersion;
|
||||
use store_api::manifest::MetaAction;
|
||||
use table::metadata::{TableIdent, TableInfo};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct TableChange {
|
||||
pub table_info: TableInfo,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct TableRemove {
|
||||
pub table_ident: TableIdent,
|
||||
pub table_name: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub enum TableMetaAction {
|
||||
Protocol(ProtocolAction),
|
||||
Change(TableChange),
|
||||
Remove(TableRemove),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct TableMetaActionList {
|
||||
pub actions: Vec<TableMetaAction>,
|
||||
pub prev_version: ManifestVersion,
|
||||
}
|
||||
|
||||
impl MetaAction for TableMetaActionList {
|
||||
type Error = StorageError;
|
||||
|
||||
fn set_prev_version(&mut self, version: ManifestVersion) {
|
||||
self.prev_version = version;
|
||||
}
|
||||
|
||||
fn encode(&self) -> Result<Vec<u8>, Self::Error> {
|
||||
helper::encode_actions(self.prev_version, &self.actions)
|
||||
}
|
||||
|
||||
/// TODO(dennis): duplicated code with RegionMetaActionList::decode, try to refactor it.
|
||||
fn decode(
|
||||
bs: &[u8],
|
||||
reader_version: ProtocolVersion,
|
||||
) -> Result<(Self, Option<ProtocolAction>), Self::Error> {
|
||||
let mut lines = BufReader::new(bs).lines();
|
||||
|
||||
let mut action_list = TableMetaActionList {
|
||||
actions: Vec::default(),
|
||||
prev_version: 0,
|
||||
};
|
||||
|
||||
{
|
||||
let first_line = lines
|
||||
.next()
|
||||
.with_context(|| DecodeMetaActionListSnafu {
|
||||
msg: format!(
|
||||
"Invalid content in manifest: {}",
|
||||
std::str::from_utf8(bs).unwrap_or("**invalid bytes**")
|
||||
),
|
||||
})?
|
||||
.context(ReadlineSnafu)?;
|
||||
|
||||
// Decode prev_version
|
||||
let v: VersionHeader = json::from_str(&first_line).context(DecodeJsonSnafu)?;
|
||||
action_list.prev_version = v.prev_version;
|
||||
}
|
||||
|
||||
// Decode actions
|
||||
let mut protocol_action = None;
|
||||
let mut actions = Vec::default();
|
||||
for line in lines {
|
||||
let line = &line.context(ReadlineSnafu)?;
|
||||
let action: TableMetaAction = json::from_str(line).context(DecodeJsonSnafu)?;
|
||||
|
||||
if let TableMetaAction::Protocol(p) = &action {
|
||||
ensure!(
|
||||
p.is_readable(reader_version),
|
||||
ManifestProtocolForbidReadSnafu {
|
||||
min_version: p.min_reader_version,
|
||||
supported_version: reader_version,
|
||||
}
|
||||
);
|
||||
protocol_action = Some(p.clone());
|
||||
}
|
||||
|
||||
actions.push(action);
|
||||
}
|
||||
action_list.actions = actions;
|
||||
|
||||
Ok((action_list, protocol_action))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
||||
@@ -22,8 +22,11 @@ use table::{
|
||||
table::Table,
|
||||
};
|
||||
|
||||
use crate::manifest::TableManifest;
|
||||
|
||||
/// [Table] implementation.
|
||||
pub struct MitoTable<R: Region> {
|
||||
_manifest: TableManifest,
|
||||
table_info: TableInfo,
|
||||
//TODO(dennis): a table contains multi regions
|
||||
region: R,
|
||||
@@ -139,7 +142,11 @@ impl Stream for ChunkStream {
|
||||
}
|
||||
|
||||
impl<R: Region> MitoTable<R> {
|
||||
pub fn new(table_info: TableInfo, region: R) -> Self {
|
||||
Self { table_info, region }
|
||||
pub fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
|
||||
Self {
|
||||
table_info,
|
||||
region,
|
||||
_manifest: manifest,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,8 @@ use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use log_store::fs::noop::NoopLogStore;
|
||||
use storage::config::EngineConfig;
|
||||
use object_store::{backend::fs::Backend, ObjectStore};
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use table::engine::EngineContext;
|
||||
use table::engine::TableEngine;
|
||||
@@ -14,6 +15,7 @@ use table::requests::CreateTableRequest;
|
||||
use table::TableRef;
|
||||
use tempdir::TempDir;
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine::MitoEngine;
|
||||
use crate::table::test_util::mock_engine::MockEngine;
|
||||
|
||||
@@ -32,22 +34,30 @@ fn schema_for_test() -> Schema {
|
||||
|
||||
pub type MockMitoEngine = MitoEngine<MockEngine>;
|
||||
|
||||
pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
|
||||
let dir = TempDir::new(prefix).unwrap();
|
||||
let store_dir = dir.path().to_string_lossy();
|
||||
let accessor = Backend::build().root(&store_dir).finish().await.unwrap();
|
||||
|
||||
(dir, ObjectStore::new(accessor))
|
||||
}
|
||||
|
||||
pub async fn setup_test_engine_and_table() -> (
|
||||
MitoEngine<EngineImpl<NoopLogStore>>,
|
||||
TableRef,
|
||||
SchemaRef,
|
||||
TempDir,
|
||||
) {
|
||||
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
|
||||
let store_dir = dir.path().to_string_lossy();
|
||||
let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await;
|
||||
|
||||
let table_engine = MitoEngine::new(
|
||||
EngineConfig::default(),
|
||||
EngineImpl::new(
|
||||
EngineConfig::with_store_dir(&store_dir),
|
||||
StorageEngineConfig::default(),
|
||||
Arc::new(NoopLogStore::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
object_store.clone(),
|
||||
),
|
||||
object_store,
|
||||
);
|
||||
|
||||
let schema = Arc::new(schema_for_test());
|
||||
@@ -68,9 +78,14 @@ pub async fn setup_test_engine_and_table() -> (
|
||||
(table_engine, table, schema, dir)
|
||||
}
|
||||
|
||||
pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef) {
|
||||
pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, TableRef, ObjectStore) {
|
||||
let mock_engine = MockEngine::default();
|
||||
let table_engine = MitoEngine::new(mock_engine.clone());
|
||||
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
|
||||
let table_engine = MitoEngine::new(
|
||||
EngineConfig::default(),
|
||||
mock_engine.clone(),
|
||||
object_store.clone(),
|
||||
);
|
||||
|
||||
let schema = Arc::new(schema_for_test());
|
||||
let table = table_engine
|
||||
@@ -87,5 +102,5 @@ pub async fn setup_mock_engine_and_table() -> (MockEngine, MockMitoEngine, Table
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
(mock_engine, table_engine, table)
|
||||
(mock_engine, table_engine, table, object_store)
|
||||
}
|
||||
|
||||
@@ -10,9 +10,9 @@ use common_telemetry::logging;
|
||||
use storage::metadata::{RegionMetaImpl, RegionMetadataRef};
|
||||
use storage::write_batch::WriteBatch;
|
||||
use store_api::storage::{
|
||||
Chunk, ChunkReader, EngineContext, GetRequest, GetResponse, OpenOptions, ReadContext, Region,
|
||||
RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext,
|
||||
WriteResponse,
|
||||
Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions,
|
||||
ReadContext, Region, RegionDescriptor, ScanRequest, ScanResponse, SchemaRef, Snapshot,
|
||||
StorageEngine, WriteContext, WriteResponse,
|
||||
};
|
||||
|
||||
pub type Result<T> = std::result::Result<T, MockError>;
|
||||
@@ -153,6 +153,7 @@ impl StorageEngine for MockEngine {
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
descriptor: RegionDescriptor,
|
||||
_opts: &CreateOptions,
|
||||
) -> Result<MockRegion> {
|
||||
logging::info!("Mock engine create region, descriptor: {:?}", descriptor);
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::collections::HashMap;
|
||||
use chrono::{DateTime, Utc};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use derive_builder::Builder;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
pub type TableId = u64;
|
||||
@@ -10,7 +11,7 @@ pub type TableVersion = u64;
|
||||
|
||||
/// Indicates whether and how a filter expression can be handled by a
|
||||
/// Table for table scans.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
pub enum FilterPushDownType {
|
||||
/// The expression cannot be used by the provider.
|
||||
Unsupported,
|
||||
@@ -26,7 +27,7 @@ pub enum FilterPushDownType {
|
||||
}
|
||||
|
||||
/// Indicates the type of this table for metadata/catalog purposes.
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
|
||||
pub enum TableType {
|
||||
/// An ordinary physical table.
|
||||
Base,
|
||||
@@ -36,13 +37,13 @@ pub enum TableType {
|
||||
Temporary,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
|
||||
pub struct TableIdent {
|
||||
pub table_id: TableId,
|
||||
pub version: TableVersion,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Builder)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Builder, PartialEq)]
|
||||
#[builder(pattern = "mutable")]
|
||||
pub struct TableMeta {
|
||||
pub schema: SchemaRef,
|
||||
@@ -90,7 +91,7 @@ impl TableMeta {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Builder)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Builder)]
|
||||
#[builder(pattern = "owned")]
|
||||
pub struct TableInfo {
|
||||
#[builder(default, setter(into))]
|
||||
|
||||
Reference in New Issue
Block a user