feat: make grpc insert requests in a batch (#1687)

* feat: make Prometheus remote write in a batch

* rebase

* fix: resolve PR comments

* fix: resolve PR comments

* fix: resolve PR comments
Author: LFC
Date: 2023-06-02 17:06:48 +08:00
Committed by: GitHub
Parent: 8e69aef973
Commit: 5004cf6d9a
53 changed files with 1132 additions and 440 deletions
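
The gist of the change, as a minimal sketch (type and method names are taken from the hunks below; the helper function itself is hypothetical): callers now wrap one or more InsertRequest values into a single InsertRequests batch and issue one gRPC call instead of one call per request.

use client::api::v1::{InsertRequest, InsertRequests};
use client::Database;

// Hypothetical helper: send a batch of per-table inserts in one gRPC call.
async fn insert_batch(db: &Database, pending: Vec<InsertRequest>) -> u32 {
    let requests = InsertRequests { inserts: pending };
    // Database::insert now accepts InsertRequests and returns the total affected rows.
    db.insert(requests).await.unwrap()
}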

Cargo.lock (generated)

@@ -1232,6 +1232,7 @@ dependencies = [
"async-stream",
"async-trait",
"backoff",
"catalog",
"chrono",
"common-catalog",
"common-error",
@@ -4054,7 +4055,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45#ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4398d20c56d5f7939cc2960789cb1fa7dd18e6fe#4398d20c56d5f7939cc2960789cb1fa7dd18e6fe"
dependencies = [
"prost",
"serde",
@@ -6116,6 +6117,7 @@ dependencies = [
name = "partition"
version = "0.2.0"
dependencies = [
"api",
"async-trait",
"common-catalog",
"common-error",
@@ -8426,6 +8428,7 @@ dependencies = [
"humantime-serde",
"hyper",
"influxdb_line_protocol",
"itertools",
"metrics",
"metrics-process",
"mime_guess",


@@ -71,7 +71,8 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" }
itertools = "0.10"
parquet = "40.0"
paste = "1.0"
prost = "0.11"


@@ -9,6 +9,6 @@ arrow.workspace = true
clap = { version = "4.0", features = ["derive"] }
client = { path = "../src/client" }
indicatif = "0.17.1"
itertools = "0.10.5"
itertools.workspace = true
parquet.workspace = true
tokio.workspace = true


@@ -26,7 +26,9 @@ use arrow::datatypes::{DataType, Float64Type, Int64Type};
use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest};
use client::api::v1::{
Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -107,8 +109,12 @@ async fn write_data(
columns,
row_count,
};
let requests = InsertRequests {
inserts: vec![request],
};
let now = Instant::now();
db.insert(request).await.unwrap();
db.insert(requests).await.unwrap();
let elapsed = now.elapsed();
total_rpc_elapsed_ms += elapsed.as_millis();
progress_bar.inc(row_count as _);


@@ -231,7 +231,7 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
/// Returns the type name of the [Request].
pub fn request_type(request: &Request) -> &'static str {
match request {
Request::Insert(_) => "insert",
Request::Inserts(_) => "inserts",
Request::Query(query_req) => query_request_type(query_req),
Request::Ddl(ddl_req) => ddl_request_type(ddl_req),
Request::Delete(_) => "delete",


@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[features]
testing = []
[dependencies]
api = { path = "../api" }
arc-swap = "1.0"
@@ -42,6 +45,7 @@ table = { path = "../table" }
tokio.workspace = true
[dev-dependencies]
catalog = { path = ".", features = ["testing"] }
common-test-util = { path = "../common/test-util" }
chrono.workspace = true
log-store = { path = "../log-store" }


@@ -27,6 +27,9 @@ use crate::error::Error;
mod client;
mod manager;
#[cfg(feature = "testing")]
pub mod mock;
#[derive(Debug, Clone)]
pub struct Kv(pub Vec<u8>, pub Vec<u8>);


@@ -20,9 +20,6 @@ use std::str::FromStr;
use std::sync::Arc;
use async_stream::stream;
use catalog::error::Error;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::{Kv, KvBackend, ValueIter};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::RecordBatch;
use common_telemetry::logging::info;
@@ -37,6 +34,10 @@ use table::test_util::MemTable;
use table::TableRef;
use tokio::sync::RwLock;
use crate::error::Error;
use crate::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use crate::remote::{Kv, KvBackend, ValueIter};
pub struct MockKvBackend {
map: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
}


@@ -14,8 +14,6 @@
#![feature(assert_matches)]
mod mock;
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
@@ -23,6 +21,7 @@ mod tests {
use std::sync::Arc;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::mock::{MockKvBackend, MockTableEngine};
use catalog::remote::{
CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider,
RemoteSchemaProvider,
@@ -35,8 +34,6 @@ mod tests {
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
use crate::mock::{MockKvBackend, MockTableEngine};
#[tokio::test]
async fn test_backend() {
common_telemetry::init_default_ut_logging();


@@ -16,7 +16,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_grpc::channel_manager::ChannelManager;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::peer::Peer;
use common_telemetry::info;
use moka::future::{Cache, CacheBuilder};
@@ -31,8 +31,11 @@ pub struct DatanodeClients {
impl Default for DatanodeClients {
fn default() -> Self {
// TODO(LFC): Make this channel config configurable.
let config = ChannelConfig::new().timeout(Duration::from_secs(8));
Self {
channel_manager: ChannelManager::new(),
channel_manager: ChannelManager::with_config(config),
clients: CacheBuilder::new(1024)
.time_to_live(Duration::from_secs(30 * 60))
.time_to_idle(Duration::from_secs(5 * 60))
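
In isolation, the channel setup above amounts to the following sketch (assuming the ChannelConfig and ChannelManager APIs exactly as shown in this hunk): the Datanode client channels now carry an explicit 8-second request timeout instead of the 2-second default.

use std::time::Duration;

use common_grpc::channel_manager::{ChannelConfig, ChannelManager};

fn build_datanode_channel_manager() -> ChannelManager {
    // Previously: ChannelManager::new(), which used the default 2-second timeout.
    let config = ChannelConfig::new().timeout(Duration::from_secs(8));
    ChannelManager::with_config(config)
}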


@@ -18,7 +18,7 @@ use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
greptime_response, AffectedRows, AlterExpr, AuthHeader, CreateTableExpr, DdlRequest,
DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequest, PromRangeQuery,
DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery,
QueryRequest, RequestHeader,
};
use arrow_flight::{FlightData, Ticket};
@@ -109,9 +109,9 @@ impl Database {
});
}
pub async fn insert(&self, request: InsertRequest) -> Result<u32> {
pub async fn insert(&self, requests: InsertRequests) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_INSERT);
self.handle(Request::Insert(request)).await
self.handle(Request::Inserts(requests)).await
}
pub async fn delete(&self, request: DeleteRequest) -> Result<u32> {


@@ -28,6 +28,7 @@ use tower::make::MakeConnection;
use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsConfigSnafu, Result};
const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 2;
#[derive(Clone, Debug)]
pub struct ChannelManager {
@@ -236,7 +237,7 @@ pub struct ChannelConfig {
impl Default for ChannelConfig {
fn default() -> Self {
Self {
timeout: Some(Duration::from_secs(2)),
timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)),
connect_timeout: Some(Duration::from_secs(4)),
concurrency_limit: None,
rate_limit: None,
@@ -497,7 +498,7 @@ mod tests {
let default_cfg = ChannelConfig::new();
assert_eq!(
ChannelConfig {
timeout: Some(Duration::from_secs(2)),
timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)),
connect_timeout: Some(Duration::from_secs(4)),
concurrency_limit: None,
rate_limit: None,


@@ -22,7 +22,7 @@ use api::v1::meta::{
};
use serde::{Deserialize, Serialize, Serializer};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::RawTableInfo;
use crate::error::{self, Result};
@@ -141,9 +141,23 @@ impl TryFrom<PbRouteResponse> for RouteResponse {
pub struct TableRoute {
pub table: Table,
pub region_routes: Vec<RegionRoute>,
region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
impl TableRoute {
pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
let region_leaders = region_routes
.iter()
.map(|x| (x.region.id as RegionNumber, x.leader_peer.clone()))
.collect::<HashMap<_, _>>();
Self {
table,
region_routes,
region_leaders,
}
}
pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
let table = table_route
.table
@@ -179,10 +193,7 @@ impl TableRoute {
});
}
Ok(Self {
table,
region_routes,
})
Ok(Self::new(table, region_routes))
}
pub fn try_into_raw(self) -> Result<(Vec<PbPeer>, PbTableRoute)> {
@@ -267,6 +278,12 @@ impl TableRoute {
})
.collect()
}
pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
self.region_leaders
.get(&region_number)
.and_then(|x| x.as_ref())
}
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
@@ -670,6 +687,10 @@ mod tests {
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
},
],
region_leaders: HashMap::from([
(2, Some(Peer::new(1, "a1"))),
(1, Some(Peer::new(2, "a2"))),
]),
};
let from_raw = TableRoute::try_from_raw(&raw_peers, raw_table_route.clone()).unwrap();
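
A small usage sketch of the new leader lookup, assuming the TableRoute constructor and accessor introduced above (the import path follows the use statements elsewhere in this commit): TableRoute::new pre-computes a region-number-to-leader map, so find_region_leader is a hash lookup rather than a scan over region_routes.

use common_meta::peer::Peer;
use common_meta::rpc::router::{RegionRoute, Table, TableRoute};

fn leader_of_region_one(table: Table, region_routes: Vec<RegionRoute>) -> Option<Peer> {
    // The constructor builds the RegionNumber -> Option<Peer> map once.
    let route = TableRoute::new(table, region_routes);
    route.find_region_leader(1).cloned()
}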


@@ -230,6 +230,12 @@ pub enum Error {
source: servers::error::Error,
},
#[snafu(display("Failed to wait for GRPC serving, source: {}", source))]
WaitForGrpcServing {
source: servers::error::Error,
location: Location,
},
#[snafu(display("Failed to parse address {}, source: {}", addr, source))]
ParseAddr {
addr: String,
@@ -467,6 +473,12 @@ pub enum Error {
#[snafu(display("Missing WAL dir config"))]
MissingWalDirConfig { location: Location },
#[snafu(display("Failed to join task, source: {}", source))]
JoinTask {
source: common_runtime::JoinError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -537,16 +549,19 @@ impl ErrorExt for Error {
| GetTable { source, .. } => source.status_code(),
// TODO(yingwen): Further categorize http error.
StartServer { .. }
| ParseAddr { .. }
ParseAddr { .. }
| CreateDir { .. }
| RemoveDir { .. }
| Catalog { .. }
| MissingRequiredField { .. }
| IncorrectInternalState { .. }
| ShutdownServer { .. }
| ShutdownInstance { .. }
| CloseTableEngine { .. } => StatusCode::Internal,
| CloseTableEngine { .. }
| JoinTask { .. } => StatusCode::Internal,
StartServer { source }
| ShutdownServer { source }
| WaitForGrpcServing { source, .. } => source.status_code(),
InitBackend { .. } => StatusCode::StorageUnavailable,


@@ -18,15 +18,17 @@ use std::sync::Arc;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::query_request::Query;
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequest};
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequests};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_grpc_expr::insert::to_table_insert_request;
use common_query::Output;
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::TableProvider;
use futures::future;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::SqlStatementExecutor;
@@ -41,8 +43,8 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
self, CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu,
DeleteSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, PlanStatementSnafu, Result,
SchemaNotFoundSnafu, TableNotFoundSnafu,
DeleteSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu,
JoinTaskSnafu, PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
};
use crate::instance::Instance;
@@ -119,31 +121,41 @@ impl Instance {
}
}
pub async fn handle_insert(
pub async fn handle_inserts(
&self,
request: InsertRequest,
ctx: QueryContextRef,
requests: InsertRequests,
ctx: &QueryContextRef,
) -> Result<Output> {
let catalog = &ctx.current_catalog();
let schema = &ctx.current_schema();
let table_name = &request.table_name.clone();
let table_ref = TableReference::full(catalog, schema, table_name);
let results = future::try_join_all(requests.inserts.into_iter().map(|insert| {
let catalog_manager = self.catalog_manager.clone();
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
common_runtime::spawn_write(async move {
let table_name = &insert.table_name.clone();
let table = catalog_manager
.table(&catalog, &schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
&catalog, &schema, table_name,
),
})?;
let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request)
.context(error::InsertDataSnafu)?;
let request =
to_table_insert_request(&catalog, &schema, insert).context(InsertDataSnafu)?;
let affected_rows = table.insert(request).await.with_context(|_| InsertSnafu {
table_name: table_ref.to_string(),
})?;
table.insert(request).await.with_context(|_| InsertSnafu {
table_name: common_catalog::format_full_table_name(
&catalog, &schema, table_name,
),
})
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<usize>>()?;
Ok(Output::AffectedRows(affected_rows))
}
@@ -191,7 +203,7 @@ impl GrpcQueryHandler for Instance {
async fn do_query(&self, request: GrpcRequest, ctx: QueryContextRef) -> Result<Output> {
match request {
GrpcRequest::Insert(request) => self.handle_insert(request, ctx).await,
GrpcRequest::Inserts(requests) => self.handle_inserts(requests, &ctx).await,
GrpcRequest::Delete(request) => self.handle_delete(request, ctx).await,
GrpcRequest::Query(query_request) => {
let query = query_request
@@ -296,7 +308,7 @@ mod test {
use api::v1::column::{SemanticType, Values};
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, QueryRequest,
CreateDatabaseExpr, CreateTableExpr, InsertRequest, InsertRequests, QueryRequest,
};
use common_catalog::consts::MITO_ENGINE;
use common_recordbatch::RecordBatches;
@@ -481,7 +493,9 @@ mod test {
..Default::default()
};
let query = GrpcRequest::Insert(insert);
let query = GrpcRequest::Inserts(InsertRequests {
inserts: vec![insert],
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(3)));
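
The new handle_inserts fans each insert out to the write runtime and sums the affected rows. A stripped-down sketch of that join-and-sum pattern follows, using plain tokio::spawn as a stand-in for common_runtime::spawn_write and a stub in place of the real table lookup and insert.

use futures::future;

async fn insert_one(_insert_index: usize) -> Result<usize, String> {
    // Stand-in for: resolve the table, convert the gRPC insert, call table.insert().
    Ok(1)
}

async fn handle_inserts_pattern(insert_count: usize) -> Result<usize, String> {
    // One task per insert request; wait for all of them, then sum the affected rows.
    let results = future::try_join_all((0..insert_count).map(|i| tokio::spawn(insert_one(i))))
        .await
        .map_err(|join_error| join_error.to_string())?;
    results.into_iter().sum()
}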


@@ -17,17 +17,18 @@ use std::net::SocketAddr;
use std::sync::Arc;
use common_runtime::Builder as RuntimeBuilder;
use futures::future;
use servers::grpc::GrpcServer;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::server::Server;
use snafu::ResultExt;
use tokio::select;
use crate::datanode::DatanodeOptions;
use crate::error::{
ParseAddrSnafu, Result, RuntimeResourceSnafu, ShutdownServerSnafu, StartServerSnafu,
WaitForGrpcServingSnafu,
};
use crate::instance::InstanceRef;
@@ -71,10 +72,14 @@ impl Services {
})?;
let grpc = self.grpc_server.start(grpc_addr);
let http = self.http_server.start(http_addr);
select!(
v = grpc => v.context(StartServerSnafu)?,
v = http => v.context(StartServerSnafu)?,
);
future::try_join_all(vec![grpc, http])
.await
.context(StartServerSnafu)?;
self.grpc_server
.wait_for_serve()
.await
.context(WaitForGrpcServingSnafu)?;
Ok(())
}


@@ -37,7 +37,7 @@ datatypes = { path = "../datatypes" }
file-table-engine = { path = "../file-table-engine" }
futures = "0.3"
futures-util.workspace = true
itertools = "0.10"
itertools.workspace = true
meta-client = { path = "../meta-client" }
meter-core.workspace = true
meter-macros.workspace = true
@@ -65,6 +65,7 @@ tokio.workspace = true
tonic.workspace = true
[dev-dependencies]
catalog = { path = "../catalog", features = ["testing"] }
common-test-util = { path = "../common/test-util" }
datanode = { path = "../datanode" }
futures = "0.3"


@@ -279,9 +279,7 @@ impl CatalogManager for FrontendCatalogManager {
Ok(self.backend.get(key.as_bytes()).await?.map(|_| {
Arc::new(FrontendCatalogProvider {
catalog_name: catalog.to_string(),
backend: self.backend.clone(),
partition_manager: self.partition_manager.clone(),
datanode_clients: self.datanode_clients.clone(),
catalog_manager: Arc::new(self.clone()),
}) as Arc<_>
}))
}
@@ -320,9 +318,7 @@ impl CatalogManager for FrontendCatalogManager {
pub struct FrontendCatalogProvider {
catalog_name: String,
backend: KvBackendRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
catalog_manager: Arc<FrontendCatalogManager>,
}
#[async_trait::async_trait]
@@ -333,7 +329,8 @@ impl CatalogProvider for FrontendCatalogProvider {
async fn schema_names(&self) -> catalog::error::Result<Vec<String>> {
let key = build_schema_prefix(&self.catalog_name);
let mut iter = self.backend.range(key.as_bytes());
let backend = self.catalog_manager.backend();
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();
while let Some(r) = iter.next().await {
let Kv(k, _) = r?;
@@ -361,15 +358,17 @@ impl CatalogProvider for FrontendCatalogProvider {
}
.to_string();
let val = self.backend.get(schema_key.as_bytes()).await?;
let val = self
.catalog_manager
.backend()
.get(schema_key.as_bytes())
.await?;
let provider = val.map(|_| {
Arc::new(FrontendSchemaProvider {
catalog_name: catalog.clone(),
schema_name: name.to_string(),
backend: self.backend.clone(),
partition_manager: self.partition_manager.clone(),
datanode_clients: self.datanode_clients.clone(),
catalog_manager: self.catalog_manager.clone(),
}) as Arc<dyn SchemaProvider>
});
@@ -380,9 +379,7 @@ impl CatalogProvider for FrontendCatalogProvider {
pub struct FrontendSchemaProvider {
catalog_name: String,
schema_name: String,
backend: KvBackendRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
catalog_manager: Arc<FrontendCatalogManager>,
}
#[async_trait]
@@ -397,7 +394,8 @@ impl SchemaProvider for FrontendSchemaProvider {
tables.push("numbers".to_string());
}
let key = build_table_global_prefix(&self.catalog_name, &self.schema_name);
let iter = self.backend.range(key.as_bytes());
let backend = self.catalog_manager.backend();
let iter = backend.range(key.as_bytes());
let result = iter
.map(|r| {
let Kv(k, _) = r?;
@@ -424,7 +422,9 @@ impl SchemaProvider for FrontendSchemaProvider {
schema_name: self.schema_name.clone(),
table_name: name.to_string(),
};
let Some(kv) = self.backend.get(table_global_key.to_string().as_bytes()).await? else { return Ok(None) };
let Some(kv) = self.catalog_manager.backend().get(table_global_key.to_string().as_bytes()).await? else {
return Ok(None);
};
let v = TableGlobalValue::from_bytes(kv.1).context(InvalidCatalogValueSnafu)?;
let table_info = Arc::new(
v.table_info
@@ -434,9 +434,7 @@ impl SchemaProvider for FrontendSchemaProvider {
let table = Arc::new(DistTable::new(
TableName::new(&self.catalog_name, &self.schema_name, name),
table_info,
self.partition_manager.clone(),
self.datanode_clients.clone(),
self.backend.clone(),
self.catalog_manager.clone(),
));
Ok(Some(table))
}


@@ -19,7 +19,7 @@ use datafusion::parquet;
use datatypes::arrow::error::ArrowError;
use datatypes::value::Value;
use snafu::Location;
use store_api::storage::RegionId;
use store_api::storage::RegionNumber;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -96,6 +96,9 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display("Failed to convert vector to GRPC column, reason: {}", reason))]
VectorToGrpcColumn { reason: String, location: Location },
#[snafu(display(
"Failed to convert column default constraint, column: {}, source: {}",
column_name,
@@ -115,7 +118,7 @@ pub enum Error {
#[snafu(display("Failed to find Datanode by region: {:?}", region))]
FindDatanode {
region: RegionId,
region: RegionNumber,
location: Location,
},
@@ -192,6 +195,12 @@ pub enum Error {
source: partition::error::Error,
},
#[snafu(display("Failed to split insert request, source: {}", source))]
SplitInsert {
source: partition::error::Error,
location: Location,
},
#[snafu(display("Failed to create table info, source: {}", source))]
CreateTableInfo {
#[snafu(backtrace)]
@@ -582,7 +591,8 @@ impl ErrorExt for Error {
| Error::CreateTableRoute { .. }
| Error::FindRegionRoute { .. }
| Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. } => StatusCode::Internal,
| Error::BuildTableMeta { .. }
| Error::VectorToGrpcColumn { .. } => StatusCode::Internal,
Error::IncompleteGrpcResult { .. }
| Error::ContextValueNotFound { .. }
@@ -623,7 +633,9 @@ impl ErrorExt for Error {
Error::External { source } => source.status_code(),
Error::DeserializePartition { source, .. }
| Error::FindTablePartitionRule { source, .. }
| Error::FindTableRoute { source, .. } => source.status_code(),
| Error::FindTableRoute { source, .. }
| Error::SplitInsert { source, .. } => source.status_code(),
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
Error::StartScriptManager { source } => source.status_code(),


@@ -13,5 +13,6 @@
// limitations under the License.
pub mod invalidate_table_cache;
#[cfg(test)]
mod tests;
pub(crate) mod tests;


@@ -30,6 +30,7 @@ use tokio::sync::mpsc;
use super::invalidate_table_cache::InvalidateTableCacheHandler;
#[derive(Default)]
pub struct MockKvCacheInvalidator {
inner: Mutex<HashMap<Vec<u8>, i32>>,
}


@@ -28,7 +28,7 @@ use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::meta::Role;
use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest};
use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest, InsertRequests};
use async_trait::async_trait;
use catalog::remote::CachedMetaKvBackend;
use catalog::CatalogManagerRef;
@@ -47,6 +47,7 @@ use datanode::instance::sql::table_idents_to_full_name;
use datanode::instance::InstanceRef as DnInstanceRef;
use datatypes::schema::Schema;
use distributed::DistInstance;
use futures::future;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use partition::manager::PartitionRuleManager;
@@ -281,24 +282,21 @@ impl Instance {
/// Handle batch inserts
pub async fn handle_inserts(
&self,
requests: Vec<InsertRequest>,
requests: InsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let mut success = 0;
for request in requests {
match self.handle_insert(request, ctx.clone()).await? {
Output::AffectedRows(rows) => success += rows,
_ => unreachable!("Insert should not yield output other than AffectedRows"),
}
}
Ok(Output::AffectedRows(success))
}
// TODO(LFC): Optimize concurrent table creation and table alteration.
// Currently table creation is guarded by a distributed lock in Metasrv. However, table
// alteration is not. We should switch them all to procedures in Metasrv.
let _ = future::join_all(
requests
.inserts
.iter()
.map(|x| self.create_or_alter_table_on_demand(ctx.clone(), x)),
)
.await;
async fn handle_insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<Output> {
self.create_or_alter_table_on_demand(ctx.clone(), &request)
.await?;
let query = Request::Insert(request);
let query = Request::Inserts(requests);
GrpcQueryHandler::do_query(&*self.grpc_query_handler, query, ctx).await
}


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod inserter;
use std::collections::HashMap;
use std::sync::Arc;
@@ -20,7 +22,7 @@ use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::{
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, TableId,
FlushTableExpr, InsertRequests, TableId,
};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue};
@@ -38,7 +40,7 @@ use common_meta::rpc::router::{
use common_meta::rpc::store::CompareAndPutRequest;
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::debug;
use common_telemetry::{debug, info, warn};
use datanode::instance::sql::table_idents_to_full_name;
use datanode::sql::SqlHandler;
use datatypes::prelude::ConcreteDataType;
@@ -68,9 +70,10 @@ use crate::error::{
DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, StartMetaClientSnafu,
TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu,
ToTableInsertRequestSnafu, UnrecognizedTableOptionSnafu,
UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::inserter::DistInserter;
use crate::table::DistTable;
const MAX_VALUE: &str = "MAXVALUE";
@@ -95,6 +98,17 @@ impl DistInstance {
}
}
async fn find_table(&self, table_name: &TableName) -> Result<Option<TableRef>> {
self.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)
}
pub async fn create_table(
&self,
create_table: &mut CreateTableExpr,
@@ -107,16 +121,7 @@ impl DistInstance {
&create_table.table_name,
);
if let Some(table) = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)?
{
if let Some(table) = self.find_table(&table_name).await? {
return if create_table.create_if_not_exists {
Ok(table)
} else {
@@ -131,7 +136,19 @@ impl DistInstance {
let response = self
.create_table_in_meta(create_table, partitions, &table_info)
.await?;
.await;
let response = match response {
Ok(response) => response,
Err(e) => {
return if let Some(table) = self.find_table(&table_name).await? {
warn!("Table '{table_name}' is created concurrently by other Frontend nodes!");
Ok(table)
} else {
Err(e)
}
}
};
let table_routes = response.table_routes;
ensure!(
table_routes.len() == 1,
@@ -140,7 +157,7 @@ impl DistInstance {
}
);
let table_route = table_routes.first().unwrap();
debug!(
info!(
"Creating distributed table {table_name} with table routes: {}",
serde_json::to_string_pretty(table_route)
.unwrap_or_else(|_| format!("{table_route:#?}"))
@@ -162,9 +179,7 @@ impl DistInstance {
let table = Arc::new(DistTable::new(
table_name.clone(),
table_info,
self.catalog_manager.partition_manager(),
self.catalog_manager.datanode_clients(),
self.catalog_manager.backend(),
self.catalog_manager.clone(),
));
let request = RegisterTableRequest {
@@ -569,36 +584,21 @@ impl DistInstance {
.context(RequestMetaSnafu)
}
// TODO(LFC): Refactor GRPC insertion and deletion implementation here.
// Take insertion as an example: a GRPC insertion is converted to a Table InsertRequest here,
// then the Table InsertRequest is split in DistTable, then each GRPC InsertRequest is assembled there.
// Rather inefficient; we should operate on GRPC InsertRequest directly.
async fn handle_dist_insert(
&self,
request: InsertRequest,
requests: InsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let catalog = &ctx.current_catalog();
let schema = &ctx.current_schema();
let table_name = &request.table_name;
let table_ref = TableReference::full(catalog, schema, table_name);
let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request)
.context(ToTableInsertRequestSnafu)?;
let affected_rows = table.insert(request).await.context(TableSnafu)?;
Ok(Output::AffectedRows(affected_rows))
let inserter = DistInserter::new(
ctx.current_catalog(),
ctx.current_schema(),
self.catalog_manager.clone(),
);
let affected_rows = inserter.grpc_insert(requests).await?;
Ok(Output::AffectedRows(affected_rows as usize))
}
// TODO(LFC): Like insertions above, refactor GRPC deletion impl here.
async fn handle_dist_delete(
&self,
request: DeleteRequest,
@@ -650,7 +650,7 @@ impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Insert(request) => self.handle_dist_insert(request, ctx).await,
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
Request::Delete(request) => self.handle_dist_delete(request, ctx).await,
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")


@@ -0,0 +1,380 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::InsertRequests;
use catalog::CatalogManager;
use client::Database;
use common_grpc_expr::insert::to_table_insert_request;
use common_meta::peer::Peer;
use common_meta::table_name::TableName;
use futures::future;
use metrics::counter;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableInfoRef;
use table::meter_insert_request;
use table::requests::InsertRequest;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDatanodeSnafu,
Result, SplitInsertSnafu, TableNotFoundSnafu, ToTableInsertRequestSnafu,
};
use crate::table::insert::to_grpc_insert_request;
/// A distributed inserter. It ingests GRPC [InsertRequests] or table [InsertRequest]s (so it can be
/// used in protocol handlers or in the table insertion API).
///
/// Table data partitioning and Datanode request batching are handled inside.
///
/// Note that the inserter is confined to a single catalog and schema. That is, it cannot handle
/// insert requests with different catalogs or schemas (it will throw a "NotSupported" error).
/// This is because we currently have no such requirement. Let's keep it simple for now.
pub(crate) struct DistInserter {
catalog: String,
schema: String,
catalog_manager: Arc<FrontendCatalogManager>,
}
impl DistInserter {
pub(crate) fn new(
catalog: String,
schema: String,
catalog_manager: Arc<FrontendCatalogManager>,
) -> Self {
Self {
catalog,
schema,
catalog_manager,
}
}
pub(crate) async fn grpc_insert(&self, requests: InsertRequests) -> Result<u32> {
let inserts = requests
.inserts
.into_iter()
.map(|x| {
to_table_insert_request(&self.catalog, &self.schema, x)
.context(ToTableInsertRequestSnafu)
})
.collect::<Result<Vec<_>>>()?;
self.insert(inserts).await
}
pub(crate) async fn insert(&self, requests: Vec<InsertRequest>) -> Result<u32> {
debug_assert!(requests
.iter()
.all(|x| x.catalog_name == self.catalog && x.schema_name == self.schema));
let inserts = self.split_inserts(requests).await?;
self.request_datanodes(inserts).await
}
/// Splits multiple table [InsertRequest]s into per-Datanode GRPC [InsertRequests], grouped by the
/// peer that leads each region, so that they can be batched together when invoking the gRPC write
/// method on each Datanode.
async fn split_inserts(
&self,
requests: Vec<InsertRequest>,
) -> Result<HashMap<Peer, InsertRequests>> {
let partition_manager = self.catalog_manager.partition_manager();
let mut inserts = HashMap::new();
for request in requests {
meter_insert_request!(request);
let table_name = TableName::new(&self.catalog, &self.schema, &request.table_name);
let table_info = self.find_table_info(&request.table_name).await?;
let table_meta = &table_info.meta;
let split = partition_manager
.split_insert_request(&table_name, request, table_meta.schema.as_ref())
.await
.context(SplitInsertSnafu)?;
let table_route = partition_manager
.find_table_route(&table_name)
.await
.with_context(|_| FindTableRouteSnafu {
table_name: table_name.to_string(),
})?;
for (region_number, insert) in split {
let datanode =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
let insert = to_grpc_insert_request(table_meta, region_number, insert)?;
inserts
.entry(datanode.clone())
.or_insert_with(|| InsertRequests { inserts: vec![] })
.inserts
.push(insert);
}
}
Ok(inserts)
}
async fn find_table_info(&self, table_name: &str) -> Result<TableInfoRef> {
let table = self
.catalog_manager
.table(&self.catalog, &self.schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
&self.catalog,
&self.schema,
table_name,
),
})?;
Ok(table.table_info())
}
async fn request_datanodes(&self, inserts: HashMap<Peer, InsertRequests>) -> Result<u32> {
let results = future::try_join_all(inserts.into_iter().map(|(peer, inserts)| {
let datanode_clients = self.catalog_manager.datanode_clients();
let catalog = self.catalog.clone();
let schema = self.schema.clone();
common_runtime::spawn_write(async move {
let client = datanode_clients.get_client(&peer).await;
let database = Database::new(&catalog, &schema, client);
database.insert(inserts).await.context(RequestDatanodeSnafu)
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64);
Ok(affected_rows)
}
}
#[cfg(test)]
mod tests {
use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use catalog::helper::{
CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue,
};
use catalog::remote::mock::MockKvBackend;
use catalog::remote::{KvBackend, KvBackendRef};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use super::*;
use crate::heartbeat::handler::tests::MockKvCacheInvalidator;
use crate::table::test::create_partition_rule_manager;
async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MockKvBackend::default());
let default_catalog = CatalogKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
}
.to_string();
backend
.set(
default_catalog.as_bytes(),
CatalogValue.as_bytes().unwrap().as_slice(),
)
.await
.unwrap();
let default_schema = SchemaKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
}
.to_string();
backend
.set(
default_schema.as_bytes(),
SchemaValue.as_bytes().unwrap().as_slice(),
)
.await
.unwrap();
backend
}
async fn create_testing_table(backend: &KvBackendRef, table_name: &str) {
let table_global_key = TableGlobalKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
};
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"current_timestamp()".to_string(),
)))
.unwrap(),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
]));
let mut builder = TableMetaBuilder::default();
builder.schema(schema);
builder.primary_key_indices(vec![]);
builder.next_column_id(1);
let table_meta = builder.build().unwrap();
let table_info = TableInfoBuilder::new(table_name, table_meta)
.build()
.unwrap();
let table_global_value = TableGlobalValue {
node_id: 1,
regions_id_map: HashMap::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]),
table_info: table_info.into(),
};
backend
.set(
table_global_key.to_string().as_bytes(),
table_global_value.as_bytes().unwrap().as_slice(),
)
.await
.unwrap();
}
#[tokio::test]
async fn test_split_inserts() {
let backend = prepare_mocked_backend().await;
let table_name = "one_column_partitioning_table";
create_testing_table(&backend, table_name).await;
let catalog_manager = Arc::new(FrontendCatalogManager::new(
backend,
Arc::new(MockKvCacheInvalidator::default()),
create_partition_rule_manager().await,
Arc::new(DatanodeClients::default()),
));
let inserter = DistInserter::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
catalog_manager,
);
let new_insert_request = |vector: VectorRef| -> InsertRequest {
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
columns_values: HashMap::from([("a".to_string(), vector)]),
region_number: 0,
}
};
let requests = vec![
new_insert_request(Arc::new(Int32Vector::from(vec![
Some(1),
None,
Some(11),
Some(101),
]))),
new_insert_request(Arc::new(Int32Vector::from(vec![
Some(2),
Some(12),
None,
Some(102),
]))),
];
let mut inserts = inserter.split_inserts(requests).await.unwrap();
assert_eq!(inserts.len(), 3);
let new_grpc_insert_request = |column_values: Vec<i32>,
null_mask: Vec<u8>,
row_count: u32,
region_number: u32|
-> GrpcInsertRequest {
GrpcInsertRequest {
table_name: table_name.to_string(),
columns: vec![Column {
column_name: "a".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
i32_values: column_values,
..Default::default()
}),
null_mask,
datatype: ColumnDataType::Int32 as i32,
}],
row_count,
region_number,
}
};
// region to datanode placement:
// 1 -> 1
// 2 -> 2
// 3 -> 3
//
// region value ranges:
// 1 -> [50, max)
// 2 -> [10, 50)
// 3 -> (min, 10)
let datanode_inserts = inserts.remove(&Peer::new(1, "")).unwrap().inserts;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_grpc_insert_request(vec![101], vec![0], 1, 1)
);
assert_eq!(
datanode_inserts[1],
new_grpc_insert_request(vec![102], vec![0], 1, 1)
);
let datanode_inserts = inserts.remove(&Peer::new(2, "")).unwrap().inserts;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_grpc_insert_request(vec![11], vec![0], 1, 2)
);
assert_eq!(
datanode_inserts[1],
new_grpc_insert_request(vec![12], vec![0], 1, 2)
);
let datanode_inserts = inserts.remove(&Peer::new(3, "")).unwrap().inserts;
assert_eq!(datanode_inserts.len(), 2);
assert_eq!(
datanode_inserts[0],
new_grpc_insert_request(vec![1], vec![2], 2, 3)
);
assert_eq!(
datanode_inserts[1],
new_grpc_insert_request(vec![2], vec![2], 2, 3)
);
}
}
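
The heart of DistInserter::split_inserts is the grouping step: every partitioned insert is appended to the InsertRequests batch of the region's leader Datanode, so each Datanode later receives a single batched request. A reduced sketch of just that grouping, assuming the peer for each insert has already been resolved via find_region_leader (the real code also converts table inserts with to_grpc_insert_request):

use std::collections::HashMap;

use api::v1::{InsertRequest, InsertRequests};
use common_meta::peer::Peer;

fn group_by_datanode(resolved: Vec<(Peer, InsertRequest)>) -> HashMap<Peer, InsertRequests> {
    let mut grouped: HashMap<Peer, InsertRequests> = HashMap::new();
    for (peer, insert) in resolved {
        // Each Datanode peer accumulates one InsertRequests batch.
        grouped
            .entry(peer)
            .or_insert_with(|| InsertRequests { inserts: vec![] })
            .inserts
            .push(insert);
    }
    grouped
}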


@@ -36,7 +36,7 @@ impl GrpcQueryHandler for Instance {
interceptor.pre_execute(&request, ctx.clone())?;
let output = match request {
Request::Insert(request) => self.handle_insert(request, ctx.clone()).await?,
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::Query(query_request) => {
let query = query_request.query.context(IncompleteGrpcResultSnafu {
err_msg: "Missing field 'QueryRequest.query'",


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::InsertRequests;
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use servers::error as server_error;
@@ -25,8 +26,10 @@ use crate::instance::Instance;
#[async_trait]
impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> {
let request = data_point.as_grpc_insert();
self.handle_insert(request, ctx)
let requests = InsertRequests {
inserts: vec![data_point.as_grpc_insert()],
};
self.handle_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {


@@ -21,6 +21,7 @@ use common_error::prelude::BoxedError;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use metrics::counter;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
@@ -30,6 +31,7 @@ use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use crate::instance::Instance;
use crate::metrics::PROMETHEUS_REMOTE_WRITE_SAMPLES;
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -106,11 +108,13 @@ impl Instance {
#[async_trait]
impl PrometheusProtocolHandler for Instance {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
let requests = prometheus::to_grpc_insert_requests(request.clone())?;
let (requests, samples) = prometheus::to_grpc_insert_requests(request)?;
self.handle_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
counter!(PROMETHEUS_REMOTE_WRITE_SAMPLES, samples as u64);
Ok(())
}


@@ -22,3 +22,6 @@ pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table";
pub const DIST_CREATE_TABLE_IN_META: &str = "frontend.dist.create_table.update_meta";
pub const DIST_CREATE_TABLE_IN_DATANODE: &str = "frontend.dist.create_table.invoke_datanode";
pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows";
/// The number of samples ingested via Prometheus remote write.
pub const PROMETHEUS_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";


@@ -20,8 +20,6 @@ use std::sync::Arc;
use api::v1::AlterExpr;
use async_trait::async_trait;
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use catalog::remote::KvBackendRef;
use client::client_manager::DatanodeClients;
use client::Database;
use common_error::prelude::BoxedError;
use common_meta::key::TableRouteKey;
@@ -45,7 +43,7 @@ use datafusion::physical_plan::{
use datafusion_common::DataFusionError;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use futures_util::{Stream, StreamExt};
use partition::manager::{PartitionRuleManagerRef, TableRouteCacheInvalidator};
use partition::manager::TableRouteCacheInvalidator;
use partition::splitter::WriteSplitter;
use snafu::prelude::*;
use store_api::storage::{RegionNumber, ScanRequest};
@@ -53,12 +51,13 @@ use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef};
use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest};
use table::table::AlterContext;
use table::{meter_insert_request, Table};
use table::Table;
use tokio::sync::RwLock;
use crate::catalog::FrontendCatalogManager;
use crate::error::{self, FindDatanodeSnafu, FindTableRouteSnafu, Result};
use crate::instance::distributed::inserter::DistInserter;
use crate::table::delete::to_grpc_delete_request;
use crate::table::insert::to_grpc_insert_request;
use crate::table::scan::{DatanodeInstance, TableScanPlan};
mod delete;
@@ -69,10 +68,7 @@ pub(crate) mod scan;
pub struct DistTable {
table_name: TableName,
table_info: TableInfoRef,
partition_manager: PartitionRuleManagerRef,
// TODO(ruihang): move this field into PartitionRuleManager
datanode_clients: Arc<DatanodeClients>,
backend: KvBackendRef,
catalog_manager: Arc<FrontendCatalogManager>,
}
#[async_trait]
@@ -90,29 +86,17 @@ impl Table for DistTable {
}
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
meter_insert_request!(request);
let splits = self
.partition_manager
.split_insert_request(&self.table_name, request, &self.schema())
let inserter = DistInserter::new(
request.catalog_name.clone(),
request.schema_name.clone(),
self.catalog_manager.clone(),
);
let affected_rows = inserter
.insert(vec![request])
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let inserts = splits
.into_iter()
.map(|(region_number, insert)| to_grpc_insert_request(region_number, insert))
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let output = self
.dist_insert(inserts)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let Output::AffectedRows(rows) = output else { unreachable!() };
Ok(rows)
Ok(affected_rows as usize)
}
async fn scan(
@@ -121,20 +105,20 @@ impl Table for DistTable {
filters: &[Expr],
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
let partition_rule = self
.partition_manager
let partition_manager = self.catalog_manager.partition_manager();
let datanode_clients = self.catalog_manager.datanode_clients();
let partition_rule = partition_manager
.find_table_partition_rule(&self.table_name)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let regions = self
.partition_manager
let regions = partition_manager
.find_regions_by_filters(partition_rule, filters)
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let datanodes = self
.partition_manager
let datanodes = partition_manager
.find_region_datanodes(&self.table_name, regions)
.await
.map_err(BoxedError::new)
@@ -143,7 +127,7 @@ impl Table for DistTable {
let table_name = &self.table_name;
let mut partition_execs = Vec::with_capacity(datanodes.len());
for (datanode, _regions) in datanodes.iter() {
let client = self.datanode_clients.get_client(datanode).await;
let client = datanode_clients.get_client(datanode).await;
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
@@ -169,20 +153,20 @@ impl Table for DistTable {
&self,
request: ScanRequest,
) -> table::Result<SendableRecordBatchStream> {
let partition_rule = self
.partition_manager
let partition_manager = self.catalog_manager.partition_manager();
let datanode_clients = self.catalog_manager.datanode_clients();
let partition_rule = partition_manager
.find_table_partition_rule(&self.table_name)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let regions = self
.partition_manager
let regions = partition_manager
.find_regions_by_filters(partition_rule, &request.filters)
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let datanodes = self
.partition_manager
let datanodes = partition_manager
.find_region_datanodes(&self.table_name, regions)
.await
.map_err(BoxedError::new)
@@ -191,7 +175,7 @@ impl Table for DistTable {
let table_name = &self.table_name;
let mut partition_execs = Vec::with_capacity(datanodes.len());
for (datanode, _regions) in datanodes.iter() {
let client = self.datanode_clients.get_client(datanode).await;
let client = datanode_clients.get_client(datanode).await;
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
@@ -247,8 +231,9 @@ impl Table for DistTable {
}
async fn delete(&self, request: DeleteRequest) -> table::Result<usize> {
let partition_rule = self
.partition_manager
let partition_manager = self.catalog_manager.partition_manager();
let partition_rule = partition_manager
.find_table_partition_rule(&self.table_name)
.await
.map_err(BoxedError::new)
@@ -276,7 +261,12 @@ impl Table for DistTable {
requests
.into_iter()
.map(|(region_number, request)| {
to_grpc_delete_request(&self.table_name, region_number, request)
to_grpc_delete_request(
&table_info.meta,
&self.table_name,
region_number,
request,
)
})
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)
@@ -297,16 +287,12 @@ impl DistTable {
pub fn new(
table_name: TableName,
table_info: TableInfoRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
backend: KvBackendRef,
catalog_manager: Arc<FrontendCatalogManager>,
) -> Self {
Self {
table_name,
table_info,
partition_manager,
datanode_clients,
backend,
catalog_manager,
}
}
@@ -315,7 +301,8 @@ impl DistTable {
key: &TableGlobalKey,
) -> Result<Option<TableGlobalValue>> {
let raw = self
.backend
.catalog_manager
.backend()
.get(key.to_string().as_bytes())
.await
.context(error::CatalogSnafu)?;
@@ -332,14 +319,16 @@ impl DistTable {
value: TableGlobalValue,
) -> Result<()> {
let value = value.as_bytes().context(error::CatalogEntrySerdeSnafu)?;
self.backend
self.catalog_manager
.backend()
.set(key.to_string().as_bytes(), &value)
.await
.context(error::CatalogSnafu)
}
async fn delete_table_global_value(&self, key: TableGlobalKey) -> Result<()> {
self.backend
self.catalog_manager
.backend()
.delete(key.to_string().as_bytes())
.await
.context(error::CatalogSnafu)
@@ -369,12 +358,14 @@ impl DistTable {
}
.key();
self.backend
self.catalog_manager
.backend()
.move_value(old_key.as_bytes(), new_key.as_bytes())
.await
.context(error::CatalogSnafu)?;
self.partition_manager
self.catalog_manager
.partition_manager()
.invalidate_table_route(&TableName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
@@ -451,7 +442,8 @@ impl DistTable {
/// [`table::requests::AlterTableRequest`] and [`AlterExpr`].
async fn alter_by_expr(&self, expr: &AlterExpr) -> Result<()> {
let table_routes = self
.partition_manager
.catalog_manager
.partition_manager()
.find_table_route(&self.table_name)
.await
.with_context(|_| error::FindTableRouteSnafu {
@@ -467,8 +459,10 @@ impl DistTable {
)
}
);
let datanode_clients = self.catalog_manager.datanode_clients();
for datanode in leaders {
let client = self.datanode_clients.get_client(&datanode).await;
let client = datanode_clients.get_client(&datanode).await;
let db = Database::new(&expr.catalog_name, &expr.schema_name, client);
debug!("Sending {:?} to {:?}", expr, db);
let result = db
@@ -487,7 +481,8 @@ impl DistTable {
) -> Result<Vec<DatanodeInstance>> {
let table_name = &self.table_name;
let route = self
.partition_manager
.catalog_manager
.partition_manager()
.find_table_route(table_name)
.await
.with_context(|_| FindTableRouteSnafu {
@@ -497,24 +492,16 @@ impl DistTable {
let datanodes = regions
.iter()
.map(|&n| {
let region_id = n as u64;
route
.region_routes
.iter()
.find_map(|x| {
if x.region.id == region_id {
x.leader_peer.clone()
} else {
None
}
})
.context(FindDatanodeSnafu { region: region_id })
.find_region_leader(n)
.context(FindDatanodeSnafu { region: n })
})
.collect::<Result<Vec<_>>>()?;
let datanode_clients = self.catalog_manager.datanode_clients();
let mut instances = Vec::with_capacity(datanodes.len());
for datanode in datanodes {
let client = self.datanode_clients.get_client(&datanode).await;
let client = datanode_clients.get_client(datanode).await;
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
instances.push(DatanodeInstance::new(Arc::new(self.clone()) as _, db));
}
@@ -622,12 +609,12 @@ impl PartitionExec {
}
#[cfg(test)]
mod test {
pub(crate) mod test {
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use catalog::error::Result;
use catalog::remote::{KvBackend, ValueIter};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute, Table, TableRoute};
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
use datafusion_expr::{lit, Operator};
@@ -637,7 +624,7 @@ mod test {
use meter_core::global::global_registry;
use meter_core::write_calc::WriteCalculator;
use partition::columns::RangeColumnsPartitionRule;
use partition::manager::PartitionRuleManager;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use partition::partition::{PartitionBound, PartitionDef};
use partition::range::RangePartitionRule;
use partition::route::TableRoutes;
@@ -647,56 +634,38 @@ mod test {
use super::*;
struct DummyKvBackend;
#[async_trait]
impl KvBackend for DummyKvBackend {
fn as_any(&self) -> &dyn Any {
self
}
fn range<'a, 'b>(&'a self, _key: &[u8]) -> ValueIter<'b, catalog::error::Error>
where
'a: 'b,
{
unimplemented!()
}
async fn set(&self, _key: &[u8], _val: &[u8]) -> Result<()> {
unimplemented!()
}
async fn compare_and_set(
&self,
_key: &[u8],
_expect: &[u8],
_val: &[u8],
) -> Result<std::result::Result<(), Option<Vec<u8>>>> {
unimplemented!()
}
async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<()> {
unimplemented!()
}
async fn delete_range(&self, _key: &[u8], _end: &[u8]) -> Result<()> {
unimplemented!()
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_find_partition_rule() {
let table_name = TableName::new("greptime", "public", "foo");
/// Create a partition rule manager with two tables: one partitioned by a single column, the
/// other by two. Both tables are under the default catalog and schema.
///
/// Table named "one_column_partitioning_table" is partitioned by column "a" like this:
/// PARTITION BY RANGE (a) (
/// PARTITION r1 VALUES LESS THAN (10),
/// PARTITION r2 VALUES LESS THAN (50),
/// PARTITION r3 VALUES LESS THAN (MAXVALUE),
/// )
///
/// Table named "two_column_partitioning_table" is partitioned by columns "a" and "b" like this:
/// PARTITION BY RANGE (a, b) (
/// PARTITION r1 VALUES LESS THAN (10, 'hz'),
/// PARTITION r2 VALUES LESS THAN (50, 'sh'),
/// PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
/// )
pub(crate) async fn create_partition_rule_manager() -> PartitionRuleManagerRef {
let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default())));
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes.clone()));
let table_route = TableRoute {
table: Table {
let table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"one_column_partitioning_table",
);
let table_route = TableRoute::new(
Table {
id: 1,
table_name: table_name.clone(),
table_schema: vec![],
},
region_routes: vec![
vec![
RegionRoute {
region: Region {
id: 3,
@@ -711,7 +680,7 @@ mod test {
),
attrs: HashMap::new(),
},
leader_peer: None,
leader_peer: Some(Peer::new(3, "")),
follower_peers: vec![],
},
RegionRoute {
@@ -728,7 +697,7 @@ mod test {
),
attrs: HashMap::new(),
},
leader_peer: None,
leader_peer: Some(Peer::new(2, "")),
follower_peers: vec![],
},
RegionRoute {
@@ -745,34 +714,27 @@ mod test {
),
attrs: HashMap::new(),
},
leader_peer: None,
leader_peer: Some(Peer::new(1, "")),
follower_peers: vec![],
},
],
};
);
table_routes
.insert_table_route(table_name.clone(), Arc::new(table_route))
.await;
let partition_rule = partition_manager
.find_table_partition_rule(&table_name)
.await
.unwrap();
let range_rule = partition_rule
.as_any()
.downcast_ref::<RangePartitionRule>()
.unwrap();
assert_eq!(range_rule.column_name(), "a");
assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]);
assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]);
let table_route = TableRoute {
table: Table {
let table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"two_column_partitioning_table",
);
let table_route = TableRoute::new(
Table {
id: 1,
table_name: table_name.clone(),
table_schema: vec![],
},
region_routes: vec![
vec![
RegionRoute {
region: Region {
id: 1,
@@ -831,13 +793,40 @@ mod test {
follower_peers: vec![],
},
],
};
);
table_routes
.insert_table_route(table_name.clone(), Arc::new(table_route))
.await;
partition_manager
}
#[tokio::test(flavor = "multi_thread")]
async fn test_find_partition_rule() {
let partition_manager = create_partition_rule_manager().await;
let partition_rule = partition_manager
.find_table_partition_rule(&table_name)
.find_table_partition_rule(&TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"one_column_partitioning_table",
))
.await
.unwrap();
let range_rule = partition_rule
.as_any()
.downcast_ref::<RangePartitionRule>()
.unwrap();
assert_eq!(range_rule.column_name(), "a");
assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]);
assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]);
let partition_rule = partition_manager
.find_table_partition_rule(&TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"two_column_partitioning_table",
))
.await
.unwrap();
let range_columns_rule = partition_rule


@@ -18,6 +18,7 @@ use common_query::Output;
use futures::future;
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::metadata::TableMeta;
use table::requests::DeleteRequest;
use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result};
@@ -48,11 +49,12 @@ impl DistTable {
}
pub(super) fn to_grpc_delete_request(
table_meta: &TableMeta,
table_name: &TableName,
region_number: RegionNumber,
request: DeleteRequest,
) -> Result<GrpcDeleteRequest> {
let (key_columns, row_count) = to_grpc_columns(&request.key_column_values)?;
let (key_columns, row_count) = to_grpc_columns(table_meta, &request.key_column_values)?;
Ok(GrpcDeleteRequest {
table_name: table_name.table_name.clone(),
region_number,
@@ -68,13 +70,27 @@ mod tests {
use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ColumnDataType};
use datatypes::prelude::VectorRef;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use table::metadata::TableMetaBuilder;
use super::*;
#[test]
fn test_to_grpc_delete_request() {
let schema = Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true),
ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
]);
let mut builder = TableMetaBuilder::default();
builder.schema(Arc::new(schema));
builder.primary_key_indices(vec![]);
builder.next_column_id(2);
let table_meta = builder.build().unwrap();
let table_name = TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
@@ -88,7 +104,8 @@ mod tests {
)]);
let request = DeleteRequest { key_column_values };
let result = to_grpc_delete_request(&table_name, region_number, request).unwrap();
let result =
to_grpc_delete_request(&table_meta, &table_name, region_number, request).unwrap();
assert_eq!(result.table_name, "foo");
assert_eq!(result.region_number, region_number);

View File

@@ -15,49 +15,18 @@
use std::collections::HashMap;
use api::helper::{push_vals, ColumnDataTypeWrapper};
use api::v1::column::SemanticType;
use api::v1::column::{SemanticType, Values};
use api::v1::{Column, InsertRequest as GrpcInsertRequest};
use common_query::Output;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use futures::future;
use metrics::counter;
use snafu::{ensure, ResultExt};
use datatypes::prelude::*;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::metadata::TableMeta;
use table::requests::InsertRequest;
use super::DistTable;
use crate::error;
use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result};
impl DistTable {
pub async fn dist_insert(&self, inserts: Vec<GrpcInsertRequest>) -> Result<Output> {
let regions = inserts.iter().map(|x| x.region_number).collect::<Vec<_>>();
let instances = self.find_datanode_instances(&regions).await?;
let results = future::try_join_all(instances.into_iter().zip(inserts.into_iter()).map(
|(instance, request)| {
common_runtime::spawn_write(async move {
instance
.grpc_insert(request)
.await
.context(RequestDatanodeSnafu)
})
},
))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<u32>>()?;
counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64);
Ok(Output::AffectedRows(affected_rows as _))
}
}
pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec<Column>, u32)> {
to_grpc_columns(&insert.columns_values)
}
use crate::error::{self, ColumnDataTypeSnafu, NotSupportedSnafu, Result, VectorToGrpcColumnSnafu};
pub(crate) fn to_grpc_columns(
table_meta: &TableMeta,
columns_values: &HashMap<String, VectorRef>,
) -> Result<(Vec<Column>, u32)> {
let mut row_count = None;
@@ -76,27 +45,7 @@ pub(crate) fn to_grpc_columns(
None => row_count = Some(vector.len()),
}
let datatype: ColumnDataTypeWrapper = vector
.data_type()
.try_into()
.context(error::ColumnDataTypeSnafu)?;
// TODO(hl): need refactor
let semantic_type =
if vector.data_type() == ConcreteDataType::timestamp_millisecond_datatype() {
SemanticType::Timestamp
} else {
SemanticType::Field
};
let mut column = Column {
column_name: column_name.clone(),
semantic_type: semantic_type.into(),
datatype: datatype.datatype() as i32,
..Default::default()
};
push_vals(&mut column, 0, vector.clone());
let column = vector_to_grpc_column(table_meta, column_name, vector.clone())?;
Ok(column)
})
.collect::<Result<Vec<_>>>()?;
@@ -107,11 +56,12 @@ pub(crate) fn to_grpc_columns(
}
pub(crate) fn to_grpc_insert_request(
table_meta: &TableMeta,
region_number: RegionNumber,
insert: InsertRequest,
) -> Result<GrpcInsertRequest> {
let table_name = insert.table_name.clone();
let (columns, row_count) = insert_request_to_insert_batch(&insert)?;
let (columns, row_count) = to_grpc_columns(table_meta, &insert.columns_values)?;
Ok(GrpcInsertRequest {
table_name,
region_number,
@@ -120,23 +70,142 @@ pub(crate) fn to_grpc_insert_request(
})
}
fn vector_to_grpc_column(
table_meta: &TableMeta,
column_name: &str,
vector: VectorRef,
) -> Result<Column> {
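// Resolve the gRPC semantic type from the table schema: the time index column maps to
// Timestamp, primary key columns map to Tag, and all remaining columns map to Field.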
let time_index_column = &table_meta
.schema
.timestamp_column()
.context(NotSupportedSnafu {
feat: "Table without time index.",
})?
.name;
let semantic_type = if column_name == time_index_column {
SemanticType::Timestamp
} else {
let column_index = table_meta
.schema
.column_index_by_name(column_name)
.context(VectorToGrpcColumnSnafu {
reason: format!("unable to find column {column_name} in table schema"),
})?;
if table_meta.primary_key_indices.contains(&column_index) {
SemanticType::Tag
} else {
SemanticType::Field
}
};
let datatype: ColumnDataTypeWrapper =
vector.data_type().try_into().context(ColumnDataTypeSnafu)?;
let mut column = Column {
column_name: column_name.to_string(),
semantic_type: semantic_type as i32,
null_mask: vec![],
datatype: datatype.datatype() as i32,
values: Some(Values::default()), // vector values will be pushed into it below
};
push_vals(&mut column, 0, vector);
Ok(column)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::ColumnDataType;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{
Int16VectorBuilder, Int32Vector, Int64Vector, MutableVector, StringVector,
StringVectorBuilder,
};
use table::metadata::TableMetaBuilder;
use table::requests::InsertRequest;
use super::*;
#[test]
fn test_to_grpc_insert_request() {
let insert_request = mock_insert_request();
fn test_vector_to_grpc_column() {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true),
ColumnSchema::new("k", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("v", ConcreteDataType::string_datatype(), true),
]));
let request = to_grpc_insert_request(12, insert_request).unwrap();
let mut builder = TableMetaBuilder::default();
builder.schema(schema);
builder.primary_key_indices(vec![1]);
builder.next_column_id(3);
let table_meta = builder.build().unwrap();
let column = vector_to_grpc_column(
&table_meta,
"ts",
Arc::new(Int64Vector::from_slice([1, 2, 3])),
)
.unwrap();
assert_eq!(column.column_name, "ts");
assert_eq!(column.semantic_type, SemanticType::Timestamp as i32);
assert_eq!(column.values.unwrap().i64_values, vec![1, 2, 3]);
assert_eq!(column.null_mask, vec![0]);
assert_eq!(column.datatype, ColumnDataType::Int64 as i32);
let column = vector_to_grpc_column(
&table_meta,
"k",
Arc::new(Int32Vector::from_slice([3, 2, 1])),
)
.unwrap();
assert_eq!(column.column_name, "k");
assert_eq!(column.semantic_type, SemanticType::Tag as i32);
assert_eq!(column.values.unwrap().i32_values, vec![3, 2, 1]);
assert_eq!(column.null_mask, vec![0]);
assert_eq!(column.datatype, ColumnDataType::Int32 as i32);
let column = vector_to_grpc_column(
&table_meta,
"v",
Arc::new(StringVector::from(vec![
Some("hello"),
None,
Some("greptime"),
])),
)
.unwrap();
assert_eq!(column.column_name, "v");
assert_eq!(column.semantic_type, SemanticType::Field as i32);
assert_eq!(
column.values.unwrap().string_values,
vec!["hello", "greptime"]
);
assert_eq!(column.null_mask, vec![2]);
assert_eq!(column.datatype, ColumnDataType::String as i32);
}
#[test]
fn test_to_grpc_insert_request() {
let schema = Schema::new(vec![
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true),
ColumnSchema::new("id", ConcreteDataType::int16_datatype(), false),
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
]);
let mut builder = TableMetaBuilder::default();
builder.schema(Arc::new(schema));
builder.primary_key_indices(vec![]);
builder.next_column_id(3);
let table_meta = builder.build().unwrap();
let insert_request = mock_insert_request();
let request = to_grpc_insert_request(&table_meta, 12, insert_request).unwrap();
verify_grpc_insert_request(request);
}

View File

@@ -15,7 +15,7 @@
use std::fmt::Formatter;
use std::sync::Arc;
use api::v1::{DeleteRequest, InsertRequest};
use api::v1::DeleteRequest;
use client::Database;
use common_meta::table_name::TableName;
use common_query::prelude::Expr;
@@ -47,10 +47,6 @@ impl DatanodeInstance {
Self { table, db }
}
pub(crate) async fn grpc_insert(&self, request: InsertRequest) -> client::Result<u32> {
self.db.insert(request).await
}
pub(crate) async fn grpc_delete(&self, request: DeleteRequest) -> client::Result<u32> {
self.db.delete(request).await
}

View File

@@ -33,6 +33,7 @@ use tonic::transport::server::Router;
use crate::cluster::MetaPeerClientBuilder;
use crate::election::etcd::EtcdElection;
use crate::lock::etcd::EtcdLock;
use crate::lock::memory::MemLock;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
@@ -152,7 +153,11 @@ pub fn router(meta_srv: MetaSrv) -> Router {
pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
let (kv_store, election, lock) = if opts.use_memory_store {
(Arc::new(MemStore::new()) as _, None, None)
(
Arc::new(MemStore::new()) as _,
None,
Some(Arc::new(MemLock::default()) as _),
)
} else {
let etcd_endpoints = [&opts.store_addr];
let etcd_client = Client::connect(etcd_endpoints, None)

View File

@@ -283,6 +283,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Table already exists: {table_name}"))]
TableAlreadyExists {
table_name: String,
location: Location,
},
#[snafu(display("Pusher not found: {pusher_id}"))]
PusherNotFound {
pusher_id: String,
@@ -394,6 +400,7 @@ impl ErrorExt for Error {
| Error::SendShutdownSignal { .. }
| Error::ParseAddr { .. }
| Error::SchemaAlreadyExists { .. }
| Error::TableAlreadyExists { .. }
| Error::PusherNotFound { .. }
| Error::PushMessage { .. }
| Error::MailboxClosed { .. }

View File

@@ -18,6 +18,8 @@ pub(crate) mod memory;
use std::sync::Arc;
use common_telemetry::error;
use crate::error::Result;
pub type Key = Vec<u8>;
@@ -49,3 +51,50 @@ pub trait DistLock: Send + Sync {
}
pub type DistLockRef = Arc<dyn DistLock>;
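/// A scoped guard over a distributed lock: `lock()` acquires the lock key once (subsequent
/// calls are no-ops), and dropping the guard spawns a background task to release it,
/// logging any unlock failure.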
pub(crate) struct DistLockGuard<'a> {
lock: &'a DistLockRef,
name: Vec<u8>,
key: Option<Key>,
}
impl<'a> DistLockGuard<'a> {
pub(crate) fn new(lock: &'a DistLockRef, name: Vec<u8>) -> Self {
Self {
lock,
name,
key: None,
}
}
pub(crate) async fn lock(&mut self) -> Result<()> {
if self.key.is_some() {
return Ok(());
}
let key = self
.lock
.lock(
self.name.clone(),
Opts {
expire_secs: Some(2),
},
)
.await?;
self.key = Some(key);
Ok(())
}
}
impl Drop for DistLockGuard<'_> {
fn drop(&mut self) {
if let Some(key) = self.key.take() {
let lock = self.lock.clone();
let name = self.name.clone();
common_runtime::spawn_bg(async move {
if let Err(e) = lock.unlock(key).await {
error!(e; "Failed to unlock '{}'", String::from_utf8_lossy(&name));
}
});
}
}
}

View File

@@ -15,6 +15,7 @@
//! All keys used for distributed locking in the Metasrv.
//! Keep them in this unified module for easier maintenance.
use common_meta::table_name::TableName;
use common_meta::RegionIdent;
use crate::lock::Key;
@@ -30,3 +31,7 @@ pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key {
)
.into_bytes()
}
pub(crate) fn table_creation_lock_key(table_name: &TableName) -> Key {
format!("table_creation_lock_({})", table_name).into_bytes()
}

View File

@@ -21,16 +21,19 @@ use api::v1::meta::{
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_meta::key::TableRouteKey;
use common_meta::table_name::TableName;
use common_telemetry::{timer, warn};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::RawTableInfo;
use tonic::{Request, Response};
use crate::error;
use crate::error::Result;
use crate::lock::{keys, DistLockGuard};
use crate::metasrv::{Context, MetaSrv, SelectorContext, SelectorRef};
use crate::metrics::METRIC_META_ROUTE_REQUEST;
use crate::sequence::SequenceRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;
use crate::service::GrpcResult;
use crate::table_routes::{get_table_global_value, get_table_route_value};
@@ -43,6 +46,30 @@ impl router_server::Router for MetaSrv {
let CreateRequest {
header, table_name, ..
} = &req;
let table_name: TableName = table_name
.as_ref()
.context(error::EmptyTableNameSnafu)?
.clone()
.into();
// TODO(LFC): Use procedure to create table, and get rid of locks here.
let mut guard = DistLockGuard::new(self.lock(), keys::table_creation_lock_key(&table_name));
guard.lock().await?;
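// While holding the table-creation lock, ensure that no table with the same name has
// already been registered, closing the race between concurrent CREATE TABLE requests.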
let table_global_key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
}
.to_string()
.into_bytes();
ensure!(
self.kv_store().get(table_global_key).await?.is_none(),
error::TableAlreadyExistsSnafu {
table_name: table_name.to_string(),
}
);
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let _timer = timer!(
@@ -53,14 +80,13 @@ impl router_server::Router for MetaSrv {
]
);
let table_name = table_name.clone().context(error::EmptyTableNameSnafu)?;
let ctx = SelectorContext {
datanode_lease_secs: self.options().datanode_lease_secs,
server_addr: self.options().server_addr.clone(),
kv_store: self.kv_store(),
catalog: Some(table_name.catalog_name),
schema: Some(table_name.schema_name),
table: Some(table_name.table_name),
catalog: Some(table_name.catalog_name.clone()),
schema: Some(table_name.schema_name.clone()),
table: Some(table_name.table_name.clone()),
};
let selector = self.selector();
@@ -138,7 +164,7 @@ async fn handle_create(
return Ok(RouteResponse {
header: Some(ResponseHeader::failed(
cluster_id,
Error::not_enough_active_datanodes(peers.len() as _),
Error::not_enough_available_datanodes(partitions.len(), peers.len()),
)),
..Default::default()
});

View File

@@ -7,6 +7,7 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }

View File

@@ -88,21 +88,13 @@ impl PartitionRuleManager {
let mut datanodes = HashMap::with_capacity(regions.len());
for region in regions.iter() {
let datanode = route
.region_routes
.iter()
.find_map(|x| {
if x.region.id == *region as RegionId {
x.leader_peer.clone()
} else {
None
}
})
.find_region_leader(*region)
.context(error::FindDatanodeSnafu {
table: table.to_string(),
region: *region,
})?;
datanodes
.entry(datanode)
.entry(datanode.clone())
.or_insert_with(Vec::new)
.push(*region);
}

View File

@@ -39,6 +39,7 @@ http-body = "0.4"
humantime-serde = "1.1"
hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
itertools.workspace = true
metrics.workspace = true
# metrics-process 1.0.10 depends on metrics-0.21 but opendal depends on metrics-0.20.1
metrics-process = "<1.0.10"

View File

@@ -28,10 +28,11 @@ use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::info;
use common_telemetry::{error, warn};
use futures::FutureExt;
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::{Request, Response, Status};
@@ -39,7 +40,8 @@ use tonic::{Request, Response, Status};
use self::prom_query_gateway::PrometheusGatewayService;
use crate::auth::UserProviderRef;
use crate::error::{
AlreadyStartedSnafu, GrpcReflectionServiceSnafu, Result, StartGrpcSnafu, TcpBindSnafu,
AlreadyStartedSnafu, GrpcReflectionServiceSnafu, InternalSnafu, Result, StartGrpcSnafu,
TcpBindSnafu,
};
use crate::grpc::database::DatabaseService;
use crate::grpc::flight::FlightHandler;
@@ -55,6 +57,10 @@ pub struct GrpcServer {
request_handler: Arc<GreptimeRequestHandler>,
/// Handler for Prometheus-compatible PromQL queries. Only present for frontend server.
promql_handler: Option<PromHandlerRef>,
/// gRPC serving state receiver. Only present if the gRPC server is started.
/// Used to wait for the server to stop, preserving the old blocking behavior.
serve_state: Mutex<Option<Receiver<Result<()>>>>,
}
impl GrpcServer {
@@ -73,6 +79,7 @@ impl GrpcServer {
shutdown_tx: Mutex::new(None),
request_handler,
promql_handler,
serve_state: Mutex::new(None),
}
}
@@ -94,6 +101,22 @@ impl GrpcServer {
) -> PrometheusGatewayServer<impl PrometheusGateway> {
PrometheusGatewayServer::new(PrometheusGatewayService::new(handler))
}
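/// Waits for the background gRPC serving task to finish and returns once its result is
/// received. Callable at most once: the serving state receiver is taken on the first call.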
pub async fn wait_for_serve(&self) -> Result<()> {
let mut serve_state = self.serve_state.lock().await;
let rx = serve_state.take().context(InternalSnafu {
err_msg: "gRPC serving state is unknown, maybe the server is not started, \
or the serve result has already been awaited.",
})?;
let Ok(result) = rx.await else {
warn!("Background gRPC serving task is quited before we can receive the serve result.");
return Ok(());
};
if let Err(e) = result {
error!(e; "GRPC serve error");
}
Ok(())
}
}
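// A minimal caller sketch (assumed usage, not part of this commit; `grpc_server` and `addr`
// are illustrative names): start the server, then block until the background serving task
// reports its result.
//
//     let bound_addr = grpc_server.start(addr).await?;
//     grpc_server.wait_for_serve().await?;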
pub struct HealthCheckHandler;
@@ -151,7 +174,6 @@ impl Server for GrpcServer {
.build()
.context(GrpcReflectionServiceSnafu)?;
// Would block to serve requests.
let mut builder = tonic::transport::Server::builder()
.add_service(self.create_flight_service())
.add_service(self.create_database_service())
@@ -160,12 +182,19 @@ impl Server for GrpcServer {
builder =
builder.add_service(self.create_prom_query_gateway_service(promql_handler.clone()))
}
builder
.add_service(reflection_service)
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), rx.map(drop))
.await
.context(StartGrpcSnafu)?;
let builder = builder.add_service(reflection_service);
let (serve_state_tx, serve_state_rx) = oneshot::channel();
let mut serve_state = self.serve_state.lock().await;
*serve_state = Some(serve_state_rx);
common_runtime::spawn_bg(async move {
let result = builder
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), rx.map(drop))
.await
.context(StartGrpcSnafu);
serve_state_tx.send(result)
});
Ok(addr)
}

View File

@@ -14,7 +14,7 @@
use std::collections::HashMap;
use api::v1::InsertRequest as GrpcInsertRequest;
use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests};
use common_grpc::writer::{LinesWriter, Precision};
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
@@ -32,7 +32,7 @@ pub struct InfluxdbRequest {
type TableName = String;
impl TryFrom<&InfluxdbRequest> for Vec<GrpcInsertRequest> {
impl TryFrom<&InfluxdbRequest> for InsertRequests {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
@@ -103,7 +103,7 @@ impl TryFrom<&InfluxdbRequest> for Vec<GrpcInsertRequest> {
writer.commit();
}
Ok(writers
let inserts = writers
.into_iter()
.map(|(table_name, writer)| {
let (columns, row_count) = writer.finish();
@@ -114,7 +114,8 @@ impl TryFrom<&InfluxdbRequest> for Vec<GrpcInsertRequest> {
row_count,
}
})
.collect())
.collect();
Ok(InsertRequests { inserts })
}
}
@@ -140,10 +141,10 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
lines: lines.to_string(),
};
let requests: Vec<GrpcInsertRequest> = influxdb_req.try_into().unwrap();
assert_eq!(2, requests.len());
let requests: InsertRequests = influxdb_req.try_into().unwrap();
assert_eq!(2, requests.inserts.len());
for request in requests {
for request in requests.inserts {
match &request.table_name[..] {
"monitor1" => assert_monitor_1(&request.columns),
"monitor2" => assert_monitor_2(&request.columns),

View File

@@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher};
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
use datatypes::prelude::{ConcreteDataType, Value};
@@ -283,15 +283,23 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
Ok(timeseries_map.into_values().collect())
}
pub fn to_grpc_insert_requests(mut request: WriteRequest) -> Result<Vec<GrpcInsertRequest>> {
let timeseries = std::mem::take(&mut request.timeseries);
timeseries.into_iter().map(to_grpc_insert_request).collect()
pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> {
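// `itertools::process_results` short-circuits on the first conversion error; on success,
// the closure unzips the (insert request, samples count) pairs into two vectors.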
let (inserts, samples_counts) = itertools::process_results(
request.timeseries.into_iter().map(to_grpc_insert_request),
|x| x.unzip::<_, _, Vec<_>, Vec<_>>(),
)?;
Ok((
InsertRequests { inserts },
samples_counts.into_iter().sum::<usize>(),
))
}
fn to_grpc_insert_request(mut timeseries: TimeSeries) -> Result<GrpcInsertRequest> {
fn to_grpc_insert_request(timeseries: TimeSeries) -> Result<(GrpcInsertRequest, usize)> {
let samples_count = timeseries.samples.len();
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
let samples = std::mem::take(&mut timeseries.samples);
let labels = timeseries.labels;
let samples = timeseries.samples;
let row_count = samples.len();
let mut columns = Vec::with_capacity(2 + labels.len());
@@ -344,14 +352,15 @@ fn to_grpc_insert_request(mut timeseries: TimeSeries) -> Result<GrpcInsertReques
});
}
Ok(GrpcInsertRequest {
let request = GrpcInsertRequest {
table_name: table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?,
region_number: 0,
columns,
row_count: row_count as u32,
})
};
Ok((request, samples_count))
}
#[inline]
@@ -507,7 +516,7 @@ mod tests {
..Default::default()
};
let exprs = to_grpc_insert_requests(write_request).unwrap();
let exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts;
assert_eq!(3, exprs.len());
assert_eq!("metric1", exprs[0].table_name);
assert_eq!("metric2", exprs[1].table_name);

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::v1::greptime_request::Request;
use api::v1::InsertRequest;
use api::v1::InsertRequests;
use async_trait::async_trait;
use axum::{http, Router};
use axum_test_helper::TestClient;
@@ -54,9 +54,8 @@ impl GrpcQueryHandler for DummyInstance {
#[async_trait]
impl InfluxdbLineProtocolHandler for DummyInstance {
async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
let requests: Vec<InsertRequest> = request.try_into()?;
for expr in requests {
let requests: InsertRequests = request.try_into()?;
for expr in requests.inserts {
let _ = self.tx.send((ctx.current_schema(), expr.table_name)).await;
}

View File

@@ -16,7 +16,7 @@ use std::borrow::Cow;
use std::sync::Arc;
use api::v1::greptime_request::Request;
use api::v1::InsertRequest;
use api::v1::{InsertRequest, InsertRequests};
use servers::error::{self, NotSupportedSnafu, Result};
use servers::interceptor::{GrpcQueryInterceptor, SqlQueryInterceptor};
use session::context::{QueryContext, QueryContextRef};
@@ -51,9 +51,9 @@ impl GrpcQueryInterceptor for NoopInterceptor {
_query_ctx: QueryContextRef,
) -> std::result::Result<(), Self::Error> {
match req {
Request::Insert(insert) => {
Request::Inserts(insert) => {
ensure!(
insert.region_number == 0,
insert.inserts.iter().all(|x| x.region_number == 0),
NotSupportedSnafu {
feat: "region not 0"
}
@@ -72,15 +72,17 @@ fn test_grpc_interceptor() {
let di = NoopInterceptor;
let ctx = Arc::new(QueryContext::new());
let req = Request::Insert(InsertRequest {
region_number: 1,
..Default::default()
let req = Request::Inserts(InsertRequests {
inserts: vec![InsertRequest {
region_number: 1,
..Default::default()
}],
});
let fail = GrpcQueryInterceptor::pre_execute(&di, &req, ctx.clone());
assert!(fail.is_err());
let req = Request::Insert(InsertRequest::default());
let req = Request::Inserts(InsertRequests::default());
let success = GrpcQueryInterceptor::pre_execute(&di, &req, ctx);
assert!(success.is_ok());
}

View File

@@ -154,7 +154,7 @@ impl GrpcQueryHandler for DummyInstance {
ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
let output = match request {
Request::Insert(_) | Request::Delete(_) => unimplemented!(),
Request::Inserts(_) | Request::Delete(_) => unimplemented!(),
Request::Query(query_request) => {
let query = query_request.query.unwrap();
match query {

View File

@@ -15,7 +15,7 @@ common-time = { path = "../common/time" }
datafusion-sql.workspace = true
datatypes = { path = "../datatypes" }
hex = "0.4"
itertools = "0.10"
itertools.workspace = true
mito = { path = "../mito" }
once_cell = "1.10"
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -25,7 +25,7 @@ datafusion-common.workspace = true
datafusion-expr.workspace = true
futures.workspace = true
futures-util.workspace = true
itertools = "0.10"
itertools.workspace = true
lazy_static = "1.4"
metrics.workspace = true
object-store = { path = "../object-store" }

View File

@@ -61,7 +61,7 @@ uuid.workspace = true
common-procedure = { path = "../src/common/procedure" }
datafusion.workspace = true
datafusion-expr.workspace = true
itertools = "0.10"
itertools.workspace = true
partition = { path = "../src/partition" }
paste.workspace = true
prost.workspace = true

View File

@@ -23,7 +23,7 @@ mod test {
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, QueryRequest,
FlushTableExpr, InsertRequest, InsertRequests, QueryRequest,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_catalog::consts::MITO_ENGINE;
@@ -482,7 +482,13 @@ CREATE TABLE {table_name} (
row_count: 16,
..Default::default()
};
let output = query(instance, Request::Insert(insert)).await;
let output = query(
instance,
Request::Inserts(InsertRequests {
inserts: vec![insert],
}),
)
.await;
assert!(matches!(output, Output::AffectedRows(16)));
let request = Request::Query(QueryRequest {
@@ -670,7 +676,9 @@ CREATE TABLE {table_name} (
};
// Test automatically creating a nonexistent table upon insertion.
let request = Request::Insert(insert);
let request = Request::Inserts(InsertRequests {
inserts: vec![insert],
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(3)));
@@ -703,7 +711,9 @@ CREATE TABLE {table_name} (
};
// Test auto add not existed column upon insertion.
let request = Request::Insert(insert);
let request = Request::Inserts(InsertRequests {
inserts: vec![insert],
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(3)));
@@ -829,7 +839,9 @@ CREATE TABLE {table_name} (
..Default::default()
};
let request = Request::Insert(insert);
let request = Request::Inserts(InsertRequests {
inserts: vec![insert],
});
let output = query(instance, request).await;
assert!(matches!(output, Output::AffectedRows(8)));

View File

@@ -18,7 +18,9 @@ mod test {
use std::sync::Arc;
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use api::v1::{
column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests,
};
use common_meta::table_name::TableName;
use common_query::logical_plan::Expr;
use common_query::physical_plan::DfPhysicalPlanAdapter;
@@ -46,7 +48,6 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_dist_table_scan() {
common_telemetry::init_default_ut_logging();
let table = Arc::new(new_dist_table("test_dist_table_scan").await);
// should scan all regions
// select a, row_id from numbers
@@ -220,7 +221,6 @@ mod test {
.downcast_ref::<FrontendCatalogManager>()
.unwrap();
let partition_manager = catalog_manager.partition_manager();
let datanode_clients = catalog_manager.datanode_clients();
let table_name = TableName::new("greptime", "public", "dist_numbers");
@@ -295,9 +295,7 @@ mod test {
DistTable::new(
table_name,
Arc::new(table_info),
partition_manager,
datanode_clients,
catalog_manager.backend(),
Arc::new(catalog_manager.clone()),
)
}
@@ -345,8 +343,11 @@ mod test {
row_count,
region_number,
};
let requests = InsertRequests {
inserts: vec![request],
};
dn_instance
.handle_insert(request, QueryContext::arc())
.handle_inserts(requests, &QueryContext::arc())
.await
.unwrap();
}

View File

@@ -421,8 +421,6 @@ pub async fn setup_grpc_server(
.unwrap(),
);
let fe_grpc_addr = format!("127.0.0.1:{}", ports::get_port());
let fe_instance = FeInstance::try_new_standalone(instance.clone())
.await
.unwrap();
@@ -434,13 +432,13 @@ pub async fn setup_grpc_server(
None,
runtime,
));
let grpc_server_clone = fe_grpc_server.clone();
let fe_grpc_addr_clone = fe_grpc_addr.clone();
tokio::spawn(async move {
let addr = fe_grpc_addr_clone.parse::<SocketAddr>().unwrap();
grpc_server_clone.start(addr).await.unwrap()
});
let fe_grpc_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let fe_grpc_addr = fe_grpc_server
.start(fe_grpc_addr)
.await
.unwrap()
.to_string();
// Wait for the gRPC server to start.
tokio::time::sleep(Duration::from_secs(1)).await;

View File

@@ -17,7 +17,8 @@ use api::v1::column::SemanticType;
use api::v1::promql_request::Promql;
use api::v1::{
column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr,
InsertRequest, PromInstantQuery, PromRangeQuery, PromqlRequest, RequestHeader, TableId,
InsertRequest, InsertRequests, PromInstantQuery, PromRangeQuery, PromqlRequest, RequestHeader,
TableId,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
@@ -86,7 +87,11 @@ pub async fn test_invalid_dbname(store_type: StorageType) {
],
row_count: 4,
};
let result = db.insert(request).await;
let result = db
.insert(InsertRequests {
inserts: vec![request],
})
.await;
assert!(result.is_err());
let _ = fe_grpc_server.shutdown().await;
@@ -230,7 +235,11 @@ async fn insert_and_assert(db: &Database) {
],
row_count: 4,
};
let result = db.insert(request).await;
let result = db
.insert(InsertRequests {
inserts: vec![request],
})
.await;
assert_eq!(result.unwrap(), 4);
let result = db