feat: support multi regions on datanode (#653)

* wip: fix compile errors

* chore: move splitter to partition crate

* fix: remove useless variants in frontend errors

* chore: move more partition related code to partition manager

* fix: license header

* wip: move WriteSplitter to PartitionRuleManager

* fix: clippy warnings

* chore: remove useless error variant and format toml

* fix: cr comments

* chore: resolve conflicts

* chore: rebase develop

* fix: cr comments

* feat: support multi regions on datanode

* chore: rebase onto develop

* chore: rebase develop

* chore: rebase develop

* wip

* fix: compile errors

* feat: multi region

* fix: CR comments

* feat: allow stat existing regions without actually open it

* fix: use table meta in manifest to recover region info
This commit is contained in:
Lei, HUANG
2023-02-07 10:46:18 +08:00
committed by GitHub
parent 7d77913e88
commit 5d62e193bd
26 changed files with 383 additions and 227 deletions

1
Cargo.lock generated
View File

@@ -4724,6 +4724,7 @@ dependencies = [
name = "partition"
version = "0.1.0"
dependencies = [
"common-catalog",
"common-error",
"common-query",
"datafusion",

View File

@@ -167,11 +167,6 @@ pub struct RegisterSchemaRequest {
pub schema: String,
}
/// Formats table fully-qualified name
pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {
format!("{catalog}.{schema}.{table}")
}
pub trait CatalogProviderFactory {
fn create(&self, catalog_name: String) -> CatalogProviderRef;
}
@@ -198,8 +193,10 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
.create_table(&EngineContext::default(), req.create_table_request.clone())
.await
.with_context(|_| CreateTableSnafu {
table_info: format!(
"{catalog_name}.{schema_name}.{table_name}, id: {table_id}",
table_info: common_catalog::format_full_table_name(
catalog_name,
schema_name,
table_name,
),
})?;
manager

View File

@@ -20,6 +20,7 @@ use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID,
SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME,
};
use common_catalog::format_full_table_name;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{error, info};
use datatypes::prelude::ScalarVector;
@@ -45,10 +46,9 @@ use crate::system::{
};
use crate::tables::SystemCatalog;
use crate::{
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider,
SchemaProviderRef,
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef,
};
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
@@ -252,7 +252,6 @@ impl LocalCatalogManager {
schema_name: t.schema_name.clone(),
table_name: t.table_name.clone(),
table_id: t.table_id,
region_numbers: vec![0],
};
let option = self

View File

@@ -324,7 +324,6 @@ impl RemoteCatalogManager {
schema_name: schema_name.clone(),
table_name: table_name.clone(),
table_id,
region_numbers: region_numbers.clone(),
};
match self
.engine

View File

@@ -87,7 +87,6 @@ impl SystemCatalogTable {
schema_name: INFORMATION_SCHEMA_NAME.to_string(),
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
table_id: SYSTEM_CATALOG_TABLE_ID,
region_numbers: vec![0],
};
let schema = Arc::new(build_system_catalog_schema());
let ctx = EngineContext::default();
@@ -271,6 +270,7 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
columns_values,
region_number: 0, // system catalog table has only one region
}
}

View File

@@ -14,3 +14,9 @@
pub mod consts;
pub mod error;
/// Formats table fully-qualified name
#[inline]
pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {
format!("{catalog}.{schema}.{table}")
}

View File

@@ -306,6 +306,7 @@ pub fn to_table_insert_request(
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
columns_values,
region_number: request.region_number,
})
}
@@ -439,6 +440,7 @@ fn is_null(null_mask: &BitVec, idx: usize) -> Option<bool> {
mod tests {
use std::any::Any;
use std::sync::Arc;
use std::{assert_eq, unimplemented, vec};
use api::helper::ColumnDataTypeWrapper;
use api::v1::column::{self, SemanticType, Values};

View File

@@ -120,6 +120,7 @@ impl SqlHandler {
.into_iter()
.map(|(cs, mut b)| (cs.name.to_string(), b.to_vector()))
.collect(),
region_number: 0,
}))
}
}

View File

@@ -344,6 +344,7 @@ pub async fn test_execute_create() {
#[tokio::test]
async fn test_rename_table() {
common_telemetry::init_default_ut_logging();
let instance = MockInstance::new("test_rename_table_local").await;
let output = execute_sql(&instance, "create database db").await;

View File

@@ -28,7 +28,7 @@ use crate::error::{self, Result};
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
// TODO(fys): Extract the common logic in datanode and frontend in the future.
#[allow(dead_code)]
// This function convert insert statement to an `InsertRequest` to region 0.
pub(crate) fn insert_to_request(table: &TableRef, stmt: Insert) -> Result<InsertRequest> {
let columns = stmt.columns();
let values = stmt.values().context(error::ParseSqlSnafu)?;
@@ -86,6 +86,7 @@ pub(crate) fn insert_to_request(table: &TableRef, stmt: Insert) -> Result<Insert
.into_iter()
.map(|(cs, mut b)| (cs.name.to_string(), b.to_vector()))
.collect(),
region_number: 0,
})
}

View File

@@ -75,7 +75,7 @@ impl Table for DistTable {
}
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
let split = self
let splits = self
.partition_manager
.split_insert_request(&self.table_name, request)
.await
@@ -83,7 +83,7 @@ impl Table for DistTable {
.context(TableOperationSnafu)?;
let output = self
.dist_insert(split)
.dist_insert(splits)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
@@ -123,7 +123,6 @@ impl Table for DistTable {
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
// TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table.
partition_execs.push(Arc::new(PartitionExec {
table_name: table_name.clone(),
datanode_instance,
@@ -852,14 +851,21 @@ mod test {
(2, (30..35).collect::<Vec<i32>>()),
(3, (100..105).collect::<Vec<i32>>()),
];
for (region_id, numbers) in regional_numbers {
let datanode_id = *region_to_datanode_mapping.get(&region_id).unwrap();
for (region_number, numbers) in regional_numbers {
let datanode_id = *region_to_datanode_mapping.get(&region_number).unwrap();
let instance = datanode_instances.get(&datanode_id).unwrap().clone();
let start_ts = global_start_ts;
global_start_ts += numbers.len() as i64;
insert_testing_data(&table_name, instance.clone(), numbers, start_ts).await;
insert_testing_data(
&table_name,
instance.clone(),
numbers,
start_ts,
region_number,
)
.await;
}
let meta = TableMetaBuilder::default()
@@ -887,6 +893,7 @@ mod test {
dn_instance: Arc<Instance>,
data: Vec<i32>,
start_ts: i64,
region_number: RegionNumber,
) {
let row_count = data.len() as u32;
let columns = vec![
@@ -923,7 +930,7 @@ mod test {
table_name: table_name.table_name.clone(),
columns,
row_count,
region_number: 0,
region_number,
};
dn_instance
.handle_insert(request, QueryContext::arc())

View File

@@ -62,8 +62,7 @@ impl DistTable {
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
let instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
// TODO(fys): a separate runtime should be used here.
let join = tokio::spawn(async move {
let join = common_runtime::spawn_write(async move {
instance
.grpc_insert(to_grpc_insert_request(region_id, insert)?)
.await
@@ -186,6 +185,7 @@ mod tests {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
region_number: 0,
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use api::v1::meta::{
CreateRequest as PbCreateRequest, DeleteRequest as PbDeleteRequest, Partition as PbPartition,
@@ -170,12 +170,12 @@ pub struct TableRoute {
}
impl TableRoute {
pub fn find_leaders(&self) -> Vec<Peer> {
pub fn find_leaders(&self) -> HashSet<Peer> {
self.region_routes
.iter()
.flat_map(|x| &x.leader_peer)
.cloned()
.collect::<Vec<Peer>>()
.collect()
}
pub fn find_leader_regions(&self, datanode: &Peer) -> Vec<u32> {
@@ -189,7 +189,7 @@ impl TableRoute {
}
None
})
.collect::<Vec<u32>>()
.collect()
}
}

View File

@@ -17,17 +17,21 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::logging;
use common_telemetry::tracing::log::info;
use common_telemetry::{debug, logging};
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, EngineContext as StorageEngineContext, OpenOptions, RegionDescriptorBuilder,
RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
CreateOptions, EngineContext as StorageEngineContext, OpenOptions, Region,
RegionDescriptorBuilder, RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{EngineContext, TableEngine, TableReference};
use table::metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
use table::error::TableOperationSnafu;
use table::metadata::{
TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
};
@@ -38,9 +42,10 @@ use tokio::sync::Mutex;
use crate::config::EngineConfig;
use crate::error::{
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, MissingTimestampIndexSnafu, Result,
TableExistsSnafu,
BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, MissingTimestampIndexSnafu,
RegionNotFoundSnafu, Result, TableExistsSnafu,
};
use crate::manifest::TableManifest;
use crate::table::MitoTable;
pub const MITO_ENGINE: &str = "mito";
@@ -331,51 +336,53 @@ impl<S: StorageEngine> MitoEngineInner<S> {
)?;
let table_id = request.id;
// TODO(dennis): supports multi regions;
assert_eq!(1, request.region_numbers.len());
let region_number = request.region_numbers[0];
let region_id = region_id(table_id, region_number);
let region_name = region_name(table_id, region_number);
let region_descriptor = RegionDescriptorBuilder::default()
.id(region_id)
.name(&region_name)
.row_key(row_key)
.default_cf(default_cf)
.build()
.context(BuildRegionDescriptorSnafu {
table_name,
region_name,
})?;
let table_dir = table_dir(schema_name, table_id);
let mut regions = HashMap::with_capacity(request.region_numbers.len());
let _lock = self.table_mutex.lock().await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table(&table_ref) {
if request.create_if_not_exists {
return Ok(table);
return if request.create_if_not_exists {
Ok(table)
} else {
return TableExistsSnafu { table_name }.fail();
}
TableExistsSnafu { table_name }.fail()
};
}
let table_dir = table_dir(schema_name, table_id);
let opts = CreateOptions {
parent_dir: table_dir.clone(),
};
for region_number in &request.region_numbers {
let region_id = region_id(table_id, *region_number);
let region = self
.storage_engine
.create_region(&StorageEngineContext::default(), region_descriptor, &opts)
.await
.map_err(BoxedError::new)
.context(error::CreateRegionSnafu)?;
let region_name = region_name(table_id, *region_number);
let region_descriptor = RegionDescriptorBuilder::default()
.id(region_id)
.name(&region_name)
.row_key(row_key.clone())
.default_cf(default_cf.clone())
.build()
.context(BuildRegionDescriptorSnafu {
table_name,
region_name,
})?;
let opts = CreateOptions {
parent_dir: table_dir.clone(),
};
let region = self
.storage_engine
.create_region(&StorageEngineContext::default(), region_descriptor, &opts)
.await
.map_err(BoxedError::new)
.context(error::CreateRegionSnafu)?;
info!("Mito engine created region: {:?}", region.id());
regions.insert(*region_number, region);
}
let table_meta = TableMetaBuilder::default()
.schema(request.schema)
.engine(MITO_ENGINE)
.next_column_id(next_column_id)
.primary_key_indices(request.primary_key_indices.clone())
.region_numbers(vec![region_number])
.region_numbers(request.region_numbers)
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
@@ -394,7 +401,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
table_name,
&table_dir,
table_info,
region,
regions,
self.object_store.clone(),
)
.await?,
@@ -444,28 +451,37 @@ impl<S: StorageEngine> MitoEngineInner<S> {
parent_dir: table_dir.to_string(),
};
// TODO(dennis): supports multi regions;
assert_eq!(request.region_numbers.len(), 1);
let region_number = request.region_numbers[0];
let region_name = region_name(table_id, region_number);
let Some((manifest, table_info)) = self
.recover_table_manifest_and_info(table_name, &table_dir)
.await? else { return Ok(None) };
let region = match self
.storage_engine
.open_region(&engine_ctx, &region_name, &opts)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?
{
None => return Ok(None),
Some(region) => region,
};
debug!(
"Opening table {}, table info recovered: {:?}",
table_id, table_info
);
let table = Arc::new(
MitoTable::open(table_name, &table_dir, region, self.object_store.clone())
let mut regions = HashMap::with_capacity(table_info.meta.region_numbers.len());
for region_number in &table_info.meta.region_numbers {
let region_name = region_name(table_id, *region_number);
let region = self
.storage_engine
.open_region(&engine_ctx, &region_name, &opts)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?,
);
.context(table_error::TableOperationSnafu)?
.with_context(|| RegionNotFoundSnafu {
table: format!(
"{}.{}.{}",
request.catalog_name, request.schema_name, request.table_name
),
region: *region_number,
})
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
regions.insert(*region_number, region);
}
let table = Arc::new(MitoTable::new(table_info, regions, manifest));
self.tables
.write()
@@ -479,6 +495,24 @@ impl<S: StorageEngine> MitoEngineInner<S> {
Ok(table)
}
async fn recover_table_manifest_and_info(
&self,
table_name: &str,
table_dir: &str,
) -> TableResult<Option<(TableManifest, TableInfo)>> {
let manifest = MitoTable::<<S as StorageEngine>::Region>::build_manifest(
table_dir,
self.object_store.clone(),
);
let Some(table_info) =
MitoTable::<<S as StorageEngine>::Region>::recover_table_info(table_name, &manifest)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)? else { return Ok(None) };
Ok(Some((manifest, table_info)))
}
fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
self.tables
.read()
@@ -572,6 +606,7 @@ mod tests {
};
use log_store::NoopLogStore;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::region::RegionImpl;
use storage::EngineImpl;
use store_api::manifest::Manifest;
use store_api::storage::ReadContext;
@@ -580,7 +615,9 @@ mod tests {
use super::*;
use crate::table::test_util;
use crate::table::test_util::{new_insert_request, schema_for_test, MockRegion, TABLE_NAME};
use crate::table::test_util::{
new_insert_request, schema_for_test, TestEngineComponents, TABLE_NAME,
};
async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) {
let table_name = "test_default_constraint";
@@ -757,10 +794,14 @@ mod tests {
#[tokio::test]
async fn test_create_table_insert_scan() {
let (_engine, table, schema, _dir) = test_util::setup_test_engine_and_table().await;
let TestEngineComponents {
table_ref: table,
schema_ref,
dir: _dir,
..
} = test_util::setup_test_engine_and_table().await;
assert_eq!(TableType::Base, table.table_type());
assert_eq!(schema, table.schema());
assert_eq!(schema_ref, table.schema());
let insert_req = new_insert_request("demo".to_string(), HashMap::default());
assert_eq!(0, table.insert(insert_req).await.unwrap());
@@ -839,7 +880,11 @@ mod tests {
async fn test_create_table_scan_batches() {
common_telemetry::init_default_ut_logging();
let (_engine, table, _schema, _dir) = test_util::setup_test_engine_and_table().await;
let TestEngineComponents {
table_ref: table,
dir: _dir,
..
} = test_util::setup_test_engine_and_table().await;
// TODO(yingwen): Custom batch size once the table support setting batch_size.
let default_batch_size = ReadContext::default().batch_size;
@@ -933,12 +978,18 @@ mod tests {
table_name: test_util::TABLE_NAME.to_string(),
// the test table id is 1
table_id: 1,
region_numbers: vec![0],
};
let (engine, table, object_store, _dir) = {
let (engine, table_engine, table, object_store, dir) =
test_util::setup_mock_engine_and_table().await;
let (_engine, storage_engine, table, object_store, _dir) = {
let TestEngineComponents {
table_engine,
storage_engine,
table_ref: table,
object_store,
dir,
..
} = test_util::setup_test_engine_and_table().await;
assert_eq!(MITO_ENGINE, table_engine.name());
// Now try to open the table again.
let reopened = table_engine
@@ -948,11 +999,11 @@ mod tests {
.unwrap();
assert_eq!(table.schema(), reopened.schema());
(engine, table, object_store, dir)
(table_engine, storage_engine, table, object_store, dir)
};
// Construct a new table engine, and try to open the table.
let table_engine = MitoEngine::new(EngineConfig::default(), engine, object_store);
let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store);
let reopened = table_engine
.open_table(&ctx, open_req.clone())
.await
@@ -962,11 +1013,13 @@ mod tests {
let reopened = reopened
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.downcast_ref::<MitoTable<RegionImpl<NoopLogStore>>>()
.unwrap();
let left = table.table_info();
// assert recovered table_info is correct
assert_eq!(table.table_info(), reopened.table_info());
let right = reopened.table_info();
assert_eq!(left, right);
assert_eq!(reopened.manifest().last_version(), 1);
}
@@ -1092,8 +1145,13 @@ mod tests {
#[tokio::test]
async fn test_alter_rename_table() {
let (engine, table_engine, _table, object_store, _dir) =
test_util::setup_mock_engine_and_table().await;
let TestEngineComponents {
table_engine,
storage_engine,
object_store,
dir: _dir,
..
} = test_util::setup_test_engine_and_table().await;
let ctx = EngineContext::default();
// register another table
@@ -1143,13 +1201,12 @@ mod tests {
assert_eq!(table.table_info().name, new_table_name);
let table_engine = MitoEngine::new(EngineConfig::default(), engine, object_store);
let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store);
let open_req = OpenTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: new_table_name.to_string(),
table_id: 1,
region_numbers: vec![0],
};
// test reopen table
@@ -1160,7 +1217,7 @@ mod tests {
.unwrap();
let reopened = reopened
.as_any()
.downcast_ref::<MitoTable<MockRegion>>()
.downcast_ref::<MitoTable<RegionImpl<NoopLogStore>>>()
.unwrap();
assert_eq!(reopened.table_info(), table.table_info());
assert_eq!(reopened.table_info().name, new_table_name);
@@ -1234,7 +1291,11 @@ mod tests {
#[tokio::test]
async fn test_table_delete_rows() {
let (_engine, table, _schema, _dir) = test_util::setup_test_engine_and_table().await;
let TestEngineComponents {
table_ref: table,
dir: _dir,
..
} = test_util::setup_test_engine_and_table().await;
let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
let hosts: VectorRef =

View File

@@ -16,6 +16,7 @@ use std::any::Any;
use common_error::ext::BoxedError;
use common_error::prelude::*;
use store_api::storage::RegionNumber;
use table::metadata::{TableInfoBuilderError, TableMetaBuilderError};
#[derive(Debug, Snafu)]
@@ -154,7 +155,7 @@ pub enum Error {
},
#[snafu(display(
"Projected columnd not found in region, column: {}",
"Projected column not found in region, column: {}",
column_qualified_name
))]
ProjectedColumnNotFound {
@@ -170,6 +171,19 @@ pub enum Error {
#[snafu(backtrace)]
source: table::metadata::ConvertError,
},
#[snafu(display("Cannot find region, table: {}, region: {}", table, region))]
RegionNotFound {
table: String,
region: RegionNumber,
backtrace: Backtrace,
},
#[snafu(display("Invalid region name: {}", region_name))]
InvalidRegionName {
region_name: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -198,6 +212,8 @@ impl ErrorExt for Error {
TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected,
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,
RegionNotFound { .. } => StatusCode::Internal,
InvalidRegionName { .. } => StatusCode::Internal,
}
}

View File

@@ -16,6 +16,7 @@
pub mod test_util;
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
@@ -27,17 +28,18 @@ use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream};
use common_telemetry::logging;
use datatypes::schema::Schema;
use futures::task::{Context, Poll};
use futures::Stream;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, ChunkReader, ReadContext, Region, RegionMeta,
ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
};
use table::error as table_error;
use table::error::Result as TableResult;
use table::error::{RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu};
use table::metadata::{
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
};
@@ -48,8 +50,9 @@ use table::table::scan::SimpleTableScan;
use table::table::{AlterContext, Table};
use tokio::sync::Mutex;
use crate::error;
use crate::error::{
self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, TableInfoNotFoundSnafu,
ProjectedColumnNotFoundSnafu, RegionNotFoundSnafu, Result, ScanTableManifestSnafu,
UpdateTableManifestSnafu,
};
use crate::manifest::action::*;
@@ -65,8 +68,7 @@ pub struct MitoTable<R: Region> {
manifest: TableManifest,
// guarded by `self.alter_lock`
table_info: ArcSwap<TableInfo>,
// TODO(dennis): a table contains multi regions
region: R,
regions: HashMap<RegionNumber, R>,
alter_lock: Mutex<()>,
}
@@ -85,15 +87,29 @@ impl<R: Region> Table for MitoTable<R> {
return Ok(0);
}
let mut write_request = self.region.write_request();
let region = self
.regions
.get(&request.region_number)
.with_context(|| RegionNotFoundSnafu {
table: common_catalog::format_full_table_name(
&request.catalog_name,
&request.schema_name,
&request.table_name,
),
region: request.region_number,
})
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let mut write_request = region.write_request();
let columns_values = request.columns_values;
// columns_values is not empty, it's safe to unwrap
let rows_num = columns_values.values().next().unwrap().len();
logging::trace!(
"Insert into table {} with data: {:?}",
"Insert into table {} region {} with data: {:?}",
self.table_info().name,
region.id(),
columns_values
);
@@ -102,8 +118,7 @@ impl<R: Region> Table for MitoTable<R> {
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let _resp = self
.region
let _resp = region
.write(&WriteContext::default(), write_request)
.await
.map_err(BoxedError::new)
@@ -127,35 +142,64 @@ impl<R: Region> Table for MitoTable<R> {
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
let read_ctx = ReadContext::default();
let snapshot = self
.region
.snapshot(&read_ctx)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let mut readers = Vec::with_capacity(self.regions.len());
let mut first_schema: Option<Arc<Schema>> = None;
let projection = self
.transform_projection(&self.region, projection.cloned())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let filters = filters.into();
let scan_request = ScanRequest {
projection,
filters,
..Default::default()
};
let mut reader = snapshot
.scan(&read_ctx, scan_request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?
.reader;
let table_info = self.table_info.load();
// TODO(hl): Currently the API between frontend and datanode is under refactoring in
// https://github.com/GreptimeTeam/greptimedb/issues/597 . Once it's finished, query plan
// can carry filtered region info to avoid scanning all regions on datanode.
for region in self.regions.values() {
let snapshot = region
.snapshot(&read_ctx)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let projection = self
.transform_projection(region, projection.cloned())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let filters = filters.into();
let scan_request = ScanRequest {
projection,
filters,
..Default::default()
};
let reader = snapshot
.scan(&read_ctx, scan_request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?
.reader;
let schema = reader.schema().clone();
let stream_schema = schema.clone();
let schema = reader.schema().clone();
if let Some(first_schema) = &first_schema {
// TODO(hl): we assume all regions' schemas are the same, but undergoing table altering
// may make these schemas inconsistent.
ensure!(
first_schema.version() == schema.version(),
RegionSchemaMismatchSnafu {
table: common_catalog::format_full_table_name(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name
)
}
);
} else {
first_schema = Some(schema);
}
readers.push(reader);
}
// TODO(hl): we assume table contains at least one region, but with region migration this
// assumption may become invalid.
let stream_schema = first_schema.unwrap();
let schema = stream_schema.clone();
let stream = Box::pin(async_stream::try_stream! {
while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? {
yield RecordBatch::new(stream_schema.clone(), chunk.columns)?
for mut reader in readers {
while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? {
yield RecordBatch::new(stream_schema.clone(), chunk.columns)?
}
}
});
@@ -218,24 +262,26 @@ impl<R: Region> Table for MitoTable<R> {
{
// TODO(yingwen): Error handling. Maybe the region need to provide a method to
// validate the request first.
let region = self.region();
let region_meta = region.in_memory_metadata();
let alter_req = AlterRequest {
operation: alter_op,
version: region_meta.version(),
};
// Alter the region.
logging::debug!(
"start altering region {} of table {}, with request {:?}",
region.name(),
table_name,
alter_req,
);
region
.alter(alter_req)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let regions = self.regions();
for region in regions.values() {
let region_meta = region.in_memory_metadata();
let alter_req = AlterRequest {
operation: alter_op.clone(),
version: region_meta.version(),
};
// Alter the region.
logging::debug!(
"start altering region {} of table {}, with request {:?}",
region.name(),
table_name,
alter_req,
);
region
.alter(alter_req)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
}
}
// Update in memory metadata of the table.
self.set_table_info(new_info);
@@ -247,30 +293,33 @@ impl<R: Region> Table for MitoTable<R> {
if request.key_column_values.is_empty() {
return Ok(0);
}
let mut rows_deleted = 0;
// TODO(hl): Should be tracked by procedure.
// TODO(hl): Parse delete request into region->keys instead of delete in each region
for region in self.regions.values() {
let mut write_request = region.write_request();
let key_column_values = request.key_column_values.clone();
// Safety: key_column_values isn't empty.
let rows_num = key_column_values.values().next().unwrap().len();
let mut write_request = self.region.write_request();
logging::trace!(
"Delete from table {} where key_columns are: {:?}",
self.table_info().name,
key_column_values
);
let key_column_values = request.key_column_values;
// Safety: key_column_values isn't empty.
let rows_num = key_column_values.values().next().unwrap().len();
logging::trace!(
"Delete from table {} where key_columns are: {:?}",
self.table_info().name,
key_column_values
);
write_request
.delete(key_column_values)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
self.region
.write(&WriteContext::default(), write_request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(rows_num)
write_request
.delete(key_column_values)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
region
.write(&WriteContext::default(), write_request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
rows_deleted += rows_num;
}
Ok(rows_deleted)
}
}
@@ -299,10 +348,14 @@ fn column_qualified_name(table_name: &str, region_name: &str, column_name: &str)
}
impl<R: Region> MitoTable<R> {
fn new(table_info: TableInfo, region: R, manifest: TableManifest) -> Self {
pub(crate) fn new(
table_info: TableInfo,
regions: HashMap<RegionNumber, R>,
manifest: TableManifest,
) -> Self {
Self {
table_info: ArcSwap::new(Arc::new(table_info)),
region,
regions,
manifest,
alter_lock: Mutex::new(()),
}
@@ -368,7 +421,7 @@ impl<R: Region> MitoTable<R> {
table_name: &str,
table_dir: &str,
table_info: TableInfo,
region: R,
regions: HashMap<RegionNumber, R>,
object_store: ObjectStore,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store);
@@ -383,25 +436,14 @@ impl<R: Region> MitoTable<R> {
.await
.context(UpdateTableManifestSnafu { table_name })?;
Ok(MitoTable::new(table_info, region, manifest))
Ok(MitoTable::new(table_info, regions, manifest))
}
pub async fn open(
table_name: &str,
table_dir: &str,
region: R,
object_store: ObjectStore,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store);
let mut table_info = Self::recover_table_info(table_name, &manifest)
.await?
.context(TableInfoNotFoundSnafu { table_name })?;
table_info.meta.region_numbers = vec![(region.id() & 0xFFFFFFFF) as u32];
Ok(MitoTable::new(table_info, region, manifest))
pub(crate) fn build_manifest(table_dir: &str, object_store: ObjectStore) -> TableManifest {
TableManifest::new(&table_manifest_dir(table_dir), object_store)
}
async fn recover_table_info(
pub(crate) async fn recover_table_info(
table_name: &str,
manifest: &TableManifest,
) -> Result<Option<TableInfo>> {
@@ -449,8 +491,8 @@ impl<R: Region> MitoTable<R> {
}
#[inline]
pub fn region(&self) -> &R {
&self.region
pub fn regions(&self) -> &HashMap<RegionNumber, R> {
&self.regions
}
pub fn set_table_info(&self, table_info: TableInfo) {

View File

@@ -47,6 +47,7 @@ pub fn new_insert_request(
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
columns_values,
region_number: 0,
}
}
@@ -115,22 +116,27 @@ fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
}
}
pub async fn setup_test_engine_and_table() -> (
MitoEngine<EngineImpl<NoopLogStore>>,
TableRef,
SchemaRef,
TempDir,
) {
pub struct TestEngineComponents {
pub table_engine: MitoEngine<EngineImpl<NoopLogStore>>,
pub storage_engine: EngineImpl<NoopLogStore>,
pub table_ref: TableRef,
pub schema_ref: SchemaRef,
pub object_store: ObjectStore,
pub dir: TempDir,
}
pub async fn setup_test_engine_and_table() -> TestEngineComponents {
let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await;
let storage_engine = EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
);
let table_engine = MitoEngine::new(
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
),
object_store,
storage_engine.clone(),
object_store.clone(),
);
let schema = Arc::new(schema_for_test());
@@ -142,7 +148,14 @@ pub async fn setup_test_engine_and_table() -> (
.await
.unwrap();
(table_engine, table, schema, dir)
TestEngineComponents {
table_engine,
storage_engine,
table_ref: table,
schema_ref: schema,
object_store,
dir,
}
}
pub async fn setup_mock_engine_and_table(

View File

@@ -7,16 +7,17 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion.workspace = true
datatypes = { path = "../datatypes" }
meta-client = { path = "../meta-client" }
moka = { version = "0.9", features = ["future"] }
snafu.workspace = true
store-api = { path = "../store-api" }
serde.workspace = true
serde_json = "1.0"
snafu.workspace = true
store-api = { path = "../store-api" }
table = { path = "../table" }

View File

@@ -23,7 +23,7 @@ use store_api::storage::RegionId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to get cache, error: {}", err_msg))]
#[snafu(display("Failed to get meta info from cache, error: {}", err_msg))]
GetCache {
err_msg: String,
backtrace: Backtrace,

View File

@@ -155,18 +155,19 @@ fn split_insert_request(
let table_name = &insert.table_name;
dist_insert
.into_iter()
.map(|(region_id, vector_map)| {
.map(|(region_number, vector_map)| {
let columns_values = vector_map
.into_iter()
.map(|(column_name, mut builder)| (column_name.to_string(), builder.to_vector()))
.collect();
(
region_id,
region_number,
InsertRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
columns_values,
region_number,
},
)
})
@@ -396,10 +397,11 @@ mod tests {
columns_values.insert("id".to_string(), builder.to_vector());
InsertRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(),
schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
region_number: 0,
}
}
@@ -423,10 +425,11 @@ mod tests {
columns_values.insert("id".to_string(), builder.to_vector());
InsertRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(),
schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
columns_values,
region_number: 0,
}
}

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID};
use common_catalog::format_full_table_name;
use common_query::Output;
use common_recordbatch::util as record_util;
use common_telemetry::logging;
@@ -77,7 +78,7 @@ impl ScriptsTable {
Ok(Self {
catalog_manager,
query_engine,
name: catalog::format_full_table_name(
name: format_full_table_name(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
SCRIPTS_TABLE_NAME,
@@ -131,6 +132,7 @@ impl ScriptsTable {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: SCRIPTS_TABLE_NAME.to_string(),
columns_values,
region_number: 0,
})
.await
.context(InsertScriptSnafu { name })?;

View File

@@ -153,6 +153,7 @@ impl LineWriter {
schema_name: self.db,
table_name: self.table_name,
columns_values,
region_number: 0, // TODO(hl): Check if assign 0 region is ok?
}
}
}

View File

@@ -55,7 +55,7 @@ pub struct ScanRequest {
pub struct GetRequest {}
/// Operation to add a column.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct AddColumn {
/// Descriptor of the column to add.
pub desc: ColumnDescriptor,
@@ -64,7 +64,7 @@ pub struct AddColumn {
}
/// Operation to alter a region.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum AlterOperation {
/// Add columns to the region.
AddColumns {

View File

@@ -99,6 +99,8 @@ pub enum Error {
column_name: String,
backtrace: Backtrace,
},
#[snafu(display("Regions schemas mismatch in table: {}", table))]
RegionSchemaMismatch { table: String, backtrace: Backtrace },
#[snafu(display("Failed to operate table, source: {}", source))]
TableOperation { source: BoxedError },
@@ -122,6 +124,7 @@ impl ErrorExt for Error {
Error::SchemaBuild { source, .. } => source.status_code(),
Error::TableOperation { source } => source.status_code(),
Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
Error::RegionSchemaMismatch { .. } => StatusCode::StorageUnavailable,
Error::Unsupported { .. } => StatusCode::Unsupported,
}
}

View File

@@ -416,7 +416,7 @@ impl TryFrom<RawTableMeta> for TableMeta {
primary_key_indices: raw.primary_key_indices,
value_indices: raw.value_indices,
engine: raw.engine,
region_numbers: vec![],
region_numbers: raw.region_numbers,
next_column_id: raw.next_column_id,
engine_options: raw.engine_options,
options: raw.options,

View File

@@ -28,6 +28,7 @@ pub struct InsertRequest {
pub schema_name: String,
pub table_name: String,
pub columns_values: HashMap<String, VectorRef>,
pub region_number: RegionNumber,
}
#[derive(Debug, Clone)]
@@ -58,7 +59,6 @@ pub struct OpenTableRequest {
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
pub region_numbers: Vec<RegionNumber>,
}
/// Alter table request