LogStore::read takes a reference to namespace (#126)

This commit is contained in:
Lei, Huang
2022-08-02 12:59:08 +08:00
committed by GitHub
parent 868098d2b7
commit b5fcdae01d
4 changed files with 15 additions and 14 deletions

View File

@@ -206,17 +206,18 @@ impl LogStore for LocalFileLogStore {
.fail();
}
async fn append_batch(&self, _ns: Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
todo!()
}
async fn read(
&self,
ns: Self::Namespace,
ns: &Self::Namespace,
id: Id,
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
let files = self.files.read().await;
let ns = ns.clone();
let s = stream!({
for (start_id, file) in files.iter() {
if *start_id >= id {
@@ -232,11 +233,11 @@ impl LogStore for LocalFileLogStore {
Ok(Box::pin(s))
}
async fn create_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
todo!()
}
async fn delete_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
todo!()
}
@@ -316,7 +317,7 @@ mod tests {
.entry_id;
assert_eq!(0, id);
let stream = logstore.read(ns, 0).await.unwrap();
let stream = logstore.read(&ns, 0).await.unwrap();
tokio::pin!(stream);
let entries = stream.next().await.unwrap().unwrap();

View File

@@ -26,24 +26,24 @@ impl LogStore for NoopLogStore {
})
}
async fn append_batch(&self, _ns: Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
async fn append_batch(&self, _ns: &Self::Namespace, _e: Vec<Self::Entry>) -> Result<Id> {
todo!()
}
async fn read(
&self,
_ns: Self::Namespace,
_ns: &Self::Namespace,
_id: Id,
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
{
todo!()
}
async fn create_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
todo!()
}
async fn delete_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
todo!()
}

View File

@@ -102,7 +102,7 @@ impl<S: LogStore> Wal<S> {
pub async fn read_from_wal(&self, start_seq: SequenceNumber) -> Result<WriteBatchStream<'_>> {
let stream = self
.store
.read(self.namespace.clone(), start_seq)
.read(&self.namespace, start_seq)
.await
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { name: self.name() })?

View File

@@ -28,22 +28,22 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
// Append a batch of entries atomically and return the offset of first entry.
async fn append_batch(
&self,
ns: Self::Namespace,
ns: &Self::Namespace,
e: Vec<Self::Entry>,
) -> Result<Id, Self::Error>;
// Create a new `EntryStream` to asynchronously generates `Entry` with ids starting from `id`.
async fn read(
&self,
ns: Self::Namespace,
ns: &Self::Namespace,
id: Id,
) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error>;
// Create a new `Namespace`.
async fn create_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>;
async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>;
// Delete an existing `Namespace` with given ref.
async fn delete_namespace(&mut self, ns: Self::Namespace) -> Result<(), Self::Error>;
async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>;
// List all existing namespaces.
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;