Skip to content

[ENH] Support correlated subqueries #320

@randerzander

Description

@randerzander

I'd like to be able to execute correlated subqueries.

It looks like the following example is successfully parsed by Calcite, but there's a logic error in mapping to Dask DataFrame calls:

import pandas as pd

df = pd.DataFrame({'id': [0, 1, 2], 'name': ['a', 'b', 'c'], 'val': [0, 1, 2]})

c.create_table('test', df)
c.sql("""
select name, val, id from test a
where val >
  (select avg(val) from test where id = a.id)
  """)

Result:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/tmp/ipykernel_1522695/1713961665.py in <module>
      6 
      7 c.create_table('test', df)
----> 8 c.sql("""
      9 select name, val, id from test a
     10 where val >

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
    436         rel, select_names, _ = self._get_ral(sql)
    437 
--> 438         dc = RelConverter.convert(rel, context=self)
    439 
    440         if dc is None:

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     58             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     59         )
---> 60         df = plugin_instance.convert(rel, context=context)
     61         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     62         return df

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
     27     ) -> DataContainer:
     28         # Get the input of the previous step
---> 29         (dc,) = self.assert_inputs(rel, 1, context)
     30 
     31         df = dc.df

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
     83         from dask_sql.physical.rel.convert import RelConverter
     84 
---> 85         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     86 
     87     @staticmethod

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
     83         from dask_sql.physical.rel.convert import RelConverter
     84 
---> 85         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     86 
     87     @staticmethod

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     58             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     59         )
---> 60         df = plugin_instance.convert(rel, context=context)
     61         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     62         return df

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/join.py in convert(self, rel, context)
    186             )
    187             logger.debug(f"Additionally applying filter {filter_condition}")
--> 188             df = filter_or_scalar(df, filter_condition)
    189             dc = DataContainer(df, cc)
    190 

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask_sql/physical/rel/logical/filter.py in filter_or_scalar(df, filter_condition)
     32     # In SQL, a NULL in a boolean is False on filtering
     33     filter_condition = filter_condition.fillna(False)
---> 34     return df[filter_condition]
     35 
     36 

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/core.py in __getitem__(self, key)
   4111                 from .multi import _maybe_align_partitions
   4112 
-> 4113                 self, key = _maybe_align_partitions([self, key])
   4114             dsk = partitionwise_graph(operator.getitem, name, self, key)
   4115             graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self, key])

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/multi.py in _maybe_align_partitions(args)
    166     divisions = dfs[0].divisions
    167     if not all(df.divisions == divisions for df in dfs):
--> 168         dfs2 = iter(align_partitions(*dfs)[0])
    169         return [a if not isinstance(a, _Frame) else next(dfs2) for a in args]
    170     return args

~/conda/envs/dsql-dask-main/lib/python3.8/site-packages/dask-2021.11.1+9.g43f33a32-py3.8.egg/dask/dataframe/multi.py in align_partitions(*dfs)
    120         raise ValueError("dfs contains no DataFrame and Series")
    121     if not all(df.known_divisions for df in dfs1):
--> 122         raise ValueError(
    123             "Not all divisions are known, can't align "
    124             "partitions. Please use `set_index` "

ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.

Note: the snippet and traceback above were edited to use a plain pandas DataFrame as input. I had tried with both pandas and Dask DataFrame inputs, but got the same errors.

Metadata

Metadata

Assignees

No one assigned

    Labels

    SQL grammar — Improvements to or issues with SQL syntax
    enhancement — New feature or request
    needs triage — Awaiting triage by a dask-sql maintainer

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions