fix: fix compile

This commit is contained in:
WenyXu
2025-02-10 13:18:01 +00:00
parent e0bafd661c
commit fd9940a253
6 changed files with 31 additions and 22 deletions

View File

@@ -60,7 +60,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{
convert_body_to_requests, AffectedRows, BatchRegionRequest, RegionCloseRequest,
RegionOpenRequest, RegionRequest, RegionRequestBundle,
RegionOpenRequest, RegionPutRequest, RegionRequest, RegionRequestBundle,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -776,6 +776,10 @@ impl RegionServerInner {
.iter()
.map(|(region_id, _)| (*region_id, RegionChange::None))
.collect::<Vec<_>>(),
BatchRegionRequest::Put(requests) => requests
.iter()
.map(|(region_id, _)| (*region_id, RegionChange::None))
.collect::<Vec<_>>(),
};
let (first_region_id, first_region_change) = region_changes.first().unwrap();
@@ -807,7 +811,7 @@ impl RegionServerInner {
}
Err(err) => {
for (region_id, region_change) in region_changes {
self.unset_region_status(region_id, &engine, region_change);
self.unset_region_status(region_id, region_change);
}
Err(err)

View File

@@ -132,6 +132,18 @@ impl RegionEngine for MetricEngine {
batch_request: BatchRegionRequest,
) -> Result<RegionResponse, BoxedError> {
match batch_request {
BatchRegionRequest::Put(requests) => {
let rows = self
.inner
.batch_put_region(requests)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: HashMap::new(),
})
}
BatchRegionRequest::Create(requests) => {
let mut extension_return_value = HashMap::new();
let rows = self
@@ -217,16 +229,6 @@ impl RegionEngine for MetricEngine {
})
}
async fn handle_batch_request(&self, request: BatchRegionRequest) -> Result<(), BoxedError> {
match request {
BatchRegionRequest::Put(put) => self
.inner
.batch_put_region(put)
.await
.map_err(BoxedError::new),
}
}
async fn handle_query(
&self,
region_id: RegionId,

View File

@@ -56,7 +56,7 @@ impl MetricEngineInner {
pub async fn batch_put_region(
&self,
requests: Vec<(RegionId, RegionPutRequest)>,
) -> Result<()> {
) -> Result<AffectedRows> {
{
let state = self.state.read().unwrap();
for region_id in requests.iter().map(|(region_id, _)| region_id) {
@@ -69,8 +69,7 @@ impl MetricEngineInner {
}
}
self.batch_put_logical_regions(requests).await?;
Ok(())
self.batch_put_logical_regions(requests).await
}
async fn put_logical_region(

View File

@@ -42,13 +42,13 @@ use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;
pub mod bulk;
mod encoder;
pub mod key_values;
pub mod partition_tree;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
mod encoder;
/// Id for memtables.
///

View File

@@ -23,16 +23,15 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use snafu::{ensure, };
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::error::{EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::encoder::SparseEncoder;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::partition::{
Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
@@ -40,12 +39,11 @@ use crate::memtable::partition_tree::partition::{
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::memtable::encoder::{ SparseEncoder};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{PrimaryKeyCodec};
use crate::row_converter::PrimaryKeyCodec;
/// The partition tree.
pub struct PartitionTree {

View File

@@ -138,6 +138,7 @@ pub enum BatchRegionRequest {
Create(Vec<(RegionId, RegionCreateRequest)>),
Drop(Vec<(RegionId, RegionDropRequest)>),
Alter(Vec<(RegionId, RegionAlterRequest)>),
Put(Vec<(RegionId, RegionPutRequest)>),
}
impl BatchRegionRequest {
@@ -159,6 +160,11 @@ impl BatchRegionRequest {
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
),
BatchRegionRequest::Put(requests) => Box::new(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Put(req))),
),
}
}