mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
* ext/zlib/zlib.c (zstream_run): Process zlib streams without GVL.
[Feature #6615] * NEWS: ditto. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36270 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
c03d1c2736
commit
802c468fc6
3 changed files with 124 additions and 40 deletions
|
@ -1,3 +1,9 @@
|
|||
Tue Jul 3 06:02:54 2012 Eric Hodel <drbrain@segment7.net>
|
||||
|
||||
* ext/zlib/zlib.c (zstream_run): Process zlib streams without GVL.
|
||||
[Feature #6615]
|
||||
* NEWS: ditto.
|
||||
|
||||
Mon Jul 2 22:13:04 2012 Tanaka Akira <akr@fsij.org>
|
||||
|
||||
* thread.c (rb_thread_aref): add explanation for why Thread#[] and
|
||||
|
|
2
NEWS
2
NEWS
|
@ -117,6 +117,8 @@ with all sufficient information, see the ChangeLog file.
|
|||
|
||||
* zlib
|
||||
* Added support for the new deflate strategies Zlib::RLE and Zlib::FIXED.
|
||||
* Zlib streams are now processed without the GVL. This allows gzip, zlib and
|
||||
deflate streams to be processed in parallel.
|
||||
|
||||
* openssl
|
||||
* Consistently raise an error when trying to encode nil values. All instances
|
||||
|
|
156
ext/zlib/zlib.c
156
ext/zlib/zlib.c
|
@ -72,6 +72,7 @@ static void finalizer_warn(const char*);
|
|||
|
||||
struct zstream;
|
||||
struct zstream_funcs;
|
||||
struct zstream_run_args;
|
||||
static void zstream_init(struct zstream*, const struct zstream_funcs*);
|
||||
static void zstream_expand_buffer(struct zstream*);
|
||||
static void zstream_expand_buffer_into(struct zstream*, unsigned long);
|
||||
|
@ -564,6 +565,11 @@ static const struct zstream_funcs inflate_funcs = {
|
|||
inflateReset, inflateEnd, inflate,
|
||||
};
|
||||
|
||||
struct zstream_run_args {
|
||||
struct zstream * z;
|
||||
int flush;
|
||||
int interrupt;
|
||||
};
|
||||
|
||||
static voidpf
|
||||
zlib_mem_alloc(voidpf opaque, uInt items, uInt size)
|
||||
|
@ -655,6 +661,42 @@ zstream_expand_buffer_into(struct zstream *z, unsigned long size)
|
|||
}
|
||||
}
|
||||
|
||||
static int
|
||||
zstream_expand_buffer_without_gvl(struct zstream *z)
|
||||
{
|
||||
char * new_str;
|
||||
long inc, len;
|
||||
|
||||
if (RSTRING_LEN(z->buf) - z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) {
|
||||
z->stream.avail_out = ZSTREAM_AVAIL_OUT_STEP_MAX;
|
||||
}
|
||||
else {
|
||||
inc = z->buf_filled / 2;
|
||||
if (inc < ZSTREAM_AVAIL_OUT_STEP_MIN) {
|
||||
inc = ZSTREAM_AVAIL_OUT_STEP_MIN;
|
||||
}
|
||||
|
||||
len = z->buf_filled + inc;
|
||||
|
||||
new_str = realloc(RSTRING(z->buf)->as.heap.ptr, len + 1);
|
||||
|
||||
if (!new_str)
|
||||
return 0;
|
||||
|
||||
/* from rb_str_resize */
|
||||
RSTRING(z->buf)->as.heap.ptr = new_str;
|
||||
RSTRING(z->buf)->as.heap.ptr[len] = '\0'; /* sentinel */
|
||||
RSTRING(z->buf)->as.heap.len =
|
||||
RSTRING(z->buf)->as.heap.aux.capa = len;
|
||||
|
||||
z->stream.avail_out = (inc < ZSTREAM_AVAIL_OUT_STEP_MAX) ?
|
||||
(int)inc : ZSTREAM_AVAIL_OUT_STEP_MAX;
|
||||
}
|
||||
z->stream.next_out = (Bytef*)RSTRING_PTR(z->buf) + z->buf_filled;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void
|
||||
zstream_append_buffer(struct zstream *z, const Bytef *src, long len)
|
||||
{
|
||||
|
@ -871,13 +913,62 @@ zstream_end(struct zstream *z)
|
|||
return Qnil;
|
||||
}
|
||||
|
||||
static VALUE
|
||||
zstream_run_func(void *ptr) {
|
||||
struct zstream_run_args *args = (struct zstream_run_args *)ptr;
|
||||
int err, flush = args->flush;
|
||||
struct zstream *z = args->z;
|
||||
uInt n;
|
||||
|
||||
while (!args->interrupt) {
|
||||
n = z->stream.avail_out;
|
||||
err = z->func->run(&z->stream, flush);
|
||||
z->buf_filled += n - z->stream.avail_out;
|
||||
|
||||
if (err == Z_STREAM_END) {
|
||||
z->flags &= ~ZSTREAM_FLAG_IN_STREAM;
|
||||
z->flags |= ZSTREAM_FLAG_FINISHED;
|
||||
break;
|
||||
}
|
||||
|
||||
if (err != Z_OK)
|
||||
break;
|
||||
|
||||
if (z->stream.avail_out > 0) {
|
||||
z->flags |= ZSTREAM_FLAG_IN_STREAM;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!zstream_expand_buffer_without_gvl(z)) {
|
||||
err = Z_MEM_ERROR; /* realloc failed */
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return (VALUE)err;
|
||||
}
|
||||
|
||||
/*
|
||||
* There is no safe way to interrupt z->run->func().
|
||||
*/
|
||||
static void
|
||||
zstream_unblock_func(void *ptr) {
|
||||
struct zstream_run_args *args = (struct zstream_run_args *)ptr;
|
||||
|
||||
args->interrupt = 1;
|
||||
}
|
||||
|
||||
static void
|
||||
zstream_run(struct zstream *z, Bytef *src, long len, int flush)
|
||||
{
|
||||
uInt n;
|
||||
struct zstream_run_args args;
|
||||
int err;
|
||||
volatile VALUE guard = Qnil;
|
||||
|
||||
args.z = z;
|
||||
args.flush = flush;
|
||||
args.interrupt = 0;
|
||||
|
||||
if (NIL_P(z->input) && len == 0) {
|
||||
z->stream.next_in = (Bytef*)"";
|
||||
z->stream.avail_in = 0;
|
||||
|
@ -896,49 +987,34 @@ zstream_run(struct zstream *z, Bytef *src, long len, int flush)
|
|||
zstream_expand_buffer(z);
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
/* VC allocates err and guard to same address. accessing err and guard
|
||||
in same scope prevents it. */
|
||||
RB_GC_GUARD(guard);
|
||||
n = z->stream.avail_out;
|
||||
err = z->func->run(&z->stream, flush);
|
||||
z->buf_filled += n - z->stream.avail_out;
|
||||
rb_thread_schedule();
|
||||
loop:
|
||||
err = (int)rb_thread_blocking_region(
|
||||
zstream_run_func, (void *)&args,
|
||||
zstream_unblock_func, (void *)&args);
|
||||
|
||||
if (err == Z_STREAM_END) {
|
||||
z->flags &= ~ZSTREAM_FLAG_IN_STREAM;
|
||||
z->flags |= ZSTREAM_FLAG_FINISHED;
|
||||
break;
|
||||
}
|
||||
if (err != Z_OK) {
|
||||
if (flush != Z_FINISH && err == Z_BUF_ERROR
|
||||
&& z->stream.avail_out > 0) {
|
||||
z->flags |= ZSTREAM_FLAG_IN_STREAM;
|
||||
break;
|
||||
}
|
||||
zstream_reset_input(z);
|
||||
if (z->stream.avail_in > 0) {
|
||||
zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
|
||||
}
|
||||
if (err == Z_NEED_DICT) {
|
||||
VALUE self = (VALUE)z->stream.opaque;
|
||||
VALUE dicts = rb_ivar_get(self, id_dictionaries);
|
||||
VALUE dict = rb_hash_aref(dicts, rb_uint2inum(z->stream.adler));
|
||||
if (!NIL_P(dict)) {
|
||||
rb_inflate_set_dictionary(self, dict);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
raise_zlib_error(err, z->stream.msg);
|
||||
}
|
||||
if (z->stream.avail_out > 0) {
|
||||
z->flags |= ZSTREAM_FLAG_IN_STREAM;
|
||||
break;
|
||||
}
|
||||
zstream_expand_buffer(z);
|
||||
if (flush != Z_FINISH && err == Z_BUF_ERROR
|
||||
&& z->stream.avail_out > 0) {
|
||||
z->flags |= ZSTREAM_FLAG_IN_STREAM;
|
||||
}
|
||||
|
||||
zstream_reset_input(z);
|
||||
|
||||
if (err != Z_OK && err != Z_STREAM_END) {
|
||||
if (z->stream.avail_in > 0) {
|
||||
zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
|
||||
}
|
||||
if (err == Z_NEED_DICT) {
|
||||
VALUE self = (VALUE)z->stream.opaque;
|
||||
VALUE dicts = rb_ivar_get(self, id_dictionaries);
|
||||
VALUE dict = rb_hash_aref(dicts, rb_uint2inum(z->stream.adler));
|
||||
if (!NIL_P(dict)) {
|
||||
rb_inflate_set_dictionary(self, dict);
|
||||
goto loop;
|
||||
}
|
||||
}
|
||||
raise_zlib_error(err, z->stream.msg);
|
||||
}
|
||||
|
||||
if (z->stream.avail_in > 0) {
|
||||
zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
|
||||
guard = Qnil; /* prevent tail call to make guard effective */
|
||||
|
|
Loading…
Reference in a new issue