diff --git a/ChangeLog b/ChangeLog index 8cbdbc13f2..2f3cea283e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,9 @@ +Tue Jul 3 06:02:54 2012 Eric Hodel + + * ext/zlib/zlib.c (zstream_run): Process zlib streams without GVL. + [Feature #6615] + * NEWS: ditto. + Mon Jul 2 22:13:04 2012 Tanaka Akira * thread.c (rb_thread_aref): add explanation for why Thread#[] and diff --git a/NEWS b/NEWS index 7431c79be0..3f45b2f6e9 100644 --- a/NEWS +++ b/NEWS @@ -117,6 +117,8 @@ with all sufficient information, see the ChangeLog file. * zlib * Added support for the new deflate strategies Zlib::RLE and Zlib::FIXED. + * Zlib streams are now processed without the GVL. This allows gzip, zlib and + deflate streams to be processed in parallel. * openssl * Consistently raise an error when trying to encode nil values. All instances diff --git a/ext/zlib/zlib.c b/ext/zlib/zlib.c index 149f54d9b2..9e61ca20e5 100644 --- a/ext/zlib/zlib.c +++ b/ext/zlib/zlib.c @@ -72,6 +72,7 @@ static void finalizer_warn(const char*); struct zstream; struct zstream_funcs; +struct zstream_run_args; static void zstream_init(struct zstream*, const struct zstream_funcs*); static void zstream_expand_buffer(struct zstream*); static void zstream_expand_buffer_into(struct zstream*, unsigned long); @@ -564,6 +565,11 @@ static const struct zstream_funcs inflate_funcs = { inflateReset, inflateEnd, inflate, }; +struct zstream_run_args { + struct zstream * z; + int flush; + int interrupt; +}; static voidpf zlib_mem_alloc(voidpf opaque, uInt items, uInt size) @@ -655,6 +661,42 @@ zstream_expand_buffer_into(struct zstream *z, unsigned long size) } } +static int +zstream_expand_buffer_without_gvl(struct zstream *z) +{ + char * new_str; + long inc, len; + + if (RSTRING_LEN(z->buf) - z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) { + z->stream.avail_out = ZSTREAM_AVAIL_OUT_STEP_MAX; + } + else { + inc = z->buf_filled / 2; + if (inc < ZSTREAM_AVAIL_OUT_STEP_MIN) { + inc = ZSTREAM_AVAIL_OUT_STEP_MIN; + } + + len = z->buf_filled + inc; + + new_str = realloc(RSTRING(z->buf)->as.heap.ptr, len + 1); + + if (!new_str) + return 0; + + /* from rb_str_resize */ + RSTRING(z->buf)->as.heap.ptr = new_str; + RSTRING(z->buf)->as.heap.ptr[len] = '\0'; /* sentinel */ + RSTRING(z->buf)->as.heap.len = + RSTRING(z->buf)->as.heap.aux.capa = len; + + z->stream.avail_out = (inc < ZSTREAM_AVAIL_OUT_STEP_MAX) ? + (int)inc : ZSTREAM_AVAIL_OUT_STEP_MAX; + } + z->stream.next_out = (Bytef*)RSTRING_PTR(z->buf) + z->buf_filled; + + return 1; +} + static void zstream_append_buffer(struct zstream *z, const Bytef *src, long len) { @@ -871,13 +913,62 @@ zstream_end(struct zstream *z) return Qnil; } +static VALUE +zstream_run_func(void *ptr) { + struct zstream_run_args *args = (struct zstream_run_args *)ptr; + int err, flush = args->flush; + struct zstream *z = args->z; + uInt n; + + while (!args->interrupt) { + n = z->stream.avail_out; + err = z->func->run(&z->stream, flush); + z->buf_filled += n - z->stream.avail_out; + + if (err == Z_STREAM_END) { + z->flags &= ~ZSTREAM_FLAG_IN_STREAM; + z->flags |= ZSTREAM_FLAG_FINISHED; + break; + } + + if (err != Z_OK) + break; + + if (z->stream.avail_out > 0) { + z->flags |= ZSTREAM_FLAG_IN_STREAM; + break; + } + + if (!zstream_expand_buffer_without_gvl(z)) { + err = Z_MEM_ERROR; /* realloc failed */ + break; + } + } + + return (VALUE)err; +} + +/* + * There is no safe way to interrupt z->run->func(). + */ +static void +zstream_unblock_func(void *ptr) { + struct zstream_run_args *args = (struct zstream_run_args *)ptr; + + args->interrupt = 1; +} + static void zstream_run(struct zstream *z, Bytef *src, long len, int flush) { - uInt n; + struct zstream_run_args args; int err; volatile VALUE guard = Qnil; + args.z = z; + args.flush = flush; + args.interrupt = 0; + if (NIL_P(z->input) && len == 0) { z->stream.next_in = (Bytef*)""; z->stream.avail_in = 0; @@ -896,49 +987,34 @@ zstream_run(struct zstream *z, Bytef *src, long len, int flush) zstream_expand_buffer(z); } - for (;;) { - /* VC allocates err and guard to same address. accessing err and guard - in same scope prevents it. */ - RB_GC_GUARD(guard); - n = z->stream.avail_out; - err = z->func->run(&z->stream, flush); - z->buf_filled += n - z->stream.avail_out; - rb_thread_schedule(); +loop: + err = (int)rb_thread_blocking_region( + zstream_run_func, (void *)&args, + zstream_unblock_func, (void *)&args); - if (err == Z_STREAM_END) { - z->flags &= ~ZSTREAM_FLAG_IN_STREAM; - z->flags |= ZSTREAM_FLAG_FINISHED; - break; - } - if (err != Z_OK) { - if (flush != Z_FINISH && err == Z_BUF_ERROR - && z->stream.avail_out > 0) { - z->flags |= ZSTREAM_FLAG_IN_STREAM; - break; - } - zstream_reset_input(z); - if (z->stream.avail_in > 0) { - zstream_append_input(z, z->stream.next_in, z->stream.avail_in); - } - if (err == Z_NEED_DICT) { - VALUE self = (VALUE)z->stream.opaque; - VALUE dicts = rb_ivar_get(self, id_dictionaries); - VALUE dict = rb_hash_aref(dicts, rb_uint2inum(z->stream.adler)); - if (!NIL_P(dict)) { - rb_inflate_set_dictionary(self, dict); - continue; - } - } - raise_zlib_error(err, z->stream.msg); - } - if (z->stream.avail_out > 0) { - z->flags |= ZSTREAM_FLAG_IN_STREAM; - break; - } - zstream_expand_buffer(z); + if (flush != Z_FINISH && err == Z_BUF_ERROR + && z->stream.avail_out > 0) { + z->flags |= ZSTREAM_FLAG_IN_STREAM; } zstream_reset_input(z); + + if (err != Z_OK && err != Z_STREAM_END) { + if (z->stream.avail_in > 0) { + zstream_append_input(z, z->stream.next_in, z->stream.avail_in); + } + if (err == Z_NEED_DICT) { + VALUE self = (VALUE)z->stream.opaque; + VALUE dicts = rb_ivar_get(self, id_dictionaries); + VALUE dict = rb_hash_aref(dicts, rb_uint2inum(z->stream.adler)); + if (!NIL_P(dict)) { + rb_inflate_set_dictionary(self, dict); + goto loop; + } + } + raise_zlib_error(err, z->stream.msg); + } + if (z->stream.avail_in > 0) { zstream_append_input(z, z->stream.next_in, z->stream.avail_in); guard = Qnil; /* prevent tail call to make guard effective */