fix: compile error and remove region id

This commit is contained in:
Dennis Zhuang
2022-08-11 17:17:55 +08:00
parent dcdbef534c
commit c14c250f8a
18 changed files with 53 additions and 90 deletions

View File

@@ -65,6 +65,7 @@ impl SystemCatalogTable {
} else {
// system catalog table is not yet created, try to create
let request = CreateTableRequest {
table_id: SYSTEM_CATALOG_TABLE_ID,
name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
desc: Some("System catalog table".to_string()),
schema: schema.clone(),

View File

@@ -40,7 +40,7 @@ pub struct Instance {
pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn new(opts: &DatanodeOptions, catalog_list: CatalogListRef) -> Result<Self> {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.store_config).await?;
let log_store = create_local_file_log_store(opts).await?;
@@ -146,6 +146,7 @@ impl Instance {
.create_table(
&EngineContext::default(),
CreateTableRequest {
table_id: 1,
name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: Arc::new(
@@ -224,7 +225,7 @@ mod tests {
#[tokio::test]
async fn test_execute_insert() {
common_telemetry::init_default_ut_logging();
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let (opts, _wal_dir, _data_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
@@ -243,7 +244,7 @@ mod tests {
#[tokio::test]
async fn test_execute_query() {
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let (opts, _wal_dir, _data_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();

View File

@@ -59,7 +59,7 @@ mod tests {
}
async fn create_extension() -> Extension<InstanceRef> {
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let (opts, _wal_dir, _data_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts).await.unwrap());
instance.start().await.unwrap();
Extension(instance)

View File

@@ -49,6 +49,9 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
table_name: &table_name,
})?;
let schema = table.schema();
println!("{:?}", schema);
let columns_num = if columns.is_empty() {
schema.column_schemas().len()
} else {

View File

@@ -1,17 +1,21 @@
use tempdir::TempDir;
use crate::datanode::DatanodeOptions;
use crate::datanode::{DatanodeOptions, FileStoreConfig, ObjectStoreConfig};
/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
/// Only for test.
///
/// TODO: Add a test feature
pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TempDir) {
let tmp_dir = TempDir::new("/tmp/greptimedb_test").unwrap();
pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TempDir, TempDir) {
let wal_tmp_dir = TempDir::new("/tmp/greptimedb_test_wal").unwrap();
let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap();
let opts = DatanodeOptions {
wal_dir: tmp_dir.path().to_str().unwrap().to_string(),
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
store_config: ObjectStoreConfig::File(FileStoreConfig {
store_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
..Default::default()
};
(opts, tmp_dir)
(opts, wal_tmp_dir, data_tmp_dir)
}

View File

@@ -11,7 +11,7 @@ use crate::server::http::HttpServer;
use crate::test_util;
async fn make_test_app() -> Router {
let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
let (opts, _wal_dir, _data_dir) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Arc::new(Instance::new(&opts).await.unwrap());
instance.start().await.unwrap();
let http_server = HttpServer::new(instance);

View File

@@ -37,7 +37,6 @@ impl RegionDescBuilder {
pub fn build(self) -> RegionDescriptor {
RegionDescriptor {
id: 0,
name: self.name,
row_key: self.key_builder.build().unwrap(),
default_cf: self.default_cf_builder.build().unwrap(),

View File

@@ -6,7 +6,6 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader};
use store_api::manifest::ManifestVersion;
use store_api::manifest::MetaAction;
use store_api::storage::RegionId;
use store_api::storage::SequenceNumber;
use crate::error::{
@@ -24,7 +23,7 @@ pub struct RegionChange {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RegionRemove {
pub region_id: RegionId,
pub region_name: String,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]

View File

@@ -9,7 +9,6 @@ use crate::test_util::descriptor_util::RegionDescBuilder;
pub fn build_region_meta() -> RegionMetadata {
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.id(0)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.build();

View File

@@ -7,8 +7,7 @@ use serde::{Deserialize, Serialize};
use snafu::ensure;
use store_api::storage::{
consts, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId,
ColumnId, ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema,
SchemaRef,
ColumnId, ColumnSchema, RegionDescriptor, RegionMeta, RowKeyDescriptor, Schema, SchemaRef,
};
/// Error for handling metadata.
@@ -59,8 +58,6 @@ pub type VersionNumber = u32;
/// In memory metadata of region.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RegionMetadata {
// The following fields are immutable.
id: RegionId,
name: String,
// The following fields are mutable.
@@ -79,11 +76,6 @@ pub struct RegionMetadata {
}
impl RegionMetadata {
#[inline]
pub fn id(&self) -> RegionId {
self.id
}
#[inline]
pub fn name(&self) -> &str {
&self.name
@@ -178,7 +170,6 @@ impl TryFrom<RegionDescriptor> for RegionMetadata {
// created from descriptor, using initial version is reasonable.
let mut builder = RegionMetadataBuilder::new()
.name(desc.name)
.id(desc.id)
.row_key(desc.row_key)?
.add_column_family(desc.default_cf)?;
for cf in desc.extra_cfs {
@@ -191,7 +182,6 @@ impl TryFrom<RegionDescriptor> for RegionMetadata {
#[derive(Default)]
struct RegionMetadataBuilder {
id: RegionId,
name: String,
columns: Vec<ColumnMetadata>,
column_schemas: Vec<ColumnSchema>,
@@ -213,11 +203,6 @@ impl RegionMetadataBuilder {
self
}
fn id(mut self, id: RegionId) -> Self {
self.id = id;
self
}
fn row_key(mut self, key: RowKeyDescriptor) -> Result<Self> {
for col in key.columns {
self.push_row_key_column(col)?;
@@ -293,7 +278,6 @@ impl RegionMetadataBuilder {
});
Ok(RegionMetadata {
id: self.id,
name: self.name,
schema,
columns_row_key,

View File

@@ -13,7 +13,7 @@ use store_api::manifest::{
self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator,
};
use store_api::storage::{
OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse,
OpenOptions, ReadContext, Region, RegionMeta, WriteContext, WriteResponse,
};
use crate::error::{self, Error, Result};
@@ -117,14 +117,12 @@ impl<S: LogStore> RegionImpl<S> {
/// Create a new region without persisting manifest.
fn new(version: Version, store_config: StoreConfig<S>) -> RegionImpl<S> {
let metadata = version.metadata();
let id = metadata.id();
let name = metadata.name().to_string();
let version_control = VersionControl::with_version(version);
let wal = Wal::new(name.clone(), store_config.log_store);
let inner = Arc::new(RegionInner {
shared: Arc::new(SharedData {
id,
name,
version_control: Arc::new(version_control),
}),
@@ -158,11 +156,9 @@ impl<S: LogStore> RegionImpl<S> {
version
);
let metadata = version.metadata().clone();
let version_control = Arc::new(VersionControl::with_version(version));
let wal = Wal::new(name.clone(), store_config.log_store);
let shared = Arc::new(SharedData {
id: metadata.id(),
name,
version_control,
});
@@ -282,20 +278,14 @@ impl<S: LogStore> RegionImpl<S> {
/// Shared data of region.
#[derive(Debug)]
pub struct SharedData {
// Region id and name is immutable, so we cache them in shared data to avoid loading
// Region ame is immutable, so we cache them in shared data to avoid loading
// current version from `version_control` each time we need to access them.
id: RegionId,
name: String,
// TODO(yingwen): Maybe no need to use Arc for version control.
pub version_control: VersionControlRef,
}
impl SharedData {
#[inline]
pub fn id(&self) -> RegionId {
self.id
}
#[inline]
pub fn name(&self) -> &str {
&self.name

View File

@@ -222,10 +222,9 @@ impl WriterInner {
last_sequence = req_sequence;
} else {
logging::error!(
"Sequence should not decrease during replay, found {} < {}, region_id: {}, region_name: {}",
"Sequence should not decrease during replay, found {} < {}, region_name: {}",
req_sequence,
last_sequence,
writer_ctx.shared.id,
writer_ctx.shared.name,
);
@@ -247,8 +246,7 @@ impl WriterInner {
}
logging::info!(
"Region replay finished, region_id: {}, region_name: {}, flushed_sequence: {}, last_sequence: {}, num_requests: {}",
writer_ctx.shared.id,
"Region replay finished, region_name: {}, flushed_sequence: {}, last_sequence: {}, num_requests: {}",
writer_ctx.shared.name,
flushed_sequence,
last_sequence,

View File

@@ -1,14 +1,13 @@
use datatypes::prelude::ConcreteDataType;
use store_api::storage::{
ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId,
RegionDescriptor, RegionId, RowKeyDescriptorBuilder,
RegionDescriptor, RowKeyDescriptorBuilder,
};
use crate::test_util::{self, schema_util::ColumnDef};
/// A RegionDescriptor builder for test.
pub struct RegionDescBuilder {
id: RegionId,
name: String,
last_column_id: ColumnId,
key_builder: RowKeyDescriptorBuilder,
@@ -29,7 +28,6 @@ impl RegionDescBuilder {
);
Self {
id: 0,
name: name.into(),
last_column_id: 2,
key_builder,
@@ -37,11 +35,6 @@ impl RegionDescBuilder {
}
}
pub fn id(mut self, id: RegionId) -> Self {
self.id = id;
self
}
// This will reset the row key builder, so should be called before `push_key_column()`
// and `enable_version_column()`, or just call after `new()`.
//
@@ -78,7 +71,6 @@ impl RegionDescBuilder {
pub fn build(self) -> RegionDescriptor {
RegionDescriptor {
id: self.id,
name: self.name,
row_key: self.key_builder.build().unwrap(),
default_cf: self.default_cf_builder.build().unwrap(),

View File

@@ -8,8 +8,6 @@ use crate::storage::{consts, ColumnSchema, ConcreteDataType};
pub type ColumnId = u32;
/// Id of column family, unique in each region.
pub type ColumnFamilyId = u32;
/// Id of the region.
pub type RegionId = u32;
// TODO(yingwen): Validate default value has same type with column, and name is a valid column name.
/// A [ColumnDescriptor] contains information to create a column.
@@ -80,7 +78,6 @@ pub struct ColumnFamilyDescriptor {
#[derive(Debug, Clone, PartialEq, Builder)]
#[builder(pattern = "owned")]
pub struct RegionDescriptor {
pub id: RegionId,
/// Region name.
#[builder(setter(into))]
pub name: String,

View File

@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
@@ -10,8 +9,8 @@ use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{
self, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionId, RegionMeta,
RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
CreateOptions, OpenOptions, Region, RegionDescriptorBuilder, RegionMeta, RowKeyDescriptor,
RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{EngineContext, TableEngine};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
@@ -25,7 +24,7 @@ use tokio::sync::Mutex;
use crate::config::EngineConfig;
use crate::error::{
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, TableExistsSnafu
BuildRowKeyDescriptorSnafu, MissingTimestampIndexSnafu, Result, TableExistsSnafu,
};
use crate::table::MitoTable;
@@ -34,8 +33,8 @@ const INIT_COLUMN_ID: ColumnId = 0;
const INIT_TABLE_VERSION: TableVersion = 0;
#[inline]
fn region_name(id: RegionId) -> String {
format!("{:010}", id)
fn region_name(table_id: TableId, n: u32) -> String {
format!("{}_{:010}", table_id, n)
}
#[inline]
@@ -115,8 +114,6 @@ struct MitoEngineInner<S: StorageEngine> {
tables: RwLock<HashMap<String, TableRef>>,
object_store: ObjectStore,
storage_engine: S,
// FIXME(yingwen): Remove `next_table_id`. Table id should be assigned by other module (maybe catalog).
next_table_id: AtomicU64,
/// Table mutex is used to protect the operations such as creating/opening/closing
/// a table, to avoid things like opening the same table simultaneously.
table_mutex: Mutex<()>,
@@ -249,11 +246,13 @@ impl<S: StorageEngine> MitoEngineInner<S> {
build_column_family_from_request(INIT_COLUMN_ID, &request)?;
let (next_column_id, row_key) = build_row_key_desc_from_schema(next_column_id, &request)?;
let _lock = self.table_mutex.lock().await;
let table_id = request.table_id;
// TODO(dennis): supports multi regions;
let region_id = 0;
let region_name = region_name(region_id);
let region_number = 0;
let region_name = region_name(table_id, region_number);
let region_descriptor = RegionDescriptorBuilder::default()
.id(region_id)
.name(&region_name)
.row_key(row_key)
.default_cf(default_cf)
@@ -262,8 +261,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
table_name,
region_name,
})?;
let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table(table_name) {
if request.create_if_not_exists {
@@ -294,7 +291,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.context(error::BuildTableMetaSnafu { table_name })?;
let table_info = TableInfoBuilder::new(table_name.clone(), table_meta)
.ident(self.next_table_id())
.ident(table_id)
.table_version(INIT_TABLE_VERSION)
.table_type(TableType::Base)
.desc(request.desc)
@@ -340,9 +337,10 @@ impl<S: StorageEngine> MitoEngineInner<S> {
parent_dir: table_dir(table_name),
};
let table_id = request.table_id;
// TODO(dennis): supports multi regions
let region_id = 0;
let region_name = region_name(region_id);
let region_number = 0;
let region_name = region_name(table_id, region_number);
let region = match self
.storage_engine
@@ -381,14 +379,9 @@ impl<S: StorageEngine> MitoEngineInner<S> {
tables: RwLock::new(HashMap::default()),
storage_engine,
object_store,
next_table_id: AtomicU64::new(0),
table_mutex: Mutex::new(()),
}
}
fn next_table_id(&self) -> TableId {
self.next_table_id.fetch_add(1, Ordering::Relaxed)
}
}
#[cfg(test)]
@@ -406,10 +399,10 @@ mod tests {
#[test]
fn test_region_name() {
assert_eq!("0000000000", region_name(0));
assert_eq!("0000000001", region_name(1));
assert_eq!("0000000100", region_name(100));
assert_eq!("0000009999", region_name(9999));
assert_eq!("0_0000000000", region_name(0, 0));
assert_eq!("1_0000000001", region_name(1, 1));
assert_eq!("10_0000000100", region_name(10, 100));
assert_eq!("7_0000009999", region_name(7, 9999));
}
#[test]
@@ -483,6 +476,7 @@ mod tests {
let table_info = table.table_info();
let request = CreateTableRequest {
table_id: 1,
name: table_info.name.to_string(),
schema: table_info.meta.schema.clone(),
create_if_not_exists: true,
@@ -502,6 +496,7 @@ mod tests {
// test create_if_not_exists=false
let request = CreateTableRequest {
table_id: 1,
name: table_info.name.to_string(),
schema: table_info.meta.schema.clone(),
create_if_not_exists: false,
@@ -520,12 +515,12 @@ mod tests {
common_telemetry::init_default_ut_logging();
let ctx = EngineContext::default();
let table_id = 1;
let open_req = OpenTableRequest {
catalog_name: String::new(),
schema_name: String::new(),
table_name: test_util::TABLE_NAME.to_string(),
// Currently the first table has id 0.
table_id: 0,
table_id,
};
let (engine, table, object_store, _dir) = {

View File

@@ -155,8 +155,6 @@ impl ErrorExt for Error {
| TableExists { .. }
| MissingTimestampIndex { .. } => StatusCode::InvalidArguments,
UpdateTableManifest { .. } => StatusCode::StorageUnavailable,
TableInfoNotFound { .. } => StatusCode::Unexpected,
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,

View File

@@ -18,7 +18,7 @@ use tempdir::TempDir;
use crate::config::EngineConfig;
use crate::engine::MitoEngine;
use crate::engine::DEFAULT_ENGINE;
use crate::engine::MITO_ENGINE;
pub use crate::table::test_util::mock_engine::MockEngine;
pub use crate::table::test_util::mock_engine::MockRegion;
@@ -40,7 +40,7 @@ pub type MockMitoEngine = MitoEngine<MockEngine>;
pub fn build_test_table_info() -> TableInfo {
let table_meta = TableMetaBuilder::default()
.schema(Arc::new(schema_for_test()))
.engine(DEFAULT_ENGINE)
.engine(MITO_ENGINE)
.next_column_id(1)
.primary_key_indices(vec![0, 1])
.build()
@@ -85,6 +85,7 @@ pub async fn setup_test_engine_and_table() -> (
.create_table(
&EngineContext::default(),
CreateTableRequest {
table_id: 1,
name: TABLE_NAME.to_string(),
desc: Some("a test table".to_string()),
schema: schema.clone(),
@@ -113,6 +114,7 @@ pub async fn setup_mock_engine_and_table(
.create_table(
&EngineContext::default(),
CreateTableRequest {
table_id: 1,
name: TABLE_NAME.to_string(),
desc: None,
schema: schema.clone(),

View File

@@ -15,6 +15,7 @@ pub struct InsertRequest {
/// Create table request
#[derive(Debug)]
pub struct CreateTableRequest {
pub table_id: TableId,
pub name: String,
pub desc: Option<String>,
pub schema: SchemaRef,