1
0
Fork 0
mirror of https://gitlab.com/sortix/sortix.git synced 2023-02-13 20:55:38 -05:00

Optimize pipe transfers using scheduler yield hints.

This commit is contained in:
Jonas 'Sortie' Termansen 2014-10-04 00:47:20 +02:00
parent 6a14b67feb
commit ede6d8f800

View file

@ -24,6 +24,7 @@
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <sortix/fcntl.h>
@ -43,6 +44,7 @@
#include <sortix/kernel/poll.h>
#include <sortix/kernel/process.h>
#include <sortix/kernel/refcount.h>
#include <sortix/kernel/scheduler.h>
#include <sortix/kernel/signal.h>
#include <sortix/kernel/syscall.h>
#include <sortix/kernel/thread.h>
@ -82,10 +84,14 @@ private:
kthread_cond_t readcond;
kthread_cond_t writecond;
uint8_t* buffer;
uintptr_t sender_system_tid;
uintptr_t receiver_system_tid;
size_t bufferoffset;
size_t bufferused;
size_t buffersize;
size_t pretended_read_buffer_size;
size_t pledged_read;
size_t pledged_write;
bool anyreading;
bool anywriting;
bool is_sigpipe_enabled;
@ -102,6 +108,10 @@ PipeChannel::PipeChannel(uint8_t* buffer, size_t buffersize)
bufferoffset = bufferused = 0;
anyreading = anywriting = true;
is_sigpipe_enabled = true;
sender_system_tid = 0;
receiver_system_tid = 0;
pledged_read = 0;
pledged_write = 0;
}
PipeChannel::~PipeChannel()
@ -136,65 +146,125 @@ void PipeChannel::PerhapsShutdown()
ssize_t PipeChannel::read(ioctx_t* ctx, uint8_t* buf, size_t count)
{
if ( SSIZE_MAX < count )
count = SSIZE_MAX;
Thread* this_thread = CurrentThread();
this_thread->yield_to_tid = sender_system_tid;
ScopedLockSignal lock(&pipelock);
if ( !lock.IsAcquired() ) { errno = EINTR; return -1; }
while ( anywriting && !bufferused )
if ( !lock.IsAcquired() )
return errno = EINTR, -1;
size_t so_far = 0;
while ( count )
{
if ( ctx->dflags & O_NONBLOCK )
return errno = EWOULDBLOCK, -1;
if ( !kthread_cond_wait_signal(&readcond, &pipelock) )
receiver_system_tid = this_thread->system_tid;
while ( anywriting && !bufferused )
{
errno = EINTR;
return -1;
this_thread->yield_to_tid = sender_system_tid;
if ( pledged_read )
{
pledged_write++;
kthread_mutex_unlock(&pipelock);
kthread_yield();
kthread_mutex_lock(&pipelock);
pledged_write--;
continue;
}
if ( so_far )
return so_far;
if ( ctx->dflags & O_NONBLOCK )
return errno = EWOULDBLOCK, -1;
pledged_write++;
bool interrupted = !kthread_cond_wait_signal(&readcond, &pipelock);
pledged_write--;
if ( interrupted )
return errno = EINTR, -1;
}
if ( !bufferused && !anywriting )
return (ssize_t) so_far;
size_t amount = count;
if ( bufferused < amount )
amount = bufferused;
size_t linear = buffersize - bufferoffset;
if ( linear < amount )
amount = linear;
assert(amount);
if ( !ctx->copy_to_dest(buf, buffer + bufferoffset, amount) )
return so_far ? (ssize_t) so_far : -1;
bufferoffset = (bufferoffset + amount) % buffersize;
bufferused -= amount;
buf += amount;
count -= amount;
so_far += amount;
kthread_cond_broadcast(&writecond);
read_poll_channel.Signal(ReadPollEventStatus());
write_poll_channel.Signal(WritePollEventStatus());
}
if ( !bufferused && !anywriting ) { return 0; }
if ( bufferused < count ) { count = bufferused; }
size_t amount = count;
size_t linear = buffersize - bufferoffset;
if ( linear < amount ) { amount = linear; }
assert(amount);
ctx->copy_to_dest(buf, buffer + bufferoffset, amount);
bufferoffset = (bufferoffset + amount) % buffersize;
bufferused -= amount;
kthread_cond_broadcast(&writecond);
read_poll_channel.Signal(ReadPollEventStatus());
write_poll_channel.Signal(WritePollEventStatus());
return amount;
return (ssize_t) so_far;
}
ssize_t PipeChannel::write(ioctx_t* ctx, const uint8_t* buf, size_t count)
{
if ( SSIZE_MAX < count )
count = SSIZE_MAX;
Thread* this_thread = CurrentThread();
this_thread->yield_to_tid = receiver_system_tid;
ScopedLockSignal lock(&pipelock);
if ( !lock.IsAcquired() ) { errno = EINTR; return -1; }
while ( anyreading && bufferused == buffersize )
if ( !lock.IsAcquired() )
return errno = EINTR, -1;
sender_system_tid = this_thread->system_tid;
size_t so_far = 0;
while ( count )
{
if ( ctx->dflags & O_NONBLOCK )
return errno = EWOULDBLOCK, -1;
if ( !kthread_cond_wait_signal(&writecond, &pipelock) )
sender_system_tid = this_thread->system_tid;
while ( anyreading && bufferused == buffersize )
{
errno = EINTR;
return -1;
this_thread->yield_to_tid = receiver_system_tid;
if ( pledged_write )
{
pledged_read++;
kthread_mutex_unlock(&pipelock);
kthread_yield();
kthread_mutex_lock(&pipelock);
pledged_read--;
continue;
}
if ( so_far )
return so_far;
if ( ctx->dflags & O_NONBLOCK )
return errno = EWOULDBLOCK, -1;
pledged_read++;
bool interrupted = !kthread_cond_wait_signal(&writecond, &pipelock);
pledged_read--;
if ( interrupted )
return errno = EINTR, -1;
}
if ( !anyreading )
{
if ( so_far )
return (ssize_t) so_far;
if ( is_sigpipe_enabled )
CurrentThread()->DeliverSignal(SIGPIPE);
return errno = EPIPE, -1;
}
size_t amount = count;
if ( buffersize - bufferused < amount )
amount = buffersize - bufferused;
size_t writeoffset = (bufferoffset + bufferused) % buffersize;
size_t linear = buffersize - writeoffset;
if ( linear < amount )
amount = linear;
assert(amount);
if ( !ctx->copy_from_src(buffer + writeoffset, buf, amount) )
return so_far ? (ssize_t) so_far : -1;
bufferused += amount;
buf += amount;
count -= amount;
so_far += amount;
kthread_cond_broadcast(&readcond);
read_poll_channel.Signal(ReadPollEventStatus());
write_poll_channel.Signal(WritePollEventStatus());
}
if ( !anyreading )
{
if ( is_sigpipe_enabled )
CurrentThread()->DeliverSignal(SIGPIPE);
return errno = EPIPE, -1;
}
if ( buffersize - bufferused < count ) { count = buffersize - bufferused; }
size_t writeoffset = (bufferoffset + bufferused) % buffersize;
size_t amount = count;
size_t linear = buffersize - writeoffset;
if ( linear < amount ) { amount = linear; }
assert(amount);
ctx->copy_from_src(buffer + writeoffset, buf, amount);
bufferused += amount;
kthread_cond_broadcast(&readcond);
read_poll_channel.Signal(ReadPollEventStatus());
write_poll_channel.Signal(WritePollEventStatus());
return amount;
return (ssize_t) so_far;
}
short PipeChannel::ReadPollEventStatus()
@ -341,14 +411,22 @@ void PipeEndpoint::Disconnect()
ssize_t PipeEndpoint::read(ioctx_t* ctx, uint8_t* buf, size_t count)
{
if ( !reading ) { errno = EBADF; return -1; }
return channel->read(ctx, buf, count);
if ( !reading )
return errno = EBADF, -1;
ssize_t result = channel->read(ctx, buf, count);
CurrentThread()->yield_to_tid = 0;
Scheduler::ScheduleTrueThread();
return result;
}
ssize_t PipeEndpoint::write(ioctx_t* ctx, const uint8_t* buf, size_t count)
{
if ( reading ) { errno = EBADF; return -1; }
return channel->write(ctx, buf, count);
if ( reading )
return errno = EBADF, -1;
ssize_t result = channel->write(ctx, buf, count);
CurrentThread()->yield_to_tid = 0;
Scheduler::ScheduleTrueThread();
return result;
}
int PipeEndpoint::poll(ioctx_t* ctx, PollNode* node)