From 2e9c9f217650e11d34ba1dae0a48395eba71d623 Mon Sep 17 00:00:00 2001 From: "Lei, Huang" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 10 Nov 2022 19:21:27 +0800 Subject: [PATCH] fix: sort system catalog entries to ensure catalog entries are processed first (#449) --- src/catalog/src/error.rs | 33 ++++++-- src/catalog/src/local/manager.rs | 136 ++++++++++++++++++++++--------- src/catalog/src/system.rs | 8 +- 3 files changed, 127 insertions(+), 50 deletions(-) diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index bb251aed57..4efe3553b9 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -48,13 +48,19 @@ pub enum Error { }, #[snafu(display("Invalid system catalog entry type: {:?}", entry_type))] - InvalidEntryType { entry_type: Option }, + InvalidEntryType { + entry_type: Option, + backtrace: Backtrace, + }, #[snafu(display("Invalid system catalog key: {:?}", key))] - InvalidKey { key: Option }, + InvalidKey { + key: Option, + backtrace: Backtrace, + }, #[snafu(display("Catalog value is not present"))] - EmptyValue, + EmptyValue { backtrace: Backtrace }, #[snafu(display("Failed to deserialize value, source: {}", source))] ValueDeserialize { @@ -63,10 +69,16 @@ pub enum Error { }, #[snafu(display("Cannot find catalog by name: {}", catalog_name))] - CatalogNotFound { catalog_name: String }, + CatalogNotFound { + catalog_name: String, + backtrace: Backtrace, + }, #[snafu(display("Cannot find schema, schema info: {}", schema_info))] - SchemaNotFound { schema_info: String }, + SchemaNotFound { + schema_info: String, + backtrace: Backtrace, + }, #[snafu(display("Table {} already exists", table))] TableExists { table: String, backtrace: Backtrace }, @@ -85,7 +97,10 @@ pub enum Error { }, #[snafu(display("Table not found while opening table, table info: {}", table_info))] - TableNotFound { table_info: String }, + TableNotFound { + table_info: String, + backtrace: Backtrace, + }, #[snafu(display("Failed to read system catalog 
table records"))] ReadSystemCatalog { @@ -190,7 +205,7 @@ impl ErrorExt for Error { | Error::CatalogStateInconsistent { .. } => StatusCode::Unexpected, Error::SystemCatalog { .. } - | Error::EmptyValue + | Error::EmptyValue { .. } | Error::ValueDeserialize { .. } | Error::Io { .. } => StatusCode::StorageUnavailable, @@ -255,7 +270,7 @@ mod tests { assert_eq!( StatusCode::Unexpected, - Error::InvalidKey { key: None }.status_code() + InvalidKeySnafu { key: None }.build().status_code() ); assert_eq!( @@ -296,7 +311,7 @@ mod tests { ); assert_eq!( StatusCode::StorageUnavailable, - Error::EmptyValue.status_code() + EmptyValueSnafu {}.build().status_code() ); } diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 3aba2ed3a0..405283df00 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -6,12 +6,11 @@ use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MIN_USER_TABLE_ID, SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME, }; -use common_recordbatch::RecordBatch; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_telemetry::info; use datatypes::prelude::ScalarVector; use datatypes::vectors::{BinaryVector, UInt8Vector}; use futures_util::lock::Mutex; -use futures_util::StreamExt; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::TableId; @@ -19,12 +18,11 @@ use table::requests::OpenTableRequest; use table::table::numbers::NumbersTable; use table::TableRef; -use crate::error::Result; use crate::error::{ - CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, - SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu, - TableNotFoundSnafu, + CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, SchemaNotFoundSnafu, + SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu, 
TableNotFoundSnafu, }; +use crate::error::{ReadSystemCatalogSnafu, Result}; use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use crate::system::{ decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX, @@ -70,17 +68,10 @@ impl LocalCatalogManager { /// Scan all entries from system catalog table pub async fn init(&self) -> Result<()> { self.init_system_catalog()?; - let mut system_records = self.system.information_schema.system.records().await?; - let mut max_table_id = 0; - while let Some(records) = system_records - .next() - .await - .transpose() - .context(ReadSystemCatalogSnafu)? - { - let table_id = self.handle_system_catalog_entries(records).await?; - max_table_id = max_table_id.max(table_id); - } + let system_records = self.system.information_schema.system.records().await?; + let entries = self.collect_system_catalog_entries(system_records).await?; + let max_table_id = self.handle_system_catalog_entries(entries).await?; + info!( "All system catalog entries processed, max table id: {}", max_table_id @@ -110,7 +101,6 @@ impl LocalCatalogManager { let default_schema = Arc::new(MemorySchemaProvider::new()); // Add numbers table for test - // TODO(hl): remove this registration let table = Arc::new(NumbersTable::default()); default_schema.register_table("numbers".to_string(), table)?; @@ -120,47 +110,65 @@ impl LocalCatalogManager { Ok(()) } - /// Processes records from system catalog table and returns the max table id persisted - /// in system catalog table. 
- async fn handle_system_catalog_entries(&self, records: RecordBatch) -> Result { + /// Collect stream of system catalog entries to `Vec` + async fn collect_system_catalog_entries( + &self, + stream: SendableRecordBatchStream, + ) -> Result> { + let record_batch = common_recordbatch::util::collect(stream) + .await + .context(ReadSystemCatalogSnafu)?; + let rbs = record_batch + .into_iter() + .map(Self::record_batch_to_entry) + .collect::>>()?; + Ok(rbs.into_iter().flat_map(Vec::into_iter).collect::<_>()) + } + + /// Convert `RecordBatch` to a vector of `Entry`. + fn record_batch_to_entry(rb: RecordBatch) -> Result> { ensure!( - records.df_recordbatch.columns().len() >= 6, + rb.df_recordbatch.columns().len() >= 6, SystemCatalogSnafu { - msg: format!( - "Length mismatch: {}", - records.df_recordbatch.columns().len() - ) + msg: format!("Length mismatch: {}", rb.df_recordbatch.columns().len()) } ); - let entry_type = UInt8Vector::try_from_arrow_array(&records.df_recordbatch.columns()[0]) + let entry_type = UInt8Vector::try_from_arrow_array(&rb.df_recordbatch.columns()[0]) .with_context(|_| SystemCatalogTypeMismatchSnafu { - data_type: records.df_recordbatch.columns()[ENTRY_TYPE_INDEX] + data_type: rb.df_recordbatch.columns()[ENTRY_TYPE_INDEX] .data_type() .clone(), })?; - let key = BinaryVector::try_from_arrow_array(&records.df_recordbatch.columns()[1]) + let key = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[1]) .with_context(|_| SystemCatalogTypeMismatchSnafu { - data_type: records.df_recordbatch.columns()[KEY_INDEX] - .data_type() - .clone(), + data_type: rb.df_recordbatch.columns()[KEY_INDEX].data_type().clone(), })?; - let value = BinaryVector::try_from_arrow_array(&records.df_recordbatch.columns()[3]) + let value = BinaryVector::try_from_arrow_array(&rb.df_recordbatch.columns()[3]) .with_context(|_| SystemCatalogTypeMismatchSnafu { - data_type: records.df_recordbatch.columns()[VALUE_INDEX] - .data_type() - .clone(), + data_type: 
rb.df_recordbatch.columns()[VALUE_INDEX].data_type().clone(), })?; - let mut max_table_id = 0; + let mut res = Vec::with_capacity(rb.num_rows()); for ((t, k), v) in entry_type .iter_data() .zip(key.iter_data()) .zip(value.iter_data()) { let entry = decode_system_catalog(t, k, v)?; + res.push(entry); + } + Ok(res) + } + + /// Processes records from system catalog table and returns the max table id persisted + /// in system catalog table. + async fn handle_system_catalog_entries(&self, entries: Vec) -> Result { + let entries = Self::sort_entries(entries); + let mut max_table_id = 0; + for entry in entries { match entry { Entry::Catalog(c) => { self.catalogs.register_catalog_if_absent( @@ -192,6 +200,13 @@ impl LocalCatalogManager { Ok(max_table_id) } + /// Sort catalog entries to ensure catalog entries come first, then schema entries, + /// and table entries last. + fn sort_entries(mut entries: Vec) -> Vec { + entries.sort(); + entries + } + async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> { let catalog = self .catalogs @@ -351,3 +366,50 @@ impl CatalogManager for LocalCatalogManager { schema.table(table_name) } } + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use super::*; + use crate::system::{CatalogEntry, SchemaEntry}; + + #[test] + fn test_sort_entry() { + let vec = vec![ + Entry::Table(TableEntry { + catalog_name: "C1".to_string(), + schema_name: "S1".to_string(), + table_name: "T1".to_string(), + table_id: 1, + }), + Entry::Catalog(CatalogEntry { + catalog_name: "C2".to_string(), + }), + Entry::Schema(SchemaEntry { + catalog_name: "C1".to_string(), + schema_name: "S1".to_string(), + }), + Entry::Schema(SchemaEntry { + catalog_name: "C2".to_string(), + schema_name: "S2".to_string(), + }), + Entry::Catalog(CatalogEntry { + catalog_name: "".to_string(), + }), + Entry::Table(TableEntry { + catalog_name: "C1".to_string(), + schema_name: "S1".to_string(), + table_name: "T2".to_string(), + table_id: 2, + }), + 
]; + let res = LocalCatalogManager::sort_entries(vec); + assert_matches!(res[0], Entry::Catalog(..)); + assert_matches!(res[1], Entry::Catalog(..)); + assert_matches!(res[2], Entry::Schema(..)); + assert_matches!(res[3], Entry::Schema(..)); + assert_matches!(res[4], Entry::Table(..)); + assert_matches!(res[5], Entry::Table(..)); + } +} diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 66dc2e1116..232341e0b0 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -306,25 +306,25 @@ impl TryFrom for EntryType { } } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)] pub enum Entry { Catalog(CatalogEntry), Schema(SchemaEntry), Table(TableEntry), } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)] pub struct CatalogEntry { pub catalog_name: String, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)] pub struct SchemaEntry { pub catalog_name: String, pub schema_name: String, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)] pub struct TableEntry { pub catalog_name: String, pub schema_name: String,