feat: handle write requests

This commit is contained in:
evenyag
2023-08-02 15:13:59 +08:00
parent 2c4f1487bf
commit a737c3d12a
4 changed files with 102 additions and 29 deletions

View File

@@ -26,7 +26,7 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::request::{CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody};
use crate::request::{CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest};
use crate::worker::WorkerGroup;
/// Region engine implementation for timeseries data.
@@ -83,6 +83,14 @@ impl MitoEngine {
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}
/// Write to a region.
pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
write_request.validate()?;
self.inner.handle_request_body(RequestBody::Write(write_request))
.await
}
}
/// Inner struct of [MitoEngine].

View File

@@ -94,13 +94,22 @@ pub struct Mutation {
/// Request to write a region.
#[derive(Debug)]
pub(crate) struct WriteRequest {
pub struct WriteRequest {
/// Region to write.
pub region_id: RegionId,
/// Mutation to the region.
pub mutation: Mutation,
}
impl WriteRequest {
/// Validate the request.
pub(crate) fn validate(&self) -> Result<()> {
// 1. checks whether the request is too large.
// 2. checks whether each row in rows has the same schema.
unimplemented!()
}
}
/// Request sent to a worker
pub(crate) enum WorkerRequest {
/// Region request.
@@ -139,7 +148,6 @@ impl RegionRequest {
/// Body to carry actual region request.
#[derive(Debug)]
pub(crate) enum RequestBody {
// DML:
/// Write to a region.
Write(WriteRequest),
@@ -163,13 +171,8 @@ impl RequestBody {
}
}
/// Returns whether the request is a DDL (e.g. CREATE/OPEN/ALTER).
pub(crate) fn is_ddl(&self) -> bool {
match self {
RequestBody::Write(_) => false,
RequestBody::Create(_) => true,
RequestBody::Open(_) => true,
RequestBody::Close(_) => true,
}
/// Returns whether the request is a write request.
pub(crate) fn is_write(&self) -> bool {
matches!(self, RequestBody::Write(_))
}
}

View File

@@ -323,15 +323,15 @@ impl<S> RegionWorkerLoop<S> {
///
/// `buffer` should be empty.
async fn handle_requests(&mut self, buffer: &mut RequestBuffer) {
let mut dml_requests = Vec::with_capacity(buffer.len());
let mut write_requests = Vec::with_capacity(buffer.len());
let mut ddl_requests = Vec::with_capacity(buffer.len());
for worker_req in buffer.drain(..) {
match worker_req {
WorkerRequest::Region(req) => {
if req.body.is_ddl() {
ddl_requests.push(req);
if req.body.is_write() {
write_requests.push(req);
} else {
dml_requests.push(req);
ddl_requests.push(req);
}
}
// We receive a stop signal, but we still want to process remaining
@@ -343,24 +343,13 @@ impl<S> RegionWorkerLoop<S> {
}
}
// Handles all dml requests first. So we can alter regions without
// considering existing dml requests.
self.handle_dml_requests(dml_requests).await;
// Handles all write requests first. So we can alter regions without
// considering existing write requests.
self.handle_write_requests(write_requests).await;
self.handle_ddl_requests(ddl_requests).await;
}
/// Takes and handles all dml requests.
async fn handle_dml_requests(&mut self, write_requests: Vec<RegionRequest>) {
if write_requests.is_empty() {
return;
}
// Create a write context that holds meta and sequence.
unimplemented!()
}
/// Takes and handles all ddl requests.
async fn handle_ddl_requests(&mut self, ddl_requests: Vec<RegionRequest>) {
if ddl_requests.is_empty() {

View File

@@ -13,3 +13,76 @@
// limitations under the License.
//! Handling write requests.
use crate::{worker::RegionWorkerLoop, request::{RegionRequest}};
impl<S> RegionWorkerLoop<S> {
/// Takes and handles all write requests.
///
/// # Panics
/// Panics if `write_requests` contains a request whose body is a [WriteRequest].
pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec<RegionRequest>) {
if write_requests.is_empty() {
return;
}
// We need to check:
// - region exists, if not, return error
// - check whether the schema is compatible with region schema
// - collect rows by region
// - get sequence for each row
// problem:
// - column order in request may be different from table column order
// - need to add missing column
// - memtable may need a new struct for sequence and op type.
todo!()
}
}
// pb write message
// region id
// rows
// sequence
// op type
// /// Entry for a write request in [WriteRequestBatch].
// #[derive(Debug)]
// pub(crate) struct BatchEntry {
// /// Result sender.
// pub(crate) sender: Option<Sender<Result<()>>>,
// /// A region write request.
// pub(crate) request: WriteRequest,
// }
// /// Batch of write requests.
// #[derive(Debug, Default)]
// pub(crate) struct WriteRequestBatch {
// /// Batched requests for each region.
// pub(crate) requests: HashMap<RegionId, Vec<BatchEntry>>,
// }
// impl WriteRequestBatch {
// /// Push a write request into the batch.
// ///
// /// # Panics
// /// Panics if the request body isn't a [WriteRequest].
// pub(crate) fn push(&mut self, request: RegionRequest) {
// match request.body {
// RequestBody::Write(write_req) => {
// self.requests.entry(write_req.region_id)
// .or_default()
// .push(BatchEntry { sender: request.sender, request: write_req, })
// },
// other => panic!("request is not a write request: {:?}", other),
// }
// }
// /// Returns true if the batch is empty.
// pub(crate) fn is_empty(&self) -> bool {
// self.requests.is_empty()
// }
// }