mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 08:50:38 +00:00
Compare commits
1 Commits
mx_offset_
...
wal_accept
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18d625e556 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2254,6 +2254,7 @@ dependencies = [
|
|||||||
"regex",
|
"regex",
|
||||||
"rust-s3",
|
"rust-s3",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_json",
|
||||||
"slog",
|
"slog",
|
||||||
"slog-async",
|
"slog-async",
|
||||||
"slog-scope",
|
"slog-scope",
|
||||||
|
|||||||
@@ -112,6 +112,9 @@ impl FromStr for ZTimelineId {
|
|||||||
fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> {
|
fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> {
|
||||||
let timelineid = hex::decode(s)?;
|
let timelineid = hex::decode(s)?;
|
||||||
|
|
||||||
|
if timelineid.len() != 16 {
|
||||||
|
return Err(hex::FromHexError::InvalidStringLength);
|
||||||
|
}
|
||||||
let mut buf: [u8; 16] = [0u8; 16];
|
let mut buf: [u8; 16] = [0u8; 16];
|
||||||
buf.copy_from_slice(timelineid.as_slice());
|
buf.copy_from_slice(timelineid.as_slice());
|
||||||
Ok(ZTimelineId(buf))
|
Ok(ZTimelineId(buf))
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import pytest
|
import pytest
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
|
import json
|
||||||
|
|
||||||
from contextlib import closing
|
from contextlib import closing
|
||||||
from multiprocessing import Process, Value
|
from multiprocessing import Process, Value
|
||||||
@@ -197,3 +198,22 @@ def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_valu
|
|||||||
|
|
||||||
stop_value.value = 1
|
stop_value.value = 1
|
||||||
proc.join()
|
proc.join()
|
||||||
|
|
||||||
|
def test_state(zenith_cli, pageserver, postgres, wa_factory):
|
||||||
|
wa_factory.start_n_new(1)
|
||||||
|
|
||||||
|
zenith_cli.run(["branch", "test_wal_acceptors_state", "empty"])
|
||||||
|
pg = postgres.create_start('test_wal_acceptors_state',
|
||||||
|
wal_acceptors=wa_factory.get_connstrs())
|
||||||
|
|
||||||
|
# learn zenith timeline from compute
|
||||||
|
ztli = pg.safe_psql("show zenith.zenith_timeline")[0][0]
|
||||||
|
|
||||||
|
pg.safe_psql("create table t(i int)")
|
||||||
|
|
||||||
|
pg.stop().start()
|
||||||
|
pg.safe_psql("insert into t values(10)")
|
||||||
|
|
||||||
|
wa = wa_factory.instances[0]
|
||||||
|
state = wa.safe_psql("state", options="'-c ztimelineid={}'".format(ztli), sslmode='disable')[0][0]
|
||||||
|
print(state)
|
||||||
|
|||||||
@@ -85,13 +85,17 @@ class PgProtocol:
|
|||||||
self.port = port
|
self.port = port
|
||||||
self.username = username or getpass.getuser()
|
self.username = username or getpass.getuser()
|
||||||
|
|
||||||
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str:
|
def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None, **kwargs) -> str:
|
||||||
"""
|
"""
|
||||||
Build a libpq connection string for the Postgres instance.
|
Build a libpq connection string for the Postgres instance.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
username = username or self.username
|
username = username or self.username
|
||||||
return f'host={self.host} port={self.port} user={username} dbname={dbname}'
|
connstr = f'host={self.host} port={self.port} user={username} dbname={dbname}'
|
||||||
|
for k, v in kwargs.items():
|
||||||
|
connstr += " {}={}".format(k, v)
|
||||||
|
print("connstr is {}".format(connstr))
|
||||||
|
return connstr
|
||||||
|
|
||||||
# autocommit=True here by default because that's what we need most of the time
|
# autocommit=True here by default because that's what we need most of the time
|
||||||
def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection:
|
def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection:
|
||||||
@@ -458,9 +462,10 @@ def read_pid(path):
|
|||||||
return int(Path(path).read_text())
|
return int(Path(path).read_text())
|
||||||
|
|
||||||
|
|
||||||
class WalAcceptor:
|
class WalAcceptor(PgProtocol):
|
||||||
""" An object representing a running wal acceptor daemon. """
|
""" An object representing a running wal acceptor daemon. """
|
||||||
def __init__(self, wa_binpath, data_dir, port, num):
|
def __init__(self, wa_binpath, data_dir, port, num):
|
||||||
|
super().__init__(host='127.0.0.1', port=port)
|
||||||
self.wa_binpath = wa_binpath
|
self.wa_binpath = wa_binpath
|
||||||
self.data_dir = data_dir
|
self.data_dir = data_dir
|
||||||
self.port = port
|
self.port = port
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ crc32c = "0.6.0"
|
|||||||
parse_duration = "2.1.1"
|
parse_duration = "2.1.1"
|
||||||
walkdir = "2"
|
walkdir = "2"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
|
|
||||||
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
|
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
|
||||||
|
|||||||
@@ -213,8 +213,8 @@ impl ReceiveWalConn {
|
|||||||
self.peer_addr, server_info.system_id, server_info.timeline_id,
|
self.peer_addr, server_info.system_id, server_info.timeline_id,
|
||||||
);
|
);
|
||||||
// FIXME: also check that the system identifier matches
|
// FIXME: also check that the system identifier matches
|
||||||
self.timeline.set(server_info.timeline_id)?;
|
self.timeline
|
||||||
self.timeline.get().load_control_file(&self.conf)?;
|
.set(&self.conf, server_info.timeline_id, true)?;
|
||||||
|
|
||||||
let mut my_info = self.timeline.get().get_info();
|
let mut my_info = self.timeline.get().get_info();
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
//! Part of WAL acceptor pretending to be Postgres, streaming xlog to
|
//! Part of WAL acceptor pretending to be Postgres, streaming xlog to
|
||||||
//! pageserver/any other consumer.
|
//! pageserver/any other consumer and answering to some utility queries.
|
||||||
//!
|
//!
|
||||||
|
|
||||||
use crate::replication::ReplicationConn;
|
use crate::replication::ReplicationConn;
|
||||||
@@ -12,7 +12,7 @@ use std::str::FromStr;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use zenith_utils::postgres_backend;
|
use zenith_utils::postgres_backend;
|
||||||
use zenith_utils::postgres_backend::PostgresBackend;
|
use zenith_utils::postgres_backend::PostgresBackend;
|
||||||
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor};
|
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, JSON_OID, TEXT_OID};
|
||||||
|
|
||||||
/// Handler for streaming WAL from acceptor
|
/// Handler for streaming WAL from acceptor
|
||||||
pub struct SendWalHandler {
|
pub struct SendWalHandler {
|
||||||
@@ -26,10 +26,14 @@ pub struct SendWalHandler {
|
|||||||
impl postgres_backend::Handler for SendWalHandler {
|
impl postgres_backend::Handler for SendWalHandler {
|
||||||
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
|
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
|
||||||
match sm.params.get("ztimelineid") {
|
match sm.params.get("ztimelineid") {
|
||||||
Some(ref ztimelineid) => {
|
Some(ref ztimelineid) => match ZTimelineId::from_str(ztimelineid) {
|
||||||
let ztlid = ZTimelineId::from_str(ztimelineid)?;
|
Ok(ztlid) => {
|
||||||
self.timeline.set(ztlid)?;
|
self.timeline.set(&self.conf, ztlid, false)?;
|
||||||
}
|
}
|
||||||
|
Err(e) => {
|
||||||
|
bail!("failed to parse ztimelineid: {}", e)
|
||||||
|
}
|
||||||
|
},
|
||||||
_ => bail!("timelineid is required"),
|
_ => bail!("timelineid is required"),
|
||||||
}
|
}
|
||||||
if let Some(app_name) = sm.params.get("application_name") {
|
if let Some(app_name) = sm.params.get("application_name") {
|
||||||
@@ -41,13 +45,18 @@ impl postgres_backend::Handler for SendWalHandler {
|
|||||||
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
|
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
|
||||||
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
|
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
|
||||||
self.handle_identify_system(pgb)?;
|
self.handle_identify_system(pgb)?;
|
||||||
Ok(())
|
|
||||||
} else if query_string.starts_with(b"START_REPLICATION") {
|
} else if query_string.starts_with(b"START_REPLICATION") {
|
||||||
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
|
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
|
||||||
Ok(())
|
} else if query_string.starts_with(b"state") {
|
||||||
|
self.handle_state(pgb)?;
|
||||||
|
} else if query_string.to_ascii_lowercase().starts_with(b"set ") {
|
||||||
|
// have it because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||||
|
// on connect
|
||||||
|
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||||
} else {
|
} else {
|
||||||
bail!("Unexpected command {:?}", query_string);
|
bail!("Unexpected command {:?}", query_string);
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,7 +84,7 @@ impl SendWalHandler {
|
|||||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||||
RowDescriptor {
|
RowDescriptor {
|
||||||
name: b"systemid",
|
name: b"systemid",
|
||||||
typoid: 25,
|
typoid: TEXT_OID,
|
||||||
typlen: -1,
|
typlen: -1,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
@@ -87,13 +96,13 @@ impl SendWalHandler {
|
|||||||
},
|
},
|
||||||
RowDescriptor {
|
RowDescriptor {
|
||||||
name: b"xlogpos",
|
name: b"xlogpos",
|
||||||
typoid: 25,
|
typoid: TEXT_OID,
|
||||||
typlen: -1,
|
typlen: -1,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
RowDescriptor {
|
RowDescriptor {
|
||||||
name: b"dbname",
|
name: b"dbname",
|
||||||
typoid: 25,
|
typoid: TEXT_OID,
|
||||||
typlen: -1,
|
typlen: -1,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
@@ -107,4 +116,21 @@ impl SendWalHandler {
|
|||||||
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
|
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send current state of safekeeper
|
||||||
|
fn handle_state(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
||||||
|
let my_info = self.timeline.get().get_info();
|
||||||
|
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
|
||||||
|
name: b"state",
|
||||||
|
typoid: JSON_OID,
|
||||||
|
typlen: -1,
|
||||||
|
..Default::default()
|
||||||
|
}]))?
|
||||||
|
.write_message_noflush(&BeMessage::DataRow(&[Some(
|
||||||
|
serde_json::to_string(&my_info).unwrap().as_bytes(),
|
||||||
|
)]))?
|
||||||
|
.write_message(&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,10 +55,12 @@ impl SharedState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Load and lock control file (prevent running more than one instance of safekeeper)
|
/// Load and lock control file (prevent running more than one instance of safekeeper)
|
||||||
|
/// If create=false and file doesn't exist, bails out.
|
||||||
pub fn load_control_file(
|
pub fn load_control_file(
|
||||||
&mut self,
|
&mut self,
|
||||||
conf: &WalAcceptorConf,
|
conf: &WalAcceptorConf,
|
||||||
timelineid: ZTimelineId,
|
timelineid: ZTimelineId,
|
||||||
|
create: bool,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if self.control_file.is_some() {
|
if self.control_file.is_some() {
|
||||||
info!("control file for timeline {} is already open", timelineid);
|
info!("control file for timeline {} is already open", timelineid);
|
||||||
@@ -69,13 +71,17 @@ impl SharedState {
|
|||||||
.data_dir
|
.data_dir
|
||||||
.join(timelineid.to_string())
|
.join(timelineid.to_string())
|
||||||
.join(CONTROL_FILE_NAME);
|
.join(CONTROL_FILE_NAME);
|
||||||
info!("loading control file {}", control_file_path.display());
|
info!(
|
||||||
match OpenOptions::new()
|
"loading control file {}, create={}",
|
||||||
.read(true)
|
control_file_path.display(),
|
||||||
.write(true)
|
create
|
||||||
.create(true)
|
);
|
||||||
.open(&control_file_path)
|
let mut opts = OpenOptions::new();
|
||||||
{
|
opts.read(true).write(true);
|
||||||
|
if create {
|
||||||
|
opts.create(true);
|
||||||
|
}
|
||||||
|
match opts.open(&control_file_path) {
|
||||||
Ok(file) => {
|
Ok(file) => {
|
||||||
// Lock file to prevent two or more active wal_acceptors
|
// Lock file to prevent two or more active wal_acceptors
|
||||||
match file.try_lock_exclusive() {
|
match file.try_lock_exclusive() {
|
||||||
@@ -91,29 +97,36 @@ impl SharedState {
|
|||||||
self.control_file = Some(file);
|
self.control_file = Some(file);
|
||||||
|
|
||||||
let cfile_ref = self.control_file.as_mut().unwrap();
|
let cfile_ref = self.control_file.as_mut().unwrap();
|
||||||
match SafeKeeperInfo::des_from(cfile_ref) {
|
if cfile_ref.metadata().unwrap().len() == 0 {
|
||||||
Err(e) => {
|
if !create {
|
||||||
warn!("read from {:?} failed: {}", control_file_path, e);
|
bail!("control file is empty");
|
||||||
}
|
}
|
||||||
Ok(info) => {
|
} else {
|
||||||
if info.magic != SK_MAGIC {
|
match SafeKeeperInfo::des_from(cfile_ref) {
|
||||||
bail!("Invalid control file magic: {}", info.magic);
|
Err(e) => {
|
||||||
|
bail!("failed to read control file {:?}: {}", control_file_path, e);
|
||||||
}
|
}
|
||||||
if info.format_version != SK_FORMAT_VERSION {
|
Ok(info) => {
|
||||||
bail!(
|
if info.magic != SK_MAGIC {
|
||||||
"Incompatible format version: {} vs. {}",
|
bail!("Invalid control file magic: {}", info.magic);
|
||||||
info.format_version,
|
}
|
||||||
SK_FORMAT_VERSION
|
if info.format_version != SK_FORMAT_VERSION {
|
||||||
);
|
bail!(
|
||||||
|
"Incompatible format version: {} vs. {}",
|
||||||
|
info.format_version,
|
||||||
|
SK_FORMAT_VERSION
|
||||||
|
);
|
||||||
|
}
|
||||||
|
self.info = info;
|
||||||
}
|
}
|
||||||
self.info = info;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
panic!(
|
bail!(
|
||||||
"Failed to open control file {:?}: {}",
|
"Failed to open control file {:?}: {}",
|
||||||
&control_file_path, e
|
&control_file_path,
|
||||||
|
e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -198,9 +211,9 @@ impl Timeline {
|
|||||||
shared_state.hs_feedback.clone()
|
shared_state.hs_feedback.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> {
|
pub fn load_control_file(&self, conf: &WalAcceptorConf, create: bool) -> Result<()> {
|
||||||
let mut shared_state = self.mutex.lock().unwrap();
|
let mut shared_state = self.mutex.lock().unwrap();
|
||||||
shared_state.load_control_file(conf, self.timelineid)
|
shared_state.load_control_file(conf, self.timelineid, create)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn save_control_file(&self, sync: bool) -> Result<()> {
|
pub fn save_control_file(&self, sync: bool) -> Result<()> {
|
||||||
@@ -211,17 +224,23 @@ impl Timeline {
|
|||||||
|
|
||||||
// Utilities needed by various Connection-like objects
|
// Utilities needed by various Connection-like objects
|
||||||
pub trait TimelineTools {
|
pub trait TimelineTools {
|
||||||
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>;
|
fn set(&mut self, conf: &WalAcceptorConf, timeline_id: ZTimelineId, create: bool)
|
||||||
|
-> Result<()>;
|
||||||
fn get(&self) -> &Arc<Timeline>;
|
fn get(&self) -> &Arc<Timeline>;
|
||||||
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID);
|
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TimelineTools for Option<Arc<Timeline>> {
|
impl TimelineTools for Option<Arc<Timeline>> {
|
||||||
fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> {
|
fn set(
|
||||||
|
&mut self,
|
||||||
|
conf: &WalAcceptorConf,
|
||||||
|
timeline_id: ZTimelineId,
|
||||||
|
create: bool,
|
||||||
|
) -> Result<()> {
|
||||||
// We will only set the timeline once. If it were to ever change,
|
// We will only set the timeline once. If it were to ever change,
|
||||||
// anyone who cloned the Arc would be out of date.
|
// anyone who cloned the Arc would be out of date.
|
||||||
assert!(self.is_none());
|
assert!(self.is_none());
|
||||||
*self = Some(GlobalTimelines::store(timeline_id)?);
|
*self = Some(GlobalTimelines::get(conf, timeline_id, create)?);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -243,11 +262,16 @@ lazy_static! {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// A zero-sized struct used to manage access to the global timelines map.
|
/// A zero-sized struct used to manage access to the global timelines map.
|
||||||
struct GlobalTimelines;
|
pub struct GlobalTimelines;
|
||||||
|
|
||||||
impl GlobalTimelines {
|
impl GlobalTimelines {
|
||||||
/// Store a new timeline into the global TIMELINES map.
|
/// Get a timeline with control file loaded from the global TIMELINES map.
|
||||||
fn store(timeline_id: ZTimelineId) -> Result<Arc<Timeline>> {
|
/// If control file doesn't exist and create=false, bails out.
|
||||||
|
pub fn get(
|
||||||
|
conf: &WalAcceptorConf,
|
||||||
|
timeline_id: ZTimelineId,
|
||||||
|
create: bool,
|
||||||
|
) -> Result<Arc<Timeline>> {
|
||||||
let mut timelines = TIMELINES.lock().unwrap();
|
let mut timelines = TIMELINES.lock().unwrap();
|
||||||
|
|
||||||
match timelines.get(&timeline_id) {
|
match timelines.get(&timeline_id) {
|
||||||
@@ -258,9 +282,10 @@ impl GlobalTimelines {
|
|||||||
|
|
||||||
let shared_state = SharedState::new();
|
let shared_state = SharedState::new();
|
||||||
|
|
||||||
let new_tid = Arc::new(Timeline::new(timeline_id, shared_state));
|
let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
|
||||||
timelines.insert(timeline_id, Arc::clone(&new_tid));
|
new_tli.load_control_file(conf, create)?;
|
||||||
Ok(new_tid)
|
timelines.insert(timeline_id, Arc::clone(&new_tli));
|
||||||
|
Ok(new_tli)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -116,7 +116,12 @@ impl PostgresBackend {
|
|||||||
Some(FeMessage::StartupMessage(m)) => {
|
Some(FeMessage::StartupMessage(m)) => {
|
||||||
trace!("got startup message {:?}", m);
|
trace!("got startup message {:?}", m);
|
||||||
|
|
||||||
handler.startup(self, &m)?;
|
if let Err(e) = handler.startup(self, &m) {
|
||||||
|
// try to send error to the client
|
||||||
|
let errmsg = format!("{}", e);
|
||||||
|
self.write_message(&BeMessage::ErrorResponse(errmsg))?;
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
|
||||||
match m.kind {
|
match m.kind {
|
||||||
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
|
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
|
||||||
|
|||||||
@@ -366,6 +366,7 @@ pub struct XLogDataBody<'a> {
|
|||||||
|
|
||||||
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
|
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
|
||||||
pub const TEXT_OID: Oid = 25;
|
pub const TEXT_OID: Oid = 25;
|
||||||
|
pub const JSON_OID: Oid = 114;
|
||||||
// single text column
|
// single text column
|
||||||
pub static SINGLE_COL_ROWDESC: BeMessage = BeMessage::RowDescription(&[RowDescriptor {
|
pub static SINGLE_COL_ROWDESC: BeMessage = BeMessage::RowDescription(&[RowDescriptor {
|
||||||
name: b"data",
|
name: b"data",
|
||||||
|
|||||||
Reference in New Issue
Block a user