From a737c3d12ae926db48a38e4c23676132f36e87df Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 2 Aug 2023 15:13:59 +0800 Subject: [PATCH] feat: handle write requests --- src/mito2/src/engine.rs | 10 +++- src/mito2/src/request.rs | 23 +++++---- src/mito2/src/worker.rs | 25 +++------- src/mito2/src/worker/handle_write.rs | 73 ++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 29 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 007315e193..fc9ef43946 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -26,7 +26,7 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RecvSnafu, Result}; -use crate::request::{CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody}; +use crate::request::{CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest}; use crate::worker::WorkerGroup; /// Region engine implementation for timeseries data. @@ -83,6 +83,14 @@ impl MitoEngine { pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) } + + /// Write to a region. + pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> { + write_request.validate()?; + + self.inner.handle_request_body(RequestBody::Write(write_request)) + .await + } } /// Inner struct of [MitoEngine]. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 5bc7bcfaed..a81d8c9d54 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -94,13 +94,22 @@ pub struct Mutation { /// Request to write a region. #[derive(Debug)] -pub(crate) struct WriteRequest { +pub struct WriteRequest { /// Region to write. pub region_id: RegionId, /// Mutation to the region. pub mutation: Mutation, } +impl WriteRequest { + /// Validate the request. + pub(crate) fn validate(&self) -> Result<()> { + // 1. checks whether the request is too large. + // 2. checks whether each row in rows has the same schema. + unimplemented!() + } +} + /// Request sent to a worker pub(crate) enum WorkerRequest { /// Region request. @@ -139,7 +148,6 @@ impl RegionRequest { /// Body to carry actual region request. #[derive(Debug)] pub(crate) enum RequestBody { - // DML: /// Write to a region. Write(WriteRequest), @@ -163,13 +171,8 @@ impl RequestBody { } } - /// Returns whether the request is a DDL (e.g. CREATE/OPEN/ALTER). - pub(crate) fn is_ddl(&self) -> bool { - match self { - RequestBody::Write(_) => false, - RequestBody::Create(_) => true, - RequestBody::Open(_) => true, - RequestBody::Close(_) => true, - } + /// Returns whether the request is a write request. + pub(crate) fn is_write(&self) -> bool { + matches!(self, RequestBody::Write(_)) } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 616ecac556..23fba45f29 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -323,15 +323,15 @@ impl RegionWorkerLoop { /// /// `buffer` should be empty. async fn handle_requests(&mut self, buffer: &mut RequestBuffer) { - let mut dml_requests = Vec::with_capacity(buffer.len()); + let mut write_requests = Vec::with_capacity(buffer.len()); let mut ddl_requests = Vec::with_capacity(buffer.len()); for worker_req in buffer.drain(..) { match worker_req { WorkerRequest::Region(req) => { - if req.body.is_ddl() { - ddl_requests.push(req); + if req.body.is_write() { + write_requests.push(req); } else { - dml_requests.push(req); + ddl_requests.push(req); } } // We receive a stop signal, but we still want to process remaining @@ -343,24 +343,13 @@ impl RegionWorkerLoop { } } - // Handles all dml requests first. So we can alter regions without - // considering existing dml requests. - self.handle_dml_requests(dml_requests).await; + // Handles all write requests first. So we can alter regions without + // considering existing write requests. + self.handle_write_requests(write_requests).await; self.handle_ddl_requests(ddl_requests).await; } - /// Takes and handles all dml requests. - async fn handle_dml_requests(&mut self, write_requests: Vec) { - if write_requests.is_empty() { - return; - } - - // Create a write context that holds meta and sequence. - - unimplemented!() - } - /// Takes and handles all ddl requests. async fn handle_ddl_requests(&mut self, ddl_requests: Vec) { if ddl_requests.is_empty() { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index afd009fee7..2f26bd63e6 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -13,3 +13,76 @@ // limitations under the License. //! Handling write requests. + +use crate::{worker::RegionWorkerLoop, request::{RegionRequest}}; + +impl RegionWorkerLoop { + /// Takes and handles all write requests. + /// + /// # Panics + /// Panics if `write_requests` contains a request whose body is a [WriteRequest]. + pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec) { + if write_requests.is_empty() { + return; + } + + // We need to check: + // - region exists, if not, return error + // - check whether the schema is compatible with region schema + // - collect rows by region + // - get sequence for each row + + // problem: + // - column order in request may be different from table column order + // - need to add missing column + // - memtable may need a new struct for sequence and op type. + + todo!() + } +} + + +// pb write message +// region id +// rows +// sequence +// op type + +// /// Entry for a write request in [WriteRequestBatch]. +// #[derive(Debug)] +// pub(crate) struct BatchEntry { +// /// Result sender. +// pub(crate) sender: Option>>, +// /// A region write request. +// pub(crate) request: WriteRequest, +// } + +// /// Batch of write requests. +// #[derive(Debug, Default)] +// pub(crate) struct WriteRequestBatch { +// /// Batched requests for each region. +// pub(crate) requests: HashMap>, +// } + +// impl WriteRequestBatch { +// /// Push a write request into the batch. +// /// +// /// # Panics +// /// Panics if the request body isn't a [WriteRequest]. +// pub(crate) fn push(&mut self, request: RegionRequest) { +// match request.body { +// RequestBody::Write(write_req) => { +// self.requests.entry(write_req.region_id) +// .or_default() +// .push(BatchEntry { sender: request.sender, request: write_req, }) +// }, +// other => panic!("request is not a write request: {:?}", other), +// } +// } + +// /// Returns true if the batch is empty. +// pub(crate) fn is_empty(&self) -> bool { +// self.requests.is_empty() +// } +// } +