using System;
using System.Buffers;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Npgsql.BackendMessages;
using Npgsql.Internal;
using Npgsql.Internal.Converters;
using Npgsql.PostgresTypes;
using Npgsql.Schema;
using NpgsqlTypes;
using static Npgsql.Util.Statics;
namespace Npgsql;
/// <summary>
/// Reads a forward-only stream of rows from a data source.
/// </summary>
#pragma warning disable CA1010
public sealed class NpgsqlDataReader : DbDataReader, IDbColumnSchemaGenerator
#pragma warning restore CA1010
{
// Cached completed tasks so the hot synchronous read paths never allocate a Task per row.
static readonly Task TrueTask = Task.FromResult(true);
static readonly Task FalseTask = Task.FromResult(false);
// The command with which this reader was executed; reassigned on every Init() since readers are pooled per connector.
internal NpgsqlCommand Command { get; private set; } = default!;
// The physical connection this reader reads from. Fixed for the reader's lifetime.
internal NpgsqlConnector Connector { get; }
NpgsqlConnection? _connection;
/// <summary>
/// The behavior of the command with which this reader was executed.
/// </summary>
CommandBehavior _behavior;
/// <summary>
/// The task for writing this command's messages. Awaited on reader cleanup.
/// </summary>
Task? _sendTask;
internal ReaderState State = ReaderState.Disposed;
internal NpgsqlReadBuffer Buffer = default!;
// Shortcut to the buffer's low-level value reader.
PgReader PgReader => Buffer.PgReader;
/// <summary>
/// Holds the list of statements being executed by this reader.
/// </summary>
List _statements = default!;
/// <summary>
/// The index of the current query resultset we're processing (within a multiquery)
/// </summary>
internal int StatementIndex { get; private set; }
/// <summary>
/// Records, for each column, its starting offset and length in the current row.
/// Used only in non-sequential mode.
/// </summary>
readonly List<(int Offset, int Length)> _columns = [];
// Buffer position at which the current row's column data starts (just after the column count).
int _columnsStartPos;
/// <summary>
/// The index of the column that we're on, i.e. that has already been parsed,
/// is in memory and can be retrieved. Initialized to -1, which means we're on the column
/// count (which comes before the first column).
/// </summary>
int _column;
/// <summary>
/// The position in the buffer at which the current data row message ends.
/// Used only when the row is consumed non-sequentially.
/// </summary>
int _dataMsgEnd;
/// <summary>
/// Determines, if we can consume the row non-sequentially.
/// Mostly useful for a sequential mode, when the row is already in the buffer.
/// Should always be true for the non-sequential mode.
/// </summary>
bool _isRowBuffered;
/// <summary>
/// Gets or sets whether the current row is fully buffered in memory.
/// When false, async reads will go through the real async converter path rather than the sync shortcut.
/// </summary>
/// <remarks>Settable for testing purposes.</remarks>
internal bool IsRowBuffered
{
get => _isRowBuffered;
set => _isRowBuffered = value;
}
/// <summary>
/// The RowDescription message for the current resultset being processed
/// </summary>
internal RowDescriptionMessage? RowDescription;
// Number of columns in the current resultset; only valid while RowDescription is non-null.
int ColumnCount => RowDescription!.Count;
/// <summary>
/// Stores the last converter info resolved by column, to speed up repeated reading.
/// </summary>
ColumnInfo[]? ColumnInfoCache { get; set; }
// Aggregated records-affected tally across CommandComplete messages; null until a
// statement type that reports affected rows has completed (see RecordsAffected).
ulong? _recordsAffected;
/// <summary>
/// Whether the current result set has rows
/// </summary>
bool _hasRows;
/// <summary>
/// Is raised whenever Close() is called.
/// </summary>
public event EventHandler? ReaderClosed;
// Cached flags derived from _behavior in Init(), to avoid repeated HasFlag calls on hot paths.
bool _isSchemaOnly;
bool _isSequential;
// A nested reader instance kept for reuse to avoid allocations when reading composite/record fields.
internal NpgsqlNestedDataReader? CachedFreeNestedDataReader;
// Timestamp captured at execution start, used for tracing/metrics on command stop.
long _startTimestamp;
readonly ILogger _commandLogger;
/// <summary>
/// Creates a reader bound to the given connector. Readers are pooled per connector and
/// re-initialized via <see cref="Init"/> for each command execution.
/// </summary>
internal NpgsqlDataReader(NpgsqlConnector connector)
{
    Connector = connector;
    _commandLogger = connector.CommandLogger;
}
/// <summary>
/// (Re)initializes this pooled reader for a new command execution, resetting all per-execution state.
/// </summary>
internal void Init(
    NpgsqlCommand command,
    CommandBehavior behavior,
    List statements,
    long startTimestamp = 0,
    Task? sendTask = null)
{
    // The cache must have been returned to the pool by the previous execution's cleanup.
    Debug.Assert(ColumnInfoCache is null);
    Command = command;
    _connection = command.InternalConnection;
    _behavior = behavior;
    // Cache the flags we test on hot paths.
    _isSchemaOnly = _behavior.HasFlag(CommandBehavior.SchemaOnly);
    _isSequential = _behavior.HasFlag(CommandBehavior.SequentialAccess);
    _statements = statements;
    // -1 means "before the first statement"; NextResult advances to 0.
    StatementIndex = -1;
    _sendTask = sendTask;
    State = ReaderState.BetweenResults;
    _recordsAffected = null;
    _startTimestamp = startTimestamp;
}
#region Read
/// <summary>
/// Advances the reader to the next record in a result set.
/// </summary>
/// <returns>true if there are more rows; otherwise false.</returns>
/// <remarks>
/// The default position of a data reader is before the first record. Therefore, you must call Read to begin accessing data.
/// </remarks>
public override bool Read()
{
    ThrowIfClosedOrDisposed();
    // Fast path: TryRead returns a completed task when the next message is already buffered.
    // Otherwise fall back to the full path (which may perform I/O), executed synchronously.
    return TryRead()?.Result ?? Read(false).GetAwaiter().GetResult();
}
/// <summary>
/// This is the asynchronous version of <see cref="Read()"/>.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task representing the asynchronous operation.</returns>
public override Task ReadAsync(CancellationToken cancellationToken)
{
    ThrowIfClosedOrDisposed();
    // Fast path first (already-buffered message); only go truly async when needed.
    return TryRead() ?? Read(async: true, cancellationToken);
}
// This is an optimized execution path that avoids calling any async methods for the (usual)
// case where the next row (or CommandComplete) is already in memory.
// Returns a cached completed Task (TrueTask/FalseTask) on success, or null to signal that the
// caller must take the full (potentially I/O-performing) Read path.
Task? TryRead()
{
    switch (State)
    {
        case ReaderState.BeforeResult:
            // First Read() after NextResult. Data row has already been processed.
            State = ReaderState.InResult;
            return TrueTask;
        case ReaderState.InResult:
            break;
        default:
            return FalseTask;
    }
    // We have a special case path for SingleRow.
    if (_behavior.HasFlag(CommandBehavior.SingleRow) || !_isRowBuffered)
        return null;
    ConsumeBufferedRow();
    // Peek at the next message header (1-byte code + 4-byte length) without committing.
    const int headerSize = sizeof(byte) + sizeof(int);
    var buffer = Buffer;
    var readPosition = buffer.ReadPosition;
    var bytesLeft = buffer.FilledBytes - readPosition;
    if (bytesLeft < headerSize)
        return null;
    var messageCode = (BackendMessageCode)buffer.ReadByte();
    var len = buffer.ReadInt32() - sizeof(int); // Transmitted length includes itself
    var isDataRow = messageCode is BackendMessageCode.DataRow;
    // sizeof(short) is for the number of columns
    var sufficientBytes = isDataRow && _isSequential ? headerSize + sizeof(short) : headerSize + len;
    if (bytesLeft < sufficientBytes
        || !isDataRow && (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
        // Could be an error, let main read handle it.
        || Connector.ParseResultSetMessage(buffer, messageCode, len) is not { } msg)
    {
        // Roll back the header peek so the slow path re-reads it.
        buffer.ReadPosition = readPosition;
        return null;
    }
    ProcessMessage(msg);
    return isDataRow ? TrueTask : FalseTask;
}
// The full Read implementation: consumes the current row (if any) and reads the next backend
// message, which is either another DataRow (true) or the end of the resultset (false).
// Runs synchronously when async is false, despite the Task return type.
async Task Read(bool async, CancellationToken cancellationToken = default)
{
    using var registration = Connector.StartNestedCancellableOperation(cancellationToken);
    try
    {
        switch (State)
        {
            case ReaderState.BeforeResult:
                // First Read() after NextResult. Data row has already been processed.
                State = ReaderState.InResult;
                return true;
            case ReaderState.InResult:
                await ConsumeRow(async).ConfigureAwait(false);
                if (_behavior.HasFlag(CommandBehavior.SingleRow))
                {
                    // TODO: See optimization proposal in #410
                    await Consume(async).ConfigureAwait(false);
                    return false;
                }
                break;
            case ReaderState.BetweenResults:
            case ReaderState.Consumed:
            case ReaderState.Closed:
            case ReaderState.Disposed:
                return false;
            default:
                ThrowHelper.ThrowArgumentOutOfRangeException();
                return false;
        }
        var msg = await ReadMessage(async).ConfigureAwait(false);
        switch (msg.Code)
        {
            case BackendMessageCode.DataRow:
                ProcessMessage(msg);
                return true;
            case BackendMessageCode.CommandComplete:
            case BackendMessageCode.EmptyQueryResponse:
                ProcessMessage(msg);
                // With error barriers, each statement is followed by a ReadyForQuery message.
                if (_statements[StatementIndex].AppendErrorBarrier ?? Command.EnableErrorBarriers)
                    Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                return false;
            default:
                throw Connector.UnexpectedMessageReceived(msg.Code);
        }
    }
    catch
    {
        // Break may have progressed the reader already.
        if (State is not ReaderState.Closed)
            State = ReaderState.Consumed;
        throw;
    }
}
/// <summary>
/// Reads the next backend message, using sequential row loading when the reader was executed
/// with <see cref="CommandBehavior.SequentialAccess"/> and full-row buffering otherwise.
/// </summary>
ValueTask ReadMessage(bool async)
{
    return _isSequential ? ReadMessageSequential(Connector, async) : Connector.ReadMessage(async);

    static async ValueTask ReadMessageSequential(NpgsqlConnector connector, bool async)
    {
        var msg = await connector.ReadMessage(async, DataRowLoadingMode.Sequential).ConfigureAwait(false);
        // Make sure that the datarow's column count (2 bytes) is already buffered
        if (msg.Code == BackendMessageCode.DataRow)
            await connector.ReadBuffer.Ensure(2, async).ConfigureAwait(false);
        return msg;
    }
}
#endregion
#region NextResult
/// <summary>
/// Advances the reader to the next result when reading the results of a batch of statements.
/// </summary>
/// <returns>true if there are more result sets; otherwise false.</returns>
public override bool NextResult()
{
    ThrowIfClosedOrDisposed();
    // Synchronous execution of the shared async implementation (runs sync when async: false).
    return (_isSchemaOnly ? NextResultSchemaOnly(false) : NextResult(false))
        .GetAwaiter().GetResult();
}
/// <summary>
/// This is the asynchronous version of NextResult.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task representing the asynchronous operation.</returns>
public override Task NextResultAsync(CancellationToken cancellationToken)
{
    ThrowIfClosedOrDisposed();
    return _isSchemaOnly
        ? NextResultSchemaOnly(async: true, cancellationToken: cancellationToken)
        : NextResult(async: true, cancellationToken: cancellationToken);
}
/// <summary>
/// Internal implementation of NextResult: consumes the current resultset (if any), then reads
/// backend messages until the next resultset begins, all statements are exhausted, or an error
/// occurs. When isConsuming is true, this is being invoked recursively from Consume.
/// </summary>
async Task NextResult(bool async, bool isConsuming = false, CancellationToken cancellationToken = default)
{
    Debug.Assert(!_isSchemaOnly);
    if (State is ReaderState.Consumed)
        return false;
    try
    {
        // When consuming, the outer operation already owns cancellation handling.
        using var registration = isConsuming ? default : Connector.StartNestedCancellableOperation(cancellationToken);
        // If we're in the middle of a resultset, consume it
        if (State is ReaderState.BeforeResult or ReaderState.InResult)
            await ConsumeResultSet(async).ConfigureAwait(false);
        Debug.Assert(State is ReaderState.BetweenResults);
        _hasRows = false;
        var statements = _statements;
        var statementIndex = StatementIndex;
        if (statementIndex >= 0)
        {
            // Persist the resolved converter info on the prepared statement's row description,
            // so subsequent executions can skip converter resolution.
            if (RowDescription is { } description && statements[statementIndex].IsPrepared && ColumnInfoCache is { } cache)
                description.SetColumnInfoCache(new(cache, 0, ColumnCount));
            if (statementIndex is 0 && _behavior.HasFlag(CommandBehavior.SingleResult) && !isConsuming)
            {
                await Consume(async).ConfigureAwait(false);
                return false;
            }
        }
        // We are now at the end of the previous result set. Read up to the next result set, if any.
        // Non-prepared statements receive ParseComplete, BindComplete, DescriptionRow/NoData,
        // prepared statements receive only BindComplete
        for (statementIndex = ++StatementIndex; statementIndex < statements.Count; statementIndex = ++StatementIndex)
        {
            var statement = statements[statementIndex];
            IBackendMessage msg;
            if (statement.TryGetPrepared(out var preparedStatement))
            {
                Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                RowDescription = preparedStatement.Description;
            }
            else // Non-prepared/preparing flow
            {
                preparedStatement = statement.PreparedStatement;
                if (preparedStatement != null)
                {
                    Debug.Assert(!preparedStatement.IsPrepared);
                    if (preparedStatement.StatementBeingReplaced != null)
                    {
                        // The close of the statement being replaced completes before the new one's messages.
                        Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                        preparedStatement.StatementBeingReplaced.CompleteUnprepare();
                        preparedStatement.StatementBeingReplaced = null;
                    }
                }
                Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                if (statement.IsPreparing)
                {
                    // The statement has now been successfully prepared server-side; update bookkeeping.
                    preparedStatement!.State = PreparedState.Prepared;
                    Connector.PreparedStatementManager.NumPrepared++;
                    statement.IsPreparing = false;
                }
                Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                msg = await Connector.ReadMessage(async).ConfigureAwait(false);
                RowDescription = statement.Description = msg.Code switch
                {
                    BackendMessageCode.NoData => null,
                    // RowDescription messages are cached on the connector, but if we're auto-preparing, we need to
                    // clone our own copy which will last beyond the lifetime of this invocation.
                    BackendMessageCode.RowDescription => preparedStatement == null
                        ? (RowDescriptionMessage)msg
                        : ((RowDescriptionMessage)msg).Clone(),
                    _ => throw Connector.UnexpectedMessageReceived(msg.Code)
                };
            }
            if (RowDescription is not null)
            {
                // Reuse the pooled column info cache when it's big enough; otherwise return it and re-rent.
                if (ColumnInfoCache?.Length >= ColumnCount)
                    Array.Clear(ColumnInfoCache, 0, ColumnCount);
                else
                {
                    if (ColumnInfoCache is { } cache)
                        ArrayPool.Shared.Return(cache, clearArray: true);
                    ColumnInfoCache = ArrayPool.Shared.Rent(ColumnCount);
                }
                if (statement.IsPrepared)
                    RowDescription.LoadColumnInfoCache(Connector.SerializerOptions, ColumnInfoCache);
            }
            else
            {
                // Statement did not generate a resultset (e.g. INSERT)
                // Read and process its completion message and move on to the next statement
                // No need to read sequentially as it's not a DataRow
                msg = await Connector.ReadMessage(async).ConfigureAwait(false);
                switch (msg.Code)
                {
                    case BackendMessageCode.CommandComplete:
                    case BackendMessageCode.EmptyQueryResponse:
                        break;
                    case BackendMessageCode.CopyInResponse:
                        throw Connector.Break(new NotSupportedException(
                            "COPY isn't supported in regular command execution - see https://www.npgsql.org/doc/copy.html for documentation on COPY with Npgsql. " +
                            "If you are trying to execute a SQL script created by pg_dump, pass the '--inserts' switch to disable generating COPY statements."));
                    case BackendMessageCode.CopyOutResponse:
                        throw Connector.Break(new NotSupportedException(
                            "COPY isn't supported in regular command execution - see https://www.npgsql.org/doc/copy.html for documentation on COPY with Npgsql."));
                    default:
                        throw Connector.UnexpectedMessageReceived(msg.Code);
                }
                ProcessMessage(msg);
                if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers)
                    Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                continue;
            }
            if ((Command.WrappingBatch is not null || StatementIndex is 0) && Command.InternalBatchCommands[StatementIndex] is { HasOutputParameters: true } command)
            {
                // If output parameters are present and this is the first row of the resultset,
                // we must always read it in non-sequential mode because it will be traversed twice (once
                // here for the parameters, then as a regular row).
                msg = await Connector.ReadMessage(async, dataRowLoadingMode: DataRowLoadingMode.NonSequential).ConfigureAwait(false);
                ProcessMessage(msg);
                if (msg.Code == BackendMessageCode.DataRow)
                {
                    Debug.Assert(RowDescription != null);
                    Debug.Assert(State == ReaderState.BeforeResult);
                    try
                    {
                        // Temporarily set our state to InResult and non-sequential to allow us to read the values, and in any order.
                        var isSequential = _isSequential;
                        var currentPosition = Buffer.ReadPosition;
                        State = ReaderState.InResult;
                        _isSequential = false;
                        try
                        {
                            command.PopulateOutputParameters(this, _commandLogger);
                            // On success we want to revert any row and column state for the user to be able to read the same row again.
                            if (async)
                                await PgReader.CommitAsync().ConfigureAwait(false);
                            else
                                PgReader.Commit();
                            State = ReaderState.BeforeResult; // Set the state back
                            Buffer.ReadPosition = currentPosition; // Restore position
                            _column = -1;
                        }
                        finally
                        {
                            // To be on the safe side we always revert this CommandBehavior state change, including on failure.
                            _isSequential = isSequential;
                        }
                    }
                    catch (Exception e)
                    {
                        // TODO: ideally we should flow down to global exception filter and consume there
                        await Consume(async, firstException: e).ConfigureAwait(false);
                        throw;
                    }
                }
            }
            else
            {
                msg = await ReadMessage(async).ConfigureAwait(false);
                ProcessMessage(msg);
            }
            switch (msg.Code)
            {
                case BackendMessageCode.DataRow:
                    Connector.State = ConnectorState.Fetching;
                    return true;
                case BackendMessageCode.CommandComplete:
                    // The resultset exists but is empty; still report it via true.
                    if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers)
                        Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                    return true;
                default:
                    Connector.UnexpectedMessageReceived(msg.Code);
                    break;
            }
        }
        // There are no more queries, we're done. Read the RFQ.
        if (_statements.Count is 0 || !(_statements[^1].AppendErrorBarrier ?? Command.EnableErrorBarriers))
            Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
        State = ReaderState.Consumed;
        RowDescription = null;
        return false;
    }
    catch (Exception e)
    {
        if (e is PostgresException postgresException && StatementIndex >= 0 && StatementIndex < _statements.Count)
        {
            var statement = _statements[StatementIndex];
            // Reference the triggering statement from the exception
            if (Connector.Settings.IncludeFailedBatchedCommand)
                postgresException.BatchCommand = statement;
            // Prevent the command or batch from being recycled (by the connection) when it's disposed. This is important since
            // the exception is very likely to escape the using statement of the command, and by that time some other user may
            // already be using the recycled instance.
            // TODO: we probably should do than even if it's not PostgresException (error from PopulateOutputParameters)
            Command.IsCacheable = false;
            // If the schema of a table changes after a statement is prepared on that table, PostgreSQL errors with
            // 0A000: cached plan must not change result type. 0A000 seems like a non-specific code, but it's very unlikely the
            // statement would successfully execute anyway, so invalidate the prepared statement.
            if (postgresException.SqlState == PostgresErrorCodes.FeatureNotSupported &&
                statement.PreparedStatement is { } preparedStatement)
            {
                preparedStatement.State = PreparedState.Invalidated;
                Command.ResetPreparation();
            }
        }
        // For the statement that errored, if it was being prepared we need to update our bookkeeping to put them back in unprepared
        // state.
        for (; StatementIndex < _statements.Count; StatementIndex++)
        {
            var statement = _statements[StatementIndex];
            if (statement.IsPreparing)
            {
                statement.IsPreparing = false;
                statement.PreparedStatement!.AbortPrepare();
            }
            // In normal, non-isolated batching, we've consumed the result set and are done.
            // However, if the command has error barrier, we now have to consume results from the commands after it (unless it's the
            // last one).
            // Note that Consume calls NextResult (this method) recursively, the isConsuming flag tells us we're in this mode.
            // TODO: We might as well call Consume on every command (even the last one) to make sure we do read every single message until RFQ
            // in case we get an exception in the middle of NextResult
            if ((statement.AppendErrorBarrier ?? Command.EnableErrorBarriers) && StatementIndex < _statements.Count - 1)
            {
                if (isConsuming)
                    throw;
                switch (State)
                {
                    case ReaderState.Consumed:
                    case ReaderState.Closed:
                    case ReaderState.Disposed:
                        // The exception may have caused the connector to break (e.g. I/O), and so the reader is already closed.
                        break;
                    default:
                        // We provide Consume with the first exception which we've just caught.
                        // If it encounters other exceptions while consuming the rest of the result set, it will raise an AggregateException,
                        // otherwise it will rethrow this first exception.
                        await Consume(async, firstException: e).ConfigureAwait(false);
                        break; // Never reached, Consume always throws above
                }
            }
        }
        // Break may have progressed the reader already.
        if (State is not ReaderState.Closed)
            State = ReaderState.Consumed;
        throw;
    }

    // Skips over the remainder of the current resultset up to and including its completion
    // message (and the error-barrier RFQ, when applicable), tallying records affected.
    async ValueTask ConsumeResultSet(bool async)
    {
        await ConsumeRow(async).ConfigureAwait(false);
        while (true)
        {
            var completedMsg = await Connector.ReadMessage(async, DataRowLoadingMode.Skip).ConfigureAwait(false);
            switch (completedMsg.Code)
            {
                case BackendMessageCode.CommandComplete:
                case BackendMessageCode.EmptyQueryResponse:
                    ProcessMessage(completedMsg);
                    var statement = _statements[StatementIndex];
                    if (statement.IsPrepared && ColumnInfoCache is not null)
                        RowDescription!.SetColumnInfoCache(new(ColumnInfoCache, 0, ColumnCount));
                    if (statement.AppendErrorBarrier ?? Command.EnableErrorBarriers)
                        Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                    break;
                default:
                    // TODO if we hit an ErrorResponse here (PG doesn't do this *today*) we should probably throw.
                    continue;
            }
            break;
        }
    }
}
/// <summary>
/// Note that in SchemaOnly mode there are no resultsets, and we read nothing from the backend (all
/// RowDescriptions have already been processed and are available)
/// </summary>
async Task NextResultSchemaOnly(bool async, bool isConsuming = false, CancellationToken cancellationToken = default)
{
    Debug.Assert(_isSchemaOnly);
    if (State is ReaderState.Consumed)
        return false;
    // When consuming, the outer operation already owns cancellation handling.
    using var registration = isConsuming ? default : Connector.StartNestedCancellableOperation(cancellationToken);
    try
    {
        for (StatementIndex++; StatementIndex < _statements.Count; StatementIndex++)
        {
            var statement = _statements[StatementIndex];
            if (statement.TryGetPrepared(out var preparedStatement))
            {
                // Row descriptions have already been populated in the statement objects at the
                // Prepare phase
                RowDescription = preparedStatement.Description;
            }
            else
            {
                var pStatement = statement.PreparedStatement;
                if (pStatement != null)
                {
                    Debug.Assert(!pStatement.IsPrepared);
                    if (pStatement.StatementBeingReplaced != null)
                    {
                        // The close of the statement being replaced completes before the new one's messages.
                        Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                        pStatement.StatementBeingReplaced.CompleteUnprepare();
                        pStatement.StatementBeingReplaced = null;
                    }
                }
                Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                if (statement.IsPreparing)
                {
                    // The statement has now been successfully prepared server-side; update bookkeeping.
                    pStatement!.State = PreparedState.Prepared;
                    Connector.PreparedStatementManager.NumPrepared++;
                    statement.IsPreparing = false;
                }
                Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
                var msg = await Connector.ReadMessage(async).ConfigureAwait(false);
                switch (msg.Code)
                {
                    case BackendMessageCode.NoData:
                        RowDescription = _statements[StatementIndex].Description = null;
                        break;
                    case BackendMessageCode.RowDescription:
                        // We have a resultset
                        // RowDescription messages are cached on the connector, but if we're auto-preparing, we need to
                        // clone our own copy which will last beyond the lifetime of this invocation.
                        RowDescription = _statements[StatementIndex].Description = preparedStatement == null
                            ? (RowDescriptionMessage)msg
                            : ((RowDescriptionMessage)msg).Clone();
                        Command.FixupRowDescription(RowDescription, StatementIndex == 0);
                        break;
                    default:
                        throw Connector.UnexpectedMessageReceived(msg.Code);
                }
                // Check whether every remaining statement is already prepared; if so, no further
                // messages will arrive for them and the RFQ follows immediately.
                var forall = true;
                for (var i = StatementIndex + 1; i < _statements.Count; i++)
                    if (!_statements[i].IsPrepared)
                    {
                        forall = false;
                        break;
                    }
                // There are no more queries, we're done. Read to the RFQ.
                if (forall)
                    Expect(await Connector.ReadMessage(async).ConfigureAwait(false), Connector);
            }
            // Found a resultset
            if (RowDescription is not null)
                return true;
        }
        State = ReaderState.Consumed;
        RowDescription = null;
        return false;
    }
    catch (Exception e)
    {
        // Break may have progressed the reader already.
        if (State is not ReaderState.Closed)
            State = ReaderState.Consumed;
        // Reference the triggering statement from the exception
        if (e is PostgresException postgresException && StatementIndex >= 0 && StatementIndex < _statements.Count)
        {
            // Reference the triggering statement from the exception
            if (Connector.Settings.IncludeFailedBatchedCommand)
                postgresException.BatchCommand = _statements[StatementIndex];
            // Prevent the command or batch from being recycled (by the connection) when it's disposed. This is important since
            // the exception is very likely to escape the using statement of the command, and by that time some other user may
            // already be using the recycled instance.
            Command.IsCacheable = false;
        }
        // An error means all subsequent statements were skipped by PostgreSQL.
        // If any of them were being prepared, we need to update our bookkeeping to put
        // them back in unprepared state.
        for (; StatementIndex < _statements.Count; StatementIndex++)
        {
            var statement = _statements[StatementIndex];
            if (statement.IsPreparing)
            {
                statement.IsPreparing = false;
                statement.PreparedStatement!.AbortPrepare();
            }
        }
        throw;
    }
}
#endregion
#region ProcessMessage
// Processes a backend message on behalf of the reader: for DataRow, (re)captures the buffer and
// resets per-row state; for CommandComplete/EmptyQueryResponse, tallies records affected and moves
// the reader to BetweenResults.
internal void ProcessMessage(IBackendMessage msg)
{
    if (msg.Code is not BackendMessageCode.DataRow)
    {
        HandleUncommon(msg);
        return;
    }
    var dataRow = (DataRowMessage)msg;
    // The connector's buffer can actually change between DataRows:
    // If a large DataRow exceeding the connector's current read buffer arrives, and we're
    // reading in non-sequential mode, a new oversize buffer is allocated. We thus have to
    // recapture the connector's buffer on each new DataRow.
    // Note that this can happen even in sequential mode, if the row description message is big
    // (see #2003)
    if (!ReferenceEquals(Buffer, Connector.ReadBuffer))
        Buffer = Connector.ReadBuffer;
    // We assume that the row's number of columns is identical to the description's
    var numColumns = Buffer.ReadInt16();
    if (ColumnCount != numColumns)
        ThrowHelper.ThrowArgumentException($"Row's number of columns ({numColumns}) differs from the row description's ({ColumnCount})");
    var readPosition = Buffer.ReadPosition;
    var msgRemainder = dataRow.Length - sizeof(short);
    _dataMsgEnd = readPosition + msgRemainder;
    _columnsStartPos = readPosition;
    // The row counts as buffered when all its remaining bytes are already in the read buffer.
    _isRowBuffered = msgRemainder <= Buffer.FilledBytes - readPosition;
    Debug.Assert(_isRowBuffered || _isSequential);
    // Reset the column cursor to "before the first column".
    _column = -1;
    if (_columns.Count > 0)
        _columns.Clear();
    switch (State)
    {
        case ReaderState.BetweenResults:
            // First row of a new resultset.
            _hasRows = true;
            State = ReaderState.BeforeResult;
            break;
        case ReaderState.BeforeResult:
            State = ReaderState.InResult;
            break;
        case ReaderState.InResult:
            break;
        default:
            Connector.UnexpectedMessageReceived(BackendMessageCode.DataRow);
            break;
    }

    // Non-DataRow handling kept out-of-line to keep the DataRow hot path small.
    [MethodImpl(MethodImplOptions.NoInlining)]
    void HandleUncommon(IBackendMessage msg)
    {
        switch (msg.Code)
        {
            case BackendMessageCode.CommandComplete:
                var completed = (CommandCompleteMessage)msg;
                switch (completed.StatementType)
                {
                    // Only these statement types report affected rows; SELECT leaves _recordsAffected null.
                    case StatementType.Update:
                    case StatementType.Insert:
                    case StatementType.Delete:
                    case StatementType.Copy:
                    case StatementType.Move:
                    case StatementType.Merge:
                        _recordsAffected ??= 0;
                        _recordsAffected += completed.Rows;
                        break;
                }
                _statements[StatementIndex].ApplyCommandComplete(completed);
                State = ReaderState.BetweenResults;
                break;
            case BackendMessageCode.EmptyQueryResponse:
                State = ReaderState.BetweenResults;
                break;
            default:
                Connector.UnexpectedMessageReceived(msg.Code);
                break;
        }
    }
}
#endregion
/// <summary>
/// Gets a value indicating the depth of nesting for the current row. Always returns zero.
/// </summary>
public override int Depth => 0;
/// <summary>
/// Gets a value indicating whether the data reader is closed.
/// </summary>
public override bool IsClosed => State is ReaderState.Closed or ReaderState.Disposed;
/// <summary>
/// Gets the number of rows changed, inserted, or deleted by execution of the SQL statement.
/// </summary>
/// <remarks>
/// The number of rows changed, inserted, or deleted. -1 for SELECT statements; 0 if no rows were affected or the statement failed.
/// </remarks>
public override int RecordsAffected
    => !_recordsAffected.HasValue
        ? -1
        : _recordsAffected > int.MaxValue
            ? throw new OverflowException(
                $"The number of records affected exceeds int.MaxValue. Use {nameof(Rows)}.")
            : (int)_recordsAffected;
/// <summary>
/// Gets the number of rows changed, inserted, or deleted by execution of the SQL statement.
/// </summary>
/// <remarks>
/// The number of rows changed, inserted, or deleted. 0 for SELECT statements, if no rows were affected or the statement failed.
/// </remarks>
public ulong Rows => _recordsAffected ?? 0;
/// <summary>
/// Returns details about each statement that this reader will or has executed.
/// </summary>
/// <remarks>
/// Note that some fields (i.e. rows and oid) are only populated as the reader
/// traverses the result.
///
/// For commands with multiple queries, this exposes the number of rows affected on
/// a statement-by-statement basis, unlike <see cref="RecordsAffected"/>
/// which exposes an aggregation across all statements.
/// </remarks>
[Obsolete("Use the new DbBatch API")]
public IReadOnlyList Statements
{
    get
    {
        ThrowIfClosedOrDisposed();
        return _statements.AsReadOnly();
    }
}
/// <summary>
/// Gets a value that indicates whether this DbDataReader contains one or more rows.
/// </summary>
public override bool HasRows
{
    get
    {
        ThrowIfClosedOrDisposed();
        return _hasRows;
    }
}
/// <summary>
/// Indicates whether the reader is currently positioned on a row, i.e. whether reading a
/// column is possible.
/// This property is different from <see cref="HasRows"/> in that <see cref="HasRows"/> will
/// return true even if attempting to read a column will fail, e.g. before
/// <see cref="Read()"/> has been called
/// </summary>
public bool IsOnRow
{
    get
    {
        ThrowIfClosedOrDisposed();
        return State is ReaderState.InResult;
    }
}
/// <summary>
/// Gets the name of the column, given the zero-based column ordinal.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The name of the specified column.</returns>
public override string GetName(int ordinal) => GetField(ordinal).Name;
/// <summary>
/// Gets the number of columns in the current row.
/// </summary>
public override int FieldCount
{
    get
    {
        ThrowIfClosedOrDisposed();
        return RowDescription?.Count ?? 0;
    }
}
#region Cleanup / Dispose
/// <summary>
/// Consumes all result sets for this reader, leaving the connector ready for sending and processing further
/// queries
/// </summary>
/// <param name="async">Whether to perform I/O asynchronously.</param>
/// <param name="firstException">An exception already caught by the caller; rethrown (possibly aggregated) after consuming.</param>
async Task Consume(bool async, Exception? firstException = null)
{
    var exceptions = firstException is null ? null : new List { firstException };
    // Skip over the other result sets. Note that this does tally records affected from CommandComplete messages, and properly sets
    // state for auto-prepared statements
    //
    // The only exception is when the connector is broken (which can happen in the middle of consuming)
    // As then there is no point in going forward.
    // An exception to the exception above is when connector is concurrently closed while
    // the reader is still going over the result set.
    // While this is undefined behavior and user error, we should try to at least do our best to not loop indefinitely.
    //
    // While we can also check our local state (State == Closed)
    // It's probably better to rely on connector since it's private and its state can't be changed
    while (Connector.IsConnected)
    {
        try
        {
            if (!(_isSchemaOnly
                ? await NextResultSchemaOnly(async, isConsuming: true).ConfigureAwait(false)
                : await NextResult(async, isConsuming: true).ConfigureAwait(false)))
            {
                break;
            }
        }
        catch (Exception e)
        {
            // Collect every failure; we rethrow after consuming completes.
            exceptions ??= [];
            exceptions.Add(e);
        }
    }
    Debug.Assert(exceptions?.Count != 0);
    switch (exceptions?.Count)
    {
        case null:
            return;
        case 1:
            // Rethrow the single exception with its original stack trace preserved.
            ExceptionDispatchInfo.Capture(exceptions[0]).Throw();
            return;
        default:
            throw new NpgsqlException(
                "Multiple exceptions occurred when consuming the result set",
                new AggregateException(exceptions));
    }
}
/// <summary>
/// Releases the resources used by the <see cref="NpgsqlDataReader"/>.
/// </summary>
protected override void Dispose(bool disposing)
{
    try
    {
        Close(connectionClosing: false, async: false, isDisposing: true).GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        // When Close failed with a PostgresException (or several of them, aggregated behind
        // error barriers), Close has already moved the state to Disposed; only force the
        // state here for any other kind of failure.
        var postgresOnly =
            ex is PostgresException ||
            ex is NpgsqlException { InnerException: AggregateException aggregate } &&
            AllPostgresExceptions(aggregate.InnerExceptions);
        if (!postgresOnly)
            State = ReaderState.Disposed;
        throw;
    }
    finally
    {
        Command.TraceCommandStop();
    }
}
/// <summary>
/// Asynchronously releases the resources used by the <see cref="NpgsqlDataReader"/>.
/// </summary>
public override async ValueTask DisposeAsync()
{
    try
    {
        await Close(connectionClosing: false, async: true, isDisposing: true).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // When Close failed with a PostgresException (or several of them, aggregated behind
        // error barriers), Close has already moved the state to Disposed; only force the
        // state here for any other kind of failure.
        var postgresOnly =
            ex is PostgresException ||
            ex is NpgsqlException { InnerException: AggregateException aggregate } &&
            AllPostgresExceptions(aggregate.InnerExceptions);
        if (!postgresOnly)
            State = ReaderState.Disposed;
        throw;
    }
    finally
    {
        Command.TraceCommandStop();
    }
}
// Returns true when every exception in the collection is a PostgresException — i.e. the server
// reported errors but the connection itself is still usable.
static bool AllPostgresExceptions(ReadOnlyCollection collection)
{
    for (var i = 0; i < collection.Count; i++)
    {
        if (collection[i] is not PostgresException)
            return false;
    }
    return true;
}
/// <summary>
/// Closes the reader, allowing a new command to be executed.
/// </summary>
public override void Close() => Close(connectionClosing: false, async: false, isDisposing: false).GetAwaiter().GetResult();
/// <summary>
/// Closes the reader, allowing a new command to be executed.
/// </summary>
public override Task CloseAsync()
    => Close(async: true, connectionClosing: false, isDisposing: false);
// Core close implementation: consumes any remaining results (when the connector is in a usable
// state) and then performs cleanup. When isDisposing is true, the reader ends up in the Disposed
// state rather than merely Closed.
internal async Task Close(bool async, bool connectionClosing, bool isDisposing)
{
    if (State is ReaderState.Closed or ReaderState.Disposed)
    {
        if (isDisposing)
            State = ReaderState.Disposed;
        return;
    }
    // Whenever a connector is broken, it also closes the current reader.
    Connector.CurrentReader = null;
    switch (Connector.State)
    {
        case ConnectorState.Ready:
        case ConnectorState.Fetching:
        case ConnectorState.Executing:
        case ConnectorState.Connecting:
            if (State != ReaderState.Consumed)
            {
                try
                {
                    await Consume(async).ConfigureAwait(false);
                }
                catch (Exception ex) when (ex is OperationCanceledException or NpgsqlException { InnerException: TimeoutException })
                {
                    // Timeout/cancellation - completely normal, consume has basically completed.
                }
                catch (Exception ex) when (
                    ex is PostgresException ||
                    ex is NpgsqlException { InnerException: AggregateException aggregateException } &&
                    AllPostgresExceptions(aggregateException.InnerExceptions))
                {
                    // In the case of a PostgresException (or multiple ones, if we have error barriers), the connection is fine and consume
                    // has basically completed. Defer throwing the exception until Cleanup is complete.
                    await Cleanup(async, connectionClosing, isDisposing).ConfigureAwait(false);
                    throw;
                }
                catch
                {
                    // Any other failure while consuming implies the connector broke; Cleanup below still runs.
                    Debug.Assert(Connector.IsBroken);
                    throw;
                }
            }
            break;
        case ConnectorState.Closed:
        case ConnectorState.Broken:
            // Nothing left to consume on a closed/broken connector.
            break;
        case ConnectorState.Waiting:
        case ConnectorState.Copy:
        case ConnectorState.Replication:
            Debug.Fail("Bad connector state when closing reader: " + Connector.State);
            break;
        default:
            throw new ArgumentOutOfRangeException();
    }
    await Cleanup(async, connectionClosing, isDisposing).ConfigureAwait(false);
}
/// <summary>
/// Releases reader state after the protocol conversation has been consumed (or abandoned):
/// awaits the outstanding send task, returns the pooled column-info cache, resets reader/command
/// state, reports metrics, optionally closes the owning connection, and raises ReaderClosed.
/// </summary>
internal async Task Cleanup(bool async, bool connectionClosing = false, bool isDisposing = false)
{
    LogMessages.ReaderCleanup(_commandLogger, Connector.Id);
    // _sendTask contains the task for the writing of this command.
    // Make sure that this task, which may have executed asynchronously and in parallel with the reading,
    // has completed, throwing any exceptions it generated. If we don't do this, there's the possibility of a race condition where the
    // user executes a new command after reader.Dispose() returns, but some additional write stuff is still finishing up from the last
    // command.
    if (_sendTask is { Status: not TaskStatus.RanToCompletion })
    {
        // If the connector is broken, we have no reason to wait for the sendTask to complete
        // as we're not going to send anything else over it
        // and that can lead to deadlocks (concurrent write and read failure, see #4804)
        if (Connector.IsBroken)
        {
            // Prevent unobserved Task notifications by observing the failed Task exception.
            _ = _sendTask.ContinueWith(t => _ = t.Exception, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Current);
        }
        else
        {
            try
            {
                if (async)
                    await _sendTask.ConfigureAwait(false);
                else
                    _sendTask.GetAwaiter().GetResult();
            }
            catch (Exception e)
            {
                // TODO: think of a better way to handle exceptions, see #1323 and #3163
                _commandLogger.LogDebug(e, "Exception caught while sending the request", Connector.Id);
            }
        }
    }
    // Return the rented per-column info cache to the pool, clearing it so no converter references leak.
    if (ColumnInfoCache is { } cache)
    {
        ColumnInfoCache = null;
        ArrayPool.Shared.Return(cache, clearArray: true);
    }
    State = ReaderState.Closed;
    Command.State = CommandState.Idle;
    Connector.CurrentReader = null;
    if (_commandLogger.IsEnabled(LogLevel.Information))
        Command.LogExecutingCompleted(Connector, executing: false);
    NpgsqlEventSource.Log.CommandStop();
    Connector.DataSource.MetricsReporter.ReportCommandStop(_startTimestamp);
    Connector.EndUserAction();
    if (isDisposing)
        State = ReaderState.Disposed;
    // CommandBehavior.CloseConnection: also close the owning connection, unless the connection
    // itself initiated this cleanup.
    if (_behavior.HasFlag(CommandBehavior.CloseConnection) && !connectionClosing)
    {
        Debug.Assert(_connection is not null);
        await _connection.Close(async).ConfigureAwait(false);
    }
    // Raise ReaderClosed exactly once, then drop the subscribers.
    if (ReaderClosed != null)
    {
        ReaderClosed(this, EventArgs.Empty);
        ReaderClosed = null;
    }
}
#endregion
#region Simple value getters
/// <summary>
/// Gets the value of the specified column as a <see cref="bool"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override bool GetBoolean(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="byte"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override byte GetByte(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a single <see cref="char"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override char GetChar(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a 16-bit signed integer.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override short GetInt16(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a 32-bit signed integer.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override int GetInt32(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a 64-bit signed integer.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override long GetInt64(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="DateTime"/> object.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override DateTime GetDateTime(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as an instance of <see cref="string"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override string GetString(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="decimal"/> object.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override decimal GetDecimal(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a double-precision floating point number.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override double GetDouble(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a single-precision floating point number.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override float GetFloat(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Gets the value of the specified column as a globally-unique identifier (GUID).
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override Guid GetGuid(int ordinal) => GetFieldValueCore(ordinal);
/// <summary>
/// Populates an array of objects with the column values of the current row.
/// </summary>
/// <param name="values">An array of objects into which to copy the column values.</param>
/// <returns>The number of values copied into <paramref name="values"/>.</returns>
public override int GetValues(object[] values)
{
    ThrowIfNotInResult();
    ArgumentNullException.ThrowIfNull(values);
    // Copy at most as many values as both the row and the target array can hold.
    var copied = Math.Min(ColumnCount, values.Length);
    var index = 0;
    while (index < copied)
    {
        values[index] = GetValue(index);
        index++;
    }
    return copied;
}
/// <summary>
/// Gets the value of the specified column as an instance of <see cref="object"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override object this[int ordinal] => GetValue(ordinal);
#endregion
#region Provider-specific simple type getters
/// <summary>
/// Gets the value of the specified column as a <see cref="TimeSpan"/>.
/// </summary>
/// <remarks>
/// PostgreSQL's interval type has a resolution of 1 microsecond and ranges from
/// -178000000 to 178000000 years, while .NET's TimeSpan has a resolution of 100 nanoseconds
/// and ranges from roughly -29247 to 29247 years.
/// See https://www.postgresql.org/docs/current/static/datatype-datetime.html
/// </remarks>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public TimeSpan GetTimeSpan(int ordinal) => GetFieldValueCore(ordinal);
/// <inheritdoc />
protected override DbDataReader GetDbDataReader(int ordinal) => GetData(ordinal);
/// <summary>
/// Returns a nested data reader for the requested column.
/// The column type must be a record or an Npgsql-known composite type, or an array thereof.
/// Currently only supported in non-sequential mode.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>A nested data reader over the column's value.</returns>
public new NpgsqlNestedDataReader GetData(int ordinal)
{
    ThrowIfNotInResult();
    var field = RowDescription[ordinal];
    if (_isSequential)
        ThrowHelper.ThrowNotSupportedException("GetData() not supported in sequential mode.");
    // Unwrap one level of array to find the element type; only record/composite elements qualify.
    var type = field.PostgresType;
    var isArray = type is PostgresArrayType;
    var elementType = type is PostgresArrayType arrayType ? arrayType.Element : type;
    var compositeType = elementType as PostgresCompositeType;
    if (field.DataFormat is DataFormat.Text || (compositeType == null && elementType.InternalName != "record"))
        ThrowHelper.ThrowInvalidCastException("GetData() not supported for type " + field.TypeDisplayName);
    if (SeekToColumn(ordinal, field.DataFormat, resumableOp: true) == -1)
        ThrowHelper.ThrowInvalidCastException_NoValue(field);
    Debug.Assert(!PgReader.NestedInitialized, "Unexpected nested read active, Seek(0) would seek to the start of the nested data.");
    PgReader.Seek(0);
    // Reuse a cached nested reader when one is available, otherwise allocate a fresh one.
    var nestedReader = CachedFreeNestedDataReader;
    if (nestedReader is null)
    {
        nestedReader = new NpgsqlNestedDataReader(this, null, 1, compositeType);
    }
    else
    {
        CachedFreeNestedDataReader = null;
        nestedReader.Init(compositeType);
    }
    if (isArray)
        nestedReader.InitArray();
    else
        nestedReader.InitSingleRow();
    return nestedReader;
}
#endregion
#region Special binary getters
/// <summary>
/// Reads a stream of bytes from the specified column, starting at location indicated by dataOffset, into the buffer, starting at the location indicated by bufferOffset.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="dataOffset">The index within the column value from which to begin the read operation.</param>
/// <param name="buffer">The buffer into which to copy the data; pass <see langword="null"/> to obtain the total column length.</param>
/// <param name="bufferOffset">The index within the buffer to which the data will be copied.</param>
/// <param name="length">The maximum number of bytes to read.</param>
/// <returns>The actual number of bytes read, or the total column length when <paramref name="buffer"/> is null.</returns>
public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
{
    ThrowIfNotInResult();
    var field = RowDescription[ordinal];
    if (dataOffset is < 0 or > int.MaxValue)
        ThrowHelper.ThrowArgumentOutOfRangeException(nameof(dataOffset), "dataOffset must be between 0 and {0}", int.MaxValue);
    if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length + 1))
        ThrowHelper.ThrowIndexOutOfRangeException("bufferOffset must be between 0 and {0}", buffer.Length);
    // Fixed: this validates 'length', but the message previously (and incorrectly) referred to bufferOffset.
    if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
        ThrowHelper.ThrowIndexOutOfRangeException("length must be between 0 and {0}", buffer.Length - bufferOffset);
    if (SeekToColumn(ordinal, field.DataFormat, resumableOp: true) is var columnLength && columnLength is -1)
        ThrowHelper.ThrowInvalidCastException_NoValue(field);
    // A null buffer means the caller only wants the total length of the column.
    if (buffer is null)
        return columnLength;
    // Check whether any sequential seek is contractually sound (even though we might be able to satisfy rewinds we make sure we won't).
    if (_isSequential && PgReader.IsFieldConsumed((int)dataOffset))
        ThrowHelper.ThrowInvalidOperationException("Attempt to read a position in the column which has already been read");
    // Move to offset
    Debug.Assert(!PgReader.NestedInitialized, "Unexpected nested read active, Seek(0) would seek to the start of the nested data.");
    var remaining = PgReader.Seek((int)dataOffset);
    // At offset, read into buffer.
    length = Math.Min(length, remaining);
    PgReader.ReadBytes(new Span<byte>(buffer, bufferOffset, length));
    return length;
}
/// <summary>
/// Retrieves data as a <see cref="Stream"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The returned stream object.</returns>
public override Stream GetStream(int ordinal)
    => GetFieldValueCore(ordinal);
/// <summary>
/// Asynchronously retrieves data as a <see cref="Stream"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task whose result is the returned stream object.</returns>
public Task GetStreamAsync(int ordinal, CancellationToken cancellationToken = default)
    => GetFieldValueAsync(ordinal, cancellationToken);
#endregion
#region Special text getters
/// <summary>
/// Reads a stream of characters from the specified column, starting at location indicated by dataOffset, into the buffer, starting at the location indicated by bufferOffset.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="dataOffset">The index within the column value from which to begin the read operation.</param>
/// <param name="buffer">The buffer into which to copy the data; pass <see langword="null"/> to obtain the total character count.</param>
/// <param name="bufferOffset">The index within the buffer to which the data will be copied.</param>
/// <param name="length">The maximum number of characters to read.</param>
/// <returns>The actual number of characters read.</returns>
public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
    ThrowIfNotInResult();
    // Check whether we have a GetChars implementation for this column type.
    var field = GetInfo(ordinal, typeof(GetChars), out var converter, out var bufferRequirement, out var asObject);
    if (dataOffset is < 0 or > int.MaxValue)
        ThrowHelper.ThrowArgumentOutOfRangeException(nameof(dataOffset), "dataOffset must be between 0 and {0}", int.MaxValue);
    if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length + 1))
        ThrowHelper.ThrowIndexOutOfRangeException("bufferOffset must be between 0 and {0}", buffer.Length);
    // Fixed: this validates 'length', but the message previously (and incorrectly) referred to bufferOffset.
    if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
        ThrowHelper.ThrowIndexOutOfRangeException("length must be between 0 and {0}", buffer.Length - bufferOffset);
    if (SeekToColumn(ordinal, field, resumableOp: true) is -1)
        ThrowHelper.ThrowInvalidCastException_NoValue(RowDescription[ordinal]);
    var reader = PgReader;
    // With a null buffer the caller wants the total character count, so always decode from the start.
    dataOffset = buffer is null ? 0 : dataOffset;
    if (_isSequential && reader.CharsRead > dataOffset)
        ThrowHelper.ThrowInvalidOperationException("Attempt to read a position in the column which has already been read");
    reader.StartCharsRead(checked((int)dataOffset),
        buffer is not null ? new ArraySegment<char>(buffer, bufferOffset, length) : (ArraySegment<char>?)null);
    reader.StartRead(bufferRequirement);
    var result = asObject
        ? (GetChars)converter.ReadAsObject(reader)
        : ((PgConverter<GetChars>)converter).Read(reader);
    reader.EndRead();
    reader.EndCharsRead();
    return result.Read;
}
/// <summary>
/// Retrieves data as a <see cref="TextReader"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The returned text reader object.</returns>
public override TextReader GetTextReader(int ordinal)
    => GetFieldValueCore(ordinal);
/// <summary>
/// Asynchronously retrieves data as a <see cref="TextReader"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task whose result is the returned text reader object.</returns>
public Task GetTextReaderAsync(int ordinal, CancellationToken cancellationToken = default)
    => GetFieldValueAsync(ordinal, cancellationToken);
#endregion
#region GetFieldValue
/// <summary>
/// Asynchronously gets the value of the specified column as the requested type.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task whose result is the value of the specified column.</returns>
// NOTE(review): generic type arguments (e.g. the method's <T> parameter) appear to have been
// stripped from this listing — confirm against the original source before editing.
public override Task GetFieldValueAsync(int ordinal, CancellationToken cancellationToken)
{
    // As the row is buffered we know the column is too - no I/O will take place
    if (_isRowBuffered)
        return Task.FromResult(GetFieldValueCore(ordinal));
    // The only statically mapped converter, it always exists.
    if (typeof(T) == typeof(Stream))
        return GetStream(ordinal, cancellationToken);
    return Core(ordinal, cancellationToken).AsTask();
    // Slow path: resolve converter info, seek to the column with I/O, and decode asynchronously.
    async ValueTask Core(int ordinal, CancellationToken cancellationToken)
    {
        ThrowIfNotInResult();
        var field = GetInfo(ordinal, typeof(T), out var converter, out var bufferRequirement, out var asObject);
        using var registration = Connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);
        if (await SeekToColumnAsync(ordinal, field).ConfigureAwait(false) is -1)
            return DbNullValueOrThrow(ordinal);
        if (typeof(T) == typeof(TextReader))
            PgReader.ThrowIfStreamActive();
        Debug.Assert(asObject || converter is PgConverter);
        await PgReader.StartReadAsync(bufferRequirement, cancellationToken).ConfigureAwait(false);
        var result = asObject
            ? (T)await converter.ReadAsObjectAsync(PgReader, cancellationToken).ConfigureAwait(false)
            : await converter.UnsafeDowncast().ReadAsync(PgReader, cancellationToken).ConfigureAwait(false);
        await PgReader.EndReadAsync().ConfigureAwait(false);
        return result;
    }
    // Stream retrieval bypasses converter resolution and hands out the raw column stream.
    async Task GetStream(int ordinal, CancellationToken cancellationToken)
    {
        using var registration = Connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);
        var field = GetDefaultInfo(ordinal, out _, out _);
        PgReader.ThrowIfStreamActive();
        if (await SeekToColumnAsync(ordinal, field).ConfigureAwait(false) is -1)
            return DbNullValueOrThrow(ordinal);
        return (T)(object)PgReader.GetStream(canSeek: !_isSequential);
    }
}
/// <summary>
/// Synchronously gets the value of the specified column as the requested type.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column.</returns>
public override T GetFieldValue(int ordinal) => GetFieldValueCore(ordinal);
// Core synchronous decode path shared by GetFieldValue and the typed getters above.
// NOTE(review): generic type arguments appear to have been stripped from this listing — confirm
// against the original source before editing.
T GetFieldValueCore(int ordinal)
{
    ThrowIfNotInResult();
    // The only statically mapped converter, it always exists.
    if (typeof(T) == typeof(Stream))
        return GetStream(ordinal);
    var field = GetInfo(ordinal, typeof(T), out var converter, out var bufferRequirement, out var asObject);
    if (typeof(T) == typeof(TextReader))
        PgReader.ThrowIfStreamActive();
    if (SeekToColumn(ordinal, field) is -1)
        return DbNullValueOrThrow(ordinal);
    Debug.Assert(asObject || converter is PgConverter);
    PgReader.StartRead(bufferRequirement);
    var result = asObject
        ? (T)converter.ReadAsObject(PgReader)
        : converter.UnsafeDowncast().Read(PgReader);
    PgReader.EndRead();
    return result;
    // Stream retrieval bypasses converter resolution and hands out the raw column stream.
    [MethodImpl(MethodImplOptions.NoInlining)]
    T GetStream(int ordinal)
    {
        var field = GetDefaultInfo(ordinal, out _, out _);
        PgReader.ThrowIfStreamActive();
        if (SeekToColumn(ordinal, field) is -1)
            return DbNullValueOrThrow(ordinal);
        return (T)(object)PgReader.GetStream(canSeek: !_isSequential);
    }
}
#endregion
#region GetValue
/// <summary>
/// Gets the value of the specified column as an instance of <see cref="object"/>.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the specified column, decoded via the field's default converter.</returns>
public override object GetValue(int ordinal)
{
    ThrowIfNotInResult();
    var dataFormat = GetDefaultInfo(ordinal, out var converter, out var bufferRequirement);
    // A length of -1 signals SQL NULL.
    if (SeekToColumn(ordinal, dataFormat) == -1)
        return DBNull.Value;
    var reader = PgReader;
    reader.StartRead(bufferRequirement);
    var value = converter.ReadAsObject(reader);
    reader.EndRead();
    return value;
}
/// <summary>
/// Gets the value of the specified column as an instance of <see cref="object"/>.
/// </summary>
/// <param name="name">The name of the column.</param>
/// <returns>The value of the specified column.</returns>
public override object this[string name] => GetValue(GetOrdinal(name));
#endregion
#region IsDBNull
/// <summary>
/// Gets a value that indicates whether the column contains nonexistent or missing values.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns><see langword="true"/> if the specified column value is SQL NULL; otherwise <see langword="false"/>.</returns>
public override bool IsDBNull(int ordinal)
{
    ThrowIfNotInResult();
    // Seeking resumably returns the column length; -1 signals SQL NULL.
    var format = RowDescription[ordinal].DataFormat;
    return SeekToColumn(ordinal, format, resumableOp: true) == -1;
}
/// <summary>
/// An asynchronous version of <see cref="IsDBNull"/>, which gets a value that indicates whether the column contains non-existent or missing values.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task whose result is <see langword="true"/> if the specified column value is SQL NULL; otherwise <see langword="false"/>.</returns>
public override Task IsDBNullAsync(int ordinal, CancellationToken cancellationToken)
{
    // Buffered rows require no I/O, so answer synchronously via cached true/false tasks.
    if (_isRowBuffered)
        return IsDBNull(ordinal) ? TrueTask : FalseTask;
    return Core(ordinal, cancellationToken);
    async Task Core(int ordinal, CancellationToken cancellationToken)
    {
        ThrowIfNotInResult();
        using var registration = Connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);
        // -1 column length signals SQL NULL.
        return await SeekToColumnAsync(ordinal, RowDescription[ordinal].DataFormat, resumableOp: true).ConfigureAwait(false) is -1;
    }
}
#endregion
#region Other public accessors
/// <summary>
/// Gets the column ordinal given the name of the column.
/// </summary>
/// <param name="name">The name of the column.</param>
/// <returns>The zero-based column ordinal.</returns>
public override int GetOrdinal(string name)
{
    ThrowIfClosedOrDisposed();
    if (string.IsNullOrEmpty(name))
        ThrowHelper.ThrowArgumentException($"{nameof(name)} cannot be empty", nameof(name));
    var rowDescription = RowDescription;
    if (rowDescription is null)
        ThrowHelper.ThrowInvalidOperationException("No resultset is currently being traversed");
    return rowDescription!.GetFieldIndex(name);
}
/// <summary>
/// Gets a representation of the PostgreSQL data type for the specified field.
/// The returned representation can be used to access various information about the field.
/// </summary>
/// <param name="ordinal">The zero-based column index.</param>
public PostgresType GetPostgresType(int ordinal) => GetField(ordinal).PostgresType;
/// <summary>
/// Gets the data type information for the specified field.
/// This is the PostgreSQL type name (e.g. double precision), not the .NET type
/// (see <see cref="GetFieldType"/> for that).
/// </summary>
/// <param name="ordinal">The zero-based column index.</param>
public override string GetDataTypeName(int ordinal) => GetField(ordinal).TypeDisplayName;
/// <summary>
/// Gets the OID for the PostgreSQL type for the specified field, as it appears in the pg_type table.
/// </summary>
/// <remarks>
/// This is a PostgreSQL-internal value that should not be relied upon and should only be used for
/// debugging purposes.
/// </remarks>
/// <param name="ordinal">The zero-based column index.</param>
public uint GetDataTypeOID(int ordinal) => GetField(ordinal).TypeOID;
/// <summary>
/// Gets the data type of the specified column.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The data type of the specified column.</returns>
[UnconditionalSuppressMessage("ILLink", "IL2093",
    Justification = "Members are only dynamically accessed by Npgsql via GetFieldType by GetSchema, and only in certain cases. " +
                    "Holding PublicFields and PublicProperties metadata on all our mapped types just for that case is the wrong tradeoff.")]
public override Type GetFieldType(int ordinal)
    => GetField(ordinal).FieldType;
/// <summary>
/// Returns an enumerator that can be used to iterate through the rows in the data reader.
/// </summary>
/// <returns>An <see cref="IEnumerator"/> that can be used to iterate through the rows in the data reader.</returns>
public override IEnumerator GetEnumerator()
    => new DbEnumerator(this);
/// <summary>
/// Returns schema information for the columns in the current resultset.
/// </summary>
// NOTE(review): generic type arguments on the return types appear to have been stripped from
// this listing — confirm against the original source before editing.
public ReadOnlyCollection GetColumnSchema()
    => GetColumnSchema(async: false).GetAwaiter().GetResult();
// Explicit interface implementation: repackages the provider-specific columns as plain DbColumn.
ReadOnlyCollection IDbColumnSchemaGenerator.GetColumnSchema()
{
    var columns = GetColumnSchema();
    var result = new DbColumn[columns.Count];
    var i = 0;
    foreach (var column in columns)
        result[i++] = column;
    return new ReadOnlyCollection(result);
}
/// <summary>
/// Asynchronously returns schema information for the columns in the current resultset.
/// </summary>
public override Task> GetColumnSchemaAsync(CancellationToken cancellationToken = default)
    => GetColumnSchema(async: true, cancellationToken);
// Shared sync/async core: an empty resultset yields an empty read-only list without touching the generator.
Task> GetColumnSchema(bool async, CancellationToken cancellationToken = default) where T : DbColumn
    => RowDescription == null || ColumnCount == 0
        ? Task.FromResult(new List().AsReadOnly())
        : new DbColumnSchemaGenerator(_connection!, RowDescription, _behavior.HasFlag(CommandBehavior.KeyInfo))
            .GetColumnSchema(async, cancellationToken);
#endregion
#region Schema metadata table
/// <summary>
/// Returns a System.Data.DataTable that describes the column metadata of the DataReader.
/// </summary>
[UnconditionalSuppressMessage(
    "Composite type mapping currently isn't trimming-safe, and warnings are generated at the MapComposite level.", "IL2026")]
public override DataTable? GetSchemaTable()
    => GetSchemaTable(async: false).GetAwaiter().GetResult();
/// <summary>
/// Asynchronously returns a System.Data.DataTable that describes the column metadata of the DataReader.
/// </summary>
[UnconditionalSuppressMessage(
    "Composite type mapping currently isn't trimming-safe, and warnings are generated at the MapComposite level.", "IL2026")]
public override Task GetSchemaTableAsync(CancellationToken cancellationToken = default)
    => GetSchemaTable(async: true, cancellationToken);
// Builds the legacy schema DataTable from GetColumnSchema; returns null when there is no resultset.
[UnconditionalSuppressMessage("Trimming", "IL2111", Justification = "typeof(Type).TypeInitializer is not used.")]
async Task GetSchemaTable(bool async, CancellationToken cancellationToken = default)
{
    if (FieldCount == 0) // No resultset
        return null;
    var table = new DataTable("SchemaTable");
    // Note: column order is important to match SqlClient's, some ADO.NET users appear
    // to assume ordering (see #1671)
    table.Columns.Add("ColumnName", typeof(string));
    table.Columns.Add("ColumnOrdinal", typeof(int));
    table.Columns.Add("ColumnSize", typeof(int));
    table.Columns.Add("NumericPrecision", typeof(int));
    table.Columns.Add("NumericScale", typeof(int));
    table.Columns.Add("IsUnique", typeof(bool));
    table.Columns.Add("IsKey", typeof(bool));
    table.Columns.Add("BaseServerName", typeof(string));
    table.Columns.Add("BaseCatalogName", typeof(string));
    table.Columns.Add("BaseColumnName", typeof(string));
    table.Columns.Add("BaseSchemaName", typeof(string));
    table.Columns.Add("BaseTableName", typeof(string));
    table.Columns.Add("DataType", typeof(Type));
    table.Columns.Add("AllowDBNull", typeof(bool));
    table.Columns.Add("ProviderType", typeof(int));
    table.Columns.Add("IsAliased", typeof(bool));
    table.Columns.Add("IsExpression", typeof(bool));
    table.Columns.Add("IsIdentity", typeof(bool));
    table.Columns.Add("IsAutoIncrement", typeof(bool));
    table.Columns.Add("IsRowVersion", typeof(bool));
    table.Columns.Add("IsHidden", typeof(bool));
    table.Columns.Add("IsLong", typeof(bool));
    table.Columns.Add("IsReadOnly", typeof(bool));
    table.Columns.Add("ProviderSpecificDataType", typeof(Type));
    table.Columns.Add("DataTypeName", typeof(string));
    // One row per column; unknown nullable metadata maps to sentinel defaults (-1, 0, false).
    foreach (var column in await GetColumnSchema(async, cancellationToken).ConfigureAwait(false))
    {
        var row = table.NewRow();
        row["ColumnName"] = column.ColumnName;
        row["ColumnOrdinal"] = column.ColumnOrdinal ?? -1;
        row["ColumnSize"] = column.ColumnSize ?? -1;
        row["NumericPrecision"] = column.NumericPrecision ?? 0;
        row["NumericScale"] = column.NumericScale ?? 0;
        row["IsUnique"] = column.IsUnique == true;
        row["IsKey"] = column.IsKey == true;
        row["BaseServerName"] = "";
        row["BaseCatalogName"] = column.BaseCatalogName;
        row["BaseColumnName"] = column.BaseColumnName;
        row["BaseSchemaName"] = column.BaseSchemaName;
        row["BaseTableName"] = column.BaseTableName;
        row["DataType"] = column.DataType;
        row["AllowDBNull"] = (object?)column.AllowDBNull ?? DBNull.Value;
        row["ProviderType"] = column.NpgsqlDbType ?? NpgsqlDbType.Unknown;
        row["IsAliased"] = column.IsAliased == true;
        row["IsExpression"] = column.IsExpression == true;
        row["IsIdentity"] = column.IsIdentity == true;
        row["IsAutoIncrement"] = column.IsAutoIncrement == true;
        row["IsRowVersion"] = false;
        row["IsHidden"] = column.IsHidden == true;
        row["IsLong"] = column.IsLong == true;
        row["IsReadOnly"] = column.IsReadOnly == true;
        row["DataTypeName"] = column.DataTypeName;
        table.Rows.Add(row);
    }
    return table;
}
#endregion Schema metadata table
#region Seeking
// Seeks to the given column and initializes PgReader for it; returns the column length in bytes,
// or -1 when the column is SQL NULL.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
int SeekToColumn(int ordinal, DataFormat dataFormat, bool resumableOp = false)
{
    Debug.Assert(_isRowBuffered || _isSequential);
    var reader = PgReader;
    var column = _column;
    // Column rereading rules for sequential mode:
    // * We never allow rereading if the column didn't get initialized as resumable the previous time
    // * If it did get initialized as resumable we only allow rereading when either of the following is true:
    //   - The op is a resumable one again
    //   - The op isn't resumable but the field is still entirely unconsumed
    if (_isSequential && (column > ordinal || (column == ordinal && (!reader.Resumable || (!resumableOp && !reader.FieldAtStart)))))
        ThrowInvalidSequentialSeek(column, ordinal);
    // Re-reading the current column: restart the reader on it instead of re-seeking.
    if (column == ordinal)
        return reader.Restart(resumableOp);
    reader.Commit();
    var columnLength = BufferSeekToColumn(column, ordinal, !_isRowBuffered);
    reader.Init(columnLength, dataFormat, resumableOp);
    return columnLength;
    static void ThrowInvalidSequentialSeek(int column, int ordinal)
        => ThrowHelper.ThrowInvalidOperationException(
            $"Invalid attempt to read from column ordinal '{ordinal}'. With CommandBehavior.SequentialAccess, " +
            $"you may only read from column ordinal '{column}' or greater.");
}
// Async counterpart of SeekToColumn: takes the synchronous path whenever no I/O can occur.
ValueTask SeekToColumnAsync(int ordinal, DataFormat dataFormat, bool resumableOp = false)
{
    // When the row is buffered or we're rereading previous data no IO will be done.
    if (_isRowBuffered || _column >= ordinal)
        return new(SeekToColumn(ordinal, dataFormat, resumableOp));
    return Core(ordinal, dataFormat, resumableOp);
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
    async ValueTask Core(int ordinal, DataFormat dataFormat, bool resumableOp)
    {
        Debug.Assert(!_isRowBuffered && _column < ordinal);
        var reader = PgReader;
        await reader.CommitAsync().ConfigureAwait(false);
        var columnLength = await BufferSeekToColumnAsync(_column, ordinal, !_isRowBuffered).ConfigureAwait(false);
        reader.Init(columnLength, dataFormat, resumableOp);
        return columnLength;
    }
}
// Advances the read buffer from 'column' to 'ordinal', returning the target column's length.
// Forward seeks walk the length-prefixed columns; backward seeks (non-sequential mode only)
// replay recorded offsets from _columns.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
int BufferSeekToColumn(int column, int ordinal, bool allowIO)
{
    Debug.Assert(column < ordinal || !allowIO);
    if (column >= ordinal)
    {
        _column = ordinal;
        return SeekBackwards(ordinal);
    }
    // We know we need at least one iteration, a do while also helps with optimal codegen.
    var buffer = Buffer;
    var columnLength = 0;
    do
    {
        if (columnLength > 0)
            buffer.Skip(columnLength, allowIO);
        if (allowIO)
            buffer.Ensure(sizeof(int));
        columnLength = buffer.ReadInt32();
        Debug.Assert(columnLength >= -1);
    } while (++_column < ordinal);
    return columnLength;
    // On the first call to SeekBackwards we'll fill up the columns list as we may need seek positions more than once.
    [MethodImpl(MethodImplOptions.NoInlining)]
    int SeekBackwards(int ordinal)
    {
        var buffer = Buffer;
        var columns = _columns;
        // Resume from the furthest recorded column (or the row start when nothing is recorded yet).
        (buffer.ReadPosition, var columnLength) = columns.Count is 0
            ? (_columnsStartPos, 0)
            : columns[Math.Min(columns.Count -1, ordinal)];
        while (columns.Count <= ordinal)
        {
            if (columnLength > 0)
                buffer.Skip(columnLength);
            columnLength = buffer.ReadInt32();
            columns.Add((buffer.ReadPosition, columnLength));
        }
        return columnLength;
    }
}
// Async counterpart of BufferSeekToColumn; delegates to the sync path when no I/O is allowed
// or the seek is backwards (both cases never block).
ValueTask BufferSeekToColumnAsync(int column, int ordinal, bool allowIO)
{
    return !allowIO || column >= ordinal ? new(BufferSeekToColumn(column, ordinal, allowIO)) : Core(ordinal);
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
    async ValueTask Core(int ordinal)
    {
        // We know we need at least one iteration, a do while also helps with optimal codegen.
        var buffer = Buffer;
        var columnLength = 0;
        do
        {
            if (columnLength > 0)
                await buffer.Skip(async: true, columnLength).ConfigureAwait(false);
            await buffer.EnsureAsync(sizeof(int)).ConfigureAwait(false);
            columnLength = buffer.ReadInt32();
            Debug.Assert(columnLength >= -1);
        } while (++_column < ordinal);
        return columnLength;
    }
}
#endregion
#region ConsumeRow
// Consumes the remainder of the current row so the reader can advance to the next message.
Task ConsumeRow(bool async)
{
    Debug.Assert(State is ReaderState.InResult or ReaderState.BeforeResult);
    if (!_isRowBuffered)
        return ConsumeRowSequential(async);
    // We get here, if we're in a non-sequential mode (or the row is already in the buffer)
    ConsumeBufferedRow();
    return Task.CompletedTask;
    async Task ConsumeRowSequential(bool async)
    {
        if (async)
            await PgReader.CommitAsync().ConfigureAwait(false);
        else
            PgReader.Commit();
        // Skip over the remaining columns in the row
        var buffer = Buffer;
        // Written as a while to be able to increment _column directly after reading into it.
        while (_column < ColumnCount - 1)
        {
            await buffer.Ensure(4, async).ConfigureAwait(false);
            var columnLength = buffer.ReadInt32();
            _column++;
            Debug.Assert(columnLength >= -1);
            if (columnLength > 0)
                await buffer.Skip(async, columnLength).ConfigureAwait(false);
        }
    }
}
// Consumes a fully-buffered row by jumping the read position straight to the end of the data message.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void ConsumeBufferedRow()
{
    Debug.Assert(State is ReaderState.InResult or ReaderState.BeforeResult);
    PgReader.Commit();
    Buffer.ReadPosition = _dataMsgEnd;
}
#endregion
#region Checks
// Produces the appropriate "value" for a SQL NULL column: default for Nullable<T>, DBNull for
// object, and an InvalidCastException for any other requested type.
[MethodImpl(MethodImplOptions.NoInlining)]
T DbNullValueOrThrow(int ordinal)
{
    // When T is a Nullable (and only in that case), we support returning null
    if (default(T) is null && typeof(T).IsValueType)
        return default!;
    if (typeof(T) == typeof(object))
        return (T)(object)DBNull.Value;
    ThrowHelper.ThrowInvalidCastException_NoValue(RowDescription![ordinal]);
    return default;
}
/// <summary>
/// Resolves the converter info for reading column <paramref name="ordinal"/> as CLR type
/// <paramref name="type"/>, consulting the per-column cache first and falling back to a
/// full lookup on the field description on a cache miss.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
DataFormat GetInfo(int ordinal, Type type, out PgConverter converter, out Size bufferRequirement, out bool asObject)
{
    // Valid ordinals are [0, ColumnCount - 1]; the unsigned compare also rejects negatives.
    // Fixed off-by-one: the original '>' let ordinal == ColumnCount through, which would
    // then fault on the ColumnInfoCache indexer below.
    if ((uint)ordinal >= (uint)ColumnCount)
        ThrowHelper.ThrowIndexOutOfRangeException("Ordinal must be between 0 and " + (ColumnCount - 1));

    ref var info = ref ColumnInfoCache![ordinal];
    Debug.Assert(info.ConverterInfo.IsDefault || ReferenceEquals(Connector.SerializerOptions, info.ConverterInfo.TypeInfo.Options), "Cache is bleeding over");
    if (info.ConverterInfo.TypeToConvert == type)
    {
        // Cache hit: the cached info was resolved for exactly this CLR type.
        converter = info.ConverterInfo.Converter;
        bufferRequirement = info.ConverterInfo.BufferRequirement;
        asObject = info.AsObject;
        return info.DataFormat;
    }
    return Slow(ref info, out converter, out bufferRequirement, out asObject);

    // Cache miss: resolve from the field description, refreshing the cache slot in place.
    [MethodImpl(MethodImplOptions.NoInlining)]
    DataFormat Slow(ref ColumnInfo info, out PgConverter converter, out Size bufferRequirement, out bool asObject)
    {
        var field = RowDescription![ordinal];
        field.GetInfo(type, ref info);
        converter = info.ConverterInfo.Converter;
        bufferRequirement = info.ConverterInfo.BufferRequirement;
        asObject = info.AsObject;
        return field.DataFormat;
    }
}
/// <summary>
/// Returns the column's default (object-typed) converter info and its wire data format.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
DataFormat GetDefaultInfo(int ordinal, out PgConverter converter, out Size bufferRequirement)
{
    var field = RowDescription![ordinal];
    var objectInfo = field.ObjectInfo;
    converter = objectInfo.Converter;
    bufferRequirement = objectInfo.BufferRequirement;
    return field.DataFormat;
}
/// <summary>
/// Checks that we have a RowDescription, but not necessarily an actual resultset
/// (for operations which work in SchemaOnly mode).
/// </summary>
FieldDescription GetField(int ordinal)
{
    ThrowIfClosedOrDisposed();

    var description = RowDescription;
    if (description is null)
    {
        ThrowHelper.ThrowInvalidOperationException("No resultset is currently being traversed");
        return default!; // unreachable: the throw helper above always throws
    }
    return description[ordinal];
}
// Guards operations that are illegal once the reader has been closed or disposed.
void ThrowIfClosedOrDisposed()
{
    var state = State;
    if (state is ReaderState.Closed or ReaderState.Disposed)
        ThrowInvalidState(state);
}
// Guards operations that require an active resultset; establishes RowDescription as
// non-null for nullable flow analysis via [MemberNotNull] (backed by the Debug.Assert).
[MemberNotNull(nameof(RowDescription))]
void ThrowIfNotInResult()
{
    var state = State;
    if (state != ReaderState.InResult)
        ThrowInvalidState(state);
    Debug.Assert(RowDescription is not null);
}
// Throws the appropriate exception for an operation attempted in the given reader state.
[MethodImpl(MethodImplOptions.NoInlining)]
static void ThrowInvalidState(ReaderState state)
{
    if (state == ReaderState.Closed)
        ThrowHelper.ThrowInvalidOperationException("The reader is closed");
    else if (state == ReaderState.Disposed)
        ThrowHelper.ThrowObjectDisposedException(nameof(NpgsqlDataReader));
    else
        ThrowHelper.ThrowInvalidOperationException("No resultset is currently being traversed");
}
#endregion
#region Misc
/// <summary>
/// Unbinds reader from the connector.
/// Should be called before the connector is returned to the pool.
/// </summary>
internal void UnbindIfNecessary()
{
    // Nothing to do once the reader has been fully disposed.
    if (State == ReaderState.Disposed)
        return;

    // We're closing the connection, but the reader is not yet disposed. We have to unbind
    // the reader from the connector, otherwise there could be concurrency issues.
    // See #3126 and #3290.
    // Recycle the connector's previous unbound reader when it's fully disposed;
    // otherwise hand the connector a fresh reader instance.
    Connector.DataReader = Connector.UnboundDataReader is { State: ReaderState.Disposed } recycledReader
        ? recycledReader
        : new NpgsqlDataReader(Connector);
    Connector.UnboundDataReader = this;
}
#endregion
}
/// <summary>
/// Tracks the lifecycle state of an <see cref="NpgsqlDataReader"/>.
/// </summary>
enum ReaderState
{
    /// <summary>A resultset is current but no row has been consumed yet.</summary>
    BeforeResult,
    /// <summary>Positioned within a resultset; rows can be read.</summary>
    InResult,
    /// <summary>Between resultsets of a multi-statement command (presumably after one resultset ends and before the next begins — confirm against state transitions elsewhere in the file).</summary>
    BetweenResults,
    /// <summary>All resultsets have been consumed.</summary>
    Consumed,
    /// <summary>The reader has been closed; state-checked operations throw InvalidOperationException.</summary>
    Closed,
    /// <summary>The reader has been disposed; state-checked operations throw ObjectDisposedException.</summary>
    Disposed,
}