@@ -26,7 +26,7 @@ use crate::error::DataFusionError;
2626use crate :: logical_plan:: dfschema:: DFSchemaRef ;
2727use crate :: sql:: parser:: FileType ;
2828use arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ;
29- use datafusion_common:: DFSchema ;
29+ use datafusion_common:: { DFField , DFSchema } ;
3030use std:: fmt:: Formatter ;
3131use std:: {
3232 collections:: HashSet ,
@@ -267,14 +267,37 @@ pub struct Limit {
267267/// Evaluates correlated sub queries
268268#[ derive( Clone ) ]
269269pub struct Subquery {
270- /// The list of sub queries
270+ /// The list of sub queries (SubqueryNode)
271271 pub subqueries : Vec < LogicalPlan > ,
272272 /// The incoming logical plan
273273 pub input : Arc < LogicalPlan > ,
274274 /// The schema description of the output
275275 pub schema : DFSchemaRef ,
276276}
277277
278+ /// Subquery node defines single subquery with its type
279+ #[ derive( Clone ) ]
280+ pub struct SubqueryNode {
281+ /// The logical plan of subquery
282+ pub input : Arc < LogicalPlan > ,
283+ /// The subquery type
284+ pub typ : SubqueryType ,
285+ /// The schema description of the output
286+ pub schema : DFSchemaRef ,
287+ }
288+
/// Subquery type.
///
/// The derived ordering follows declaration order
/// (`Scalar < Exists < AnyAll`). `Ord` and `Hash` are derived alongside
/// `Eq`/`PartialOrd` so the type can be sorted and used as a key in
/// hash-based and ordered collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SubqueryType {
    /// Scalar (SELECT, WHERE) evaluating to one value
    Scalar,
    /// EXISTS(...) evaluating to true if at least one row was produced
    Exists,
    /// ANY(...) / ALL(...)
    AnyAll,
    // [NOT] IN(...) is not defined as it is implicitly evaluated as ANY = (...) / ALL <> (...)
}
300+
278301impl Subquery {
279302 /// Merge schema of main input and correlated subquery columns
280303 pub fn merged_schema ( input : & LogicalPlan , subqueries : & [ LogicalPlan ] ) -> DFSchema {
@@ -284,6 +307,72 @@ impl Subquery {
284307 res
285308 } )
286309 }
310+
311+ /// Transform DataFusion schema according to subquery type
312+ pub fn transform_dfschema ( schema : & DFSchema , typ : SubqueryType ) -> DFSchema {
313+ match typ {
314+ SubqueryType :: Scalar => schema. clone ( ) ,
315+ SubqueryType :: Exists | SubqueryType :: AnyAll => {
316+ let new_fields = schema
317+ . fields ( )
318+ . iter ( )
319+ . map ( |field| {
320+ let new_field = Subquery :: transform_field ( field. field ( ) , typ) ;
321+ if let Some ( qualifier) = field. qualifier ( ) {
322+ DFField :: from_qualified ( qualifier, new_field)
323+ } else {
324+ DFField :: from ( new_field)
325+ }
326+ } )
327+ . collect ( ) ;
328+ DFSchema :: new_with_metadata ( new_fields, schema. metadata ( ) . clone ( ) )
329+ . unwrap ( )
330+ }
331+ }
332+ }
333+
334+ /// Transform Arrow field according to subquery type
335+ pub fn transform_field ( field : & Field , typ : SubqueryType ) -> Field {
336+ match typ {
337+ SubqueryType :: Scalar => field. clone ( ) ,
338+ SubqueryType :: Exists => Field :: new ( field. name ( ) , DataType :: Boolean , false ) ,
339+ // ANY/ALL subquery converts subquery result rows into a list
340+ // and uses existing code evaluating ANY with a list to evaluate the result
341+ SubqueryType :: AnyAll => {
342+ let item = Field :: new_dict (
343+ "item" ,
344+ field. data_type ( ) . clone ( ) ,
345+ true ,
346+ field. dict_id ( ) . unwrap_or ( 0 ) ,
347+ field. dict_is_ordered ( ) . unwrap_or ( false ) ,
348+ ) ;
349+ Field :: new ( field. name ( ) , DataType :: List ( Box :: new ( item) ) , false )
350+ }
351+ }
352+ }
353+ }
354+
355+ impl SubqueryNode {
356+ /// Creates a new SubqueryNode evaluating the schema based on subquery type
357+ pub fn new ( input : LogicalPlan , typ : SubqueryType ) -> Self {
358+ let schema = Subquery :: transform_dfschema ( input. schema ( ) , typ) ;
359+ Self {
360+ input : Arc :: new ( input) ,
361+ typ,
362+ schema : Arc :: new ( schema) ,
363+ }
364+ }
365+ }
366+
367+ impl Display for SubqueryType {
368+ fn fmt ( & self , f : & mut Formatter < ' _ > ) -> fmt:: Result {
369+ let name = match self {
370+ Self :: Scalar => "Scalar" ,
371+ Self :: Exists => "Exists" ,
372+ Self :: AnyAll => "AnyAll" ,
373+ } ;
374+ write ! ( f, "{}" , name)
375+ }
287376}
288377
289378/// Values expression. See
@@ -402,6 +491,8 @@ pub enum LogicalPlan {
402491 Limit ( Limit ) ,
403492 /// Evaluates correlated sub queries
404493 Subquery ( Subquery ) ,
494+ /// Single subquery node with subquery type
495+ SubqueryNode ( SubqueryNode ) ,
405496 /// Creates an external table.
406497 CreateExternalTable ( CreateExternalTable ) ,
407498 /// Creates an in memory table.
@@ -439,6 +530,7 @@ impl LogicalPlan {
439530 } ) => projected_schema,
440531 LogicalPlan :: Projection ( Projection { schema, .. } ) => schema,
441532 LogicalPlan :: Subquery ( Subquery { schema, .. } ) => schema,
533+ LogicalPlan :: SubqueryNode ( SubqueryNode { schema, .. } ) => schema,
442534 LogicalPlan :: Filter ( Filter { input, .. } ) => input. schema ( ) ,
443535 LogicalPlan :: Distinct ( Distinct { input } ) => input. schema ( ) ,
444536 LogicalPlan :: Window ( Window { schema, .. } ) => schema,
@@ -498,7 +590,8 @@ impl LogicalPlan {
498590 schemas. insert ( 0 , schema) ;
499591 schemas
500592 }
501- LogicalPlan :: Union ( Union { schema, .. } ) => {
593+ LogicalPlan :: Union ( Union { schema, .. } )
594+ | LogicalPlan :: SubqueryNode ( SubqueryNode { schema, .. } ) => {
502595 vec ! [ schema]
503596 }
504597 LogicalPlan :: Extension ( extension) => vec ! [ extension. node. schema( ) ] ,
@@ -569,6 +662,7 @@ impl LogicalPlan {
569662 | LogicalPlan :: Analyze { .. }
570663 | LogicalPlan :: Explain { .. }
571664 | LogicalPlan :: Subquery ( _)
665+ | LogicalPlan :: SubqueryNode ( _)
572666 | LogicalPlan :: Union ( _)
573667 | LogicalPlan :: Distinct ( _) => {
574668 vec ! [ ]
@@ -587,6 +681,7 @@ impl LogicalPlan {
587681 . into_iter ( )
588682 . chain ( subqueries. iter ( ) )
589683 . collect ( ) ,
684+ LogicalPlan :: SubqueryNode ( SubqueryNode { input, .. } ) => vec ! [ input] ,
590685 LogicalPlan :: Filter ( Filter { input, .. } ) => vec ! [ input] ,
591686 LogicalPlan :: Repartition ( Repartition { input, .. } ) => vec ! [ input] ,
592687 LogicalPlan :: Window ( Window { input, .. } ) => vec ! [ input] ,
@@ -735,6 +830,9 @@ impl LogicalPlan {
735830 }
736831 true
737832 }
833+ LogicalPlan :: SubqueryNode ( SubqueryNode { input, .. } ) => {
834+ input. accept ( visitor) ?
835+ }
738836 LogicalPlan :: Filter ( Filter { input, .. } ) => input. accept ( visitor) ?,
739837 LogicalPlan :: Repartition ( Repartition { input, .. } ) => {
740838 input. accept ( visitor) ?
@@ -1064,6 +1162,9 @@ impl LogicalPlan {
10641162 Ok ( ( ) )
10651163 }
10661164 LogicalPlan :: Subquery ( Subquery { .. } ) => write ! ( f, "Subquery" ) ,
1165+ LogicalPlan :: SubqueryNode ( SubqueryNode { typ, .. } ) => {
1166+ write ! ( f, "SubqueryNode: type={:?}" , typ)
1167+ }
10671168 LogicalPlan :: Filter ( Filter {
10681169 predicate : ref expr,
10691170 ..
0 commit comments