diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 6e1231d838..f5f70ff37e 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -60,7 +60,7 @@ use store_api::region_engine::{ }; use store_api::region_request::{ convert_body_to_requests, AffectedRows, BatchRegionRequest, RegionCloseRequest, - RegionOpenRequest, RegionRequest, RegionRequestBundle, + RegionOpenRequest, RegionPutRequest, RegionRequest, RegionRequestBundle, }; use store_api::storage::RegionId; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -776,6 +776,10 @@ impl RegionServerInner { .iter() .map(|(region_id, _)| (*region_id, RegionChange::None)) .collect::>(), + BatchRegionRequest::Put(requests) => requests + .iter() + .map(|(region_id, _)| (*region_id, RegionChange::None)) + .collect::>(), }; let (first_region_id, first_region_change) = region_changes.first().unwrap(); @@ -807,7 +811,7 @@ impl RegionServerInner { } Err(err) => { for (region_id, region_change) in region_changes { - self.unset_region_status(region_id, &engine, region_change); + self.unset_region_status(region_id, region_change); } Err(err) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 55fdb3f47d..e713dbc55a 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -132,6 +132,18 @@ impl RegionEngine for MetricEngine { batch_request: BatchRegionRequest, ) -> Result { match batch_request { + BatchRegionRequest::Put(requests) => { + let rows = self + .inner + .batch_put_region(requests) + .await + .map_err(BoxedError::new)?; + + Ok(RegionResponse { + affected_rows: rows, + extensions: HashMap::new(), + }) + } BatchRegionRequest::Create(requests) => { let mut extension_return_value = HashMap::new(); let rows = self @@ -217,16 +229,6 @@ impl RegionEngine for MetricEngine { }) } - async fn handle_batch_request(&self, request: BatchRegionRequest) -> Result<(), BoxedError> { - match request { - BatchRegionRequest::Put(put) => self - .inner - .batch_put_region(put) - .await - .map_err(BoxedError::new), - } - } - async fn handle_query( &self, region_id: RegionId, diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 0154adb84e..79af7f038d 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -56,7 +56,7 @@ impl MetricEngineInner { pub async fn batch_put_region( &self, requests: Vec<(RegionId, RegionPutRequest)>, - ) -> Result<()> { + ) -> Result { { let state = self.state.read().unwrap(); for region_id in requests.iter().map(|(region_id, _)| region_id) { @@ -69,8 +69,7 @@ impl MetricEngineInner { } } - self.batch_put_logical_regions(requests).await?; - Ok(()) + self.batch_put_logical_regions(requests).await } async fn put_logical_region( diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 135ba7b390..0169770176 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -42,13 +42,13 @@ use crate::region::options::{MemtableOptions, MergeMode}; use crate::sst::file::FileTimeRange; pub mod bulk; +mod encoder; pub mod key_values; pub mod partition_tree; mod stats; pub mod time_partition; pub mod time_series; pub(crate) mod version; -mod encoder; /// Id for memtables. /// diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index 0ba6b79def..f9e361cf20 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -23,16 +23,15 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_time::Timestamp; use datafusion_common::ScalarValue; use datatypes::prelude::ValueRef; -use snafu::{ensure, }; +use snafu::ensure; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; use table::predicate::Predicate; -use crate::error::{ - EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, -}; +use crate::error::{EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result}; use crate::flush::WriteBufferManagerRef; +use crate::memtable::encoder::SparseEncoder; use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::partition::{ Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext, @@ -40,12 +39,11 @@ use crate::memtable::partition_tree::partition::{ use crate::memtable::partition_tree::PartitionTreeConfig; use crate::memtable::stats::WriteMetrics; use crate::memtable::{BoxedBatchIterator, KeyValues}; -use crate::memtable::encoder::{ SparseEncoder}; use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::dedup::LastNonNullIter; use crate::read::Batch; use crate::region::options::MergeMode; -use crate::row_converter::{PrimaryKeyCodec}; +use crate::row_converter::PrimaryKeyCodec; /// The partition tree. pub struct PartitionTree { diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index f24475298d..a3826811b8 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -138,6 +138,7 @@ pub enum BatchRegionRequest { Create(Vec<(RegionId, RegionCreateRequest)>), Drop(Vec<(RegionId, RegionDropRequest)>), Alter(Vec<(RegionId, RegionAlterRequest)>), + Put(Vec<(RegionId, RegionPutRequest)>), } impl BatchRegionRequest { @@ -159,6 +160,11 @@ impl BatchRegionRequest { .into_iter() .map(|(region_id, req)| (region_id, RegionRequest::Alter(req))), ), + BatchRegionRequest::Put(requests) => Box::new( + requests + .into_iter() + .map(|(region_id, req)| (region_id, RegionRequest::Put(req))), + ), } }