diff --git a/sortix/pipe.cpp b/sortix/pipe.cpp
index 5e6dd1f1..4073211e 100644
--- a/sortix/pipe.cpp
+++ b/sortix/pipe.cpp
@@ -1,6 +1,6 @@
-/******************************************************************************
+/*******************************************************************************
- COPYRIGHT(C) JONAS 'SORTIE' TERMANSEN 2011.
+ Copyright(C) Jonas 'Sortie' Termansen 2011, 2012.
This file is part of Sortix.
@@ -14,18 +14,21 @@
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
- You should have received a copy of the GNU General Public License along
- with Sortix. If not, see .
+ You should have received a copy of the GNU General Public License along with
+ Sortix. If not, see .
pipe.cpp
A device with a writing end and a reading end.
-******************************************************************************/
+*******************************************************************************/
#include
+#include
#include
#include
+#ifdef GOT_FAKE_KTHREAD
#include "event.h"
+#endif
#include "thread.h"
#include "process.h"
#include "syscall.h"
@@ -49,10 +52,15 @@ namespace Sortix
size_t buffersize;
size_t bufferoffset;
size_t bufferused;
+#ifdef GOT_FAKE_KTHREAD
Event readevent;
Event writeevent;
+#endif
bool anyreading;
bool anywriting;
+ kthread_mutex_t pipelock;
+ kthread_cond_t readcond;
+ kthread_cond_t writecond;
public:
virtual ssize_t Read(byte* dest, size_t count);
@@ -74,6 +82,9 @@ namespace Sortix
this->bufferused = 0;
this->anyreading = true;
this->anywriting = true;
+ this->pipelock = KTHREAD_MUTEX_INITIALIZER;
+ this->readcond = KTHREAD_COND_INITIALIZER;
+ this->writecond = KTHREAD_COND_INITIALIZER;
}
DevPipeStorage::~DevPipeStorage()
@@ -87,6 +98,29 @@ namespace Sortix
ssize_t DevPipeStorage::Read(byte* dest, size_t count)
{
if ( count == 0 ) { return 0; }
+#ifdef GOT_ACTUAL_KTHREAD
+ ScopedLockSignal lock(&pipelock);
+ if ( !lock.IsAcquired() ) { Error::Set(EINTR); return -1; }
+ while ( anywriting && !bufferused )
+ {
+ if ( !kthread_cond_wait_signal(&readcond, &pipelock) )
+ {
+ Error::Set(EINTR);
+ return -1;
+ }
+ }
+ if ( !bufferused && !anywriting ) { return 0; }
+ if ( bufferused < count ) { count = bufferused; }
+ size_t amount = count;
+ size_t linear = buffersize - bufferoffset;
+ if ( linear < amount ) { amount = linear; }
+ ASSERT(amount);
+ Memory::Copy(dest, buffer + bufferoffset, amount);
+ bufferoffset = (bufferoffset + amount) % buffersize;
+ bufferused -= amount;
+ kthread_cond_broadcast(&writecond);
+ return amount;
+#else
if ( bufferused )
{
if ( bufferused < count ) { count = bufferused; }
@@ -106,12 +140,41 @@ namespace Sortix
Error::Set(EBLOCKING);
readevent.Register();
return -1;
+#endif
}
ssize_t DevPipeStorage::Write(const byte* src, size_t count)
{
- if ( !anyreading ) { /* TODO: SIGPIPE */ }
if ( count == 0 ) { return 0; }
+#ifdef GOT_ACTUAL_KTHREAD
+ ScopedLockSignal lock(&pipelock);
+ if ( !lock.IsAcquired() ) { Error::Set(EINTR); return -1; }
+ while ( anyreading && bufferused == buffersize )
+ {
+ if ( !kthread_cond_wait_signal(&writecond, &pipelock) )
+ {
+ Error::Set(EINTR);
+ return -1;
+ }
+ }
+ if ( !anyreading )
+ {
+ // TODO: Implement better signal support and uncomment.
+ //CurrentThread()->DeliverSignal(SIGPIPE);
+ Error::Set(EPIPE);
+ return -1;
+ }
+ if ( buffersize - bufferused < count ) { count = buffersize - bufferused; }
+ size_t writeoffset = (bufferoffset + bufferused) % buffersize;
+ size_t amount = count;
+ size_t linear = buffersize - writeoffset;
+ if ( linear < amount ) { amount = linear; }
+ ASSERT(amount);
+ Memory::Copy(buffer + writeoffset, src, amount);
+ bufferused += amount;
+ kthread_cond_broadcast(&readcond);
+ return amount;
+#else
if ( bufferused < buffersize )
{
if ( buffersize - bufferused < count ) { count = buffersize - bufferused; }
@@ -129,10 +192,22 @@ namespace Sortix
Error::Set(EBLOCKING);
writeevent.Register();
return -1;
+#endif
}
- void DevPipeStorage::NotReading() { anyreading = false; }
- void DevPipeStorage::NotWriting() { anywriting = false; }
+ void DevPipeStorage::NotReading()
+ {
+ ScopedLock lock(&pipelock);
+ anyreading = false;
+ kthread_cond_broadcast(&readcond);
+ }
+
+ void DevPipeStorage::NotWriting()
+ {
+ ScopedLock lock(&pipelock);
+ anywriting = false;
+ kthread_cond_broadcast(&writecond);
+ }
class DevPipeReading : public DevStream
{
diff --git a/sortix/pipe.h b/sortix/pipe.h
index b8197b7b..11faee2c 100644
--- a/sortix/pipe.h
+++ b/sortix/pipe.h
@@ -1,6 +1,6 @@
-/******************************************************************************
+/*******************************************************************************
- COPYRIGHT(C) JONAS 'SORTIE' TERMANSEN 2011.
+ Copyright(C) Jonas 'Sortie' Termansen 2011, 2012.
This file is part of Sortix.
@@ -14,13 +14,13 @@
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
- You should have received a copy of the GNU General Public License along
- with Sortix. If not, see .
+ You should have received a copy of the GNU General Public License along with
+ Sortix. If not, see .
pipe.h
A device with a writing end and a reading end.
-******************************************************************************/
+*******************************************************************************/
#ifndef SORTIX_PIPE_H