From 14913c6443f36e9c94cab63698fdfd910a016148 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 8 Dec 2023 18:05:48 +0300 Subject: [PATCH] Adapt rust walproposer to neon_walreader. --- libs/walproposer/src/api_bindings.rs | 61 +++++++++++++++++++--------- libs/walproposer/src/walproposer.rs | 37 +++++++++++------ 2 files changed, 66 insertions(+), 32 deletions(-) diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 77afe1e686..2f633243be 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -8,6 +8,7 @@ use std::ffi::CString; use crate::bindings::uint32; use crate::bindings::walproposer_api; +use crate::bindings::NeonWALReadResult; use crate::bindings::PGAsyncReadResult; use crate::bindings::PGAsyncWriteResult; use crate::bindings::Safekeeper; @@ -191,21 +192,6 @@ extern "C" fn recovery_download( } } -#[allow(clippy::unnecessary_cast)] -extern "C" fn wal_read( - sk: *mut Safekeeper, - buf: *mut ::std::os::raw::c_char, - startptr: XLogRecPtr, - count: Size, -) { - unsafe { - let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count); - let callback_data = (*(*(*sk).wp).config).callback_data; - let api = callback_data as *mut Box; - (*api).wal_read(&mut (*sk), buf, startptr) - } -} - extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) { unsafe { let callback_data = (*(*(*sk).wp).config).callback_data; @@ -214,11 +200,28 @@ extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) { } } -extern "C" fn free_event_set(wp: *mut WalProposer) { +#[allow(clippy::unnecessary_cast)] +extern "C" fn wal_read( + sk: *mut Safekeeper, + buf: *mut ::std::os::raw::c_char, + startptr: XLogRecPtr, + count: Size, + _errmsg: *mut *mut ::std::os::raw::c_char, +) -> NeonWALReadResult { unsafe { - let callback_data = (*(*wp).config).callback_data; + let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count); + let callback_data = (*(*(*sk).wp).config).callback_data; let api = callback_data as *mut Box; - (*api).free_event_set(&mut (*wp)); + // TODO: errmsg is not forwarded + (*api).wal_read(&mut (*sk), buf, startptr) + } +} + +extern "C" fn wal_reader_events(sk: *mut Safekeeper) -> uint32 { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).wal_reader_events(&mut (*sk)) } } @@ -238,6 +241,14 @@ extern "C" fn update_event_set(sk: *mut Safekeeper, events: uint32) { } } +extern "C" fn active_state_update_event_set(sk: *mut Safekeeper) { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).active_state_update_event_set(&mut (*sk)); + } +} + extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) { unsafe { let callback_data = (*(*(*sk).wp).config).callback_data; @@ -246,6 +257,14 @@ extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) { } } +extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).rm_safekeeper_event_set(&mut (*sk)); + } +} + extern "C" fn wait_event_set( wp: *mut WalProposer, timeout: ::std::os::raw::c_long, @@ -401,12 +420,14 @@ pub(crate) fn create_api() -> walproposer_api { conn_async_write: Some(conn_async_write), conn_blocking_write: Some(conn_blocking_write), recovery_download: Some(recovery_download), - wal_read: Some(wal_read), wal_reader_allocate: Some(wal_reader_allocate), - free_event_set: Some(free_event_set), + wal_read: Some(wal_read), + wal_reader_events: Some(wal_reader_events), init_event_set: Some(init_event_set), update_event_set: Some(update_event_set), + active_state_update_event_set: Some(active_state_update_event_set), add_safekeeper_event_set: Some(add_safekeeper_event_set), + rm_safekeeper_event_set: Some(rm_safekeeper_event_set), wait_event_set: Some(wait_event_set), strong_random: Some(strong_random), get_redo_start_lsn: Some(get_redo_start_lsn), diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index f5723018d7..013400325d 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -6,8 +6,8 @@ use utils::id::TenantTimelineId; use crate::{ api_bindings::{create_api, take_vec_u8, Level}, bindings::{ - Safekeeper, WalProposer, WalProposerConfig, WalProposerCreate, WalProposerFree, - WalProposerStart, + NeonWALReadResult, Safekeeper, WalProposer, WalProposerConfig, WalProposerCreate, + WalProposerFree, WalProposerStart, }, }; @@ -90,15 +90,15 @@ pub trait ApiImpl { todo!() } - fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) { + fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult { todo!() } - fn wal_reader_allocate(&self, _sk: &mut Safekeeper) { + fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult { todo!() } - fn free_event_set(&self, _wp: &mut WalProposer) { + fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 { todo!() } @@ -110,10 +110,18 @@ pub trait ApiImpl { todo!() } + fn active_state_update_event_set(&self, _sk: &mut Safekeeper) { + todo!() + } + fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) { todo!() } + fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) { + todo!() + } + fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult { todo!() } @@ -240,6 +248,7 @@ impl Drop for Wrapper { #[cfg(test)] mod tests { + use core::panic; use std::{ cell::Cell, sync::{atomic::AtomicUsize, mpsc::sync_channel}, @@ -247,7 +256,7 @@ mod tests { use utils::id::TenantTimelineId; - use crate::{api_bindings::Level, walproposer::Wrapper}; + use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper}; use super::ApiImpl; @@ -355,12 +364,9 @@ mod tests { true } - fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) { - println!("wal_reader_allocate") - } - - fn free_event_set(&self, _: &mut crate::bindings::WalProposer) { - println!("free_event_set") + fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult { + println!("wal_reader_allocate"); + crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS } fn init_event_set(&self, _: &mut crate::bindings::WalProposer) { @@ -383,6 +389,13 @@ mod tests { self.wait_events.set(WaitEventsData { sk, event_mask }); } + fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) { + println!( + "rm_safekeeper_event_set, sk={:?}", + sk as *mut crate::bindings::Safekeeper + ); + } + fn wait_event_set( &self, _: &mut crate::bindings::WalProposer,