diff --git a/src/log-store/src/fs/entry.rs b/src/log-store/src/fs/entry.rs index 2ec6f20e87..45f3d88fc5 100644 --- a/src/log-store/src/fs/entry.rs +++ b/src/log-store/src/fs/entry.rs @@ -106,17 +106,19 @@ impl Encode for EntryImpl { } } -impl Entry for EntryImpl { - type Error = Error; - - fn new(data: impl AsRef<[u8]>) -> Self { - Self { +impl EntryImpl { + pub(crate) fn new(data: impl AsRef<[u8]>) -> EntryImpl { + EntryImpl { id: 0, data: data.as_ref().to_vec(), offset: 0, epoch: 0, } } +} + +impl Entry for EntryImpl { + type Error = Error; fn data(&self) -> &[u8] { &self.data diff --git a/src/log-store/src/fs/log.rs b/src/log-store/src/fs/log.rs index 6bee0f80a0..203d72b2fc 100644 --- a/src/log-store/src/fs/log.rs +++ b/src/log-store/src/fs/log.rs @@ -244,6 +244,14 @@ impl LogStore for LocalFileLogStore { async fn list_namespaces(&self) -> Result> { todo!() } + + fn entry>(&self, data: D) -> Self::Entry { + EntryImpl::new(data) + } + + fn namespace(&self, name: &str) -> Self::Namespace { + LocalNamespace::new(name) + } } #[cfg(test)] diff --git a/src/log-store/src/fs/namespace.rs b/src/log-store/src/fs/namespace.rs index ac29167413..75da1e3817 100644 --- a/src/log-store/src/fs/namespace.rs +++ b/src/log-store/src/fs/namespace.rs @@ -18,14 +18,16 @@ struct LocalNamespaceInner { name: String, } -impl Namespace for LocalNamespace { - fn new(name: &str) -> Self { +impl LocalNamespace { + pub(crate) fn new(name: &str) -> Self { let inner = Arc::new(LocalNamespaceInner { name: name.to_string(), }); Self { inner } } +} +impl Namespace for LocalNamespace { fn name(&self) -> &str { self.inner.name.as_str() } diff --git a/src/log-store/src/fs/noop.rs b/src/log-store/src/fs/noop.rs index bcacbc87cc..25975538a6 100644 --- a/src/log-store/src/fs/noop.rs +++ b/src/log-store/src/fs/noop.rs @@ -50,4 +50,12 @@ impl LogStore for NoopLogStore { async fn list_namespaces(&self) -> Result> { todo!() } + + fn entry>(&self, data: D) -> Self::Entry { + EntryImpl::new(data) + } + + fn namespace(&self, name: &str) -> Self::Namespace { + LocalNamespace::new(name) + } } diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index aeffbccb36..559f2077c1 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -22,18 +22,20 @@ pub struct RegionManifest { inner: Arc, } +impl RegionManifest { + pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { + RegionManifest { + inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)), + } + } +} + #[async_trait] impl Manifest for RegionManifest { type Error = Error; type MetaAction = RegionMetaActionList; type Metadata = RegionManifestData; - fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { - RegionManifest { - inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)), - } - } - async fn update(&self, action_list: RegionMetaActionList) -> Result { self.inner.save(action_list).await } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index dd4337c491..dee349cec8 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -5,6 +5,7 @@ mod writer; use std::sync::Arc; use async_trait::async_trait; +use datatypes::schema::SchemaRef; use snafu::ensure; use store_api::logstore::LogStore; use store_api::manifest::Manifest; @@ -60,6 +61,10 @@ impl Region for RegionImpl { fn snapshot(&self, _ctx: &ReadContext) -> Result { Ok(self.inner.create_snapshot()) } + + fn write_request(&self, schema: SchemaRef) -> Self::WriteRequest { + WriteBatch::new(schema) + } } /// Storage related config for region. diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 42793af108..98f488538a 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use log_store::fs::noop::NoopLogStore; use object_store::{backend::fs::Backend, ObjectStore}; -use store_api::manifest::Manifest; use crate::background::JobPoolImpl; use crate::engine; diff --git a/src/storage/src/test_util/write_batch_util.rs b/src/storage/src/test_util/write_batch_util.rs index a594d11382..5d56b3080e 100644 --- a/src/storage/src/test_util/write_batch_util.rs +++ b/src/storage/src/test_util/write_batch_util.rs @@ -1,5 +1,3 @@ -use store_api::storage::WriteRequest; - use crate::test_util::schema_util::{self, ColumnDef}; use crate::write_batch::WriteBatch; diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index e9cb8d6c58..e4184cffa7 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -39,7 +39,7 @@ impl Clone for Wal { impl Wal { pub fn new(region_name: impl Into, store: Arc) -> Self { let region_name = region_name.into(); - let namespace = S::Namespace::new(®ion_name); + let namespace = store.namespace(®ion_name); Self { namespace, store } } @@ -122,7 +122,7 @@ impl Wal { async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> { let ns = self.namespace.clone(); - let mut e = S::Entry::new(bytes); + let mut e = self.store.entry(bytes); e.set_id(seq); let res = self diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 69a9aa2781..10ea62e38b 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -130,17 +130,19 @@ pub struct WriteBatch { num_rows: usize, } -impl WriteRequest for WriteBatch { - type Error = Error; - type PutOp = PutData; - - fn new(schema: SchemaRef) -> Self { +impl WriteBatch { + pub fn new(schema: SchemaRef) -> Self { Self { schema, mutations: Vec::new(), num_rows: 0, } } +} + +impl WriteRequest for WriteBatch { + type Error = Error; + type PutOp = PutData; fn put(&mut self, data: PutData) -> Result<()> { if data.is_empty() { @@ -193,6 +195,14 @@ impl WriteRequest for WriteBatch { Ok(ranges) } + + fn put_op(&self) -> Self::PutOp { + PutData::new() + } + + fn put_op_with_columns(num_columns: usize) -> Self::PutOp { + PutData::with_num_columns(num_columns) + } } /// Aligns timestamp to nearest time interval. @@ -231,18 +241,20 @@ pub struct PutData { columns: HashMap, } -impl PutOperation for PutData { - type Error = Error; - - fn new() -> PutData { +impl PutData { + pub(crate) fn new() -> PutData { PutData::default() } - fn with_num_columns(num_columns: usize) -> PutData { + pub(crate) fn with_num_columns(num_columns: usize) -> PutData { PutData { columns: HashMap::with_capacity(num_columns), } } +} + +impl PutOperation for PutData { + type Error = Error; fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<()> { self.add_column_by_name(name, vector) @@ -407,7 +419,7 @@ pub mod codec { vectors::Helper, }; use snafu::ensure; - use store_api::storage::{PutOperation, WriteRequest}; + use store_api::storage::WriteRequest; use super::{ DataCorruptionSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu, diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 255220b894..f9d85ea292 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -25,28 +25,36 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { mut e: Self::Entry, ) -> Result; - // Append a batch of entries atomically and return the offset of first entry. + /// Append a batch of entries atomically and return the offset of first entry. async fn append_batch( &self, ns: &Self::Namespace, e: Vec, ) -> Result; - // Create a new `EntryStream` to asynchronously generates `Entry` with ids starting from `id`. + /// Create a new `EntryStream` to asynchronously generates `Entry` with ids + /// starting from `id`. async fn read( &self, ns: &Self::Namespace, id: Id, ) -> Result, Self::Error>; - // Create a new `Namespace`. + /// Create a new `Namespace`. async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>; - // Delete an existing `Namespace` with given ref. + /// Delete an existing `Namespace` with given ref. async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>; - // List all existing namespaces. + /// List all existing namespaces. async fn list_namespaces(&self) -> Result, Self::Error>; + + /// Create an entry of the associate Entry type + fn entry>(&self, data: D) -> Self::Entry; + + /// Create a namespace of the associate Namespace type + // TODO(sunng87): confusion with `create_namespace` + fn namespace(&self, name: &str) -> Self::Namespace; } pub trait AppendResponse: Send + Sync { diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 194f3d0388..25de0d83e7 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -8,9 +8,6 @@ pub type Id = u64; /// Entry is the minimal data storage unit in `LogStore`. pub trait Entry: Encode + Send + Sync { type Error: ErrorExt + Send + Sync; - - fn new(data: impl AsRef<[u8]>) -> Self; - /// Return contained data of entry. fn data(&self) -> &[u8]; diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs index b5d8745b75..531588a9d2 100644 --- a/src/store-api/src/logstore/entry_stream.rs +++ b/src/store-api/src/logstore/entry_stream.rs @@ -71,14 +71,6 @@ mod tests { impl Entry for SimpleEntry { type Error = Error; - fn new(data: impl AsRef<[u8]>) -> Self { - Self { - data: data.as_ref().to_vec(), - offset: 0, - epoch: 0, - } - } - fn data(&self) -> &[u8] { &self.data } diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs index 6b238aaf56..8464c1a5e3 100644 --- a/src/store-api/src/logstore/namespace.rs +++ b/src/store-api/src/logstore/namespace.rs @@ -1,5 +1,3 @@ pub trait Namespace: Send + Sync + Clone + std::fmt::Debug { - fn new(name: &str) -> Self; - fn name(&self) -> &str; } diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index fbd562a338..4ca31d2c9c 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -4,7 +4,6 @@ mod storage; use async_trait::async_trait; use common_error::ext::ErrorExt; -use object_store::ObjectStore; use serde::{de::DeserializeOwned, Serialize}; pub use crate::manifest::storage::*; @@ -26,8 +25,6 @@ pub trait Manifest: Send + Sync + Clone + 'static { type MetaAction: MetaAction; type Metadata: Metadata; - fn new(manifest_dir: &str, object_store: ObjectStore) -> Self; - /// Update metadata by the action async fn update(&self, action: Self::MetaAction) -> Result; diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index 1497eb8f14..dc4a3e0f09 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; +use datatypes::schema::SchemaRef; use crate::storage::engine::OpenOptions; use crate::storage::metadata::RegionMeta; @@ -50,6 +51,9 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { /// Create a snapshot for read. fn snapshot(&self, ctx: &ReadContext) -> Result; + + /// Create write request + fn write_request(&self, schema: SchemaRef) -> Self::WriteRequest; } /// Context for write operations. diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index e328906a43..6e9325cf6f 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -2,7 +2,6 @@ use std::time::Duration; use common_error::ext::ErrorExt; use common_time::RangeMillis; -use datatypes::schema::SchemaRef; use datatypes::vectors::VectorRef; use crate::storage::SequenceNumber; @@ -12,24 +11,22 @@ pub trait WriteRequest: Send { type Error: ErrorExt + Send + Sync; type PutOp: PutOperation; - fn new(schema: SchemaRef) -> Self; - fn put(&mut self, put: Self::PutOp) -> Result<(), Self::Error>; /// Returns all possible time ranges that contain the timestamp in this batch. /// /// Each time range is aligned to given `duration`. fn time_ranges(&self, duration: Duration) -> Result, Self::Error>; + + fn put_op(&self) -> Self::PutOp; + + fn put_op_with_columns(num_columns: usize) -> Self::PutOp; } /// Put multiple rows. pub trait PutOperation: Send { type Error: ErrorExt + Send + Sync; - fn new() -> Self; - - fn with_num_columns(num_columns: usize) -> Self; - fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>; fn add_version_column(&mut self, vector: VectorRef) -> Result<(), Self::Error>; diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index a09246498f..4146edbb10 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -44,10 +44,10 @@ impl Table for MitoTable { return Ok(0); } - let mut write_request = R::WriteRequest::new(self.schema()); + let mut write_request = self.region.write_request(self.schema()); //FIXME(dennis): we can only insert to demo table right now - let mut put_op = <::WriteRequest as WriteRequest>::PutOp::new(); + let mut put_op = write_request.put_op(); let mut columns_values = request.columns_values; let key_columns = vec!["ts", "host"]; let value_columns = vec!["cpu", "memory"];