From 0ef84dcefce47cab974f8763abd7cbb3a7eb6d98 Mon Sep 17 00:00:00 2001 From: akr Date: Sun, 30 Mar 2008 06:38:05 +0000 Subject: [PATCH] * io.c: IO.copy_stream implemented. [ruby-dev:33843] * thread.c (rb_fd_select): new function. * configure.in (sys/sendfile.h): check the header file. (sendfile): check the function. (pread): check the function. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@15858 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 10 + configure.in | 5 +- io.c | 472 +++++++++++++++++++++++++++++++++++++++++++ test/ruby/test_io.rb | 336 ++++++++++++++++++++++++++++++ thread.c | 19 ++ 5 files changed, 840 insertions(+), 2 deletions(-) diff --git a/ChangeLog b/ChangeLog index 55e9fb3765..4cb507f093 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +Sun Mar 30 15:33:29 2008 Tanaka Akira + + * io.c: IO.copy_stream implemented. [ruby-dev:33843] + + * thread.c (rb_fd_select): new function. + + * configure.in (sys/sendfile.h): check the header file. + (sendfile): check the function. + (pread): check the function. + Sat Mar 29 14:18:41 2008 Hidetoshi NAGAI * ext/tk/*: full update Ruby/Tk to support Ruby(1.9|1.8) and Tc/Tk8.5. diff --git a/configure.in b/configure.in index 534db63e90..9246d990be 100644 --- a/configure.in +++ b/configure.in @@ -592,7 +592,7 @@ AC_CHECK_HEADERS(stdlib.h string.h unistd.h limits.h sys/file.h sys/ioctl.h sys/ fcntl.h sys/fcntl.h sys/select.h sys/time.h sys/times.h sys/param.h\ syscall.h pwd.h grp.h a.out.h utime.h memory.h direct.h sys/resource.h \ sys/mkdev.h sys/utime.h xti.h netinet/in_systm.h float.h ieeefp.h pthread.h \ - ucontext.h intrinsics.h langinfo.h locale.h) + ucontext.h intrinsics.h langinfo.h locale.h sys/sendfile.h) dnl Check additional types. AC_CHECK_SIZEOF(rlim_t, 0, [ @@ -705,7 +705,8 @@ AC_CHECK_FUNCS(fmod killpg wait4 waitpid fork spawnv syscall chroot fsync getcwd dlopen sigprocmask sigaction sigsetjmp _setjmp vsnprintf snprintf\ setsid telldir seekdir fchmod cosh sinh tanh log2 round signbit\ setuid setgid daemon select_large_fdset setenv unsetenv\ - mktime timegm clock_gettime gettimeofday) + mktime timegm clock_gettime gettimeofday\ + pread sendfile) AC_ARG_ENABLE(setreuid, [ --enable-setreuid use setreuid()/setregid() according to need even if obsolete.], [use_setreuid=$enableval]) diff --git a/io.c b/io.c index d19392bf8e..c8cb76ba7f 100644 --- a/io.c +++ b/io.c @@ -14,6 +14,7 @@ #include "ruby/ruby.h" #include "ruby/io.h" #include "ruby/signal.h" +#include "vm_core.h" #include #include @@ -6219,6 +6220,476 @@ rb_io_s_read(int argc, VALUE *argv, VALUE io) return rb_ensure(io_s_read, (VALUE)&arg, rb_io_close, arg.io); } +struct copy_stream_struct { + VALUE src; + VALUE dst; + int src_fd; + int dst_fd; + off_t copy_length; + off_t src_offset; + int close_src; + int close_dst; + off_t total; + char *syserr; + int error_no; + char *notimp; + rb_fdset_t fds; + rb_thread_t *th; +}; + +static void +copy_stream_rbuf_to_dst(struct copy_stream_struct *stp, + rb_io_t *src_fptr, rb_io_t *dst_fptr, const char *dst_path) +{ + ssize_t r; + int len; +retry: + len = src_fptr->rbuf_len; + if (stp->copy_length != (off_t)-1 && stp->copy_length < len) { + len = stp->copy_length; + } + if (len == 0) + return; + r = rb_write_internal(dst_fptr->fd, src_fptr->rbuf + src_fptr->rbuf_off, len); + if (len == r) { + src_fptr->rbuf_len -= len; + if (src_fptr->rbuf_len < 0) src_fptr->rbuf_len = 0; + if (stp->copy_length != (off_t)-1) stp->copy_length -= len; + stp->total += len; + return; + } + else if (0 <= r) { + src_fptr->rbuf_off += r; + src_fptr->rbuf_len -= r; + if (stp->copy_length != (off_t)-1) stp->copy_length -= r; + stp->total += r; + errno = EAGAIN; + } + if (rb_io_wait_writable(dst_fptr->fd)) { + rb_io_check_closed(dst_fptr); + if (src_fptr->rbuf_len) + goto retry; + } + rb_sys_fail(dst_path); +} + +static int +copy_stream_wait_read(struct copy_stream_struct *stp) +{ + int ret; + rb_fd_zero(&stp->fds); + rb_fd_set(stp->src_fd, &stp->fds); + ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL); + if (ret == -1) { + stp->syserr = "select"; + stp->error_no = errno; + return -1; + } + return 0; +} + +static int +copy_stream_wait_write(struct copy_stream_struct *stp) +{ + int ret; + rb_fd_zero(&stp->fds); + rb_fd_set(stp->dst_fd, &stp->fds); + ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL); + if (ret == -1) { + stp->syserr = "select"; + stp->error_no = errno; + return -1; + } + return 0; +} + +#ifdef HAVE_SENDFILE + +#ifdef __linux__ +#define USE_SENDFILE + +#ifdef HAVE_SYS_SENDFILE_H +#include +#endif + +static ssize_t +simple_sendfile(int out_fd, int in_fd, off_t *offset, size_t count) +{ + return sendfile(out_fd, in_fd, offset, count); +} + +#endif + +#endif + +#ifdef USE_SENDFILE +static int +copy_stream_sendfile(struct copy_stream_struct *stp) +{ + struct stat src_stat, dst_stat; + ssize_t ss; + int ret; + + off_t copy_length; + off_t src_offset; + int use_pread; + + ret = fstat(stp->src_fd, &src_stat); + if (ret == -1) { + stp->syserr = "fstat"; + stp->error_no = errno; + return -1; + } + if (!S_ISREG(src_stat.st_mode)) + return 0; + + ret = fstat(stp->dst_fd, &dst_stat); + if (ret == -1) { + stp->syserr = "fstat"; + stp->error_no = errno; + return -1; + } + if ((dst_stat.st_mode & S_IFMT) != S_IFSOCK) + return 0; + + src_offset = stp->src_offset; + use_pread = src_offset != (off_t)-1; + + copy_length = stp->copy_length; + if (copy_length == (off_t)-1) { + if (use_pread) + copy_length = src_stat.st_size - src_offset; + else { + off_t cur = lseek(stp->src_fd, 0, SEEK_CUR); + if (cur == (off_t)-1) { + stp->syserr = "lseek"; + stp->error_no = errno; + return -1; + } + copy_length = src_stat.st_size - cur; + } + } + +retry_sendfile: + if (use_pread) { + ss = simple_sendfile(stp->dst_fd, stp->src_fd, &src_offset, copy_length); + } + else { + ss = simple_sendfile(stp->dst_fd, stp->src_fd, NULL, copy_length); + } + if (0 < ss) { + stp->total += ss; + copy_length -= ss; + if (0 < copy_length) { + ss = -1; + errno = EAGAIN; + } + } + if (ss == -1) { + if (errno == EINVAL || errno == ENOSYS) + return 0; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (copy_stream_wait_write(stp) == -1) + return -1; + if (RUBY_VM_INTERRUPTED(stp->th)) + return; + goto retry_sendfile; + } + stp->syserr = "sendfile"; + stp->error_no = errno; + return -1; + } + return 1; +} +#endif + +static ssize_t +copy_stream_read(struct copy_stream_struct *stp, char *buf, int len, off_t offset) +{ + ssize_t ss; +retry_read: + if (offset == (off_t)-1) + ss = read(stp->src_fd, buf, len); + else { +#ifdef HAVE_PREAD + ss = pread(stp->src_fd, buf, len, offset); +#else + stp->notimp = "pread"; + return -1; +#endif + } + if (ss == 0) { + return 0; + } + if (ss == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (copy_stream_wait_read(stp) == -1) + return -1; + goto retry_read; + } + if (errno == ENOSYS) { + stp->notimp = "pread"; + return -1; + } + stp->syserr = offset == (off_t)-1 ? "read" : "pread"; + stp->error_no = errno; + return -1; + } + return ss; +} + +static int +copy_stream_write(struct copy_stream_struct *stp, char *buf, int len) +{ + ssize_t ss; + int off = 0; + while (len) { + ss = write(stp->dst_fd, buf+off, len); + if (ss == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (copy_stream_wait_write(stp) == -1) + return -1; + continue; + } + stp->syserr = "write"; + stp->error_no = errno; + return -1; + } + off += ss; + len -= ss; + stp->total += ss; + } + return 0; +} + +static void +copy_stream_read_write(struct copy_stream_struct *stp) +{ + char buf[1024*16]; + int len; + ssize_t ss; + int ret; + off_t copy_length; + int use_eof; + off_t src_offset; + int use_pread; + + copy_length = stp->copy_length; + use_eof = copy_length == (off_t)-1; + src_offset = stp->src_offset; + use_pread = src_offset != (off_t)-1; + + if (use_pread && stp->close_src) { + off_t r; + r = lseek(stp->src_fd, src_offset, SEEK_SET); + if (r == (off_t)-1) { + stp->syserr = "lseek"; + stp->error_no = errno; + return; + } + src_offset = (off_t)-1; + use_pread = 0; + } + + while (use_eof || 0 < copy_length) { + if (!use_eof && copy_length < sizeof(buf)) { + len = copy_length; + } + else { + len = sizeof(buf); + } + if (use_pread) { + ss = copy_stream_read(stp, buf, len, src_offset); + if (0 < ss) + src_offset += ss; + } + else { + ss = copy_stream_read(stp, buf, len, (off_t)-1); + } + if (ss <= 0) /* EOF or error */ + return; + + ret = copy_stream_write(stp, buf, ss); + if (ret < 0) + return; + + if (!use_eof) + copy_length -= ss; + + if (RUBY_VM_INTERRUPTED(stp->th)) + return; + } +} + +static VALUE +copy_stream_func(void *arg) +{ + struct copy_stream_struct *stp = (struct copy_stream_struct *)arg; + int ret; + +#ifdef USE_SENDFILE + ret = copy_stream_sendfile(stp); + if (ret != 0) + goto finish; /* error or success */ +#endif + + copy_stream_read_write(stp); + +finish: + return Qnil; +} + +static VALUE +copy_stream_body(VALUE arg) +{ + struct copy_stream_struct *stp = (struct copy_stream_struct *)arg; + VALUE src_io, dst_io; + rb_io_t *src_fptr, *dst_fptr; + int src_fd, dst_fd; + char *src_path = 0, *dst_path = 0; + + stp->th = GET_THREAD(); + + src_io = rb_check_convert_type(stp->src, T_FILE, "IO", "to_io"); + if (!NIL_P(src_io)) { + GetOpenFile(src_io, src_fptr); + src_fd = src_fptr->fd; + } + else { + src_fptr = 0; + FilePathValue(stp->src); + src_path = StringValueCStr(stp->src); + src_fd = rb_sysopen_internal(src_path, O_RDONLY|O_NOCTTY, 0); + if (src_fd == -1) { rb_sys_fail(src_path); } + stp->close_src = 1; + } + stp->src_fd = src_fd; + + dst_io = rb_check_convert_type(stp->dst, T_FILE, "IO", "to_io"); + if (!NIL_P(dst_io)) { + dst_io = GetWriteIO(dst_io); + GetOpenFile(dst_io, dst_fptr); + dst_fd = dst_fptr->fd; + } + else { + dst_fptr = 0; + FilePathValue(stp->dst); + dst_path = StringValueCStr(stp->dst); + dst_fd = rb_sysopen_internal(dst_path, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0600); + if (dst_fd == -1) { rb_sys_fail(dst_path); } + stp->close_dst = 1; + } + stp->dst_fd = dst_fd; + + stp->total = 0; + + if (src_fptr && dst_fptr && src_fptr->rbuf_len && dst_fptr->wbuf_len) { + long len = src_fptr->rbuf_len; + VALUE str; + if (stp->copy_length != (off_t)-1 && stp->copy_length < len) { + len = stp->copy_length; + } + str = rb_str_buf_new(len); + rb_str_resize(str,len); + read_buffered_data(RSTRING_PTR(str), len, src_fptr); + io_fwrite(str, dst_fptr); + stp->total += len; + if (stp->copy_length != (off_t)-1) + stp->copy_length -= len; + } + + if (dst_fptr && io_fflush(dst_fptr) < 0) { + rb_raise(rb_eIOError, "flush failed"); + } + + if (src_fptr) { + copy_stream_rbuf_to_dst(stp, src_fptr, dst_fptr, dst_path); + } + + if (stp->copy_length == 0) + return Qnil; + + rb_fd_init(&stp->fds); + rb_fd_set(src_fd, &stp->fds); + rb_fd_set(dst_fd, &stp->fds); + + return rb_thread_blocking_region(copy_stream_func, (void*)stp, RB_UBF_DFL, 0); +} + +static VALUE +copy_stream_finalize(VALUE arg) +{ + struct copy_stream_struct *stp = (struct copy_stream_struct *)arg; + if (stp->close_src) + close(stp->src_fd); + if (stp->close_dst) + close(stp->dst_fd); + rb_fd_term(&stp->fds); + if (stp->syserr) { + errno = stp->error_no; + rb_sys_fail(stp->syserr); + } + if (stp->notimp) { + rb_raise(rb_eNotImpError, "%s() not implemented", stp->notimp); + } + return Qnil; +} + +/* + * call-seq: + * IO.copy_stream(src, dst) + * IO.copy_stream(src, dst, copy_length) + * IO.copy_stream(src, dst, copy_length, src_offset) + * + * IO.copy_stream copies src to dst. + * src and dst is either a filename or an IO. + * + * This method returns the number of bytes copied. + * + * If optional arguments are not given, + * the start position of the copy is + * the beginning of the filename or + * the current file offset of the IO. + * The end position of the copy is the end of file. + * + * If copy_length is given, + * No more than copy_length bytes are copied. + * + * If src_offset is given, + * it specifies the start position of the copy. + * + * When src_offset is specified and + * src is an IO, + * IO.copy_stream doesn't move the current file offset. + * + */ +static VALUE +rb_io_s_copy_stream(int argc, VALUE *argv, VALUE io) +{ + VALUE src, dst, length, src_offset; + struct copy_stream_struct st; + + MEMZERO(&st, struct copy_stream_struct, 1); + + rb_scan_args(argc, argv, "22", &src, &dst, &length, &src_offset); + + if (NIL_P(length)) + st.copy_length = (off_t)-1; + else + st.copy_length = NUM2OFFT(length); + + if (NIL_P(src_offset)) + st.src_offset = (off_t)-1; + else + st.src_offset = NUM2OFFT(src_offset); + + st.src = src; + st.dst = dst; + + rb_ensure(copy_stream_body, (VALUE)&st, copy_stream_finalize, (VALUE)&st); + + return OFFT2NUM(st.total); +} /* * call-seq: @@ -6874,6 +7345,7 @@ Init_IO(void) rb_define_singleton_method(rb_cIO, "select", rb_f_select, -1); rb_define_singleton_method(rb_cIO, "pipe", rb_io_s_pipe, -1); rb_define_singleton_method(rb_cIO, "try_convert", rb_io_s_try_convert, 1); + rb_define_singleton_method(rb_cIO, "copy_stream", rb_io_s_copy_stream, -1); rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1); diff --git a/test/ruby/test_io.rb b/test/ruby/test_io.rb index ae26609a21..0cb8a775e2 100644 --- a/test/ruby/test_io.rb +++ b/test/ruby/test_io.rb @@ -1,4 +1,7 @@ require 'test/unit' +require 'tmpdir' +require 'io/nonblock' +require 'socket' class TestIO < Test::Unit::TestCase def test_gets_rs @@ -61,4 +64,337 @@ class TestIO < Test::Unit::TestCase File.read("empty", nil, nil, {}) end end + + def with_pipe + r, w = IO.pipe + begin + yield r, w + ensure + r.close unless r.closed? + w.close unless w.closed? + end + end + + def with_read_pipe(content) + r, w = IO.pipe + w << content + w.close + begin + yield r + ensure + r.close + end + end + + def mkcdtmpdir + Dir.mktmpdir {|d| + Dir.chdir(d) { + yield + } + } + end + + def test_copy_stream + mkcdtmpdir {|d| + + content = "foobar" + File.open("src", "w") {|f| f << content } + ret = IO.copy_stream("src", "dst") + assert_equal(content.bytesize, ret) + assert_equal(content, File.read("dst")) + + # overwrite by smaller file. + content = "baz" + File.open("src", "w") {|f| f << content } + ret = IO.copy_stream("src", "dst") + assert_equal(content.bytesize, ret) + assert_equal(content, File.read("dst")) + + ret = IO.copy_stream("src", "dst", 2) + assert_equal(2, ret) + assert_equal(content[0,2], File.read("dst")) + + ret = IO.copy_stream("src", "dst", 0) + assert_equal(0, ret) + assert_equal("", File.read("dst")) + + ret = IO.copy_stream("src", "dst", nil, 1) + assert_equal(content.bytesize-1, ret) + assert_equal(content[1..-1], File.read("dst")) + + assert_raise(Errno::ENOENT) { + IO.copy_stream("nodir/foo", "dst") + } + + assert_raise(Errno::ENOENT) { + IO.copy_stream("src", "nodir/bar") + } + + with_pipe {|r, w| + ret = IO.copy_stream("src", w) + assert_equal(content.bytesize, ret) + w.close + assert_equal(content, r.read) + } + + with_pipe {|r, w| + w.close + assert_raise(IOError) { IO.copy_stream("src", w) } + } + + pipe_content = "abc" + with_read_pipe(pipe_content) {|r| + ret = IO.copy_stream(r, "dst") + assert_equal(pipe_content.bytesize, ret) + assert_equal(pipe_content, File.read("dst")) + } + + with_read_pipe("abc") {|r1| + assert_equal("a", r1.getc) + with_pipe {|r2, w2| + w2.sync = false + w2 << "def" + ret = IO.copy_stream(r1, w2) + assert_equal(2, ret) + w2.close + assert_equal("defbc", r2.read) + } + } + + with_read_pipe("abc") {|r1| + assert_equal("a", r1.getc) + with_pipe {|r2, w2| + w2.sync = false + w2 << "def" + ret = IO.copy_stream(r1, w2, 1) + assert_equal(1, ret) + w2.close + assert_equal("defb", r2.read) + } + } + + with_read_pipe("abc") {|r1| + assert_equal("a", r1.getc) + with_pipe {|r2, w2| + ret = IO.copy_stream(r1, w2) + assert_equal(2, ret) + w2.close + assert_equal("bc", r2.read) + } + } + + with_read_pipe("abc") {|r1| + assert_equal("a", r1.getc) + with_pipe {|r2, w2| + ret = IO.copy_stream(r1, w2, 1) + assert_equal(1, ret) + w2.close + assert_equal("b", r2.read) + } + } + + with_read_pipe("abc") {|r1| + assert_equal("a", r1.getc) + with_pipe {|r2, w2| + ret = IO.copy_stream(r1, w2, 0) + assert_equal(0, ret) + w2.close + assert_equal("", r2.read) + } + } + + with_pipe {|r1, w1| + w1 << "abc" + assert_equal("a", r1.getc) + with_pipe {|r2, w2| + w1 << "def" + w1.close + ret = IO.copy_stream(r1, w2) + assert_equal(5, ret) + w2.close + assert_equal("bcdef", r2.read) + } + } + + with_pipe {|r, w| + ret = IO.copy_stream("src", w, 1, 1) + assert_equal(1, ret) + w.close + assert_equal(content[1,1], r.read) + } + + with_read_pipe("abc") {|r1| + assert_equal("a", r1.getc) + with_pipe {|r2, w2| + w2.nonblock = true + s = w2.syswrite("a" * 100000) + t = Thread.new { sleep 0.1; r2.read } + ret = IO.copy_stream(r1, w2) + w2.close + assert_equal(2, ret) + assert_equal("a" * s + "bc", t.value) + } + } + + + bigcontent = "abc" * 123456 + File.open("bigsrc", "w") {|f| f << bigcontent } + ret = IO.copy_stream("bigsrc", "bigdst") + assert_equal(bigcontent.bytesize, ret) + assert_equal(bigcontent, File.read("bigdst")) + + File.unlink("bigdst") + ret = IO.copy_stream("bigsrc", "bigdst", nil, 100) + assert_equal(bigcontent.bytesize-100, ret) + assert_equal(bigcontent[100..-1], File.read("bigdst")) + + File.unlink("bigdst") + ret = IO.copy_stream("bigsrc", "bigdst", 30000, 100) + assert_equal(30000, ret) + assert_equal(bigcontent[100, 30000], File.read("bigdst")) + + File.open("bigsrc") {|f| + assert_equal(0, f.pos) + ret = IO.copy_stream(f, "bigdst", nil, 10) + assert_equal(bigcontent.bytesize-10, ret) + assert_equal(bigcontent[10..-1], File.read("bigdst")) + assert_equal(0, f.pos) + ret = IO.copy_stream(f, "bigdst", 40, 30) + assert_equal(40, ret) + assert_equal(bigcontent[30, 40], File.read("bigdst")) + assert_equal(0, f.pos) + } + + with_pipe {|r, w| + w.close + assert_raise(IOError) { IO.copy_stream("src", w) } + } + + megacontent = "abc" * 1234567 + File.open("megasrc", "w") {|f| f << megacontent } + + with_pipe {|r1, w1| + with_pipe {|r2, w2| + t1 = Thread.new { w1 << megacontent; w1.close } + t2 = Thread.new { r2.read } + r1.nonblock = true + w2.nonblock = true + ret = IO.copy_stream(r1, w2) + assert_equal(megacontent.bytesize, ret) + w2.close + t1.join + assert_equal(megacontent, t2.value) + } + } + + with_pipe {|r1, w1| + with_pipe {|r2, w2| + t1 = Thread.new { w1 << megacontent; w1.close } + t2 = Thread.new { r2.read } + ret = IO.copy_stream(r1, w2) + assert_equal(megacontent.bytesize, ret) + w2.close + t1.join + assert_equal(megacontent, t2.value) + } + } + + with_pipe {|r, w| + t = Thread.new { r.read } + ret = IO.copy_stream("megasrc", w) + assert_equal(megacontent.bytesize, ret) + w.close + assert_equal(megacontent, t.value) + } + } + end + + def with_socketpair + s1, s2 = UNIXSocket.pair + begin + yield s1, s2 + ensure + s1.close unless s1.closed? + s2.close unless s2.closed? + end + end + + def test_copy_stream_socket + return unless defined? UNIXSocket + mkcdtmpdir {|d| + + content = "foobar" + File.open("src", "w") {|f| f << content } + + with_socketpair {|s1, s2| + ret = IO.copy_stream("src", s1) + assert_equal(content.bytesize, ret) + s1.close + assert_equal(content, s2.read) + } + + bigcontent = "abc" * 123456 + File.open("bigsrc", "w") {|f| f << bigcontent } + + with_socketpair {|s1, s2| + t = Thread.new { s2.read } + ret = IO.copy_stream("bigsrc", s1) + assert_equal(bigcontent.bytesize, ret) + s1.close + result = t.value + assert_equal(bigcontent, result) + } + + with_socketpair {|s1, s2| + t = Thread.new { s2.read } + ret = IO.copy_stream("bigsrc", s1, 10000) + assert_equal(10000, ret) + s1.close + result = t.value + assert_equal(bigcontent[0,10000], result) + } + + File.open("bigsrc") {|f| + assert_equal(0, f.pos) + with_socketpair {|s1, s2| + t = Thread.new { s2.read } + ret = IO.copy_stream(f, s1, nil, 100) + assert_equal(bigcontent.bytesize-100, ret) + assert_equal(0, f.pos) + s1.close + result = t.value + assert_equal(bigcontent[100..-1], result) + } + } + + File.open("bigsrc") {|f| + assert_equal(bigcontent[0,100], f.read(100)) + assert_equal(100, f.pos) + with_socketpair {|s1, s2| + t = Thread.new { s2.read } + ret = IO.copy_stream(f, s1) + assert_equal(bigcontent.bytesize-100, ret) + assert_equal(bigcontent.length, f.pos) + s1.close + result = t.value + assert_equal(bigcontent[100..-1], result) + } + } + + megacontent = "abc" * 1234567 + File.open("megasrc", "w") {|f| f << megacontent } + + with_socketpair {|s1, s2| + t = Thread.new { s2.read } + s1.nonblock = true + ret = IO.copy_stream("megasrc", s1) + assert_equal(megacontent.bytesize, ret) + s1.close + result = t.value + assert_equal(megacontent, result) + } + + + } + end end diff --git a/thread.c b/thread.c index 8fc6dd9245..bdf1d4acc8 100644 --- a/thread.c +++ b/thread.c @@ -1702,6 +1702,25 @@ rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max) memcpy(dst->fdset, src, size); } +int +rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout) +{ + fd_set *r = NULL, *w = NULL, *e = NULL; + if (readfds) { + rb_fd_resize(n - 1, readfds); + r = rb_fd_ptr(readfds); + } + if (writefds) { + rb_fd_resize(n - 1, writefds); + w = rb_fd_ptr(writefds); + } + if (exceptfds) { + rb_fd_resize(n - 1, exceptfds); + e = rb_fd_ptr(exceptfds); + } + return select(n, r, w, e, timeout); +} + #undef FD_ZERO #undef FD_SET #undef FD_CLR