mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
feat: impl Logstore::read by LogFile::create_stream (#124)
* feat: bridge LogStore::read to LogFile::create_stream * fix some CR comments
This commit is contained in:
@@ -274,10 +274,10 @@ impl LogFile {
|
||||
pub async fn replay(&mut self) -> Result<(usize, Id)> {
|
||||
let log_name = self.name.to_string();
|
||||
let previous_offset = self.flush_offset.load(Ordering::Relaxed);
|
||||
let ns = LocalNamespace::default();
|
||||
let mut stream = self.create_stream(
|
||||
// TODO(hl): LocalNamespace should be filled
|
||||
LocalNamespace::default(),
|
||||
0,
|
||||
&ns, 0,
|
||||
);
|
||||
|
||||
let mut last_offset = 0usize;
|
||||
@@ -313,7 +313,11 @@ impl LogFile {
|
||||
/// ### Notice
|
||||
/// If the entry with start entry id is not present, the first generated entry will start with
|
||||
/// the first entry with an id greater than `start_entry_id`.
|
||||
pub fn create_stream(&self, _ns: impl Namespace, start_entry_id: u64) -> impl EntryStream + '_ {
|
||||
pub fn create_stream(
|
||||
&self,
|
||||
_ns: &impl Namespace,
|
||||
start_entry_id: u64,
|
||||
) -> impl EntryStream<Entry = EntryImpl, Error = Error> + '_ {
|
||||
let length = self.flush_offset.load(Ordering::Relaxed);
|
||||
|
||||
let s = stream!({
|
||||
|
||||
@@ -3,9 +3,13 @@ use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use async_stream::stream;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::logstore::entry::{Encode, Id};
|
||||
use store_api::logstore::entry_stream::SendableEntryStream;
|
||||
use store_api::logstore::LogStore;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
@@ -167,7 +171,7 @@ impl LogStore for LocalFileLogStore {
|
||||
|
||||
async fn append(
|
||||
&self,
|
||||
_ns: Self::Namespace,
|
||||
_ns: &Self::Namespace,
|
||||
mut entry: Self::Entry,
|
||||
) -> Result<Self::AppendResponse> {
|
||||
// TODO(hl): configurable retry times
|
||||
@@ -208,11 +212,24 @@ impl LogStore for LocalFileLogStore {
|
||||
|
||||
async fn read(
|
||||
&self,
|
||||
_ns: Self::Namespace,
|
||||
_id: Id,
|
||||
) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'_, Self::Entry, Self::Error>>
|
||||
{
|
||||
todo!()
|
||||
ns: Self::Namespace,
|
||||
id: Id,
|
||||
) -> Result<SendableEntryStream<'_, Self::Entry, Self::Error>> {
|
||||
let files = self.files.read().await;
|
||||
|
||||
let s = stream!({
|
||||
for (start_id, file) in files.iter() {
|
||||
if *start_id >= id {
|
||||
let s = file.create_stream(&ns, *start_id);
|
||||
pin_mut!(s);
|
||||
while let Some(entries) = s.next().await {
|
||||
yield entries;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Box::pin(s))
|
||||
}
|
||||
|
||||
async fn create_namespace(&mut self, _ns: Self::Namespace) -> Result<()> {
|
||||
@@ -248,13 +265,11 @@ mod tests {
|
||||
};
|
||||
|
||||
let logstore = LocalFileLogStore::open(&config).await.unwrap();
|
||||
let ns = LocalNamespace::default();
|
||||
assert_eq!(
|
||||
0,
|
||||
logstore
|
||||
.append(
|
||||
LocalNamespace::default(),
|
||||
EntryImpl::new(generate_data(100)),
|
||||
)
|
||||
.append(&ns, EntryImpl::new(generate_data(100)),)
|
||||
.await
|
||||
.unwrap()
|
||||
.entry_id
|
||||
@@ -263,10 +278,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
1,
|
||||
logstore
|
||||
.append(
|
||||
LocalNamespace::default(),
|
||||
EntryImpl::new(generate_data(100)),
|
||||
)
|
||||
.append(&ns, EntryImpl::new(generate_data(100)),)
|
||||
.await
|
||||
.unwrap()
|
||||
.entry_id
|
||||
@@ -296,18 +308,15 @@ mod tests {
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
};
|
||||
let logstore = LocalFileLogStore::open(&config).await.unwrap();
|
||||
let ns = LocalNamespace::default();
|
||||
let id = logstore
|
||||
.append(
|
||||
LocalNamespace::default(),
|
||||
EntryImpl::new(generate_data(100)),
|
||||
)
|
||||
.append(&ns, EntryImpl::new(generate_data(100)))
|
||||
.await
|
||||
.unwrap()
|
||||
.entry_id;
|
||||
assert_eq!(0, id);
|
||||
|
||||
let active_file = logstore.active_file();
|
||||
let stream = active_file.create_stream(LocalNamespace::default(), 0);
|
||||
let stream = logstore.read(ns, 0).await.unwrap();
|
||||
tokio::pin!(stream);
|
||||
|
||||
let entries = stream.next().await.unwrap().unwrap();
|
||||
|
||||
@@ -17,7 +17,7 @@ impl LogStore for NoopLogStore {
|
||||
|
||||
async fn append(
|
||||
&self,
|
||||
_ns: Self::Namespace,
|
||||
_ns: &Self::Namespace,
|
||||
mut _e: Self::Entry,
|
||||
) -> Result<Self::AppendResponse> {
|
||||
Ok(AppendResponseImpl {
|
||||
|
||||
@@ -127,7 +127,7 @@ impl<S: LogStore> Wal<S> {
|
||||
|
||||
let res = self
|
||||
.store
|
||||
.append(ns, e)
|
||||
.append(&ns, e)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::WriteWalSnafu { name: self.name() })?;
|
||||
|
||||
@@ -21,7 +21,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
|
||||
/// Append an `Entry` to WAL with given namespace
|
||||
async fn append(
|
||||
&self,
|
||||
ns: Self::Namespace,
|
||||
ns: &Self::Namespace,
|
||||
mut e: Self::Entry,
|
||||
) -> Result<Self::AppendResponse, Self::Error>;
|
||||
|
||||
@@ -32,7 +32,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
|
||||
e: Vec<Self::Entry>,
|
||||
) -> Result<Id, Self::Error>;
|
||||
|
||||
// Create a new `EntryStream` to asynchronously generates `Entry`.
|
||||
// Create a new `EntryStream` to asynchronously generates `Entry` with ids starting from `id`.
|
||||
async fn read(
|
||||
&self,
|
||||
ns: Self::Namespace,
|
||||
|
||||
Reference in New Issue
Block a user