From 116bdaf690ad8623557454a10acc8597130f9805 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 6 Feb 2025 19:43:28 +0800 Subject: [PATCH] refactor: pull column filling logic out of mito worker loop (#5455) * avoid duplicated req catagorisation Signed-off-by: Ruihang Xia * pull column filling up Signed-off-by: Ruihang Xia * fill columns instead of fill column Signed-off-by: Ruihang Xia * add test with metadata Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/mito2/src/engine.rs | 4 +- src/mito2/src/request.rs | 167 +++++++++++++++++++-------- src/mito2/src/worker.rs | 59 +++++++--- src/mito2/src/worker/handle_flush.rs | 6 +- src/mito2/src/worker/handle_write.rs | 67 +++++------ 5 files changed, 196 insertions(+), 107 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 77d33178c9..fb1a05d36c 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -416,7 +416,9 @@ impl EngineInner { region_id: RegionId, request: RegionRequest, ) -> Result { - let (request, receiver) = WorkerRequest::try_from_region_request(region_id, request)?; + let region_metadata = self.get_metadata(region_id).ok(); + let (request, receiver) = + WorkerRequest::try_from_region_request(region_id, request, region_metadata)?; self.workers.submit_to_worker(region_id, request).await?; receiver.await.context(RecvSnafu)? diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 574eb9e19a..ca20c01e40 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -66,13 +66,20 @@ pub struct WriteRequest { has_null: Vec, /// Write hint. pub hint: Option, + /// Region metadata on the time of this request is created. + pub(crate) region_metadata: Option, } impl WriteRequest { /// Creates a new request. /// /// Returns `Err` if `rows` are invalid. - pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> Result { + pub fn new( + region_id: RegionId, + op_type: OpType, + rows: Rows, + region_metadata: Option, + ) -> Result { let mut name_to_index = HashMap::with_capacity(rows.schema.len()); for (index, column) in rows.schema.iter().enumerate() { ensure!( @@ -116,6 +123,7 @@ impl WriteRequest { name_to_index, has_null, hint: None, + region_metadata, }) } @@ -248,46 +256,67 @@ impl WriteRequest { pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> { debug_assert_eq!(self.region_id, metadata.region_id); + let mut columns_to_fill = vec![]; for column in &metadata.column_metadatas { if !self.name_to_index.contains_key(&column.column_schema.name) { - self.fill_column(column)?; + columns_to_fill.push(column); + } + } + self.fill_columns(columns_to_fill)?; + + Ok(()) + } + + /// Checks the schema and fill missing columns. + pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> { + if let Err(e) = self.check_schema(metadata) { + if e.is_fill_default() { + // TODO(yingwen): Add metrics for this case. + // We need to fill default value. The write request may be a request + // sent before changing the schema. + self.fill_missing_columns(metadata)?; + } else { + return Err(e); } } Ok(()) } - /// Fills default value for specific `column`. - fn fill_column(&mut self, column: &ColumnMetadata) -> Result<()> { - // Need to add a default value for this column. - let proto_value = self.column_default_value(column)?; - - if proto_value.value_data.is_none() { - return Ok(()); + /// Fills default value for specific `columns`. + fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> { + let mut default_values = Vec::with_capacity(columns.len()); + let mut columns_to_fill = Vec::with_capacity(columns.len()); + for column in columns { + let default_value = self.column_default_value(column)?; + if default_value.value_data.is_some() { + default_values.push(default_value); + columns_to_fill.push(column); + } } - // Insert default value to each row. for row in &mut self.rows.rows { - row.values.push(proto_value.clone()); + row.values.extend(default_values.iter().cloned()); } - // Insert column schema. - let (datatype, datatype_ext) = - ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone()) - .with_context(|_| ConvertColumnDataTypeSnafu { - reason: format!( - "no protobuf type for column {} ({:?})", - column.column_schema.name, column.column_schema.data_type - ), - })? - .to_parts(); - self.rows.schema.push(ColumnSchema { - column_name: column.column_schema.name.clone(), - datatype: datatype as i32, - semantic_type: column.semantic_type as i32, - datatype_extension: datatype_ext, - options: options_from_column_schema(&column.column_schema), - }); + for column in columns_to_fill { + let (datatype, datatype_ext) = + ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone()) + .with_context(|_| ConvertColumnDataTypeSnafu { + reason: format!( + "no protobuf type for column {} ({:?})", + column.column_schema.name, column.column_schema.data_type + ), + })? + .to_parts(); + self.rows.schema.push(ColumnSchema { + column_name: column.column_schema.name.clone(), + datatype: datatype as i32, + semantic_type: column.semantic_type as i32, + datatype_extension: datatype_ext, + options: options_from_column_schema(&column.column_schema), + }); + } Ok(()) } @@ -559,19 +588,32 @@ impl WorkerRequest { pub(crate) fn try_from_region_request( region_id: RegionId, value: RegionRequest, + region_metadata: Option, ) -> Result<(WorkerRequest, Receiver>)> { let (sender, receiver) = oneshot::channel(); let worker_request = match value { RegionRequest::Put(v) => { - let write_request = - WriteRequest::new(region_id, OpType::Put, v.rows)?.with_hint(v.hint); + let mut write_request = + WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())? + .with_hint(v.hint); + if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense + && let Some(region_metadata) = ®ion_metadata + { + write_request.maybe_fill_missing_columns(region_metadata)?; + } WorkerRequest::Write(SenderWriteRequest { sender: sender.into(), request: write_request, }) } RegionRequest::Delete(v) => { - let write_request = WriteRequest::new(region_id, OpType::Delete, v.rows)?; + let mut write_request = + WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?; + if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense + && let Some(region_metadata) = ®ion_metadata + { + write_request.maybe_fill_missing_columns(region_metadata)?; + } WorkerRequest::Write(SenderWriteRequest { sender: sender.into(), request: write_request, @@ -875,7 +917,7 @@ mod tests { rows: vec![], }; - let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err(); + let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err(); check_invalid_request(&err, "duplicate column c0"); } @@ -891,7 +933,7 @@ mod tests { }], }; - let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); assert_eq!(0, request.column_index_by_name("c0").unwrap()); assert_eq!(1, request.column_index_by_name("c1").unwrap()); assert_eq!(None, request.column_index_by_name("c2")); @@ -909,7 +951,7 @@ mod tests { }], }; - let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err(); + let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err(); check_invalid_request(&err, "row has 3 columns but schema has 2"); } @@ -955,7 +997,7 @@ mod tests { }; let metadata = new_region_metadata(); - let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); request.check_schema(&metadata).unwrap(); } @@ -972,7 +1014,7 @@ mod tests { }; let metadata = new_region_metadata(); - let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)"); } @@ -994,7 +1036,7 @@ mod tests { }; let metadata = new_region_metadata(); - let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)"); } @@ -1016,7 +1058,7 @@ mod tests { }; let metadata = new_region_metadata(); - let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); check_invalid_request(&err, "column ts is not null but input has null"); } @@ -1035,7 +1077,7 @@ mod tests { }; let metadata = new_region_metadata(); - let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); check_invalid_request(&err, "missing column ts"); } @@ -1058,7 +1100,7 @@ mod tests { }; let metadata = new_region_metadata(); - let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); check_invalid_request(&err, r#"unknown columns: ["k1"]"#); } @@ -1104,7 +1146,7 @@ mod tests { builder.build().unwrap() }; - let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); assert!(err.is_fill_default()); assert!(request @@ -1128,7 +1170,7 @@ mod tests { }; let metadata = new_region_metadata(); - let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); assert!(err.is_fill_default()); request.fill_missing_columns(&metadata).unwrap(); @@ -1214,7 +1256,8 @@ mod tests { }; let metadata = region_metadata_two_fields(); - let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap(); + let mut request = + WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); check_invalid_request(&err, "delete requests need column k0"); let err = request.fill_missing_columns(&metadata).unwrap_err(); @@ -1233,7 +1276,8 @@ mod tests { values: vec![i64_value(100), ts_ms_value(1)], }], }; - let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap(); + let mut request = + WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); assert!(err.is_fill_default()); request.fill_missing_columns(&metadata).unwrap(); @@ -1296,7 +1340,8 @@ mod tests { values: vec![i64_value(100), ts_ms_value(1)], }], }; - let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap(); + let mut request = + WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); assert!(err.is_fill_default()); request.fill_missing_columns(&metadata).unwrap(); @@ -1333,7 +1378,7 @@ mod tests { }; let metadata = new_region_metadata(); - let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.fill_missing_columns(&metadata).unwrap_err(); check_invalid_request(&err, "column ts does not have default value"); } @@ -1363,11 +1408,39 @@ mod tests { }; let metadata = region_metadata_two_fields(); - let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap(); let err = request.check_schema(&metadata).unwrap_err(); check_invalid_request( &err, "column f1 expect type Int64(Int64Type), given: STRING(12)", ); } + + #[test] + fn test_write_request_metadata() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2)], + }], + }; + + let metadata = Arc::new(new_region_metadata()); + let request = WriteRequest::new( + RegionId::new(1, 1), + OpType::Put, + rows, + Some(metadata.clone()), + ) + .unwrap(); + + assert!(request.region_metadata.is_some()); + assert_eq!( + request.region_metadata.unwrap().region_id, + RegionId::new(1, 1) + ); + } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index f45ea11fe4..2f089d9a97 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -688,11 +688,18 @@ impl RegionWorkerLoop { self.last_periodical_check_millis += init_check_delay.as_millis() as i64; // Buffer to retrieve requests from receiver. - let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size); + let mut write_req_buffer: Vec = + Vec::with_capacity(self.config.worker_request_batch_size); + let mut ddl_req_buffer: Vec = + Vec::with_capacity(self.config.worker_request_batch_size); + let mut general_req_buffer: Vec = + RequestBuffer::with_capacity(self.config.worker_request_batch_size); while self.running.load(Ordering::Relaxed) { // Clear the buffer before handling next batch of requests. - buffer.clear(); + write_req_buffer.clear(); + ddl_req_buffer.clear(); + general_req_buffer.clear(); let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL); let sleep = tokio::time::sleep(max_wait_time); @@ -701,7 +708,11 @@ impl RegionWorkerLoop { tokio::select! { request_opt = self.receiver.recv() => { match request_opt { - Some(request) => buffer.push(request), + Some(request) => match request { + WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req), + WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req), + _ => general_req_buffer.push(request), + }, // The channel is disconnected. None => break, } @@ -736,18 +747,29 @@ impl RegionWorkerLoop { } // Try to recv more requests from the channel. - for _ in 1..buffer.capacity() { + for _ in 1..self.config.worker_request_batch_size { // We have received one request so we start from 1. match self.receiver.try_recv() { - Ok(req) => buffer.push(req), + Ok(req) => match req { + WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req), + WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req), + _ => general_req_buffer.push(req), + }, // We still need to handle remaining requests. Err(_) => break, } } - self.listener.on_recv_requests(buffer.len()); + self.listener.on_recv_requests( + write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(), + ); - self.handle_requests(&mut buffer).await; + self.handle_requests( + &mut write_req_buffer, + &mut ddl_req_buffer, + &mut general_req_buffer, + ) + .await; self.handle_periodical_tasks(); } @@ -760,16 +782,17 @@ impl RegionWorkerLoop { /// Dispatches and processes requests. /// /// `buffer` should be empty. - async fn handle_requests(&mut self, buffer: &mut RequestBuffer) { - let mut write_requests = Vec::with_capacity(buffer.len()); - let mut ddl_requests = Vec::with_capacity(buffer.len()); - for worker_req in buffer.drain(..) { + async fn handle_requests( + &mut self, + write_requests: &mut Vec, + ddl_requests: &mut Vec, + general_requests: &mut Vec, + ) { + for worker_req in general_requests.drain(..) { match worker_req { - WorkerRequest::Write(sender_req) => { - write_requests.push(sender_req); - } - WorkerRequest::Ddl(sender_req) => { - ddl_requests.push(sender_req); + WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => { + // These requests are categorized into write_requests and ddl_requests. + continue; } WorkerRequest::Background { region_id, notify } => { // For background notify, we handle it directly. @@ -803,12 +826,12 @@ impl RegionWorkerLoop { } /// Takes and handles all ddl requests. - async fn handle_ddl_requests(&mut self, ddl_requests: Vec) { + async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec) { if ddl_requests.is_empty() { return; } - for ddl in ddl_requests { + for ddl in ddl_requests.drain(..) { let res = match ddl.request { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index b2bc5fd2e8..e846809a5d 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -230,13 +230,13 @@ impl RegionWorkerLoop { request.on_success(); // Handle pending requests for the region. - if let Some((ddl_requests, write_requests)) = + if let Some((mut ddl_requests, mut write_requests)) = self.flush_scheduler.on_flush_success(region_id) { // Perform DDLs first because they require empty memtables. - self.handle_ddl_requests(ddl_requests).await; + self.handle_ddl_requests(&mut ddl_requests).await; // Handle pending write requests, we don't stall these requests. - self.handle_write_requests(write_requests, false).await; + self.handle_write_requests(&mut write_requests, false).await; } // Handle stalled requests. diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 2804aabf1a..efc81df57b 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -22,7 +22,6 @@ use common_telemetry::{debug, error}; use snafu::ensure; use store_api::codec::PrimaryKeyEncoding; use store_api::logstore::LogStore; -use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; use crate::error::{InvalidRequestSnafu, RegionLeaderStateSnafu, RejectWriteSnafu, Result}; @@ -36,7 +35,7 @@ impl RegionWorkerLoop { /// Takes and handles all write requests. pub(crate) async fn handle_write_requests( &mut self, - mut write_requests: Vec, + write_requests: &mut Vec, allow_stall: bool, ) { if write_requests.is_empty() { @@ -56,7 +55,7 @@ impl RegionWorkerLoop { if self.write_buffer_manager.should_stall() && allow_stall { self.stalled_count.add(write_requests.len() as i64); - self.stalled_requests.append(&mut write_requests); + self.stalled_requests.append(write_requests); self.listener.on_write_stall(); return; } @@ -150,8 +149,8 @@ impl RegionWorkerLoop { let stalled = std::mem::take(&mut self.stalled_requests); self.stalled_count.sub(stalled.requests.len() as i64); // We already stalled these requests, don't stall them again. - for (_, (_, requests)) in stalled.requests { - self.handle_write_requests(requests, false).await; + for (_, (_, mut requests)) in stalled.requests { + self.handle_write_requests(&mut requests, false).await; } } @@ -159,25 +158,25 @@ impl RegionWorkerLoop { pub(crate) fn reject_stalled_requests(&mut self) { let stalled = std::mem::take(&mut self.stalled_requests); self.stalled_count.sub(stalled.requests.len() as i64); - for (_, (_, requests)) in stalled.requests { - reject_write_requests(requests); + for (_, (_, mut requests)) in stalled.requests { + reject_write_requests(&mut requests); } } /// Rejects a specific region's stalled requests. pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) { debug!("Rejects stalled requests for region {}", region_id); - let requests = self.stalled_requests.remove(region_id); + let mut requests = self.stalled_requests.remove(region_id); self.stalled_count.sub(requests.len() as i64); - reject_write_requests(requests); + reject_write_requests(&mut requests); } /// Handles a specific region's stalled requests. pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) { debug!("Handles stalled requests for region {}", region_id); - let requests = self.stalled_requests.remove(region_id); + let mut requests = self.stalled_requests.remove(region_id); self.stalled_count.sub(requests.len() as i64); - self.handle_write_requests(requests, true).await; + self.handle_write_requests(&mut requests, true).await; } } @@ -185,11 +184,11 @@ impl RegionWorkerLoop { /// Validates and groups requests by region. fn prepare_region_write_ctx( &mut self, - write_requests: Vec, + write_requests: &mut Vec, ) -> HashMap { // Initialize region write context map. let mut region_ctxs = HashMap::new(); - for mut sender_req in write_requests { + for mut sender_req in write_requests.drain(..) { let region_id = sender_req.request.region_id; // If region is waiting for alteration, add requests to pending writes. @@ -257,13 +256,21 @@ impl RegionWorkerLoop { continue; } - // If the primary key is dense, we need to fill missing columns. - if sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense { - // Checks whether request schema is compatible with region schema. - if let Err(e) = maybe_fill_missing_columns( - &mut sender_req.request, - ®ion_ctx.version().metadata, - ) { + // Double check the request schema + let need_fill_missing_columns = + if let Some(ref region_metadata) = sender_req.request.region_metadata { + region_ctx.version().metadata.schema_version != region_metadata.schema_version + } else { + true + }; + // Only fill missing columns if primary key is dense encoded. + if need_fill_missing_columns + && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense + { + if let Err(e) = sender_req + .request + .maybe_fill_missing_columns(®ion_ctx.version().metadata) + { sender_req.sender.send(Err(e)); continue; @@ -291,10 +298,10 @@ impl RegionWorkerLoop { } /// Send rejected error to all `write_requests`. -fn reject_write_requests(write_requests: Vec) { +fn reject_write_requests(write_requests: &mut Vec) { WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64); - for req in write_requests { + for req in write_requests.drain(..) { req.sender.send( RejectWriteSnafu { region_id: req.request.region_id, @@ -304,22 +311,6 @@ fn reject_write_requests(write_requests: Vec) { } } -/// Checks the schema and fill missing columns. -fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetadata) -> Result<()> { - if let Err(e) = request.check_schema(metadata) { - if e.is_fill_default() { - // TODO(yingwen): Add metrics for this case. - // We need to fill default value. The write request may be a request - // sent before changing the schema. - request.fill_missing_columns(metadata)?; - } else { - return Err(e); - } - } - - Ok(()) -} - /// Rejects delete request under append mode. fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> { if append_mode {