feat: v04 rm unused exprs (#2285)

* feat: rm compact and flush exprs

* refactor: continue to rm compact and flush
This commit is contained in:
JeremyHi
2023-08-30 19:19:06 +08:00
committed by Ruihang Xia
parent db89235474
commit 58d07e0e62
22 changed files with 373 additions and 853 deletions

621
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a277f27caa035a801d5b9c020a0449777736614" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7fb5aa1095b62794c5c4333a1a2ed45da1c86a07" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

View File

@@ -339,8 +339,6 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::CreateTable(_)) => "ddl.create_table",
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::FlushTable(_)) => "ddl.flush_table",
Some(Expr::CompactTable(_)) => "ddl.compact_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
None => "ddl.empty",
}

View File

@@ -73,7 +73,7 @@ async fn run() {
let logical = mock_logical_plan();
event!(Level::INFO, "plan size: {:#?}", logical.len());
let result = db.logical_plan(logical, None).await.unwrap();
let result = db.logical_plan(logical, 0).await.unwrap();
event!(Level::INFO, "result: {:#?}", result);
}

View File

@@ -17,9 +17,9 @@ use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequests,
DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest,
RequestHeader, RowInsertRequests, TruncateTableExpr,
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DeleteRequests, DropTableExpr,
GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest, RequestHeader,
RowInsertRequests, TruncateTableExpr,
};
use arrow_flight::Ticket;
use async_stream::stream;
@@ -147,13 +147,13 @@ impl Database {
async fn handle(&self, request: Request) -> Result<u32> {
let mut client = self.client.make_database_client()?.inner;
let request = self.to_rpc_request(request, None);
let request = self.to_rpc_request(request, 0);
let response = client.handle(request).await?.into_inner();
from_grpc_response(response)
}
#[inline]
fn to_rpc_request(&self, request: Request, trace_id: Option<u64>) -> GreptimeRequest {
fn to_rpc_request(&self, request: Request, trace_id: u64) -> GreptimeRequest {
GreptimeRequest {
header: Some(RequestHeader {
catalog: self.catalog.clone(),
@@ -161,7 +161,7 @@ impl Database {
authorization: self.ctx.auth_header.clone(),
dbname: self.dbname.clone(),
trace_id,
span_id: None,
span_id: 0,
}),
request: Some(request),
}
@@ -173,16 +173,12 @@ impl Database {
Request::Query(QueryRequest {
query: Some(Query::Sql(sql.to_string())),
}),
None,
0,
)
.await
}
pub async fn logical_plan(
&self,
logical_plan: Vec<u8>,
trace_id: Option<u64>,
) -> Result<Output> {
pub async fn logical_plan(&self, logical_plan: Vec<u8>, trace_id: u64) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_LOGICAL_PLAN);
self.do_get(
Request::Query(QueryRequest {
@@ -210,7 +206,7 @@ impl Database {
step: step.to_string(),
})),
}),
None,
0,
)
.await
}
@@ -221,7 +217,7 @@ impl Database {
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
}),
None,
0,
)
.await
}
@@ -232,7 +228,7 @@ impl Database {
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}),
None,
0,
)
.await
}
@@ -243,29 +239,7 @@ impl Database {
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
}),
None,
)
.await
}
pub async fn flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_FLUSH_TABLE);
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::FlushTable(expr)),
}),
None,
)
.await
}
pub async fn compact_table(&self, expr: CompactTableExpr) -> Result<Output> {
let _timer = timer!(metrics::METRIC_GRPC_COMPACT_TABLE);
self.do_get(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CompactTable(expr)),
}),
None,
0,
)
.await
}
@@ -276,12 +250,12 @@ impl Database {
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::TruncateTable(expr)),
}),
None,
0,
)
.await
}
async fn do_get(&self, request: Request, trace_id: Option<u64>) -> Result<Output> {
async fn do_get(&self, request: Request, trace_id: u64) -> Result<Output> {
// FIXME(paomian): should be added some labels for metrics
let _timer = timer!(metrics::METRIC_GRPC_DO_GET);
let request = self.to_rpc_request(request, trace_id);

View File

@@ -21,8 +21,6 @@ pub const METRIC_GRPC_SQL: &str = "grpc.sql";
pub const METRIC_GRPC_LOGICAL_PLAN: &str = "grpc.logical_plan";
pub const METRIC_GRPC_ALTER: &str = "grpc.alter";
pub const METRIC_GRPC_DROP_TABLE: &str = "grpc.drop_table";
pub const METRIC_GRPC_FLUSH_TABLE: &str = "grpc.flush_table";
pub const METRIC_GRPC_COMPACT_TABLE: &str = "grpc.compact_table";
pub const METRIC_GRPC_TRUNCATE_TABLE: &str = "grpc.truncate_table";
pub const METRIC_GRPC_DO_GET: &str = "grpc.do_get";
pub(crate) const METRIC_REGION_REQUEST_GRPC: &str = "grpc.region_request";

View File

@@ -25,8 +25,8 @@ type AffectedRows = u64;
#[derive(Debug)]
pub struct RegionRequester {
trace_id: Option<u64>,
span_id: Option<u64>,
trace_id: u64,
span_id: u64,
client: Client,
}
@@ -34,8 +34,8 @@ impl RegionRequester {
pub fn new(client: Client) -> Self {
// TODO(LFC): Pass in trace_id and span_id from some context when we have it.
Self {
trace_id: None,
span_id: None,
trace_id: 0,
span_id: 0,
client,
}
}

View File

@@ -180,7 +180,7 @@ impl Repl {
.encode(&plan)
.context(SubstraitEncodeLogicalPlanSnafu)?;
self.database.logical_plan(plan.to_vec(), None).await
self.database.logical_plan(plan.to_vec(), 0).await
} else {
self.database.sql(&sql).await
}

View File

@@ -186,9 +186,9 @@ fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation
}) => Ok(Some(AddColumnLocation::First)),
Some(Location {
location_type: LOCATION_TYPE_AFTER,
after_cloumn_name,
after_column_name,
}) => Ok(Some(AddColumnLocation::After {
column_name: after_cloumn_name,
column_name: after_column_name,
})),
Some(Location { location_type, .. }) => UnknownLocationTypeSnafu { location_type }.fail(),
None => Ok(None),
@@ -262,7 +262,7 @@ mod tests {
is_key: false,
location: Some(Location {
location_type: LocationType::First.into(),
after_cloumn_name: "".to_string(),
after_column_name: "".to_string(),
}),
},
AddColumn {
@@ -275,7 +275,7 @@ mod tests {
is_key: false,
location: Some(Location {
location_type: LocationType::After.into(),
after_cloumn_name: "ts".to_string(),
after_column_name: "ts".to_string(),
}),
},
],

View File

@@ -59,11 +59,11 @@ impl From<&AddColumnLocation> for Location {
match value {
AddColumnLocation::First => Location {
location_type: LocationType::First.into(),
after_cloumn_name: "".to_string(),
after_column_name: "".to_string(),
},
AddColumnLocation::After { column_name } => Location {
location_type: LocationType::After.into(),
after_cloumn_name: column_name.to_string(),
after_column_name: column_name.to_string(),
},
}
}

View File

@@ -216,8 +216,6 @@ impl Instance {
DdlExpr::Alter(expr) => self.handle_alter(expr, query_ctx).await,
DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, query_ctx).await,
DdlExpr::DropTable(expr) => self.handle_drop_table(expr, query_ctx).await,
DdlExpr::FlushTable(expr) => self.handle_flush_table(expr, query_ctx).await,
DdlExpr::CompactTable(expr) => self.handle_compact_table(expr, query_ctx).await,
DdlExpr::TruncateTable(expr) => self.handle_truncate_table(expr, query_ctx).await,
}
}
@@ -726,7 +724,7 @@ mod test {
is_key: true,
location: Some(Location {
location_type: LocationType::First.into(),
after_cloumn_name: "".to_string(),
after_column_name: "".to_string(),
}),
},
AddColumn {
@@ -739,7 +737,7 @@ mod test {
is_key: true,
location: Some(Location {
location_type: LocationType::After.into(),
after_cloumn_name: "a".to_string(),
after_column_name: "a".to_string(),
}),
},
],

View File

@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{
AlterExpr, CompactTableExpr, CreateTableExpr, DropTableExpr, FlushTableExpr, TruncateTableExpr,
};
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr};
use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
use common_catalog::format_full_table_name;
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
@@ -22,9 +20,7 @@ use common_query::Output;
use common_telemetry::info;
use session::context::QueryContextRef;
use snafu::prelude::*;
use table::requests::{
CompactTableRequest, DropTableRequest, FlushTableRequest, TruncateTableRequest,
};
use table::requests::{DropTableRequest, TruncateTableRequest};
use crate::error::{
AlterExprToRequestSnafu, BumpTableIdSnafu, CatalogSnafu, CreateExprToRequestSnafu,
@@ -137,52 +133,6 @@ impl Instance {
.await
}
pub(crate) async fn handle_flush_table(
&self,
expr: FlushTableExpr,
ctx: QueryContextRef,
) -> Result<Output> {
let table_name = if expr.table_name.trim().is_empty() {
None
} else {
Some(expr.table_name)
};
let req = FlushTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name,
region_number: expr.region_number,
wait: None,
};
self.sql_handler()
.execute(SqlRequest::FlushTable(req), ctx)
.await
}
pub(crate) async fn handle_compact_table(
&self,
expr: CompactTableExpr,
ctx: QueryContextRef,
) -> Result<Output> {
let table_name = if expr.table_name.trim().is_empty() {
None
} else {
Some(expr.table_name)
};
let req = CompactTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name,
region_number: expr.region_number,
wait: None,
};
self.sql_handler()
.execute(SqlRequest::CompactTable(req), ctx)
.await
}
pub(crate) async fn handle_truncate_table(
&self,
expr: TruncateTableExpr,

View File

@@ -164,11 +164,8 @@ impl Instance {
table_metadata_manager.clone(),
);
let dist_instance = DistInstance::new(
meta_client.clone(),
Arc::new(catalog_manager.clone()),
datanode_clients.clone(),
);
let dist_instance =
DistInstance::new(meta_client.clone(), Arc::new(catalog_manager.clone()));
let dist_instance = Arc::new(dist_instance);
catalog_manager.set_dist_instance(dist_instance.clone());

View File

@@ -23,24 +23,21 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::{
column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests,
FlushTableExpr, InsertRequests, RowInsertRequests, TruncateTableExpr,
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, InsertRequests,
RowInsertRequests, TruncateTableExpr,
};
use async_trait::async_trait;
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
use chrono::DateTime;
use client::client_manager::DatanodeClients;
use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::{debug, info};
use common_telemetry::info;
use datanode::instance::sql::table_idents_to_full_name;
use datanode::sql::SqlHandler;
use datatypes::prelude::ConcreteDataType;
@@ -57,7 +54,6 @@ use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use store_api::storage::RegionNumber;
use table::error::TableOperationSnafu;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
@@ -66,9 +62,9 @@ use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, TableSnafu, UnrecognizedTableOptionSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu, Result,
SchemaExistsSnafu, TableAlreadyExistSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
TableSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
@@ -82,19 +78,13 @@ const MAX_VALUE: &str = "MAXVALUE";
pub struct DistInstance {
meta_client: Arc<MetaClient>,
pub(crate) catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
}
impl DistInstance {
pub fn new(
meta_client: Arc<MetaClient>,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
pub fn new(meta_client: Arc<MetaClient>, catalog_manager: Arc<FrontendCatalogManager>) -> Self {
Self {
meta_client,
catalog_manager,
datanode_clients,
}
}
@@ -213,109 +203,6 @@ impl DistInstance {
Ok(Output::AffectedRows(1))
}
async fn flush_table(
&self,
table_name: TableName,
region_number: Option<RegionNumber>,
) -> Result<Output> {
let candidates = self
.find_flush_or_compaction_candidates(&table_name, region_number)
.await?;
let expr = FlushTableExpr {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
region_number,
..Default::default()
};
for candidate in candidates {
debug!("Flushing table {table_name} on Datanode {candidate:?}");
let client = self.datanode_clients.get_client(&candidate).await;
let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
client
.flush_table(expr.clone())
.await
.context(RequestDatanodeSnafu)?;
}
Ok(Output::AffectedRows(0))
}
async fn compact_table(
&self,
table_name: TableName,
region_number: Option<RegionNumber>,
) -> Result<Output> {
let candidates = self
.find_flush_or_compaction_candidates(&table_name, region_number)
.await?;
let expr = CompactTableExpr {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
region_number,
};
for candidate in candidates {
debug!("Compacting table {table_name} on Datanode {candidate:?}");
let client = self.datanode_clients.get_client(&candidate).await;
let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
client
.compact_table(expr.clone())
.await
.context(RequestDatanodeSnafu)?;
}
Ok(Output::AffectedRows(0))
}
async fn find_flush_or_compaction_candidates(
&self,
table_name: &TableName,
region_number: Option<RegionNumber>,
) -> Result<Vec<Peer>> {
let table = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let table_id = table.table_info().table_id();
let route_response = self
.meta_client
.route(RouteRequest {
table_ids: vec![table_id],
})
.await
.context(RequestMetaSnafu)?;
let res = route_response
.table_routes
.iter()
.filter(|route| {
route.region_routes.iter().any(|r| {
let Some(n) = region_number else {
return true;
};
n == r.region.id.region_number()
})
})
.flat_map(|route| route.find_leaders().into_iter())
.collect::<Vec<_>>();
Ok(res)
}
async fn truncate_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
@@ -423,7 +310,7 @@ impl DistInstance {
let table_name = TableName::new(catalog, schema, table);
self.truncate_table(table_name).await
}
_ => error::NotSupportedSnafu {
_ => NotSupportedSnafu {
feat: format!("{stmt:?}"),
}
.fail(),
@@ -723,16 +610,6 @@ impl GrpcQueryHandler for DistInstance {
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.drop_table(table_name).await
}
DdlExpr::FlushTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.flush_table(table_name, expr.region_number).await
}
DdlExpr::CompactTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.compact_table(table_name, expr.region_number).await
}
DdlExpr::TruncateTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);

View File

@@ -146,7 +146,10 @@ impl MergeScanExec {
let peers = self.peers.clone();
let clients = self.clients.clone();
let table = self.table.clone();
let trace_id = context.task_id().and_then(|id| id.parse().ok());
let trace_id = context
.task_id()
.and_then(|id| id.parse().ok())
.unwrap_or_default();
let metric = MergeScanMetric::new(&self.metric);
let stream = Box::pin(stream!({

View File

@@ -170,7 +170,7 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.try_trace_id(header.and_then(|h: &RequestHeader| h.trace_id))
.try_trace_id(header.map(|h| h.trace_id))
.build()
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod admin;
pub mod authorize;
pub mod handler;
pub mod header;
@@ -65,7 +64,6 @@ use self::authorize::HttpAuth;
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
use crate::http::admin::{compact, flush};
use crate::http::prometheus::{
instant_query, label_values_query, labels_query, range_query, series_query,
};
@@ -487,13 +485,6 @@ impl HttpServer {
router = router.nest(&format!("/{HTTP_API_VERSION}"), sql_router);
}
if let Some(grpc_handler) = self.grpc_handler.clone() {
router = router.nest(
&format!("/{HTTP_API_VERSION}/admin"),
self.route_admin(grpc_handler.clone()),
);
}
if let Some(opentsdb_handler) = self.opentsdb_handler.clone() {
router = router.nest(
&format!("/{HTTP_API_VERSION}/opentsdb"),
@@ -674,13 +665,6 @@ impl HttpServer {
.with_state(otlp_handler)
}
fn route_admin<S>(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router<S> {
Router::new()
.route("/flush", routing::post(flush))
.route("/compact", routing::post(compact))
.with_state(grpc_handler)
}
fn route_config<S>(&self, state: GreptimeOptionsConfigState) -> ApiRouter<S> {
ApiRouter::new()
.route("/config", apirouting::get(handler::config))

View File

@@ -1,112 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::ddl_request::Expr;
use api::v1::greptime_request::Request;
use api::v1::{CompactTableExpr, DdlRequest, FlushTableExpr};
use axum::extract::{Query, RawBody, State};
use axum::http::StatusCode;
use axum::Extension;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use session::context::QueryContextRef;
use snafu::OptionExt;
use crate::error;
use crate::error::Result;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
#[axum_macros::debug_handler]
pub async fn flush(
State(grpc_handler): State<ServerGrpcQueryHandlerRef>,
Query(params): Query<HashMap<String, String>>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(_): RawBody,
) -> Result<(StatusCode, ())> {
let catalog_name = params
.get("catalog")
.cloned()
.unwrap_or(DEFAULT_CATALOG_NAME.to_string());
let schema_name = params
.get("db")
.cloned()
.context(error::InvalidFlushArgumentSnafu {
err_msg: "db is not present",
})?;
// if table name is not present, flush all tables inside schema
let table_name = params.get("table").cloned().unwrap_or_default();
let region_number: Option<u32> = params
.get("region")
.map(|v| v.parse())
.transpose()
.ok()
.flatten();
let request = Request::Ddl(DdlRequest {
expr: Some(Expr::FlushTable(FlushTableExpr {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
region_number,
..Default::default()
})),
});
grpc_handler.do_query(request, query_ctx).await?;
Ok((StatusCode::NO_CONTENT, ()))
}
#[axum_macros::debug_handler]
pub async fn compact(
State(grpc_handler): State<ServerGrpcQueryHandlerRef>,
Query(params): Query<HashMap<String, String>>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(_): RawBody,
) -> Result<(StatusCode, ())> {
let catalog_name = params
.get("catalog")
.cloned()
.unwrap_or(DEFAULT_CATALOG_NAME.to_string());
let schema_name = params
.get("db")
.cloned()
.context(error::InvalidFlushArgumentSnafu {
err_msg: "db is not present",
})?;
// if table name is not present, flush all tables inside schema
let table_name = params.get("table").cloned().unwrap_or_default();
let region_number: Option<u32> = params
.get("region")
.map(|v| v.parse())
.transpose()
.ok()
.flatten();
let request = Request::Ddl(DdlRequest {
expr: Some(Expr::CompactTable(CompactTableExpr {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
region_number,
})),
});
grpc_handler.do_query(request, query_ctx).await?;
Ok((StatusCode::NO_CONTENT, ()))
}

View File

@@ -415,11 +415,11 @@ pub fn sql_location_to_grpc_add_column_location(
match location {
Some(AddColumnLocation::First) => Some(Location {
location_type: LocationType::First.into(),
after_cloumn_name: "".to_string(),
after_column_name: "".to_string(),
}),
Some(AddColumnLocation::After { column_name }) => Some(Location {
location_type: LocationType::After.into(),
after_cloumn_name: column_name.to_string(),
after_column_name: column_name.to_string(),
}),
None => None,
}

View File

@@ -23,7 +23,7 @@ mod test {
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DeleteRequests,
DropTableExpr, FlushTableExpr, InsertRequest, InsertRequests, QueryRequest, SemanticType,
DropTableExpr, InsertRequest, InsertRequests, QueryRequest, SemanticType,
};
use common_catalog::consts::MITO_ENGINE;
use common_meta::rpc::router::region_distribution;
@@ -33,8 +33,6 @@ mod test {
use query::parser::QueryLanguageParser;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use store_api::storage::RegionNumber;
use tests::{has_parquet_file, test_region_dir};
use crate::tests;
use crate::tests::MockDistributedInstance;
@@ -294,106 +292,6 @@ CREATE TABLE {table_name} (
test_insert_delete_and_query_on_auto_created_table(instance).await
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_flush_table() {
common_telemetry::init_default_ut_logging();
let instance = tests::create_distributed_instance("test_distributed_flush_table").await;
let data_tmp_dirs = instance.data_tmp_dirs();
let frontend = instance.frontend();
let frontend = frontend.as_ref();
let table_name = "my_dist_table";
let sql = format!(
r"
CREATE TABLE {table_name} (
a INT,
ts TIMESTAMP,
TIME INDEX (ts)
) PARTITION BY RANGE COLUMNS(a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)"
);
create_table(frontend, sql).await;
test_insert_delete_and_query_on_existing_table(frontend, table_name).await;
flush_table(frontend, "greptime", "public", table_name, None).await;
// Wait for previous task finished
flush_table(frontend, "greptime", "public", table_name, None).await;
let table = frontend
.catalog_manager()
.table("greptime", "public", table_name)
.await
.unwrap()
.unwrap();
let table_id = table.table_info().table_id();
let table_route_value = instance
.table_metadata_manager()
.table_route_manager()
.get(table_id)
.await
.unwrap()
.unwrap();
let region_to_dn_map = region_distribution(&table_route_value.region_routes)
.unwrap()
.iter()
.map(|(k, v)| (v[0], *k))
.collect::<HashMap<u32, u64>>();
for (region, dn) in region_to_dn_map.iter() {
// data_tmp_dirs -> dn: 1..4
let data_tmp_dir = data_tmp_dirs.get((*dn - 1) as usize).unwrap();
let region_dir = test_region_dir(
data_tmp_dir.path().to_str().unwrap(),
"greptime",
"public",
table_id,
*region,
);
assert!(has_parquet_file(&region_dir));
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_flush_table() {
common_telemetry::init_default_ut_logging();
let standalone = tests::create_standalone_instance("test_standalone_flush_table").await;
let instance = &standalone.instance;
let data_tmp_dir = standalone.data_tmp_dir();
let table_name = "my_table";
let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))");
create_table(instance, sql).await;
test_insert_delete_and_query_on_existing_table(instance, table_name).await;
let table_id = 1024;
let region_id = 0;
let region_dir = test_region_dir(
data_tmp_dir.path().to_str().unwrap(),
"greptime",
"public",
table_id,
region_id,
);
assert!(!has_parquet_file(&region_dir));
flush_table(instance, "greptime", "public", "my_table", None).await;
// Wait for previous task finished
flush_table(instance, "greptime", "public", "my_table", None).await;
assert!(has_parquet_file(&region_dir));
}
async fn create_table(frontend: &Instance, sql: String) {
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(sql)),
@@ -402,27 +300,6 @@ CREATE TABLE {table_name} (
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn flush_table(
frontend: &Instance,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_number: Option<RegionNumber>,
) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::FlushTable(FlushTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
region_number,
..Default::default()
})),
});
let output = query(frontend, request).await;
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) {
let ts_millisecond_values = vec![
1672557972000,

View File

@@ -21,10 +21,8 @@ use std::sync::Arc;
use catalog::RegisterSchemaRequest;
use common_meta::key::TableMetadataManagerRef;
use common_test_util::temp_dir::TempDir;
use datanode::instance::Instance as DatanodeInstance;
use frontend::instance::Instance;
use table::engine::{region_name, table_dir};
use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
@@ -32,14 +30,6 @@ use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard}
pub struct MockDistributedInstance(GreptimeDbCluster);
impl MockDistributedInstance {
pub fn data_tmp_dirs(&self) -> Vec<&TempDir> {
self.0
._dir_guards
.iter()
.filter_map(|d| if !d.is_wal { Some(&d.temp_dir) } else { None })
.collect()
}
pub fn frontend(&self) -> Arc<Instance> {
self.0.frontend.clone()
}
@@ -58,12 +48,6 @@ pub struct MockStandaloneInstance {
_guard: TestGuard,
}
impl MockStandaloneInstance {
pub fn data_tmp_dir(&self) -> &TempDir {
&self._guard.home_guard.temp_dir
}
}
pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name);
let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts, Default::default())
@@ -105,29 +89,3 @@ pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInst
let cluster = GreptimeDbClusterBuilder::new(test_name).build().await;
MockDistributedInstance(cluster)
}
pub fn test_region_dir(
dir: &str,
catalog_name: &str,
schema_name: &str,
table_id: u32,
region_id: u32,
) -> String {
let table_dir = table_dir(catalog_name, schema_name, table_id);
let region_name = region_name(table_id, region_id);
format!("{}/{}/{}", dir, table_dir, region_name)
}
pub fn has_parquet_file(sst_dir: &str) -> bool {
for entry in std::fs::read_dir(sst_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if !path.is_dir() {
assert_eq!("parquet", path.extension().unwrap());
return true;
}
}
false
}

View File

@@ -136,7 +136,10 @@ pub async fn test_mysql_crud(store_type: StorageType) {
.await
.is_ok());
for i in 0..10 {
let dt = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp_opt(60, i).unwrap(), Utc);
let dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
NaiveDateTime::from_timestamp_opt(60, i).unwrap(),
Utc,
);
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
let hello = format!("hello{i}");
let bytes = hello.as_bytes();
@@ -165,7 +168,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
assert_eq!(ret, i as i64);
let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
assert_eq!(expected_d, d);
let expected_dt = DateTime::<Utc>::from_utc(
let expected_dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
NaiveDateTime::from_timestamp_opt(60, i as u32).unwrap(),
Utc,
);