mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 10:20:38 +00:00
@@ -55,7 +55,7 @@ The DataFusion basically execute aggregate like this:
|
||||
2. Call `update_batch` on each accumulator with partitioned data, to let you update your aggregate calculation.
|
||||
3. Call `state` to get each accumulator's internal state, the medial calculation result.
|
||||
4. Call `merge_batch` to merge all accumulator's internal state to one.
|
||||
5. Execute `evalute` on the chosen one to get the final calculation result.
|
||||
5. Execute `evaluate` on the chosen one to get the final calculation result.
|
||||
|
||||
Once you know the meaning of each method, you can easily write your accumulator. You can refer to `Median` accumulator or `SUM` accumulator defined in file `my_sum_udaf_example.rs` for more details.
|
||||
|
||||
@@ -63,7 +63,7 @@ Once you know the meaning of each method, you can easily write your accumulator.
|
||||
|
||||
You can call `register_aggregate_function` method in query engine to register your aggregate function. To do that, you have to new an instance of struct `AggregateFunctionMeta`. The struct has three fields, first is the name of your aggregate function's name. The function name is case-sensitive due to DataFusion's restriction. We strongly recommend using lowercase for your name. If you have to use uppercase name, wrap your aggregate function with quotation marks. For example, if you define an aggregate function named "my_aggr", you can use "`SELECT MY_AGGR(x)`"; if you define "my_AGGR", you have to use "`SELECT "my_AGGR"(x)`".
|
||||
|
||||
The second field is arg_counts ,the count of the arguments. Like accumulator `percentile`, caculating the p_number of the column. We need to input the value of column and the value of p to cacalate, and so the count of the arguments is two.
|
||||
The second field is arg_counts ,the count of the arguments. Like accumulator `percentile`, calculating the p_number of the column. We need to input the value of column and the value of p to cacalate, and so the count of the arguments is two.
|
||||
|
||||
The third field is a function about how to create your accumulator creator that you defined in step 1 above. Create creator, that's a bit intertwined, but it is how we make DataFusion use a newly created aggregate function each time it executes a SQL, preventing the stored input types from affecting each other. The key detail can be starting looking at our `DfContextProviderAdapter` struct's `get_aggregate_meta` method.
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ message NodeStat {
|
||||
uint64 wcus = 2;
|
||||
// Table number in this node
|
||||
uint64 table_num = 3;
|
||||
// Regon number in this node
|
||||
// Region number in this node
|
||||
uint64 region_num = 4;
|
||||
|
||||
double cpu_usage = 5;
|
||||
|
||||
@@ -383,7 +383,7 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
pub fn test_decode_catalog_enrty() {
|
||||
pub fn test_decode_catalog_entry() {
|
||||
let entry = decode_system_catalog(
|
||||
Some(EntryType::Catalog as u8),
|
||||
Some("some_catalog".as_bytes()),
|
||||
|
||||
@@ -43,7 +43,7 @@ async fn run() {
|
||||
|
||||
fn insert_batches() -> Vec<Vec<u8>> {
|
||||
const SEMANTIC_TAG: i32 = 0;
|
||||
const SEMANTIC_FEILD: i32 = 1;
|
||||
const SEMANTIC_FIELD: i32 = 1;
|
||||
const SEMANTIC_TS: i32 = 2;
|
||||
|
||||
let row_count = 4;
|
||||
@@ -71,7 +71,7 @@ fn insert_batches() -> Vec<Vec<u8>> {
|
||||
};
|
||||
let cpu_column = Column {
|
||||
column_name: "cpu".to_string(),
|
||||
semantic_type: SEMANTIC_FEILD,
|
||||
semantic_type: SEMANTIC_FIELD,
|
||||
values: Some(cpu_vals),
|
||||
null_mask: vec![2],
|
||||
..Default::default()
|
||||
@@ -83,7 +83,7 @@ fn insert_batches() -> Vec<Vec<u8>> {
|
||||
};
|
||||
let mem_column = Column {
|
||||
column_name: "memory".to_string(),
|
||||
semantic_type: SEMANTIC_FEILD,
|
||||
semantic_type: SEMANTIC_FIELD,
|
||||
values: Some(mem_vals),
|
||||
null_mask: vec![4],
|
||||
..Default::default()
|
||||
|
||||
@@ -22,7 +22,7 @@ use api::v1::{
|
||||
SelectExpr,
|
||||
};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_grpc::{AsExcutionPlan, DefaultAsPlanImpl};
|
||||
use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl};
|
||||
use common_insert::column_to_vector;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::{RecordBatch, RecordBatches};
|
||||
|
||||
@@ -20,4 +20,4 @@ pub mod writer;
|
||||
|
||||
pub use error::Error;
|
||||
pub use physical::plan::{DefaultAsPlanImpl, MockExecution};
|
||||
pub use physical::AsExcutionPlan;
|
||||
pub use physical::AsExecutionPlan;
|
||||
|
||||
@@ -22,7 +22,7 @@ use datafusion::physical_plan::ExecutionPlan;
|
||||
|
||||
pub type ExecutionPlanRef = Arc<dyn ExecutionPlan>;
|
||||
|
||||
pub trait AsExcutionPlan {
|
||||
pub trait AsExecutionPlan {
|
||||
type Error: std::error::Error;
|
||||
|
||||
fn try_into_physical_plan(&self) -> Result<ExecutionPlanRef, Self::Error>;
|
||||
|
||||
@@ -35,13 +35,13 @@ use crate::error::{
|
||||
DecodePhysicalPlanNodeSnafu, EmptyPhysicalPlanSnafu, Error, MissingFieldSnafu,
|
||||
NewProjectionSnafu, UnsupportedDfPlanSnafu,
|
||||
};
|
||||
use crate::physical::{expr, AsExcutionPlan, ExecutionPlanRef};
|
||||
use crate::physical::{expr, AsExecutionPlan, ExecutionPlanRef};
|
||||
|
||||
pub struct DefaultAsPlanImpl {
|
||||
pub bytes: Vec<u8>,
|
||||
}
|
||||
|
||||
impl AsExcutionPlan for DefaultAsPlanImpl {
|
||||
impl AsExecutionPlan for DefaultAsPlanImpl {
|
||||
type Error = Error;
|
||||
|
||||
// Vec<u8> -> PhysicalPlanNode -> ExecutionPlanRef
|
||||
@@ -64,7 +64,7 @@ impl AsExcutionPlan for DefaultAsPlanImpl {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsExcutionPlan for PhysicalPlanNode {
|
||||
impl AsExecutionPlan for PhysicalPlanNode {
|
||||
type Error = Error;
|
||||
|
||||
fn try_into_physical_plan(&self) -> Result<ExecutionPlanRef, Self::Error> {
|
||||
@@ -227,7 +227,7 @@ mod tests {
|
||||
use datafusion::physical_plan::projection::ProjectionExec;
|
||||
|
||||
use crate::physical::plan::{DefaultAsPlanImpl, MockExecution};
|
||||
use crate::physical::{AsExcutionPlan, ExecutionPlanRef};
|
||||
use crate::physical::{AsExecutionPlan, ExecutionPlanRef};
|
||||
|
||||
#[test]
|
||||
fn test_convert_df_projection_with_bytes() {
|
||||
@@ -236,7 +236,7 @@ mod tests {
|
||||
let bytes = DefaultAsPlanImpl::try_from_physical_plan(projection_exec).unwrap();
|
||||
let exec = bytes.try_into_physical_plan().unwrap();
|
||||
|
||||
verify_df_porjection(exec);
|
||||
verify_df_projection(exec);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -246,7 +246,7 @@ mod tests {
|
||||
let projection_node = PhysicalPlanNode::try_from_physical_plan(projection_exec).unwrap();
|
||||
let exec = projection_node.try_into_physical_plan().unwrap();
|
||||
|
||||
verify_df_porjection(exec);
|
||||
verify_df_projection(exec);
|
||||
}
|
||||
|
||||
fn mock_df_projection() -> Arc<ProjectionExec> {
|
||||
@@ -264,7 +264,7 @@ mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
fn verify_df_porjection(exec: ExecutionPlanRef) {
|
||||
fn verify_df_projection(exec: ExecutionPlanRef) {
|
||||
let projection_exec = exec.as_any().downcast_ref::<ProjectionExec>().unwrap();
|
||||
let mock_input = projection_exec
|
||||
.input()
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
//! Udf module contains foundational types that are used to represent UDFs.
|
||||
//! It's modifed from datafusion.
|
||||
//! It's modified from datafusion.
|
||||
use std::fmt;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -53,7 +53,7 @@ pub struct Signature {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn concret_types_to_arrow_types(ts: Vec<ConcreteDataType>) -> Vec<ArrowDataType> {
|
||||
fn concrete_types_to_arrow_types(ts: Vec<ConcreteDataType>) -> Vec<ArrowDataType> {
|
||||
ts.iter().map(ConcreteDataType::as_arrow_type).collect()
|
||||
}
|
||||
|
||||
@@ -118,14 +118,14 @@ impl From<TypeSignature> for DfTypeSignature {
|
||||
fn from(type_signature: TypeSignature) -> DfTypeSignature {
|
||||
match type_signature {
|
||||
TypeSignature::Variadic(types) => {
|
||||
DfTypeSignature::Variadic(concret_types_to_arrow_types(types))
|
||||
DfTypeSignature::Variadic(concrete_types_to_arrow_types(types))
|
||||
}
|
||||
TypeSignature::VariadicEqual => DfTypeSignature::VariadicEqual,
|
||||
TypeSignature::Uniform(n, types) => {
|
||||
DfTypeSignature::Uniform(n, concret_types_to_arrow_types(types))
|
||||
DfTypeSignature::Uniform(n, concrete_types_to_arrow_types(types))
|
||||
}
|
||||
TypeSignature::Exact(types) => {
|
||||
DfTypeSignature::Exact(concret_types_to_arrow_types(types))
|
||||
DfTypeSignature::Exact(concrete_types_to_arrow_types(types))
|
||||
}
|
||||
TypeSignature::Any(n) => DfTypeSignature::Any(n),
|
||||
TypeSignature::OneOf(ts) => {
|
||||
|
||||
@@ -178,14 +178,14 @@ impl DFLogicalSubstraitConvertor {
|
||||
})?;
|
||||
let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
|
||||
|
||||
// Get schema directly from the table, and compare it with the schema retrived from substrait proto.
|
||||
// Get schema directly from the table, and compare it with the schema retrieved from substrait proto.
|
||||
let stored_schema = adapter.schema();
|
||||
let retrived_schema = to_schema(read_rel.base_schema.unwrap_or_default())?;
|
||||
let retrived_arrow_schema = retrived_schema.arrow_schema();
|
||||
let retrieved_schema = to_schema(read_rel.base_schema.unwrap_or_default())?;
|
||||
let retrieved_arrow_schema = retrieved_schema.arrow_schema();
|
||||
ensure!(
|
||||
stored_schema.fields == retrived_arrow_schema.fields,
|
||||
stored_schema.fields == retrieved_arrow_schema.fields,
|
||||
SchemaNotMatchSnafu {
|
||||
substrait_schema: retrived_arrow_schema.clone(),
|
||||
substrait_schema: retrieved_arrow_schema.clone(),
|
||||
storage_schema: stored_schema
|
||||
}
|
||||
);
|
||||
|
||||
@@ -81,7 +81,7 @@ pub enum Error {
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Table quering not found: {}", name))]
|
||||
#[snafu(display("Table querying not found: {}", name))]
|
||||
TableNotFound { name: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Cannot convert plan doesn't belong to GreptimeDB"))]
|
||||
|
||||
@@ -12,10 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Methods that perform convertion between Substrait's type ([Type](SType)) and GreptimeDB's type ([ConcreteDataType]).
|
||||
//! Methods that perform conversion between Substrait's type ([Type](SType)) and GreptimeDB's type ([ConcreteDataType]).
|
||||
//!
|
||||
//! Substrait use [type variation](https://substrait.io/types/type_variations/) to express different "logical types".
|
||||
//! Current we only have variations on integer types. Variation 0 (system prefered) are the same with base types, which
|
||||
//! Current we only have variations on integer types. Variation 0 (system preferred) are the same with base types, which
|
||||
//! are signed integer (i.e. I8 -> [i8]), and Variation 1 stands for unsigned integer (i.e. I8 -> [u8]).
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
|
||||
@@ -339,7 +339,7 @@ mod tests {
|
||||
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
|
||||
assert!(column_schema.is_nullable());
|
||||
|
||||
let default_constraint = ColumnDefaultConstraint::Value(Value::from("defaut value"));
|
||||
let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value"));
|
||||
let column_def = ColumnDef {
|
||||
name: "a".to_string(),
|
||||
datatype: 12, // string
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_grpc::{AsExcutionPlan, DefaultAsPlanImpl};
|
||||
use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl};
|
||||
use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef};
|
||||
use common_query::Output;
|
||||
use datatypes::schema::Schema;
|
||||
|
||||
@@ -156,7 +156,7 @@ impl Schema {
|
||||
/// Create a schema from a vector of [ColumnSchema].
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics when ColumnSchema's `default_constrait` can't be serialized into json.
|
||||
/// Panics when ColumnSchema's `default_constraint` can't be serialized into json.
|
||||
pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
|
||||
// Builder won't fail in this case
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
|
||||
@@ -44,7 +44,7 @@ pub(crate) fn find_unique_scalar<'a, T: ScalarVector>(
|
||||
}
|
||||
}
|
||||
|
||||
// Marks first element as selcted if it is different from previous element, otherwise
|
||||
// Marks first element as selected if it is different from previous element, otherwise
|
||||
// keep selected bitmap unchanged.
|
||||
let is_first_not_duplicate = prev_vector
|
||||
.map(|pv| {
|
||||
|
||||
@@ -32,7 +32,7 @@ pub struct WriteSpliter {
|
||||
}
|
||||
|
||||
impl WriteSpliter {
|
||||
pub fn with_patition_rule(rule: PartitionRuleRef<Error>) -> Self {
|
||||
pub fn with_partition_rule(rule: PartitionRuleRef<Error>) -> Self {
|
||||
Self {
|
||||
partition_rule: rule,
|
||||
}
|
||||
@@ -204,7 +204,7 @@ mod tests {
|
||||
fn test_writer_spliter() {
|
||||
let insert = mock_insert_request();
|
||||
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef<Error>;
|
||||
let spliter = WriteSpliter::with_patition_rule(rule);
|
||||
let spliter = WriteSpliter::with_partition_rule(rule);
|
||||
let ret = spliter.split(insert).unwrap();
|
||||
|
||||
assert_eq!(2, ret.len());
|
||||
@@ -354,16 +354,16 @@ mod tests {
|
||||
let vectors = vec![v1, v2];
|
||||
|
||||
let row_0_vals = partition_values(&vectors, 0);
|
||||
let expeted: Vec<Value> = vec![true.into(), "host1".into()];
|
||||
assert_eq!(expeted, row_0_vals);
|
||||
let expected: Vec<Value> = vec![true.into(), "host1".into()];
|
||||
assert_eq!(expected, row_0_vals);
|
||||
|
||||
let row_1_vals = partition_values(&vectors, 1);
|
||||
let expeted: Vec<Value> = vec![false.into(), Value::Null];
|
||||
assert_eq!(expeted, row_1_vals);
|
||||
let expected: Vec<Value> = vec![false.into(), Value::Null];
|
||||
assert_eq!(expected, row_1_vals);
|
||||
|
||||
let row_2_vals = partition_values(&vectors, 2);
|
||||
let expeted: Vec<Value> = vec![true.into(), "host3".into()];
|
||||
assert_eq!(expeted, row_2_vals);
|
||||
let expected: Vec<Value> = vec![true.into(), "host3".into()];
|
||||
assert_eq!(expected, row_2_vals);
|
||||
}
|
||||
|
||||
fn mock_insert_request() -> InsertRequest {
|
||||
|
||||
@@ -81,7 +81,7 @@ impl Table for DistTable {
|
||||
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
|
||||
let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?;
|
||||
|
||||
let spliter = WriteSpliter::with_patition_rule(partition_rule);
|
||||
let spliter = WriteSpliter::with_partition_rule(partition_rule);
|
||||
let inserts = spliter.split(request).map_err(TableError::new)?;
|
||||
let result = match self.dist_insert(inserts).await.map_err(TableError::new)? {
|
||||
client::ObjectResult::Select(_) => unreachable!(),
|
||||
|
||||
@@ -186,7 +186,7 @@ impl TryFrom<PbTable> for Table {
|
||||
let table_name = t
|
||||
.table_name
|
||||
.context(error::RouteInfoCorruptedSnafu {
|
||||
err_msg: "table name requied",
|
||||
err_msg: "table name required",
|
||||
})?
|
||||
.into();
|
||||
Ok(Self {
|
||||
|
||||
@@ -85,7 +85,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
|
||||
|
||||
match tx.send(Err(err)).await {
|
||||
Ok(_) => (),
|
||||
Err(_err) => break, // response was droped
|
||||
Err(_err) => break, // response was dropped
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ pub trait ScriptEngine {
|
||||
) -> std::result::Result<Self::Script, Self::Error>;
|
||||
}
|
||||
|
||||
/// Evalute script context
|
||||
/// Evaluate script context
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EvalContext {}
|
||||
|
||||
|
||||
@@ -235,7 +235,7 @@ macro_rules! bind_call_unary_math_function {
|
||||
|
||||
/// The macro for binding function in `datafusion_physical_expr::expressions`(most of them are aggregate function)
|
||||
///
|
||||
/// - first arguements is the name of datafusion expression function like `Avg`
|
||||
/// - first arguments is the name of datafusion expression function like `Avg`
|
||||
/// - second is the python virtual machine ident `vm`
|
||||
/// - following is the actual args passing in(as a slice).i.e.`&[values.to_arrow_array()]`
|
||||
/// - the data type of passing in args, i.e: `Datatype::Float64`
|
||||
@@ -259,7 +259,7 @@ fn from_df_err(err: DataFusionError, vm: &VirtualMachine) -> PyBaseExceptionRef
|
||||
vm.new_runtime_error(format!("Data Fusion Error: {err:#?}"))
|
||||
}
|
||||
|
||||
/// evalute Aggregate Expr using its backing accumulator
|
||||
/// evaluate Aggregate Expr using its backing accumulator
|
||||
fn eval_aggr_fn<T: AggregateExpr>(
|
||||
aggr: T,
|
||||
values: &[ArrayRef],
|
||||
@@ -1120,7 +1120,7 @@ pub(crate) mod greptime_builtin {
|
||||
State::Num(v) => {
|
||||
if cur_idx + 1 > parsed.len() {
|
||||
return Err(vm.new_runtime_error(
|
||||
"Expect a spearator after number, found nothing!".to_string(),
|
||||
"Expect a separator after number, found nothing!".to_string(),
|
||||
));
|
||||
}
|
||||
let nxt = &parsed[cur_idx + 1];
|
||||
@@ -1128,7 +1128,7 @@ pub(crate) mod greptime_builtin {
|
||||
tot_time += v * factor(sep, vm)?;
|
||||
} else {
|
||||
return Err(vm.new_runtime_error(format!(
|
||||
"Expect a spearator after number, found `{nxt:#?}`"
|
||||
"Expect a separator after number, found `{nxt:#?}`"
|
||||
)));
|
||||
}
|
||||
cur_idx += 2;
|
||||
|
||||
@@ -118,7 +118,7 @@ struct Var {
|
||||
ty: DataType,
|
||||
}
|
||||
|
||||
/// for floating number comparsion
|
||||
/// for floating number comparison
|
||||
const EPS: f64 = 2.0 * f64::EPSILON;
|
||||
|
||||
/// Null element just not supported for now for simplicity with writing test cases
|
||||
|
||||
@@ -49,7 +49,7 @@ use crate::python::PyVector;
|
||||
#[cfg_attr(test, derive(Deserialize))]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct AnnotationInfo {
|
||||
/// if None, use types infered by PyVector
|
||||
/// if None, use types inferred by PyVector
|
||||
pub datatype: Option<DataType>,
|
||||
pub is_nullable: bool,
|
||||
}
|
||||
@@ -115,7 +115,7 @@ impl Coprocessor {
|
||||
datatype: ty,
|
||||
is_nullable,
|
||||
} = anno[idx].to_owned().unwrap_or_else(||
|
||||
// default to be not nullable and use DataType infered by PyVector itself
|
||||
// default to be not nullable and use DataType inferred by PyVector itself
|
||||
AnnotationInfo{
|
||||
datatype: Some(real_ty.to_owned()),
|
||||
is_nullable: false
|
||||
@@ -208,7 +208,7 @@ fn try_into_py_vector(fetch_args: Vec<ArrayRef>) -> Result<Vec<PyVector>> {
|
||||
}
|
||||
_ => {
|
||||
return ret_other_error_with(format!(
|
||||
"Unsupport data type at column {idx}: {:?} for coprocessor",
|
||||
"Unsupported data type at column {idx}: {:?} for coprocessor",
|
||||
arg.data_type()
|
||||
))
|
||||
.fail()
|
||||
@@ -348,7 +348,7 @@ fn set_items_in_scope(
|
||||
/// ```
|
||||
///
|
||||
/// # Type Annotation
|
||||
/// you can use type annotations in args and returns to designate types, so coprocessor will check for corrsponding types.
|
||||
/// you can use type annotations in args and returns to designate types, so coprocessor will check for corresponding types.
|
||||
///
|
||||
/// Currently support types are `u8`, `u16`, `u32`, `u64`, `i8`, `i16`, `i32`, `i64` and `f16`, `f32`, `f64`
|
||||
///
|
||||
|
||||
@@ -67,7 +67,7 @@ fn gen_call(name: &str, deco_args: &DecoratorArgs, loc: &Location) -> ast::Stmt<
|
||||
|
||||
/// stripe the decorator(`@xxxx`) and type annotation(for type checker is done in rust function), add one line in the ast for call function with given parameter, and compiler into `CodeObject`
|
||||
///
|
||||
/// The rationale is that rustpython's vm is not very efficient according to [offical benchmark](https://rustpython.github.io/benchmarks),
|
||||
/// The rationale is that rustpython's vm is not very efficient according to [official benchmark](https://rustpython.github.io/benchmarks),
|
||||
/// So we should avoid running too much Python Bytecode, hence in this function we delete `@` decorator(instead of actually write a decorator in python)
|
||||
/// And add a function call in the end and also
|
||||
/// strip type annotation
|
||||
@@ -108,8 +108,8 @@ pub fn compile_script(name: &str, deco_args: &DecoratorArgs, script: &str) -> Re
|
||||
}
|
||||
loc = Some(stmt.location);
|
||||
|
||||
// This manually construct ast has no corrsponding code
|
||||
// in the script, so just give it a location that don't exist in orginal script
|
||||
// This manually construct ast has no corresponding code
|
||||
// in the script, so just give it a location that don't exist in original script
|
||||
// (which doesn't matter because Location usually only used in pretty print errors)
|
||||
}
|
||||
// Append statement which calling coprocessor function.
|
||||
|
||||
@@ -286,7 +286,7 @@ fn parse_keywords(keywords: &Vec<ast::Keyword<()>>) -> Result<DecoratorArgs> {
|
||||
let s = s.as_str();
|
||||
if visited_key.contains(s) {
|
||||
return fail_parse_error!(
|
||||
format!("`{s}` occur multiple times in decorator's arguements' list."),
|
||||
format!("`{s}` occur multiple times in decorator's arguments' list."),
|
||||
Some(kw.location),
|
||||
);
|
||||
}
|
||||
@@ -308,7 +308,7 @@ fn parse_keywords(keywords: &Vec<ast::Keyword<()>>) -> Result<DecoratorArgs> {
|
||||
None => {
|
||||
return fail_parse_error!(
|
||||
format!(
|
||||
"Expect explictly set both `args` and `returns`, found \n{:#?}",
|
||||
"Expect explicitly set both `args` and `returns`, found \n{:#?}",
|
||||
&kw.node
|
||||
),
|
||||
Some(kw.location),
|
||||
@@ -365,14 +365,14 @@ fn parse_decorator(decorator: &ast::Expr<()>) -> Result<DecoratorArgs> {
|
||||
}
|
||||
}
|
||||
|
||||
// get type annotaion in arguments
|
||||
// get type annotation in arguments
|
||||
fn get_arg_annotations(args: &Arguments) -> Result<Vec<Option<AnnotationInfo>>> {
|
||||
// get arg types from type annotation>
|
||||
args.args
|
||||
.iter()
|
||||
.map(|arg| {
|
||||
if let Some(anno) = &arg.node.annotation {
|
||||
// for there is erro handling for parse_annotation
|
||||
// for there is error handling for parse_annotation
|
||||
parse_annotation(anno).map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
@@ -472,7 +472,7 @@ pub fn parse_and_compile_copr(script: &str) -> Result<Coprocessor> {
|
||||
.collect()
|
||||
};
|
||||
|
||||
// make sure both arguments&returns in fucntion
|
||||
// make sure both arguments&returns in function
|
||||
// and in decorator have same length
|
||||
ensure!(
|
||||
deco_args.arg_names.len() == arg_types.len(),
|
||||
|
||||
@@ -140,7 +140,7 @@ impl ErrorExt for Error {
|
||||
self
|
||||
}
|
||||
}
|
||||
// impl from for those error so one can use question mark and implictly cast into `CoprError`
|
||||
// impl from for those error so one can use question mark and implicitly cast into `CoprError`
|
||||
impl From<DataTypeError> for Error {
|
||||
fn from(e: DataTypeError) -> Self {
|
||||
Self::TypeCast { source: e }
|
||||
|
||||
@@ -450,7 +450,7 @@ impl PyVector {
|
||||
#[pymethod(magic)]
|
||||
fn rfloordiv(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyVector> {
|
||||
if is_pyobj_scalar(&other, vm) {
|
||||
// FIXME: DataType convert problem, target_type should be infered?
|
||||
// FIXME: DataType convert problem, target_type should be inferred?
|
||||
self.scalar_arith_op(other, Some(DataType::Int64), arrow2_rfloordiv_scalar, vm)
|
||||
} else {
|
||||
self.arith_op(
|
||||
@@ -482,7 +482,7 @@ impl PyVector {
|
||||
// The Comparable Trait only support normal cmp
|
||||
// (yes there is a slot_richcompare function, but it is not used in anywhere)
|
||||
// so use our own function
|
||||
// TODO(discord9): test those funciton
|
||||
// TODO(discord9): test those function
|
||||
|
||||
#[pymethod(name = "eq")]
|
||||
#[pymethod(magic)]
|
||||
@@ -676,7 +676,7 @@ impl PyVector {
|
||||
}
|
||||
}
|
||||
|
||||
/// Unsupport
|
||||
/// Unsupported
|
||||
/// TODO(discord9): make it work
|
||||
#[allow(unused)]
|
||||
fn setitem_by_index(
|
||||
@@ -689,7 +689,7 @@ impl PyVector {
|
||||
}
|
||||
}
|
||||
|
||||
/// get corrsponding arrow op function according to given PyComaprsionOp
|
||||
/// get corresponding arrow op function according to given PyComaprsionOp
|
||||
///
|
||||
/// TODO(discord9): impl scalar version function
|
||||
fn get_arrow_op(op: PyComparisonOp) -> impl Fn(&dyn Array, &dyn Array) -> Box<dyn Array> {
|
||||
@@ -708,7 +708,7 @@ fn get_arrow_op(op: PyComparisonOp) -> impl Fn(&dyn Array, &dyn Array) -> Box<dy
|
||||
}
|
||||
}
|
||||
|
||||
/// get corrsponding arrow scalar op function according to given PyComaprsionOp
|
||||
/// get corresponding arrow scalar op function according to given PyComaprsionOp
|
||||
///
|
||||
/// TODO(discord9): impl scalar version function
|
||||
fn get_arrow_scalar_op(
|
||||
|
||||
@@ -453,7 +453,7 @@ mod test {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_recordbatches_convertion() {
|
||||
async fn test_recordbatches_conversion() {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
|
||||
ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
|
||||
|
||||
@@ -39,7 +39,7 @@ pub trait Server: Send {
|
||||
async fn start(&self, listening: SocketAddr) -> Result<SocketAddr>;
|
||||
}
|
||||
|
||||
struct AccpetTask {
|
||||
struct AcceptTask {
|
||||
// `abort_handle` and `abort_registration` are used in pairs in shutting down the server.
|
||||
// They work like sender and receiver for aborting stream. When the server is shutting down,
|
||||
// calling `abort_handle.abort()` will "notify" `abort_registration` to stop emitting new
|
||||
@@ -51,7 +51,7 @@ struct AccpetTask {
|
||||
join_handle: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl AccpetTask {
|
||||
impl AcceptTask {
|
||||
async fn shutdown(&mut self, name: &str) -> Result<()> {
|
||||
match self.join_handle.take() {
|
||||
Some(join_handle) => {
|
||||
@@ -118,7 +118,7 @@ impl AccpetTask {
|
||||
|
||||
pub(crate) struct BaseTcpServer {
|
||||
name: String,
|
||||
accept_task: Mutex<AccpetTask>,
|
||||
accept_task: Mutex<AcceptTask>,
|
||||
io_runtime: Arc<Runtime>,
|
||||
}
|
||||
|
||||
@@ -127,7 +127,7 @@ impl BaseTcpServer {
|
||||
let (abort_handle, registration) = AbortHandle::new_pair();
|
||||
Self {
|
||||
name: name.into(),
|
||||
accept_task: Mutex::new(AccpetTask {
|
||||
accept_task: Mutex::new(AcceptTask {
|
||||
abort_handle,
|
||||
abort_registration: Some(registration),
|
||||
join_handle: None,
|
||||
|
||||
@@ -117,7 +117,7 @@ impl<'a> ParserContext<'a> {
|
||||
}
|
||||
|
||||
/// Parses SHOW statements
|
||||
/// todo(hl) support `show settings`/`show create`/`show users` ect.
|
||||
/// todo(hl) support `show settings`/`show create`/`show users` etc.
|
||||
fn parse_show(&mut self) -> Result<Statement> {
|
||||
if self.consume_token("DATABASES") || self.consume_token("SCHEMAS") {
|
||||
self.parse_show_databases()
|
||||
|
||||
@@ -170,7 +170,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
RegionImpl { inner }
|
||||
}
|
||||
|
||||
/// Open an exsiting region and recover its data.
|
||||
/// Open an existing region and recover its data.
|
||||
///
|
||||
/// The caller should avoid calling this method simultaneously.
|
||||
pub async fn open(
|
||||
|
||||
@@ -547,7 +547,7 @@ impl WriterInner {
|
||||
let flush_req = FlushJob {
|
||||
max_memtable_id: max_memtable_id.unwrap(),
|
||||
memtables: mem_to_flush,
|
||||
// In write thread, safe to use current commited sequence.
|
||||
// In write thread, safe to use current committed sequence.
|
||||
flush_sequence: version_control.committed_sequence(),
|
||||
shared: ctx.shared.clone(),
|
||||
sst_layer: ctx.sst_layer.clone(),
|
||||
|
||||
@@ -280,7 +280,7 @@ impl BatchOp for ProjectedSchema {
|
||||
let indices = self.schema_to_read.row_key_indices();
|
||||
for idx in indices {
|
||||
let (left_col, right_col) = (left.column(idx), right.column(idx));
|
||||
// Comparision of vector is done by virtual method calls currently. Consider using
|
||||
// Comparison of vector is done by virtual method calls currently. Consider using
|
||||
// enum dispatch if this becomes bottleneck.
|
||||
let order = left_col.get_ref(i).cmp(&right_col.get_ref(j));
|
||||
if order != Ordering::Equal {
|
||||
|
||||
Reference in New Issue
Block a user