using System; using System.Buffers; using System.Collections; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Data; using System.Data.Common; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Npgsql.BackendMessages; using Npgsql.Internal; using Npgsql.Internal.Converters; using Npgsql.PostgresTypes; using Npgsql.Schema; using NpgsqlTypes; using static Npgsql.Util.Statics; namespace Npgsql; /// /// Reads a forward-only stream of rows from a data source. /// #pragma warning disable CA1010 public sealed class NpgsqlDataReader : DbDataReader, IDbColumnSchemaGenerator #pragma warning restore CA1010 { static readonly Task TrueTask = Task.FromResult(true); static readonly Task FalseTask = Task.FromResult(false); internal NpgsqlCommand Command { get; private set; } = default!; internal NpgsqlConnector Connector { get; } NpgsqlConnection? _connection; /// /// The behavior of the command with which this reader was executed. /// CommandBehavior _behavior; /// /// The task for writing this command's messages. Awaited on reader cleanup. /// Task? _sendTask; internal ReaderState State = ReaderState.Disposed; internal NpgsqlReadBuffer Buffer = default!; PgReader PgReader => Buffer.PgReader; /// /// Holds the list of statements being executed by this reader. /// List _statements = default!; /// /// The index of the current query resultset we're processing (within a multiquery) /// internal int StatementIndex { get; private set; } /// /// Records, for each column, its starting offset and length in the current row. /// Used only in non-sequential mode. /// readonly List<(int Offset, int Length)> _columns = []; int _columnsStartPos; /// /// The index of the column that we're on, i.e. that has already been parsed, /// is memory and can be retrieved. Initialized to -1, which means we're on the column /// count (which comes before the first column). /// int _column; /// /// The position in the buffer at which the current data row message ends. /// Used only when the row is consumed non-sequentially. /// int _dataMsgEnd; /// /// Determines, if we can consume the row non-sequentially. /// Mostly useful for a sequential mode, when the row is already in the buffer. /// Should always be true for the non-sequential mode. /// bool _isRowBuffered; /// /// Gets or sets whether the current row is fully buffered in memory. /// When , async reads will go through the real async converter path rather than the sync shortcut. /// /// Settable for testing purposes. internal bool IsRowBuffered { get => _isRowBuffered; set => _isRowBuffered = value; } /// /// The RowDescription message for the current resultset being processed /// internal RowDescriptionMessage? RowDescription; int ColumnCount => RowDescription!.Count; /// /// Stores the last converter info resolved by column, to speed up repeated reading. /// ColumnInfo[]? ColumnInfoCache { get; set; } ulong? _recordsAffected; /// /// Whether the current result set has rows /// bool _hasRows; /// /// Is raised whenever Close() is called. /// public event EventHandler? ReaderClosed; bool _isSchemaOnly; bool _isSequential; internal NpgsqlNestedDataReader? CachedFreeNestedDataReader; long _startTimestamp; readonly ILogger _commandLogger; internal NpgsqlDataReader(NpgsqlConnector connector) { Connector = connector; _commandLogger = connector.CommandLogger; } internal void Init( NpgsqlCommand command, CommandBehavior behavior, List statements, long startTimestamp = 0, Task? sendTask = null) { Debug.Assert(ColumnInfoCache is null); Command = command; _connection = command.InternalConnection; _behavior = behavior; _isSchemaOnly = _behavior.HasFlag(CommandBehavior.SchemaOnly); _isSequential = _behavior.HasFlag(CommandBehavior.SequentialAccess); _statements = statements; StatementIndex = -1; _sendTask = sendTask; State = ReaderState.BetweenResults; _recordsAffected = null; _startTimestamp = startTimestamp; } #region Read /// /// Advances the reader to the next record in a result set. /// /// true if there are more rows; otherwise false. /// /// The default position of a data reader is before the first record. Therefore, you must call Read to begin accessing data. /// public override bool Read() { ThrowIfClosedOrDisposed(); return TryRead()?.Result ?? Read(false).GetAwaiter().GetResult(); } /// /// This is the asynchronous version of /// /// /// An optional token to cancel the asynchronous operation. The default value is . /// /// A task representing the asynchronous operation. public override Task ReadAsync(CancellationToken cancellationToken) { ThrowIfClosedOrDisposed(); return TryRead() ?? Read(async: true, cancellationToken); } // This is an optimized execution path that avoids calling any async methods for the (usual) // case where the next row (or CommandComplete) is already in memory. Task? TryRead() { switch (State) { case ReaderState.BeforeResult: // First Read() after NextResult. Data row has already been processed. State = ReaderState.InResult; return TrueTask; case ReaderState.InResult: break; default: return FalseTask; } // We have a special case path for SingleRow. if (_behavior.HasFlag(CommandBehavior.SingleRow) || !_isRowBuffered) return null; ConsumeBufferedRow(); const int headerSize = sizeof(byte) + sizeof(int); var buffer = Buffer; var readPosition = buffer.ReadPosition; var bytesLeft = buffer.FilledBytes - readPosition; if (bytesLeft < headerSize) return null; var messageCode = (BackendMessageCode)buffer.ReadByte(); var len = buffer.ReadInt32() - sizeof(int); // Transmitted length includes itself var isDataRow = messageCode is BackendMessageCode.DataRow; // sizeof(short) is for the number of columns var sufficientBytes = isDataRow && _isSequential ? headerSize + sizeof(short) : headerSize + len; if (bytesLeft < sufficientBytes || !isDataRow && (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers) // Could be an error, let main read handle it. || Connector.ParseResultSetMessage(buffer, messageCode, len) is not { } msg) { buffer.ReadPosition = readPosition; return null; } ProcessMessage(msg); return isDataRow ? TrueTask : FalseTask; } async Task Read(bool async, CancellationToken cancellationToken = default) { using var registration = Connector.StartNestedCancellableOperation(cancellationToken); try { switch (State) { case ReaderState.BeforeResult: // First Read() after NextResult. Data row has already been processed. State = ReaderState.InResult; return true; case ReaderState.InResult: await ConsumeRow(async).ConfigureAwait(false); if (_behavior.HasFlag(CommandBehavior.SingleRow)) { // TODO: See optimization proposal in #410 await Consume(async).ConfigureAwait(false); return false; } break; case ReaderState.BetweenResults: case ReaderState.Consumed: case ReaderState.Closed: case ReaderState.Disposed: return false; default: ThrowHelper.ThrowArgumentOutOfRangeException(); return false; } var msg = await ReadMessage(async).ConfigureAwait(false); switch (msg.Code) { case BackendMessageCode.DataRow: ProcessMessage(msg); return true; case BackendMessageCode.CommandComplete: case BackendMessageCode.EmptyQueryResponse: ProcessMessage(msg); if (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers) Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); return false; default: throw Connector.UnexpectedMessageReceived(msg.Code); } } catch { // Break may have progressed the reader already. if (State is not ReaderState.Closed) State = ReaderState.Consumed; throw; } } ValueTask ReadMessage(bool async) { return _isSequential ? ReadMessageSequential(Connector, async) : Connector.ReadMessage(async); static async ValueTask ReadMessageSequential(NpgsqlConnector connector, bool async) { var msg = await connector.ReadMessage(async, DataRowLoadingMode.Sequential).ConfigureAwait(false); if (msg.Code == BackendMessageCode.DataRow) { // Make sure that the datarow's column count is already buffered await connector.ReadBuffer.Ensure(2, async).ConfigureAwait(false); return msg; } return msg; } } #endregion #region NextResult /// /// Advances the reader to the next result when reading the results of a batch of statements. /// /// public override bool NextResult() { ThrowIfClosedOrDisposed(); return (_isSchemaOnly ? NextResultSchemaOnly(false) : NextResult(false)) .GetAwaiter().GetResult(); } /// /// This is the asynchronous version of NextResult. /// /// /// An optional token to cancel the asynchronous operation. The default value is . /// /// A task representing the asynchronous operation. public override Task NextResultAsync(CancellationToken cancellationToken) { ThrowIfClosedOrDisposed(); return _isSchemaOnly ? NextResultSchemaOnly(async: true, cancellationToken: cancellationToken) : NextResult(async: true, cancellationToken: cancellationToken); } /// /// Internal implementation of NextResult /// async Task NextResult(bool async, bool isConsuming = false, CancellationToken cancellationToken = default) { Debug.Assert(!_isSchemaOnly); if (State is ReaderState.Consumed) return false; try { using var registration = isConsuming ? default : Connector.StartNestedCancellableOperation(cancellationToken); // If we're in the middle of a resultset, consume it if (State is ReaderState.BeforeResult or ReaderState.InResult) await ConsumeResultSet(async).ConfigureAwait(false); Debug.Assert(State is ReaderState.BetweenResults); _hasRows = false; var statements = _statements; var statementIndex = StatementIndex; if (statementIndex >= 0) { if (RowDescription is { } description && statements[statementIndex].IsPrepared && ColumnInfoCache is { } cache) description.SetColumnInfoCache(new(cache, 0, ColumnCount)); if (statementIndex is 0 && _behavior.HasFlag(CommandBehavior.SingleResult) && !isConsuming) { await Consume(async).ConfigureAwait(false); return false; } } // We are now at the end of the previous result set. Read up to the next result set, if any. // Non-prepared statements receive ParseComplete, BindComplete, DescriptionRow/NoData, // prepared statements receive only BindComplete for (statementIndex = ++StatementIndex; statementIndex < statements.Count; statementIndex = ++StatementIndex) { var statement = statements[statementIndex]; IBackendMessage msg; if (statement.TryGetPrepared(out var preparedStatement)) { Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); RowDescription = preparedStatement.Description; } else // Non-prepared/preparing flow { preparedStatement = statement.PreparedStatement; if (preparedStatement != null) { Debug.Assert(!preparedStatement.IsPrepared); if (preparedStatement.StatementBeingReplaced != null) { Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); preparedStatement.StatementBeingReplaced.CompleteUnprepare(); preparedStatement.StatementBeingReplaced = null; } } Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); if (statement.IsPreparing) { preparedStatement!.State = PreparedState.Prepared; Connector.PreparedStatementManager.NumPrepared++; statement.IsPreparing = false; } Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); msg = await Connector.ReadMessage(async).ConfigureAwait(false); RowDescription = statement.Description = msg.Code switch { BackendMessageCode.NoData => null, // RowDescription messages are cached on the connector, but if we're auto-preparing, we need to // clone our own copy which will last beyond the lifetime of this invocation. BackendMessageCode.RowDescription => preparedStatement == null ? (RowDescriptionMessage)msg : ((RowDescriptionMessage)msg).Clone(), _ => throw Connector.UnexpectedMessageReceived(msg.Code) }; } if (RowDescription is not null) { if (ColumnInfoCache?.Length >= ColumnCount) Array.Clear(ColumnInfoCache, 0, ColumnCount); else { if (ColumnInfoCache is { } cache) ArrayPool.Shared.Return(cache, clearArray: true); ColumnInfoCache = ArrayPool.Shared.Rent(ColumnCount); } if (statement.IsPrepared) RowDescription.LoadColumnInfoCache(Connector.SerializerOptions, ColumnInfoCache); } else { // Statement did not generate a resultset (e.g. INSERT) // Read and process its completion message and move on to the next statement // No need to read sequentially as it's not a DataRow msg = await Connector.ReadMessage(async).ConfigureAwait(false); switch (msg.Code) { case BackendMessageCode.CommandComplete: case BackendMessageCode.EmptyQueryResponse: break; case BackendMessageCode.CopyInResponse: throw Connector.Break(new NotSupportedException( "COPY isn't supported in regular command execution - see https://www.npgsql.org/doc/copy.html for documentation on COPY with Npgsql. " + "If you are trying to execute a SQL script created by pg_dump, pass the '--inserts' switch to disable generating COPY statements.")); case BackendMessageCode.CopyOutResponse: throw Connector.Break(new NotSupportedException( "COPY isn't supported in regular command execution - see https://www.npgsql.org/doc/copy.html for documentation on COPY with Npgsql.")); default: throw Connector.UnexpectedMessageReceived(msg.Code); } ProcessMessage(msg); if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers) Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); continue; } if ((Command.WrappingBatch is not null || StatementIndex is 0) && Command.InternalBatchCommands[StatementIndex] is { HasOutputParameters: true } command) { // If output parameters are present and this is the first row of the resultset, // we must always read it in non-sequential mode because it will be traversed twice (once // here for the parameters, then as a regular row). msg = await Connector.ReadMessage(async, dataRowLoadingMode: DataRowLoadingMode.NonSequential).ConfigureAwait(false); ProcessMessage(msg); if (msg.Code == BackendMessageCode.DataRow) { Debug.Assert(RowDescription != null); Debug.Assert(State == ReaderState.BeforeResult); try { // Temporarily set our state to InResult and non-sequential to allow us to read the values, and in any order. var isSequential = _isSequential; var currentPosition = Buffer.ReadPosition; State = ReaderState.InResult; _isSequential = false; try { command.PopulateOutputParameters(this, _commandLogger); // On success we want to revert any row and column state for the user to be able to read the same row again. if (async) await PgReader.CommitAsync().ConfigureAwait(false); else PgReader.Commit(); State = ReaderState.BeforeResult; // Set the state back Buffer.ReadPosition = currentPosition; // Restore position _column = -1; } finally { // To be on the safe side we always revert this CommandBehavior state change, including on failure. _isSequential = isSequential; } } catch (Exception e) { // TODO: ideally we should flow down to global exception filter and consume there await Consume(async, firstException: e).ConfigureAwait(false); throw; } } } else { msg = await ReadMessage(async).ConfigureAwait(false); ProcessMessage(msg); } switch (msg.Code) { case BackendMessageCode.DataRow: Connector.State = ConnectorState.Fetching; return true; case BackendMessageCode.CommandComplete: if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers) Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); return true; default: Connector.UnexpectedMessageReceived(msg.Code); break; } } // There are no more queries, we're done. Read the RFQ. if (_statements.Count is 0 || !(_statements[^1].AppendErrorBarrier ?? Command.EnableErrorBarriers)) Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); State = ReaderState.Consumed; RowDescription = null; return false; } catch (Exception e) { if (e is PostgresException postgresException && StatementIndex >= 0 && StatementIndex < _statements.Count) { var statement = _statements[StatementIndex]; // Reference the triggering statement from the exception if (Connector.Settings.IncludeFailedBatchedCommand) postgresException.BatchCommand = statement; // Prevent the command or batch from being recycled (by the connection) when it's disposed. This is important since // the exception is very likely to escape the using statement of the command, and by that time some other user may // already be using the recycled instance. // TODO: we probably should do than even if it's not PostgresException (error from PopulateOutputParameters) Command.IsCacheable = false; // If the schema of a table changes after a statement is prepared on that table, PostgreSQL errors with // 0A000: cached plan must not change result type. 0A000 seems like a non-specific code, but it's very unlikely the // statement would successfully execute anyway, so invalidate the prepared statement. if (postgresException.SqlState == PostgresErrorCodes.FeatureNotSupported && statement.PreparedStatement is { } preparedStatement) { preparedStatement.State = PreparedState.Invalidated; Command.ResetPreparation(); } } // For the statement that errored, if it was being prepared we need to update our bookkeeping to put them back in unprepared // state. for (; StatementIndex < _statements.Count; StatementIndex++) { var statement = _statements[StatementIndex]; if (statement.IsPreparing) { statement.IsPreparing = false; statement.PreparedStatement!.AbortPrepare(); } // In normal, non-isolated batching, we've consumed the result set and are done. // However, if the command has error barrier, we now have to consume results from the commands after it (unless it's the // last one). // Note that Consume calls NextResult (this method) recursively, the isConsuming flag tells us we're in this mode. // TODO: We might as well call Consume on every command (even the last one) to make sure we do read every single message until RFQ // in case we get an exception in the middle of NextResult if ((statement.AppendErrorBarrier ?? Command.EnableErrorBarriers) && StatementIndex < _statements.Count - 1) { if (isConsuming) throw; switch (State) { case ReaderState.Consumed: case ReaderState.Closed: case ReaderState.Disposed: // The exception may have caused the connector to break (e.g. I/O), and so the reader is already closed. break; default: // We provide Consume with the first exception which we've just caught. // If it encounters other exceptions while consuming the rest of the result set, it will raise an AggregateException, // otherwise it will rethrow this first exception. await Consume(async, firstException: e).ConfigureAwait(false); break; // Never reached, Consume always throws above } } } // Break may have progressed the reader already. if (State is not ReaderState.Closed) State = ReaderState.Consumed; throw; } async ValueTask ConsumeResultSet(bool async) { await ConsumeRow(async).ConfigureAwait(false); while (true) { var completedMsg = await Connector.ReadMessage(async, DataRowLoadingMode.Skip).ConfigureAwait(false); switch (completedMsg.Code) { case BackendMessageCode.CommandComplete: case BackendMessageCode.EmptyQueryResponse: ProcessMessage(completedMsg); var statement = _statements[StatementIndex]; if (statement.IsPrepared && ColumnInfoCache is not null) RowDescription!.SetColumnInfoCache(new(ColumnInfoCache, 0, ColumnCount)); if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers) Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); break; default: // TODO if we hit an ErrorResponse here (PG doesn't do this *today*) we should probably throw. continue; } break; } } } /// /// Note that in SchemaOnly mode there are no resultsets, and we read nothing from the backend (all /// RowDescriptions have already been processed and are available) /// async Task NextResultSchemaOnly(bool async, bool isConsuming = false, CancellationToken cancellationToken = default) { Debug.Assert(_isSchemaOnly); if (State is ReaderState.Consumed) return false; using var registration = isConsuming ? default : Connector.StartNestedCancellableOperation(cancellationToken); try { for (StatementIndex++; StatementIndex < _statements.Count; StatementIndex++) { var statement = _statements[StatementIndex]; if (statement.TryGetPrepared(out var preparedStatement)) { // Row descriptions have already been populated in the statement objects at the // Prepare phase RowDescription = preparedStatement.Description; } else { var pStatement = statement.PreparedStatement; if (pStatement != null) { Debug.Assert(!pStatement.IsPrepared); if (pStatement.StatementBeingReplaced != null) { Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); pStatement.StatementBeingReplaced.CompleteUnprepare(); pStatement.StatementBeingReplaced = null; } } Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); if (statement.IsPreparing) { pStatement!.State = PreparedState.Prepared; Connector.PreparedStatementManager.NumPrepared++; statement.IsPreparing = false; } Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); var msg = await Connector.ReadMessage(async).ConfigureAwait(false); switch (msg.Code) { case BackendMessageCode.NoData: RowDescription = _statements[StatementIndex].Description = null; break; case BackendMessageCode.RowDescription: // We have a resultset // RowDescription messages are cached on the connector, but if we're auto-preparing, we need to // clone our own copy which will last beyond the lifetime of this invocation. RowDescription = _statements[StatementIndex].Description = preparedStatement == null ? (RowDescriptionMessage)msg : ((RowDescriptionMessage)msg).Clone(); Command.FixupRowDescription(RowDescription, StatementIndex == 0); break; default: throw Connector.UnexpectedMessageReceived(msg.Code); } var forall = true; for (var i = StatementIndex + 1; i < _statements.Count; i++) if (!_statements[i].IsPrepared) { forall = false; break; } // There are no more queries, we're done. Read to the RFQ. if (forall) Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector); } // Found a resultset if (RowDescription is not null) return true; } State = ReaderState.Consumed; RowDescription = null; return false; } catch (Exception e) { // Break may have progressed the reader already. if (State is not ReaderState.Closed) State = ReaderState.Consumed; // Reference the triggering statement from the exception if (e is PostgresException postgresException && StatementIndex >= 0 && StatementIndex < _statements.Count) { // Reference the triggering statement from the exception if (Connector.Settings.IncludeFailedBatchedCommand) postgresException.BatchCommand = _statements[StatementIndex]; // Prevent the command or batch from being recycled (by the connection) when it's disposed. This is important since // the exception is very likely to escape the using statement of the command, and by that time some other user may // already be using the recycled instance. Command.IsCacheable = false; } // An error means all subsequent statements were skipped by PostgreSQL. // If any of them were being prepared, we need to update our bookkeeping to put // them back in unprepared state. for (; StatementIndex < _statements.Count; StatementIndex++) { var statement = _statements[StatementIndex]; if (statement.IsPreparing) { statement.IsPreparing = false; statement.PreparedStatement!.AbortPrepare(); } } throw; } } #endregion #region ProcessMessage internal void ProcessMessage(IBackendMessage msg) { if (msg.Code is not BackendMessageCode.DataRow) { HandleUncommon(msg); return; } var dataRow = (DataRowMessage)msg; // The connector's buffer can actually change between DataRows: // If a large DataRow exceeding the connector's current read buffer arrives, and we're // reading in non-sequential mode, a new oversize buffer is allocated. We thus have to // recapture the connector's buffer on each new DataRow. // Note that this can happen even in sequential mode, if the row description message is big // (see #2003) if (!ReferenceEquals(Buffer, Connector.ReadBuffer)) Buffer = Connector.ReadBuffer; // We assume that the row's number of columns is identical to the description's var numColumns = Buffer.ReadInt16(); if (ColumnCount != numColumns) ThrowHelper.ThrowArgumentException($"Row's number of columns ({numColumns}) differs from the row description's ({ColumnCount})"); var readPosition = Buffer.ReadPosition; var msgRemainder = dataRow.Length - sizeof(short); _dataMsgEnd = readPosition + msgRemainder; _columnsStartPos = readPosition; _isRowBuffered = msgRemainder <= Buffer.FilledBytes - readPosition; Debug.Assert(_isRowBuffered || _isSequential); _column = -1; if (_columns.Count > 0) _columns.Clear(); switch (State) { case ReaderState.BetweenResults: _hasRows = true; State = ReaderState.BeforeResult; break; case ReaderState.BeforeResult: State = ReaderState.InResult; break; case ReaderState.InResult: break; default: Connector.UnexpectedMessageReceived(BackendMessageCode.DataRow); break; } [MethodImpl(MethodImplOptions.NoInlining)] void HandleUncommon(IBackendMessage msg) { switch (msg.Code) { case BackendMessageCode.CommandComplete: var completed = (CommandCompleteMessage)msg; switch (completed.StatementType) { case StatementType.Update: case StatementType.Insert: case StatementType.Delete: case StatementType.Copy: case StatementType.Move: case StatementType.Merge: _recordsAffected ??= 0; _recordsAffected += completed.Rows; break; } _statements[StatementIndex].ApplyCommandComplete(completed); State = ReaderState.BetweenResults; break; case BackendMessageCode.EmptyQueryResponse: State = ReaderState.BetweenResults; break; default: Connector.UnexpectedMessageReceived(msg.Code); break; } } } #endregion /// /// Gets a value indicating the depth of nesting for the current row. Always returns zero. /// public override int Depth => 0; /// /// Gets a value indicating whether the data reader is closed. /// public override bool IsClosed => State is ReaderState.Closed or ReaderState.Disposed; /// /// Gets the number of rows changed, inserted, or deleted by execution of the SQL statement. /// /// /// The number of rows changed, inserted, or deleted. -1 for SELECT statements; 0 if no rows were affected or the statement failed. /// public override int RecordsAffected => !_recordsAffected.HasValue ? -1 : _recordsAffected > int.MaxValue ? throw new OverflowException( $"The number of records affected exceeds int.MaxValue. Use {nameof(Rows)}.") : (int)_recordsAffected; /// /// Gets the number of rows changed, inserted, or deleted by execution of the SQL statement. /// /// /// The number of rows changed, inserted, or deleted. 0 for SELECT statements, if no rows were affected or the statement failed. /// public ulong Rows => _recordsAffected ?? 0; /// /// Returns details about each statement that this reader will or has executed. /// /// /// Note that some fields (i.e. rows and oid) are only populated as the reader /// traverses the result. /// /// For commands with multiple queries, this exposes the number of rows affected on /// a statement-by-statement basis, unlike /// which exposes an aggregation across all statements. /// [Obsolete("Use the new DbBatch API")] public IReadOnlyList Statements { get { ThrowIfClosedOrDisposed(); return _statements.AsReadOnly(); } } /// /// Gets a value that indicates whether this DbDataReader contains one or more rows. /// public override bool HasRows { get { ThrowIfClosedOrDisposed(); return _hasRows; } } /// /// Indicates whether the reader is currently positioned on a row, i.e. whether reading a /// column is possible. /// This property is different from in that will /// return true even if attempting to read a column will fail, e.g. before /// has been called /// public bool IsOnRow { get { ThrowIfClosedOrDisposed(); return State is ReaderState.InResult; } } /// /// Gets the name of the column, given the zero-based column ordinal. /// /// The zero-based column ordinal. /// The name of the specified column. public override string GetName(int ordinal) => GetField(ordinal).Name; /// /// Gets the number of columns in the current row. /// public override int FieldCount { get { ThrowIfClosedOrDisposed(); return RowDescription?.Count ?? 0; } } #region Cleanup / Dispose /// /// Consumes all result sets for this reader, leaving the connector ready for sending and processing further /// queries /// async Task Consume(bool async, Exception? firstException = null) { var exceptions = firstException is null ? null : new List { firstException }; // Skip over the other result sets. Note that this does tally records affected from CommandComplete messages, and properly sets // state for auto-prepared statements // // The only exception is when the connector is broken (which can happen in the middle of consuming) // As then there is no point in going forward. // An exception to the exception above is when connector is concurrently closed while // the reader is still going over the result set. // While this is undefined behavior and user error, we should try to at least do our best to not loop indefinitely. // // While we can also check our local state (State == Closed) // It's probably better to rely on connector since it's private and its state can't be changed while (Connector.IsConnected) { try { if (!(_isSchemaOnly ? await NextResultSchemaOnly(async, isConsuming: true).ConfigureAwait(false) : await NextResult(async, isConsuming: true).ConfigureAwait(false))) { break; } } catch (Exception e) { exceptions ??= []; exceptions.Add(e); } } Debug.Assert(exceptions?.Count != 0); switch (exceptions?.Count) { case null: return; case 1: ExceptionDispatchInfo.Capture(exceptions[0]).Throw(); return; default: throw new NpgsqlException( "Multiple exceptions occurred when consuming the result set", new AggregateException(exceptions)); } } /// /// Releases the resources used by the . /// protected override void Dispose(bool disposing) { try { Close(connectionClosing: false, async: false, isDisposing: true).GetAwaiter().GetResult(); } catch (Exception ex) { // In the case of a PostgresException (or multiple ones, if we have error barriers), the reader's state has already been set // to Disposed in Close above. Therefore, we only set the state to Disposed if the exception *wasn't* a PostgresException. if (!(ex is PostgresException || ex is NpgsqlException { InnerException: AggregateException aggregateException } && AllPostgresExceptions(aggregateException.InnerExceptions))) { State = ReaderState.Disposed; } throw; } finally { Command.TraceCommandStop(); } } /// /// Releases the resources used by the . /// public override async ValueTask DisposeAsync() { try { await Close(connectionClosing: false, async: true, isDisposing: true).ConfigureAwait(false); } catch (Exception ex) { // In the case of a PostgresException (or multiple ones, if we have error barriers), the reader's state has already been set // to Disposed in Close above. Therefore, we only set the state to Disposed if the exception *wasn't* a PostgresException. if (!(ex is PostgresException || ex is NpgsqlException { InnerException: AggregateException aggregateException } && AllPostgresExceptions(aggregateException.InnerExceptions))) { State = ReaderState.Disposed; } throw; } finally { Command.TraceCommandStop(); } } static bool AllPostgresExceptions(ReadOnlyCollection collection) { foreach (var exception in collection) if (exception is not PostgresException) return false; return true; } /// /// Closes the reader, allowing a new command to be executed. /// public override void Close() => Close(connectionClosing: false, async: false, isDisposing: false).GetAwaiter().GetResult(); /// /// Closes the reader, allowing a new command to be executed. /// public override Task CloseAsync() => Close(async: true, connectionClosing: false, isDisposing: false); internal async Task Close(bool async, bool connectionClosing, bool isDisposing) { if (State is ReaderState.Closed or ReaderState.Disposed) { if (isDisposing) State = ReaderState.Disposed; return; } // Whenever a connector is broken, it also closes the current reader. Connector.CurrentReader = null; switch (Connector.State) { case ConnectorState.Ready: case ConnectorState.Fetching: case ConnectorState.Executing: case ConnectorState.Connecting: if (State != ReaderState.Consumed) { try { await Consume(async).ConfigureAwait(false); } catch (Exception ex) when (ex is OperationCanceledException or NpgsqlException { InnerException: TimeoutException }) { // Timeout/cancellation - completely normal, consume has basically completed. } catch (Exception ex) when ( ex is PostgresException || ex is NpgsqlException { InnerException: AggregateException aggregateException } && AllPostgresExceptions(aggregateException.InnerExceptions)) { // In the case of a PostgresException (or multiple ones, if we have error barriers), the connection is fine and consume // has basically completed. Defer throwing the exception until Cleanup is complete. await Cleanup(async, connectionClosing, isDisposing).ConfigureAwait(false); throw; } catch { Debug.Assert(Connector.IsBroken); throw; } } break; case ConnectorState.Closed: case ConnectorState.Broken: break; case ConnectorState.Waiting: case ConnectorState.Copy: case ConnectorState.Replication: Debug.Fail("Bad connector state when closing reader: " + Connector.State); break; default: throw new ArgumentOutOfRangeException(); } await Cleanup(async, connectionClosing, isDisposing).ConfigureAwait(false); } internal async Task Cleanup(bool async, bool connectionClosing = false, bool isDisposing = false) { LogMessages.ReaderCleanup(_commandLogger, Connector.Id); // _sendTask contains the task for the writing of this command. // Make sure that this task, which may have executed asynchronously and in parallel with the reading, // has completed, throwing any exceptions it generated. If we don't do this, there's the possibility of a race condition where the // user executes a new command after reader.Dispose() returns, but some additional write stuff is still finishing up from the last // command. if (_sendTask is { Status: not TaskStatus.RanToCompletion }) { // If the connector is broken, we have no reason to wait for the sendTask to complete // as we're not going to send anything else over it // and that can lead to deadlocks (concurrent write and read failure, see #4804) if (Connector.IsBroken) { // Prevent unobserved Task notifications by observing the failed Task exception. _ = _sendTask.ContinueWith(t => _ = t.Exception, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Current); } else { try { if (async) await _sendTask.ConfigureAwait(false); else _sendTask.GetAwaiter().GetResult(); } catch (Exception e) { // TODO: think of a better way to handle exceptions, see #1323 and #3163 _commandLogger.LogDebug(e, "Exception caught while sending the request", Connector.Id); } } } if (ColumnInfoCache is { } cache) { ColumnInfoCache = null; ArrayPool.Shared.Return(cache, clearArray: true); } State = ReaderState.Closed; Command.State = CommandState.Idle; Connector.CurrentReader = null; if (_commandLogger.IsEnabled(LogLevel.Information)) Command.LogExecutingCompleted(Connector, executing: false); NpgsqlEventSource.Log.CommandStop(); Connector.DataSource.MetricsReporter.ReportCommandStop(_startTimestamp); Connector.EndUserAction(); if (isDisposing) State = ReaderState.Disposed; if (_behavior.HasFlag(CommandBehavior.CloseConnection) && !connectionClosing) { Debug.Assert(_connection is not null); await _connection.Close(async).ConfigureAwait(false); } if (ReaderClosed != null) { ReaderClosed(this, EventArgs.Empty); ReaderClosed = null; } } #endregion #region Simple value getters /// /// Gets the value of the specified column as a Boolean. /// /// The zero-based column ordinal. /// The value of the specified column. public override bool GetBoolean(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a byte. /// /// The zero-based column ordinal. /// The value of the specified column. public override byte GetByte(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a single character. /// /// The zero-based column ordinal. /// The value of the specified column. public override char GetChar(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a 16-bit signed integer. /// /// The zero-based column ordinal. /// The value of the specified column. public override short GetInt16(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a 32-bit signed integer. /// /// The zero-based column ordinal. /// The value of the specified column. public override int GetInt32(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a 64-bit signed integer. /// /// The zero-based column ordinal. /// The value of the specified column. public override long GetInt64(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a object. /// /// The zero-based column ordinal. /// The value of the specified column. public override DateTime GetDateTime(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as an instance of . /// /// The zero-based column ordinal. /// The value of the specified column. public override string GetString(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a object. /// /// The zero-based column ordinal. /// The value of the specified column. public override decimal GetDecimal(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a double-precision floating point number. /// /// The zero-based column ordinal. /// The value of the specified column. public override double GetDouble(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a single-precision floating point number. /// /// The zero-based column ordinal. /// The value of the specified column. public override float GetFloat(int ordinal) => GetFieldValueCore(ordinal); /// /// Gets the value of the specified column as a globally-unique identifier (GUID). /// /// The zero-based column ordinal. /// The value of the specified column. public override Guid GetGuid(int ordinal) => GetFieldValueCore(ordinal); /// /// Populates an array of objects with the column values of the current row. /// /// An array of Object into which to copy the attribute columns. /// The number of instances of in the array. public override int GetValues(object[] values) { ThrowIfNotInResult(); ArgumentNullException.ThrowIfNull(values); var count = Math.Min(ColumnCount, values.Length); for (var i = 0; i < count; i++) values[i] = GetValue(i); return count; } /// /// Gets the value of the specified column as an instance of . /// /// The zero-based column ordinal. /// The value of the specified column. public override object this[int ordinal] => GetValue(ordinal); #endregion #region Provider-specific simple type getters /// /// Gets the value of the specified column as a TimeSpan, /// /// /// PostgreSQL's interval type has a resolution of 1 microsecond and ranges from /// -178000000 to 178000000 years, while .NET's TimeSpan has a resolution of 100 nanoseconds /// and ranges from roughly -29247 to 29247 years. /// See https://www.postgresql.org/docs/current/static/datatype-datetime.html /// /// The zero-based column ordinal. /// The value of the specified column. public TimeSpan GetTimeSpan(int ordinal) => GetFieldValueCore(ordinal); /// protected override DbDataReader GetDbDataReader(int ordinal) => GetData(ordinal); /// /// Returns a nested data reader for the requested column. /// The column type must be a record or a Npgsql known composite type, or an array thereof. /// Currently only supported in non-sequential mode. /// /// The zero-based column ordinal. /// A data reader. public new NpgsqlNestedDataReader GetData(int ordinal) { ThrowIfNotInResult(); var field = RowDescription[ordinal]; if (_isSequential) ThrowHelper.ThrowNotSupportedException("GetData() not supported in sequential mode."); var type = field.PostgresType; var isArray = type is PostgresArrayType; var elementType = isArray ? ((PostgresArrayType)type).Element : type; var compositeType = elementType as PostgresCompositeType; if (field.DataFormat is DataFormat.Text || (elementType.InternalName != "record" && compositeType == null)) ThrowHelper.ThrowInvalidCastException("GetData() not supported for type " + field.TypeDisplayName); if (SeekToColumn(ordinal, field.DataFormat, resumableOp: true) is -1) ThrowHelper.ThrowInvalidCastException_NoValue(field); Debug.Assert(!PgReader.NestedInitialized, "Unexpected nested read active, Seek(0) would seek to the start of the nested data."); PgReader.Seek(0); var reader = CachedFreeNestedDataReader; if (reader != null) { CachedFreeNestedDataReader = null; reader.Init(compositeType); } else { reader = new NpgsqlNestedDataReader(this, null, 1, compositeType); } if (isArray) reader.InitArray(); else reader.InitSingleRow(); return reader; } #endregion #region Special binary getters /// /// Reads a stream of bytes from the specified column, starting at location indicated by dataOffset, into the buffer, starting at the location indicated by bufferOffset. /// /// The zero-based column ordinal. /// The index within the row from which to begin the read operation. /// The buffer into which to copy the data. /// The index with the buffer to which the data will be copied. /// The maximum number of characters to read. /// The actual number of bytes read. public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length) { ThrowIfNotInResult(); var field = RowDescription[ordinal]; if (dataOffset is < 0 or > int.MaxValue) ThrowHelper.ThrowArgumentOutOfRangeException(nameof(dataOffset), "dataOffset must be between 0 and {0}", int.MaxValue); if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length + 1)) ThrowHelper.ThrowIndexOutOfRangeException("bufferOffset must be between 0 and {0}", buffer.Length); if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset)) ThrowHelper.ThrowIndexOutOfRangeException("bufferOffset must be between 0 and {0}", buffer.Length - bufferOffset); if (SeekToColumn(ordinal, field.DataFormat, resumableOp: true) is var columnLength && columnLength is -1) ThrowHelper.ThrowInvalidCastException_NoValue(field); if (buffer is null) return columnLength; // Check whether any sequential seek is contractually sound (even though we might be able to satisfy rewinds we make sure we won't). if (_isSequential && PgReader.IsFieldConsumed((int)dataOffset)) ThrowHelper.ThrowInvalidOperationException("Attempt to read a position in the column which has already been read"); // Move to offset Debug.Assert(!PgReader.NestedInitialized, "Unexpected nested read active, Seek(0) would seek to the start of the nested data."); var remaining = PgReader.Seek((int)dataOffset); // At offset, read into buffer. length = Math.Min(length, remaining); PgReader.ReadBytes(new Span(buffer, bufferOffset, length)); return length; } /// /// Retrieves data as a . /// /// The zero-based column ordinal. /// The returned object. public override Stream GetStream(int ordinal) => GetFieldValueCore(ordinal); /// /// Retrieves data as a . /// /// The zero-based column ordinal. /// /// An optional token to cancel the asynchronous operation. The default value is . /// /// The returned object. public Task GetStreamAsync(int ordinal, CancellationToken cancellationToken = default) => GetFieldValueAsync(ordinal, cancellationToken); #endregion #region Special text getters /// /// Reads a stream of characters from the specified column, starting at location indicated by dataOffset, into the buffer, starting at the location indicated by bufferOffset. /// /// The zero-based column ordinal. /// The index within the row from which to begin the read operation. /// The buffer into which to copy the data. /// The index with the buffer to which the data will be copied. /// The maximum number of characters to read. /// The actual number of characters read. public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length) { ThrowIfNotInResult(); // Check whether we have a GetChars implementation for this column type. var field = GetInfo(ordinal, typeof(GetChars), out var converter, out var bufferRequirement, out var asObject); if (dataOffset is < 0 or > int.MaxValue) ThrowHelper.ThrowArgumentOutOfRangeException(nameof(dataOffset), "dataOffset must be between 0 and {0}", int.MaxValue); if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length + 1)) ThrowHelper.ThrowIndexOutOfRangeException("bufferOffset must be between 0 and {0}", buffer.Length); if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset)) ThrowHelper.ThrowIndexOutOfRangeException("bufferOffset must be between 0 and {0}", buffer.Length - bufferOffset); if (SeekToColumn(ordinal, field, resumableOp: true) is -1) ThrowHelper.ThrowInvalidCastException_NoValue(RowDescription[ordinal]); var reader = PgReader; dataOffset = buffer is null ? 0 : dataOffset; if (_isSequential && reader.CharsRead > dataOffset) ThrowHelper.ThrowInvalidOperationException("Attempt to read a position in the column which has already been read"); reader.StartCharsRead(checked((int)dataOffset), buffer is not null ? new ArraySegment(buffer, bufferOffset, length) : (ArraySegment?)null); reader.StartRead(bufferRequirement); var result = asObject ? (GetChars)converter.ReadAsObject(reader) : ((PgConverter)converter).Read(reader); reader.EndRead(); reader.EndCharsRead(); return result.Read; } /// /// Retrieves data as a . /// /// The zero-based column ordinal. /// The returned object. public override TextReader GetTextReader(int ordinal) => GetFieldValueCore(ordinal); /// /// Retrieves data as a . /// /// The zero-based column ordinal. /// /// An optional token to cancel the asynchronous operation. The default value is . /// /// The returned object. public Task GetTextReaderAsync(int ordinal, CancellationToken cancellationToken = default) => GetFieldValueAsync(ordinal, cancellationToken); #endregion #region GetFieldValue /// /// Asynchronously gets the value of the specified column as a type. /// /// The type of the value to be returned. /// The type of the value to be returned. /// /// An optional token to cancel the asynchronous operation. The default value is . /// /// public override Task GetFieldValueAsync(int ordinal, CancellationToken cancellationToken) { // As the row is buffered we know the column is too - no I/O will take place if (_isRowBuffered) return Task.FromResult(GetFieldValueCore(ordinal)); // The only statically mapped converter, it always exists. if (typeof(T) == typeof(Stream)) return GetStream(ordinal, cancellationToken); return Core(ordinal, cancellationToken).AsTask(); async ValueTask Core(int ordinal, CancellationToken cancellationToken) { ThrowIfNotInResult(); var field = GetInfo(ordinal, typeof(T), out var converter, out var bufferRequirement, out var asObject); using var registration = Connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false); if (await SeekToColumnAsync(ordinal, field).ConfigureAwait(false) is -1) return DbNullValueOrThrow(ordinal); if (typeof(T) == typeof(TextReader)) PgReader.ThrowIfStreamActive(); Debug.Assert(asObject || converter is PgConverter); await PgReader.StartReadAsync(bufferRequirement, cancellationToken).ConfigureAwait(false); var result = asObject ? (T)await converter.ReadAsObjectAsync(PgReader, cancellationToken).ConfigureAwait(false) : await converter.UnsafeDowncast().ReadAsync(PgReader, cancellationToken).ConfigureAwait(false); await PgReader.EndReadAsync().ConfigureAwait(false); return result; } async Task GetStream(int ordinal, CancellationToken cancellationToken) { using var registration = Connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false); var field = GetDefaultInfo(ordinal, out _, out _); PgReader.ThrowIfStreamActive(); if (await SeekToColumnAsync(ordinal, field).ConfigureAwait(false) is -1) return DbNullValueOrThrow(ordinal); return (T)(object)PgReader.GetStream(canSeek: !_isSequential); } } /// /// Synchronously gets the value of the specified column as a type. /// /// Synchronously gets the value of the specified column as a type. /// The column to be retrieved. /// The column to be retrieved. public override T GetFieldValue(int ordinal) => GetFieldValueCore(ordinal); T GetFieldValueCore(int ordinal) { ThrowIfNotInResult(); // The only statically mapped converter, it always exists. if (typeof(T) == typeof(Stream)) return GetStream(ordinal); var field = GetInfo(ordinal, typeof(T), out var converter, out var bufferRequirement, out var asObject); if (typeof(T) == typeof(TextReader)) PgReader.ThrowIfStreamActive(); if (SeekToColumn(ordinal, field) is -1) return DbNullValueOrThrow(ordinal); Debug.Assert(asObject || converter is PgConverter); PgReader.StartRead(bufferRequirement); var result = asObject ? (T)converter.ReadAsObject(PgReader) : converter.UnsafeDowncast().Read(PgReader); PgReader.EndRead(); return result; [MethodImpl(MethodImplOptions.NoInlining)] T GetStream(int ordinal) { var field = GetDefaultInfo(ordinal, out _, out _); PgReader.ThrowIfStreamActive(); if (SeekToColumn(ordinal, field) is -1) return DbNullValueOrThrow(ordinal); return (T)(object)PgReader.GetStream(canSeek: !_isSequential); } } #endregion #region GetValue /// /// Gets the value of the specified column as an instance of . /// /// The zero-based column ordinal. /// The value of the specified column. public override object GetValue(int ordinal) { ThrowIfNotInResult(); var field = GetDefaultInfo(ordinal, out var converter, out var bufferRequirement); if (SeekToColumn(ordinal, field) is -1) return DBNull.Value; PgReader.StartRead(bufferRequirement); var result = converter.ReadAsObject(PgReader); PgReader.EndRead(); return result; } /// /// Gets the value of the specified column as an instance of . /// /// The name of the column. /// The value of the specified column. public override object this[string name] => GetValue(GetOrdinal(name)); #endregion #region IsDBNull /// /// Gets a value that indicates whether the column contains nonexistent or missing values. /// /// The zero-based column ordinal. /// true if the specified column is equivalent to ; otherwise false. public override bool IsDBNull(int ordinal) { ThrowIfNotInResult(); return SeekToColumn(ordinal, RowDescription[ordinal].DataFormat, resumableOp: true) is -1; } /// /// An asynchronous version of , which gets a value that indicates whether the column contains non-existent or missing values. /// The parameter is currently ignored. /// /// The zero-based column to be retrieved. /// /// An optional token to cancel the asynchronous operation. The default value is . /// /// true if the specified column value is equivalent to otherwise false. public override Task IsDBNullAsync(int ordinal, CancellationToken cancellationToken) { if (_isRowBuffered) return IsDBNull(ordinal) ? TrueTask : FalseTask; return Core(ordinal, cancellationToken); async Task Core(int ordinal, CancellationToken cancellationToken) { ThrowIfNotInResult(); using var registration = Connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false); return await SeekToColumnAsync(ordinal, RowDescription[ordinal].DataFormat, resumableOp: true).ConfigureAwait(false) is -1; } } #endregion #region Other public accessors /// /// Gets the column ordinal given the name of the column. /// /// The name of the column. /// The zero-based column ordinal. public override int GetOrdinal(string name) { ThrowIfClosedOrDisposed(); if (string.IsNullOrEmpty(name)) ThrowHelper.ThrowArgumentException($"{nameof(name)} cannot be empty", nameof(name)); if (RowDescription is null) ThrowHelper.ThrowInvalidOperationException("No resultset is currently being traversed"); return RowDescription.GetFieldIndex(name); } /// /// Gets a representation of the PostgreSQL data type for the specified field. /// The returned representation can be used to access various information about the field. /// /// The zero-based column index. public PostgresType GetPostgresType(int ordinal) => GetField(ordinal).PostgresType; /// /// Gets the data type information for the specified field. /// This is the PostgreSQL type name (e.g. double precision), not the .NET type /// (see for that). /// /// The zero-based column index. public override string GetDataTypeName(int ordinal) => GetField(ordinal).TypeDisplayName; /// /// Gets the OID for the PostgreSQL type for the specified field, as it appears in the pg_type table. /// /// /// This is a PostgreSQL-internal value that should not be relied upon and should only be used for /// debugging purposes. /// /// The zero-based column index. public uint GetDataTypeOID(int ordinal) => GetField(ordinal).TypeOID; /// /// Gets the data type of the specified column. /// /// The zero-based column ordinal. /// The data type of the specified column. [UnconditionalSuppressMessage("ILLink", "IL2093", Justification = "Members are only dynamically accessed by Npgsql via GetFieldType by GetSchema, and only in certain cases. " + "Holding PublicFields and PublicProperties metadata on all our mapped types just for that case is the wrong tradeoff.")] public override Type GetFieldType(int ordinal) => GetField(ordinal).FieldType; /// /// Returns an that can be used to iterate through the rows in the data reader. /// /// An that can be used to iterate through the rows in the data reader. public override IEnumerator GetEnumerator() => new DbEnumerator(this); /// /// Returns schema information for the columns in the current resultset. /// /// public ReadOnlyCollection GetColumnSchema() => GetColumnSchema(async: false).GetAwaiter().GetResult(); ReadOnlyCollection IDbColumnSchemaGenerator.GetColumnSchema() { var columns = GetColumnSchema(); var result = new DbColumn[columns.Count]; var i = 0; foreach (var column in columns) result[i++] = column; return new ReadOnlyCollection(result); } /// /// Asynchronously returns schema information for the columns in the current resultset. /// /// public override Task> GetColumnSchemaAsync(CancellationToken cancellationToken = default) => GetColumnSchema(async: true, cancellationToken); Task> GetColumnSchema(bool async, CancellationToken cancellationToken = default) where T : DbColumn => RowDescription == null || ColumnCount == 0 ? Task.FromResult(new List().AsReadOnly()) : new DbColumnSchemaGenerator(_connection!, RowDescription, _behavior.HasFlag(CommandBehavior.KeyInfo)) .GetColumnSchema(async, cancellationToken); #endregion #region Schema metadata table /// /// Returns a System.Data.DataTable that describes the column metadata of the DataReader. /// [UnconditionalSuppressMessage( "Composite type mapping currently isn't trimming-safe, and warnings are generated at the MapComposite level.", "IL2026")] public override DataTable? GetSchemaTable() => GetSchemaTable(async: false).GetAwaiter().GetResult(); /// /// Asynchronously returns a System.Data.DataTable that describes the column metadata of the DataReader. /// [UnconditionalSuppressMessage( "Composite type mapping currently isn't trimming-safe, and warnings are generated at the MapComposite level.", "IL2026")] public override Task GetSchemaTableAsync(CancellationToken cancellationToken = default) => GetSchemaTable(async: true, cancellationToken); [UnconditionalSuppressMessage("Trimming", "IL2111", Justification = "typeof(Type).TypeInitializer is not used.")] async Task GetSchemaTable(bool async, CancellationToken cancellationToken = default) { if (FieldCount == 0) // No resultset return null; var table = new DataTable("SchemaTable"); // Note: column order is important to match SqlClient's, some ADO.NET users appear // to assume ordering (see #1671) table.Columns.Add("ColumnName", typeof(string)); table.Columns.Add("ColumnOrdinal", typeof(int)); table.Columns.Add("ColumnSize", typeof(int)); table.Columns.Add("NumericPrecision", typeof(int)); table.Columns.Add("NumericScale", typeof(int)); table.Columns.Add("IsUnique", typeof(bool)); table.Columns.Add("IsKey", typeof(bool)); table.Columns.Add("BaseServerName", typeof(string)); table.Columns.Add("BaseCatalogName", typeof(string)); table.Columns.Add("BaseColumnName", typeof(string)); table.Columns.Add("BaseSchemaName", typeof(string)); table.Columns.Add("BaseTableName", typeof(string)); table.Columns.Add("DataType", typeof(Type)); table.Columns.Add("AllowDBNull", typeof(bool)); table.Columns.Add("ProviderType", typeof(int)); table.Columns.Add("IsAliased", typeof(bool)); table.Columns.Add("IsExpression", typeof(bool)); table.Columns.Add("IsIdentity", typeof(bool)); table.Columns.Add("IsAutoIncrement", typeof(bool)); table.Columns.Add("IsRowVersion", typeof(bool)); table.Columns.Add("IsHidden", typeof(bool)); table.Columns.Add("IsLong", typeof(bool)); table.Columns.Add("IsReadOnly", typeof(bool)); table.Columns.Add("ProviderSpecificDataType", typeof(Type)); table.Columns.Add("DataTypeName", typeof(string)); foreach (var column in await GetColumnSchema(async, cancellationToken).ConfigureAwait(false)) { var row = table.NewRow(); row["ColumnName"] = column.ColumnName; row["ColumnOrdinal"] = column.ColumnOrdinal ?? -1; row["ColumnSize"] = column.ColumnSize ?? -1; row["NumericPrecision"] = column.NumericPrecision ?? 0; row["NumericScale"] = column.NumericScale ?? 0; row["IsUnique"] = column.IsUnique == true; row["IsKey"] = column.IsKey == true; row["BaseServerName"] = ""; row["BaseCatalogName"] = column.BaseCatalogName; row["BaseColumnName"] = column.BaseColumnName; row["BaseSchemaName"] = column.BaseSchemaName; row["BaseTableName"] = column.BaseTableName; row["DataType"] = column.DataType; row["AllowDBNull"] = (object?)column.AllowDBNull ?? DBNull.Value; row["ProviderType"] = column.NpgsqlDbType ?? NpgsqlDbType.Unknown; row["IsAliased"] = column.IsAliased == true; row["IsExpression"] = column.IsExpression == true; row["IsIdentity"] = column.IsIdentity == true; row["IsAutoIncrement"] = column.IsAutoIncrement == true; row["IsRowVersion"] = false; row["IsHidden"] = column.IsHidden == true; row["IsLong"] = column.IsLong == true; row["IsReadOnly"] = column.IsReadOnly == true; row["DataTypeName"] = column.DataTypeName; table.Rows.Add(row); } return table; } #endregion Schema metadata table #region Seeking [MethodImpl(MethodImplOptions.AggressiveInlining)] int SeekToColumn(int ordinal, DataFormat dataFormat, bool resumableOp = false) { Debug.Assert(_isRowBuffered || _isSequential); var reader = PgReader; var column = _column; // Column rereading rules for sequential mode: // * We never allow rereading if the column didn't get initialized as resumable the previous time // * If it did get initialized as resumable we only allow rereading when either of the following is true: // - The op is a resumable one again // - The op isn't resumable but the field is still entirely unconsumed if (_isSequential && (column > ordinal || (column == ordinal && (!reader.Resumable || (!resumableOp && !reader.FieldAtStart))))) ThrowInvalidSequentialSeek(column, ordinal); if (column == ordinal) return reader.Restart(resumableOp); reader.Commit(); var columnLength = BufferSeekToColumn(column, ordinal, !_isRowBuffered); reader.Init(columnLength, dataFormat, resumableOp); return columnLength; static void ThrowInvalidSequentialSeek(int column, int ordinal) => ThrowHelper.ThrowInvalidOperationException( $"Invalid attempt to read from column ordinal '{ordinal}'. With CommandBehavior.SequentialAccess, " + $"you may only read from column ordinal '{column}' or greater."); } ValueTask SeekToColumnAsync(int ordinal, DataFormat dataFormat, bool resumableOp = false) { // When the row is buffered or we're rereading previous data no IO will be done. if (_isRowBuffered || _column >= ordinal) return new(SeekToColumn(ordinal, dataFormat, resumableOp)); return Core(ordinal, dataFormat, resumableOp); [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] async ValueTask Core(int ordinal, DataFormat dataFormat, bool resumableOp) { Debug.Assert(!_isRowBuffered && _column < ordinal); var reader = PgReader; await reader.CommitAsync().ConfigureAwait(false); var columnLength = await BufferSeekToColumnAsync(_column, ordinal, !_isRowBuffered).ConfigureAwait(false); reader.Init(columnLength, dataFormat, resumableOp); return columnLength; } } [MethodImpl(MethodImplOptions.AggressiveInlining)] int BufferSeekToColumn(int column, int ordinal, bool allowIO) { Debug.Assert(column < ordinal || !allowIO); if (column >= ordinal) { _column = ordinal; return SeekBackwards(ordinal); } // We know we need at least one iteration, a do while also helps with optimal codegen. var buffer = Buffer; var columnLength = 0; do { if (columnLength > 0) buffer.Skip(columnLength, allowIO); if (allowIO) buffer.Ensure(sizeof(int)); columnLength = buffer.ReadInt32(); Debug.Assert(columnLength >= -1); } while (++_column < ordinal); return columnLength; // On the first call to SeekBackwards we'll fill up the columns list as we may need seek positions more than once. [MethodImpl(MethodImplOptions.NoInlining)] int SeekBackwards(int ordinal) { var buffer = Buffer; var columns = _columns; (buffer.ReadPosition, var columnLength) = columns.Count is 0 ? (_columnsStartPos, 0) : columns[Math.Min(columns.Count -1, ordinal)]; while (columns.Count <= ordinal) { if (columnLength > 0) buffer.Skip(columnLength); columnLength = buffer.ReadInt32(); columns.Add((buffer.ReadPosition, columnLength)); } return columnLength; } } ValueTask BufferSeekToColumnAsync(int column, int ordinal, bool allowIO) { return !allowIO || column >= ordinal ? new(BufferSeekToColumn(column, ordinal, allowIO)) : Core(ordinal); [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] async ValueTask Core(int ordinal) { // We know we need at least one iteration, a do while also helps with optimal codegen. var buffer = Buffer; var columnLength = 0; do { if (columnLength > 0) await buffer.Skip(async: true, columnLength).ConfigureAwait(false); await buffer.EnsureAsync(sizeof(int)).ConfigureAwait(false); columnLength = buffer.ReadInt32(); Debug.Assert(columnLength >= -1); } while (++_column < ordinal); return columnLength; } } #endregion #region ConsumeRow Task ConsumeRow(bool async) { Debug.Assert(State is ReaderState.InResult or ReaderState.BeforeResult); if (!_isRowBuffered) return ConsumeRowSequential(async); // We get here, if we're in a non-sequential mode (or the row is already in the buffer) ConsumeBufferedRow(); return Task.CompletedTask; async Task ConsumeRowSequential(bool async) { if (async) await PgReader.CommitAsync().ConfigureAwait(false); else PgReader.Commit(); // Skip over the remaining columns in the row var buffer = Buffer; // Written as a while to be able to increment _column directly after reading into it. while (_column < ColumnCount - 1) { await buffer.Ensure(4, async).ConfigureAwait(false); var columnLength = buffer.ReadInt32(); _column++; Debug.Assert(columnLength >= -1); if (columnLength > 0) await buffer.Skip(async, columnLength).ConfigureAwait(false); } } } [MethodImpl(MethodImplOptions.AggressiveInlining)] void ConsumeBufferedRow() { Debug.Assert(State is ReaderState.InResult or ReaderState.BeforeResult); PgReader.Commit(); Buffer.ReadPosition = _dataMsgEnd; } #endregion #region Checks [MethodImpl(MethodImplOptions.NoInlining)] T DbNullValueOrThrow(int ordinal) { // When T is a Nullable (and only in that case), we support returning null if (default(T) is null && typeof(T).IsValueType) return default!; if (typeof(T) == typeof(object)) return (T)(object)DBNull.Value; ThrowHelper.ThrowInvalidCastException_NoValue(RowDescription![ordinal]); return default; } [MethodImpl(MethodImplOptions.AggressiveInlining)] DataFormat GetInfo(int ordinal, Type type, out PgConverter converter, out Size bufferRequirement, out bool asObject) { if ((uint)ordinal > (uint)ColumnCount) ThrowHelper.ThrowIndexOutOfRangeException("Ordinal must be between 0 and " + (ColumnCount - 1)); ref var info = ref ColumnInfoCache![ordinal]; Debug.Assert(info.ConverterInfo.IsDefault || ReferenceEquals(Connector.SerializerOptions, info.ConverterInfo.TypeInfo.Options), "Cache is bleeding over"); if (info.ConverterInfo.TypeToConvert == type) { converter = info.ConverterInfo.Converter; bufferRequirement = info.ConverterInfo.BufferRequirement; asObject = info.AsObject; return info.DataFormat; } return Slow(ref info, out converter, out bufferRequirement, out asObject); [MethodImpl(MethodImplOptions.NoInlining)] DataFormat Slow(ref ColumnInfo info, out PgConverter converter, out Size bufferRequirement, out bool asObject) { var field = RowDescription![ordinal]; field.GetInfo(type, ref info); converter = info.ConverterInfo.Converter; bufferRequirement = info.ConverterInfo.BufferRequirement; asObject = info.AsObject; return field.DataFormat; } } [MethodImpl(MethodImplOptions.AggressiveInlining)] DataFormat GetDefaultInfo(int ordinal, out PgConverter converter, out Size bufferRequirement) { var field = RowDescription![ordinal]; converter = field.ObjectInfo.Converter; bufferRequirement = field.ObjectInfo.BufferRequirement; return field.DataFormat; } /// /// Checks that we have a RowDescription, but not necessary an actual resultset /// (for operations which work in SchemaOnly mode). /// FieldDescription GetField(int ordinal) { ThrowIfClosedOrDisposed(); if (RowDescription is { } columns) return columns[ordinal]; ThrowHelper.ThrowInvalidOperationException("No resultset is currently being traversed"); return default!; } void ThrowIfClosedOrDisposed() { if (State is (ReaderState.Closed or ReaderState.Disposed) and var state) ThrowInvalidState(state); } [MemberNotNull(nameof(RowDescription))] void ThrowIfNotInResult() { if (State is not ReaderState.InResult and var state) ThrowInvalidState(state); Debug.Assert(RowDescription is not null); } [MethodImpl(MethodImplOptions.NoInlining)] static void ThrowInvalidState(ReaderState state) { switch (state) { case ReaderState.Closed: ThrowHelper.ThrowInvalidOperationException("The reader is closed"); break; case ReaderState.Disposed: ThrowHelper.ThrowObjectDisposedException(nameof(NpgsqlDataReader)); break; default: ThrowHelper.ThrowInvalidOperationException("No resultset is currently being traversed"); break; } } #endregion #region Misc /// /// Unbinds reader from the connector. /// Should be called before the connector is returned to the pool. /// internal void UnbindIfNecessary() { // We're closing the connection, but reader is not yet disposed // We have to unbind the reader from the connector, otherwise there could be a concurrency issues // See #3126 and #3290 if (State != ReaderState.Disposed) { Connector.DataReader = Connector.UnboundDataReader is { State: ReaderState.Disposed } previousReader ? previousReader : new NpgsqlDataReader(Connector); Connector.UnboundDataReader = this; } } #endregion } enum ReaderState { BeforeResult, InResult, BetweenResults, Consumed, Closed, Disposed, }