mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
* io.c: fix IO.copy_stream interrupt handling.
based on the patch by Eric Wong. [ruby-core:36156] * vm_core.h (rb_thread_call_with_gvl): don't declare here. * thread.c: include internal.h. (rb_thread_execute_interrupts): new function. * internal.h (rb_thread_execute_interrupts): declared. (rb_thread_call_with_gvl): declared. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@31971 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
1fdbe0f437
commit
87f025da25
6 changed files with 137 additions and 23 deletions
13
ChangeLog
13
ChangeLog
|
@ -1,3 +1,16 @@
|
||||||
|
Thu Jun 9 23:57:53 2011 Tanaka Akira <akr@fsij.org>
|
||||||
|
|
||||||
|
* io.c: fix IO.copy_stream interrupt handling.
|
||||||
|
based on the patch by Eric Wong. [ruby-core:36156]
|
||||||
|
|
||||||
|
* vm_core.h (rb_thread_call_with_gvl): don't declare here.
|
||||||
|
|
||||||
|
* thread.c: include internal.h.
|
||||||
|
(rb_thread_execute_interrupts): new function.
|
||||||
|
|
||||||
|
* internal.h (rb_thread_execute_interrupts): declared.
|
||||||
|
(rb_thread_call_with_gvl): declared.
|
||||||
|
|
||||||
Thu Jun 9 23:34:01 2011 CHIKANAGA Tomoyuki <nagachika00@gmail.com>
|
Thu Jun 9 23:34:01 2011 CHIKANAGA Tomoyuki <nagachika00@gmail.com>
|
||||||
|
|
||||||
* gc.c (rb_objspace_call_finalizer): use rb_typeddata_is_kind_of() for
|
* gc.c (rb_objspace_call_finalizer): use rb_typeddata_is_kind_of() for
|
||||||
|
|
|
@ -30,6 +30,9 @@ VALUE rb_big_uminus(VALUE x);
|
||||||
VALUE rb_obj_is_thread(VALUE obj);
|
VALUE rb_obj_is_thread(VALUE obj);
|
||||||
VALUE rb_obj_is_mutex(VALUE obj);
|
VALUE rb_obj_is_mutex(VALUE obj);
|
||||||
|
|
||||||
|
void rb_thread_execute_interrupts(VALUE th);
|
||||||
|
void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
|
||||||
|
|
||||||
#if defined(__cplusplus)
|
#if defined(__cplusplus)
|
||||||
#if 0
|
#if 0
|
||||||
{ /* satisfy cc-mode */
|
{ /* satisfy cc-mode */
|
||||||
|
|
104
io.c
104
io.c
|
@ -14,6 +14,7 @@
|
||||||
#include "ruby/ruby.h"
|
#include "ruby/ruby.h"
|
||||||
#include "ruby/io.h"
|
#include "ruby/io.h"
|
||||||
#include "dln.h"
|
#include "dln.h"
|
||||||
|
#include "internal.h"
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
|
||||||
|
@ -8517,13 +8518,57 @@ struct copy_stream_struct {
|
||||||
VALUE th;
|
VALUE th;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static void *
|
||||||
|
exec_interrupts(void *arg)
|
||||||
|
{
|
||||||
|
VALUE th = (VALUE)arg;
|
||||||
|
rb_thread_execute_interrupts(th);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* returns TRUE if the preceding system call was interrupted
|
||||||
|
* so we can continue. If the thread was interrupted, we
|
||||||
|
* reacquire the GVL to execute interrupts before continuing.
|
||||||
|
*/
|
||||||
static int
|
static int
|
||||||
maygvl_copy_stream_wait_read(struct copy_stream_struct *stp)
|
maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
|
||||||
|
{
|
||||||
|
switch (errno) {
|
||||||
|
case EINTR:
|
||||||
|
#if defined(ERESTART)
|
||||||
|
case ERESTART:
|
||||||
|
#endif
|
||||||
|
if (rb_thread_interrupted(stp->th))
|
||||||
|
if (has_gvl)
|
||||||
|
rb_thread_execute_interrupts(stp->th);
|
||||||
|
else
|
||||||
|
rb_thread_call_with_gvl(exec_interrupts, (void *)stp->th);
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
return FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
maygvl_select(int has_gvl, int n, rb_fdset_t *rfds, rb_fdset_t *wfds, rb_fdset_t *efds, struct timeval *timeout)
|
||||||
|
{
|
||||||
|
if (has_gvl)
|
||||||
|
return rb_thread_fd_select(n, rfds, wfds, efds, timeout);
|
||||||
|
else
|
||||||
|
return rb_fd_select(n, rfds, wfds, efds, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
rb_fd_zero(&stp->fds);
|
|
||||||
rb_fd_set(stp->src_fd, &stp->fds);
|
do {
|
||||||
ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
|
rb_fd_zero(&stp->fds);
|
||||||
|
rb_fd_set(stp->src_fd, &stp->fds);
|
||||||
|
ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
|
||||||
|
} while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp));
|
||||||
|
|
||||||
if (ret == -1) {
|
if (ret == -1) {
|
||||||
stp->syserr = "select";
|
stp->syserr = "select";
|
||||||
stp->error_no = errno;
|
stp->error_no = errno;
|
||||||
|
@ -8536,9 +8581,13 @@ static int
|
||||||
nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
|
nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
rb_fd_zero(&stp->fds);
|
|
||||||
rb_fd_set(stp->dst_fd, &stp->fds);
|
do {
|
||||||
ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
|
rb_fd_zero(&stp->fds);
|
||||||
|
rb_fd_set(stp->dst_fd, &stp->fds);
|
||||||
|
ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
|
||||||
|
} while (ret == -1 && maygvl_copy_stream_continue_p(0, stp));
|
||||||
|
|
||||||
if (ret == -1) {
|
if (ret == -1) {
|
||||||
stp->syserr = "select";
|
stp->syserr = "select";
|
||||||
stp->error_no = errno;
|
stp->error_no = errno;
|
||||||
|
@ -8600,13 +8649,13 @@ simple_sendfile(int out_fd, int in_fd, off_t *offset, off_t count)
|
||||||
|
|
||||||
#ifdef USE_SENDFILE
|
#ifdef USE_SENDFILE
|
||||||
static int
|
static int
|
||||||
maygvl_copy_stream_wait_readwrite(struct copy_stream_struct *stp)
|
maygvl_copy_stream_wait_readwrite(int has_gvl, struct copy_stream_struct *stp)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
rb_fd_zero(&stp->fds);
|
rb_fd_zero(&stp->fds);
|
||||||
rb_fd_set(stp->src_fd, &stp->fds);
|
rb_fd_set(stp->src_fd, &stp->fds);
|
||||||
rb_fd_set(stp->dst_fd, &stp->fds);
|
rb_fd_set(stp->dst_fd, &stp->fds);
|
||||||
ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
|
ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
|
||||||
if (ret == -1) {
|
if (ret == -1) {
|
||||||
stp->syserr = "select";
|
stp->syserr = "select";
|
||||||
stp->error_no = errno;
|
stp->error_no = errno;
|
||||||
|
@ -8685,6 +8734,8 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ss == -1) {
|
if (ss == -1) {
|
||||||
|
if (maygvl_copy_stream_continue_p(0, stp))
|
||||||
|
goto retry_sendfile;
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
case EINVAL:
|
case EINVAL:
|
||||||
#ifdef ENOSYS
|
#ifdef ENOSYS
|
||||||
|
@ -8695,9 +8746,7 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
|
||||||
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
||||||
case EWOULDBLOCK:
|
case EWOULDBLOCK:
|
||||||
#endif
|
#endif
|
||||||
if (maygvl_copy_stream_wait_readwrite(stp) == -1)
|
if (maygvl_copy_stream_wait_readwrite(0, stp) == -1)
|
||||||
return -1;
|
|
||||||
if (rb_thread_interrupted(stp->th))
|
|
||||||
return -1;
|
return -1;
|
||||||
goto retry_sendfile;
|
goto retry_sendfile;
|
||||||
}
|
}
|
||||||
|
@ -8710,12 +8759,22 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static ssize_t
|
static ssize_t
|
||||||
maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
|
maygvl_read(int has_gvl, int fd, void *buf, size_t count)
|
||||||
|
{
|
||||||
|
if (has_gvl)
|
||||||
|
return rb_read_internal(fd, buf, count);
|
||||||
|
else
|
||||||
|
return read(fd, buf, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
static ssize_t
|
||||||
|
maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
|
||||||
{
|
{
|
||||||
ssize_t ss;
|
ssize_t ss;
|
||||||
retry_read:
|
retry_read:
|
||||||
if (offset == (off_t)-1)
|
if (offset == (off_t)-1) {
|
||||||
ss = read(stp->src_fd, buf, len);
|
ss = maygvl_read(has_gvl, stp->src_fd, buf, len);
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
#ifdef HAVE_PREAD
|
#ifdef HAVE_PREAD
|
||||||
ss = pread(stp->src_fd, buf, len, offset);
|
ss = pread(stp->src_fd, buf, len, offset);
|
||||||
|
@ -8728,12 +8787,14 @@ maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, o
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
if (ss == -1) {
|
if (ss == -1) {
|
||||||
|
if (maygvl_copy_stream_continue_p(has_gvl, stp))
|
||||||
|
goto retry_read;
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
case EAGAIN:
|
case EAGAIN:
|
||||||
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
||||||
case EWOULDBLOCK:
|
case EWOULDBLOCK:
|
||||||
#endif
|
#endif
|
||||||
if (maygvl_copy_stream_wait_read(stp) == -1)
|
if (maygvl_copy_stream_wait_read(has_gvl, stp) == -1)
|
||||||
return -1;
|
return -1;
|
||||||
goto retry_read;
|
goto retry_read;
|
||||||
#ifdef ENOSYS
|
#ifdef ENOSYS
|
||||||
|
@ -8757,6 +8818,8 @@ nogvl_copy_stream_write(struct copy_stream_struct *stp, char *buf, size_t len)
|
||||||
while (len) {
|
while (len) {
|
||||||
ss = write(stp->dst_fd, buf+off, len);
|
ss = write(stp->dst_fd, buf+off, len);
|
||||||
if (ss == -1) {
|
if (ss == -1) {
|
||||||
|
if (maygvl_copy_stream_continue_p(0, stp))
|
||||||
|
continue;
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
if (nogvl_copy_stream_wait_write(stp) == -1)
|
if (nogvl_copy_stream_wait_write(stp) == -1)
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -8811,12 +8874,12 @@ nogvl_copy_stream_read_write(struct copy_stream_struct *stp)
|
||||||
len = sizeof(buf);
|
len = sizeof(buf);
|
||||||
}
|
}
|
||||||
if (use_pread) {
|
if (use_pread) {
|
||||||
ss = maygvl_copy_stream_read(stp, buf, len, src_offset);
|
ss = maygvl_copy_stream_read(0, stp, buf, len, src_offset);
|
||||||
if (0 < ss)
|
if (0 < ss)
|
||||||
src_offset += ss;
|
src_offset += ss;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
ss = maygvl_copy_stream_read(stp, buf, len, (off_t)-1);
|
ss = maygvl_copy_stream_read(0, stp, buf, len, (off_t)-1);
|
||||||
}
|
}
|
||||||
if (ss <= 0) /* EOF or error */
|
if (ss <= 0) /* EOF or error */
|
||||||
return;
|
return;
|
||||||
|
@ -8827,9 +8890,6 @@ nogvl_copy_stream_read_write(struct copy_stream_struct *stp)
|
||||||
|
|
||||||
if (!use_eof)
|
if (!use_eof)
|
||||||
copy_length -= ss;
|
copy_length -= ss;
|
||||||
|
|
||||||
if (rb_thread_interrupted(stp->th))
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8890,7 +8950,7 @@ copy_stream_fallback_body(VALUE arg)
|
||||||
ssize_t ss;
|
ssize_t ss;
|
||||||
rb_thread_wait_fd(stp->src_fd);
|
rb_thread_wait_fd(stp->src_fd);
|
||||||
rb_str_resize(buf, buflen);
|
rb_str_resize(buf, buflen);
|
||||||
ss = maygvl_copy_stream_read(stp, RSTRING_PTR(buf), l, off);
|
ss = maygvl_copy_stream_read(1, stp, RSTRING_PTR(buf), l, off);
|
||||||
if (ss == -1)
|
if (ss == -1)
|
||||||
return Qnil;
|
return Qnil;
|
||||||
if (ss == 0)
|
if (ss == 0)
|
||||||
|
|
|
@ -78,6 +78,14 @@ class TestIO < Test::Unit::TestCase
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def trapping_usr1
|
||||||
|
@usr1_rcvd = 0
|
||||||
|
trap(:USR1) { @usr1_rcvd += 1 }
|
||||||
|
yield
|
||||||
|
ensure
|
||||||
|
trap(:USR1, "DEFAULT")
|
||||||
|
end
|
||||||
|
|
||||||
def test_pipe
|
def test_pipe
|
||||||
r, w = IO.pipe
|
r, w = IO.pipe
|
||||||
assert_instance_of(IO, r)
|
assert_instance_of(IO, r)
|
||||||
|
@ -594,6 +602,30 @@ class TestIO < Test::Unit::TestCase
|
||||||
result = t.value
|
result = t.value
|
||||||
assert_equal(megacontent, result)
|
assert_equal(megacontent, result)
|
||||||
}
|
}
|
||||||
|
with_socketpair {|s1, s2|
|
||||||
|
begin
|
||||||
|
s1.nonblock = true
|
||||||
|
rescue Errno::EBADF
|
||||||
|
skip "nonblocking IO for pipe is not implemented"
|
||||||
|
end
|
||||||
|
trapping_usr1 do
|
||||||
|
nr = 10
|
||||||
|
pid = fork do
|
||||||
|
s1.close
|
||||||
|
IO.select([s2])
|
||||||
|
Process.kill(:USR1, Process.ppid)
|
||||||
|
s2.read
|
||||||
|
end
|
||||||
|
s2.close
|
||||||
|
nr.times do
|
||||||
|
assert_equal megacontent.bytesize, IO.copy_stream("megasrc", s1)
|
||||||
|
end
|
||||||
|
assert_equal(1, @usr1_rcvd)
|
||||||
|
s1.close
|
||||||
|
_, status = Process.waitpid2(pid)
|
||||||
|
assert status.success?, status.inspect
|
||||||
|
end
|
||||||
|
}
|
||||||
end
|
end
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
7
thread.c
7
thread.c
|
@ -46,6 +46,7 @@
|
||||||
|
|
||||||
#include "eval_intern.h"
|
#include "eval_intern.h"
|
||||||
#include "gc.h"
|
#include "gc.h"
|
||||||
|
#include "internal.h"
|
||||||
#include "ruby/io.h"
|
#include "ruby/io.h"
|
||||||
|
|
||||||
#ifndef USE_NATIVE_THREAD_PRIORITY
|
#ifndef USE_NATIVE_THREAD_PRIORITY
|
||||||
|
@ -1358,6 +1359,12 @@ rb_threadptr_execute_interrupts(rb_thread_t *th)
|
||||||
rb_threadptr_execute_interrupts_rec(th, 0);
|
rb_threadptr_execute_interrupts_rec(th, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
rb_thread_execute_interrupts(VALUE th)
|
||||||
|
{
|
||||||
|
rb_threadptr_execute_interrupts_rec((rb_thread_t *)th, 0);
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
rb_gc_mark_threads(void)
|
rb_gc_mark_threads(void)
|
||||||
{
|
{
|
||||||
|
|
|
@ -656,7 +656,6 @@ void rb_vm_gvl_destroy(rb_vm_t *vm);
|
||||||
void rb_thread_start_timer_thread(void);
|
void rb_thread_start_timer_thread(void);
|
||||||
void rb_thread_stop_timer_thread(void);
|
void rb_thread_stop_timer_thread(void);
|
||||||
void rb_thread_reset_timer_thread(void);
|
void rb_thread_reset_timer_thread(void);
|
||||||
void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
|
|
||||||
int ruby_thread_has_gvl_p(void);
|
int ruby_thread_has_gvl_p(void);
|
||||||
VALUE rb_make_backtrace(void);
|
VALUE rb_make_backtrace(void);
|
||||||
typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE);
|
typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue