mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
io + socket: make pipes and sockets nonblocking by default
All normal Ruby IO methods (IO#read, IO#gets, IO#write, ...) are all capable of appearing to be "blocking" when presented with a file description with the O_NONBLOCK flag set; so there is little risk of incompatibility within Ruby-using programs. The biggest compatibility risk is when spawning external programs. As a result, stdin, stdout, and stderr are now always made blocking before exec-family calls. This change will make an event-oriented MJIT usable if it is waiting on pipes on POSIX_like platforms. It is ALSO necessary to take advantage of (proposed lightweight concurrency (aka "auto-Fiber") or any similar proposal for network concurrency: https://bugs.ruby-lang.org/issues/13618 Named-pipe (FIFO) are NOT yet non-blocking by default since they are rarely-used and may introduce compatibility problems and extra syscall overhead for a common path. Please revert this commit if there are problems and if I am afk since I am afk a lot, lately. [ruby-core:89950] [Bug #14968] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@65922 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
b009de13bf
commit
6a65f2b1e4
14 changed files with 139 additions and 46 deletions
|
@ -435,7 +435,7 @@ rsock_socket0(int domain, int type, int proto)
|
|||
static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
|
||||
|
||||
if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */
|
||||
ret = socket(domain, type|SOCK_CLOEXEC, proto);
|
||||
ret = socket(domain, type|SOCK_CLOEXEC|SOCK_NONBLOCK, proto);
|
||||
if (ret >= 0) {
|
||||
if (ret <= 2)
|
||||
goto fix_cloexec;
|
||||
|
@ -443,7 +443,7 @@ rsock_socket0(int domain, int type, int proto)
|
|||
}
|
||||
}
|
||||
else if (cloexec_state < 0) { /* usually runs once only for detection */
|
||||
ret = socket(domain, type|SOCK_CLOEXEC, proto);
|
||||
ret = socket(domain, type|SOCK_CLOEXEC|SOCK_NONBLOCK, proto);
|
||||
if (ret >= 0) {
|
||||
cloexec_state = rsock_detect_cloexec(ret);
|
||||
if (cloexec_state == 0 || ret <= 2)
|
||||
|
@ -466,6 +466,7 @@ rsock_socket0(int domain, int type, int proto)
|
|||
return -1;
|
||||
fix_cloexec:
|
||||
rb_maygvl_fd_fix_cloexec(ret);
|
||||
rsock_make_fd_nonblock(ret);
|
||||
update_max_fd:
|
||||
rb_update_max_fd(ret);
|
||||
|
||||
|
@ -632,8 +633,8 @@ rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks)
|
|||
return status;
|
||||
}
|
||||
|
||||
static void
|
||||
make_fd_nonblock(int fd)
|
||||
void
|
||||
rsock_make_fd_nonblock(int fd)
|
||||
{
|
||||
int flags;
|
||||
#ifdef F_GETFL
|
||||
|
@ -659,6 +660,7 @@ cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len,
|
|||
#ifdef HAVE_ACCEPT4
|
||||
static int try_accept4 = 1;
|
||||
#endif
|
||||
nonblock = 1; /* TODO remove parameter */
|
||||
if (address_len) len0 = *address_len;
|
||||
#ifdef HAVE_ACCEPT4
|
||||
if (try_accept4) {
|
||||
|
@ -678,7 +680,7 @@ cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len,
|
|||
rb_maygvl_fd_fix_cloexec(ret);
|
||||
#ifndef SOCK_NONBLOCK
|
||||
if (nonblock) {
|
||||
make_fd_nonblock(ret);
|
||||
rsock_make_fd_nonblock(ret);
|
||||
}
|
||||
#endif
|
||||
if (address_len && len0 < *address_len) *address_len = len0;
|
||||
|
@ -695,7 +697,7 @@ cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len,
|
|||
if (address_len && len0 < *address_len) *address_len = len0;
|
||||
rb_maygvl_fd_fix_cloexec(ret);
|
||||
if (nonblock) {
|
||||
make_fd_nonblock(ret);
|
||||
rsock_make_fd_nonblock(ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -433,6 +433,8 @@ static inline void rsock_maybe_wait_fd(int fd) { }
|
|||
VALUE rsock_read_nonblock(VALUE sock, VALUE length, VALUE buf, VALUE ex);
|
||||
VALUE rsock_write_nonblock(VALUE sock, VALUE buf, VALUE ex);
|
||||
|
||||
void rsock_make_fd_nonblock(int fd);
|
||||
|
||||
#if !defined HAVE_INET_NTOP && ! defined _WIN32
|
||||
const char *inet_ntop(int, const void *, char *, size_t);
|
||||
#elif defined __MINGW32__
|
||||
|
|
|
@ -177,14 +177,14 @@ rsock_socketpair0(int domain, int type, int protocol, int sv[2])
|
|||
static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
|
||||
|
||||
if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */
|
||||
ret = socketpair(domain, type|SOCK_CLOEXEC, protocol, sv);
|
||||
ret = socketpair(domain, type|SOCK_CLOEXEC|SOCK_NONBLOCK, protocol, sv);
|
||||
if (ret == 0 && (sv[0] <= 2 || sv[1] <= 2)) {
|
||||
goto fix_cloexec; /* highly unlikely */
|
||||
}
|
||||
goto update_max_fd;
|
||||
}
|
||||
else if (cloexec_state < 0) { /* usually runs once only for detection */
|
||||
ret = socketpair(domain, type|SOCK_CLOEXEC, protocol, sv);
|
||||
ret = socketpair(domain, type|SOCK_CLOEXEC|SOCK_NONBLOCK, protocol, sv);
|
||||
if (ret == 0) {
|
||||
cloexec_state = rsock_detect_cloexec(sv[0]);
|
||||
if ((cloexec_state == 0) || (sv[0] <= 2 || sv[1] <= 2))
|
||||
|
@ -213,6 +213,8 @@ rsock_socketpair0(int domain, int type, int protocol, int sv[2])
|
|||
fix_cloexec:
|
||||
rb_maygvl_fd_fix_cloexec(sv[0]);
|
||||
rb_maygvl_fd_fix_cloexec(sv[1]);
|
||||
rsock_make_fd_nonblock(sv[0]);
|
||||
rsock_make_fd_nonblock(sv[1]);
|
||||
|
||||
update_max_fd:
|
||||
rb_update_max_fd(sv[0]);
|
||||
|
@ -231,6 +233,8 @@ rsock_socketpair0(int domain, int type, int protocol, int sv[2])
|
|||
|
||||
rb_fd_fix_cloexec(sv[0]);
|
||||
rb_fd_fix_cloexec(sv[1]);
|
||||
rsock_make_fd_nonblock(sv[0]);
|
||||
rsock_make_fd_nonblock(sv[1]);
|
||||
return ret;
|
||||
}
|
||||
#endif /* !SOCK_CLOEXEC */
|
||||
|
|
45
io.c
45
io.c
|
@ -316,6 +316,27 @@ rb_cloexec_dup2(int oldfd, int newfd)
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int
|
||||
rb_fd_set_nonblock(int fd)
|
||||
{
|
||||
#ifdef _WIN32
|
||||
return rb_w32_set_nonblock(fd);
|
||||
#elif defined(F_GETFL)
|
||||
int err;
|
||||
int oflags = fcntl(fd, F_GETFL);
|
||||
|
||||
if (oflags == -1)
|
||||
return -1;
|
||||
if (oflags & O_NONBLOCK)
|
||||
return 0;
|
||||
oflags |= O_NONBLOCK;
|
||||
err = fcntl(fd, F_SETFL, oflags);
|
||||
if (err == -1)
|
||||
return -1;
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
rb_cloexec_pipe(int fildes[2])
|
||||
{
|
||||
|
@ -324,7 +345,7 @@ rb_cloexec_pipe(int fildes[2])
|
|||
#if defined(HAVE_PIPE2)
|
||||
static int try_pipe2 = 1;
|
||||
if (try_pipe2) {
|
||||
ret = pipe2(fildes, O_CLOEXEC);
|
||||
ret = pipe2(fildes, O_CLOEXEC | O_NONBLOCK);
|
||||
if (ret != -1)
|
||||
return ret;
|
||||
/* pipe2 is available since Linux 2.6.27, glibc 2.9. */
|
||||
|
@ -350,6 +371,8 @@ rb_cloexec_pipe(int fildes[2])
|
|||
#endif
|
||||
rb_maygvl_fd_fix_cloexec(fildes[0]);
|
||||
rb_maygvl_fd_fix_cloexec(fildes[1]);
|
||||
rb_fd_set_nonblock(fildes[0]);
|
||||
rb_fd_set_nonblock(fildes[1]);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -2696,27 +2719,9 @@ read_all(rb_io_t *fptr, long siz, VALUE str)
|
|||
void
|
||||
rb_io_set_nonblock(rb_io_t *fptr)
|
||||
{
|
||||
#ifdef _WIN32
|
||||
if (rb_w32_set_nonblock(fptr->fd) != 0) {
|
||||
if (rb_fd_set_nonblock(fptr->fd) != 0) {
|
||||
rb_sys_fail_path(fptr->pathv);
|
||||
}
|
||||
#else
|
||||
int oflags;
|
||||
#ifdef F_GETFL
|
||||
oflags = fcntl(fptr->fd, F_GETFL);
|
||||
if (oflags == -1) {
|
||||
rb_sys_fail_path(fptr->pathv);
|
||||
}
|
||||
#else
|
||||
oflags = 0;
|
||||
#endif
|
||||
if ((oflags & O_NONBLOCK) == 0) {
|
||||
oflags |= O_NONBLOCK;
|
||||
if (fcntl(fptr->fd, F_SETFL, oflags) == -1) {
|
||||
rb_sys_fail_path(fptr->pathv);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
struct read_internal_arg {
|
||||
|
|
44
process.c
44
process.c
|
@ -1474,6 +1474,39 @@ before_exec_non_async_signal_safe(void)
|
|||
rb_thread_stop_timer_thread();
|
||||
}
|
||||
|
||||
#define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
|
||||
#ifdef _WIN32
|
||||
int rb_w32_set_nonblock2(int fd, int nonblock);
|
||||
#endif
|
||||
|
||||
static int
|
||||
set_blocking(int fd)
|
||||
{
|
||||
#ifdef _WIN32
|
||||
return rb_w32_set_nonblock2(fd, 0);
|
||||
#elif defined(F_GETFL) && defined(F_SETFL)
|
||||
int fl = fcntl(fd, F_GETFL); /* async-signal-safe */
|
||||
|
||||
/* EBADF ought to be possible */
|
||||
if (fl == -1) return fl;
|
||||
if (fl & O_NONBLOCK) {
|
||||
fl &= ~O_NONBLOCK;
|
||||
return fcntl(fd, F_SETFL, fl);
|
||||
}
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
static void
|
||||
stdfd_clear_nonblock(void)
|
||||
{
|
||||
/* many programs cannot deal with non-blocking stdin/stdout/stderr */
|
||||
int fd;
|
||||
for (fd = 0; fd < 3; fd++) {
|
||||
(void)set_blocking(fd); /* can't do much about errors anyhow */
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
before_exec(void)
|
||||
{
|
||||
|
@ -3445,6 +3478,11 @@ rb_execarg_run_options(const struct rb_execarg *eargp, struct rb_execarg *sargp,
|
|||
rb_execarg_allocate_dup2_tmpbuf(sargp, RARRAY_LEN(ary));
|
||||
}
|
||||
}
|
||||
{
|
||||
int preserve = errno;
|
||||
stdfd_clear_nonblock();
|
||||
errno = preserve;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -3645,6 +3683,12 @@ read_retry(int fd, void *buf, size_t len)
|
|||
{
|
||||
ssize_t r;
|
||||
|
||||
if (set_blocking(fd) != 0) {
|
||||
#ifndef _WIN32
|
||||
rb_async_bug_errno("set_blocking failed reading child error", errno);
|
||||
#endif
|
||||
}
|
||||
|
||||
do {
|
||||
r = read(fd, buf, len);
|
||||
} while (r < 0 && errno == EINTR);
|
||||
|
|
|
@ -44,7 +44,6 @@ describe "IO#read_nonblock" do
|
|||
platform_is_not :windows do
|
||||
it 'sets the IO in nonblock mode' do
|
||||
require 'io/nonblock'
|
||||
@read.nonblock?.should == false
|
||||
@write.write "abc"
|
||||
@read.read_nonblock(1).should == "a"
|
||||
@read.nonblock?.should == true
|
||||
|
|
|
@ -76,7 +76,6 @@ describe 'IO#write_nonblock' do
|
|||
platform_is_not :windows do
|
||||
it 'sets the IO in nonblock mode' do
|
||||
require 'io/nonblock'
|
||||
@write.nonblock?.should == false
|
||||
@write.write_nonblock('a')
|
||||
@write.nonblock?.should == true
|
||||
end
|
||||
|
|
|
@ -34,6 +34,12 @@ describe 'Socket#connect' do
|
|||
|
||||
lambda {
|
||||
@client.connect(@server.getsockname)
|
||||
|
||||
# A second call needed if non-blocking sockets become default
|
||||
# XXX honestly I don't expect any real code to care about this spec
|
||||
# as it's too implementation-dependent and checking for connect()
|
||||
# errors is futile anyways because of TOCTOU
|
||||
@client.connect(@server.getsockname)
|
||||
}.should raise_error(Errno::EISCONN)
|
||||
end
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ class TestIONonblock < Test::Unit::TestCase
|
|||
|
||||
def test_nonblock
|
||||
IO.pipe {|r, w|
|
||||
w.nonblock = false
|
||||
assert_equal(false, w.nonblock?)
|
||||
w.nonblock do
|
||||
assert_equal(true, w.nonblock?)
|
||||
|
|
|
@ -1360,6 +1360,7 @@ class TestIO < Test::Unit::TestCase
|
|||
def test_readpartial_lock
|
||||
with_pipe do |r, w|
|
||||
s = ""
|
||||
r.nonblock = false if have_nonblock?
|
||||
t = Thread.new { r.readpartial(5, s) }
|
||||
Thread.pass until t.stop?
|
||||
assert_raise(RuntimeError) { s.clear }
|
||||
|
|
|
@ -770,6 +770,15 @@ class TestProcess < Test::Unit::TestCase
|
|||
Process.wait pid
|
||||
end
|
||||
}
|
||||
|
||||
# ensure standard FDs we redirect to are blocking for compatibility
|
||||
with_pipes(3) do |pipes|
|
||||
src = 'p [STDIN,STDOUT,STDERR].map(&:nonblock?)'
|
||||
rdr = { 0 => pipes[0][0], 1 => pipes[1][1], 2 => pipes[2][1] }
|
||||
pid = spawn(RUBY, '-rio/nonblock', '-e', src, rdr)
|
||||
assert_equal("[false, false, false]\n", pipes[1][0].gets)
|
||||
Process.wait pid
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -159,8 +159,9 @@ class TestSocket_BasicSocket < Test::Unit::TestCase
|
|||
set_nb = true
|
||||
buf = String.new
|
||||
if ssock.respond_to?(:nonblock?)
|
||||
assert_not_predicate(ssock, :nonblock?)
|
||||
assert_not_predicate(csock, :nonblock?)
|
||||
assert_predicate(ssock, :nonblock?)
|
||||
assert_predicate(csock, :nonblock?)
|
||||
csock.nonblock = ssock.nonblock = false
|
||||
|
||||
# Linux may use MSG_DONTWAIT to avoid setting O_NONBLOCK
|
||||
if RUBY_PLATFORM.match?(/linux/) && Socket.const_defined?(:MSG_DONTWAIT)
|
||||
|
|
31
thread.c
31
thread.c
|
@ -4068,17 +4068,19 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
|||
int result = 0, lerrno;
|
||||
rb_hrtime_t *to, rel, end = 0;
|
||||
int drained;
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
nfds_t nfds;
|
||||
rb_unblock_function_t *ubf;
|
||||
struct waiting_fd wfd;
|
||||
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
|
||||
wfd.th = GET_THREAD();
|
||||
wfd.fd = fd;
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
|
||||
timeout_prepare(&to, &rel, &end, timeout);
|
||||
fds[0].fd = fd;
|
||||
fds[0].events = (short)events;
|
||||
do {
|
||||
fds[0].revents = 0;
|
||||
fds[1].fd = rb_sigwait_fd_get(th);
|
||||
fds[1].fd = rb_sigwait_fd_get(wfd.th);
|
||||
|
||||
if (fds[1].fd >= 0) {
|
||||
fds[1].events = POLLIN;
|
||||
|
@ -4092,27 +4094,29 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
|||
}
|
||||
|
||||
lerrno = 0;
|
||||
BLOCKING_REGION(th, {
|
||||
list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
|
||||
BLOCKING_REGION(wfd.th, {
|
||||
const rb_hrtime_t *sto;
|
||||
struct timespec ts;
|
||||
|
||||
sto = sigwait_timeout(th, fds[1].fd, to, &drained);
|
||||
if (!RUBY_VM_INTERRUPTED(th->ec)) {
|
||||
sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
|
||||
if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
|
||||
result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), NULL);
|
||||
if (result < 0) lerrno = errno;
|
||||
}
|
||||
}, ubf, th, TRUE);
|
||||
}, ubf, wfd.th, TRUE);
|
||||
list_del(&wfd.wfd_node);
|
||||
|
||||
if (fds[1].fd >= 0) {
|
||||
if (result > 0 && fds[1].revents) {
|
||||
result--;
|
||||
fds[1].revents = 0;
|
||||
}
|
||||
(void)check_signals_nogvl(th, fds[1].fd);
|
||||
rb_sigwait_fd_put(th, fds[1].fd);
|
||||
rb_sigwait_fd_migrate(th->vm);
|
||||
(void)check_signals_nogvl(wfd.th, fds[1].fd);
|
||||
rb_sigwait_fd_put(wfd.th, fds[1].fd);
|
||||
rb_sigwait_fd_migrate(wfd.th->vm);
|
||||
}
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
|
||||
} while (wait_retryable(&result, lerrno, to, end));
|
||||
|
||||
if (result < 0) {
|
||||
|
@ -4152,6 +4156,7 @@ struct select_args {
|
|||
rb_fdset_t *read;
|
||||
rb_fdset_t *write;
|
||||
rb_fdset_t *except;
|
||||
struct waiting_fd wfd;
|
||||
struct timeval *tv;
|
||||
};
|
||||
|
||||
|
@ -4182,6 +4187,7 @@ select_single_cleanup(VALUE ptr)
|
|||
{
|
||||
struct select_args *args = (struct select_args *)ptr;
|
||||
|
||||
list_del(&args->wfd.wfd_node);
|
||||
if (args->read) rb_fd_term(args->read);
|
||||
if (args->write) rb_fd_term(args->write);
|
||||
if (args->except) rb_fd_term(args->except);
|
||||
|
@ -4202,7 +4208,10 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
|
|||
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
|
||||
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
|
||||
args.tv = tv;
|
||||
args.wfd.fd = fd;
|
||||
args.wfd.th = GET_THREAD();
|
||||
|
||||
list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
|
||||
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
|
||||
if (r == -1)
|
||||
errno = args.as.error;
|
||||
|
|
|
@ -4429,11 +4429,11 @@ fcntl(int fd, int cmd, ...)
|
|||
|
||||
/* License: Ruby's */
|
||||
int
|
||||
rb_w32_set_nonblock(int fd)
|
||||
rb_w32_set_nonblock2(int fd, int nonblock)
|
||||
{
|
||||
SOCKET sock = TO_SOCKET(fd);
|
||||
if (is_socket(sock)) {
|
||||
return setfl(sock, O_NONBLOCK);
|
||||
return setfl(sock, nonblock ? O_NONBLOCK : 0);
|
||||
}
|
||||
else if (is_pipe(sock)) {
|
||||
DWORD state;
|
||||
|
@ -4441,7 +4441,12 @@ rb_w32_set_nonblock(int fd)
|
|||
errno = map_errno(GetLastError());
|
||||
return -1;
|
||||
}
|
||||
state |= PIPE_NOWAIT;
|
||||
if (nonblock) {
|
||||
state |= PIPE_NOWAIT;
|
||||
}
|
||||
else {
|
||||
state &= ~PIPE_NOWAIT;
|
||||
}
|
||||
if (!SetNamedPipeHandleState((HANDLE)sock, &state, NULL, NULL)) {
|
||||
errno = map_errno(GetLastError());
|
||||
return -1;
|
||||
|
@ -4454,6 +4459,12 @@ rb_w32_set_nonblock(int fd)
|
|||
}
|
||||
}
|
||||
|
||||
int
|
||||
rb_w32_set_nonblock(int fd)
|
||||
{
|
||||
return rb_w32_set_nonblock2(fd, TRUE);
|
||||
}
|
||||
|
||||
#ifndef WNOHANG
|
||||
#define WNOHANG -1
|
||||
#endif
|
||||
|
|
Loading…
Reference in a new issue