
Moving to SQLAlchemy >= 1.4 #8158

Merged
jsignell merged 14 commits into dask:main from McToel:main
Jan 25, 2022

Conversation

@McToel
Contributor

McToel commented Sep 19, 2021

Currently, dask does not work properly with SQLAlchemy 1.4, so I am working on making it compatible. This is the current state of my work: it mostly works, but it is not yet properly tested or documented. It also significantly changes the API around SQL IO, which will break existing code.

Right now it is possible to pass a SQL function as index_col or columns. If, for example, someone wants to cast the index to another type via a SQL function, that could also be done using read_sql_query(sql=the_sql_to_do_the_casting). This "feature" is very buggy right now, however, and supporting it with SQLAlchemy 1.4 makes a big mess, so I suggest removing it. Since my changes properly support SQL queries (the old support was very buggy), all of those casts can be done with SQL selects.

Since we are breaking code anyway, I suggest also making the dask SQL IO API more similar to the pandas SQL IO API.

What I want to change:

  • read_sql_table() will no longer support sql queries
  • read_sql_query() will handle SQLAlchemy queries
  • read_sql() will try to call read_sql_table() or read_sql_query() depending on the passed arguments (not yet implemented)
  • rename some arguments
  • it might be possible to support plain string SQL queries (update: no, this will not work)
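
The proposed dispatch in the third bullet could be sketched roughly as follows. This is hypothetical logic with stubbed readers, not the actual dask implementation:

```python
def read_sql(sql, con, index_col, **kwargs):
    # Hypothetical dispatch: a plain string is treated as a table name,
    # anything else is assumed to be a SQLAlchemy selectable.
    if isinstance(sql, str):
        return read_sql_table(sql, con, index_col, **kwargs)
    return read_sql_query(sql, con, index_col, **kwargs)

# Stub readers so the sketch is self-contained:
def read_sql_table(table_name, con, index_col, **kwargs):
    return f"table:{table_name}"

def read_sql_query(query, con, index_col, **kwargs):
    return f"query:{type(query).__name__}"

print(read_sql("test", "sqlite:///db", index_col="number"))  # table:test
```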

Before I implement, test and document all the changes, I would like to have some feedback on my suggestions.

@GPUtester
Collaborator

Can one of the admins verify this patch?

@jsignell
Member

cc @martindurant

@martindurant
Member

> Supporting this feature with SQLAlchemy 1.4 makes a big mess, so I suggest removing it.

Can you clarify this? We certainly wouldn't want to throw away functionality if we don't have to. We have no other way to support general SQL (as opposed to reading a table), because we need to be able to compose the partition selection logic, and that would be SQL dialect specific when using text.

@McToel
Contributor Author

McToel commented Sep 21, 2021

I meant that supporting SQL functions passed to index_col and columns is a mess; that might have been unclear.

In SQLAlchemy 1.3, select objects had a .c / .columns attribute, and column objects have these too. In 1.4, select objects should use .selected_columns instead. That makes it really hard to mix the two the way the current code does, especially as they have to be combined into a single SQL query in the end.
It might be possible to convert columns to select statements, but I don't know how to combine those back into a single query, and I suspect it would make the query less performant.
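
For concreteness, a minimal illustration of the attribute change (requires SQLAlchemy >= 1.4; names here are illustrative):

```python
from sqlalchemy import column, select, table

t = table("test", column("number"), column("name"))

# Table-like objects expose their columns via .c in both 1.3 and 1.4:
cols = list(t.c.keys())

# Select objects used .c / .columns in 1.3; since 1.4 the accessor is
# .selected_columns (.c on a select is deprecated there):
s = select(t.c.number, t.c.name)  # 1.4-style positional form
selected = list(s.selected_columns.keys())

print(cols, selected)  # ['number', 'name'] ['number', 'name']
```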

General SQL is supported through SQLAlchemy queries. Applying a SQL function to the index_col, for example, can be done like this:

import sqlalchemy as sa
from sqlalchemy import sql

from dask.dataframe import read_sql_query

s = sql.select(
    [
        sa.cast(sql.column("number"), sa.types.BigInteger).label("number"),
        sql.column("name"),
    ]
).where(sql.column("number") >= 5).select_from(sql.table("test"))

out = read_sql_query(s, db, npartitions=2, index_col="number")

@martindurant
Member

> General SQL is supported through SQLAlchemy queries

but can we modify these to produce the partitioned sub-queries needed to build a dask dataframe? If yes, great!
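
The general idea, sketched with illustrative names (this is not the exact dask code): wrap the user's select as a subquery and apply a per-partition range filter on the index column.

```python
from sqlalchemy import column, select, table

# A user-supplied query (illustrative):
user_q = select(column("number"), column("name")).select_from(table("test"))

def partition_query(q, index_col, lo, hi):
    # Wrap the original select as a subquery and restrict the index
    # column to one partition's half-open range [lo, hi).
    sub = q.subquery()
    idx = sub.columns[index_col]
    return select(sub).where(idx >= lo, idx < hi)

# One partitioned sub-query per dask partition:
p0 = partition_query(user_q, "number", 0, 5)
print(str(p0))  # SELECT ... FROM (SELECT ...) AS anon_1 WHERE ...
```

Because the wrapping happens at the SQLAlchemy expression level, it stays dialect-agnostic, which is exactly what textual SQL could not offer.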

@McToel
Contributor Author

McToel commented Sep 21, 2021

At least it works in that test. I will try to find a case that breaks it, but so far it works.

@jsignell linked an issue Oct 1, 2021 that may be closed by this pull request
@jsignell
Member

What is the status of this @McToel and @martindurant? Are there still open discussions?

@martindurant
Member

Thanks for the ping. There's quite a lot in this PR, and I am not an expert on SQLAlchemy, but we have a very good reason to move ahead now, even if we drop support for sqlalchemy < 1.4.

@McToel
Contributor Author

McToel commented Jan 19, 2022

I think it is fully implemented. I basically left the old code in place and added deprecation warnings to it, so anyone using SQLAlchemy 1.3 or earlier should still be able to run it, just with a warning. However, I wasn't sure how dask normally handles deprecations, and I may not have fully tested the deprecation paths.

And the tests are failing right now because, I think, the CI runner is using SQLAlchemy 1.3.

@jsignell
Member

Yeah, since pandas is moving to sqlalchemy >= 1.4 I think we can reasonably do that as well.

If it isn't too hard, it would definitely be better if we could go through a deprecation cycle. That would look like adding clear warnings with suggested actions and then altering the tests to expect those warnings on older versions.
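
That cycle could look roughly like this (a sketch; the warning class, message, and function bodies are illustrative, not what dask actually emits):

```python
import warnings

def read_sql_table(table_name, con, index_col, **kwargs):
    # Old code could pass a selectable here; warn instead of breaking.
    if not isinstance(table_name, str):
        warnings.warn(
            "Passing a SQLAlchemy selectable to read_sql_table is "
            "deprecated; use read_sql_query instead.",
            FutureWarning,
        )
        return read_sql_query(table_name, con, index_col, **kwargs)
    ...  # normal table-reading path

def read_sql_query(query, con, index_col, **kwargs):
    ...  # stub for the sketch

# The tests then expect old-style usage to warn, e.g.:
# with pytest.warns(FutureWarning, match="read_sql_query"):
#     read_sql_table(some_selectable, db, index_col="number")
```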

@martindurant
Member

I assume the win37 tests pass if we merge from main?

@jsignell
Member

The win37 tests are failing on main; they should be ignored.

@martindurant
Member

+1 from me, this is nicely done.

@jsignell
Member

@McToel do you mind updating the environment.yamls so that the tests run with the latest sqlalchemy?

@McToel
Contributor Author

McToel commented Jan 21, 2022

I have no idea how the yamls work, so if someone else has the time to do it, that would be great.


@jsignell
Member

Sorry I wasn't very clear @McToel. Do you mind if I push to your branch? There are a few other environment.yaml files to change for other python versions.

@McToel
Contributor Author

McToel commented Jan 21, 2022

I think I've done it myself, but if anything is missing, feel free to just contribute to my branch.

@jsignell
Member

Ok, @McToel I just removed a file that has been deleted on main since this PR was first opened.

@jsignell
Member

It looks like there is one issue with the int32 dtype:

================================== FAILURES ===================================
____________________________ test_query_with_meta _____________________________
[gw1] win32 -- Python 3.8.12 C:\Miniconda3\envs\test-environment\python.exe

db = 'sqlite:///C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpwxx4978a'

    def test_query_with_meta(db):
        from sqlalchemy import sql
    
        data = {
            "name": pd.Series([], name="name", dtype="str"),
            "age": pd.Series([], name="age", dtype="int"),
        }
        index = pd.Index([], name="number", dtype="int")
        meta = pd.DataFrame(data, index=index)
    
        s1 = sql.select(
            [sql.column("number"), sql.column("name"), sql.column("age")]
        ).select_from(sql.table("test"))
        out = read_sql_query(s1, db, npartitions=2, index_col="number", meta=meta)
>       assert_eq(out, df[["name", "age"]])

dask\dataframe\io\tests\test_sql.py:433: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

a =            name  age
number              
0         Alice   33
1           Bob   40
2         Chris   22
3          Dora   16
4         Edith   53
5       Francis   30
6       Garreth   20
b =            name  age
number              
0         Alice   33
1           Bob   40
2         Chris   22
3          Dora   16
4         Edith   53
5       Francis   30
6       Garreth   20
check_names = True, check_dtype = True, check_divisions = True
check_index = True, scheduler = 'sync', kwargs = {}

    def assert_eq(
        a,
        b,
        check_names=True,
        check_dtype=True,
        check_divisions=True,
        check_index=True,
        scheduler="sync",
        **kwargs,
    ):
        if check_divisions:
            assert_divisions(a, scheduler=scheduler)
            assert_divisions(b, scheduler=scheduler)
            if hasattr(a, "divisions") and hasattr(b, "divisions"):
                at = type(np.asarray(a.divisions).tolist()[0])  # numpy to python
                bt = type(np.asarray(b.divisions).tolist()[0])  # scalar conversion
                assert at == bt, (at, bt)
        assert_sane_keynames(a)
        assert_sane_keynames(b)
        a = _check_dask(
            a, check_names=check_names, check_dtypes=check_dtype, scheduler=scheduler
        )
        b = _check_dask(
            b, check_names=check_names, check_dtypes=check_dtype, scheduler=scheduler
        )
        if hasattr(a, "to_pandas"):
            a = a.to_pandas()
        if hasattr(b, "to_pandas"):
            b = b.to_pandas()
        if isinstance(a, (pd.DataFrame, pd.Series)):
            a = _maybe_sort(a, check_index)
            b = _maybe_sort(b, check_index)
        if not check_index:
            a = a.reset_index(drop=True)
            b = b.reset_index(drop=True)
        if isinstance(a, pd.DataFrame):
>           tm.assert_frame_equal(
                a, b, check_names=check_names, check_dtype=check_dtype, **kwargs
E               AssertionError: Attributes of DataFrame.iloc[:, 1] (column name="age") are different
E               
E               Attribute "dtype" are different
E               [left]:  int32
E               [right]: int64

dask\dataframe\utils.py:560: AssertionError

ref: https://github.com/dask/dask/runs/4925139130?check_suite_focus=true
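
The left/right difference is almost certainly NumPy's platform-dependent default integer rather than anything in the SQL layer. A minimal sketch of the effect (illustrative, not the committed fix):

```python
import numpy as np
import pandas as pd

# "int" maps to np.int_ (the platform C long): 64-bit on Linux/macOS,
# but 32-bit on Windows runners with NumPy < 2 -- hence int32 vs int64.
platform_default = pd.Series([], dtype="int").dtype

# Pinning an explicit width in the test's meta is portable:
meta_age = pd.Series([], name="age", dtype="int64")
print(platform_default, meta_age.dtype)
```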

@McToel
Contributor Author

McToel commented Jan 24, 2022

That's interesting. On my platform (Ubuntu, Python 3.8) it works perfectly fine. I switched to Python 3.9 and removed df.append, which was causing a deprecation warning. But I don't have a Windows dev machine right now, so it is a little hard for me to debug these errors. @jsignell, do you have a Windows machine?

@jsignell
Member

I don't have a Windows machine, but I think we should merge this and fix the Windows issue as a follow-up. I am going to push a skip for the win32 case.

@jsignell
Member

I'm looking into how to filter all the warnings for python 3.7

@McToel
Contributor Author

McToel commented Jan 25, 2022

Thanks a lot! The only failing test is the docs build, and that seems to be unrelated to the changes in this branch. If @jsignell and @martindurant are happy with it, I think it should be merged.

@jsignell
Member

I just merged main into this branch to try to get CI working properly. Thanks for being so patient on this @McToel, it should be in soon.

@jsignell
Member

Ok I feel persuaded that this is an improvement. Thanks @McToel for taking on this work!

@jsignell merged commit 95e1cf3 into dask:main Jan 25, 2022
@jakirkham mentioned this pull request Mar 17, 2022

Successfully merging this pull request may close these issues:
Support for sqlalchemy >= 1.4.0

4 participants