feat: return new added columns in region server's extension response (#3533)

* feat: adapt the new proto response

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update interfaces

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* write columns to extension

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use physical column's schema

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort logical columns by name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* return physical table's column

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/common/meta/src/datanode_manager.rs

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

* implement sort column logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* proxy create table procedure to create logical table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add unit test for sort_columns

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>
Ruihang Xia
2024-03-23 17:31:16 +08:00
committed by GitHub
parent 24886b9530
commit 2b2fd80bf4
40 changed files with 543 additions and 171 deletions

Cargo.lock generated
View File

@@ -3870,7 +3870,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=96f1f0404f421ee560a4310c73c5071e49168168#96f1f0404f421ee560a4310c73c5071e49168168"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=349cb385583697f41010dabeb3c106d58f9599b4#349cb385583697f41010dabeb3c106d58f9599b4"
dependencies = [
"prost 0.12.3",
"serde",

View File

@@ -103,7 +103,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96f1f0404f421ee560a4310c73c5071e49168168" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "349cb385583697f41010dabeb3c106d58f9599b4" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
@@ -23,7 +23,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::datanode_manager::{AffectedRows, Datanode};
use common_meta::datanode_manager::{Datanode, HandleResponse};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
@@ -46,7 +46,7 @@ pub struct RegionRequester {
#[async_trait]
impl Datanode for RegionRequester {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
async fn handle(&self, request: RegionRequest) -> MetaResult<HandleResponse> {
self.handle_inner(request).await.map_err(|err| {
if err.should_retry() {
meta_error::Error::RetryLater {
@@ -165,7 +165,7 @@ impl RegionRequester {
Ok(Box::pin(record_batch_stream))
}
async fn handle_inner(&self, request: RegionRequest) -> Result<AffectedRows> {
async fn handle_inner(&self, request: RegionRequest) -> Result<HandleResponse> {
let request_type = request
.body
.as_ref()
@@ -178,10 +178,7 @@ impl RegionRequester {
let mut client = self.client.raw_region_client()?;
let RegionResponse {
header,
affected_rows,
} = client
let response = client
.handle(request)
.await
.map_err(|e| {
@@ -195,19 +192,20 @@ impl RegionRequester {
})?
.into_inner();
check_response_header(header)?;
check_response_header(&response.header)?;
Ok(affected_rows as _)
Ok(HandleResponse::from_region_response(response))
}
pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
pub async fn handle(&self, request: RegionRequest) -> Result<HandleResponse> {
self.handle_inner(request).await
}
}
pub fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
let status = header
.and_then(|header| header.status)
.as_ref()
.and_then(|header| header.status.as_ref())
.context(IllegalDatabaseResponseSnafu {
err_msg: "either response header or status is missing",
})?;
@@ -221,7 +219,7 @@ pub fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
})?;
ServerSnafu {
code,
msg: status.err_msg,
msg: status.err_msg.clone(),
}
.fail()
}
@@ -236,19 +234,19 @@ mod test {
#[test]
fn test_check_response_header() {
let result = check_response_header(None);
let result = check_response_header(&None);
assert!(matches!(
result.unwrap_err(),
IllegalDatabaseResponse { .. }
));
let result = check_response_header(Some(ResponseHeader { status: None }));
let result = check_response_header(&Some(ResponseHeader { status: None }));
assert!(matches!(
result.unwrap_err(),
IllegalDatabaseResponse { .. }
));
let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: StatusCode::Success as u32,
err_msg: String::default(),
@@ -256,7 +254,7 @@ mod test {
}));
assert!(result.is_ok());
let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: u32::MAX,
err_msg: String::default(),
@@ -267,7 +265,7 @@ mod test {
IllegalDatabaseResponse { .. }
));
let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: StatusCode::Internal as u32,
err_msg: "blabla".to_string(),

View File

@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
@@ -25,7 +26,7 @@ use crate::peer::Peer;
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
/// Handles DML, and DDL requests.
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;
async fn handle(&self, request: RegionRequest) -> Result<HandleResponse>;
/// Handles query requests
async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
@@ -41,3 +42,27 @@ pub trait DatanodeManager: Send + Sync {
}
pub type DatanodeManagerRef = Arc<dyn DatanodeManager>;
/// This result struct is derived from [RegionResponse]
#[derive(Debug)]
pub struct HandleResponse {
pub affected_rows: AffectedRows,
pub extension: HashMap<String, Vec<u8>>,
}
impl HandleResponse {
pub fn from_region_response(region_response: RegionResponse) -> Self {
Self {
affected_rows: region_response.affected_rows as _,
extension: region_response.extension,
}
}
/// Creates a response without extension data
pub fn new(affected_rows: AffectedRows) -> Self {
Self {
affected_rows,
extension: Default::default(),
}
}
}
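
For downstream consumers, a minimal sketch (not part of this diff) of reading the returned physical schema back out of the extension map, using the ALTER_PHYSICAL_EXTENSION_KEY constant and the ColumnMetadata::decode_list helper introduced later in this commit; the helper function name here is hypothetical:

use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;

/// Hypothetical helper: decodes the physical columns carried in the extension map, if any.
fn added_physical_columns(
    response: &HandleResponse,
) -> Option<serde_json::Result<Vec<ColumnMetadata>>> {
    response
        .extension
        .get(ALTER_PHYSICAL_EXTENSION_KEY)
        .map(|bytes| ColumnMetadata::decode_list(bytes))
}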

View File

@@ -70,6 +70,7 @@ impl CreateLogicalTablesProcedure {
/// - Checks whether physical table exists.
/// - Checks whether logical tables exist.
/// - Allocates the table ids.
/// - Modifies tasks to sort logical columns by name.
///
/// Abort(non-retry):
/// - The physical table does not exist.
@@ -130,7 +131,7 @@ impl CreateLogicalTablesProcedure {
));
}
// Allocates table ids
// Allocates table ids and sorts columns by name.
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
let table_id = if let Some(table_id) = table_id {
*table_id
@@ -141,6 +142,11 @@ impl CreateLogicalTablesProcedure {
.await?
};
task.set_table_id(table_id);
// sort columns in task
task.sort_columns();
common_telemetry::debug!("Sorted create table task: {task:?}");
}
self.creator

View File

@@ -28,6 +28,7 @@ use common_telemetry::debug;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use crate::datanode_manager::HandleResponse;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
@@ -36,7 +37,7 @@ use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
// Note: this code may duplicate code elsewhere.
// However, it's by design; it ensures the tests are easy to modify or extend.
@@ -332,9 +333,9 @@ pub struct NaiveDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(0)
Ok(HandleResponse::new(0))
}
async fn handle_query(

View File

@@ -26,6 +26,7 @@ use common_procedure_test::MockContextProvider;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use crate::datanode_manager::HandleResponse;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
@@ -34,11 +35,11 @@ use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
#[async_trait::async_trait]
impl MockDatanodeHandler for () {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
unreachable!()
}
@@ -176,7 +177,7 @@ pub struct RetryErrorDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
Err(Error::RetryLater {
source: BoxedError::new(
@@ -220,7 +221,7 @@ pub struct UnexpectedErrorDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
error::UnexpectedSnafu {
err_msg: "mock error",
@@ -260,9 +261,9 @@ pub struct NaiveDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(0)
Ok(HandleResponse::new(0))
}
async fn handle_query(

View File

@@ -22,7 +22,7 @@ use api::v1::meta::{
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId,
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr};
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, SemanticType, TruncateTableExpr};
use base64::engine::general_purpose;
use base64::Engine as _;
use prost::Message;
@@ -368,6 +368,44 @@ impl CreateTableTask {
pub fn set_table_id(&mut self, table_id: TableId) {
self.table_info.ident.table_id = table_id;
}
/// Sorts the columns in [CreateTableExpr] and [RawTableInfo].
///
/// This function performs no checks or verification. The caller should
/// ensure this task is valid.
pub fn sort_columns(&mut self) {
// sort the create table expr's column_defs by name
self.create_table
.column_defs
.sort_unstable_by(|a, b| a.name.cmp(&b.name));
// compute new indices of sorted columns
// this part won't do any check or verification.
let mut primary_key_indices = Vec::with_capacity(self.create_table.primary_keys.len());
let mut value_indices =
Vec::with_capacity(self.create_table.column_defs.len() - primary_key_indices.len() - 1);
let mut timestamp_index = None;
for (index, col) in self.create_table.column_defs.iter().enumerate() {
if self.create_table.primary_keys.contains(&col.name) {
primary_key_indices.push(index);
} else if col.semantic_type == SemanticType::Timestamp as i32 {
timestamp_index = Some(index);
} else {
value_indices.push(index);
}
}
// overwrite table info
self.table_info
.meta
.schema
.column_schemas
.sort_unstable_by(|a, b| a.name.cmp(&b.name));
self.table_info.meta.schema.timestamp_index = timestamp_index;
self.table_info.meta.primary_key_indices = primary_key_indices;
self.table_info.meta.value_indices = value_indices;
}
}
impl Serialize for CreateTableTask {
@@ -555,9 +593,11 @@ impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
mod tests {
use std::sync::Arc;
use api::v1::{AlterExpr, CreateTableExpr};
use datatypes::schema::SchemaBuilder;
use table::metadata::RawTableInfo;
use api::v1::{AlterExpr, ColumnDef, CreateTableExpr, SemanticType};
use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::storage::ConcreteDataType;
use table::metadata::{RawTableInfo, RawTableMeta, TableType};
use table::test_util::table_info::test_table_info;
use super::{AlterTableTask, CreateTableTask};
@@ -589,4 +629,108 @@ mod tests {
let de = serde_json::from_slice(&output).unwrap();
assert_eq!(task, de);
}
#[test]
fn test_sort_columns() {
// construct RawSchema
let raw_schema = RawSchema {
column_schemas: vec![
ColumnSchema::new(
"column3".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"column1".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new(
"column2".to_string(),
ConcreteDataType::float64_datatype(),
true,
),
],
timestamp_index: Some(1),
version: 0,
};
// construct RawTableMeta
let raw_table_meta = RawTableMeta {
schema: raw_schema,
primary_key_indices: vec![0],
value_indices: vec![2],
engine: METRIC_ENGINE_NAME.to_string(),
next_column_id: 0,
region_numbers: vec![0],
options: Default::default(),
created_on: Default::default(),
partition_key_indices: Default::default(),
};
// construct RawTableInfo
let raw_table_info = RawTableInfo {
ident: Default::default(),
meta: raw_table_meta,
name: Default::default(),
desc: Default::default(),
catalog_name: Default::default(),
schema_name: Default::default(),
table_type: TableType::Base,
};
// construct create table expr
let create_table_expr = CreateTableExpr {
column_defs: vec![
ColumnDef {
name: "column3".to_string(),
semantic_type: SemanticType::Tag as i32,
..Default::default()
},
ColumnDef {
name: "column1".to_string(),
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
ColumnDef {
name: "column2".to_string(),
semantic_type: SemanticType::Field as i32,
..Default::default()
},
],
primary_keys: vec!["column3".to_string()],
..Default::default()
};
let mut create_table_task =
CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
// Call the sort_columns method
create_table_task.sort_columns();
// Assert that the columns are sorted correctly
assert_eq!(
create_table_task.create_table.column_defs[0].name,
"column1".to_string()
);
assert_eq!(
create_table_task.create_table.column_defs[1].name,
"column2".to_string()
);
assert_eq!(
create_table_task.create_table.column_defs[2].name,
"column3".to_string()
);
// Assert that the table_info is updated correctly
assert_eq!(
create_table_task.table_info.meta.schema.timestamp_index,
Some(0)
);
assert_eq!(
create_table_task.table_info.meta.primary_key_indices,
vec![2]
);
assert_eq!(create_table_task.table_info.meta.value_indices, vec![1]);
}
}

View File

@@ -19,7 +19,9 @@ pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef};
use crate::datanode_manager::{
Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef, HandleResponse,
};
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::DdlContext;
use crate::error::Result;
@@ -32,7 +34,7 @@ use crate::wal_options_allocator::WalOptionsAllocator;
#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows>;
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse>;
async fn handle_query(
&self,
@@ -62,7 +64,7 @@ struct MockDatanode<T> {
#[async_trait::async_trait]
impl<T: MockDatanodeHandler> Datanode for MockDatanode<T> {
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, request: RegionRequest) -> Result<HandleResponse> {
self.handler.handle(&self.peer, request).await
}

View File

@@ -25,6 +25,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::BoxedError;
use common_error::status_code::StatusCode;
use common_meta::datanode_manager::HandleResponse;
use common_query::logical_plan::Expr;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, OutputData};
@@ -128,7 +129,7 @@ impl RegionServer {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
) -> Result<HandleResponse> {
self.inner.handle_request(region_id, request).await
}
@@ -267,11 +268,10 @@ impl RegionServerHandler for RegionServer {
results
};
// merge results by simply sum up affected rows.
// only insert/delete will have multiple results.
// merge results by summing up affected rows and merging extensions.
let mut affected_rows = 0;
for result in results {
affected_rows += result;
affected_rows += result.affected_rows;
}
Ok(RegionResponse {
@@ -282,6 +282,7 @@ impl RegionServerHandler for RegionServer {
}),
}),
affected_rows: affected_rows as _,
extension: Default::default(),
})
}
}
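
Note that the merged response above still returns an empty extension map (extension: Default::default()) at this point in the diff. A hedged sketch of what extension merging could look like here (hypothetical, last-writer-wins; not part of this commit):

let mut affected_rows = 0;
let mut extension = HashMap::new();
for result in results {
    affected_rows += result.affected_rows;
    // later results overwrite earlier entries with the same key
    extension.extend(result.extension);
}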
@@ -462,7 +463,7 @@ impl RegionServerInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
) -> Result<HandleResponse> {
let request_type = request.request_type();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[request_type])
@@ -487,7 +488,7 @@ impl RegionServerInner {
let engine = match self.get_engine(region_id, &region_change)? {
CurrentEngine::Engine(engine) => engine,
CurrentEngine::EarlyReturn(rows) => return Ok(rows),
CurrentEngine::EarlyReturn(rows) => return Ok(HandleResponse::new(rows)),
};
// Sets corresponding region status to registering/deregistering before the operation.
@@ -502,7 +503,10 @@ impl RegionServerInner {
// Sets corresponding region status to ready.
self.set_region_status_ready(region_id, engine, region_change)
.await?;
Ok(result)
Ok(HandleResponse {
affected_rows: result.affected_rows,
extension: result.extension,
})
}
Err(err) => {
// Removes the region status if the operation fails.
@@ -645,6 +649,7 @@ impl RegionServerInner {
.decode(Bytes::from(plan), catalog_list, "", "")
.await
.context(DecodeLogicalPlanSnafu)?;
let result = self
.query_engine
.execute(logical_plan.into(), ctx)
@@ -916,11 +921,11 @@ mod tests {
RegionEngineWithStatus::Registering(engine.clone()),
);
let affected_rows = mock_region_server
let response = mock_region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.await
.unwrap();
assert_eq!(affected_rows, 0);
assert_eq!(response.affected_rows, 0);
let status = mock_region_server
.inner
@@ -931,7 +936,7 @@ mod tests {
assert!(matches!(status, RegionEngineWithStatus::Registering(_)));
let affected_rows = mock_region_server
let response = mock_region_server
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
@@ -943,7 +948,7 @@ mod tests {
)
.await
.unwrap();
assert_eq!(affected_rows, 0);
assert_eq!(response.affected_rows, 0);
let status = mock_region_server
.inner
@@ -971,11 +976,11 @@ mod tests {
RegionEngineWithStatus::Deregistering(engine.clone()),
);
let affected_rows = mock_region_server
let response = mock_region_server
.handle_request(region_id, RegionRequest::Drop(RegionDropRequest {}))
.await
.unwrap();
assert_eq!(affected_rows, 0);
assert_eq!(response.affected_rows, 0);
let status = mock_region_server
.inner
@@ -990,11 +995,11 @@ mod tests {
RegionEngineWithStatus::Deregistering(engine.clone()),
);
let affected_rows = mock_region_server
let response = mock_region_server
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
assert_eq!(affected_rows, 0);
assert_eq!(response.affected_rows, 0);
let status = mock_region_server
.inner

View File

@@ -31,7 +31,7 @@ use query::query_engine::DescribeResult;
use query::{QueryEngine, QueryEngineContext};
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionHandleResult, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
@@ -166,16 +166,18 @@ impl RegionEngine for MockRegionEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows, BoxedError> {
) -> Result<RegionHandleResult, BoxedError> {
if let Some(delay) = self.handle_request_delay {
tokio::time::sleep(delay).await;
}
if let Some(mock_fn) = &self.handle_request_mock_fn {
return mock_fn(region_id, request).map_err(BoxedError::new);
return mock_fn(region_id, request)
.map_err(BoxedError::new)
.map(RegionHandleResult::new);
};
let _ = self.sender.send((region_id, request)).await;
Ok(0)
Ok(RegionHandleResult::new(0))
}
async fn handle_query(

View File

@@ -143,11 +143,22 @@ impl ColumnSchema {
}
/// Sets the column's nullability to `true`.
/// Similar to [set_nullable] but takes ownership and returns an owned value.
///
/// [set_nullable]: Self::set_nullable
pub fn with_nullable_set(mut self) -> Self {
self.is_nullable = true;
self
}
/// Sets the column's nullability to `true`.
/// Similar to [with_nullable_set] but doesn't take ownership.
///
/// [with_nullable_set]: Self::with_nullable_set
pub fn set_nullable(&mut self) {
self.is_nullable = true;
}
/// Creates a new [`ColumnSchema`] with given metadata.
pub fn with_metadata(mut self, metadata: Metadata) -> Self {
self.metadata = metadata;

View File

@@ -24,7 +24,7 @@ use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionHandleResult, RegionRole, SetReadonlyResponse};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
@@ -60,7 +60,7 @@ impl RegionEngine for FileRegionEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows, BoxedError> {
) -> Result<RegionHandleResult, BoxedError> {
self.inner
.handle_request(region_id, request)
.await
@@ -154,8 +154,8 @@ impl EngineInner {
&self,
region_id: RegionId,
request: RegionRequest,
) -> EngineResult<AffectedRows> {
match request {
) -> EngineResult<RegionHandleResult> {
let result = match request {
RegionRequest::Create(req) => self.handle_create(region_id, req).await,
RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
RegionRequest::Open(req) => self.handle_open(region_id, req).await,
@@ -164,7 +164,8 @@ impl EngineInner {
operation: request.to_string(),
}
.fail(),
}
};
result.map(RegionHandleResult::new)
}
async fn stop(&self) -> EngineResult<()> {

View File

@@ -18,7 +18,7 @@ use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use async_trait::async_trait;
use client::region::check_response_header;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::datanode_manager::{Datanode, DatanodeManager, DatanodeRef, HandleResponse};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::peer::Peer;
use common_recordbatch::SendableRecordBatchStream;
@@ -63,7 +63,7 @@ impl RegionInvoker {
#[async_trait]
impl Datanode for RegionInvoker {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
async fn handle(&self, request: RegionRequest) -> MetaResult<HandleResponse> {
let span = request
.header
.as_ref()
@@ -76,10 +76,10 @@ impl Datanode for RegionInvoker {
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
check_response_header(response.header)
check_response_header(&response.header)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(response.affected_rows as _)
Ok(HandleResponse::from_region_response(response))
}
async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {

View File

@@ -93,6 +93,7 @@ pub mod mock {
}),
}),
affected_rows: 0,
extension: Default::default(),
})
}
}

View File

@@ -58,18 +58,19 @@ impl DataRegion {
/// Invokers don't need to set up or verify the column id. This method will adjust
/// it using the underlying schema.
///
/// This method will also set the nullable marker to true.
/// This method will also set the nullable marker to true. All of those changes are
/// applied to `columns` in-place.
pub async fn add_columns(
&self,
region_id: RegionId,
columns: Vec<ColumnMetadata>,
columns: &mut [ColumnMetadata],
) -> Result<()> {
let region_id = utils::to_data_region_id(region_id);
let mut retries = 0;
// submit alter request
while retries < MAX_RETRIES {
let request = self.assemble_alter_request(region_id, &columns).await?;
let request = self.assemble_alter_request(region_id, columns).await?;
let _timer = MITO_DDL_DURATION.start_timer();
@@ -90,10 +91,12 @@ impl DataRegion {
Ok(())
}
/// Generates a wrapped [RegionAlterRequest] with the given [ColumnMetadata].
/// This method will modify `columns` in-place.
async fn assemble_alter_request(
&self,
region_id: RegionId,
columns: &[ColumnMetadata],
columns: &mut [ColumnMetadata],
) -> Result<RegionRequest> {
// retrieve underlying version
let region_metadata = self
@@ -118,15 +121,14 @@ impl DataRegion {
.unwrap_or(0);
// overwrite semantic type
let columns = columns
.iter()
let new_columns = columns
.iter_mut()
.enumerate()
.map(|(delta, c)| {
let mut c = c.clone();
if c.semantic_type == SemanticType::Tag {
if !c.column_schema.data_type.is_string() {
return ColumnTypeMismatchSnafu {
column_type: c.column_schema.data_type,
column_type: c.column_schema.data_type.clone(),
}
.fail();
}
@@ -138,11 +140,10 @@ impl DataRegion {
};
c.column_id = new_column_id_start + delta as u32;
c.column_schema = c.column_schema.with_nullable_set();
c.column_schema.set_nullable();
Ok(AddColumn {
column_metadata: c,
column_metadata: c.clone(),
location: None,
})
})
@@ -151,7 +152,9 @@ impl DataRegion {
// assemble alter request
let alter_request = RegionRequest::Alter(RegionAlterRequest {
schema_version: version,
kind: AlterKind::AddColumns { columns },
kind: AlterKind::AddColumns {
columns: new_columns,
},
});
Ok(alter_request)
@@ -167,6 +170,7 @@ impl DataRegion {
.handle_request(region_id, RegionRequest::Put(request))
.await
.context(MitoWriteOperationSnafu)
.map(|result| result.affected_rows)
}
pub async fn physical_columns(
@@ -205,7 +209,7 @@ mod test {
// TestEnv will create a logical region which changes the version to 1.
assert_eq!(current_version, 1);
let new_columns = vec![
let mut new_columns = vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
@@ -226,7 +230,7 @@ mod test {
},
];
env.data_region()
.add_columns(env.default_physical_region_id(), new_columns)
.add_columns(env.default_physical_region_id(), &mut new_columns)
.await
.unwrap();
@@ -258,14 +262,14 @@ mod test {
let env = TestEnv::new().await;
env.init_metric_region().await;
let new_columns = vec![ColumnMetadata {
let mut new_columns = vec![ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
}];
let result = env
.data_region()
.add_columns(env.default_physical_region_id(), new_columns)
.add_columns(env.default_physical_region_id(), &mut new_columns)
.await;
assert!(result.is_err());
}

View File

@@ -24,6 +24,7 @@ mod region_metadata;
mod state;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
@@ -33,13 +34,13 @@ use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::region_engine::{RegionEngine, RegionHandleResult, RegionRole, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use self::state::MetricEngineState;
use crate::data_region::DataRegion;
use crate::error::Result;
use crate::error::{Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::utils;
@@ -121,23 +122,39 @@ impl RegionEngine for MetricEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows, BoxedError> {
) -> Result<RegionHandleResult, BoxedError> {
let mut extension_return_value = HashMap::new();
let result = match request {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self.inner.create_region(region_id, create).await,
RegionRequest::Create(create) => {
self.inner
.create_region(region_id, create, &mut extension_return_value)
.await
}
RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await,
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
RegionRequest::Truncate(_) => todo!(),
RegionRequest::Alter(alter) => {
self.inner
.alter_region(region_id, alter, &mut extension_return_value)
.await
}
RegionRequest::Delete(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
// It always returns Ok(0); all data is already the latest.
RegionRequest::Catchup(_) => Ok(0),
};
result.map_err(BoxedError::new)
// TODO: pass extension
result
.map_err(BoxedError::new)
.map(|rows| RegionHandleResult {
affected_rows: rows,
extension: extension_return_value,
})
}
/// Handles substrait query and return a stream of record batches

View File

@@ -12,13 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_telemetry::{error, info};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;
use crate::error::{ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result};
use crate::error::{
ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
};
use crate::metrics::FORBIDDEN_OPERATION_COUNT;
use crate::utils::{to_data_region_id, to_metadata_region_id};
@@ -28,23 +34,39 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionAlterRequest,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
let is_altering_physical_region = self.is_physical_region(region_id);
let result = if is_altering_physical_region {
self.alter_physical_region(region_id, request).await
} else {
self.alter_logical_region(region_id, request).await
let physical_region_id = self.alter_logical_region(region_id, request).await?;
// Add the physical table's columns to the extension map.
// It's ok to overwrite the existing key, as the later-arriving schema is more up-to-date.
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns)
.context(SerializeColumnMetadataSnafu)?,
);
Ok(())
};
result.map(|_| 0)
}
/// Return the physical region id behind this logical region
async fn alter_logical_region(
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<()> {
) -> Result<RegionId> {
let physical_region_id = {
let state = &self.state.read().unwrap();
state.get_physical_region_id(region_id).with_context(|| {
@@ -55,7 +77,7 @@ impl MetricEngineInner {
// only handle adding column
let AlterKind::AddColumns { columns } = request.kind else {
return Ok(());
return Ok(physical_region_id);
};
let metadata_region_id = to_metadata_region_id(physical_region_id);
@@ -92,7 +114,7 @@ impl MetricEngineInner {
.await?;
}
Ok(())
Ok(physical_region_id)
}
async fn alter_physical_region(

View File

@@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::info;
use common_time::Timestamp;
use datatypes::data_type::ConcreteDataType;
@@ -25,11 +26,12 @@ use object_store::util::join_dir;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR, METADATA_SCHEMA_KEY_COLUMN_INDEX,
METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX,
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
METADATA_SCHEMA_VALUE_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
@@ -41,8 +43,9 @@ use crate::engine::options::{
};
use crate::engine::MetricEngineInner;
use crate::error::{
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
MissingRegionOptionSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result,
ColumnNotFoundSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
InternalColumnOccupiedSnafu, MissingRegionOptionSnafu, MitoReadOperationSnafu,
ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils::{to_data_region_id, to_metadata_region_id};
@@ -53,13 +56,28 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
Self::verify_region_create_request(&request)?;
let result = if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) {
self.create_physical_region(region_id, request).await
} else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) {
self.create_logical_region(region_id, request).await
let physical_region_id = self.create_logical_region(region_id, request).await?;
// Add the physical table's columns to the extension map.
// It's ok to overwrite the existing key, as the later-arriving schema is more up-to-date.
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns)
.context(SerializeColumnMetadataSnafu)?,
);
Ok(())
} else {
MissingRegionOptionSnafu {}.fail()
};
@@ -126,11 +144,16 @@ impl MetricEngineInner {
/// This method will alter the data region to add columns if necessary.
///
/// If the logical region to create already exists, this method will do nothing.
///
/// Return the physical region id of this logical region
async fn create_logical_region(
&self,
logical_region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
) -> Result<RegionId> {
// transform IDs
let physical_region_id_raw = request
.options
@@ -151,11 +174,12 @@ impl MetricEngineInner {
.await?
{
info!("Create a existing logical region {logical_region_id}. Skipped");
return Ok(());
return Ok(data_region_id);
}
// find new columns to add
let mut new_columns = vec![];
let mut existing_columns = vec![];
{
let state = &self.state.read().unwrap();
let physical_columns =
@@ -168,6 +192,8 @@ impl MetricEngineInner {
for col in &request.column_metadatas {
if !physical_columns.contains(&col.column_schema.name) {
new_columns.push(col.clone());
} else {
existing_columns.push(col.column_schema.name.clone());
}
}
}
@@ -188,9 +214,28 @@ impl MetricEngineInner {
self.metadata_region
.add_logical_region(metadata_region_id, logical_region_id)
.await?;
for col in &request.column_metadatas {
// register existing physical columns to this new logical region.
let physical_schema = self
.data_region
.physical_columns(data_region_id)
.await
.map_err(BoxedError::new)
.context(MitoReadOperationSnafu)?;
let physical_schema_map = physical_schema
.into_iter()
.map(|metadata| (metadata.column_schema.name.clone(), metadata))
.collect::<HashMap<_, _>>();
for col in &existing_columns {
let column_metadata = physical_schema_map
.get(col)
.with_context(|| ColumnNotFoundSnafu {
name: col,
region_id: physical_region_id,
})?
.clone();
self.metadata_region
.add_column(metadata_region_id, logical_region_id, col)
.add_column(metadata_region_id, logical_region_id, &column_metadata)
.await?;
}
@@ -203,19 +248,21 @@ impl MetricEngineInner {
info!("Created new logical region {logical_region_id} on physical region {data_region_id}");
LOGICAL_REGION_COUNT.inc();
Ok(())
Ok(data_region_id)
}
/// Execute corresponding alter requests on the mito region. Newly added columns'
/// [ColumnMetadata] will be cloned into `added_columns`.
pub(crate) async fn add_columns_to_physical_data_region(
&self,
data_region_id: RegionId,
metadata_region_id: RegionId,
logical_region_id: RegionId,
new_columns: Vec<ColumnMetadata>,
mut new_columns: Vec<ColumnMetadata>,
) -> Result<()> {
// alter data region
self.data_region
.add_columns(data_region_id, new_columns.clone())
.add_columns(data_region_id, &mut new_columns)
.await?;
// register columns to metadata region
@@ -362,13 +409,13 @@ impl MetricEngineInner {
// concat region dir
data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
// convert semantic type
// change nullability for tag columns
data_region_request
.column_metadatas
.iter_mut()
.for_each(|metadata| {
if metadata.semantic_type == SemanticType::Tag {
metadata.semantic_type = SemanticType::Field;
metadata.column_schema.set_nullable();
}
});

View File

@@ -215,12 +215,12 @@ mod tests {
// write data
let logical_region_id = env.default_logical_region_id();
let count = env
let result = env
.metric()
.handle_request(logical_region_id, request)
.await
.unwrap();
assert_eq!(count, 5);
assert_eq!(result.affected_rows, 5);
// read data from physical region
let physical_region_id = env.default_physical_region_id();
@@ -287,11 +287,11 @@ mod tests {
});
// write data
let count = engine
let result = engine
.handle_request(logical_region_id, request)
.await
.unwrap();
assert_eq!(100, count);
assert_eq!(100, result.affected_rows);
}
#[tokio::test]

View File

@@ -143,6 +143,7 @@ impl MetricEngineInner {
self.default_projection(physical_region_id, logical_region_id)
.await?
};
request.projection = Some(physical_projection);
// add table filter
@@ -186,6 +187,7 @@ impl MetricEngineInner {
.get_metadata(data_region_id)
.await
.context(MitoReadOperationSnafu)?;
for name in projected_logical_names {
// Safety: logical columns is a strict subset of physical columns
physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap());
@@ -301,7 +303,7 @@ mod test {
.await
.unwrap();
assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]);
assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]);
assert_eq!(scan_req.filters.len(), 1);
assert_eq!(
scan_req.filters[0],
@@ -318,6 +320,6 @@ mod test {
.transform_request(physical_region_id, logical_region_id, scan_req)
.await
.unwrap();
assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]);
assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]);
}
}

View File

@@ -39,7 +39,8 @@ impl MetricEngineInner {
.collect::<Vec<_>>();
// sort columns by name to ensure a deterministic order
logical_column_metadata.sort_unstable_by_key(|col| col.column_id);
logical_column_metadata
.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
Ok(logical_column_metadata)
}

View File

@@ -19,6 +19,7 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
#[derive(Snafu)]
@@ -71,6 +72,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to serialize column metadata"))]
SerializeColumnMetadata {
#[snafu(source)]
error: serde_json::Error,
location: Location,
},
#[snafu(display("Failed to decode base64 column value"))]
DecodeColumnValue {
#[snafu(source)]
@@ -155,6 +163,12 @@ pub enum Error {
region_id: RegionId,
location: Location,
},
#[snafu(display("Unsupported region request: {}", request))]
UnsupportedRegionRequest {
request: RegionRequest,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -170,11 +184,14 @@ impl ErrorExt for Error {
| ColumnTypeMismatch { .. }
| PhysicalRegionBusy { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } => StatusCode::Unsupported,
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
StatusCode::Unsupported
}
MissingInternalColumn { .. }
| DeserializeSemanticType { .. }
| DeserializeColumnMetadata { .. }
| SerializeColumnMetadata { .. }
| DecodeColumnValue { .. }
| ParseRegionId { .. }
| InvalidMetadata { .. } => StatusCode::Unexpected,

View File

@@ -167,7 +167,7 @@ impl MetadataRegion {
// TODO(ruihang): avoid using `get_all`
/// Get all the columns of a given logical region.
/// Return a list of (column_name, semantic_type).
/// Return a list of (column_name, column_metadata).
pub async fn logical_columns(
&self,
physical_region_id: RegionId,

View File

@@ -57,7 +57,7 @@ use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionHandleResult, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;
@@ -290,10 +290,11 @@ impl RegionEngine for MitoEngine {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows, BoxedError> {
) -> Result<RegionHandleResult, BoxedError> {
self.inner
.handle_request(region_id, request)
.await
.map(RegionHandleResult::new)
.map_err(BoxedError::new)
}

View File

@@ -111,7 +111,7 @@ async fn test_region_replay() {
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
let rows = engine
let result = engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
@@ -123,7 +123,7 @@ async fn test_region_replay() {
)
.await
.unwrap();
assert_eq!(0, rows);
assert_eq!(0, result.affected_rows);
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();

View File

@@ -42,7 +42,7 @@ async fn put_and_flush(
};
put_rows(engine, region_id, rows).await;
let rows = engine
let result = engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
@@ -51,7 +51,7 @@ async fn put_and_flush(
)
.await
.unwrap();
assert_eq!(0, rows);
assert_eq!(0, result.affected_rows);
}
async fn delete_and_flush(
@@ -66,16 +66,16 @@ async fn delete_and_flush(
rows: build_rows_for_key("a", rows.start, rows.end, 0),
};
let rows_affected = engine
let result = engine
.handle_request(
region_id,
RegionRequest::Delete(RegionDeleteRequest { rows }),
)
.await
.unwrap();
assert_eq!(row_cnt, rows_affected);
assert_eq!(row_cnt, result.affected_rows);
let rows = engine
let result = engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
@@ -84,7 +84,7 @@ async fn delete_and_flush(
)
.await
.unwrap();
assert_eq!(0, rows);
assert_eq!(0, result.affected_rows);
}
async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
@@ -127,11 +127,11 @@ async fn test_compaction_region() {
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
let output = engine
let result = engine
.handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
.await
.unwrap();
assert_eq!(output, 0);
assert_eq!(result.affected_rows, 0);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(

View File

@@ -712,11 +712,11 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec<api::v1::ColumnS
/// Put rows into the engine.
pub async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let rows_inserted = engine
let result = engine
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
.await
.unwrap();
assert_eq!(num_rows, rows_inserted);
assert_eq!(num_rows, result.affected_rows);
}
/// Build rows to put for specific `key`.
@@ -758,26 +758,26 @@ pub fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec<Row
/// Delete rows from the engine.
pub async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
let num_rows = rows.rows.len();
let rows_inserted = engine
let result = engine
.handle_request(
region_id,
RegionRequest::Delete(RegionDeleteRequest { rows }),
)
.await
.unwrap();
assert_eq!(num_rows, rows_inserted);
assert_eq!(num_rows, result.affected_rows);
}
/// Flush a region manually.
pub async fn flush_region(engine: &MitoEngine, region_id: RegionId, row_group_size: Option<usize>) {
let rows = engine
let result = engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest { row_group_size }),
)
.await
.unwrap();
assert_eq!(0, rows);
assert_eq!(0, result.affected_rows);
}
/// Reopen a region.

View File

@@ -144,7 +144,10 @@ impl Deleter {
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
let affected_rows = results
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
.sum::<Result<AffectedRows>>()?;
crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows as u64);
Ok(affected_rows)
}

View File

@@ -216,7 +216,10 @@ impl Inserter {
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
let affected_rows = results
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
.sum::<Result<AffectedRows>>()?;
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
Ok(Output::new(
OutputData::AffectedRows(affected_rows),

View File

@@ -181,7 +181,10 @@ impl Requester {
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
let affected_rows = results
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
.sum::<Result<AffectedRows>>()?;
Ok(affected_rows)
}

View File

@@ -48,6 +48,7 @@ use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions};
use sql::statements::sql_value_to_value;
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions};
@@ -138,6 +139,22 @@ impl StatementExecutor {
partitions: Option<Partitions>,
query_ctx: &QueryContextRef,
) -> Result<TableRef> {
// Check if this is creating a logical table
if create_table.engine == METRIC_ENGINE_NAME
&& create_table
.table_options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
return self
.create_logical_tables(&[create_table.clone()])
.await?
.into_iter()
.next()
.context(error::UnexpectedSnafu {
violated: "expected to create a logical table",
});
}
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
let schema = self
.table_metadata_manager

View File

@@ -95,6 +95,16 @@ impl ColumnMetadata {
column_id,
})
}
/// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(columns)
}
/// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
serde_json::from_slice(bytes)
}
}
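
A minimal round-trip sketch for these helpers, assuming a `columns: Vec<ColumnMetadata>` built elsewhere:

// encode to JSON bytes, then decode back
let bytes: Vec<u8> = ColumnMetadata::encode_list(&columns)?;
let decoded: Vec<ColumnMetadata> = ColumnMetadata::decode_list(&bytes)?;
// `decoded` now mirrors `columns`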
#[cfg_attr(doc, aquamarine::aquamarine)]

View File

@@ -66,3 +66,7 @@ pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table";
/// ```
/// And this key will be translated to corresponding physical **REGION** id in metasrv.
pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table";
/// HashMap key to be used in the region server's extension response.
/// Represents a list of column metadata that were added to the physical table.
pub const ALTER_PHYSICAL_EXTENSION_KEY: &str = "ALTER_PHYSICAL";

View File

@@ -15,6 +15,7 @@
//! Region Engine's definition
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
@@ -129,7 +130,7 @@ pub trait RegionEngine: Send + Sync {
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows, BoxedError>;
) -> Result<RegionHandleResult, BoxedError>;
/// Handles substrait query and return a stream of record batches
async fn handle_query(
@@ -171,3 +172,20 @@ pub trait RegionEngine: Send + Sync {
}
pub type RegionEngineRef = Arc<dyn RegionEngine>;
// TODO: reorganize the dependencies to merge this struct with the
// one in common_meta
#[derive(Debug)]
pub struct RegionHandleResult {
pub affected_rows: AffectedRows,
pub extension: HashMap<String, Vec<u8>>,
}
impl RegionHandleResult {
pub fn new(affected_rows: AffectedRows) -> Self {
Self {
affected_rows,
extension: Default::default(),
}
}
}

View File

@@ -567,13 +567,19 @@ impl From<TableId> for TableIdent {
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableMeta {
pub schema: RawSchema,
/// The indices of columns in the primary key. Note that the index of the timestamp
/// column is not included. Order matters in this array.
pub primary_key_indices: Vec<usize>,
/// The indices of value columns. Order doesn't matter in this array.
pub value_indices: Vec<usize>,
/// Engine type of this table. Usually in lowercase.
pub engine: String,
/// Deprecated. See https://github.com/GreptimeTeam/greptimedb/issues/2982
pub next_column_id: ColumnId,
pub region_numbers: Vec<u32>,
pub options: TableOptions,
pub created_on: DateTime<Utc>,
/// Order doesn't matter in this array.
#[serde(default)]
pub partition_key_indices: Vec<usize>,
}
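
A worked example of these index fields, mirroring the test_sort_columns case earlier in this diff: for a schema whose columns are sorted as [column1: timestamp, column2: field, column3: tag] with column3 as the primary key:

// schema.timestamp_index   == Some(0)  (column1)
// meta.primary_key_indices == vec![2]  (column3)
// meta.value_indices       == vec![1]  (column2)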

View File

@@ -195,14 +195,14 @@ mod tests {
name: prom_store::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
},
Label {
name: "idc".to_string(),
value: "z002".to_string(),
},
Label {
name: "app".to_string(),
value: "biz".to_string(),
},
Label {
name: "idc".to_string(),
value: "z002".to_string(),
},
],
timeseries.labels
);

View File

@@ -38,9 +38,9 @@ DESC TABLE t1;
+--------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| host | String | PRI | YES | | TAG |
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| val | Float64 | | YES | | FIELD |
| host | String | PRI | YES | | TAG |
+--------+----------------------+-----+------+---------+---------------+
DESC TABLE t2;
@@ -48,8 +48,8 @@ DESC TABLE t2;
+--------+----------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| job | String | PRI | YES | | TAG |
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| val | Float64 | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+

View File

@@ -6,18 +6,18 @@ CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) e
Affected Rows: 0
INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2');
INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1,);
Affected Rows: 2
SELECT * from t1;
+-------------------------+-----+-------+
| ts | val | host |
+-------------------------+-----+-------+
| 1970-01-01T00:00:00.001 | 1.0 | host2 |
| 1970-01-01T00:00:00 | 0.0 | host1 |
+-------------------------+-----+-------+
+-------+-------------------------+-----+
| host | ts | val |
+-------+-------------------------+-----+
| host2 | 1970-01-01T00:00:00.001 | 1.0 |
| host1 | 1970-01-01T00:00:00 | 0.0 |
+-------+-------------------------+-----+
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
@@ -28,18 +28,18 @@ SELECT * from t2;
++
++
INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1);
INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
Affected Rows: 2
SELECT * from t2;
+-------------------------+------+-----+
| ts | job | val |
+-------------------------+------+-----+
| 1970-01-01T00:00:00.001 | job2 | 1.0 |
| 1970-01-01T00:00:00 | job1 | 0.0 |
+-------------------------+------+-----+
+------+-------------------------+-----+
| job | ts | val |
+------+-------------------------+-----+
| job2 | 1970-01-01T00:00:00.001 | 1.0 |
| job1 | 1970-01-01T00:00:00 | 0.0 |
+------+-------------------------+-----+
DROP TABLE t1;

View File

@@ -2,7 +2,7 @@ CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("phys
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2');
INSERT INTO t1 VALUES ('host1',0, 0), ('host2', 1, 1,);
SELECT * from t1;
@@ -10,7 +10,7 @@ CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) en
SELECT * from t2;
INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1);
INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
SELECT * from t2;