1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

* io.c: IO.copy_stream implemented. [ruby-dev:33843]

* thread.c (rb_fd_select): new function.

* configure.in (sys/sendfile.h): check the header file.
  (sendfile): check the function.
  (pread): check the function.


git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@15858 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
akr 2008-03-30 06:38:05 +00:00
parent a367738381
commit 0ef84dcefc
5 changed files with 840 additions and 2 deletions

View file

@ -1,3 +1,13 @@
Sun Mar 30 15:33:29 2008 Tanaka Akira <akr@fsij.org>
* io.c: IO.copy_stream implemented. [ruby-dev:33843]
* thread.c (rb_fd_select): new function.
* configure.in (sys/sendfile.h): check the header file.
(sendfile): check the function.
(pread): check the function.
Sat Mar 29 14:18:41 2008 Hidetoshi NAGAI <nagai@ai.kyutech.ac.jp> Sat Mar 29 14:18:41 2008 Hidetoshi NAGAI <nagai@ai.kyutech.ac.jp>
* ext/tk/*: full update Ruby/Tk to support Ruby(1.9|1.8) and Tc/Tk8.5. * ext/tk/*: full update Ruby/Tk to support Ruby(1.9|1.8) and Tc/Tk8.5.

View file

@ -592,7 +592,7 @@ AC_CHECK_HEADERS(stdlib.h string.h unistd.h limits.h sys/file.h sys/ioctl.h sys/
fcntl.h sys/fcntl.h sys/select.h sys/time.h sys/times.h sys/param.h\ fcntl.h sys/fcntl.h sys/select.h sys/time.h sys/times.h sys/param.h\
syscall.h pwd.h grp.h a.out.h utime.h memory.h direct.h sys/resource.h \ syscall.h pwd.h grp.h a.out.h utime.h memory.h direct.h sys/resource.h \
sys/mkdev.h sys/utime.h xti.h netinet/in_systm.h float.h ieeefp.h pthread.h \ sys/mkdev.h sys/utime.h xti.h netinet/in_systm.h float.h ieeefp.h pthread.h \
ucontext.h intrinsics.h langinfo.h locale.h) ucontext.h intrinsics.h langinfo.h locale.h sys/sendfile.h)
dnl Check additional types. dnl Check additional types.
AC_CHECK_SIZEOF(rlim_t, 0, [ AC_CHECK_SIZEOF(rlim_t, 0, [
@ -705,7 +705,8 @@ AC_CHECK_FUNCS(fmod killpg wait4 waitpid fork spawnv syscall chroot fsync getcwd
dlopen sigprocmask sigaction sigsetjmp _setjmp vsnprintf snprintf\ dlopen sigprocmask sigaction sigsetjmp _setjmp vsnprintf snprintf\
setsid telldir seekdir fchmod cosh sinh tanh log2 round signbit\ setsid telldir seekdir fchmod cosh sinh tanh log2 round signbit\
setuid setgid daemon select_large_fdset setenv unsetenv\ setuid setgid daemon select_large_fdset setenv unsetenv\
mktime timegm clock_gettime gettimeofday) mktime timegm clock_gettime gettimeofday\
pread sendfile)
AC_ARG_ENABLE(setreuid, AC_ARG_ENABLE(setreuid,
[ --enable-setreuid use setreuid()/setregid() according to need even if obsolete.], [ --enable-setreuid use setreuid()/setregid() according to need even if obsolete.],
[use_setreuid=$enableval]) [use_setreuid=$enableval])

472
io.c
View file

@ -14,6 +14,7 @@
#include "ruby/ruby.h" #include "ruby/ruby.h"
#include "ruby/io.h" #include "ruby/io.h"
#include "ruby/signal.h" #include "ruby/signal.h"
#include "vm_core.h"
#include <ctype.h> #include <ctype.h>
#include <errno.h> #include <errno.h>
@ -6219,6 +6220,476 @@ rb_io_s_read(int argc, VALUE *argv, VALUE io)
return rb_ensure(io_s_read, (VALUE)&arg, rb_io_close, arg.io); return rb_ensure(io_s_read, (VALUE)&arg, rb_io_close, arg.io);
} }
struct copy_stream_struct {
VALUE src;
VALUE dst;
int src_fd;
int dst_fd;
off_t copy_length;
off_t src_offset;
int close_src;
int close_dst;
off_t total;
char *syserr;
int error_no;
char *notimp;
rb_fdset_t fds;
rb_thread_t *th;
};
static void
copy_stream_rbuf_to_dst(struct copy_stream_struct *stp,
rb_io_t *src_fptr, rb_io_t *dst_fptr, const char *dst_path)
{
ssize_t r;
int len;
retry:
len = src_fptr->rbuf_len;
if (stp->copy_length != (off_t)-1 && stp->copy_length < len) {
len = stp->copy_length;
}
if (len == 0)
return;
r = rb_write_internal(dst_fptr->fd, src_fptr->rbuf + src_fptr->rbuf_off, len);
if (len == r) {
src_fptr->rbuf_len -= len;
if (src_fptr->rbuf_len < 0) src_fptr->rbuf_len = 0;
if (stp->copy_length != (off_t)-1) stp->copy_length -= len;
stp->total += len;
return;
}
else if (0 <= r) {
src_fptr->rbuf_off += r;
src_fptr->rbuf_len -= r;
if (stp->copy_length != (off_t)-1) stp->copy_length -= r;
stp->total += r;
errno = EAGAIN;
}
if (rb_io_wait_writable(dst_fptr->fd)) {
rb_io_check_closed(dst_fptr);
if (src_fptr->rbuf_len)
goto retry;
}
rb_sys_fail(dst_path);
}
static int
copy_stream_wait_read(struct copy_stream_struct *stp)
{
int ret;
rb_fd_zero(&stp->fds);
rb_fd_set(stp->src_fd, &stp->fds);
ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
if (ret == -1) {
stp->syserr = "select";
stp->error_no = errno;
return -1;
}
return 0;
}
static int
copy_stream_wait_write(struct copy_stream_struct *stp)
{
int ret;
rb_fd_zero(&stp->fds);
rb_fd_set(stp->dst_fd, &stp->fds);
ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
if (ret == -1) {
stp->syserr = "select";
stp->error_no = errno;
return -1;
}
return 0;
}
#ifdef HAVE_SENDFILE
#ifdef __linux__
#define USE_SENDFILE
#ifdef HAVE_SYS_SENDFILE_H
#include <sys/sendfile.h>
#endif
static ssize_t
simple_sendfile(int out_fd, int in_fd, off_t *offset, size_t count)
{
return sendfile(out_fd, in_fd, offset, count);
}
#endif
#endif
#ifdef USE_SENDFILE
static int
copy_stream_sendfile(struct copy_stream_struct *stp)
{
struct stat src_stat, dst_stat;
ssize_t ss;
int ret;
off_t copy_length;
off_t src_offset;
int use_pread;
ret = fstat(stp->src_fd, &src_stat);
if (ret == -1) {
stp->syserr = "fstat";
stp->error_no = errno;
return -1;
}
if (!S_ISREG(src_stat.st_mode))
return 0;
ret = fstat(stp->dst_fd, &dst_stat);
if (ret == -1) {
stp->syserr = "fstat";
stp->error_no = errno;
return -1;
}
if ((dst_stat.st_mode & S_IFMT) != S_IFSOCK)
return 0;
src_offset = stp->src_offset;
use_pread = src_offset != (off_t)-1;
copy_length = stp->copy_length;
if (copy_length == (off_t)-1) {
if (use_pread)
copy_length = src_stat.st_size - src_offset;
else {
off_t cur = lseek(stp->src_fd, 0, SEEK_CUR);
if (cur == (off_t)-1) {
stp->syserr = "lseek";
stp->error_no = errno;
return -1;
}
copy_length = src_stat.st_size - cur;
}
}
retry_sendfile:
if (use_pread) {
ss = simple_sendfile(stp->dst_fd, stp->src_fd, &src_offset, copy_length);
}
else {
ss = simple_sendfile(stp->dst_fd, stp->src_fd, NULL, copy_length);
}
if (0 < ss) {
stp->total += ss;
copy_length -= ss;
if (0 < copy_length) {
ss = -1;
errno = EAGAIN;
}
}
if (ss == -1) {
if (errno == EINVAL || errno == ENOSYS)
return 0;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (copy_stream_wait_write(stp) == -1)
return -1;
if (RUBY_VM_INTERRUPTED(stp->th))
return;
goto retry_sendfile;
}
stp->syserr = "sendfile";
stp->error_no = errno;
return -1;
}
return 1;
}
#endif
static ssize_t
copy_stream_read(struct copy_stream_struct *stp, char *buf, int len, off_t offset)
{
ssize_t ss;
retry_read:
if (offset == (off_t)-1)
ss = read(stp->src_fd, buf, len);
else {
#ifdef HAVE_PREAD
ss = pread(stp->src_fd, buf, len, offset);
#else
stp->notimp = "pread";
return -1;
#endif
}
if (ss == 0) {
return 0;
}
if (ss == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (copy_stream_wait_read(stp) == -1)
return -1;
goto retry_read;
}
if (errno == ENOSYS) {
stp->notimp = "pread";
return -1;
}
stp->syserr = offset == (off_t)-1 ? "read" : "pread";
stp->error_no = errno;
return -1;
}
return ss;
}
static int
copy_stream_write(struct copy_stream_struct *stp, char *buf, int len)
{
ssize_t ss;
int off = 0;
while (len) {
ss = write(stp->dst_fd, buf+off, len);
if (ss == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (copy_stream_wait_write(stp) == -1)
return -1;
continue;
}
stp->syserr = "write";
stp->error_no = errno;
return -1;
}
off += ss;
len -= ss;
stp->total += ss;
}
return 0;
}
static void
copy_stream_read_write(struct copy_stream_struct *stp)
{
char buf[1024*16];
int len;
ssize_t ss;
int ret;
off_t copy_length;
int use_eof;
off_t src_offset;
int use_pread;
copy_length = stp->copy_length;
use_eof = copy_length == (off_t)-1;
src_offset = stp->src_offset;
use_pread = src_offset != (off_t)-1;
if (use_pread && stp->close_src) {
off_t r;
r = lseek(stp->src_fd, src_offset, SEEK_SET);
if (r == (off_t)-1) {
stp->syserr = "lseek";
stp->error_no = errno;
return;
}
src_offset = (off_t)-1;
use_pread = 0;
}
while (use_eof || 0 < copy_length) {
if (!use_eof && copy_length < sizeof(buf)) {
len = copy_length;
}
else {
len = sizeof(buf);
}
if (use_pread) {
ss = copy_stream_read(stp, buf, len, src_offset);
if (0 < ss)
src_offset += ss;
}
else {
ss = copy_stream_read(stp, buf, len, (off_t)-1);
}
if (ss <= 0) /* EOF or error */
return;
ret = copy_stream_write(stp, buf, ss);
if (ret < 0)
return;
if (!use_eof)
copy_length -= ss;
if (RUBY_VM_INTERRUPTED(stp->th))
return;
}
}
static VALUE
copy_stream_func(void *arg)
{
struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
int ret;
#ifdef USE_SENDFILE
ret = copy_stream_sendfile(stp);
if (ret != 0)
goto finish; /* error or success */
#endif
copy_stream_read_write(stp);
finish:
return Qnil;
}
static VALUE
copy_stream_body(VALUE arg)
{
struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
VALUE src_io, dst_io;
rb_io_t *src_fptr, *dst_fptr;
int src_fd, dst_fd;
char *src_path = 0, *dst_path = 0;
stp->th = GET_THREAD();
src_io = rb_check_convert_type(stp->src, T_FILE, "IO", "to_io");
if (!NIL_P(src_io)) {
GetOpenFile(src_io, src_fptr);
src_fd = src_fptr->fd;
}
else {
src_fptr = 0;
FilePathValue(stp->src);
src_path = StringValueCStr(stp->src);
src_fd = rb_sysopen_internal(src_path, O_RDONLY|O_NOCTTY, 0);
if (src_fd == -1) { rb_sys_fail(src_path); }
stp->close_src = 1;
}
stp->src_fd = src_fd;
dst_io = rb_check_convert_type(stp->dst, T_FILE, "IO", "to_io");
if (!NIL_P(dst_io)) {
dst_io = GetWriteIO(dst_io);
GetOpenFile(dst_io, dst_fptr);
dst_fd = dst_fptr->fd;
}
else {
dst_fptr = 0;
FilePathValue(stp->dst);
dst_path = StringValueCStr(stp->dst);
dst_fd = rb_sysopen_internal(dst_path, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0600);
if (dst_fd == -1) { rb_sys_fail(dst_path); }
stp->close_dst = 1;
}
stp->dst_fd = dst_fd;
stp->total = 0;
if (src_fptr && dst_fptr && src_fptr->rbuf_len && dst_fptr->wbuf_len) {
long len = src_fptr->rbuf_len;
VALUE str;
if (stp->copy_length != (off_t)-1 && stp->copy_length < len) {
len = stp->copy_length;
}
str = rb_str_buf_new(len);
rb_str_resize(str,len);
read_buffered_data(RSTRING_PTR(str), len, src_fptr);
io_fwrite(str, dst_fptr);
stp->total += len;
if (stp->copy_length != (off_t)-1)
stp->copy_length -= len;
}
if (dst_fptr && io_fflush(dst_fptr) < 0) {
rb_raise(rb_eIOError, "flush failed");
}
if (src_fptr) {
copy_stream_rbuf_to_dst(stp, src_fptr, dst_fptr, dst_path);
}
if (stp->copy_length == 0)
return Qnil;
rb_fd_init(&stp->fds);
rb_fd_set(src_fd, &stp->fds);
rb_fd_set(dst_fd, &stp->fds);
return rb_thread_blocking_region(copy_stream_func, (void*)stp, RB_UBF_DFL, 0);
}
static VALUE
copy_stream_finalize(VALUE arg)
{
struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
if (stp->close_src)
close(stp->src_fd);
if (stp->close_dst)
close(stp->dst_fd);
rb_fd_term(&stp->fds);
if (stp->syserr) {
errno = stp->error_no;
rb_sys_fail(stp->syserr);
}
if (stp->notimp) {
rb_raise(rb_eNotImpError, "%s() not implemented", stp->notimp);
}
return Qnil;
}
/*
* call-seq:
* IO.copy_stream(src, dst)
* IO.copy_stream(src, dst, copy_length)
* IO.copy_stream(src, dst, copy_length, src_offset)
*
* IO.copy_stream copies <i>src</i> to <i>dst</i>.
* <i>src</i> and <i>dst</i> is either a filename or an IO.
*
* This method returns the number of bytes copied.
*
* If optional arguments are not given,
* the start position of the copy is
* the beginning of the filename or
* the current file offset of the IO.
* The end position of the copy is the end of file.
*
* If <i>copy_length</i> is given,
* No more than <i>copy_length</i> bytes are copied.
*
* If <i>src_offset</i> is given,
* it specifies the start position of the copy.
*
* When <i>src_offset</i> is specified and
* <i>src</i> is an IO,
* IO.copy_stream doesn't move the current file offset.
*
*/
static VALUE
rb_io_s_copy_stream(int argc, VALUE *argv, VALUE io)
{
VALUE src, dst, length, src_offset;
struct copy_stream_struct st;
MEMZERO(&st, struct copy_stream_struct, 1);
rb_scan_args(argc, argv, "22", &src, &dst, &length, &src_offset);
if (NIL_P(length))
st.copy_length = (off_t)-1;
else
st.copy_length = NUM2OFFT(length);
if (NIL_P(src_offset))
st.src_offset = (off_t)-1;
else
st.src_offset = NUM2OFFT(src_offset);
st.src = src;
st.dst = dst;
rb_ensure(copy_stream_body, (VALUE)&st, copy_stream_finalize, (VALUE)&st);
return OFFT2NUM(st.total);
}
/* /*
* call-seq: * call-seq:
@ -6874,6 +7345,7 @@ Init_IO(void)
rb_define_singleton_method(rb_cIO, "select", rb_f_select, -1); rb_define_singleton_method(rb_cIO, "select", rb_f_select, -1);
rb_define_singleton_method(rb_cIO, "pipe", rb_io_s_pipe, -1); rb_define_singleton_method(rb_cIO, "pipe", rb_io_s_pipe, -1);
rb_define_singleton_method(rb_cIO, "try_convert", rb_io_s_try_convert, 1); rb_define_singleton_method(rb_cIO, "try_convert", rb_io_s_try_convert, 1);
rb_define_singleton_method(rb_cIO, "copy_stream", rb_io_s_copy_stream, -1);
rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1); rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1);

View file

@ -1,4 +1,7 @@
require 'test/unit' require 'test/unit'
require 'tmpdir'
require 'io/nonblock'
require 'socket'
class TestIO < Test::Unit::TestCase class TestIO < Test::Unit::TestCase
def test_gets_rs def test_gets_rs
@ -61,4 +64,337 @@ class TestIO < Test::Unit::TestCase
File.read("empty", nil, nil, {}) File.read("empty", nil, nil, {})
end end
end end
def with_pipe
r, w = IO.pipe
begin
yield r, w
ensure
r.close unless r.closed?
w.close unless w.closed?
end
end
def with_read_pipe(content)
r, w = IO.pipe
w << content
w.close
begin
yield r
ensure
r.close
end
end
def mkcdtmpdir
Dir.mktmpdir {|d|
Dir.chdir(d) {
yield
}
}
end
def test_copy_stream
mkcdtmpdir {|d|
content = "foobar"
File.open("src", "w") {|f| f << content }
ret = IO.copy_stream("src", "dst")
assert_equal(content.bytesize, ret)
assert_equal(content, File.read("dst"))
# overwrite by smaller file.
content = "baz"
File.open("src", "w") {|f| f << content }
ret = IO.copy_stream("src", "dst")
assert_equal(content.bytesize, ret)
assert_equal(content, File.read("dst"))
ret = IO.copy_stream("src", "dst", 2)
assert_equal(2, ret)
assert_equal(content[0,2], File.read("dst"))
ret = IO.copy_stream("src", "dst", 0)
assert_equal(0, ret)
assert_equal("", File.read("dst"))
ret = IO.copy_stream("src", "dst", nil, 1)
assert_equal(content.bytesize-1, ret)
assert_equal(content[1..-1], File.read("dst"))
assert_raise(Errno::ENOENT) {
IO.copy_stream("nodir/foo", "dst")
}
assert_raise(Errno::ENOENT) {
IO.copy_stream("src", "nodir/bar")
}
with_pipe {|r, w|
ret = IO.copy_stream("src", w)
assert_equal(content.bytesize, ret)
w.close
assert_equal(content, r.read)
}
with_pipe {|r, w|
w.close
assert_raise(IOError) { IO.copy_stream("src", w) }
}
pipe_content = "abc"
with_read_pipe(pipe_content) {|r|
ret = IO.copy_stream(r, "dst")
assert_equal(pipe_content.bytesize, ret)
assert_equal(pipe_content, File.read("dst"))
}
with_read_pipe("abc") {|r1|
assert_equal("a", r1.getc)
with_pipe {|r2, w2|
w2.sync = false
w2 << "def"
ret = IO.copy_stream(r1, w2)
assert_equal(2, ret)
w2.close
assert_equal("defbc", r2.read)
}
}
with_read_pipe("abc") {|r1|
assert_equal("a", r1.getc)
with_pipe {|r2, w2|
w2.sync = false
w2 << "def"
ret = IO.copy_stream(r1, w2, 1)
assert_equal(1, ret)
w2.close
assert_equal("defb", r2.read)
}
}
with_read_pipe("abc") {|r1|
assert_equal("a", r1.getc)
with_pipe {|r2, w2|
ret = IO.copy_stream(r1, w2)
assert_equal(2, ret)
w2.close
assert_equal("bc", r2.read)
}
}
with_read_pipe("abc") {|r1|
assert_equal("a", r1.getc)
with_pipe {|r2, w2|
ret = IO.copy_stream(r1, w2, 1)
assert_equal(1, ret)
w2.close
assert_equal("b", r2.read)
}
}
with_read_pipe("abc") {|r1|
assert_equal("a", r1.getc)
with_pipe {|r2, w2|
ret = IO.copy_stream(r1, w2, 0)
assert_equal(0, ret)
w2.close
assert_equal("", r2.read)
}
}
with_pipe {|r1, w1|
w1 << "abc"
assert_equal("a", r1.getc)
with_pipe {|r2, w2|
w1 << "def"
w1.close
ret = IO.copy_stream(r1, w2)
assert_equal(5, ret)
w2.close
assert_equal("bcdef", r2.read)
}
}
with_pipe {|r, w|
ret = IO.copy_stream("src", w, 1, 1)
assert_equal(1, ret)
w.close
assert_equal(content[1,1], r.read)
}
with_read_pipe("abc") {|r1|
assert_equal("a", r1.getc)
with_pipe {|r2, w2|
w2.nonblock = true
s = w2.syswrite("a" * 100000)
t = Thread.new { sleep 0.1; r2.read }
ret = IO.copy_stream(r1, w2)
w2.close
assert_equal(2, ret)
assert_equal("a" * s + "bc", t.value)
}
}
bigcontent = "abc" * 123456
File.open("bigsrc", "w") {|f| f << bigcontent }
ret = IO.copy_stream("bigsrc", "bigdst")
assert_equal(bigcontent.bytesize, ret)
assert_equal(bigcontent, File.read("bigdst"))
File.unlink("bigdst")
ret = IO.copy_stream("bigsrc", "bigdst", nil, 100)
assert_equal(bigcontent.bytesize-100, ret)
assert_equal(bigcontent[100..-1], File.read("bigdst"))
File.unlink("bigdst")
ret = IO.copy_stream("bigsrc", "bigdst", 30000, 100)
assert_equal(30000, ret)
assert_equal(bigcontent[100, 30000], File.read("bigdst"))
File.open("bigsrc") {|f|
assert_equal(0, f.pos)
ret = IO.copy_stream(f, "bigdst", nil, 10)
assert_equal(bigcontent.bytesize-10, ret)
assert_equal(bigcontent[10..-1], File.read("bigdst"))
assert_equal(0, f.pos)
ret = IO.copy_stream(f, "bigdst", 40, 30)
assert_equal(40, ret)
assert_equal(bigcontent[30, 40], File.read("bigdst"))
assert_equal(0, f.pos)
}
with_pipe {|r, w|
w.close
assert_raise(IOError) { IO.copy_stream("src", w) }
}
megacontent = "abc" * 1234567
File.open("megasrc", "w") {|f| f << megacontent }
with_pipe {|r1, w1|
with_pipe {|r2, w2|
t1 = Thread.new { w1 << megacontent; w1.close }
t2 = Thread.new { r2.read }
r1.nonblock = true
w2.nonblock = true
ret = IO.copy_stream(r1, w2)
assert_equal(megacontent.bytesize, ret)
w2.close
t1.join
assert_equal(megacontent, t2.value)
}
}
with_pipe {|r1, w1|
with_pipe {|r2, w2|
t1 = Thread.new { w1 << megacontent; w1.close }
t2 = Thread.new { r2.read }
ret = IO.copy_stream(r1, w2)
assert_equal(megacontent.bytesize, ret)
w2.close
t1.join
assert_equal(megacontent, t2.value)
}
}
with_pipe {|r, w|
t = Thread.new { r.read }
ret = IO.copy_stream("megasrc", w)
assert_equal(megacontent.bytesize, ret)
w.close
assert_equal(megacontent, t.value)
}
}
end
def with_socketpair
s1, s2 = UNIXSocket.pair
begin
yield s1, s2
ensure
s1.close unless s1.closed?
s2.close unless s2.closed?
end
end
def test_copy_stream_socket
return unless defined? UNIXSocket
mkcdtmpdir {|d|
content = "foobar"
File.open("src", "w") {|f| f << content }
with_socketpair {|s1, s2|
ret = IO.copy_stream("src", s1)
assert_equal(content.bytesize, ret)
s1.close
assert_equal(content, s2.read)
}
bigcontent = "abc" * 123456
File.open("bigsrc", "w") {|f| f << bigcontent }
with_socketpair {|s1, s2|
t = Thread.new { s2.read }
ret = IO.copy_stream("bigsrc", s1)
assert_equal(bigcontent.bytesize, ret)
s1.close
result = t.value
assert_equal(bigcontent, result)
}
with_socketpair {|s1, s2|
t = Thread.new { s2.read }
ret = IO.copy_stream("bigsrc", s1, 10000)
assert_equal(10000, ret)
s1.close
result = t.value
assert_equal(bigcontent[0,10000], result)
}
File.open("bigsrc") {|f|
assert_equal(0, f.pos)
with_socketpair {|s1, s2|
t = Thread.new { s2.read }
ret = IO.copy_stream(f, s1, nil, 100)
assert_equal(bigcontent.bytesize-100, ret)
assert_equal(0, f.pos)
s1.close
result = t.value
assert_equal(bigcontent[100..-1], result)
}
}
File.open("bigsrc") {|f|
assert_equal(bigcontent[0,100], f.read(100))
assert_equal(100, f.pos)
with_socketpair {|s1, s2|
t = Thread.new { s2.read }
ret = IO.copy_stream(f, s1)
assert_equal(bigcontent.bytesize-100, ret)
assert_equal(bigcontent.length, f.pos)
s1.close
result = t.value
assert_equal(bigcontent[100..-1], result)
}
}
megacontent = "abc" * 1234567
File.open("megasrc", "w") {|f| f << megacontent }
with_socketpair {|s1, s2|
t = Thread.new { s2.read }
s1.nonblock = true
ret = IO.copy_stream("megasrc", s1)
assert_equal(megacontent.bytesize, ret)
s1.close
result = t.value
assert_equal(megacontent, result)
}
}
end
end end

View file

@ -1702,6 +1702,25 @@ rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max)
memcpy(dst->fdset, src, size); memcpy(dst->fdset, src, size);
} }
int
rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
{
fd_set *r = NULL, *w = NULL, *e = NULL;
if (readfds) {
rb_fd_resize(n - 1, readfds);
r = rb_fd_ptr(readfds);
}
if (writefds) {
rb_fd_resize(n - 1, writefds);
w = rb_fd_ptr(writefds);
}
if (exceptfds) {
rb_fd_resize(n - 1, exceptfds);
e = rb_fd_ptr(exceptfds);
}
return select(n, r, w, e, timeout);
}
#undef FD_ZERO #undef FD_ZERO
#undef FD_SET #undef FD_SET
#undef FD_CLR #undef FD_CLR