From 8af5cbedb17cb9903987bf1ac40263bf75ebaf30 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 22 Apr 2021 13:22:34 +0300 Subject: [PATCH] Move xlog_utils.rs to postgres_ffi module. I had copy-pasted these functions to a few other places. Clean that up, move them to a common module, and add some comments. --- Cargo.lock | 2 + control_plane/src/local_env.rs | 2 +- pageserver/src/restore_local_repo.rs | 54 +----------------- pageserver/src/walreceiver.rs | 57 +------------------ postgres_ffi/Cargo.toml | 1 + postgres_ffi/README | 3 + postgres_ffi/src/lib.rs | 2 + {walkeeper => postgres_ffi}/src/xlog_utils.rs | 12 ++++ walkeeper/Cargo.toml | 1 + walkeeper/src/lib.rs | 1 - walkeeper/src/wal_service.rs | 2 +- 11 files changed, 25 insertions(+), 112 deletions(-) create mode 100644 postgres_ffi/README rename {walkeeper => postgres_ffi}/src/xlog_utils.rs (95%) diff --git a/Cargo.lock b/Cargo.lock index 5e8d3dd20f..29889a4583 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1505,6 +1505,7 @@ dependencies = [ "chrono", "crc32c", "hex", + "log", "rand 0.8.3", ] @@ -2469,6 +2470,7 @@ dependencies = [ "pageserver", "postgres", "postgres-protocol", + "postgres_ffi", "regex", "slog", "slog-async", diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 934c45a831..8138364353 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -16,7 +16,7 @@ use anyhow::Result; use serde_derive::{Deserialize, Serialize}; use pageserver::ZTimelineId; -use walkeeper::xlog_utils; +use postgres_ffi::xlog_utils; // // This data structure represents deserialized zenith config, which should be diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 5cd927486a..1477233a6d 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -32,6 +32,7 @@ use crate::page_cache::PageCache; use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; +use postgres_ffi::xlog_utils::*; // From pg_tablespace_d.h // @@ -346,59 +347,6 @@ fn restore_wal( Ok(()) } -// FIXME: copied from xlog_utils.rs -pub const XLOG_FNAME_LEN: usize = 24; -pub type XLogRecPtr = u64; -pub type XLogSegNo = u64; -pub type TimeLineID = u32; - -#[allow(non_snake_case)] -pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 { - return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1); -} - -#[allow(non_snake_case)] -pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo { - return xlogptr / wal_segsz_bytes as u64; -} - -#[allow(non_snake_case)] -pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String { - return format!( - "{:>08X}{:>08X}{:>08X}", - tli, - logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes), - logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes) - ); -} - -#[allow(non_snake_case)] -pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo { - return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo; -} - -#[allow(non_snake_case)] -pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) { - let tli = u32::from_str_radix(&fname[0..8], 16).unwrap(); - let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo; - let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo; - return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli); -} - -#[allow(non_snake_case)] -pub fn IsXLogFileName(fname: &str) -> bool { - return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit()); -} - -#[allow(non_snake_case)] -pub fn IsPartialXLogFileName(fname: &str) -> bool { - if let Some(basefname) = fname.strip_suffix(".partial") { - IsXLogFileName(basefname) - } else { - false - } -} - #[derive(Debug, Clone)] struct FilePathError { msg: String, diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 50be785aab..0145ae77c4 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -29,6 +29,7 @@ use tokio::time::{sleep, Duration}; use tokio_postgres::replication::{PgTimestamp, ReplicationStream}; use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow}; use tokio_stream::StreamExt; +use postgres_ffi::xlog_utils::*; // // We keep one WAL Receiver active per timeline. @@ -343,62 +344,6 @@ pub async fn identify_system(client: &tokio_postgres::Client) -> Result u32 { - return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1); -} - -#[allow(non_snake_case)] -pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo { - return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo; -} - -#[allow(non_snake_case)] -pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo { - return xlogptr / wal_segsz_bytes as u64; -} - -#[allow(non_snake_case)] -pub fn XLogSegNoOffsetToRecPtr( - segno: XLogSegNo, - offset: u32, - wal_segsz_bytes: usize, -) -> XLogRecPtr { - return segno * (wal_segsz_bytes as u64) + (offset as u64); -} - -#[allow(non_snake_case)] -pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String { - return format!( - "{:>08X}{:>08X}{:>08X}", - tli, - logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes), - logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes) - ); -} - -#[allow(non_snake_case)] -pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) { - let tli = u32::from_str_radix(&fname[0..8], 16).unwrap(); - let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo; - let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo; - return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli); -} - fn write_wal_file( startpos: XLogRecPtr, timeline: ZTimelineId, diff --git a/postgres_ffi/Cargo.toml b/postgres_ffi/Cargo.toml index 77cc5cf028..e843a28385 100644 --- a/postgres_ffi/Cargo.toml +++ b/postgres_ffi/Cargo.toml @@ -14,6 +14,7 @@ byteorder = "1.4.3" anyhow = "1.0" crc32c = "0.6.0" hex = "0.4.3" +log = "0.4.14" [build-dependencies] bindgen = "0.53.1" diff --git a/postgres_ffi/README b/postgres_ffi/README new file mode 100644 index 0000000000..899e8234f6 --- /dev/null +++ b/postgres_ffi/README @@ -0,0 +1,3 @@ +This module contains utility functions for interacting with PostgreSQL +file formats. + diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index 59cad0db39..d1496da2ef 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -3,6 +3,8 @@ #![allow(non_snake_case)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +pub mod xlog_utils; + use bytes::{Buf, Bytes, BytesMut}; // sizeof(ControlFileData) diff --git a/walkeeper/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs similarity index 95% rename from walkeeper/src/xlog_utils.rs rename to postgres_ffi/src/xlog_utils.rs index c31a160cce..2ead4f9718 100644 --- a/walkeeper/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -1,3 +1,12 @@ +// +// This file contains common utilities for dealing with PostgreSQL WAL files and +// LSNs. +// +// Many of these functions have been copied from PostgreSQL, and rewritten in +// Rust. That's why they don't follow the usual Rust naming conventions, they +// have been named the same as the corresponding PostgreSQL functions instead. +// + use byteorder::{ByteOrder, LittleEndian}; use crc32c::*; use log::*; @@ -184,6 +193,9 @@ fn find_end_of_wal_segment( last_valid_rec_pos as u32 } +/// +/// Scan a directory that contains PostgreSQL WAL files, for the end of WAL. +/// pub fn find_end_of_wal( data_dir: &Path, wal_seg_size: usize, diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 93c6b94ac5..21b7dee113 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -30,3 +30,4 @@ crc32c = "0.6.0" # FIXME: 'pageserver' is needed for ZTimelineId. Refactor pageserver = { path = "../pageserver" } +postgres_ffi = { path = "../postgres_ffi" } diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 784ab730b6..be4eec9fa8 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -4,7 +4,6 @@ use std::path::PathBuf; mod pq_protocol; pub mod wal_service; -pub mod xlog_utils; use crate::pq_protocol::SystemId; diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 9d7e6a8bfc..dd894ee483 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -29,9 +29,9 @@ use tokio::task; use tokio_postgres::{connect, Error, NoTls}; use crate::pq_protocol::*; -use crate::xlog_utils::*; use crate::WalAcceptorConf; use pageserver::ZTimelineId; +use postgres_ffi::xlog_utils::*; type FullTransactionId = u64;