mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 12:52:57 +00:00
refactor: remove constructors from trait (#121)
* refactor: remove constructors from trait * refactor: move PutOp into its parent type * refactor: move put constructor to write request * refactor: change visibility of PutData constructors call from WriteRequest instead * refactor: consistent naming for entry constructor * refactor: fix constructor form Namespace trait * refactor: remove comment code * doc: fix doc comments
This commit is contained in:
@@ -106,17 +106,19 @@ impl Encode for EntryImpl {
|
||||
}
|
||||
}
|
||||
|
||||
impl Entry for EntryImpl {
|
||||
type Error = Error;
|
||||
|
||||
fn new(data: impl AsRef<[u8]>) -> Self {
|
||||
Self {
|
||||
impl EntryImpl {
|
||||
pub(crate) fn new(data: impl AsRef<[u8]>) -> EntryImpl {
|
||||
EntryImpl {
|
||||
id: 0,
|
||||
data: data.as_ref().to_vec(),
|
||||
offset: 0,
|
||||
epoch: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Entry for EntryImpl {
|
||||
type Error = Error;
|
||||
|
||||
fn data(&self) -> &[u8] {
|
||||
&self.data
|
||||
|
||||
@@ -244,6 +244,14 @@ impl LogStore for LocalFileLogStore {
|
||||
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn entry<D: AsRef<[u8]>>(&self, data: D) -> Self::Entry {
|
||||
EntryImpl::new(data)
|
||||
}
|
||||
|
||||
fn namespace(&self, name: &str) -> Self::Namespace {
|
||||
LocalNamespace::new(name)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -18,14 +18,16 @@ struct LocalNamespaceInner {
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl Namespace for LocalNamespace {
|
||||
fn new(name: &str) -> Self {
|
||||
impl LocalNamespace {
|
||||
pub(crate) fn new(name: &str) -> Self {
|
||||
let inner = Arc::new(LocalNamespaceInner {
|
||||
name: name.to_string(),
|
||||
});
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl Namespace for LocalNamespace {
|
||||
fn name(&self) -> &str {
|
||||
self.inner.name.as_str()
|
||||
}
|
||||
|
||||
@@ -50,4 +50,12 @@ impl LogStore for NoopLogStore {
|
||||
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn entry<D: AsRef<[u8]>>(&self, data: D) -> Self::Entry {
|
||||
EntryImpl::new(data)
|
||||
}
|
||||
|
||||
fn namespace(&self, name: &str) -> Self::Namespace {
|
||||
LocalNamespace::new(name)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,18 +22,20 @@ pub struct RegionManifest {
|
||||
inner: Arc<RegionManifestInner>,
|
||||
}
|
||||
|
||||
impl RegionManifest {
|
||||
pub fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
|
||||
RegionManifest {
|
||||
inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Manifest for RegionManifest {
|
||||
type Error = Error;
|
||||
type MetaAction = RegionMetaActionList;
|
||||
type Metadata = RegionManifestData;
|
||||
|
||||
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
|
||||
RegionManifest {
|
||||
inner: Arc::new(RegionManifestInner::new(manifest_dir, object_store)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
|
||||
self.inner.save(action_list).await
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ mod writer;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use snafu::ensure;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::Manifest;
|
||||
@@ -60,6 +61,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
|
||||
fn snapshot(&self, _ctx: &ReadContext) -> Result<SnapshotImpl> {
|
||||
Ok(self.inner.create_snapshot())
|
||||
}
|
||||
|
||||
fn write_request(&self, schema: SchemaRef) -> Self::WriteRequest {
|
||||
WriteBatch::new(schema)
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage related config for region.
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::sync::Arc;
|
||||
|
||||
use log_store::fs::noop::NoopLogStore;
|
||||
use object_store::{backend::fs::Backend, ObjectStore};
|
||||
use store_api::manifest::Manifest;
|
||||
|
||||
use crate::background::JobPoolImpl;
|
||||
use crate::engine;
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use store_api::storage::WriteRequest;
|
||||
|
||||
use crate::test_util::schema_util::{self, ColumnDef};
|
||||
use crate::write_batch::WriteBatch;
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ impl<S: LogStore> Clone for Wal<S> {
|
||||
impl<S: LogStore> Wal<S> {
|
||||
pub fn new(region_name: impl Into<String>, store: Arc<S>) -> Self {
|
||||
let region_name = region_name.into();
|
||||
let namespace = S::Namespace::new(®ion_name);
|
||||
let namespace = store.namespace(®ion_name);
|
||||
|
||||
Self { namespace, store }
|
||||
}
|
||||
@@ -122,7 +122,7 @@ impl<S: LogStore> Wal<S> {
|
||||
|
||||
async fn write(&self, seq: SequenceNumber, bytes: &[u8]) -> Result<(u64, usize)> {
|
||||
let ns = self.namespace.clone();
|
||||
let mut e = S::Entry::new(bytes);
|
||||
let mut e = self.store.entry(bytes);
|
||||
e.set_id(seq);
|
||||
|
||||
let res = self
|
||||
|
||||
@@ -130,17 +130,19 @@ pub struct WriteBatch {
|
||||
num_rows: usize,
|
||||
}
|
||||
|
||||
impl WriteRequest for WriteBatch {
|
||||
type Error = Error;
|
||||
type PutOp = PutData;
|
||||
|
||||
fn new(schema: SchemaRef) -> Self {
|
||||
impl WriteBatch {
|
||||
pub fn new(schema: SchemaRef) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
mutations: Vec::new(),
|
||||
num_rows: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WriteRequest for WriteBatch {
|
||||
type Error = Error;
|
||||
type PutOp = PutData;
|
||||
|
||||
fn put(&mut self, data: PutData) -> Result<()> {
|
||||
if data.is_empty() {
|
||||
@@ -193,6 +195,14 @@ impl WriteRequest for WriteBatch {
|
||||
|
||||
Ok(ranges)
|
||||
}
|
||||
|
||||
fn put_op(&self) -> Self::PutOp {
|
||||
PutData::new()
|
||||
}
|
||||
|
||||
fn put_op_with_columns(num_columns: usize) -> Self::PutOp {
|
||||
PutData::with_num_columns(num_columns)
|
||||
}
|
||||
}
|
||||
|
||||
/// Aligns timestamp to nearest time interval.
|
||||
@@ -231,18 +241,20 @@ pub struct PutData {
|
||||
columns: HashMap<String, VectorRef>,
|
||||
}
|
||||
|
||||
impl PutOperation for PutData {
|
||||
type Error = Error;
|
||||
|
||||
fn new() -> PutData {
|
||||
impl PutData {
|
||||
pub(crate) fn new() -> PutData {
|
||||
PutData::default()
|
||||
}
|
||||
|
||||
fn with_num_columns(num_columns: usize) -> PutData {
|
||||
pub(crate) fn with_num_columns(num_columns: usize) -> PutData {
|
||||
PutData {
|
||||
columns: HashMap::with_capacity(num_columns),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PutOperation for PutData {
|
||||
type Error = Error;
|
||||
|
||||
fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<()> {
|
||||
self.add_column_by_name(name, vector)
|
||||
@@ -407,7 +419,7 @@ pub mod codec {
|
||||
vectors::Helper,
|
||||
};
|
||||
use snafu::ensure;
|
||||
use store_api::storage::{PutOperation, WriteRequest};
|
||||
use store_api::storage::WriteRequest;
|
||||
|
||||
use super::{
|
||||
DataCorruptionSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
|
||||
|
||||
@@ -25,28 +25,36 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
|
||||
mut e: Self::Entry,
|
||||
) -> Result<Self::AppendResponse, Self::Error>;
|
||||
|
||||
// Append a batch of entries atomically and return the offset of first entry.
|
||||
/// Append a batch of entries atomically and return the offset of first entry.
|
||||
async fn append_batch(
|
||||
&self,
|
||||
ns: &Self::Namespace,
|
||||
e: Vec<Self::Entry>,
|
||||
) -> Result<Id, Self::Error>;
|
||||
|
||||
// Create a new `EntryStream` to asynchronously generates `Entry` with ids starting from `id`.
|
||||
/// Create a new `EntryStream` to asynchronously generates `Entry` with ids
|
||||
/// starting from `id`.
|
||||
async fn read(
|
||||
&self,
|
||||
ns: &Self::Namespace,
|
||||
id: Id,
|
||||
) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error>;
|
||||
|
||||
// Create a new `Namespace`.
|
||||
/// Create a new `Namespace`.
|
||||
async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>;
|
||||
|
||||
// Delete an existing `Namespace` with given ref.
|
||||
/// Delete an existing `Namespace` with given ref.
|
||||
async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>;
|
||||
|
||||
// List all existing namespaces.
|
||||
/// List all existing namespaces.
|
||||
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
|
||||
|
||||
/// Create an entry of the associate Entry type
|
||||
fn entry<D: AsRef<[u8]>>(&self, data: D) -> Self::Entry;
|
||||
|
||||
/// Create a namespace of the associate Namespace type
|
||||
// TODO(sunng87): confusion with `create_namespace`
|
||||
fn namespace(&self, name: &str) -> Self::Namespace;
|
||||
}
|
||||
|
||||
pub trait AppendResponse: Send + Sync {
|
||||
|
||||
@@ -8,9 +8,6 @@ pub type Id = u64;
|
||||
/// Entry is the minimal data storage unit in `LogStore`.
|
||||
pub trait Entry: Encode + Send + Sync {
|
||||
type Error: ErrorExt + Send + Sync;
|
||||
|
||||
fn new(data: impl AsRef<[u8]>) -> Self;
|
||||
|
||||
/// Return contained data of entry.
|
||||
fn data(&self) -> &[u8];
|
||||
|
||||
|
||||
@@ -71,14 +71,6 @@ mod tests {
|
||||
impl Entry for SimpleEntry {
|
||||
type Error = Error;
|
||||
|
||||
fn new(data: impl AsRef<[u8]>) -> Self {
|
||||
Self {
|
||||
data: data.as_ref().to_vec(),
|
||||
offset: 0,
|
||||
epoch: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn data(&self) -> &[u8] {
|
||||
&self.data
|
||||
}
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
pub trait Namespace: Send + Sync + Clone + std::fmt::Debug {
|
||||
fn new(name: &str) -> Self;
|
||||
|
||||
fn name(&self) -> &str;
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ mod storage;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use object_store::ObjectStore;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
pub use crate::manifest::storage::*;
|
||||
@@ -26,8 +25,6 @@ pub trait Manifest: Send + Sync + Clone + 'static {
|
||||
type MetaAction: MetaAction;
|
||||
type Metadata: Metadata;
|
||||
|
||||
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self;
|
||||
|
||||
/// Update metadata by the action
|
||||
async fn update(&self, action: Self::MetaAction) -> Result<ManifestVersion, Self::Error>;
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use datatypes::schema::SchemaRef;
|
||||
|
||||
use crate::storage::engine::OpenOptions;
|
||||
use crate::storage::metadata::RegionMeta;
|
||||
@@ -50,6 +51,9 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
|
||||
|
||||
/// Create a snapshot for read.
|
||||
fn snapshot(&self, ctx: &ReadContext) -> Result<Self::Snapshot, Self::Error>;
|
||||
|
||||
/// Create write request
|
||||
fn write_request(&self, schema: SchemaRef) -> Self::WriteRequest;
|
||||
}
|
||||
|
||||
/// Context for write operations.
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::time::Duration;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_time::RangeMillis;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use datatypes::vectors::VectorRef;
|
||||
|
||||
use crate::storage::SequenceNumber;
|
||||
@@ -12,24 +11,22 @@ pub trait WriteRequest: Send {
|
||||
type Error: ErrorExt + Send + Sync;
|
||||
type PutOp: PutOperation;
|
||||
|
||||
fn new(schema: SchemaRef) -> Self;
|
||||
|
||||
fn put(&mut self, put: Self::PutOp) -> Result<(), Self::Error>;
|
||||
|
||||
/// Returns all possible time ranges that contain the timestamp in this batch.
|
||||
///
|
||||
/// Each time range is aligned to given `duration`.
|
||||
fn time_ranges(&self, duration: Duration) -> Result<Vec<RangeMillis>, Self::Error>;
|
||||
|
||||
fn put_op(&self) -> Self::PutOp;
|
||||
|
||||
fn put_op_with_columns(num_columns: usize) -> Self::PutOp;
|
||||
}
|
||||
|
||||
/// Put multiple rows.
|
||||
pub trait PutOperation: Send {
|
||||
type Error: ErrorExt + Send + Sync;
|
||||
|
||||
fn new() -> Self;
|
||||
|
||||
fn with_num_columns(num_columns: usize) -> Self;
|
||||
|
||||
fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
|
||||
|
||||
fn add_version_column(&mut self, vector: VectorRef) -> Result<(), Self::Error>;
|
||||
|
||||
@@ -44,10 +44,10 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mut write_request = R::WriteRequest::new(self.schema());
|
||||
let mut write_request = self.region.write_request(self.schema());
|
||||
|
||||
//FIXME(dennis): we can only insert to demo table right now
|
||||
let mut put_op = <<R as Region>::WriteRequest as WriteRequest>::PutOp::new();
|
||||
let mut put_op = write_request.put_op();
|
||||
let mut columns_values = request.columns_values;
|
||||
let key_columns = vec!["ts", "host"];
|
||||
let value_columns = vec!["cpu", "memory"];
|
||||
|
||||
Reference in New Issue
Block a user