Add API for mem-constant DB streams, Postgres implementation #1298
Conversation
This code is based on that contained in: #726

This commit has two components:

1. A new backend class, `PersistQueryStream`, for backends which *may* allow for constant-memory streaming of query results. This is intended to replace the use of `selectSource` from `PersistQueryRead`, which is "broken" in the sense that the `ConduitT` it returns rarely actually does any streaming.
2. An implementation of `PersistQueryStream` for `SqlBackend`, based on an internal field, `connPrepareCursor`, which allows backends to provide a way to run queries using an internal cursor that iterates over rows as requested, instead of returning them all at once. This internal field is provided for the Postgres version of `SqlBackend`, using functions from `postgresql-simple`.

This code differs in an important way from the above PR: it fixes a memory issue in the way the `ConduitT` was constructed, which meant that the cursor-based implementation was also not memory-constant. This implementation is memory-constant, tested against large result sets.

Co-authored-by: Tim Stapels <tim@supercede.com>
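The constant-memory property described above comes from fetching rows from the cursor in fixed-size chunks rather than materialising the whole result set. As a base-only illustration (a toy in-memory "cursor", not the PR's actual `postgresql-simple` code; all names here are hypothetical), the fetch loop looks roughly like this:

```haskell
import Data.IORef

-- A toy cursor over a list of rows: each fetch hands back at most n rows
-- and remembers its position, like FETCH on a real Postgres cursor.
newCursor :: [a] -> IO (Int -> IO [a])
newCursor rows = do
    ref <- newIORef rows
    pure $ \n -> do
        rs <- readIORef ref
        let (chunk, rest) = splitAt n rs
        writeIORef ref rest
        pure chunk

-- Drain the cursor chunk-by-chunk, folding over each chunk as it arrives,
-- so only one chunk is in memory at a time.
foldCursor :: (b -> [a] -> b) -> b -> (Int -> IO [a]) -> IO b
foldCursor step z fetch = go z
  where
    go acc = do
        chunk <- fetch 256  -- mirrors the 256-row default chunk size in the PR
        if null chunk
            then pure acc
            else go (step acc chunk)

main :: IO ()
main = do
    fetch <- newCursor [1 .. 1000 :: Int]
    total <- foldCursor (\acc chunk -> acc + length chunk) 0 fetch
    print total  -- 1000
```

The real implementation does the same loop against a server-side cursor and emits each chunk into a `ConduitT` instead of folding it away.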
Interestingly, this code has some odd performance characteristics that I can't figure out the reason for. Calling …
@parsonsmatt Hope you don't mind me pinging you for review on this; just thought you'd be a likely candidate as it's closely related to …
parsonsmatt left a comment:
So -
Streaming rows from the database in a safe, composable, and fast way is apparently a hard problem.
This seems like a good solution! But you're noticing that the persistent-pagination approach is faster, anyway. I have no idea why that would be the case. And the tests don't cover a lot of the gnarly concurrency stuff that plagues the other attempts I've seen.
So, I'm wondering - can this functionality be exposed from a 3rd party library, like persistent-postgresql-streaming? If not, what functionality do we need to export from persistent to support that library's creation?
Once persistent-postgresql-streaming is 'battle tested' and known to be safe, efficient, and awesome, then I'd be happy to merge it upstream. However, until it's known good, I'd prefer to err on the side of safety and not introduce features that may need to be deprecated or marked as unsafe, all due to the tricky vagaries of concurrency and foreign code.
I do appreciate the approach and the effort you spent, and I apologize for the delay in making this review.
```haskell
specs :: Spec
specs = describe "Streaming rows" $ do
    describe "selectStream" $ before_ wipe $ do
        itDb "streams rows" $ do
            runConduit $ numbers .| CL.chunksOf 1000 .| CC.mapM_ (void . insertMany)

            let resultStream = selectStream [] [ Asc StreamableNumberVal ]

            let checkIncrementing expected (Entity _ (StreamableNumber actual)) = do
                    expected `shouldBe` actual
                    pure $ expected + 1

            result <- runConduit $ resultStream .| CC.foldM checkIncrementing 1

            result `shouldBe` numberOfRows
```
I'll double check it. I believe the nature of cursor queries should automatically avoid some of these concurrency issues, since streaming really fetches batches of rows from the cursor by name.
It might be necessary to make cursor statements bypass the `connStmtMap`, since the randomly-generated cursor name isn't included in the prepared query.
Also note that this code is largely based on #726; the main changes are a fix for a memory leak in that code and a difference in how cursor statements are used by `SqlBackend`.
I'm struggling to follow the exact control flow, but: I believe this implementation is actually immune to some of these concurrency issues.
Testing has shown that the single-threaded version certainly behaves as expected. Conduits which simultaneously run the same query in a single-threaded application (one of the problematic uses mentioned in #981) yield results in the expected way and don't affect each other. I think this is for the reason I mentioned above: each call to `withCursorStmt'` causes a new temporary cursor to be created, whose name is then used to grab rows in chunks.
The multi-threaded version might well still have issues - but testing that can wait until persistent-postgresql-streaming is spun off.
```
* Added a new class, `PersistQueryStream`, and a new function, `selectStream`,
  for backends which *may* stream results in constant memory.
```
From an approach standpoint - I'd prefer not to add new type classes.
This PR does everything right in terms of following how the code base currently does stuff, but I think the current way is bad and probably not worth continuing. I'd like to explore an alternative that can satisfy the desire for polymorphism while being a bit easier to write and nicer to maintain.
I wrote up #1302 to philosophize on this. I don't think it should hold things back though.
```haskell
    -> [SelectOpt record]
    -> ConduitM () (Entity record) (ReaderT backend m) ()
selectSourceStream filts opts = do
    srcRes <- lift $ selectSourceRes filts opts
```
I think I'd prefer to deprecate `selectSourceRes` and not see it used as a default: if someone sees this instance available, they should know that it works in a way that `selectSourceRes` does not.
E.g., an instance `PersistQueryStream MongoContext` would compile just fine, but it would probably fail to work in a reasonable way. That seems bad to me.
```haskell
getStmtConn' :: Bool -> SqlBackend -> Text -> IO Statement
getStmtConn' useCursorIfPossible conn sql = do
```
I'm not really a fan of `Bool` parameters when we have some specific intent - it makes it hard to read at the use-sites. Introducing a separate data type that specifies the behavior would be preferable IMO.
This has been replaced with a sum type.
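For illustration, a sum type along the lines suggested might look like the following. The type name and constructors here are hypothetical, not taken from the actual patch:

```haskell
-- Hypothetical sketch: a descriptive sum type replacing the Bool flag.
-- Neither the type name nor the constructors are from the real patch.
data StatementPreparation
    = PrepareOrdinary -- behave like connPrepare did before
    | PreferCursor    -- use connPrepareCursor when the backend provides one
    deriving (Show, Eq)

-- Use-sites now read declaratively instead of passing an opaque Bool.
describePreparation :: StatementPreparation -> String
describePreparation PrepareOrdinary = "ordinary prepared statement"
describePreparation PreferCursor    = "cursor-backed statement"

main :: IO ()
main = putStrLn (describePreparation PreferCursor)
```

At a call site, `getStmtConn' PreferCursor conn sql` says much more than `getStmtConn' True conn sql`.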
```haskell
-- streaming the results in a memory-constant way.
--
-- @since 2.13.2.0
class (PersistQueryRead backend) => PersistQueryStream backend where
```
I think adding this class makes the expectations around `selectSource` really weird - that's a class method on `PersistQuery`, so why do we have a separate method here?
```haskell
        Just prepareCursor | useCursorIfPossible -> prepareCursor sql
        _ -> connPrepare conn sql
```
So this is where we switch: if we have `connPrepareCursor = Just ...` and we're trying to stream, then we use the cursor; otherwise, we do the old thing.
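The dispatch can be sketched with a small base-only stand-in, with the backend's record fields simplified to plain function arguments (all names here are illustrative; real code pulls these functions off the `SqlBackend`):

```haskell
-- Base-only sketch of the prepare-dispatch logic: prefer the cursor-based
-- prepare function when one exists and streaming was requested; otherwise
-- fall back to the ordinary prepare.
prepareWith
    :: Maybe (String -> IO stmt)  -- stand-in for connPrepareCursor
    -> (String -> IO stmt)        -- stand-in for connPrepare
    -> Bool                       -- are we trying to stream?
    -> String                     -- the SQL text
    -> IO stmt
prepareWith mPrepareCursor prepare useCursor sql =
    case mPrepareCursor of
        Just prepareCursor | useCursor -> prepareCursor sql
        _ -> prepare sql

main :: IO ()
main = do
    -- Cursor-capable backend, streaming requested: takes the cursor path.
    r1 <- prepareWith (Just (\s -> pure ("cursor: " ++ s)))
                      (\s -> pure ("plain: " ++ s)) True "SELECT 1"
    putStrLn r1
    -- Backend without cursor support: falls back to the ordinary prepare.
    r2 <- prepareWith Nothing (\s -> pure ("plain: " ++ s)) True "SELECT 1"
    putStrLn r2
```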
```haskell
withCursorStmt' :: MonadIO m
                => PG.Connection
                -> PG.Query
                -> [PersistValue]
                -> Acquire (ConduitM () [PersistValue] m ())
withCursorStmt' conn query vals =
    foldWithCursor `fmap` mkAcquire openC closeC
  where
    openC = do
        rawquery <- liftIO $ PG.formatQuery conn query (map P vals)
        PGC.declareCursor conn (PG.Query rawquery)
    closeC = PGC.closeCursor
    foldWithCursor cursor = go
      where
        go = do
            -- 256 is the default chunk size used for fetching
            rows <- liftIO $ PGC.foldForward cursor 256 processRow []
            case rows of
                Left final -> CL.sourceList final
                Right nonfinal -> CL.sourceList nonfinal >> go
    processRow s row = pure $ s <> [map unP row]
```
This implementation looks fine to me
This was part of the feedback on #1298.
I haven't tried this out yet, but I think the biggest obstacle is the inability to get at the underlying connection, i.e. something like

```haskell
getPGConnection :: ReaderT SqlBackend m Connection
```

which would be required to call … There is, as far as I can tell, no easy way to get around that with the current polymorphic …

```haskell
data SqlBackend = SqlBackend
    { ...
    , connPostgreSQLConnection :: Maybe Connection
    }
```

Another is for …
|
Makes sense - there was some work done for the SQLite backend to expose the 'raw' connection. If exposing the raw Postgres connection (in a similar way) works, then we can get that out the door for y'all relatively quickly and easily.
As part of the discussion in yesodweb#1298, it was found that external libraries which want to hook into Persistent's Postgres API are limited by their inability to access the postgresql-simple `Connection` which backs a Postgres `SqlBackend`. A similar issue in the past with Sqlite was solved by the `RawSqlite` wrapper, which is able to expose the raw connection as well as be compatible with the wrapped `SqlBackend` that it stores. This adds an equivalent wrapper to persistent-postgresql, which exposes the raw connection, and therefore allows third-party libraries to access it.
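By analogy with `RawSqlite`, the wrapper described above can be imagined as a simple product type pairing the wrapped backend with the raw connection. This base-only sketch uses a type parameter in place of the real `postgresql-simple` `Connection`, and the type and field names are assumptions for illustration, not the released API:

```haskell
-- Hypothetical sketch of a RawPostgresql-style wrapper. The `conn` parameter
-- stands in for postgresql-simple's Connection so the example stays base-only.
data RawPostgresql backend conn = RawPostgresql
    { persistentBackend :: backend -- the wrapped SqlBackend
    , rawConnection     :: conn    -- the raw connection third parties need
    }

-- Third-party code can then reach the raw connection directly.
withRawConnection :: (conn -> IO a) -> RawPostgresql backend conn -> IO a
withRawConnection f (RawPostgresql _ conn) = f conn

main :: IO ()
main = withRawConnection putStrLn (RawPostgresql "sql-backend" "pg-connection")
```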
* Add RawPostgresql backend which exposes connection

  As part of the discussion in #1298, it was found that external libraries which want to hook into Persistent's Postgres API are limited by their inability to access the postgresql-simple `Connection` which backs a Postgres `SqlBackend`. A similar issue in the past with Sqlite was solved by the `RawSqlite` wrapper, which is able to expose the raw connection as well as be compatible with the wrapped `SqlBackend` that it stores. This adds an equivalent wrapper to persistent-postgresql, which exposes the raw connection, and therefore allows third-party libraries to access it.

* Run stylish-haskell on Postgresql.hs
* Add ChangeLog entry for 2.13.1.0
* Remove microlens dependency, expose RawPostgresql fields

  This was requested as part of the review on #1305.

  Co-authored-by: Matt Parsons <parsonsmatt@gmail.com>

* Fix backwards-compatibility of Postgresql.hs

  Co-authored-by: Matt Parsons <parsonsmatt@gmail.com>
* Expose orderClause from PersistQuery.hs

  This is part of the followup to #1298. This exposes `orderClause` in a similar manner to `filterClause`, to help third-party libraries which want to format their own SQL queries using Persistent's internal helpers. It rewrites `orderClause` a bit to bring it in line with the other exposed functions, but there should be no functional changes as a result.

* Version bump for PR #1317
* Expose PSQL parser helpers in new 'Internal' module

  This is part of the followup to #1298. The code exposed in this new `Database.Persist.Postgresql.Internal` module is very helpful to third-party libraries which want to use functions from `postgresql-simple` to produce `PersistValue`s for feeding to `persistent` functions. Exposing them is necessary to allow for a third-party library which provides streaming of Persistent entities using Postgres cursors.

* stylish-haskell pass for PR
* Version bump for PR #1316
|
The code from this PR is now up in a separate library.
Before submitting your PR, check that you've:

- Added `@since` declarations to the Haddock
- Run `stylish-haskell` on any changed files (see the `.editorconfig` file for details)

After submitting your PR:

- Updated the Changelog with an entry under `(unreleased)`