fix: log file test fail (#54)

* fix: log file test fail

* remove some log

* wip

* add log

* wip
This commit is contained in:
Lei, Huang
2022-06-29 17:01:15 +08:00
committed by GitHub
parent bac6c720f8
commit 651bdbaa71
4 changed files with 21 additions and 8 deletions

View File

@@ -27,6 +27,7 @@ jobs:
command: test
args: --workspace
env:
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off -Cpanic=abort -Zpanic_abort_tests"
GT_S3_BUCKET: ${{ secrets.S3_BUCKET }}

View File

@@ -36,6 +36,7 @@ jobs:
command: test
args: --workspace
env:
RUST_BACKTRACE: 1
GT_S3_BUCKET: ${{ secrets.S3_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}

View File

@@ -39,6 +39,7 @@ const LOG_WRITER_BATCH_SIZE: usize = 16;
pub struct LogFile {
name: FileName,
file: Arc<RwLock<File>>,
path: String,
write_offset: Arc<AtomicUsize>,
flush_offset: Arc<AtomicUsize>,
next_entry_id: Arc<AtomicU64>,
@@ -85,6 +86,7 @@ impl LogFile {
let mut log = Self {
name: file_name,
file: Arc::new(RwLock::new(file)),
path: path.to_string(),
write_offset: Arc::new(AtomicUsize::new(0)),
flush_offset: Arc::new(AtomicUsize::new(0)),
next_entry_id: Arc::new(AtomicU64::new(start_entry_id)),
@@ -238,7 +240,7 @@ impl LogFile {
});
*self.join_handle.lock().unwrap() = Some(handle);
info!("Flush task started...");
info!("Flush task started: {}", self.name);
}
Ok(())
}
@@ -311,6 +313,7 @@ impl LogFile {
pub fn create_stream(&self, _ns: impl Namespace, start_entry_id: u64) -> impl EntryStream + '_ {
let s = stream!({
let length = self.flush_offset.load(Ordering::Relaxed);
info!("Read mmap file: {}, length: {}", self.to_string(), length);
let mmap = self.map(0, length).await?;
let mut buf: &[u8] = mmap.as_ref();
@@ -360,8 +363,6 @@ impl LogFile {
let mut file = self.file.write().await;
// generate entry id
entry_id = self.inc_entry_id();
// generate in-file offset
entry_offset = self.inc_offset(size);
// rewrite encoded data
LittleEndian::write_u64(&mut serialized[0..8], entry_id);
// TODO(hl): CRC was calculated twice
@@ -370,7 +371,11 @@ impl LogFile {
// write to file
// TODO(hl): use io buffer and pwrite to reduce syscalls.
file.write(serialized.as_slice()).await.context(IoSnafu)?;
file.write_all(serialized.as_slice())
.await
.context(IoSnafu)?;
// generate in-file offset
entry_offset = self.inc_offset(size);
}
let (tx, rx) = oneshot::channel();
@@ -431,8 +436,10 @@ impl LogFile {
impl ToString for LogFile {
fn to_string(&self) -> String {
format!("LogFile{{ name: {}, write_offset: {}, flush_offset: {}, start_entry_id: {}, entry_id_counter: {} }}",
self.name, self.write_offset.load(Ordering::Relaxed), self.flush_offset.load(Ordering::Relaxed), self.start_entry_id, self.next_entry_id.load(Ordering::Relaxed))
format!("LogFile{{ name: {}, path: {}, write_offset: {}, flush_offset: {}, start_entry_id: {}, entry_id_counter: {} }}",
self.name,
self.path,
self.write_offset.load(Ordering::Relaxed), self.flush_offset.load(Ordering::Relaxed), self.start_entry_id, self.next_entry_id.load(Ordering::Relaxed))
}
}

View File

@@ -236,7 +236,7 @@ mod tests {
#[tokio::test]
pub async fn test_roll_file() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb").unwrap();
let dir = TempDir::new("greptimedb1").unwrap();
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 128,
@@ -281,7 +281,11 @@ mod tests {
#[tokio::test]
pub async fn test_write_and_read_data() {
common_telemetry::logging::init_default_ut_logging();
let dir = TempDir::new("greptimedb").unwrap();
let dir = TempDir::new("greptimedb2").unwrap();
let dir_str = dir.path().to_string_lossy().to_string();
info!("dir: {}", dir_str);
let config = LogConfig {
append_buffer_size: 128,
max_log_file_size: 128,