fix: sort system catalog entries to ensure catalog entry are firstlt processed (#449)

This commit is contained in:
Lei, Huang
2022-11-10 19:21:27 +08:00
committed by GitHub
parent 89b942798c
commit 2e9c9f2176
3 changed files with 127 additions and 50 deletions

View File

@@ -48,13 +48,19 @@ pub enum Error {
},
#[snafu(display("Invalid system catalog entry type: {:?}", entry_type))]
InvalidEntryType { entry_type: Option<u8> },
InvalidEntryType {
entry_type: Option<u8>,
backtrace: Backtrace,
},
#[snafu(display("Invalid system catalog key: {:?}", key))]
InvalidKey { key: Option<String> },
InvalidKey {
key: Option<String>,
backtrace: Backtrace,
},
#[snafu(display("Catalog value is not present"))]
EmptyValue,
EmptyValue { backtrace: Backtrace },
#[snafu(display("Failed to deserialize value, source: {}", source))]
ValueDeserialize {
@@ -63,10 +69,16 @@ pub enum Error {
},
#[snafu(display("Cannot find catalog by name: {}", catalog_name))]
CatalogNotFound { catalog_name: String },
CatalogNotFound {
catalog_name: String,
backtrace: Backtrace,
},
#[snafu(display("Cannot find schema, schema info: {}", schema_info))]
SchemaNotFound { schema_info: String },
SchemaNotFound {
schema_info: String,
backtrace: Backtrace,
},
#[snafu(display("Table {} already exists", table))]
TableExists { table: String, backtrace: Backtrace },
@@ -85,7 +97,10 @@ pub enum Error {
},
#[snafu(display("Table not found while opening table, table info: {}", table_info))]
TableNotFound { table_info: String },
TableNotFound {
table_info: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to read system catalog table records"))]
ReadSystemCatalog {
@@ -190,7 +205,7 @@ impl ErrorExt for Error {
| Error::CatalogStateInconsistent { .. } => StatusCode::Unexpected,
Error::SystemCatalog { .. }
| Error::EmptyValue
| Error::EmptyValue { .. }
| Error::ValueDeserialize { .. }
| Error::Io { .. } => StatusCode::StorageUnavailable,
@@ -255,7 +270,7 @@ mod tests {
assert_eq!(
StatusCode::Unexpected,
Error::InvalidKey { key: None }.status_code()
InvalidKeySnafu { key: None }.build().status_code()
);
assert_eq!(
@@ -296,7 +311,7 @@ mod tests {
);
assert_eq!(
StatusCode::StorageUnavailable,
Error::EmptyValue.status_code()
EmptyValueSnafu {}.build().status_code()
);
}

View File

@@ -6,12 +6,11 @@ use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID,
SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME,
};
use common_recordbatch::RecordBatch;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::info;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{BinaryVector, UInt8Vector};
use futures_util::lock::Mutex;
use futures_util::StreamExt;
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::TableId;
@@ -19,12 +18,11 @@ use table::requests::OpenTableRequest;
use table::table::numbers::NumbersTable;
use table::TableRef;
use crate::error::Result;
use crate::error::{
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu,
SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu,
TableNotFoundSnafu,
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, SchemaNotFoundSnafu,
SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::error::{ReadSystemCatalogSnafu, Result};
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use crate::system::{
decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX,
@@ -70,17 +68,10 @@ impl LocalCatalogManager {
/// Scan all entries from system catalog table
pub async fn init(&self) -> Result<()> {
self.init_system_catalog()?;
let mut system_records = self.system.information_schema.system.records().await?;
let mut max_table_id = 0;
while let Some(records) = system_records
.next()
.await
.transpose()
.context(ReadSystemCatalogSnafu)?
{
let table_id = self.handle_system_catalog_entries(records).await?;
max_table_id = max_table_id.max(table_id);
}
let system_records = self.system.information_schema.system.records().await?;
let entries = self.collect_system_catalog_entries(system_records).await?;
let max_table_id = self.handle_system_catalog_entries(entries).await?;
info!(
"All system catalog entries processed, max table id: {}",
max_table_id
@@ -110,7 +101,6 @@ impl LocalCatalogManager {
let default_schema = Arc::new(MemorySchemaProvider::new());
// Add numbers table for test
// TODO(hl): remove this registration
let table = Arc::new(NumbersTable::default());
default_schema.register_table("numbers".to_string(), table)?;
@@ -120,47 +110,65 @@ impl LocalCatalogManager {
Ok(())
}
/// Processes records from system catalog table and returns the max table id persisted
/// in system catalog table.
async fn handle_system_catalog_entries(&self, records: RecordBatch) -> Result<TableId> {
/// Collect stream of system catalog entries to `Vec<Entry>`
async fn collect_system_catalog_entries(
&self,
stream: SendableRecordBatchStream,
) -> Result<Vec<Entry>> {
let record_batch = common_recordbatch::util::collect(stream)
.await
.context(ReadSystemCatalogSnafu)?;
let rbs = record_batch
.into_iter()
.map(Self::record_batch_to_entry)
.collect::<Result<Vec<_>>>()?;
Ok(rbs.into_iter().flat_map(Vec::into_iter).collect::<_>())
}
/// Convert `RecordBatch` to a vector of `Entry`.
fn record_batch_to_entry(rb: RecordBatch) -> Result<Vec<Entry>> {
ensure!(
records.df_recordbatch.columns().len() >= 6,
rb.df_recordbatch.columns().len() >= 6,
SystemCatalogSnafu {
msg: format!(
"Length mismatch: {}",
records.df_recordbatch.columns().len()
)
msg: format!("Length mismatch: {}", rb.df_recordbatch.columns().len())
}
);
let entry_type = UInt8Vector::try_from_arrow_array(&records.df_recordbatch.columns()[0])
let entry_type = UInt8Vector::try_from_arrow_array(&rb.df_recordbatch.columns()[0])
.with_context(|_| SystemCatalogTypeMismatchSnafu {
data_type: records.df_recordbatch.columns()[ENTRY_TYPE_INDEX]
data_type: rb.df_recordbatch.columns()[ENTRY_TYPE_INDEX]
.data_type()
.clone(),
})?;
let key = BinaryVector::try_from_arrow_array(&records.df_recordbatch.columns()[1])
let key = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[1])
.with_context(|_| SystemCatalogTypeMismatchSnafu {
data_type: records.df_recordbatch.columns()[KEY_INDEX]
.data_type()
.clone(),
data_type: rb.df_recordbatch.columns()[KEY_INDEX].data_type().clone(),
})?;
let value = BinaryVector::try_from_arrow_array(&records.df_recordbatch.columns()[3])
let value = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[3])
.with_context(|_| SystemCatalogTypeMismatchSnafu {
data_type: records.df_recordbatch.columns()[VALUE_INDEX]
.data_type()
.clone(),
data_type: rb.df_recordbatch.columns()[VALUE_INDEX].data_type().clone(),
})?;
let mut max_table_id = 0;
let mut res = Vec::with_capacity(rb.num_rows());
for ((t, k), v) in entry_type
.iter_data()
.zip(key.iter_data())
.zip(value.iter_data())
{
let entry = decode_system_catalog(t, k, v)?;
res.push(entry);
}
Ok(res)
}
/// Processes records from system catalog table and returns the max table id persisted
/// in system catalog table.
async fn handle_system_catalog_entries(&self, entries: Vec<Entry>) -> Result<TableId> {
let entries = Self::sort_entries(entries);
let mut max_table_id = 0;
for entry in entries {
match entry {
Entry::Catalog(c) => {
self.catalogs.register_catalog_if_absent(
@@ -192,6 +200,13 @@ impl LocalCatalogManager {
Ok(max_table_id)
}
/// Sort catalog entries to ensure catalog entries comes first, then schema entries,
/// and table entries is the last.
fn sort_entries(mut entries: Vec<Entry>) -> Vec<Entry> {
entries.sort();
entries
}
async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> {
let catalog = self
.catalogs
@@ -351,3 +366,50 @@ impl CatalogManager for LocalCatalogManager {
schema.table(table_name)
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use super::*;
use crate::system::{CatalogEntry, SchemaEntry};
#[test]
fn test_sort_entry() {
let vec = vec![
Entry::Table(TableEntry {
catalog_name: "C1".to_string(),
schema_name: "S1".to_string(),
table_name: "T1".to_string(),
table_id: 1,
}),
Entry::Catalog(CatalogEntry {
catalog_name: "C2".to_string(),
}),
Entry::Schema(SchemaEntry {
catalog_name: "C1".to_string(),
schema_name: "S1".to_string(),
}),
Entry::Schema(SchemaEntry {
catalog_name: "C2".to_string(),
schema_name: "S2".to_string(),
}),
Entry::Catalog(CatalogEntry {
catalog_name: "".to_string(),
}),
Entry::Table(TableEntry {
catalog_name: "C1".to_string(),
schema_name: "S1".to_string(),
table_name: "T2".to_string(),
table_id: 2,
}),
];
let res = LocalCatalogManager::sort_entries(vec);
assert_matches!(res[0], Entry::Catalog(..));
assert_matches!(res[1], Entry::Catalog(..));
assert_matches!(res[2], Entry::Schema(..));
assert_matches!(res[3], Entry::Schema(..));
assert_matches!(res[4], Entry::Table(..));
assert_matches!(res[5], Entry::Table(..));
}
}

View File

@@ -306,25 +306,25 @@ impl TryFrom<u8> for EntryType {
}
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum Entry {
Catalog(CatalogEntry),
Schema(SchemaEntry),
Table(TableEntry),
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub struct CatalogEntry {
pub catalog_name: String,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub struct SchemaEntry {
pub catalog_name: String,
pub schema_name: String,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub struct TableEntry {
pub catalog_name: String,
pub schema_name: String,