refactor: trying to replace TableGlobalValue, part 1 (#1956)

* refactor: trying to replace TableGlobalValue, part 1

* fix: resolve PR comments
LFC
2023-07-17 11:32:46 +08:00
committed by GitHub
parent 8f71ac2172
commit 7cf6c2bd5c
46 changed files with 810 additions and 1373 deletions

Cargo.lock (generated)

@@ -4135,7 +4135,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9b26cca2132df83916692cc6fc092a26a1a76f0b#9b26cca2132df83916692cc6fc092a26a1a76f0b"
source = "git+https://github.com/MichaelScofield/greptime-proto.git?rev=a98b81b660080e13f01b9cedc9778de9aa1c19b9#a98b81b660080e13f01b9cedc9778de9aa1c19b9"
dependencies = [
"prost",
"serde",


@@ -74,7 +74,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b26cca2132df83916692cc6fc092a26a1a76f0b" }
greptime-proto = { git = "https://github.com/MichaelScofield/greptime-proto.git", rev = "a98b81b660080e13f01b9cedc9778de9aa1c19b9" }
itertools = "0.10"
lazy_static = "1.4"
parquet = "40.0"


@@ -71,6 +71,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
ColumnDataType::TimestampNanosecond => {
ConcreteDataType::timestamp_nanosecond_datatype()
}
_ => unimplemented!("Implemented in #1961"),
}
}
}
@@ -189,6 +190,7 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
ts_nanosecond_values: Vec::with_capacity(capacity),
..Default::default()
},
_ => unimplemented!("Implemented in #1961"),
}
}


@@ -32,7 +32,6 @@ use table::TableRef;
use crate::error::{CreateTableSnafu, Result};
pub mod error;
pub mod helper;
pub mod information_schema;
pub mod local;
mod metrics;


@@ -17,6 +17,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::consts::MITO_ENGINE;
use common_meta::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
@@ -36,10 +40,6 @@ use crate::error::{
TableEngineNotFoundSnafu, TableExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use crate::local::MemoryCatalogManager;
use crate::remote::region_alive_keeper::RegionAliveKeepers;
use crate::{


@@ -22,12 +22,12 @@ mod tests {
use std::time::Duration;
use catalog::error::Error;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::mock::MockTableEngine;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;


@@ -162,6 +162,8 @@ impl Database {
schema: self.schema.clone(),
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
trace_id: None,
span_id: None,
}),
request: Some(request),
}


@@ -194,6 +194,7 @@ fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Ve
Timestamp::new_nanosecond(*v)
))
}
_ => unimplemented!("Implemented in #1961"),
}
}


@@ -92,6 +92,12 @@ pub enum Error {
err_msg: String,
location: Location,
},
#[snafu(display("Invalid catalog value, source: {}", source))]
InvalidCatalogValue {
source: common_catalog::error::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -118,6 +124,8 @@ impl ErrorExt for Error {
}
MetaSrv { source, .. } => source.status_code(),
InvalidCatalogValue { source, .. } => source.status_code(),
}
}


@@ -13,13 +13,16 @@
// limitations under the License.
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableId};
use super::TABLE_INFO_KEY_PREFIX;
use crate::error::Result;
use crate::error::{InvalidCatalogValueSnafu, Result};
use crate::helper::{TableGlobalKey, TableGlobalValue};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest};
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, PutRequest};
use crate::table_name::TableName;
pub struct TableInfoKey {
table_id: TableId,
@@ -52,6 +55,60 @@ impl TableInfoManager {
Self { kv_backend }
}
// TODO(LFC): Remove this method when table metadata refactor is done.
pub async fn get_old(&self, table_name: &TableName) -> Result<Option<TableInfoValue>> {
let table_global_key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
self.kv_backend
.get(table_global_key.to_string().as_bytes())
.await?
.map(|kv| TableGlobalValue::from_bytes(kv.value))
.transpose()
.map(|v| {
v.map(|v| TableInfoValue {
table_info: v.table_info,
version: 0,
})
})
.context(InvalidCatalogValueSnafu)
}
// TODO(LFC): Remove this method when table metadata refactor is done.
pub async fn put_old(&self, table_info: RawTableInfo) -> Result<()> {
let key = TableGlobalKey {
catalog_name: table_info.catalog_name.clone(),
schema_name: table_info.schema_name.clone(),
table_name: table_info.name.clone(),
}
.to_string();
let raw_key = key.as_bytes();
let regions_id_map = self
.kv_backend
.get(raw_key)
.await?
.map(|kv| TableGlobalValue::from_bytes(kv.value()))
.transpose()
.context(InvalidCatalogValueSnafu)?
.map(|v| v.regions_id_map)
.unwrap_or_default();
let raw_value = TableGlobalValue {
node_id: 0,
regions_id_map,
table_info,
}
.as_bytes()
.context(InvalidCatalogValueSnafu)?;
let req = PutRequest::new().with_key(raw_key).with_value(raw_value);
self.kv_backend.put(req).await?;
Ok(())
}
pub async fn get(&self, table_id: TableId) -> Result<Option<TableInfoValue>> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
@@ -66,15 +123,16 @@ impl TableInfoManager {
/// with the key is the same as `expect`, the value will be updated to `val`.
///
/// - If the compare-and-set operation successfully updated the value, this method will return `Ok(Ok(()))`
/// - If associated value is not the same as `expect`, no value will be updated and an `Ok(Err(Vec<u8>))`
/// will be returned, the `Err(Vec<u8>)` indicates the current associated value of key.
/// - If the associated value is not the same as `expect`, no value will be updated and an
/// `Ok(Err(Option<TableInfoValue>))` will be returned. The `Option<TableInfoValue>` holds
/// the value currently associated with the key.
/// - If any error happens during the operation, an `Err(Error)` will be returned.
pub async fn compare_and_put(
&self,
table_id: TableId,
expect: Option<TableInfoValue>,
table_info: RawTableInfo,
) -> Result<std::result::Result<(), Option<Vec<u8>>>> {
) -> Result<std::result::Result<(), Option<TableInfoValue>>> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
@@ -98,7 +156,10 @@ impl TableInfoManager {
Ok(if resp.success {
Ok(())
} else {
Err(resp.prev_kv.map(|x| x.value))
Err(resp
.prev_kv
.map(|x| TableInfoValue::try_from_raw_value(x.value))
.transpose()?)
})
}
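For context, a minimal sketch (not part of this commit) of how a caller might drive the compare-and-put above as a retry loop. `TableInfoManager`, `TableId`, and `RawTableInfo` are the types used in this file; `RawTableInfo: Clone` is assumed and error handling is simplified.
async fn cas_update_table_info(
    manager: &TableInfoManager,
    table_id: TableId,
    new_info: RawTableInfo,
) -> Result<()> {
    // Seed the expectation with whatever is currently stored (None if absent).
    let mut expect = manager.get(table_id).await?;
    loop {
        match manager
            .compare_and_put(table_id, expect, new_info.clone())
            .await?
        {
            // The expectation matched: the new value is now stored.
            Ok(()) => return Ok(()),
            // Lost a race: retry against the value the backend actually holds.
            Err(current) => expect = current,
        }
    }
}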
@@ -152,7 +213,21 @@ mod tests {
);
assert!(manager.get(4).await.unwrap().is_none());
// test cas failed, current value is not set
let table_info = new_table_info(4);
let result = manager
.compare_and_put(
4,
Some(TableInfoValue {
table_info: table_info.clone(),
version: 0,
}),
table_info.clone(),
)
.await
.unwrap();
assert!(result.unwrap_err().is_none());
let result = manager
.compare_and_put(4, None, table_info.clone())
.await
@@ -165,7 +240,7 @@ mod tests {
.compare_and_put(4, None, new_table_info.clone())
.await
.unwrap();
let actual = TableInfoValue::try_from_raw_value(result.unwrap_err().unwrap()).unwrap();
let actual = result.unwrap_err().unwrap();
assert_eq!(
actual,
TableInfoValue {


@@ -15,18 +15,19 @@
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use super::{TABLE_NAME_KEY_PATTERN, TABLE_NAME_KEY_PREFIX};
use crate::error::{InvalidTableMetadataSnafu, Result};
use crate::error::{Error, InvalidCatalogValueSnafu, InvalidTableMetadataSnafu, Result};
use crate::helper::{build_table_global_prefix, TableGlobalKey, TableGlobalValue};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest};
use crate::table_name::TableName;
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct TableNameKey<'a> {
pub catalog: &'a str,
pub schema: &'a str,
@@ -88,6 +89,34 @@ impl<'a> From<&'a TableName> for TableNameKey<'a> {
}
}
impl From<TableNameKey<'_>> for TableName {
fn from(value: TableNameKey<'_>) -> Self {
Self {
catalog_name: value.catalog.to_string(),
schema_name: value.schema.to_string(),
table_name: value.table.to_string(),
}
}
}
impl<'a> TryFrom<&'a str> for TableNameKey<'a> {
type Error = Error;
fn try_from(s: &'a str) -> Result<Self> {
let captures = TABLE_NAME_KEY_PATTERN
.captures(s)
.context(InvalidTableMetadataSnafu {
err_msg: format!("Illegal TableNameKey format: '{s}'"),
})?;
// Safety: these capture groups are guaranteed by the successful regex match above
Ok(Self {
catalog: captures.get(1).unwrap().as_str(),
schema: captures.get(2).unwrap().as_str(),
table: captures.get(3).unwrap().as_str(),
})
}
}
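A small round-trip sketch of what the new `TryFrom` enables (assumptions: `TableMetaKey` is in scope for `as_raw_key`, and rendered keys are valid UTF-8):
// Render a key, then parse it back with the TryFrom impl above.
let key = TableNameKey::new("my_catalog", "my_schema", "my_table");
let rendered = String::from_utf8(key.as_raw_key()).expect("keys are ASCII");
let parsed = TableNameKey::try_from(rendered.as_str()).expect("matches TABLE_NAME_KEY_PATTERN");
assert_eq!("my_table", parsed.table);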
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct TableNameValue {
table_id: TableId,
@@ -118,7 +147,12 @@ impl TableNameManager {
Self { kv_backend }
}
pub async fn create(&self, key: &TableNameKey<'_>, table_id: TableId) -> Result<bool> {
/// Creates a new table name entry. Returns the current [TableNameValue] if the entry already existed.
pub async fn create(
&self,
key: &TableNameKey<'_>,
table_id: TableId,
) -> Result<Option<TableNameValue>> {
let raw_key = key.as_raw_key();
let value = TableNameValue::new(table_id);
let raw_value = value.try_as_raw_value()?;
@@ -126,7 +160,46 @@ impl TableNameManager {
.with_key(raw_key)
.with_value(raw_value);
let result = self.kv_backend.compare_and_put(req).await?;
Ok(result.success)
Ok(if result.success {
None
} else {
result
.prev_kv
.map(|x| TableNameValue::try_from_raw_value(x.value))
.transpose()?
})
}
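A hedged sketch of how a caller could use the new return value, e.g. to make creation idempotent (`manager`, `key`, and `table_id` are assumed in scope; the conflict handling is illustrative only):
match manager.create(&key, table_id).await? {
    // The name was free; a fresh entry was created.
    None => {}
    // The exact mapping already existed; safe to treat as idempotent success.
    Some(existing) if existing.table_id() == table_id => {}
    // The name is taken by a different table; surface the conflict to the caller.
    Some(existing) => panic!("'{}' already maps to table {}", key.table, existing.table_id()),
}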
// TODO(LFC): Remove this method when table metadata refactor is done.
pub async fn get_old(&self, key: &TableNameKey<'_>) -> Result<Option<TableNameValue>> {
let table_global_key = TableGlobalKey {
catalog_name: key.catalog.to_string(),
schema_name: key.schema.to_string(),
table_name: key.table.to_string(),
};
self.kv_backend
.get(table_global_key.to_string().as_bytes())
.await?
.map(|kv| TableGlobalValue::from_bytes(kv.value()))
.transpose()
.map(|v| v.map(|v| TableNameValue::new(v.table_id())))
.context(InvalidCatalogValueSnafu)
}
// TODO(LFC): Remove this method when table metadata refactor is done.
pub async fn tables_old(&self, catalog: &str, schema: &str) -> Result<Vec<String>> {
let key = build_table_global_prefix(catalog, schema);
let req = RangeRequest::new().with_prefix(key.as_bytes());
let resp = self.kv_backend.range(req).await?;
let mut table_names = Vec::with_capacity(resp.kvs.len());
for kv in resp.kvs {
let key = TableGlobalKey::parse(String::from_utf8_lossy(kv.key()))
.context(InvalidCatalogValueSnafu)?;
table_names.push(key.table_name);
}
Ok(table_names)
}
pub async fn get(&self, key: &TableNameKey<'_>) -> Result<Option<TableNameValue>> {
@@ -175,12 +248,14 @@ mod tests {
for i in 1..=3 {
let table_name = format!("table_{}", i);
let key = TableNameKey::new("my_catalog", "my_schema", &table_name);
assert!(manager.create(&key, i).await.unwrap());
assert!(manager.create(&key, i).await.unwrap().is_none());
}
let key = TableNameKey::new("my_catalog", "my_schema", "my_table");
assert!(manager.create(&key, 99).await.unwrap());
assert!(!manager.create(&key, 99).await.unwrap());
assert!(manager.create(&key, 99).await.unwrap().is_none());
let curr = manager.create(&key, 9).await.unwrap();
assert_eq!(Some(TableNameValue::new(99)), curr);
let value = manager.get(&key).await.unwrap().unwrap();
assert_eq!(value.table_id(), 99);


@@ -15,14 +15,17 @@
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::metadata::TableId;
use super::TABLE_REGION_KEY_PREFIX;
use crate::error::Result;
use crate::error::{InvalidCatalogValueSnafu, InvalidTableMetadataSnafu, Result};
use crate::helper::{TableGlobalKey, TableGlobalValue};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest};
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, PutRequest};
use crate::table_name::TableName;
use crate::DatanodeId;
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
@@ -68,6 +71,69 @@ impl TableRegionManager {
.transpose()
}
// TODO(LFC): Remove this method when table metadata refactor is done.
pub async fn get_old(&self, table_name: &TableName) -> Result<Option<TableRegionValue>> {
let key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
}
.to_string();
let raw_key = key.as_bytes();
self.kv_backend
.get(raw_key)
.await?
.map(|kv| TableGlobalValue::from_bytes(kv.value()))
.transpose()
.map(|v| {
v.map(|v| TableRegionValue {
region_distribution: v.regions_id_map.into_iter().collect(),
version: 0,
})
})
.context(InvalidCatalogValueSnafu)
}
// TODO(LFC): Remove this method when table metadata refactor is done.
pub async fn put_old(
&self,
table_name: &TableName,
region_distribution: RegionDistribution,
) -> Result<()> {
let key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
}
.to_string();
let raw_key = key.as_bytes();
let table_info = self
.kv_backend
.get(raw_key)
.await?
.map(|kv| TableGlobalValue::from_bytes(kv.value()))
.transpose()
.context(InvalidCatalogValueSnafu)?
.map(|v| v.table_info)
.with_context(|| InvalidTableMetadataSnafu {
err_msg: format!("table global value for {table_name} is empty"),
})?;
let raw_value = TableGlobalValue {
node_id: 0,
regions_id_map: region_distribution.into_iter().collect(),
table_info,
}
.as_bytes()
.context(InvalidCatalogValueSnafu)?;
let req = PutRequest::new().with_key(raw_key).with_value(raw_value);
self.kv_backend.put(req).await?;
Ok(())
}
/// Compares and puts the value of a key. `expect` is the expected value; if the backend's current
/// value associated with the key is the same as `expect`, the value will be updated to `val`.
///


@@ -16,6 +16,7 @@
pub mod error;
pub mod heartbeat;
pub mod helper;
pub mod ident;
pub mod instruction;
pub mod key;


@@ -15,61 +15,24 @@
use std::collections::{HashMap, HashSet};
use api::v1::meta::{
CreateRequest as PbCreateRequest, DeleteRequest as PbDeleteRequest, Partition as PbPartition,
Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute,
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute,
RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable,
TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue,
TableId as PbTableId, TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue,
};
use serde::{Deserialize, Serialize, Serializer};
use snafu::{OptionExt, ResultExt};
use snafu::OptionExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::RawTableInfo;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::peer::Peer;
use crate::rpc::util;
use crate::table_name::TableName;
#[derive(Debug, Clone)]
pub struct CreateRequest<'a> {
pub table_name: TableName,
pub partitions: Vec<Partition>,
pub table_info: &'a RawTableInfo,
}
impl TryFrom<CreateRequest<'_>> for PbCreateRequest {
type Error = error::Error;
fn try_from(mut req: CreateRequest) -> Result<Self> {
Ok(Self {
header: None,
table_name: Some(req.table_name.into()),
partitions: req.partitions.drain(..).map(Into::into).collect(),
table_info: serde_json::to_vec(&req.table_info).context(error::SerdeJsonSnafu)?,
})
}
}
impl<'a> CreateRequest<'a> {
#[inline]
pub fn new(table_name: TableName, table_info: &'a RawTableInfo) -> Self {
Self {
table_name,
partitions: vec![],
table_info,
}
}
#[inline]
pub fn add_partition(mut self, partition: Partition) -> Self {
self.partitions.push(partition);
self
}
}
#[derive(Debug, Clone, Default)]
pub struct RouteRequest {
pub table_names: Vec<TableName>,
pub table_ids: Vec<TableId>,
}
impl From<RouteRequest> for PbRouteRequest {
@@ -77,15 +40,17 @@ impl From<RouteRequest> for PbRouteRequest {
Self {
header: None,
table_names: req.table_names.drain(..).map(Into::into).collect(),
table_ids: req.table_ids.drain(..).map(|id| PbTableId { id }).collect(),
}
}
}
impl RouteRequest {
#[inline]
pub fn new() -> Self {
pub fn new(table_id: TableId) -> Self {
Self {
table_names: vec![],
table_ids: vec![table_id],
}
}
@@ -96,27 +61,6 @@ impl RouteRequest {
}
}
#[derive(Debug, Clone)]
pub struct DeleteRequest {
pub table_name: TableName,
}
impl From<DeleteRequest> for PbDeleteRequest {
fn from(req: DeleteRequest) -> Self {
Self {
header: None,
table_name: Some(req.table_name.into()),
}
}
}
impl DeleteRequest {
#[inline]
pub fn new(table_name: TableName) -> Self {
Self { table_name }
}
}
#[derive(Debug, Clone)]
pub struct RouteResponse {
pub table_routes: Vec<TableRoute>,
@@ -423,97 +367,13 @@ impl From<PbPartition> for Partition {
#[cfg(test)]
mod tests {
use api::v1::meta::{
DeleteRequest as PbDeleteRequest, Partition as PbPartition, Peer as PbPeer,
Region as PbRegion, RegionRoute as PbRegionRoute, RouteRequest as PbRouteRequest,
RouteResponse as PbRouteResponse, Table as PbTable, TableName as PbTableName,
TableRoute as PbTableRoute,
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute,
RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable,
TableName as PbTableName, TableRoute as PbTableRoute,
};
use chrono::DateTime;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use super::*;
fn new_table_info() -> RawTableInfo {
RawTableInfo {
ident: TableIdent {
table_id: 0,
version: 0,
},
name: "t1".to_string(),
desc: None,
catalog_name: "c1".to_string(),
schema_name: "s1".to_string(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: vec![
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new("c1", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("c2", ConcreteDataType::string_datatype(), true),
],
timestamp_index: Some(0),
version: 0,
},
primary_key_indices: vec![],
value_indices: vec![],
engine: "mito".to_string(),
next_column_id: 0,
region_numbers: vec![],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
},
table_type: TableType::Base,
}
}
#[test]
fn test_create_request_trans() {
let req = CreateRequest {
table_name: TableName::new("c1", "s1", "t1"),
partitions: vec![
Partition {
column_list: vec![b"c1".to_vec(), b"c2".to_vec()],
value_list: vec![b"v1".to_vec(), b"v2".to_vec()],
},
Partition {
column_list: vec![b"c1".to_vec(), b"c2".to_vec()],
value_list: vec![b"v11".to_vec(), b"v22".to_vec()],
},
],
table_info: &new_table_info(),
};
let into_req: PbCreateRequest = req.try_into().unwrap();
assert!(into_req.header.is_none());
let table_name = into_req.table_name;
assert_eq!("c1", table_name.as_ref().unwrap().catalog_name);
assert_eq!("s1", table_name.as_ref().unwrap().schema_name);
assert_eq!("t1", table_name.as_ref().unwrap().table_name);
assert_eq!(
vec![b"c1".to_vec(), b"c2".to_vec()],
into_req.partitions.get(0).unwrap().column_list
);
assert_eq!(
vec![b"v1".to_vec(), b"v2".to_vec()],
into_req.partitions.get(0).unwrap().value_list
);
assert_eq!(
vec![b"c1".to_vec(), b"c2".to_vec()],
into_req.partitions.get(1).unwrap().column_list
);
assert_eq!(
vec![b"v11".to_vec(), b"v22".to_vec()],
into_req.partitions.get(1).unwrap().value_list
);
}
#[test]
fn test_route_request_trans() {
let req = RouteRequest {
@@ -521,6 +381,7 @@ mod tests {
TableName::new("c1", "s1", "t1"),
TableName::new("c2", "s2", "t2"),
],
table_ids: vec![1, 2, 3],
};
let into_req: PbRouteRequest = req.into();
@@ -532,20 +393,10 @@ mod tests {
assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name);
assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name);
assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name);
}
#[test]
fn test_delete_request_trans() {
let req = DeleteRequest {
table_name: TableName::new("c1", "s1", "t1"),
};
let into_req: PbDeleteRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!("c1", into_req.table_name.as_ref().unwrap().catalog_name);
assert_eq!("s1", into_req.table_name.as_ref().unwrap().schema_name);
assert_eq!("t1", into_req.table_name.as_ref().unwrap().table_name);
assert_eq!(
(1..=3).map(|id| PbTableId { id }).collect::<Vec<_>>(),
into_req.table_ids
);
}
#[test]


@@ -21,10 +21,6 @@ use catalog::error::{
self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu,
Result as CatalogResult, TableMetadataManagerSnafu, UnimplementedSnafu,
};
use catalog::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue,
};
use catalog::information_schema::InformationSchemaProvider;
use catalog::remote::KvCacheInvalidatorRef;
use catalog::{
@@ -34,14 +30,21 @@ use catalog::{
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::helper::{
build_catalog_prefix, build_schema_prefix, CatalogKey, SchemaKey, TableGlobalKey,
};
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_region::TableRegionKey;
use common_meta::key::{TableMetaKey, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::RangeRequest;
use common_meta::rpc::KeyValue;
use common_meta::table_name::TableName;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use partition::manager::PartitionRuleManagerRef;
use snafu::prelude::*;
use table::metadata::TableId;
use table::table::numbers::NumbersTable;
use table::TableRef;
@@ -110,7 +113,13 @@ impl FrontendCatalogManager {
self.backend_cache_invalidator.invalidate_key(key).await;
}
pub async fn invalidate_table(&self, catalog: &str, schema: &str, table: &str) {
pub async fn invalidate_table(
&self,
catalog: &str,
schema: &str,
table: &str,
table_id: TableId,
) {
let tg_key = TableGlobalKey {
catalog_name: catalog.into(),
schema_name: schema.into(),
@@ -121,6 +130,38 @@ impl FrontendCatalogManager {
let tg_key = tg_key.as_bytes();
self.backend_cache_invalidator.invalidate_key(tg_key).await;
let key = TableNameKey::new(catalog, schema, table);
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);
let key = TableInfoKey::new(table_id);
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);
let key = TableRegionKey::new(table_id);
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);
self.partition_manager
.table_routes()
.invalidate_table_route(&TableName::new(catalog, schema, table))
.await;
}
}
@@ -141,12 +182,7 @@ impl CatalogManager for FrontendCatalogManager {
Ok(true)
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> CatalogResult<()> {
let table_name = TableName::new(request.catalog, request.schema, request.table_name);
self.partition_manager
.table_routes()
.invalidate_table_route(&table_name)
.await;
async fn deregister_table(&self, _request: DeregisterTableRequest) -> CatalogResult<()> {
Ok(())
}
@@ -304,29 +340,17 @@ impl CatalogManager for FrontendCatalogManager {
}
async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult<Vec<String>> {
let mut tables = vec![];
let mut tables = self
.table_metadata_manager
.table_name_manager()
.tables_old(catalog, schema)
.await
.context(TableMetadataManagerSnafu)?;
if catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME {
tables.push("numbers".to_string());
}
let key = build_table_global_prefix(catalog, schema);
let req = RangeRequest::new().with_prefix(key.as_bytes());
let iter = self
.backend
.range(req)
.await
.context(TableMetadataManagerSnafu)?
.kvs
.into_iter();
let result = iter
.map(|KeyValue { key: k, value: _ }| {
let key = TableGlobalKey::parse(String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
Ok(key.table_name)
})
.collect::<CatalogResult<Vec<_>>>()?;
tables.extend(result);
Ok(tables)
}
@@ -360,17 +384,13 @@ impl CatalogManager for FrontendCatalogManager {
}
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
let table_global_key = TableGlobalKey {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
};
Ok(self
.backend()
.get(table_global_key.to_string().as_bytes())
let key = TableNameKey::new(catalog, schema, table);
self.table_metadata_manager
.table_name_manager()
.get_old(&key)
.await
.context(TableMetadataManagerSnafu)?
.is_some())
.context(TableMetadataManagerSnafu)
.map(|x| x.is_some())
}
async fn table(
@@ -400,15 +420,20 @@ impl CatalogManager for FrontendCatalogManager {
return provider.table(table_name);
}
let table_global_key = TableGlobalKey {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table_name.to_string(),
};
let Some(kv) = self.backend().get(table_global_key.to_string().as_bytes()).await.context(TableMetadataManagerSnafu)? else {
return Ok(None);
};
let v = TableGlobalValue::from_bytes(kv.value).context(InvalidCatalogValueSnafu)?;
let key = TableNameKey::new(catalog, schema, table_name);
let Some(table_name_value) = self.table_metadata_manager
.table_name_manager()
.get_old(&key)
.await
.context(TableMetadataManagerSnafu)? else { return Ok(None) };
let _table_id = table_name_value.table_id();
let Some(v) = self.table_metadata_manager
.table_info_manager()
.get_old(&key.into())
.await
.context(TableMetadataManagerSnafu)? else { return Ok(None) };
let table_info = Arc::new(
v.table_info
.try_into()
@@ -418,7 +443,6 @@ impl CatalogManager for FrontendCatalogManager {
TableName::new(catalog, schema, table_name),
table_info,
Arc::new(self.clone()),
self.table_metadata_manager.clone(),
));
Ok(Some(table))
}


@@ -13,14 +13,16 @@
// limitations under the License.
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use catalog::remote::KvCacheInvalidatorRef;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::helper::TableGlobalKey;
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::table_region::TableRegionKey;
use common_meta::key::TableMetaKey;
use common_meta::table_name::TableName;
use common_telemetry::{error, info};
use partition::manager::TableRouteCacheInvalidatorRef;
@@ -48,15 +50,8 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
let mailbox = ctx.mailbox.clone();
let self_ref = self.clone();
let TableIdent {
catalog,
schema,
table,
..
} = table_ident;
let _handle = common_runtime::spawn_bg(async move {
self_ref.invalidate_table(&catalog, &schema, &table).await;
self_ref.invalidate_table_cache(table_ident).await;
if let Err(e) = mailbox
.send((
@@ -87,11 +82,11 @@ impl InvalidateTableCacheHandler {
}
}
async fn invalidate_table(&self, catalog_name: &str, schema_name: &str, table_name: &str) {
async fn invalidate_table_cache(&self, table_ident: TableIdent) {
let tg_key = TableGlobalKey {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
catalog_name: table_ident.catalog.clone(),
schema_name: table_ident.schema.clone(),
table_name: table_ident.table.clone(),
}
.to_string();
info!("invalidate table cache: {}", tg_key);
@@ -99,14 +94,15 @@ impl InvalidateTableCacheHandler {
self.backend_cache_invalidator.invalidate_key(tg_key).await;
let table = &TableName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
};
let key = &TableRegionKey::new(table_ident.table_id).as_raw_key();
self.backend_cache_invalidator.invalidate_key(key).await;
self.table_route_cache_invalidator
.invalidate_table_route(table)
.invalidate_table_route(&TableName::new(
table_ident.catalog,
table_ident.schema,
table_ident.table,
))
.await;
}
}


@@ -17,12 +17,12 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use api::v1::meta::HeartbeatResponse;
use catalog::helper::TableGlobalKey;
use catalog::remote::KvCacheInvalidator;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::helper::TableGlobalKey;
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::table_name::TableName;


@@ -163,7 +163,6 @@ impl Instance {
meta_client.clone(),
Arc::new(catalog_manager.clone()),
datanode_clients.clone(),
table_metadata_manager.clone(),
);
let dist_instance = Arc::new(dist_instance);


@@ -25,7 +25,6 @@ use api::v1::{
FlushTableExpr, InsertRequests,
};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue};
use catalog::{CatalogManager, RegisterTableRequest};
use chrono::DateTime;
use client::client_manager::DatanodeClients;
@@ -33,7 +32,7 @@ use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::helper::{SchemaKey, SchemaValue};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition as MetaPartition, RouteRequest};
@@ -82,7 +81,6 @@ pub struct DistInstance {
meta_client: Arc<MetaClient>,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
}
impl DistInstance {
@@ -90,13 +88,11 @@ impl DistInstance {
meta_client: Arc<MetaClient>,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
meta_client,
catalog_manager,
datanode_clients,
table_metadata_manager,
}
}
@@ -132,7 +128,6 @@ impl DistInstance {
table_name.clone(),
table_info,
self.catalog_manager.clone(),
self.table_metadata_manager.clone(),
));
let request = RegisterTableRequest {
@@ -161,6 +156,7 @@ impl DistInstance {
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
table_id,
)
.await;
@@ -194,6 +190,7 @@ impl DistInstance {
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
table_id,
)
.await;
@@ -265,7 +262,7 @@ impl DistInstance {
table_name: &TableName,
region_number: Option<RegionNumber>,
) -> Result<Vec<Peer>> {
let _ = self
let table = self
.catalog_manager
.table(
&table_name.catalog_name,
@@ -277,11 +274,13 @@ impl DistInstance {
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let table_id = table.table_info().table_id();
let route_response = self
.meta_client
.route(RouteRequest {
table_names: vec![table_name.clone()],
table_ids: vec![table_id],
})
.await
.context(RequestMetaSnafu)?;
@@ -521,6 +520,7 @@ impl DistInstance {
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name,
table_info.table_id(),
)
.await;


@@ -176,19 +176,19 @@ impl DistInserter {
mod tests {
use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use catalog::helper::{
CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue,
};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::TableMetadataManager;
use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_region::RegionDistribution;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::PutRequest;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder};
use super::*;
use crate::heartbeat::handler::tests::MockKvCacheInvalidator;
@@ -219,13 +219,10 @@ mod tests {
backend
}
async fn create_testing_table(backend: &KvBackendRef, table_name: &str) {
let table_global_key = TableGlobalKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
};
async fn create_testing_table(
table_name: &str,
table_metadata_manager: &TableMetadataManagerRef,
) {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true)
@@ -243,20 +240,35 @@ mod tests {
.build()
.unwrap();
let table_info = TableInfoBuilder::new(table_name, table_meta)
let table_id = 1;
let table_info: RawTableInfo = TableInfoBuilder::new(table_name, table_meta)
.table_id(table_id)
.build()
.unwrap();
.unwrap()
.into();
let table_global_value = TableGlobalValue {
node_id: 1,
regions_id_map: HashMap::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]),
table_info: table_info.into(),
};
let key = TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
assert!(table_metadata_manager
.table_name_manager()
.create(&key, table_id)
.await
.unwrap()
.is_none());
let req = PutRequest::new()
.with_key(table_global_key.to_string().as_bytes())
.with_value(table_global_value.as_bytes().unwrap());
backend.put(req).await.unwrap();
assert!(table_metadata_manager
.table_info_manager()
.put_old(table_info)
.await
.is_ok());
assert!(table_metadata_manager
.table_region_manager()
.put_old(
&key.into(),
RegionDistribution::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]),
)
.await
.is_ok());
}
#[tokio::test]
@@ -265,7 +277,7 @@ mod tests {
let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone()));
let table_name = "one_column_partitioning_table";
create_testing_table(&backend, table_name).await;
create_testing_table(table_name, &table_metadata_manager).await;
let catalog_manager = Arc::new(FrontendCatalogManager::new(
backend,


@@ -17,13 +17,9 @@ use std::iter;
use std::pin::Pin;
use std::sync::Arc;
use api::v1::AlterExpr;
use async_trait::async_trait;
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use client::Database;
use common_error::ext::BoxedError;
use common_meta::key::{TableMetadataManagerRef, TableRouteKey};
use common_meta::rpc::store::{MoveValueRequest, PutRequest};
use common_meta::table_name::TableName;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
@@ -36,7 +32,6 @@ use common_recordbatch::error::{
use common_recordbatch::{
RecordBatch, RecordBatchStreamAdaptor, RecordBatches, SendableRecordBatchStream,
};
use common_telemetry::debug;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{
Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream,
@@ -44,21 +39,17 @@ use datafusion::physical_plan::{
use datafusion_common::DataFusionError;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use futures_util::{Stream, StreamExt};
use partition::manager::TableRouteCacheInvalidator;
use partition::splitter::WriteSplitter;
use snafu::prelude::*;
use store_api::storage::{RegionNumber, ScanRequest};
use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef};
use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest};
use table::table::AlterContext;
use table::metadata::{FilterPushDownType, TableInfoRef};
use table::requests::{DeleteRequest, InsertRequest};
use table::Table;
use tokio::sync::RwLock;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, FindDatanodeSnafu, FindTableRouteSnafu, Result, TableMetadataManagerSnafu,
};
use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, Result};
use crate::instance::distributed::inserter::DistInserter;
use crate::table::delete::to_grpc_delete_request;
use crate::table::scan::{DatanodeInstance, TableScanPlan};
@@ -72,8 +63,6 @@ pub struct DistTable {
table_name: TableName,
table_info: TableInfoRef,
catalog_manager: Arc<FrontendCatalogManager>,
#[allow(unused)]
table_metadata_manager: TableMetadataManagerRef,
}
#[async_trait]
@@ -179,13 +168,6 @@ impl Table for DistTable {
Ok(vec![FilterPushDownType::Inexact; filters.len()])
}
async fn alter(&self, context: AlterContext, request: &AlterTableRequest) -> table::Result<()> {
self.handle_alter(context, request)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)
}
async fn delete(&self, request: DeleteRequest) -> table::Result<usize> {
let partition_manager = self.catalog_manager.partition_manager();
@@ -244,205 +226,14 @@ impl DistTable {
table_name: TableName,
table_info: TableInfoRef,
catalog_manager: Arc<FrontendCatalogManager>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
table_name,
table_info,
catalog_manager,
table_metadata_manager,
}
}
pub async fn table_global_value(
&self,
key: &TableGlobalKey,
) -> Result<Option<TableGlobalValue>> {
let raw = self
.catalog_manager
.backend()
.get(key.to_string().as_bytes())
.await
.context(TableMetadataManagerSnafu)?;
Ok(if let Some(raw) = raw {
Some(TableGlobalValue::from_bytes(raw.value).context(error::CatalogEntrySerdeSnafu)?)
} else {
None
})
}
async fn set_table_global_value(
&self,
key: TableGlobalKey,
value: TableGlobalValue,
) -> Result<()> {
let value = value.as_bytes().context(error::CatalogEntrySerdeSnafu)?;
let req = PutRequest::new()
.with_key(key.to_string().as_bytes())
.with_value(value);
let _ = self
.catalog_manager
.backend()
.put(req)
.await
.context(TableMetadataManagerSnafu)?;
Ok(())
}
async fn delete_table_global_value(&self, key: TableGlobalKey) -> Result<()> {
let _ = self
.catalog_manager
.backend()
.delete(key.to_string().as_bytes(), false)
.await
.context(TableMetadataManagerSnafu)?;
Ok(())
}
async fn move_table_route_value(
&self,
catalog_name: &str,
schema_name: &str,
table_id: u32,
old_table_name: &str,
new_table_name: &str,
) -> Result<()> {
let old_key = TableRouteKey {
table_id,
catalog_name,
schema_name,
table_name: old_table_name,
}
.to_string();
let new_key = TableRouteKey {
table_id,
catalog_name,
schema_name,
table_name: new_table_name,
}
.to_string();
let req = MoveValueRequest::new(old_key.as_bytes(), new_key.as_bytes());
self.catalog_manager
.backend()
.move_value(req)
.await
.context(TableMetadataManagerSnafu)?;
self.catalog_manager
.partition_manager()
.invalidate_table_route(&TableName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: old_table_name.to_string(),
})
.await;
Ok(())
}
async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> {
let AlterTableRequest {
catalog_name,
schema_name,
table_name,
alter_kind,
table_id: _table_id,
..
} = request;
let alter_expr = context
.get::<AlterExpr>()
.context(error::ContextValueNotFoundSnafu { key: "AlterExpr" })?;
self.alter_by_expr(alter_expr).await?;
let table_info = self.table_info();
let new_meta = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu {
table_name: table_name.clone(),
})?;
let mut new_info = TableInfo::clone(&*table_info);
new_info.ident.version = table_info.ident.version + 1;
new_info.meta = new_meta;
let key = TableGlobalKey {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
};
let mut value = self
.table_global_value(&key)
.await?
.context(error::TableNotFoundSnafu { table_name })?;
value.table_info = new_info.into();
if let AlterKind::RenameTable { new_table_name } = alter_kind {
let new_key = TableGlobalKey {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: new_table_name.clone(),
};
self.set_table_global_value(new_key, value).await?;
self.delete_table_global_value(key).await?;
self.move_table_route_value(
catalog_name,
schema_name,
table_info.ident.table_id,
table_name,
new_table_name,
)
.await?;
Ok(())
} else {
self.set_table_global_value(key, value).await
}
}
/// Defines `alter_by_expr` instead of implementing [`Table::alter`] to avoid redundant conversions between
/// [`table::requests::AlterTableRequest`] and [`AlterExpr`].
async fn alter_by_expr(&self, expr: &AlterExpr) -> Result<()> {
let table_routes = self
.catalog_manager
.partition_manager()
.find_table_route(&self.table_name)
.await
.with_context(|_| error::FindTableRouteSnafu {
table_name: self.table_name.to_string(),
})?;
let leaders = table_routes.find_leaders();
ensure!(
!leaders.is_empty(),
error::LeaderNotFoundSnafu {
table: format!(
"{:?}.{:?}.{}",
expr.catalog_name, expr.schema_name, expr.table_name
)
}
);
let datanode_clients = self.catalog_manager.datanode_clients();
for datanode in leaders {
let client = datanode_clients.get_client(&datanode).await;
let db = Database::new(&expr.catalog_name, &expr.schema_name, client);
debug!("Sending {:?} to {:?}", expr, db);
let result = db
.alter(expr.clone())
.await
.context(error::RequestDatanodeSnafu)?;
debug!("Alter table result: {:?}", result);
// TODO(hl): We should further check and track alter result in some global DDL task tracker
}
Ok(())
}
async fn find_datanode_instances(
&self,
regions: &[RegionNumber],
@@ -698,7 +489,7 @@ pub(crate) mod test {
);
let table_route = TableRoute::new(
Table {
id: 1,
id: 2,
table_name: table_name.clone(),
table_schema: vec![],
},


@@ -12,23 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use chrono::DateTime;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::rpc::router::{CreateRequest, Partition};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest,
PutRequest, RangeRequest,
};
use common_meta::table_name::TableName;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use meta_client::client::MetaClientBuilder;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use tracing::{event, subscriber, Level};
use tracing_subscriber::FmtSubscriber;
@@ -78,25 +70,6 @@ async fn run() {
}
});
let p1 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"k1".to_vec(), b"k2".to_vec()],
};
let p2 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()],
};
let table_name = TableName::new("test_catalog", "test_schema", "test_table");
let table_info = new_table_info();
let create_req = CreateRequest::new(table_name, &table_info)
.add_partition(p1)
.add_partition(p2);
let res = meta_client.create_route(create_req).await;
event!(Level::INFO, "create_route result: {:#?}", res);
// put
let put = PutRequest::new()
.with_key(b"key1".to_vec())
@@ -173,40 +146,3 @@ async fn run() {
let res = meta_client.batch_get(batch_get).await.unwrap();
event!(Level::INFO, "batch get result: {:#?}", res);
}
fn new_table_info() -> RawTableInfo {
RawTableInfo {
ident: TableIdent {
table_id: 0,
version: 0,
},
name: "test_table".to_string(),
desc: None,
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: vec![
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true),
],
timestamp_index: Some(0),
version: 0,
},
primary_key_indices: vec![],
value_indices: vec![],
engine: "mito".to_string(),
next_column_id: 0,
region_numbers: vec![],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
},
table_type: TableType::Base,
}
}


@@ -24,7 +24,7 @@ use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use common_meta::rpc::router::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
use common_meta::rpc::router::{RouteRequest, RouteResponse};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
@@ -41,7 +41,7 @@ use store::Client as StoreClient;
pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::error;
use crate::error::{ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Result};
use crate::error::{ConvertMetaResponseSnafu, Result};
pub type Id = (u64, u64);
@@ -224,21 +224,6 @@ impl MetaClient {
self.heartbeat_client()?.heartbeat().await
}
/// Provides routing information for distributed create table requests.
///
/// When a distributed create table request is received, this method returns
/// a list of `datanode` addresses generated from the partition information
/// contained in the request, using policies such as load-based selection.
pub async fn create_route(&self, req: CreateRequest<'_>) -> Result<RouteResponse> {
let req = req.try_into().context(ConvertMetaRequestSnafu)?;
self.router_client()?
.create(req)
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// Fetches routing information for tables. The smallest unit is the complete
/// routing information (all regions) of a table.
///
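For orientation, a sketch of the id-based route call after this change (`meta_client` and `table_id` are assumed in scope; mirrors the updated tests):
let req = RouteRequest::new(table_id);
let resp = meta_client.route(req).await?;
for table_route in resp.table_routes {
    // Each TableRoute carries the complete routing information (all regions) of one table.
    let _ = table_route;
}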
@@ -266,17 +251,6 @@ impl MetaClient {
.context(ConvertMetaResponseSnafu)
}
/// Can be called repeatedly. The first call deletes and returns the
/// table's routing information; subsequent calls can still return the
/// deleted route information.
pub async fn delete_route(&self, req: DeleteRequest) -> Result<RouteResponse> {
self.router_client()?
.delete(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// Range gets the keys in the range from the key-value store.
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.store_client()?
@@ -424,20 +398,10 @@ impl MetaClient {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Peer};
use chrono::DateTime;
use common_meta::rpc::router::Partition;
use common_meta::table_name::TableName;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use meta_srv::metasrv::SelectorContext;
use meta_srv::selector::{Namespace, Selector};
use meta_srv::Result as MetaResult;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use super::*;
use crate::mocks;
@@ -547,39 +511,6 @@ mod tests {
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
fn new_table_info(table_name: &TableName) -> RawTableInfo {
RawTableInfo {
ident: TableIdent {
table_id: 0,
version: 0,
},
name: table_name.table_name.clone(),
desc: None,
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: vec![ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)],
timestamp_index: Some(0),
version: 0,
},
primary_key_indices: vec![],
value_indices: vec![],
engine: "mito".to_string(),
next_column_id: 0,
region_numbers: vec![],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
},
table_type: TableType::Base,
}
}
#[tokio::test]
async fn test_not_start_router_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
@@ -589,10 +520,8 @@ mod tests {
.build();
meta_client.start(urls).await.unwrap();
let table_name = TableName::new("c", "s", "t");
let table_info = new_table_info(&table_name);
let req = CreateRequest::new(table_name, &table_info);
let res = meta_client.create_route(req).await;
let req = RouteRequest::new(1);
let res = meta_client.route(req).await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
@@ -671,37 +600,6 @@ mod tests {
}
}
#[tokio::test]
async fn test_route() {
let selector = Arc::new(MockSelector {});
let client = mocks::mock_client_with_memorystore_and_selector(selector).await;
let p1 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"k1".to_vec(), b"k2".to_vec()],
};
let p2 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()],
};
let table_name = TableName::new("test_catalog", "test_schema", "test_table");
let table_info = new_table_info(&table_name);
let req = CreateRequest::new(table_name.clone(), &table_info)
.add_partition(p1)
.add_partition(p2);
let res = client.create_route(req).await.unwrap();
assert_eq!(1, res.table_routes.len());
let req = RouteRequest::new().add_table_name(table_name.clone());
let res = client.route(req).await.unwrap();
assert!(!res.table_routes.is_empty());
let req = DeleteRequest::new(table_name.clone());
let res = client.delete_route(req).await;
let _ = res.unwrap();
}
#[tokio::test]
async fn test_range_get() {
let tc = new_client("test_range_get").await;


@@ -16,7 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::{CreateRequest, DeleteRequest, Role, RouteRequest, RouteResponse};
use api::v1::meta::{Role, RouteRequest, RouteResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
@@ -57,20 +57,10 @@ impl Client {
inner.is_started()
}
pub async fn create(&self, req: CreateRequest) -> Result<RouteResponse> {
let inner = self.inner.read().await;
inner.create(req).await
}
pub async fn route(&self, req: RouteRequest) -> Result<RouteResponse> {
let inner = self.inner.read().await;
inner.route(req).await
}
pub async fn delete(&self, req: DeleteRequest) -> Result<RouteResponse> {
let inner = self.inner.read().await;
inner.delete(req).await
}
}
#[derive(Debug)]
@@ -105,14 +95,6 @@ impl Inner {
Ok(())
}
async fn create(&self, mut req: CreateRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client.create(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn route(&self, mut req: RouteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
@@ -121,14 +103,6 @@ impl Inner {
Ok(res.into_inner())
}
async fn delete(&self, mut req: DeleteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client.delete(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
fn random_client(&self) -> Result<RouterClient<Channel>> {
let len = self.peers.len();
let peer = lb::random_get(len, |i| Some(&self.peers[i])).context(


@@ -13,7 +13,6 @@
// limitations under the License.
use api::v1::meta::Role;
use meta_srv::metasrv::SelectorRef;
use meta_srv::mocks as server_mock;
use meta_srv::mocks::MockInfo;
@@ -30,11 +29,6 @@ pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient {
mock_client_by(mock_info).await
}
pub async fn mock_client_with_memorystore_and_selector(selector: SelectorRef) -> MetaClient {
let mock_info = server_mock::mock_with_memstore_and_selector(selector).await;
mock_client_by(mock_info).await
}
pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient {
let MockInfo {
server_addr,


@@ -17,18 +17,19 @@ use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;
use common_meta::ClusterId;
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error::Result;
use crate::error::{Result, TableMetadataManagerSnafu};
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::procedure::region_failover::{RegionFailoverKey, RegionFailoverManager};
use crate::service::store::kv::KvStoreRef;
use crate::table_routes;
/// The lease seconds of a region. It's set to two default heartbeat intervals (5 seconds × 2) plus
/// two round-trip times (2 seconds × 2 × 2), plus some extra buffer (2 seconds).
@@ -36,25 +37,44 @@ use crate::table_routes;
pub(crate) const REGION_LEASE_SECONDS: u64 = 20;
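Spelled out, the arithmetic the doc comment describes (the committed constant hard-codes the result):
// 2 heartbeat intervals + 2 round trips + buffer = 5 * 2 + 2 * 2 * 2 + 2 = 20,
// i.e. the constant could equivalently be written as
// pub(crate) const REGION_LEASE_SECONDS: u64 = 5 * 2 + 2 * 2 * 2 + 2;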
pub(crate) struct RegionLeaseHandler {
kv_store: KvStoreRef,
region_failover_manager: Option<Arc<RegionFailoverManager>>,
#[allow(unused)]
table_metadata_manager: TableMetadataManagerRef,
}
impl RegionLeaseHandler {
pub(crate) fn new(
kv_store: KvStoreRef,
region_failover_manager: Option<Arc<RegionFailoverManager>>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
kv_store,
region_failover_manager,
table_metadata_manager,
}
}
async fn find_table_ident(
&self,
table_id: TableId,
table_name: &TableName,
) -> Result<Option<TableIdent>> {
let value = self
.table_metadata_manager
.table_info_manager()
.get_old(table_name)
.await
.context(TableMetadataManagerSnafu)?;
Ok(value.map(|x| {
let table_info = &x.table_info;
TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id,
engine: table_info.meta.engine.clone(),
}
}))
}
/// Filter out the regions that are currently in failover.
/// It's meaningless to extend the lease of a region if it is in failover.
fn filter_failover_regions(
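As a hedged illustration only (names hypothetical; the real signature and body lie outside this hunk), the filtering the doc comment describes boils down to retaining the regions that have no in-flight failover entry:
// Keep only the regions with no in-flight failover.
fn keep_non_failover(
    regions: Vec<RegionNumber>,
    in_failover: &std::collections::HashSet<RegionNumber>,
) -> Vec<RegionNumber> {
    regions
        .into_iter()
        .filter(|region| !in_failover.contains(region))
        .collect()
}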
@@ -96,46 +116,55 @@ impl HeartbeatHandler for RegionLeaseHandler {
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let Some(stat) = acc.stat.as_ref() else { return Ok(()) };
let datanode_id = stat.id;
let mut datanode_regions = HashMap::new();
stat.region_stats.iter().for_each(|x| {
let key = TableGlobalKey {
catalog_name: x.catalog.to_string(),
schema_name: x.schema.to_string(),
table_name: x.table.to_string(),
};
let region_id: RegionId = x.id.into();
let table_id = region_id.table_id();
let table_name = TableName::new(
x.catalog.to_string(),
x.schema.to_string(),
x.table.to_string(),
);
datanode_regions
.entry(key)
.entry((table_id, table_name))
.or_insert_with(Vec::new)
.push(RegionId::from(x.id).region_number());
});
// TODO(LFC): Retrieve table global values from some cache here.
let table_global_values = table_routes::batch_get_table_global_value(
&self.kv_store,
datanode_regions.keys().collect::<Vec<_>>(),
)
.await?;
let mut region_leases = Vec::with_capacity(datanode_regions.len());
for (table_global_key, local_regions) in datanode_regions {
let Some(Some(table_global_value)) = table_global_values.get(&table_global_key) else { continue };
for ((table_id, table_name), local_regions) in datanode_regions {
let Some(table_ident) = self.find_table_ident(table_id, &table_name).await? else {
warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \
Reason: table not found.");
continue;
};
let Some(global_regions) = table_global_value.regions_id_map.get(&stat.id) else { continue };
let Some(table_region_value) = self
.table_metadata_manager
.table_region_manager()
.get_old(&table_name)
.await
.context(TableMetadataManagerSnafu)? else {
warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \
Reason: table region value not found.");
continue;
};
let Some(global_regions) = table_region_value
.region_distribution
.get(&datanode_id) else {
warn!("Reject region lease request from Datanode {datanode_id} for table id {table_id}. \
Reason: not expected to place the region on it.");
continue;
};
// Filter out the designated regions from table global metadata for the given table on the given Datanode.
// Filter out the designated regions from table info value for the given table on the given Datanode.
let designated_regions = local_regions
.into_iter()
.filter(|x| global_regions.contains(x))
.collect::<Vec<_>>();
let table_ident = TableIdent {
catalog: table_global_key.catalog_name.to_string(),
schema: table_global_key.schema_name.to_string(),
table: table_global_key.table_name.to_string(),
table_id: table_global_value.table_id(),
engine: table_global_value.engine().to_string(),
};
let designated_regions =
self.filter_failover_regions(stat.cluster_id, &table_ident, designated_regions);
@@ -160,7 +189,7 @@ mod test {
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::service::store::kv::KvBackendAdapter;
use crate::test_util;
use crate::{table_routes, test_util};
#[tokio::test]
async fn test_handle_region_lease() {
@@ -171,17 +200,22 @@ mod test {
.kv_store
.clone();
let table_id = 1;
let table_name = "my_table";
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
let _ = table_routes::tests::prepare_table_global_value(&kv_store, table_name).await;
table_routes::tests::prepare_table_region_and_info_value(
&table_metadata_manager,
table_name,
)
.await;
let table_ident = TableIdent {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table_name.to_string(),
table_id: 1,
table_id,
engine: "mito".to_string(),
};
let _ = region_failover_manager
@@ -194,11 +228,8 @@ mod test {
region_number: 1,
});
let handler = RegionLeaseHandler::new(
kv_store,
Some(region_failover_manager),
table_metadata_manager,
);
let handler =
RegionLeaseHandler::new(Some(region_failover_manager), table_metadata_manager);
let req = HeartbeatRequest {
duration_since_epoch: 1234,
@@ -210,9 +241,10 @@ mod test {
let ctx = &mut metasrv.new_ctx();
let acc = &mut HeartbeatAccumulator::default();
let new_region_stat = |region_id: u64| -> RegionStat {
let new_region_stat = |region_number: RegionNumber| -> RegionStat {
let region_id = RegionId::new(table_id, region_number);
RegionStat {
id: region_id,
id: region_id.as_u64(),
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table_name.to_string(),
@@ -228,7 +260,7 @@ mod test {
handler.handle(&req, ctx, acc).await.unwrap();
// region 1 is during failover and region 3 is not in table global value,
// region 1 is during failover and region 3 is not in table region value,
// so only region 2's lease is extended.
assert_eq!(acc.region_leases.len(), 1);
let lease = acc.region_leases.remove(0);
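
For reference, a minimal standalone sketch of the region id arithmetic and lease filtering this handler relies on, assuming RegionId packs the table id into the upper 32 bits and the region number into the lower 32 bits (stand-in types here, not the real store-api ones):

    type TableId = u32;
    type RegionNumber = u32;

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct RegionId(u64);

    impl RegionId {
        // Assumed layout: table id in the high half, region number in the low half.
        fn new(table_id: TableId, region_number: RegionNumber) -> Self {
            Self(((table_id as u64) << 32) | region_number as u64)
        }
        fn table_id(&self) -> TableId {
            (self.0 >> 32) as TableId
        }
        fn region_number(&self) -> RegionNumber {
            self.0 as RegionNumber
        }
        fn as_u64(&self) -> u64 {
            self.0
        }
    }

    fn main() {
        let region_id = RegionId::new(1, 2);
        assert_eq!(region_id.table_id(), 1);
        assert_eq!(region_id.region_number(), 2);
        assert_eq!(region_id.as_u64(), (1u64 << 32) | 2);

        // Only regions reported by the datanode that also appear in the stored
        // region distribution get their leases extended.
        let local_regions: Vec<RegionNumber> = vec![1, 2, 3];
        let global_regions: Vec<RegionNumber> = vec![1, 2];
        let designated: Vec<RegionNumber> = local_regions
            .into_iter()
            .filter(|x| global_regions.contains(x))
            .collect();
        assert_eq!(designated, vec![1, 2]);
    }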

View File

@@ -24,7 +24,6 @@ use crate::error;
use crate::error::Result;
use crate::handler::node_stat::Stat;
pub(crate) const REMOVED_PREFIX: &str = "__removed";
pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
@@ -128,10 +127,6 @@ impl TryFrom<LeaseValue> for Vec<u8> {
}
}
pub(crate) fn to_removed_key(key: &str) -> String {
format!("{REMOVED_PREFIX}-{key}")
}
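
The removed helper is small enough to restate as a runnable sketch. Deleted metadata keys were renamed under the removed prefix (see the move_value usage in table_routes) rather than erased, leaving an inspectable tombstone:

    const REMOVED_PREFIX: &str = "__removed";

    fn to_removed_key(key: &str) -> String {
        format!("{REMOVED_PREFIX}-{key}")
    }

    fn main() {
        // The tombstone keeps the original key intact under the prefix.
        assert_eq!(to_removed_key("some-table-key"), "__removed-some-table-key");
    }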
pub fn build_table_route_prefix(catalog: impl AsRef<str>, schema: impl AsRef<str>) -> String {
format!(
"{}-{}-{}-",

View File

@@ -52,14 +52,14 @@ pub trait DistLock: Send + Sync {
pub type DistLockRef = Arc<dyn DistLock>;
pub(crate) struct DistLockGuard<'a> {
pub struct DistLockGuard<'a> {
lock: &'a DistLockRef,
name: Vec<u8>,
key: Option<Key>,
}
impl<'a> DistLockGuard<'a> {
pub(crate) fn new(lock: &'a DistLockRef, name: Vec<u8>) -> Self {
pub fn new(lock: &'a DistLockRef, name: Vec<u8>) -> Self {
Self {
lock,
name,
@@ -67,7 +67,7 @@ impl<'a> DistLockGuard<'a> {
}
}
pub(crate) async fn lock(&mut self) -> Result<()> {
pub async fn lock(&mut self) -> Result<()> {
if self.key.is_some() {
return Ok(());
}
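
A minimal sketch of the guard's idempotent locking, with stand-in types in place of the real DistLock trait object: a second lock() call returns early because the key from the first acquisition is still held.

    struct DistLockGuard {
        name: Vec<u8>,
        key: Option<u64>,
    }

    impl DistLockGuard {
        fn new(name: Vec<u8>) -> Self {
            Self { name, key: None }
        }

        fn lock(&mut self) -> Result<(), String> {
            if self.key.is_some() {
                // Already held: a second lock() call is a no-op.
                return Ok(());
            }
            // Stand-in for the lock service handing back an opaque key,
            // which the guard keeps around for the eventual unlock.
            self.key = Some(42);
            Ok(())
        }
    }

    fn main() {
        let mut guard = DistLockGuard::new(b"table_metadata_lock".to_vec());
        guard.lock().unwrap();
        guard.lock().unwrap(); // idempotent: still holds the same key
        assert_eq!(guard.key, Some(42));
        assert_eq!(guard.name, b"table_metadata_lock");
    }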

View File

@@ -15,7 +15,6 @@
//! All keys used for distributed locking in the Metasrv.
//! Place them in this unified module for better maintenance.
use common_meta::table_name::TableName;
use common_meta::RegionIdent;
use crate::lock::Key;
@@ -31,7 +30,3 @@ pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key {
)
.into_bytes()
}
pub(crate) fn table_creation_lock_key(table_name: &TableName) -> Key {
format!("table_creation_lock_({})", table_name).into_bytes()
}
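
The removed lock-key helper, restated as a runnable sketch to show the convention: lock keys are plain formatted strings turned into bytes, namespaced by the lock's purpose.

    fn table_creation_lock_key(table_name: &str) -> Vec<u8> {
        format!("table_creation_lock_({})", table_name).into_bytes()
    }

    fn main() {
        let key = table_creation_lock_key("greptime.public.my_table");
        assert_eq!(key, b"table_creation_lock_(greptime.public.my_table)".to_vec());
    }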

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use common_meta::rpc::store::CompareAndPutRequest;
use common_telemetry::{info, timer};
use metrics::increment_counter;
@@ -119,7 +119,7 @@ impl MetadataService for DefaultMetadataService {
mod tests {
use std::sync::Arc;
use catalog::helper::{CatalogKey, SchemaKey};
use common_meta::helper::{CatalogKey, SchemaKey};
use super::{DefaultMetadataService, MetadataService};
use crate::service::store::kv::KvStoreRef;

View File

@@ -234,7 +234,6 @@ impl MetaSrvBuilder {
};
let region_lease_handler = RegionLeaseHandler::new(
kv_store.clone(),
region_failover_handler
.as_ref()
.map(|x| x.region_failover_manager().clone()),

View File

@@ -14,10 +14,10 @@
use api::v1::meta::TableRouteValue;
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use client::Database;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::helper::TableGlobalKey;
use common_meta::key::TableRouteKey;
use common_meta::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use common_meta::rpc::ddl::CreateTableTask;

View File

@@ -25,9 +25,9 @@ use std::sync::{Arc, RwLock};
use std::time::Duration;
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
@@ -42,7 +42,7 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use crate::error::{Error, RegisterProcedureLoaderSnafu, Result};
use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::service::mailbox::MailboxRef;
@@ -204,17 +204,17 @@ impl RegionFailoverManager {
async fn table_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
let table_ident = &failed_region.table_ident;
let table_global_key = TableGlobalKey {
catalog_name: table_ident.catalog.clone(),
schema_name: table_ident.schema.clone(),
table_name: table_ident.table.clone(),
};
let table_global_value = self
.selector_ctx
.kv_store
.get(&table_global_key.to_raw_key())
.await?;
Ok(table_global_value.is_some())
Ok(self
.table_metadata_manager
.table_region_manager()
.get_old(&TableName::new(
&table_ident.catalog,
&table_ident.schema,
&table_ident.table,
))
.await
.context(TableMetadataManagerSnafu)?
.is_some())
}
}
@@ -376,7 +376,6 @@ mod tests {
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
use catalog::helper::TableGlobalKey;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
@@ -422,20 +421,21 @@ mod tests {
impl TestingEnv {
pub async fn failed_region(&self, region_number: u32) -> RegionIdent {
let table = "my_table";
let key = TableGlobalKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table.to_string(),
};
let value =
table_routes::get_table_global_value(&self.context.selector_ctx.kv_store, &key)
.await
.unwrap()
.unwrap();
let value = self
.context
.table_metadata_manager
.table_region_manager()
.get_old(&TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"my_table",
))
.await
.unwrap()
.unwrap();
let failed_datanode = value
.regions_id_map
.region_distribution
.iter()
.find_map(|(&datanode_id, regions)| {
if regions.contains(&region_number) {
@@ -454,7 +454,7 @@ mod tests {
engine: MITO_ENGINE.to_string(),
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table.to_string(),
table: "my_table".to_string(),
},
}
}
@@ -489,8 +489,21 @@ mod tests {
let table_metadata_manager = Arc::new(TableMetadataManager::new(
KvBackendAdapter::wrap(kv_store.clone()),
));
let (_, table_global_value) =
table_routes::tests::prepare_table_global_value(&kv_store, table).await;
table_routes::tests::prepare_table_region_and_info_value(
&table_metadata_manager,
table,
)
.await;
let table_region_value = table_metadata_manager
.table_region_manager()
.get_old(&TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
table,
))
.await
.unwrap()
.unwrap();
let _ = table_routes::tests::prepare_table_route_value(&kv_store, table).await;
@@ -511,7 +524,7 @@ mod tests {
let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence);
let selector = self.selector.unwrap_or_else(|| {
let nodes = (1..=table_global_value.regions_id_map.len())
let nodes = (1..=table_region_value.region_distribution.len())
.map(|id| Peer {
id: id as u64,
addr: "".to_string(),
@@ -650,24 +663,27 @@ mod tests {
);
// Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode.
let key = TableGlobalKey {
catalog_name: failed_region.table_ident.catalog.clone(),
schema_name: failed_region.table_ident.schema.clone(),
table_name: failed_region.table_ident.table.clone(),
};
let value = table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key)
let value = env
.context
.table_metadata_manager
.table_region_manager()
.get_old(&TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"my_table",
))
.await
.unwrap()
.unwrap();
assert_eq!(
value
.regions_id_map
.region_distribution
.get(&failed_region.datanode_id)
.unwrap(),
&vec![2]
);
assert!(value
.regions_id_map
.region_distribution
.get(&candidate_rx.recv().await.unwrap())
.unwrap()
.contains(&1));
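
A standalone sketch of how failed_region() locates the failed datanode, using a plain BTreeMap in place of RegionDistribution and the same fixture distribution as these tests:

    use std::collections::BTreeMap;

    fn main() {
        // Datanode => regions, matching the test fixture in this module.
        let region_distribution: BTreeMap<u64, Vec<u32>> =
            BTreeMap::from([(1, vec![1, 2]), (2, vec![3]), (3, vec![4])]);

        let region_number = 3u32;
        // The failed datanode is whichever one currently owns the region.
        let failed_datanode = region_distribution
            .iter()
            .find_map(|(&datanode_id, regions)| {
                regions.contains(&region_number).then_some(datanode_id)
            })
            .unwrap();
        assert_eq!(failed_datanode, 2);
    }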

View File

@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{TableName, TableRouteValue};
use api::v1::meta::{TableName as PbTableName, TableRouteValue};
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::key::TableRouteKey;
use common_meta::peer::Peer;
use common_meta::rpc::router::TableRoute;
use common_meta::table_name::TableName;
use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
@@ -26,8 +26,8 @@ use snafu::{OptionExt, ResultExt};
use super::invalidate_cache::InvalidateCache;
use super::{RegionFailoverContext, State};
use crate::error::{
CorruptedTableRouteSnafu, Result, RetryLaterSnafu, TableNotFoundSnafu,
TableRouteConversionSnafu,
CorruptedTableRouteSnafu, Result, RetryLaterSnafu, TableMetadataManagerSnafu,
TableNotFoundSnafu, TableRouteConversionSnafu,
};
use crate::lock::keys::table_metadata_lock_key;
use crate::lock::Opts;
@@ -43,7 +43,7 @@ impl UpdateRegionMetadata {
Self { candidate }
}
/// Updates the metadata of the table. Specifically, the [TableGlobalValue] and [TableRouteValue].
/// Updates the metadata of the table.
async fn update_metadata(
&self,
ctx: &RegionFailoverContext,
@@ -52,54 +52,60 @@ impl UpdateRegionMetadata {
let key = table_metadata_lock_key(failed_region);
let key = ctx.dist_lock.lock(key, Opts::default()).await?;
self.update_table_global_value(ctx, failed_region).await?;
self.update_table_region_value(ctx, failed_region).await?;
self.update_table_route(ctx, failed_region).await?;
ctx.dist_lock.unlock(key).await?;
Ok(())
}
async fn update_table_global_value(
async fn update_table_region_value(
&self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<()> {
let key = TableGlobalKey {
catalog_name: failed_region.table_ident.catalog.clone(),
schema_name: failed_region.table_ident.schema.clone(),
table_name: failed_region.table_ident.table.clone(),
};
let mut value = table_routes::get_table_global_value(&ctx.selector_ctx.kv_store, &key)
.await?
let table_ident = &failed_region.table_ident;
let table_id = table_ident.table_id;
let table_name = TableName::new(
&table_ident.catalog,
&table_ident.schema,
&table_ident.table,
);
let value = ctx
.table_metadata_manager
.table_region_manager()
.get_old(&table_name)
.await
.context(TableMetadataManagerSnafu)?
.with_context(|| TableNotFoundSnafu {
name: common_catalog::format_full_table_name(
&key.catalog_name,
&key.schema_name,
&key.table_name,
),
name: table_ident.to_string(),
})?;
let mut region_distribution = value.region_distribution.clone();
if let Some(mut region_numbers) = value.regions_id_map.remove(&failed_region.datanode_id) {
if let Some(mut region_numbers) = region_distribution.remove(&failed_region.datanode_id) {
region_numbers.retain(|x| *x != failed_region.region_number);
if !region_numbers.is_empty() {
let _ = value
.regions_id_map
.insert(failed_region.datanode_id, region_numbers);
region_distribution.insert(failed_region.datanode_id, region_numbers);
}
}
let region_numbers = value
.regions_id_map
let region_numbers = region_distribution
.entry(self.candidate.id)
.or_insert_with(Vec::new);
region_numbers.push(failed_region.region_number);
table_routes::put_table_global_value(&ctx.selector_ctx.kv_store, &key, &value).await?;
ctx.table_metadata_manager
.table_region_manager()
.put_old(&table_name, region_distribution.clone())
.await
.context(TableMetadataManagerSnafu)?;
info!(
"Region mappings in table global value (key = '{key}') are updated to {:?}. \
"Region distribution of table (id = {table_id}) is updated to {:?}. \
Failed region {} was on Datanode {}.",
value.regions_id_map, failed_region.region_number, failed_region.datanode_id,
region_distribution, failed_region.region_number, failed_region.datanode_id,
);
Ok(())
}
@@ -109,7 +115,7 @@ impl UpdateRegionMetadata {
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<()> {
let table_name = TableName {
let table_name = PbTableName {
catalog_name: failed_region.table_ident.catalog.clone(),
schema_name: failed_region.table_ident.schema.clone(),
table_name: failed_region.table_ident.table.clone(),
@@ -210,8 +216,10 @@ impl State for UpdateRegionMetadata {
#[cfg(test)]
mod tests {
use api::v1::meta::TableRouteValue;
use catalog::helper::TableGlobalValue;
use common_meta::key::table_region::TableRegionValue;
use common_meta::key::TableRouteKey;
use common_meta::DatanodeId;
use store_api::storage::RegionNumber;
use super::super::tests::{TestingEnv, TestingEnvBuilder};
use super::{State, *};
@@ -232,40 +240,34 @@ mod tests {
}
#[tokio::test]
async fn test_update_table_global_value() {
async fn test_update_table_info_value() {
common_telemetry::init_default_ut_logging();
async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> TableGlobalValue {
async fn test(
env: TestingEnv,
failed_region: RegionNumber,
candidate: DatanodeId,
) -> TableRegionValue {
let failed_region = env.failed_region(failed_region).await;
let key = TableGlobalKey {
catalog_name: failed_region.table_ident.catalog.clone(),
schema_name: failed_region.table_ident.schema.clone(),
table_name: failed_region.table_ident.table.clone(),
};
let original =
table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key)
.await
.unwrap()
.unwrap();
let state = UpdateRegionMetadata::new(Peer::new(candidate, ""));
state
.update_table_global_value(&env.context, &failed_region)
.update_table_region_value(&env.context, &failed_region)
.await
.unwrap();
let updated =
table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key)
.await
.unwrap()
.unwrap();
// verifies that other data stay untouched
assert_eq!(original.node_id, updated.node_id);
assert_eq!(original.table_info, updated.table_info);
updated
let table_ident = failed_region.table_ident;
env.context
.table_metadata_manager
.table_region_manager()
.get_old(&TableName::new(
&table_ident.catalog,
&table_ident.schema,
&table_ident.table,
))
.await
.unwrap()
.unwrap()
}
// Region distribution:
@@ -278,7 +280,7 @@ mod tests {
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 1, 2).await;
let new_region_id_map = updated.regions_id_map;
let new_region_id_map = updated.region_distribution;
assert_eq!(new_region_id_map.len(), 3);
assert_eq!(new_region_id_map.get(&1), Some(&vec![2]));
assert_eq!(new_region_id_map.get(&2), Some(&vec![3, 1]));
@@ -288,7 +290,7 @@ mod tests {
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 3, 3).await;
let new_region_id_map = updated.regions_id_map;
let new_region_id_map = updated.region_distribution;
assert_eq!(new_region_id_map.len(), 2);
assert_eq!(new_region_id_map.get(&1), Some(&vec![1, 2]));
assert_eq!(new_region_id_map.get(&3), Some(&vec![4, 3]));
@@ -297,7 +299,7 @@ mod tests {
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 1, 4).await;
let new_region_id_map = updated.regions_id_map;
let new_region_id_map = updated.region_distribution;
assert_eq!(new_region_id_map.len(), 4);
assert_eq!(new_region_id_map.get(&1), Some(&vec![2]));
assert_eq!(new_region_id_map.get(&2), Some(&vec![3]));
@@ -308,7 +310,7 @@ mod tests {
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 3, 4).await;
let new_region_id_map = updated.regions_id_map;
let new_region_id_map = updated.region_distribution;
assert_eq!(new_region_id_map.len(), 3);
assert_eq!(new_region_id_map.get(&1), Some(&vec![1, 2]));
assert_eq!(new_region_id_map.get(&3), Some(&vec![4]));
@@ -435,7 +437,7 @@ mod tests {
async fn test_update_metadata_concurrently() {
common_telemetry::init_default_ut_logging();
// Test the correctness of concurrently updating the region distribution in table global
// Test the correctness of concurrently updating the region distribution in table region
// value, and region routes in table route value. Region 1 moves to Datanode 2; region 2
// moves to Datanode 3.
//
@@ -512,19 +514,15 @@ mod tests {
assert_eq!(peers.len(), 2);
assert_eq!(actual, expected);
let table_global_key = TableGlobalKey {
catalog_name,
schema_name,
table_name,
};
let table_global_value = table_routes::get_table_global_value(
&env.context.selector_ctx.kv_store,
&table_global_key,
)
.await
.unwrap()
.unwrap();
let map = table_global_value.regions_id_map;
let map = env
.context
.table_metadata_manager
.table_region_manager()
.get_old(&TableName::new(&catalog_name, &schema_name, &table_name))
.await
.unwrap()
.unwrap()
.region_distribution;
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![3, 1]));
assert_eq!(map.get(&3), Some(&vec![4, 2]));
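
The core of update_table_region_value, extracted as a standalone sketch over the tests' fixture distribution (datanode 1 => [1, 2], 2 => [3], 3 => [4]); a plain BTreeMap stands in for RegionDistribution:

    use std::collections::BTreeMap;

    type RegionDistribution = BTreeMap<u64, Vec<u32>>;

    fn move_region(
        distribution: &mut RegionDistribution,
        failed_datanode: u64,
        failed_region: u32,
        candidate: u64,
    ) {
        if let Some(mut regions) = distribution.remove(&failed_datanode) {
            regions.retain(|x| *x != failed_region);
            if !regions.is_empty() {
                // The failed datanode keeps its entry only if it still owns regions.
                distribution.insert(failed_datanode, regions);
            }
        }
        distribution
            .entry(candidate)
            .or_insert_with(Vec::new)
            .push(failed_region);
    }

    fn main() {
        let mut d: RegionDistribution =
            BTreeMap::from([(1, vec![1, 2]), (2, vec![3]), (3, vec![4])]);
        // Region 1 fails on datanode 1 and moves to candidate datanode 2.
        move_region(&mut d, 1, 1, 2);
        assert_eq!(d.get(&1), Some(&vec![2]));
        assert_eq!(d.get(&2), Some(&vec![3, 1]));
    }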

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::meta::TableRouteValue;
use catalog::helper::TableGlobalKey;
use common_meta::helper::TableGlobalKey;
use common_meta::key::TableRouteKey;
use common_meta::peer::Peer;
use common_meta::rpc::router::TableRoute;

View File

@@ -64,7 +64,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
let router = router.route(
"/tables",
meta::TablesHandler {
kv_store: meta_srv.kv_store().clone(),
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);
@@ -72,7 +71,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
let router = router.route(
"/table",
meta::TableHandler {
kv_store: meta_srv.kv_store().clone(),
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);

View File

@@ -14,9 +14,8 @@
use std::collections::HashMap;
use catalog::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, TABLE_GLOBAL_KEY_PREFIX,
};
use common_meta::helper::{build_catalog_prefix, build_schema_prefix};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::rpc::store::{RangeRequest, RangeResponse};
use common_meta::util;
@@ -24,7 +23,7 @@ use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;
use crate::error;
use crate::error::Result;
use crate::error::{Result, TableMetadataManagerSnafu};
use crate::service::admin::HttpHandler;
use crate::service::store::kv::KvStoreRef;
@@ -37,12 +36,10 @@ pub struct SchemasHandler {
}
pub struct TablesHandler {
pub kv_store: KvStoreRef,
pub table_metadata_manager: TableMetadataManagerRef,
}
pub struct TableHandler {
pub kv_store: KvStoreRef,
pub table_metadata_manager: TableMetadataManagerRef,
}
@@ -87,8 +84,15 @@ impl HttpHandler for TablesHandler {
.context(error::MissingRequiredParameterSnafu {
param: "schema_name",
})?;
get_http_response_by_prefix(build_table_global_prefix(catalog, schema), &self.kv_store)
let tables = self
.table_metadata_manager
.table_name_manager()
.tables_old(catalog, schema)
.await
.context(TableMetadataManagerSnafu)?;
to_http_response(tables)
}
}
@@ -99,22 +103,56 @@ impl HttpHandler for TableHandler {
_: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
let table_name = params
.get("full_table_name")
.map(|full_table_name| full_table_name.replace('.', "-"))
.context(error::MissingRequiredParameterSnafu {
param: "full_table_name",
})?;
let table_key = format!("{TABLE_GLOBAL_KEY_PREFIX}-{table_name}");
let table_name =
params
.get("full_table_name")
.context(error::MissingRequiredParameterSnafu {
param: "full_table_name",
})?;
let response = self.kv_store.get(table_key.as_bytes()).await?;
let mut value: String = "Not found result".to_string();
if let Some(key_value) = response {
value = String::from_utf8(key_value.value).context(error::InvalidUtf8ValueSnafu)?;
let key: TableNameKey = table_name
.as_str()
.try_into()
.context(TableMetadataManagerSnafu)?;
let mut result = HashMap::with_capacity(2);
let table_id = self
.table_metadata_manager
.table_name_manager()
.get_old(&key)
.await
.context(TableMetadataManagerSnafu)?
.map(|x| x.table_id());
if let Some(_table_id) = table_id {
let table_info_value = self
.table_metadata_manager
.table_info_manager()
.get_old(&key.into())
.await
.context(TableMetadataManagerSnafu)?
.map(|x| format!("{x:?}"))
.unwrap_or_else(|| "Not Found".to_string());
result.insert("table_info_value", table_info_value);
}
if let Some(_table_id) = table_id {
let table_region_value = self
.table_metadata_manager
.table_region_manager()
.get_old(&key.into())
.await
.context(TableMetadataManagerSnafu)?
.map(|x| format!("{x:?}"))
.unwrap_or_else(|| "Not Found".to_string());
result.insert("table_region_value", table_region_value);
}
http::Response::builder()
.status(http::StatusCode::OK)
.body(value)
// Safety: serializing a HashMap<String, String> to JSON cannot fail.
.body(serde_json::to_string(&result).unwrap())
.context(error::InvalidHttpBodySnafu)
}
}
@@ -125,6 +163,10 @@ async fn get_http_response_by_prefix(
kv_store: &KvStoreRef,
) -> Result<http::Response<String>> {
let keys = get_keys_by_prefix(key_prefix, kv_store).await?;
to_http_response(keys)
}
fn to_http_response(keys: Vec<String>) -> Result<http::Response<String>> {
let body = serde_json::to_string(&keys).context(error::SerializeToJsonSnafu {
input: format!("{keys:?}"),
})?;
@@ -164,7 +206,7 @@ async fn get_keys_by_prefix(key_prefix: String, kv_store: &KvStoreRef) -> Result
mod tests {
use std::sync::Arc;
use catalog::helper::{
use common_meta::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey,
SchemaKey, TableGlobalKey,
};
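
A sketch of the full_table_name parsing the handler now depends on, assuming TableNameKey's TryFrom splits a "catalog.schema.table" string on dots; this is a hypothetical stand-in, the real implementation lives in common_meta:

    #[derive(Debug, PartialEq)]
    struct TableNameKey {
        catalog: String,
        schema: String,
        table: String,
    }

    impl TryFrom<&str> for TableNameKey {
        type Error = String;

        fn try_from(s: &str) -> Result<Self, Self::Error> {
            let mut parts = s.split('.');
            match (parts.next(), parts.next(), parts.next(), parts.next()) {
                // Exactly three dot-separated components are accepted.
                (Some(catalog), Some(schema), Some(table), None) => Ok(Self {
                    catalog: catalog.to_string(),
                    schema: schema.to_string(),
                    table: table.to_string(),
                }),
                _ => Err(format!("invalid full table name: {s}")),
            }
        }
    }

    fn main() {
        let key = TableNameKey::try_from("greptime.public.my_table").unwrap();
        assert_eq!(key.catalog, "greptime");
        assert_eq!(key.schema, "public");
        assert_eq!(key.table, "my_table");
    }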

View File

@@ -14,11 +14,10 @@
use api::v1::meta::{
ddl_task_server, Partition, Region, RegionRoute, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
Table, TableRoute,
Table, TableId, TableRoute,
};
use api::v1::TableId;
use catalog::helper::TableGlobalKey;
use common_grpc_expr::alter_expr_to_request;
use common_meta::helper::TableGlobalKey;
use common_meta::key::TableRouteKey;
use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DdlTask, DropTableTask};
use common_meta::rpc::router;

View File

@@ -15,91 +15,26 @@
use std::collections::HashMap;
use api::v1::meta::{
router_server, CreateRequest, DeleteRequest, Error, Peer, PeerDict, Region, RegionRoute,
ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute, TableRouteValue,
router_server, Peer, PeerDict, ResponseHeader, RouteRequest, RouteResponse, TableRoute,
TableRouteValue,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_meta::key::TableRouteKey;
use common_meta::rpc::store::BatchPutRequest;
use common_meta::rpc::KeyValue;
use common_meta::helper::TableGlobalValue;
use common_meta::key::table_info::TableInfoValue;
use common_meta::table_name::TableName;
use common_telemetry::{timer, warn};
use snafu::{ensure, OptionExt, ResultExt};
use common_telemetry::timer;
use snafu::{OptionExt, ResultExt};
use table::metadata::RawTableInfo;
use tonic::{Request, Response};
use crate::error;
use crate::error::Result;
use crate::lock::{keys, DistLockGuard};
use crate::metasrv::{Context, MetaSrv, SelectorContext, SelectorRef};
use crate::error::{Result, TableMetadataManagerSnafu};
use crate::metasrv::{Context, MetaSrv};
use crate::metrics::METRIC_META_ROUTE_REQUEST;
use crate::sequence::SequenceRef;
use crate::service::GrpcResult;
use crate::table_routes::{
fetch_tables, get_table_global_value, remove_table_global_value, remove_table_route_value,
table_route_key,
};
use crate::table_routes::fetch_tables;
#[async_trait::async_trait]
impl router_server::Router for MetaSrv {
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let CreateRequest {
header, table_name, ..
} = &req;
let table_name: TableName = table_name
.as_ref()
.context(error::EmptyTableNameSnafu)?
.clone()
.into();
// TODO(LFC): Use procedure to create table, and get rid of locks here.
let mut guard = DistLockGuard::new(self.lock(), keys::table_creation_lock_key(&table_name));
guard.lock().await?;
let table_global_key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
}
.to_string()
.into_bytes();
ensure!(
self.kv_store().get(&table_global_key).await?.is_none(),
error::TableAlreadyExistsSnafu {
table_name: table_name.to_string(),
}
);
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let _timer = timer!(
METRIC_META_ROUTE_REQUEST,
&[
("op", "create".to_string()),
("cluster_id", cluster_id.to_string())
]
);
let ctx = SelectorContext {
datanode_lease_secs: self.options().datanode_lease_secs,
server_addr: self.options().server_addr.clone(),
kv_store: self.kv_store().clone(),
meta_peer_client: self.meta_peer_client().clone(),
catalog: Some(table_name.catalog_name.clone()),
schema: Some(table_name.schema_name.clone()),
table: Some(table_name.table_name.clone()),
};
let selector = self.selector();
let table_id_sequence = self.table_id_sequence();
let res = handle_create(req, ctx, selector, table_id_sequence).await?;
Ok(Response::new(res))
}
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id);
@@ -117,143 +52,6 @@ impl router_server::Router for MetaSrv {
Ok(Response::new(res))
}
async fn delete(&self, req: Request<DeleteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id);
let _timer = timer!(
METRIC_META_ROUTE_REQUEST,
&[
("op", "delete".to_string()),
("cluster_id", cluster_id.to_string())
]
);
let ctx = self.new_ctx();
let res = handle_delete(req, ctx).await?;
Ok(Response::new(res))
}
}
async fn handle_create(
req: CreateRequest,
ctx: SelectorContext,
selector: &SelectorRef,
table_id_sequence: &SequenceRef,
) -> Result<RouteResponse> {
let CreateRequest {
header,
table_name,
partitions,
table_info,
} = req;
let table_name = table_name.context(error::EmptyTableNameSnafu)?;
let mut table_info: RawTableInfo =
serde_json::from_slice(&table_info).with_context(|_| error::DeserializeFromJsonSnafu {
input: format!(
"Corrupted table info: {}",
String::from_utf8_lossy(&table_info)
),
})?;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let mut peers = selector.select(cluster_id, &ctx).await?;
if peers.len() < partitions.len() {
warn!("Create table failed due to no enough available datanodes, table: {table_name:?}, partition number: {}, datanode number: {}", partitions.len(), peers.len());
return Ok(RouteResponse {
header: Some(ResponseHeader::failed(
cluster_id,
Error::not_enough_available_datanodes(partitions.len(), peers.len()),
)),
..Default::default()
});
}
// We don't need to keep all peers, just truncate it to the number of partitions.
// If the peers are not enough, some peers will be used for multiple partitions.
peers.truncate(partitions.len());
let id = table_id_sequence.next().await?;
table_info.ident.table_id = id as u32;
let table_route_key = TableRouteKey::with_table_name(id as _, &table_name)
.to_string()
.into_bytes();
let table = Table {
id,
table_name: Some(table_name.clone()),
..Default::default()
};
let mut region_routes = Vec::with_capacity(partitions.len());
for (i, partition) in partitions.into_iter().enumerate() {
let region = Region {
id: i as u64,
partition: Some(partition),
..Default::default()
};
let region_route = RegionRoute {
region: Some(region),
leader_peer_index: (i % peers.len()) as u64,
follower_peer_indexes: vec![], // follower_peers is not supported at the moment
};
region_routes.push(region_route);
}
let table_route = TableRoute {
table: Some(table),
region_routes,
};
// save table route data into meta store
let table_route_value = TableRouteValue {
peers: peers.clone(),
table_route: Some(table_route.clone()),
};
let table_global_key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
}
.to_string()
.into_bytes();
let table_global_value = create_table_global_value(&table_route_value, table_info)?
.as_bytes()
.context(error::InvalidCatalogValueSnafu)?;
let req = BatchPutRequest {
kvs: vec![
KeyValue {
key: table_global_key,
value: table_global_value,
},
KeyValue {
key: table_route_key,
value: table_route_value.into(),
},
],
prev_kv: true,
};
let resp = ctx.kv_store.batch_put(req).await?;
if !resp.prev_kvs.is_empty() {
warn!(
"Caution: table meta values are replaced! \
Maybe the last creation procedure for table {table_name:?} was aborted?"
);
}
let header = Some(ResponseHeader::success(cluster_id));
Ok(RouteResponse {
header,
peers,
table_routes: vec![table_route],
})
}
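
The removed handle_create's placement policy, condensed into a runnable sketch: peers are truncated to the partition count, and partition i takes leader peer i % peers.len(), so a short peer list is simply reused across partitions.

    fn main() {
        // The selector returned three datanodes, but the table has two partitions.
        let mut peers = vec!["dn-1", "dn-2", "dn-3"];
        let partitions = 2;

        // Keep only as many peers as partitions; had there been fewer peers
        // than partitions, the modulo below would reuse them.
        peers.truncate(partitions);

        let leader_peer_indexes: Vec<usize> =
            (0..partitions).map(|i| i % peers.len()).collect();
        assert_eq!(leader_peer_indexes, vec![0, 1]);
    }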
pub(crate) fn create_table_global_value(
@@ -298,14 +96,16 @@ async fn handle_route(req: RouteRequest, ctx: Context) -> Result<RouteResponse>
let RouteRequest {
header,
table_names,
table_ids: _,
} = req;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let table_global_keys = table_names.into_iter().map(|t| TableGlobalKey {
catalog_name: t.catalog_name,
schema_name: t.schema_name,
table_name: t.table_name,
});
let tables = fetch_tables(&ctx.kv_store, table_global_keys).await?;
let table_names = table_names
.into_iter()
.map(Into::into)
.collect::<Vec<TableName>>();
let tables = fetch_tables(&ctx, table_names).await?;
let (peers, table_routes) = fill_table_routes(tables)?;
let header = Some(ResponseHeader::success(cluster_id));
@@ -316,39 +116,8 @@ async fn handle_route(req: RouteRequest, ctx: Context) -> Result<RouteResponse>
})
}
async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result<RouteResponse> {
let DeleteRequest { header, table_name } = req;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let tgk = table_name
.map(|t| TableGlobalKey {
catalog_name: t.catalog_name,
schema_name: t.schema_name,
table_name: t.table_name,
})
.context(error::EmptyTableNameSnafu)?;
let tgv = get_table_global_value(&ctx.kv_store, &tgk)
.await?
.with_context(|| error::TableNotFoundSnafu {
name: format!("{tgk}"),
})?;
let _ = remove_table_global_value(&ctx.kv_store, &tgk).await?;
let trk = table_route_key(tgv.table_id(), &tgk);
let (_, trv) = remove_table_route_value(&ctx.kv_store, &trk).await?;
let (peers, table_routes) = fill_table_routes(vec![(tgv, trv)])?;
let header = Some(ResponseHeader::success(cluster_id));
Ok(RouteResponse {
header,
peers,
table_routes,
})
}
pub(crate) fn fill_table_routes(
tables: Vec<(TableGlobalValue, TableRouteValue)>,
tables: Vec<(TableInfoValue, TableRouteValue)>,
) -> Result<(Vec<Peer>, Vec<TableRoute>)> {
let mut peer_dict = PeerDict::default();
let mut table_routes = vec![];
@@ -370,7 +139,7 @@ pub(crate) fn fill_table_routes(
}
if let Some(table) = &mut table_route.table {
table.table_schema = tgv.as_bytes().context(error::InvalidCatalogValueSnafu)?;
table.table_schema = tgv.try_as_raw_value().context(TableMetadataManagerSnafu)?;
}
}
if let Some(table_route) = table_route {

View File

@@ -12,20 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::meta::TableRouteValue;
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_meta::helper::{TableGlobalKey, TableGlobalValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::TableRouteKey;
use common_meta::rpc::store::{BatchGetRequest, MoveValueRequest, PutRequest};
use common_telemetry::warn;
use common_meta::rpc::store::PutRequest;
use common_meta::table_name::TableName;
use snafu::{OptionExt, ResultExt};
use table::engine::TableReference;
use crate::error::{
DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableNotFoundSnafu,
DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableMetadataManagerSnafu,
TableRouteNotFoundSnafu,
};
use crate::metasrv::Context;
use crate::service::store::kv::KvStoreRef;
pub async fn get_table_global_value(
@@ -37,58 +37,6 @@ pub async fn get_table_global_value(
.transpose()
}
pub(crate) async fn batch_get_table_global_value(
kv_store: &KvStoreRef,
keys: Vec<&TableGlobalKey>,
) -> Result<HashMap<TableGlobalKey, Option<TableGlobalValue>>> {
let req = BatchGetRequest {
keys: keys.iter().map(|x| x.to_raw_key()).collect::<Vec<_>>(),
};
let kvs = kv_store.batch_get(req).await?.kvs;
let mut result = HashMap::with_capacity(kvs.len());
for kv in kvs {
let key = TableGlobalKey::try_from_raw_key(kv.key()).context(InvalidCatalogValueSnafu)?;
let value = TableGlobalValue::from_bytes(kv.value()).context(InvalidCatalogValueSnafu)?;
let _ = result.insert(key, Some(value));
}
for key in keys {
if !result.contains_key(key) {
let _ = result.insert(key.clone(), None);
}
}
Ok(result)
}
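
The removed batch helper's result shaping, as a standalone sketch with plain strings in place of TableGlobalKey/TableGlobalValue: every requested key appears in the output map, found keys as Some and missing keys as None, so callers can distinguish "absent" from "not asked for".

    use std::collections::HashMap;

    fn batch_get(
        store: &HashMap<String, String>,
        keys: &[&str],
    ) -> HashMap<String, Option<String>> {
        let mut result = HashMap::with_capacity(keys.len());
        // First record everything the store returned ...
        for key in keys {
            if let Some(value) = store.get(*key) {
                result.insert((*key).to_string(), Some(value.clone()));
            }
        }
        // ... then backfill the misses with None so every requested key is present.
        for key in keys {
            result.entry((*key).to_string()).or_insert(None);
        }
        result
    }

    fn main() {
        let store = HashMap::from([("existing".to_string(), "value".to_string())]);
        let result = batch_get(&store, &["existing", "missing"]);
        assert_eq!(result.len(), 2);
        assert_eq!(result["existing"], Some("value".to_string()));
        assert_eq!(result["missing"], None);
    }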
pub(crate) async fn put_table_global_value(
kv_store: &KvStoreRef,
key: &TableGlobalKey,
value: &TableGlobalValue,
) -> Result<()> {
let req = PutRequest {
key: key.to_raw_key(),
value: value.as_bytes().context(InvalidCatalogValueSnafu)?,
prev_kv: false,
};
let _ = kv_store.put(req).await;
Ok(())
}
pub(crate) async fn remove_table_global_value(
kv_store: &KvStoreRef,
key: &TableGlobalKey,
) -> Result<(Vec<u8>, TableGlobalValue)> {
let key = key.to_string();
let removed_key = crate::keys::to_removed_key(&key);
let kv = move_value(kv_store, key.as_bytes(), removed_key)
.await?
.context(TableNotFoundSnafu { name: key })?;
let value: TableGlobalValue =
TableGlobalValue::from_bytes(&kv.1).context(InvalidCatalogValueSnafu)?;
Ok((kv.0, value))
}
pub(crate) async fn get_table_route_value(
kv_store: &KvStoreRef,
key: &TableRouteKey<'_>,
@@ -116,35 +64,6 @@ pub(crate) async fn put_table_route_value(
Ok(())
}
pub(crate) async fn remove_table_route_value(
kv_store: &KvStoreRef,
key: &TableRouteKey<'_>,
) -> Result<(Vec<u8>, TableRouteValue)> {
let from_key = key.to_string().into_bytes();
let to_key = key.removed_key().into_bytes();
let v = move_value(kv_store, from_key, to_key)
.await?
.context(TableRouteNotFoundSnafu {
key: key.to_string(),
})?;
let trv: TableRouteValue = v.1.as_slice().try_into().context(DecodeTableRouteSnafu)?;
Ok((v.0, trv))
}
async fn move_value(
kv_store: &KvStoreRef,
from_key: impl Into<Vec<u8>>,
to_key: impl Into<Vec<u8>>,
) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let from_key = from_key.into();
let to_key = to_key.into();
let move_req = MoveValueRequest { from_key, to_key };
let res = kv_store.move_value(move_req).await?;
Ok(res.0.map(Into::into))
}
pub(crate) fn table_route_key(table_id: u32, t: &TableGlobalKey) -> TableRouteKey<'_> {
TableRouteKey {
table_id,
@@ -177,21 +96,30 @@ pub(crate) async fn fetch_table(
}
pub(crate) async fn fetch_tables(
kv_store: &KvStoreRef,
keys: impl Iterator<Item = TableGlobalKey>,
) -> Result<Vec<(TableGlobalValue, TableRouteValue)>> {
ctx: &Context,
table_names: Vec<TableName>,
) -> Result<Vec<(TableInfoValue, TableRouteValue)>> {
let kv_store = &ctx.kv_store;
let mut tables = vec![];
// Maybe we can optimize the for loop in the future, but in general,
// there won't be many keys; in fact, there is usually just one.
for tgk in keys {
let tgv = get_table_global_value(kv_store, &tgk).await?;
let Some(tgv) = tgv else {
warn!("Table global value is absent: {}", tgk);
for table_name in table_names {
let Some(tgv) = ctx.table_metadata_manager
.table_info_manager()
.get_old(&table_name)
.await
.context(TableMetadataManagerSnafu)? else {
continue;
};
let table_info = &tgv.table_info;
let trk = table_route_key(tgv.table_id(), &tgk);
let trk = TableRouteKey {
table_id: table_info.ident.table_id,
catalog_name: &table_info.catalog_name,
schema_name: &table_info.schema_name,
table_name: &table_info.name,
};
let trv = get_table_route_value(kv_store, &trk).await?;
tables.push((tgv, trv));
@@ -205,9 +133,11 @@ pub(crate) mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::{Peer, Region, RegionRoute, Table, TableName, TableRoute};
use api::v1::meta::{Peer, Region, RegionRoute, Table, TableRoute};
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::table_region::RegionDistribution;
use common_meta::key::TableMetadataManagerRef;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
@@ -217,53 +147,52 @@ pub(crate) mod tests {
use crate::error;
use crate::service::store::memory::MemStore;
pub(crate) async fn prepare_table_global_value(
kv_store: &KvStoreRef,
pub(crate) async fn prepare_table_region_and_info_value(
table_metadata_manager: &TableMetadataManagerRef,
table: &str,
) -> (TableGlobalKey, TableGlobalValue) {
) {
let table_info = RawTableInfo {
ident: TableIdent::new(1),
name: table.to_string(),
desc: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
meta: RawTableMeta {
schema: RawSchema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::string_datatype(),
true,
)]),
primary_key_indices: vec![],
value_indices: vec![],
engine: MITO_ENGINE.to_string(),
next_column_id: 1,
region_numbers: vec![1, 2, 3, 4],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
},
table_type: TableType::Base,
};
table_metadata_manager
.table_info_manager()
.put_old(table_info)
.await
.unwrap();
// Region distribution:
// Datanode => Regions
// 1 => 1, 2
// 2 => 3
// 3 => 4
let regions_id_map = HashMap::from([(1, vec![1, 2]), (2, vec![3]), (3, vec![4])]);
let key = TableGlobalKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table.to_string(),
};
let value = TableGlobalValue {
node_id: 1,
regions_id_map,
table_info: RawTableInfo {
ident: TableIdent::new(1),
name: table.to_string(),
desc: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
meta: RawTableMeta {
schema: RawSchema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::string_datatype(),
true,
)]),
primary_key_indices: vec![],
value_indices: vec![],
engine: MITO_ENGINE.to_string(),
next_column_id: 1,
region_numbers: vec![1, 2, 3, 4],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
},
table_type: TableType::Base,
},
};
put_table_global_value(kv_store, &key, &value)
table_metadata_manager
.table_region_manager()
.put_old(
&TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table),
RegionDistribution::from([(1, vec![1, 2]), (2, vec![3]), (3, vec![4])]),
)
.await
.unwrap();
(key, value)
}
pub(crate) async fn prepare_table_route_value<'a>(
@@ -299,11 +228,9 @@ pub(crate) mod tests {
let table_route = TableRoute {
table: Some(Table {
id: 1,
table_name: Some(TableName {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table.to_string(),
}),
table_name: Some(
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table).into(),
),
table_schema: vec![],
}),
region_routes,
@@ -347,34 +274,6 @@ pub(crate) mod tests {
}
}
#[tokio::test]
async fn test_put_and_get_table_global_value() {
let kv_store = Arc::new(MemStore::new()) as _;
let not_exist_key = TableGlobalKey {
catalog_name: "not_exist_catalog".to_string(),
schema_name: "not_exist_schema".to_string(),
table_name: "not_exist_table".to_string(),
};
assert!(get_table_global_value(&kv_store, &not_exist_key)
.await
.unwrap()
.is_none());
let (key, value) = prepare_table_global_value(&kv_store, "my_table").await;
let actual = get_table_global_value(&kv_store, &key)
.await
.unwrap()
.unwrap();
assert_eq!(actual, value);
let keys = vec![&not_exist_key, &key];
let result = batch_get_table_global_value(&kv_store, keys).await.unwrap();
assert_eq!(result.len(), 2);
assert!(result.get(&not_exist_key).unwrap().is_none());
assert_eq!(result.get(&key).unwrap().as_ref().unwrap(), &value);
}
#[tokio::test]
async fn test_put_and_get_table_route_value() {
let kv_store = Arc::new(MemStore::new()) as _;

View File

@@ -66,6 +66,7 @@ impl TableRoutes {
.meta_client
.route(RouteRequest {
table_names: vec![table_name.clone()],
table_ids: vec![],
})
.await
.context(error::RequestMetaSnafu)?;

View File

@@ -25,8 +25,8 @@ mod test {
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, InsertRequests, QueryRequest,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_catalog::consts::MITO_ENGINE;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::table_name::TableName;
use common_query::Output;
use common_recordbatch::RecordBatches;
use frontend::instance::Instance;
@@ -35,6 +35,7 @@ mod test {
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use store_api::storage::RegionNumber;
use table::Table;
use tests::{has_parquet_file, test_region_dir};
use crate::tests;
@@ -332,20 +333,22 @@ CREATE TABLE {table_name} (
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let table_id = table.table_info().table_id();
let tgv = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
})
let table_region_value = instance
.table_metadata_manager()
.table_region_manager()
.get_old(&TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
table_name,
))
.await
.unwrap()
.unwrap();
let table_id = tgv.table_id();
let region_to_dn_map = tgv
.regions_id_map
let region_to_dn_map = table_region_value
.region_distribution
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
@@ -603,25 +606,19 @@ CREATE TABLE {table_name} (
table_name: &str,
expected_distribution: HashMap<u32, &str>,
) {
let table = instance
.frontend()
.catalog_manager()
.table("greptime", "public", table_name)
let table_region_value = instance
.table_metadata_manager()
.table_region_manager()
.get_old(&TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
table_name,
))
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let TableGlobalValue { regions_id_map, .. } = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
})
.await
.unwrap()
.unwrap();
let region_to_dn_map = regions_id_map
let region_to_dn_map = table_region_value
.region_distribution
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();

View File

@@ -19,13 +19,13 @@ mod tests {
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::table_name::TableName;
use common_query::Output;
use common_recordbatch::RecordBatches;
use frontend::error::{self, Error, Result};
use frontend::instance::Instance;
use frontend::table::DistTable;
use query::parser::QueryLanguageParser;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::query_handler::sql::SqlQueryHandler;
@@ -177,25 +177,19 @@ mod tests {
instance: &MockDistributedInstance,
expected_distribution: HashMap<u32, &str>,
) {
let table = instance
.frontend()
.catalog_manager()
.table("greptime", "public", "demo")
let table_region_value = instance
.table_metadata_manager()
.table_region_manager()
.get_old(&TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"demo",
))
.await
.unwrap()
.unwrap();
let table = table.as_any().downcast_ref::<DistTable>().unwrap();
let TableGlobalValue { regions_id_map, .. } = table
.table_global_value(&TableGlobalKey {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "demo".to_string(),
})
.await
.unwrap()
.unwrap();
let region_to_dn_map = regions_id_map
let region_to_dn_map = table_region_value
.region_distribution
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();

View File

@@ -51,7 +51,6 @@ impl MockDistributedInstance {
&self.0.datanode_instances
}
#[allow(unused)]
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
self.0.meta_srv.table_metadata_manager()
}

View File

@@ -17,9 +17,9 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Peer;
use catalog::helper::TableGlobalKey;
use catalog::remote::CachedMetaKvBackend;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::helper::TableGlobalKey;
use common_meta::ident::TableIdent;
use common_meta::rpc::router::TableRoute;
use common_meta::rpc::KeyValue;