Skip to content

Commit c6e7d35

Browse files
committed
Use worker_readPtr and worker_writePtr when feasible
This mostly affects reading and writing network sockets, avoiding an allocation and extra copy inside *_getReadablePtr and *_getWriteablePtr when the MemoryManager is disabled or hasn't mapped the applicable memory.
1 parent 4c433c8 commit c6e7d35

17 files changed

Lines changed: 227 additions & 103 deletions

File tree

src/main/core/worker.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -894,4 +894,24 @@ int worker_readPtr(void* dst, PluginVirtualPtr src, size_t n) {
894894
int worker_writePtr(PluginVirtualPtr dst, void* src, size_t n) {
895895
Worker* worker = _worker_getPrivate();
896896
return process_writePtr(worker->active.process, worker->active.thread, dst, src, n);
897+
}
898+
899+
const void* worker_getReadablePtr(PluginVirtualPtr src, size_t n) {
900+
Worker* worker = _worker_getPrivate();
901+
return process_getReadablePtr(worker->active.process, worker->active.thread, src, n);
902+
}
903+
904+
void* worker_getWritablePtr(PluginVirtualPtr dst, size_t n) {
905+
Worker* worker = _worker_getPrivate();
906+
return process_getWriteablePtr(worker->active.process, worker->active.thread, dst, n);
907+
}
908+
909+
void* worker_getMutablePtr(PluginVirtualPtr dst, size_t n) {
910+
Worker* worker = _worker_getPrivate();
911+
return process_getMutablePtr(worker->active.process, worker->active.thread, dst, n);
912+
}
913+
914+
void worker_flushPtrs() {
915+
Worker* worker = _worker_getPrivate();
916+
return process_flushPtrs(worker->active.process, worker->active.thread);
897917
}

src/main/core/worker.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "main/core/support/options.h"
1717
#include "main/core/work/task.h"
1818
#include "main/host/host.h"
19+
#include "main/host/syscall_types.h"
1920
#include "main/routing/address.h"
2021
#include "main/routing/dns.h"
2122
#include "main/routing/packet.h"
@@ -104,6 +105,19 @@ void worker_incrementPluginError();
104105
Address* worker_resolveIPToAddress(in_addr_t ip);
105106
Address* worker_resolveNameToAddress(const gchar* name);
106107

108+
// Get a readable pointer in the current active Process.
109+
const void* worker_getReadablePtr(PluginVirtualPtr src, size_t n);
110+
111+
// Get a writable pointer in the current active Process.
112+
void* worker_getWritablePtr(PluginVirtualPtr dst, size_t n);
113+
114+
// Get a mutable pointer containing the data at `dst`.
115+
void* worker_getMutablePtr(PluginVirtualPtr dst, size_t n);
116+
117+
// Flushes and invalidates previously returned readable, writable, and mutable
118+
// pointers.
119+
void worker_flushPtrs();
120+
107121
// Copy `n` bytes from `src` in the current active Process to `dst`. Returns 0
108122
// on success or EFAULT if any of the specified range couldn't be accessed.
109123
int worker_readPtr(void* dst, PluginVirtualPtr src, size_t n);

src/main/host/descriptor/channel.c

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ static void channel_free(LegacyDescriptor* descriptor) {
7070
worker_count_deallocation(Channel);
7171
}
7272

73-
static gssize channel_linkedWrite(Channel* channel, gconstpointer buffer, gsize nBytes) {
73+
static gssize channel_linkedWrite(Channel* channel, PluginVirtualPtr buffer, gsize nBytes) {
7474
MAGIC_ASSERT(channel);
7575
/* our linked channel is trying to send us data, make sure we can read it */
7676
utility_assert(!(channel->type & CT_WRITEONLY));
@@ -81,9 +81,17 @@ static gssize channel_linkedWrite(Channel* channel, gconstpointer buffer, gsize
8181
return (gssize)-EWOULDBLOCK;
8282
}
8383

84-
/* accept some data from the other end of the pipe */
84+
// TODO: Add an interface to bytequeue that would allow us to use
85+
// worker_readPtr to read directly into its storage, saving a potential copy
86+
// inside worker_getReadablePtr. e.g. one that accepts a read callback.
8587
gsize copyLength = MIN(nBytes, available);
86-
bytequeue_push(channel->buffer, buffer, copyLength);
88+
const void* readablePtr = worker_getReadablePtr(buffer, copyLength);
89+
if (!readablePtr) {
90+
return -EFAULT;
91+
}
92+
93+
/* accept some data from the other end of the pipe */
94+
bytequeue_push(channel->buffer, readablePtr, copyLength);
8795
channel->bufferLength += copyLength;
8896

8997
/* we just got some data in our buffer */
@@ -92,8 +100,8 @@ static gssize channel_linkedWrite(Channel* channel, gconstpointer buffer, gsize
92100
return copyLength;
93101
}
94102

95-
static gssize channel_sendUserData(Transport* transport, gconstpointer buffer,
96-
gsize nBytes, in_addr_t ip, in_port_t port) {
103+
static gssize channel_sendUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
104+
in_addr_t ip, in_port_t port) {
97105
Channel* channel = _channel_fromLegacyDescriptor((LegacyDescriptor*)transport);
98106
MAGIC_ASSERT(channel);
99107
/* the read end of a unidirectional pipe can not write! */
@@ -118,9 +126,8 @@ static gssize channel_sendUserData(Transport* transport, gconstpointer buffer,
118126
return result;
119127
}
120128

121-
static gssize channel_receiveUserData(Transport* transport, gpointer buffer,
122-
gsize nBytes, in_addr_t* ip,
123-
in_port_t* port) {
129+
static gssize channel_receiveUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
130+
in_addr_t* ip, in_port_t* port) {
124131
Channel* channel = _channel_fromLegacyDescriptor((LegacyDescriptor*)transport);
125132
MAGIC_ASSERT(channel);
126133
/* the write end of a unidirectional pipe can not read! */
@@ -138,9 +145,17 @@ static gssize channel_receiveUserData(Transport* transport, gpointer buffer,
138145
}
139146
}
140147

141-
/* accept some data from the other end of the pipe */
148+
// TODO: Use `worker_writePtr` to write directly into the process's memory,
149+
// saving a potential copy in worker_getWritablePtr. e.g. add an interface
150+
// to bytequeue that takes a `write` callback.
142151
gsize copyLength = MIN(nBytes, available);
143-
gsize numCopied = bytequeue_pop(channel->buffer, buffer, copyLength);
152+
void* writableBuf = worker_getWritablePtr(buffer, copyLength);
153+
if (!writableBuf) {
154+
return -EFAULT;
155+
}
156+
157+
/* accept some data from the other end of the pipe */
158+
gsize numCopied = bytequeue_pop(channel->buffer, writableBuf, copyLength);
144159
channel->bufferLength -= numCopied;
145160

146161
/* we are no longer readable if we have nothing left */

src/main/host/descriptor/socket.c

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,16 @@ static gboolean _socket_close(LegacyDescriptor* descriptor) {
7878
return socket->vtable->close((LegacyDescriptor*)socket);
7979
}
8080

81-
static gssize _socket_sendUserData(Transport* transport, gconstpointer buffer,
82-
gsize nBytes, in_addr_t ip, in_port_t port) {
81+
static gssize _socket_sendUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
82+
in_addr_t ip, in_port_t port) {
8383
Socket* socket = _socket_fromLegacyDescriptor((LegacyDescriptor*)transport);
8484
MAGIC_ASSERT(socket);
8585
MAGIC_ASSERT(socket->vtable);
8686
return socket->vtable->send((Transport*)socket, buffer, nBytes, ip, port);
8787
}
8888

89-
static gssize _socket_receiveUserData(Transport* transport, gpointer buffer,
90-
gsize nBytes, in_addr_t* ip,
91-
in_port_t* port) {
89+
static gssize _socket_receiveUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
90+
in_addr_t* ip, in_port_t* port) {
9291
Socket* socket = _socket_fromLegacyDescriptor((LegacyDescriptor*)transport);
9392
MAGIC_ASSERT(socket);
9493
MAGIC_ASSERT(socket->vtable);

src/main/host/descriptor/tcp.c

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,8 @@ static void _tcp_updateSendWindow(TCP* tcp) {
803803
tcp->send.window = (guint32)MIN(tcp->cong.cwnd, (gint)tcp->receive.lastWindow);
804804
}
805805

806-
static Packet* _tcp_createPacket(TCP* tcp, enum ProtocolTCPFlags flags, gconstpointer payload, gsize payloadLength) {
806+
static Packet* _tcp_createPacket(TCP* tcp, enum ProtocolTCPFlags flags, PluginVirtualPtr payload,
807+
gsize payloadLength) {
807808
MAGIC_ASSERT(tcp);
808809

809810
/*
@@ -856,7 +857,7 @@ static void _tcp_sendControlPacket(TCP* tcp, enum ProtocolTCPFlags flags) {
856857
tcp->super.boundString, tcp->super.peerString);
857858

858859
/* create the ack packet, without any payload data */
859-
Packet* control = _tcp_createPacket(tcp, flags, NULL, 0);
860+
Packet* control = _tcp_createPacket(tcp, flags, (PluginVirtualPtr){0}, 0);
860861

861862
/* make sure it gets sent before whatever else is in the queue */
862863
packet_setPriority(control, 0.0);
@@ -1096,7 +1097,7 @@ static void _tcp_sendShutdownFin(TCP* tcp) {
10961097

10971098
if(sendFin) {
10981099
/* send a fin */
1099-
Packet* fin = _tcp_createPacket(tcp, PTCP_FIN, NULL, 0);
1100+
Packet* fin = _tcp_createPacket(tcp, PTCP_FIN, (PluginVirtualPtr){0}, 0);
11001101
_tcp_bufferPacketOut(tcp, fin);
11011102
_tcp_flush(tcp);
11021103

@@ -2195,8 +2196,8 @@ static void _tcp_endOfFileSignalled(TCP* tcp, enum TCPFlags flags) {
21952196
}
21962197
}
21972198

2198-
static gssize _tcp_sendUserData(Transport* transport, gconstpointer buffer,
2199-
gsize nBytes, in_addr_t ip, in_port_t port) {
2199+
static gssize _tcp_sendUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
2200+
in_addr_t ip, in_port_t port) {
22002201
TCP* tcp = _tcp_fromLegacyDescriptor((LegacyDescriptor*)transport);
22012202
MAGIC_ASSERT(tcp);
22022203

@@ -2226,7 +2227,8 @@ static gssize _tcp_sendUserData(Transport* transport, gconstpointer buffer,
22262227
gsize copyLength = MIN(maxPacketLength, remaining);
22272228

22282229
/* use helper to create the packet */
2229-
Packet* packet = _tcp_createPacket(tcp, PTCP_ACK, buffer + bytesCopied, copyLength);
2230+
Packet* packet = _tcp_createPacket(
2231+
tcp, PTCP_ACK, (PluginVirtualPtr){.val = buffer.val + bytesCopied}, copyLength);
22302232
if(copyLength > 0) {
22312233
/* we are sending more user data */
22322234
tcp->send.end++;
@@ -2262,9 +2264,8 @@ static void _tcp_sendWindowUpdate(TCP* tcp, gpointer data) {
22622264
tcp->receive.windowUpdatePending = FALSE;
22632265
}
22642266

2265-
static gssize _tcp_receiveUserData(Transport* transport, gpointer buffer,
2266-
gsize nBytes, in_addr_t* ip,
2267-
in_port_t* port) {
2267+
static gssize _tcp_receiveUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
2268+
in_addr_t* ip, in_port_t* port) {
22682269
TCP* tcp = _tcp_fromLegacyDescriptor((LegacyDescriptor*)transport);
22692270
MAGIC_ASSERT(tcp);
22702271

@@ -2280,7 +2281,6 @@ static gssize _tcp_receiveUserData(Transport* transport, gpointer buffer,
22802281
_tcp_flush(tcp);
22812282

22822283
gsize remaining = nBytes;
2283-
gsize bytesCopied = 0;
22842284
gsize totalCopied = 0;
22852285
gsize offset = 0;
22862286
gsize copyLength = 0;
@@ -2291,7 +2291,7 @@ static gssize _tcp_receiveUserData(Transport* transport, gpointer buffer,
22912291
return -EWOULDBLOCK;
22922292
}
22932293

2294-
if (buffer == NULL && nBytes > 0) {
2294+
if (buffer.val == 0 && nBytes > 0) {
22952295
info("Can't recv >0 bytes into NULL buffer on socket");
22962296
return -EFAULT;
22972297
}
@@ -2303,7 +2303,12 @@ static gssize _tcp_receiveUserData(Transport* transport, gpointer buffer,
23032303
utility_assert(partialBytes > 0);
23042304

23052305
copyLength = MIN(partialBytes, remaining);
2306-
bytesCopied = packet_copyPayload(tcp->partialUserDataPacket, tcp->partialOffset, buffer, copyLength);
2306+
gssize bytesCopied =
2307+
packet_copyPayload(tcp->partialUserDataPacket, tcp->partialOffset, buffer, copyLength);
2308+
if (bytesCopied < 0) {
2309+
// Error writing to PluginVirtualPtr
2310+
return bytesCopied;
2311+
}
23072312
totalCopied += bytesCopied;
23082313
remaining -= bytesCopied;
23092314
offset += bytesCopied;
@@ -2329,19 +2334,30 @@ static gssize _tcp_receiveUserData(Transport* transport, gpointer buffer,
23292334

23302335
/* get the next buffered packet - we'll always need it.
23312336
* this could mark the socket as unreadable if this is its last packet.*/
2332-
Packet* packet = socket_removeFromInputBuffer((Socket*)tcp);
2333-
if(!packet) {
2337+
const Packet* nextPacket = socket_peekNextInPacket((Socket*)tcp);
2338+
if (!nextPacket) {
23342339
/* no more packets or partial packets */
23352340
break;
23362341
}
23372342

2338-
guint packetLength = packet_getPayloadLength(packet);
2343+
guint packetLength = packet_getPayloadLength(nextPacket);
23392344
copyLength = MIN(packetLength, remaining);
2340-
bytesCopied = packet_copyPayload(packet, 0, buffer + offset, copyLength);
2345+
gssize bytesCopied = packet_copyPayload(
2346+
nextPacket, 0, (PluginVirtualPtr){.val = buffer.val + offset}, copyLength);
2347+
if (bytesCopied < 0) {
2348+
// Error writing to PluginVirtualPtr
2349+
if (totalCopied > 0) {
2350+
warning("Returning error %s, but already copied %lu bytes which will be lost",
2351+
g_strerror(-bytesCopied), totalCopied);
2352+
}
2353+
return bytesCopied;
2354+
}
23412355
totalCopied += bytesCopied;
23422356
remaining -= bytesCopied;
23432357
offset += bytesCopied;
23442358

2359+
Packet* packet = socket_removeFromInputBuffer((Socket*)tcp);
2360+
23452361
if(bytesCopied < packetLength) {
23462362
/* we were only able to read part of this packet */
23472363
tcp->partialUserDataPacket = packet;

src/main/host/descriptor/transport.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "main/core/support/definitions.h"
1111
#include "main/host/descriptor/descriptor.h"
1212
#include "main/host/descriptor/transport.h"
13+
#include "main/host/syscall_types.h"
1314
#include "main/utility/utility.h"
1415

1516
static Transport* _transport_fromLegacyDescriptor(LegacyDescriptor* descriptor) {
@@ -54,15 +55,15 @@ void transport_init(Transport* transport, TransportFunctionTable* vtable,
5455
transport->vtable = vtable;
5556
}
5657

57-
gssize transport_sendUserData(Transport* transport, gconstpointer buffer, gsize nBytes,
58-
in_addr_t ip, in_port_t port) {
58+
gssize transport_sendUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
59+
in_addr_t ip, in_port_t port) {
5960
MAGIC_ASSERT(transport);
6061
MAGIC_ASSERT(transport->vtable);
6162
return transport->vtable->send(transport, buffer, nBytes, ip, port);
6263
}
6364

64-
gssize transport_receiveUserData(Transport* transport, gpointer buffer, gsize nBytes,
65-
in_addr_t* ip, in_port_t* port) {
65+
gssize transport_receiveUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
66+
in_addr_t* ip, in_port_t* port) {
6667
MAGIC_ASSERT(transport);
6768
MAGIC_ASSERT(transport->vtable);
6869
return transport->vtable->receive(transport, buffer, nBytes, ip, port);

src/main/host/descriptor/transport.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@
1212

1313
#include "main/core/support/definitions.h"
1414
#include "main/host/descriptor/descriptor.h"
15+
#include "main/host/syscall_types.h"
1516
#include "main/utility/utility.h"
1617

1718
typedef struct _Transport Transport;
1819
typedef struct _TransportFunctionTable TransportFunctionTable;
1920

20-
typedef gssize (*TransportSendFunc)(Transport* transport, gconstpointer buffer, gsize nBytes, in_addr_t ip, in_port_t port);
21-
typedef gssize (*TransportReceiveFunc)(Transport* transport, gpointer buffer, gsize nBytes, in_addr_t* ip, in_port_t* port);
21+
typedef gssize (*TransportSendFunc)(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
22+
in_addr_t ip, in_port_t port);
23+
typedef gssize (*TransportReceiveFunc)(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
24+
in_addr_t* ip, in_port_t* port);
2225

2326
struct _TransportFunctionTable {
2427
DescriptorCloseFunc close;
@@ -38,9 +41,9 @@ struct _Transport {
3841
void transport_init(Transport* transport, TransportFunctionTable* vtable,
3942
LegacyDescriptorType type);
4043

41-
gssize transport_sendUserData(Transport* transport, gconstpointer buffer, gsize nBytes,
42-
in_addr_t ip, in_port_t port);
43-
gssize transport_receiveUserData(Transport* transport, gpointer buffer, gsize nBytes,
44-
in_addr_t* ip, in_port_t* port);
44+
gssize transport_sendUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
45+
in_addr_t ip, in_port_t port);
46+
gssize transport_receiveUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
47+
in_addr_t* ip, in_port_t* port);
4548

4649
#endif /* SHD_TRANSPORT_H_ */

src/main/host/descriptor/udp.c

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ static void _udp_dropPacket(Socket* socket, Packet* packet) {
106106
* ip and port parameters. this function assumes that the socket is already
107107
* bound to a local port, no matter if that happened explicitly or implicitly.
108108
*/
109-
static gssize _udp_sendUserData(Transport* transport, gconstpointer buffer,
110-
gsize nBytes, in_addr_t ip, in_port_t port) {
109+
static gssize _udp_sendUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
110+
in_addr_t ip, in_port_t port) {
111111
UDP* udp = _udp_fromLegacyDescriptor((LegacyDescriptor*)transport);
112112
MAGIC_ASSERT(udp);
113113

@@ -169,29 +169,34 @@ static gssize _udp_sendUserData(Transport* transport, gconstpointer buffer,
169169
return bytes_sent;
170170
}
171171

172-
static gssize _udp_receiveUserData(Transport* transport, gpointer buffer,
173-
gsize nBytes, in_addr_t* ip,
174-
in_port_t* port) {
172+
static gssize _udp_receiveUserData(Transport* transport, PluginVirtualPtr buffer, gsize nBytes,
173+
in_addr_t* ip, in_port_t* port) {
175174
UDP* udp = _udp_fromLegacyDescriptor((LegacyDescriptor*)transport);
176175
MAGIC_ASSERT(udp);
177176

178177
if (socket_peekNextInPacket(&(udp->super)) == NULL) {
179178
return -EWOULDBLOCK;
180179
}
181180

182-
if (buffer == NULL && nBytes > 0) {
181+
if (buffer.val == 0 && nBytes > 0) {
183182
return -EFAULT;
184183
}
185184

186-
Packet* packet = socket_removeFromInputBuffer((Socket*)udp);
187-
if(!packet) {
185+
const Packet* nextPacket = socket_peekNextInPacket((Socket*)udp);
186+
if (!nextPacket) {
188187
return -EWOULDBLOCK;
189188
}
190189

191190
/* copy lesser of requested and available amount to application buffer */
192-
guint packetLength = packet_getPayloadLength(packet);
191+
guint packetLength = packet_getPayloadLength(nextPacket);
193192
gsize copyLength = MIN(nBytes, packetLength);
194-
guint bytesCopied = packet_copyPayload(packet, 0, buffer, copyLength);
193+
gssize bytesCopied = packet_copyPayload(nextPacket, 0, buffer, copyLength);
194+
if (bytesCopied < 0) {
195+
// Error writing to PluginVirtualPtr
196+
return bytesCopied;
197+
}
198+
199+
Packet* packet = socket_removeFromInputBuffer((Socket*)udp);
195200

196201
utility_assert(bytesCopied == copyLength);
197202
packet_addDeliveryStatus(packet, PDS_RCV_SOCKET_DELIVERED);
@@ -207,7 +212,7 @@ static gssize _udp_receiveUserData(Transport* transport, gpointer buffer,
207212
/* destroy packet, throwing away any bytes not claimed by the app */
208213
packet_unref(packet);
209214

210-
debug("user read %u inbound UDP bytes", bytesCopied);
215+
debug("user read %ld inbound UDP bytes", bytesCopied);
211216

212217
return bytesCopied;
213218
}

0 commit comments

Comments
 (0)