Make `io_binwrite` atomic.

This commit is contained in:
Samuel Williams 2022-01-09 01:47:51 +13:00
parent 8a13a2e8d1
commit ce23cfa518
Notes: git 2022-05-28 12:44:48 +09:00
1 changed files with 125 additions and 89 deletions

214
io.c
View File

@ -1581,71 +1581,111 @@ struct write_arg {
#ifdef HAVE_WRITEV #ifdef HAVE_WRITEV
static VALUE static VALUE
io_binwrite_string(VALUE arg) io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
{ {
struct binwrite_arg *p = (struct binwrite_arg *)arg;
rb_io_t *fptr = p->fptr;
long r;
if (fptr->wbuf.len) { if (fptr->wbuf.len) {
struct iovec iov[2]; struct iovec iov[2];
iov[0].iov_base = fptr->wbuf.ptr+fptr->wbuf.off; iov[0].iov_base = fptr->wbuf.ptr+fptr->wbuf.off;
iov[0].iov_len = fptr->wbuf.len; iov[0].iov_len = fptr->wbuf.len;
iov[1].iov_base = (char *)p->ptr; iov[1].iov_base = (void*)ptr;
iov[1].iov_len = p->length; iov[1].iov_len = length;
r = rb_writev_internal(fptr, iov, 2); long result = rb_writev_internal(fptr, iov, 2);
if (r < 0) if (result < 0)
return r; return result;
if (fptr->wbuf.len <= r) { if (result >= fptr->wbuf.len) {
r -= fptr->wbuf.len; // We wrote more than the internal buffer:
fptr->wbuf.off = 0; result -= fptr->wbuf.len;
fptr->wbuf.len = 0; fptr->wbuf.off = 0;
} fptr->wbuf.len = 0;
else { }
fptr->wbuf.off += (int)r; else {
fptr->wbuf.len -= (int)r; // We only wrote less data than the internal buffer:
r = 0L; fptr->wbuf.off += (int)result;
} fptr->wbuf.len -= (int)result;
return 0L;
}
return result;
} }
else { else {
r = rb_write_internal(fptr, p->ptr, p->length); return rb_write_internal(fptr, ptr, length);
} }
return r;
} }
#else #else
static VALUE
io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
{
long remaining = length;
if (fptr->wbuf.len) {
if (fptr->wbuf.len+length <= fptr->wbuf.capa) {
if (fptr->wbuf.capa < fptr->wbuf.off+fptr->wbuf.len+length) {
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
}
MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr, char, length);
fptr->wbuf.len += (int)length;
// We copied the entire incoming data to the internal buffer:
remaining = 0;
}
// Flush the internal buffer:
if (io_fflush(fptr) < 0) {
return -1;
}
// If all the data was buffered, we are done:
if (remaining == 0) {
return length;
}
}
// Otherwise, we should write the data directly:
return rb_write_internal(fptr, ptr, length);
}
#endif
static VALUE static VALUE
io_binwrite_string(VALUE arg) io_binwrite_string(VALUE arg)
{ {
struct binwrite_arg *p = (struct binwrite_arg *)arg; struct binwrite_arg *p = (struct binwrite_arg *)arg;
rb_io_t *fptr = p->fptr;
long l, len;
l = len = p->length; const char *ptr = p->ptr;
long remaining = p->length;
if (fptr->wbuf.len) { while (remaining) {
if (fptr->wbuf.len+len <= fptr->wbuf.capa) { // Write as much as possible:
if (fptr->wbuf.capa < fptr->wbuf.off+fptr->wbuf.len+len) { long result = (long)io_binwrite_string_internal(p->fptr, ptr, remaining);
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0; // Finished:
} if (result == remaining) {
MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, p->ptr, char, len); break;
fptr->wbuf.len += (int)len; }
l = 0;
} if (result >= 0) {
if (io_fflush(fptr) < 0) ptr += result;
return -2L; /* fail in fflush */ remaining -= result;
if (l == 0) errno = EAGAIN;
return len; }
// Wait for it to become writable:
if (rb_io_maybe_wait_writable(errno, p->fptr->self, Qnil)) {
rb_io_check_closed(p->fptr);
} else {
// The error was unrelated to waiting for it to become writable, so we fail:
return -1;
}
} }
return rb_write_internal(p->fptr, p->ptr, p->length); return p->length;
} }
#endif
inline static void inline static void
io_allocate_write_buffer(rb_io_t *fptr, int sync) io_allocate_write_buffer(rb_io_t *fptr, int sync)
@ -1660,65 +1700,57 @@ io_allocate_write_buffer(rb_io_t *fptr, int sync)
} }
} }
static inline int
io_binwrite_requires_flush_write(rb_io_t *fptr, long len, int nosync)
{
// If the requested operation was synchronous and the output mode is synchronus or a TTY:
if (!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY)))
return 1;
// If the amount of data we want to write exceeds the internal buffer:
if (fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)
return 1;
// Otherwise, we can append to the internal buffer:
return 0;
}
static long static long
io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync) io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
{ {
long n, r, offset = 0; if (len <= 0) return len;
/* don't write anything if current thread has a pending interrupt. */ // Don't write anything if current thread has a pending interrupt:
rb_thread_check_ints(); rb_thread_check_ints();
if ((n = len) <= 0) return n;
io_allocate_write_buffer(fptr, !nosync); io_allocate_write_buffer(fptr, !nosync);
if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) || if (io_binwrite_requires_flush_write(fptr, len, nosync)) {
(fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)) {
struct binwrite_arg arg; struct binwrite_arg arg;
arg.fptr = fptr; arg.fptr = fptr;
arg.str = str; arg.str = str;
retry: arg.ptr = ptr;
arg.ptr = ptr + offset; arg.length = len;
arg.length = n;
if (fptr->write_lock) { if (fptr->write_lock) {
r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg); return rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
} }
else { else {
r = io_binwrite_string((VALUE)&arg); return io_binwrite_string((VALUE)&arg);
}
} else {
if (fptr->wbuf.off) {
if (fptr->wbuf.len)
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
} }
/* xxx: other threads may modify given string. */ MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr, char, len);
if (r == n) return len; fptr->wbuf.len += (int)len;
if (0 <= r) {
offset += r;
n -= r;
errno = EAGAIN;
}
if (r == -2L) return len;
return -1L;
if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
rb_io_check_closed(fptr);
if (offset < len)
goto retry;
}
return -1L;
} }
if (fptr->wbuf.off) {
if (fptr->wbuf.len)
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
}
MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr+offset, char, len);
fptr->wbuf.len += (int)len;
return len;
} }
# define MODE_BTMODE(a,b,c) ((fmode & FMODE_BINMODE) ? (b) : \ # define MODE_BTMODE(a,b,c) ((fmode & FMODE_BINMODE) ? (b) : \
@ -1792,15 +1824,17 @@ io_fwrite(VALUE str, rb_io_t *fptr, int nosync)
VALUE tmp; VALUE tmp;
long n, len; long n, len;
const char *ptr; const char *ptr;
#ifdef _WIN32 #ifdef _WIN32
if (fptr->mode & FMODE_TTY) { if (fptr->mode & FMODE_TTY) {
long len = rb_w32_write_console(str, fptr->fd); long len = rb_w32_write_console(str, fptr->fd);
if (len > 0) return len; if (len > 0) return len;
} }
#endif #endif
str = do_writeconv(str, fptr, &converted); str = do_writeconv(str, fptr, &converted);
if (converted) if (converted)
OBJ_FREEZE(str); OBJ_FREEZE(str);
tmp = rb_str_tmp_frozen_acquire(str); tmp = rb_str_tmp_frozen_acquire(str);
RSTRING_GETMEM(tmp, ptr, len); RSTRING_GETMEM(tmp, ptr, len);
@ -1830,10 +1864,12 @@ io_write(VALUE io, VALUE str, int nosync)
io = GetWriteIO(io); io = GetWriteIO(io);
str = rb_obj_as_string(str); str = rb_obj_as_string(str);
tmp = rb_io_check_io(io); tmp = rb_io_check_io(io);
if (NIL_P(tmp)) { if (NIL_P(tmp)) {
/* port is not IO, call write method for it. */ /* port is not IO, call write method for it. */
return rb_funcall(io, id_write, 1, str); return rb_funcall(io, id_write, 1, str);
} }
io = tmp; io = tmp;
if (RSTRING_LEN(str) == 0) return INT2FIX(0); if (RSTRING_LEN(str) == 0) return INT2FIX(0);