Adapt rust walproposer to neon_walreader.

This commit is contained in:
Arseny Sher
2023-12-08 18:05:48 +03:00
committed by Arseny Sher
parent cdb08f0362
commit 14913c6443
2 changed files with 66 additions and 32 deletions

View File

@@ -8,6 +8,7 @@ use std::ffi::CString;
use crate::bindings::uint32;
use crate::bindings::walproposer_api;
use crate::bindings::NeonWALReadResult;
use crate::bindings::PGAsyncReadResult;
use crate::bindings::PGAsyncWriteResult;
use crate::bindings::Safekeeper;
@@ -191,21 +192,6 @@ extern "C" fn recovery_download(
}
}
#[allow(clippy::unnecessary_cast)]
extern "C" fn wal_read(
sk: *mut Safekeeper,
buf: *mut ::std::os::raw::c_char,
startptr: XLogRecPtr,
count: Size,
) {
unsafe {
let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count);
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).wal_read(&mut (*sk), buf, startptr)
}
}
extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
@@ -214,11 +200,28 @@ extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) {
}
}
extern "C" fn free_event_set(wp: *mut WalProposer) {
#[allow(clippy::unnecessary_cast)]
extern "C" fn wal_read(
sk: *mut Safekeeper,
buf: *mut ::std::os::raw::c_char,
startptr: XLogRecPtr,
count: Size,
_errmsg: *mut *mut ::std::os::raw::c_char,
) -> NeonWALReadResult {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count);
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).free_event_set(&mut (*wp));
// TODO: errmsg is not forwarded
(*api).wal_read(&mut (*sk), buf, startptr)
}
}
extern "C" fn wal_reader_events(sk: *mut Safekeeper) -> uint32 {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).wal_reader_events(&mut (*sk))
}
}
@@ -238,6 +241,14 @@ extern "C" fn update_event_set(sk: *mut Safekeeper, events: uint32) {
}
}
extern "C" fn active_state_update_event_set(sk: *mut Safekeeper) {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).active_state_update_event_set(&mut (*sk));
}
}
extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
@@ -246,6 +257,14 @@ extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) {
}
}
extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).rm_safekeeper_event_set(&mut (*sk));
}
}
extern "C" fn wait_event_set(
wp: *mut WalProposer,
timeout: ::std::os::raw::c_long,
@@ -401,12 +420,14 @@ pub(crate) fn create_api() -> walproposer_api {
conn_async_write: Some(conn_async_write),
conn_blocking_write: Some(conn_blocking_write),
recovery_download: Some(recovery_download),
wal_read: Some(wal_read),
wal_reader_allocate: Some(wal_reader_allocate),
free_event_set: Some(free_event_set),
wal_read: Some(wal_read),
wal_reader_events: Some(wal_reader_events),
init_event_set: Some(init_event_set),
update_event_set: Some(update_event_set),
active_state_update_event_set: Some(active_state_update_event_set),
add_safekeeper_event_set: Some(add_safekeeper_event_set),
rm_safekeeper_event_set: Some(rm_safekeeper_event_set),
wait_event_set: Some(wait_event_set),
strong_random: Some(strong_random),
get_redo_start_lsn: Some(get_redo_start_lsn),

View File

@@ -6,8 +6,8 @@ use utils::id::TenantTimelineId;
use crate::{
api_bindings::{create_api, take_vec_u8, Level},
bindings::{
Safekeeper, WalProposer, WalProposerConfig, WalProposerCreate, WalProposerFree,
WalProposerStart,
NeonWALReadResult, Safekeeper, WalProposer, WalProposerConfig, WalProposerCreate,
WalProposerFree, WalProposerStart,
},
};
@@ -90,15 +90,15 @@ pub trait ApiImpl {
todo!()
}
fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) {
fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
todo!()
}
fn wal_reader_allocate(&self, _sk: &mut Safekeeper) {
fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
todo!()
}
fn free_event_set(&self, _wp: &mut WalProposer) {
fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
todo!()
}
@@ -110,10 +110,18 @@ pub trait ApiImpl {
todo!()
}
fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
todo!()
}
fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
todo!()
}
fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
todo!()
}
fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
todo!()
}
@@ -240,6 +248,7 @@ impl Drop for Wrapper {
#[cfg(test)]
mod tests {
use core::panic;
use std::{
cell::Cell,
sync::{atomic::AtomicUsize, mpsc::sync_channel},
@@ -247,7 +256,7 @@ mod tests {
use utils::id::TenantTimelineId;
use crate::{api_bindings::Level, walproposer::Wrapper};
use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
use super::ApiImpl;
@@ -355,12 +364,9 @@ mod tests {
true
}
fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) {
println!("wal_reader_allocate")
}
fn free_event_set(&self, _: &mut crate::bindings::WalProposer) {
println!("free_event_set")
fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
println!("wal_reader_allocate");
crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
}
fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
@@ -383,6 +389,13 @@ mod tests {
self.wait_events.set(WaitEventsData { sk, event_mask });
}
fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
println!(
"rm_safekeeper_event_set, sk={:?}",
sk as *mut crate::bindings::Safekeeper
);
}
fn wait_event_set(
&self,
_: &mut crate::bindings::WalProposer,