1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Make io_binwritev atomic.

This commit is contained in:
Samuel Williams 2022-01-09 11:41:20 +13:00
parent ce23cfa518
commit 15ebfe2849
Notes: git 2022-05-28 12:44:47 +09:00

145
io.c
View file

@ -1885,105 +1885,126 @@ io_write(VALUE io, VALUE str, int nosync)
#ifdef HAVE_WRITEV
struct binwritev_arg {
rb_io_t *fptr;
const struct iovec *iov;
struct iovec *iov;
int iovcnt;
size_t total;
};
static VALUE
call_writev_internal(VALUE arg)
io_binwritev_internal(VALUE arg)
{
struct binwritev_arg *p = (struct binwritev_arg *)arg;
return rb_writev_internal(p->fptr, p->iov, p->iovcnt);
size_t remaining = p->total;
size_t offset = 0;
rb_io_t *fptr = p->fptr;
struct iovec *iov = p->iov;
int iovcnt = p->iovcnt;
while (remaining) {
long result = rb_writev_internal(fptr, iov, iovcnt);
if (result >= 0) {
offset += result;
if (fptr->wbuf.ptr && fptr->wbuf.len) {
if (offset < (size_t)fptr->wbuf.len) {
fptr->wbuf.off += result;
fptr->wbuf.len -= result;
}
else {
offset -= (size_t)fptr->wbuf.len;
fptr->wbuf.off = 0;
fptr->wbuf.len = 0;
}
}
if (offset == p->total) {
return p->total;
}
while (result >= (ssize_t)iov->iov_len) {
/* iovcnt > 0 */
result -= iov->iov_len;
iov->iov_len = 0;
iov++;
if (!--iovcnt) {
// I don't believe this code path can ever occur.
return offset;
}
}
iov->iov_base = (char *)iov->iov_base + result;
iov->iov_len -= result;
errno = EAGAIN;
}
if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
rb_io_check_closed(fptr);
} else {
return -1;
}
}
return offset;
}
static long
io_binwritev(struct iovec *iov, int iovcnt, rb_io_t *fptr)
{
int i;
long r, total = 0, written_len = 0;
/* don't write anything if current thread has a pending interrupt. */
// Don't write anything if current thread has a pending interrupt:
rb_thread_check_ints();
if (iovcnt == 0) return 0;
for (i = 1; i < iovcnt; i++) total += iov[i].iov_len;
size_t total = 0;
for (int i = 1; i < iovcnt; i++) total += iov[i].iov_len;
io_allocate_write_buffer(fptr, 1);
if (fptr->wbuf.ptr && fptr->wbuf.len) {
long offset = fptr->wbuf.off + fptr->wbuf.len;
if (offset + total <= fptr->wbuf.capa) {
for (i = 1; i < iovcnt; i++) {
// The end of the buffered data:
size_t offset = fptr->wbuf.off + fptr->wbuf.len;
if (offset + total <= (size_t)fptr->wbuf.capa) {
for (int i = 1; i < iovcnt; i++) {
memcpy(fptr->wbuf.ptr+offset, iov[i].iov_base, iov[i].iov_len);
offset += iov[i].iov_len;
}
fptr->wbuf.len += total;
return total;
}
else {
iov[0].iov_base = fptr->wbuf.ptr + fptr->wbuf.off;
iov[0].iov_len = fptr->wbuf.len;
iov[0].iov_len = fptr->wbuf.len;
}
}
else {
// The first iov is reserved for the internal buffer, and it's empty.
iov++;
if (!--iovcnt) return 0;
if (!--iovcnt) {
// If there are no other io vectors we are done.
return 0;
}
}
retry:
struct binwritev_arg arg;
arg.fptr = fptr;
arg.iov = iov;
arg.iovcnt = iovcnt;
arg.total = total;
if (fptr->write_lock) {
struct binwritev_arg arg;
arg.fptr = fptr;
arg.iov = iov;
arg.iovcnt = iovcnt;
r = rb_mutex_synchronize(fptr->write_lock, call_writev_internal, (VALUE)&arg);
return rb_mutex_synchronize(fptr->write_lock, io_binwritev_internal, (VALUE)&arg);
}
else {
r = rb_writev_internal(fptr, iov, iovcnt);
return io_binwritev_internal((VALUE)&arg);
}
if (r >= 0) {
written_len += r;
if (fptr->wbuf.ptr && fptr->wbuf.len) {
if (written_len < fptr->wbuf.len) {
fptr->wbuf.off += r;
fptr->wbuf.len -= r;
}
else {
written_len -= fptr->wbuf.len;
fptr->wbuf.off = 0;
fptr->wbuf.len = 0;
}
}
if (written_len == total) return total;
while (r >= (ssize_t)iov->iov_len) {
/* iovcnt > 0 */
r -= iov->iov_len;
iov->iov_len = 0;
iov++;
if (!--iovcnt) {
// assert(written_len == total);
return total;
}
}
iov->iov_base = (char *)iov->iov_base + r;
iov->iov_len -= r;
errno = EAGAIN;
}
if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
rb_io_check_closed(fptr);
goto retry;
}
return -1L;
}
static long