Skip to content

Add support for correlated subquery #6140

@mustafasrepo

Description

@mustafasrepo

Is your feature request related to a problem or challenge?

When I run the query below

SELECT s.amount * (
         SELECT e.amount
         FROM sales_us AS e
         WHERE s.currency = e.currency AND
  		         s.ts >= e.ts
         ORDER BY e.ts DESC
         LIMIT 1
       ) AS amount_usd
FROM sales_global AS s
ORDER BY s.sn;

Datafusion returns

Error: NotImplemented("Physical plan does not support logical expression (<subquery>)")
Same query successfully runs on Postgre.

Describe the solution you'd like

I would like to have this feature.

Describe alternatives you've considered

No response

Additional context

To reproduce you can use test below

#[tokio::test]
async fn test_subquery() -> Result<()> {
    let config = SessionConfig::new()
        .with_target_partitions(1);
    let ctx = SessionContext::with_config(config);
    ctx.sql("CREATE TABLE sales_us (
          ts TIMESTAMP,
          currency VARCHAR(3),
          amount INT
        ) as VALUES
              ('2022-01-01 10:00:00'::timestamp, 'USD', 100.00),
              ('2022-01-01 11:00:00'::timestamp, 'USD', 200.00),
              ('2022-01-02 09:00:00'::timestamp, 'USD', 300.00),
              ('2022-01-02 10:00:00'::timestamp, 'USD', 150.00)").await?;
    ctx.sql("CREATE TABLE sales_global (
          ts TIMESTAMP,
          currency VARCHAR(3),
          amount INT
        ) as VALUES
          ('2022-01-01 08:00:00'::timestamp, 'EUR', 50.00),
          ('2022-01-01 11:30:00'::timestamp, 'EUR', 75.00),
          ('2022-01-02 12:00:00'::timestamp, 'EUR', 200.00),
          ('2022-01-03 10:00:00'::timestamp, 'EUR', 100.00)").await?;
    let sql = "SELECT s.amount * (
                 SELECT e.amount
                 FROM sales_us AS e
                 WHERE s.currency = e.currency AND
                         s.ts >= e.ts
                 ORDER BY e.ts DESC
                 LIMIT 1
               ) AS amount_usd
        FROM sales_global AS s
        ORDER BY s.sn
        ";

    let msg = format!("Creating logical plan for '{sql}'");
    let dataframe: DataFrame = ctx.sql(sql).await.expect(&msg);
    let physical_plan = dataframe.create_physical_plan().await?;
    let batches = collect(physical_plan, ctx.task_ctx()).await?;
    print_batches(&batches)?;
    Ok(())
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions