using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Npgsql.Util;
using static System.Threading.Timeout;
namespace Npgsql.Internal;
/// <summary>
/// A buffer used by Npgsql to read data from the socket efficiently.
/// Provides methods which decode different value types and tracks the current position.
/// </summary>
[Experimental(NpgsqlDiagnostics.ConvertersExperimental)]
sealed partial class NpgsqlReadBuffer : IDisposable
{
#region Fields and Properties
#if DEBUG
// Debug builds always verify that a read stays within the buffered data (see CheckBounds).
internal static readonly bool BufferBoundsChecks = true;
#else
// Other builds verify only when Statics.EnableAssertions is set.
internal static readonly bool BufferBoundsChecks = Statics.EnableAssertions;
#endif
// Connection exposed by the connector; the null-forgiving operator means a null value is not expected here.
public NpgsqlConnection Connection => Connector.Connection!;
// Connector this buffer reads for.
// NOTE(review): the constructor accepts a null connector, so this can be null despite the annotation.
internal readonly NpgsqlConnector Connector;
// Stream every read is issued against; settable from outside the class, readable only inside it.
internal Stream Underlying { private get; set; }
// Socket whose ReceiveTimeout the Timeout setter updates; null when no socket was supplied.
readonly Socket? _underlyingSocket;
// Cancellation source that carries the timeout for asynchronous reads.
internal ResettableCancellationTokenSource Cts { get; }
// Optional metrics sink, taken from the connector's data source in the constructor.
readonly MetricsReporter? _metricsReporter;
/// <summary>
/// Timeout applied to both synchronous and asynchronous reads.
/// </summary>
internal TimeSpan Timeout
{
    get
    {
        return Cts.Timeout;
    }
    set
    {
        // Nothing to update when the timeout is unchanged.
        if (Cts.Timeout == value)
            return;

        Debug.Assert(_underlyingSocket != null);

        // The socket receive timeout and the cancellation source are always updated together.
        _underlyingSocket.ReceiveTimeout = (int)value.TotalMilliseconds;
        Cts.Timeout = value;
    }
}
/// <summary>
/// The total byte length of the buffer.
/// </summary>
internal int Size { get; }
// Encoding used to decode text read from the buffer (see ReadString).
internal Encoding TextEncoding { get; }
/// <summary>
/// Same as <see cref="TextEncoding"/>, except that it does not throw an exception if an invalid char is
/// encountered (exception fallback), but rather replaces it with a question mark character (replacement
/// fallback).
/// </summary>
internal Encoding RelaxedTextEncoding { get; }
// Index in Buffer of the next unread byte.
internal int ReadPosition { get; set; }
// Number of buffered bytes that have not been consumed yet.
internal int ReadBytesLeft => FilledBytes - ReadPosition;
// Reader constructed over this buffer in the constructor.
internal PgReader PgReader { get; }
// Bytes no longer addressable through Buffer: advanced when the buffer is reset or compacted
// and when a read goes straight from the stream into a caller-supplied buffer.
long _flushedBytes; // this will always fit at least one message.
// Flushed byte count plus the current position inside the buffer.
internal long CumulativeReadPosition
    // Cast to uint to remove the sign extension (ReadPosition is never negative)
    => _flushedBytes + (uint)ReadPosition;
// Backing storage: rented from the shared array pool when _usePool is set, otherwise a plain allocation.
internal readonly byte[] Buffer;
// Number of valid bytes currently held at the start of Buffer.
internal int FilledBytes;
// View over the unread portion of the buffer.
internal ReadOnlySpan<byte> Span => Buffer.AsSpan(ReadPosition, ReadBytesLeft);
// True when Buffer has to be returned to the array pool on Dispose.
readonly bool _usePool;
// Set once Dispose has run, so a second call is a no-op.
bool _disposed;
/// <summary>
/// The minimum buffer size possible.
/// </summary>
internal const int MinimumSize = 4096;
// NOTE(review): not referenced in this file; presumably the default buffer size in bytes used by callers — confirm.
internal const int DefaultSize = 8192;
#endregion
#region Constructors
/// <summary>
/// Creates a read buffer over <paramref name="stream"/>.
/// </summary>
/// <param name="connector">Owning connector; may be null, in which case no metrics reporter is attached.</param>
/// <param name="stream">Stream that reads are issued against.</param>
/// <param name="socket">Socket whose receive timeout is kept in sync with <see cref="Timeout"/>, or null.</param>
/// <param name="size">Requested buffer size in bytes.</param>
/// <param name="textEncoding">Encoding used for strict text decoding.</param>
/// <param name="relaxedTextEncoding">Encoding used for text decoding with replacement fallback.</param>
/// <param name="usePool">When true, the backing array is rented from the shared array pool.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="size"/> is smaller than <see cref="MinimumSize"/>.
/// </exception>
internal NpgsqlReadBuffer(
    NpgsqlConnector? connector,
    Stream stream,
    Socket? socket,
    int size,
    Encoding textEncoding,
    Encoding relaxedTextEncoding,
    bool usePool = false)
{
    ArgumentOutOfRangeException.ThrowIfLessThan(size, MinimumSize);

    Connector = connector!; // TODO: Clean this up
    Underlying = stream;
    _underlyingSocket = socket;
    _metricsReporter = connector?.DataSource.MetricsReporter;
    Cts = new ResettableCancellationTokenSource();

    // A rented array may be longer than requested, so Size is taken from the actual array.
    Buffer = usePool ? ArrayPool<byte>.Shared.Rent(size) : new byte[size];
    Size = Buffer.Length;
    _usePool = usePool;

    TextEncoding = textEncoding;
    RelaxedTextEncoding = relaxedTextEncoding;
    PgReader = new PgReader(this);
}
#endregion
#region I/O
/// <summary>
/// Synchronously makes sure <paramref name="count"/> bytes are buffered.
/// </summary>
public void Ensure(int count)
{
    var pending = Ensure(count, async: false, readingNotifications: false);
    pending.GetAwaiter().GetResult();
}

/// <summary>
/// Makes sure <paramref name="count"/> bytes are buffered, reading synchronously or asynchronously
/// depending on <paramref name="async"/>.
/// </summary>
public ValueTask Ensure(int count, bool async)
{
    return Ensure(count, async, readingNotifications: false);
}

/// <summary>
/// Asynchronously makes sure <paramref name="count"/> bytes are buffered.
/// </summary>
public ValueTask EnsureAsync(int count)
{
    return Ensure(count, async: true, readingNotifications: false);
}
/// <summary>
/// Synchronously reads from the underlying stream straight into <paramref name="buffer"/>,
/// handling a socket receive timeout as a read timeout.
/// </summary>
/// <remarks>
/// Can't share an implementation with the async variant due to the Span vs Memory difference
/// (can't make a memory out of a span).
/// </remarks>
/// <returns>The number of bytes read.</returns>
int ReadWithTimeout(Span<byte> buffer)
{
    while (true)
    {
        try
        {
            var read = Underlying.Read(buffer);
            // These bytes never pass through Buffer, so account for them in the flushed total.
            _flushedBytes = unchecked(_flushedBytes + read);
            NpgsqlEventSource.Log.BytesRead(read);
            return read;
        }
        catch (Exception ex)
        {
            var connector = Connector;
            if (ex is IOException { InnerException: SocketException { SocketErrorCode: SocketError.TimedOut } })
            {
                // If we should attempt PostgreSQL cancellation, do it the first time we get a timeout.
                // TODO: As an optimization, we can still attempt to send a cancellation request, but after
                // that immediately break the connection
                if (connector is { AttemptPostgresCancellation: true, PostgresCancellationPerformed: false }
                    && connector.PerformPostgresCancellation())
                {
                    // Note that if the cancellation timeout is negative, we flow down and break the
                    // connection immediately.
                    var cancellationTimeout = connector.Settings.CancellationTimeout;
                    if (cancellationTimeout >= 0)
                    {
                        if (cancellationTimeout > 0)
                            Timeout = TimeSpan.FromMilliseconds(cancellationTimeout);
                        // Retry the read, now waiting for the effect of the cancellation.
                        continue;
                    }
                }

                // If we're here, the PostgreSQL cancellation either failed or skipped entirely.
                // Break the connection, bubbling up the correct exception type (cancellation or timeout)
                throw connector.Break(CreateCancelException(connector));
            }

            throw connector.Break(new NpgsqlException("Exception while reading from stream", ex));
        }
    }
}
/// <summary>
/// Asynchronously reads from the underlying stream straight into <paramref name="buffer"/>,
/// enforcing <see cref="Timeout"/> through <see cref="Cts"/>.
/// </summary>
/// <param name="buffer">Destination for the bytes read.</param>
/// <param name="cancellationToken">Caller token, combined with the timeout when one is configured.</param>
/// <returns>The number of bytes read.</returns>
async ValueTask<int> ReadWithTimeoutAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
    // With an infinite timeout the source is only reset, and the caller token is not linked in.
    var finalCt = Timeout != InfiniteTimeSpan
        ? Cts.Start(cancellationToken)
        : Cts.Reset();

    while (true)
    {
        try
        {
            var read = await Underlying.ReadAsync(buffer, finalCt).ConfigureAwait(false);
            // These bytes never pass through Buffer, so account for them in the flushed total.
            _flushedBytes = unchecked(_flushedBytes + read);
            Cts.Stop();
            NpgsqlEventSource.Log.BytesRead(read);
            return read;
        }
        catch (Exception ex)
        {
            var connector = Connector;
            Cts.Stop();
            switch (ex)
            {
            // Read timeout
            case OperationCanceledException:
            // Note that mono throws SocketException with the wrong error (see #1330)
            case IOException e when (e.InnerException as SocketException)?.SocketErrorCode ==
                                    (Type.GetType("Mono.Runtime") == null ? SocketError.TimedOut : SocketError.WouldBlock):
            {
                Debug.Assert(ex is OperationCanceledException);

                // If we should attempt PostgreSQL cancellation, do it the first time we get a timeout.
                // TODO: As an optimization, we can still attempt to send a cancellation request, but after
                // that immediately break the connection
                if (connector is { AttemptPostgresCancellation: true, PostgresCancellationPerformed: false } &&
                    connector.PerformPostgresCancellation())
                {
                    // Note that if the cancellation timeout is negative, we flow down and break the
                    // connection immediately.
                    var cancellationTimeout = connector.Settings.CancellationTimeout;
                    if (cancellationTimeout >= 0)
                    {
                        if (cancellationTimeout > 0)
                            Timeout = TimeSpan.FromMilliseconds(cancellationTimeout);
                        // Restart the timeout and retry the read.
                        finalCt = Cts.Start(cancellationToken);
                        continue;
                    }
                }

                // If we're here, the PostgreSQL cancellation either failed or skipped entirely.
                // Break the connection, bubbling up the correct exception type (cancellation or timeout)
                throw connector.Break(CreateCancelException(connector));
            }
            default:
                throw connector.Break(new NpgsqlException("Exception while reading from stream", ex));
            }
        }
    }
}
// Picks the exception reported for a timed-out read: a timeout when the user did not ask for
// cancellation, otherwise an OperationCanceledException (wrapping a timeout if PostgreSQL
// cancellation was performed).
static Exception CreateCancelException(NpgsqlConnector connector)
{
    if (!connector.UserCancellationRequested)
        return NpgsqlTimeoutException();

    if (connector.PostgresCancellationPerformed)
        return new OperationCanceledException("Query was cancelled", TimeoutException(), connector.UserCancellationToken);

    return new OperationCanceledException("Query was cancelled", connector.UserCancellationToken);
}

// NpgsqlException wrapping a TimeoutException.
static Exception NpgsqlTimeoutException()
{
    return new NpgsqlException("Exception while reading from stream", TimeoutException());
}

// The inner timeout exception shared by the factories above.
static Exception TimeoutException()
{
    return new TimeoutException("Timeout during reading attempt");
}
/// <summary>
/// Ensures that <paramref name="count"/> bytes are available in the buffer, and if
/// not, reads from the socket until enough is available.
/// </summary>
/// <param name="count">Number of bytes that must be buffered; must not exceed <see cref="Size"/>.</param>
/// <param name="async">Whether to read asynchronously.</param>
/// <param name="readingNotifications">
/// When true, a timeout throws immediately without attempting cancellation or breaking the connection.
/// </param>
internal ValueTask Ensure(int count, bool async, bool readingNotifications)
{
    // Fast path: enough data is already buffered, so no state machine is needed.
    return count <= ReadBytesLeft ? new() : EnsureLong(this, count, async, readingNotifications);

    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
    static async ValueTask EnsureLong(
        NpgsqlReadBuffer buffer,
        int count,
        bool async,
        bool readingNotifications)
    {
        Debug.Assert(count <= buffer.Size);
        Debug.Assert(count > buffer.ReadBytesLeft);
        // From here on, count is the number of bytes still missing.
        count -= buffer.ReadBytesLeft;

        if (buffer.ReadPosition == buffer.FilledBytes)
        {
            // Nothing unread is left: start filling from the beginning again.
            buffer.ResetPosition();
        }
        else if (count > buffer.Size - buffer.FilledBytes)
        {
            // Not enough room after the buffered data: move the unread bytes to the front.
            Array.Copy(buffer.Buffer, buffer.ReadPosition, buffer.Buffer, 0, buffer.ReadBytesLeft);
            buffer.FilledBytes = buffer.ReadBytesLeft;
            buffer._flushedBytes = unchecked(buffer._flushedBytes + buffer.ReadPosition);
            buffer.ReadPosition = 0;
        }

        var finalCt = async && buffer.Timeout != InfiniteTimeSpan
            ? buffer.Cts.Start()
            : buffer.Cts.Reset();

        var totalRead = 0;
        while (count > 0)
        {
            try
            {
                var toRead = buffer.Size - buffer.FilledBytes;
                var read = async
                    ? await buffer.Underlying.ReadAsync(buffer.Buffer.AsMemory(buffer.FilledBytes, toRead), finalCt).ConfigureAwait(false)
                    : buffer.Underlying.Read(buffer.Buffer, buffer.FilledBytes, toRead);

                if (read == 0)
                    throw new EndOfStreamException();
                count -= read;
                buffer.FilledBytes += read;
                totalRead += read;

                // Most of the time, it should be fine to reset cancellation token source, so we can use it again
                // It's still possible for cancellation token to cancel between reading and resetting (although highly improbable)
                // In this case, we consider it as timed out and fail with OperationCancelledException on next ReadAsync
                // Or we consider it not timed out if we have already read everything (count == 0)
                // In which case we reinitialize it on the next call to EnsureLong()
                if (async && count > 0)
                    buffer.Cts.RestartTimeoutWithoutReset();
            }
            catch (Exception e)
            {
                var connector = buffer.Connector;

                // Stopping twice (in case the previous Stop() call succeeded) doesn't hurt.
                // Not stopping will cause an assertion failure in debug mode when we call Start() the next time.
                // We can't stop in a finally block because Connector.Break() will dispose the buffer and the contained
                // _timeoutCts
                buffer.Cts.Stop();

                switch (e)
                {
                // Read timeout
                case OperationCanceledException:
                // Note that mono throws SocketException with the wrong error (see #1330)
                case IOException when (e.InnerException as SocketException)?.SocketErrorCode ==
                                      (Type.GetType("Mono.Runtime") == null ? SocketError.TimedOut : SocketError.WouldBlock):
                {
                    Debug.Assert(e is OperationCanceledException ? async : !async);

                    // When reading notifications (Wait), just throw TimeoutException or
                    // OperationCanceledException immediately.
                    // Nothing to cancel, and no breaking of the connection.
                    if (readingNotifications)
                        throw CreateCancelException(connector);

                    // If we should attempt PostgreSQL cancellation, do it the first time we get a timeout.
                    // TODO: As an optimization, we can still attempt to send a cancellation request, but after
                    // that immediately break the connection
                    if (connector is { AttemptPostgresCancellation: true, PostgresCancellationPerformed: false } &&
                        connector.PerformPostgresCancellation())
                    {
                        // Note that if the cancellation timeout is negative, we flow down and break the
                        // connection immediately.
                        var cancellationTimeout = connector.Settings.CancellationTimeout;
                        if (cancellationTimeout >= 0)
                        {
                            if (cancellationTimeout > 0)
                                buffer.Timeout = TimeSpan.FromMilliseconds(cancellationTimeout);

                            if (async)
                                finalCt = buffer.Cts.Start();

                            continue;
                        }
                    }

                    // If we're here, the PostgreSQL cancellation either failed or skipped entirely.
                    // Break the connection, bubbling up the correct exception type (cancellation or timeout)
                    throw connector.Break(CreateCancelException(connector));
                }
                default:
                    throw connector.Break(new NpgsqlException("Exception while reading from stream", e));
                }
            }
        }

        buffer.Cts.Stop();
        NpgsqlEventSource.Log.BytesRead(totalRead);
        buffer._metricsReporter?.ReportBytesRead(totalRead);
    }
}
// Asks for one byte more than is currently buffered, which forces at least one read.
internal ValueTask ReadMore(bool async)
{
    var wanted = ReadBytesLeft + 1;
    return Ensure(wanted, async);
}
// Creates a pooled buffer of at least count bytes over the same connector and stream, moves the
// unread bytes into it, and resets this buffer.
internal NpgsqlReadBuffer AllocateOversize(int count)
{
    Debug.Assert(count > Size);

    var oversize = new NpgsqlReadBuffer(
        Connector,
        Underlying,
        _underlyingSocket,
        count,
        TextEncoding,
        RelaxedTextEncoding,
        usePool: true);

    // The Timeout setter asserts a socket is present, so only carry it over when there is one.
    if (_underlyingSocket is not null)
        oversize.Timeout = Timeout;

    CopyTo(oversize);
    ResetPosition();
    return oversize;
}
/// <summary>
/// Skip a given number of bytes.
/// </summary>
/// <param name="len">Number of bytes to skip.</param>
/// <param name="allowIO">Whether data may be read synchronously when fewer than <paramref name="len"/> bytes are buffered.</param>
internal void Skip(int len, bool allowIO)
{
    Debug.Assert(len >= 0);

    if (allowIO && len > ReadBytesLeft)
    {
        // Drop what is buffered, then consume the remainder one full buffer at a time.
        len -= ReadBytesLeft;
        for (; len > Size; len -= Size)
        {
            ResetPosition();
            Ensure(Size);
        }

        // Buffer the final partial chunk; it is stepped over below.
        ResetPosition();
        Ensure(len);
    }

    Debug.Assert(ReadBytesLeft >= len);
    ReadPosition += len;
}
// Steps over len bytes that are already buffered; performs no I/O.
internal void Skip(int len)
{
    Debug.Assert(ReadBytesLeft >= len);
    var target = ReadPosition + len;
    ReadPosition = target;
}
/// <summary>
/// Skip a given number of bytes.
/// </summary>
/// <param name="async">Whether missing data is read asynchronously.</param>
/// <param name="len">Number of bytes to skip.</param>
public async Task Skip(bool async, int len)
{
    Debug.Assert(len >= 0);

    if (len > ReadBytesLeft)
    {
        // Drop what is buffered, then consume the remainder one full buffer at a time.
        len -= ReadBytesLeft;
        for (; len > Size; len -= Size)
        {
            ResetPosition();
            await Ensure(Size, async).ConfigureAwait(false);
        }

        // Buffer the final partial chunk; it is stepped over below.
        ResetPosition();
        await Ensure(len, async).ConfigureAwait(false);
    }

    ReadPosition += len;
}
#endregion
#region Read Simple
// Reads one byte at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public byte ReadByte()
{
    CheckBounds(sizeof(byte));
    var position = ReadPosition;
    var value = Buffer[position];
    ReadPosition = position + sizeof(byte);
    return value;
}
// Reads a big-endian 16-bit signed integer at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public short ReadInt16()
{
    CheckBounds(sizeof(short));
    // The bytes are swapped on little-endian hosts and used as-is otherwise.
    var result = BitConverter.IsLittleEndian
        ? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<short>(ref Buffer[ReadPosition]))
        : Unsafe.ReadUnaligned<short>(ref Buffer[ReadPosition]);
    ReadPosition += sizeof(short);
    return result;
}
// Reads a big-endian 16-bit unsigned integer at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ushort ReadUInt16()
{
    CheckBounds(sizeof(ushort));
    // The bytes are swapped on little-endian hosts and used as-is otherwise.
    var result = BitConverter.IsLittleEndian
        ? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<ushort>(ref Buffer[ReadPosition]))
        : Unsafe.ReadUnaligned<ushort>(ref Buffer[ReadPosition]);
    ReadPosition += sizeof(ushort);
    return result;
}
// Reads a big-endian 32-bit signed integer at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int ReadInt32()
{
    CheckBounds(sizeof(int));
    // The bytes are swapped on little-endian hosts and used as-is otherwise.
    var result = BitConverter.IsLittleEndian
        ? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<int>(ref Buffer[ReadPosition]))
        : Unsafe.ReadUnaligned<int>(ref Buffer[ReadPosition]);
    ReadPosition += sizeof(int);
    return result;
}
// Reads a big-endian 32-bit unsigned integer at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public uint ReadUInt32()
{
    CheckBounds(sizeof(uint));
    // The bytes are swapped on little-endian hosts and used as-is otherwise.
    var result = BitConverter.IsLittleEndian
        ? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<uint>(ref Buffer[ReadPosition]))
        : Unsafe.ReadUnaligned<uint>(ref Buffer[ReadPosition]);
    ReadPosition += sizeof(uint);
    return result;
}
// Reads a big-endian 64-bit signed integer at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long ReadInt64()
{
    CheckBounds(sizeof(long));
    // The bytes are swapped on little-endian hosts and used as-is otherwise.
    var result = BitConverter.IsLittleEndian
        ? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<long>(ref Buffer[ReadPosition]))
        : Unsafe.ReadUnaligned<long>(ref Buffer[ReadPosition]);
    ReadPosition += sizeof(long);
    return result;
}
// Reads a big-endian 64-bit unsigned integer at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ulong ReadUInt64()
{
    CheckBounds(sizeof(ulong));
    // The bytes are swapped on little-endian hosts and used as-is otherwise.
    var result = BitConverter.IsLittleEndian
        ? BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<ulong>(ref Buffer[ReadPosition]))
        : Unsafe.ReadUnaligned<ulong>(ref Buffer[ReadPosition]);
    ReadPosition += sizeof(ulong);
    return result;
}
// Reads a big-endian 32-bit floating-point value at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public float ReadSingle()
{
    CheckBounds(sizeof(float));
    // On little-endian hosts the raw bits are read as an int, swapped, and reinterpreted as a float.
    var result = BitConverter.IsLittleEndian
        ? BitConverter.Int32BitsToSingle(BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<int>(ref Buffer[ReadPosition])))
        : Unsafe.ReadUnaligned<float>(ref Buffer[ReadPosition]);
    ReadPosition += sizeof(float);
    return result;
}
// Reads a big-endian 64-bit floating-point value at the current position and advances past it.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public double ReadDouble()
{
    CheckBounds(sizeof(double));
    // On little-endian hosts the raw bits are read as a long, swapped, and reinterpreted as a double.
    var result = BitConverter.IsLittleEndian
        ? BitConverter.Int64BitsToDouble(BinaryPrimitives.ReverseEndianness(Unsafe.ReadUnaligned<long>(ref Buffer[ReadPosition])))
        : Unsafe.ReadUnaligned<double>(ref Buffer[ReadPosition]);
    ReadPosition += sizeof(double);
    return result;
}
// Throws when fewer than count unread bytes are buffered; does nothing unless BufferBoundsChecks is on.
void CheckBounds(int count)
{
    if (!BufferBoundsChecks)
        return;

    Verify(count);

    // NoInlining keeps the comparison and throw out of line.
    [MethodImpl(MethodImplOptions.NoInlining)]
    void Verify(int requested)
    {
        if (requested > ReadBytesLeft)
            ThrowHelper.ThrowInvalidOperationException("There is not enough data left in the buffer.");
    }
}
// Decodes byteLen already-buffered bytes with TextEncoding and advances past them.
public string ReadString(int byteLen)
{
    Debug.Assert(byteLen <= ReadBytesLeft);

    var start = ReadPosition;
    var text = TextEncoding.GetString(Buffer, start, byteLen);
    ReadPosition = start + byteLen;
    return text;
}
// Copies output.Length already-buffered bytes into output and advances past them.
public void ReadBytes(Span<byte> output)
{
    Debug.Assert(output.Length <= ReadBytesLeft);
    new Span<byte>(Buffer, ReadPosition, output.Length).CopyTo(output);
    ReadPosition += output.Length;
}
// Array overload: copies len already-buffered bytes into output starting at outputOffset.
public void ReadBytes(byte[] output, int outputOffset, int len)
    => ReadBytes(new Span<byte>(output, outputOffset, len));
// Returns a view over the next len buffered bytes and advances past them.
// The view points into Buffer itself; no copy is made.
public ReadOnlyMemory<byte> ReadMemory(int len)
{
    Debug.Assert(len <= ReadBytesLeft);
    var memory = new ReadOnlyMemory<byte>(Buffer, ReadPosition, len);
    ReadPosition += len;
    return memory;
}
#endregion
#region Read Complex
/// <summary>
/// Reads into <paramref name="output"/>: buffered bytes are served first, otherwise the underlying
/// stream is read directly into <paramref name="output"/>.
/// </summary>
/// <param name="commandScoped">When true, the stream read goes through the timeout-aware path.</param>
/// <param name="output">Destination for the bytes read.</param>
/// <returns>The number of bytes written to <paramref name="output"/>.</returns>
public int Read(bool commandScoped, Span<byte> output)
{
    var readFromBuffer = Math.Min(ReadBytesLeft, output.Length);
    if (readFromBuffer > 0)
    {
        Buffer.AsSpan(ReadPosition, readFromBuffer).CopyTo(output);
        ReadPosition += readFromBuffer;
        return readFromBuffer;
    }

    // Only reset if we'll be able to read data, this is to support zero-byte reads.
    if (output.Length > 0)
    {
        Debug.Assert(ReadBytesLeft == 0);
        ResetPosition();
    }

    if (commandScoped)
        return ReadWithTimeout(output);

    try
    {
        var read = Underlying.Read(output);
        // These bytes never pass through Buffer, so account for them in the flushed total.
        _flushedBytes = unchecked(_flushedBytes + read);
        NpgsqlEventSource.Log.BytesRead(read);
        return read;
    }
    catch (Exception e)
    {
        throw Connector.Break(new NpgsqlException("Exception while reading from stream", e));
    }
}
/// <summary>
/// Asynchronously reads into <paramref name="output"/>: buffered bytes are served first and complete
/// synchronously, otherwise the underlying stream is read directly into <paramref name="output"/>.
/// </summary>
/// <param name="commandScoped">When true, the stream read goes through the timeout-aware path.</param>
/// <param name="output">Destination for the bytes read.</param>
/// <param name="cancellationToken">Token passed to the stream read.</param>
/// <returns>The number of bytes written to <paramref name="output"/>.</returns>
public ValueTask<int> ReadAsync(bool commandScoped, Memory<byte> output, CancellationToken cancellationToken = default)
{
    var readFromBuffer = Math.Min(ReadBytesLeft, output.Length);
    if (readFromBuffer > 0)
    {
        Buffer.AsSpan(ReadPosition, readFromBuffer).CopyTo(output.Span);
        ReadPosition += readFromBuffer;
        return new ValueTask<int>(readFromBuffer);
    }

    return ReadAsyncLong(this, commandScoped, output, cancellationToken);

    static async ValueTask<int> ReadAsyncLong(NpgsqlReadBuffer buffer, bool commandScoped, Memory<byte> output, CancellationToken cancellationToken)
    {
        // Only reset if we'll be able to read data, this is to support zero-byte reads.
        if (output.Length > 0)
        {
            Debug.Assert(buffer.ReadBytesLeft == 0);
            buffer.ResetPosition();
        }

        if (commandScoped)
            return await buffer.ReadWithTimeoutAsync(output, cancellationToken).ConfigureAwait(false);

        try
        {
            var read = await buffer.Underlying.ReadAsync(output, cancellationToken).ConfigureAwait(false);
            // These bytes never pass through Buffer, so account for them in the flushed total.
            buffer._flushedBytes = unchecked(buffer._flushedBytes + read);
            NpgsqlEventSource.Log.BytesRead(read);
            return read;
        }
        catch (Exception e)
        {
            throw buffer.Connector.Break(new NpgsqlException("Exception while reading from stream", e));
        }
    }
}
// Most recently created column stream; reused by CreateStream once it has been disposed.
ColumnStream? _lastStream;

// Returns a column stream initialized for len bytes, reusing the previous instance only if it
// has already been disposed.
public ColumnStream CreateStream(int len, bool canSeek, bool consumeOnDispose = true)
{
    ColumnStream stream;
    if (_lastStream is { IsDisposed: true } disposedStream)
        stream = disposedStream;
    else
        stream = _lastStream = new ColumnStream(Connector);

    var replicationOff = Connector.Settings.ReplicationMode == ReplicationMode.Off;
    stream.Init(len, canSeek, replicationOff, consumeOnDispose);
    return stream;
}
/// <summary>
/// Seeks the first null terminator (\0) and returns the string up to it. The buffer must already
/// contain the entire string and its terminator.
/// </summary>
public string ReadNullTerminatedString()
{
    var pending = ReadNullTerminatedString(TextEncoding, async: false);
    return pending.GetAwaiter().GetResult();
}
/// <summary>
/// Seeks the first null terminator (\0) and returns the string up to it. The buffer must already
/// contain the entire string and its terminator. If any character could not be decoded, a question
/// mark character is returned instead of throwing an exception.
/// </summary>
public string ReadNullTerminatedStringRelaxed()
{
    var pending = ReadNullTerminatedString(RelaxedTextEncoding, async: false);
    return pending.GetAwaiter().GetResult();
}
// Reads a null-terminated string using TextEncoding, reading more data if the terminator is not buffered yet.
public ValueTask<string> ReadNullTerminatedString(bool async, CancellationToken cancellationToken = default)
    => ReadNullTerminatedString(TextEncoding, async, cancellationToken);
/// <summary>
/// Seeks the first null terminator (\0) and returns the string up to it. Reads additional data from the network if a null
/// terminator isn't found in the buffered data.
/// </summary>
/// <param name="encoding">Encoding used to decode the bytes preceding the terminator.</param>
/// <param name="async">Whether additional data is read asynchronously.</param>
/// <param name="cancellationToken">
/// NOTE(review): accepted for signature compatibility but not forwarded to the reads performed here.
/// </param>
public ValueTask<string> ReadNullTerminatedString(Encoding encoding, bool async, CancellationToken cancellationToken = default)
{
    // Fast path: the terminator is already buffered, so decode in place.
    var index = Span.IndexOf((byte)0);
    if (index >= 0)
    {
        var result = new ValueTask<string>(encoding.GetString(Buffer, ReadPosition, index));
        ReadPosition += index + 1;
        return result;
    }

    return ReadLong(encoding, async);

    async ValueTask<string> ReadLong(Encoding encoding, bool async)
    {
        // Accumulate the string in a pooled scratch array, since each refill overwrites the buffer.
        var chunkSize = FilledBytes - ReadPosition;
        var tempBuf = ArrayPool<byte>.Shared.Rent(chunkSize + 1024);

        try
        {
            bool foundTerminator;
            var byteLen = chunkSize;
            Array.Copy(Buffer, ReadPosition, tempBuf, 0, chunkSize);
            ReadPosition += chunkSize;

            do
            {
                await ReadMore(async).ConfigureAwait(false);
                Debug.Assert(ReadPosition == 0);

                // i is the number of string bytes in this refill: up to the terminator when present,
                // otherwise everything that was buffered.
                var terminatorIndex = Buffer.AsSpan(0, FilledBytes).IndexOf((byte)0);
                foundTerminator = terminatorIndex >= 0;
                var i = foundTerminator ? terminatorIndex : FilledBytes;

                if (byteLen + i > tempBuf.Length)
                {
                    // Grow the scratch array; leave extra headroom only when more data is still expected.
                    var newTempBuf = ArrayPool<byte>.Shared.Rent(
                        foundTerminator ? byteLen + i : byteLen + i + 1024);

                    Array.Copy(tempBuf, 0, newTempBuf, 0, byteLen);
                    ArrayPool<byte>.Shared.Return(tempBuf);
                    tempBuf = newTempBuf;
                }

                Array.Copy(Buffer, 0, tempBuf, byteLen, i);
                byteLen += i;
                ReadPosition = i;
            } while (!foundTerminator);

            // Step over the terminator itself.
            ReadPosition++;
            return encoding.GetString(tempBuf, 0, byteLen);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(tempBuf);
        }
    }
}
// Returns the buffered bytes up to (not including) the next null terminator and advances past the
// terminator. The terminator must already be buffered; no I/O is performed.
public ReadOnlySpan<byte> GetNullTerminatedBytes()
{
    var i = Span.IndexOf((byte)0);
    Debug.Assert(i >= 0);
    var result = new ReadOnlySpan<byte>(Buffer, ReadPosition, i);
    ReadPosition += i + 1;
    return result;
}
#endregion
#region Dispose
// Returns a pooled backing array and disposes the cancellation source. Safe to call more than once.
public void Dispose()
{
    if (_disposed)
        return;

    // Only arrays rented in the constructor go back to the pool.
    if (_usePool)
        ArrayPool<byte>.Shared.Return(Buffer);

    Cts.Dispose();
    _disposed = true;
}
#endregion
#region Misc
// Discards everything in the buffer, counting the discarded bytes as flushed.
void ResetPosition()
{
    _flushedBytes = unchecked(_flushedBytes + FilledBytes);
    FilledBytes = 0;
    ReadPosition = 0;
}
// Restarts the flushed-byte count from zero.
internal void ResetFlushedBytes()
{
    _flushedBytes = 0;
}
// Appends this buffer's unread bytes after the data already held by other.
// This buffer's own position is left untouched.
internal void CopyTo(NpgsqlReadBuffer other)
{
    var pending = ReadBytesLeft;
    Debug.Assert(other.Size - other.FilledBytes >= pending);

    Array.Copy(Buffer, ReadPosition, other.Buffer, other.FilledBytes, pending);
    other.FilledBytes += pending;
}
#endregion
}