diff --git a/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb b/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb index b3a1c4734d3..3948c07c352 100644 --- a/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb +++ b/lib/gitlab/ci/trace/chunked_file/chunk_store/database.rb @@ -61,6 +61,10 @@ module Gitlab data.length end + def append!(data) + raise NotImplementedError + end + def truncate!(offset) raise NotImplementedError end diff --git a/lib/gitlab/ci/trace/chunked_file/chunk_store/object_storage.rb b/lib/gitlab/ci/trace/chunked_file/chunk_store/object_storage.rb index f144d670d03..95aaa9f9e2c 100644 --- a/lib/gitlab/ci/trace/chunked_file/chunk_store/object_storage.rb +++ b/lib/gitlab/ci/trace/chunked_file/chunk_store/object_storage.rb @@ -65,6 +65,10 @@ module Gitlab raise NotImplementedError end + def append!(data) + raise NotImplementedError + end + def truncate!(offset) raise NotImplementedError end diff --git a/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb b/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb index 6fa27b2c196..574657803dd 100644 --- a/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb +++ b/lib/gitlab/ci/trace/chunked_file/chunk_store/redis.rb @@ -74,6 +74,13 @@ module Gitlab end end + def append!(data) + Gitlab::Redis::Cache.with do |redis| + redis.append(buffer_key, data) + data.length + end + end + def truncate!(offset) Gitlab::Redis::Cache.with do |redis| return unless redis.exists(buffer_key) diff --git a/lib/gitlab/ci/trace/chunked_file/chunked_io.rb b/lib/gitlab/ci/trace/chunked_file/chunked_io.rb index 5c36158dbea..c3a84083eae 100644 --- a/lib/gitlab/ci/trace/chunked_file/chunked_io.rb +++ b/lib/gitlab/ci/trace/chunked_file/chunked_io.rb @@ -1,12 +1,22 @@ ## -# This class is compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html) -# source: https://gitlab.com/snippets/1685610 +# This class is designed as it's compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html) module Gitlab 
module Ci class Trace module ChunkedFile class ChunkedIO + class << self + def open(job_id, size, mode) + stream = self.new(job_id, size, mode) + + yield stream + ensure + stream.close + end + end + WriteError = Class.new(StandardError) + FailedToGetChunkError = Class.new(StandardError) attr_reader :size attr_reader :tell @@ -23,7 +33,10 @@ module Gitlab if /(w|a)/ =~ mode @write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 1.hour.to_i).try_obtain + raise WriteError, 'Already opened by another process' unless write_lock_uuid + + seek(0, IO::SEEK_END) if /a/ =~ mode end end @@ -39,10 +52,6 @@ module Gitlab true end - def path - nil - end - def seek(pos, where = IO::SEEK_SET) new_pos = case where @@ -111,46 +120,56 @@ module Gitlab end def write(data, &block) - raise WriteError, 'Already opened by another process' unless write_lock_uuid + raise WriteError, 'Could not write without lock' unless write_lock_uuid + raise WriteError, 'Could not write empty data' unless data.present? - while data.present? - empty_space = BUFFER_SIZE - chunk_offset + data = data.dup - chunk_store.open(job_id, chunk_index, params_for_store) do |store| - data_to_write = '' - data_to_write += store.get if store.size > 0 - data_to_write += data.slice!(0..empty_space) + chunk_index_start = chunk_index + chunk_index_end = (tell + data.length) / BUFFER_SIZE + prev_tell = tell - written_size = store.write!(data_to_write) + (chunk_index_start..chunk_index_end).each do |c_index| + chunk_store.open(job_id, c_index, params_for_store) do |store| + writable_space = BUFFER_SIZE - chunk_offset + writing_size = [writable_space, data.length].min - raise WriteError, 'Written size mismatch' unless data_to_write.length == written_size + if store.size > 0 + written_size = store.append!(data.slice!(0...writing_size)) + else + written_size = store.write!(data.slice!(0...writing_size)) + end - block.call(store, chunk_index) if block_given? 
+ raise WriteError, 'Written size mismatch' unless writing_size == written_size @tell += written_size - @size += written_size + @size = [tell, size].max + + block.call(store, c_index) if block_given? end end + + tell - prev_tell end - def truncate(offset) - raise WriteError, 'Already opened by another process' unless write_lock_uuid + def truncate(offset, &block) + raise WriteError, 'Could not write without lock' unless write_lock_uuid + raise WriteError, 'Offset is out of bound' if offset > size || offset < 0 - removal_chunk_index_start = (offset / BUFFER_SIZE) - removal_chunk_index_end = chunks_count - 1 - removal_chunk_offset = offset % BUFFER_SIZE + chunk_index_start = (offset / BUFFER_SIZE) + chunk_index_end = chunks_count - 1 - if removal_chunk_offset > 0 - chunk_store.open(job_id, removal_chunk_index_start, params_for_store) do |store| - store.truncate!(removal_chunk_offset) - end + (chunk_index_start..chunk_index_end).reverse_each do |c_index| + chunk_store.open(job_id, c_index, params_for_store) do |store| + c_index_start = c_index * BUFFER_SIZE - removal_chunk_index_start += 1 - end + if offset <= c_index_start + store.delete! + else + store.truncate!(offset - c_index_start) if store.size > 0 + end - (removal_chunk_index_start..removal_chunk_index_end).each do |removal_chunk_index| - chunk_store.open(job_id, removal_chunk_index, params_for_store) do |store| - store.delete! + block.call(store, c_index) if block_given? end end @@ -165,15 +184,8 @@ module Gitlab true end - def delete_chunks! - truncate(0) - end - private - ## - # The below methods are not implemented in IO class - # def in_range? @chunk_range&.include?(tell) end @@ -182,7 +194,10 @@ module Gitlab unless in_range? 
chunk_store.open(job_id, chunk_index, params_for_store) do |store| @chunk = store.get - @chunk_range = (chunk_start...(chunk_start + @chunk.length)) + + raise FailedToGetChunkError unless chunk + + @chunk_range = (chunk_start...(chunk_start + chunk.length)) end end @@ -223,6 +238,10 @@ module Gitlab def write_lock_key "live_trace:operation:write:#{job_id}" end + + def chunk_store + raise NotImplementedError + end end end end diff --git a/lib/gitlab/ci/trace/chunked_file/live_trace.rb b/lib/gitlab/ci/trace/chunked_file/live_trace.rb index 7c39561bec7..081ae6e8d51 100644 --- a/lib/gitlab/ci/trace/chunked_file/live_trace.rb +++ b/lib/gitlab/ci/trace/chunked_file/live_trace.rb @@ -7,7 +7,7 @@ module Gitlab class << self def open(job_id, mode) - stream = self.class.new(job_id, mode) + stream = self.new(job_id, mode) yield stream ensure diff --git a/lib/gitlab/ci/trace/chunked_file/remote.rb b/lib/gitlab/ci/trace/chunked_file/remote.rb index 56ff05c7494..caa2235bdbd 100644 --- a/lib/gitlab/ci/trace/chunked_file/remote.rb +++ b/lib/gitlab/ci/trace/chunked_file/remote.rb @@ -6,11 +6,11 @@ module Gitlab BUFFER_SIZE = 128.kilobytes class << self - def open(job_id, url, size, mode) - stream = self.class.new(job_id, mode) + def open(job_id, mode) + stream = self.new(job_id, mode) yield stream - + ensure stream.close end end diff --git a/spec/lib/gitlab/ci/trace/chunked_file/chunked_io_spec.rb b/spec/lib/gitlab/ci/trace/chunked_file/chunked_io_spec.rb new file mode 100644 index 00000000000..506ce71099a --- /dev/null +++ b/spec/lib/gitlab/ci/trace/chunked_file/chunked_io_spec.rb @@ -0,0 +1,481 @@ +require 'spec_helper' + +describe Gitlab::Ci::Trace::ChunkedFile::ChunkedIO, :clean_gitlab_redis_cache do + include ChunkedIOHelpers + + let(:chunked_io) { described_class.new(job_id, size, mode) } + let(:job_id) { 1 } + let(:size) { sample_trace_size } + let(:mode) { 'rb' } + let(:buffer_size) { 128.kilobytes } + let(:chunk_store) { Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis } 
+ + before do + allow_any_instance_of(described_class).to receive(:chunk_store).and_return(chunk_store) + stub_const("Gitlab::Ci::Trace::ChunkedFile::ChunkedIO::BUFFER_SIZE", buffer_size) + end + + describe '#new' do + context 'when mode is read' do + let(:mode) { 'rb' } + + it 'raises no exception' do + described_class.new(job_id, size, mode) + + expect { described_class.new(job_id, size, mode) }.not_to raise_error + end + end + + context 'when mode is write' do + let(:mode) { 'a+b' } + + it 'raises an exception' do + described_class.new(job_id, size, mode) + + expect { described_class.new(job_id, size, mode) }.to raise_error('Already opened by another process') + end + + context 'when closed after open' do + it 'does not raise an exception' do + described_class.new(job_id, size, mode).close + + expect { described_class.new(job_id, size, mode) }.not_to raise_error + end + end + end + end + + describe '#seek' do + subject { chunked_io.seek(pos, where) } + + context 'when moves pos to end of the file' do + let(:pos) { 0 } + let(:where) { IO::SEEK_END } + + it { is_expected.to eq(size) } + end + + context 'when moves pos to middle of the file' do + let(:pos) { size / 2 } + let(:where) { IO::SEEK_SET } + + it { is_expected.to eq(size / 2) } + end + + context 'when moves pos around' do + it 'matches the result' do + expect(chunked_io.seek(0)).to eq(0) + expect(chunked_io.seek(100, IO::SEEK_CUR)).to eq(100) + expect { chunked_io.seek(size + 1, IO::SEEK_CUR) }.to raise_error('new position is outside of file') + end + end + end + + describe '#eof?' do + subject { chunked_io.eof? 
} + + context 'when current pos is at end of the file' do + before do + chunked_io.seek(size, IO::SEEK_SET) + end + + it { is_expected.to be_truthy } + end + + context 'when current pos is not at end of the file' do + before do + chunked_io.seek(0, IO::SEEK_SET) + end + + it { is_expected.to be_falsey } + end + end + + describe '#each_line' do + let(:buffer_size) { 128.kilobytes } + let(:string_io) { StringIO.new(sample_trace_raw) } + + context 'when BUFFER_SIZE is smaller than file size' do + before do + set_smaller_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'yields lines' do + expect { |b| described_class.new(job_id, size, 'rb').each_line(&b) } + .to yield_successive_args(*string_io.each_line.to_a) + end + end + + context 'when BUFFER_SIZE is larger than file size' do + let(:buffer_size) { size + 1000 } + + before do + set_larger_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'calls get_chunk only once' do + expect(chunk_store).to receive(:open).once.and_call_original + + described_class.new(job_id, size, 'rb').each_line { |line| } + end + end + end + + describe '#read' do + subject { described_class.new(job_id, size, 'rb').read(length) } + + context 'when read whole size' do + let(:length) { nil } + + context 'when BUFFER_SIZE is smaller than file size', :clean_gitlab_redis_cache do + before do + set_smaller_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'reads a trace' do + is_expected.to eq(sample_trace_raw) + end + end + + context 'when BUFFER_SIZE is larger than file size', :clean_gitlab_redis_cache do + before do + set_larger_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'reads a trace' do + is_expected.to eq(sample_trace_raw) + end + end + end + + context 'when read only first 100 bytes' do + let(:length) { 100 } + + context 'when BUFFER_SIZE is smaller than file size', :clean_gitlab_redis_cache do + before do + 
set_smaller_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'reads a trace' do + is_expected.to eq(sample_trace_raw[0, length]) + end + end + + context 'when BUFFER_SIZE is larger than file size', :clean_gitlab_redis_cache do + before do + set_larger_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'reads a trace' do + is_expected.to eq(sample_trace_raw[0, length]) + end + end + end + + context 'when tries to read oversize' do + let(:length) { size + 1000 } + + context 'when BUFFER_SIZE is smaller than file size' do + before do + set_smaller_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'reads a trace' do + is_expected.to eq(sample_trace_raw) + end + end + + context 'when BUFFER_SIZE is larger than file size' do + before do + set_larger_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'reads a trace' do + is_expected.to eq(sample_trace_raw) + end + end + end + + context 'when tries to read 0 bytes' do + let(:length) { 0 } + + context 'when BUFFER_SIZE is smaller than file size' do + before do + set_smaller_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'reads a trace' do + is_expected.to be_empty + end + end + + context 'when BUFFER_SIZE is larger than file size' do + before do + set_larger_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'reads a trace' do + is_expected.to be_empty + end + end + end + + context 'when chunk store failed to get chunk' do + let(:length) { nil } + + before do + fill_trace_to_chunks(sample_trace_raw) + + stub_chunk_store_redis_get_failed + end + + it 'reads a trace' do + expect { subject }.to raise_error(Gitlab::Ci::Trace::ChunkedFile::ChunkedIO::FailedToGetChunkError) + end + end + end + + describe '#readline' do + subject { chunked_io.readline } + + let(:string_io) { StringIO.new(sample_trace_raw) } + + shared_examples 'all line matching' do + it 'reads a line' 
do + (0...sample_trace_raw.lines.count).each do + expect(chunked_io.readline).to eq(string_io.readline) + end + end + end + + context 'when chunk store failed to get chunk' do + let(:length) { nil } + + before do + fill_trace_to_chunks(sample_trace_raw) + stub_chunk_store_redis_get_failed + end + + it 'reads a trace' do + expect { subject }.to raise_error(Gitlab::Ci::Trace::ChunkedFile::ChunkedIO::FailedToGetChunkError) + end + end + + context 'when BUFFER_SIZE is smaller than file size' do + before do + set_smaller_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it_behaves_like 'all line matching' + end + + context 'when BUFFER_SIZE is larger than file size' do + before do + set_larger_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it_behaves_like 'all line matching' + end + + context 'when pos is at middle of the file' do + before do + set_smaller_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + + chunked_io.seek(size / 2) + string_io.seek(size / 2) + end + + it 'reads from pos' do + expect(chunked_io.readline).to eq(string_io.readline) + end + end + end + + describe '#write' do + subject { chunked_io.write(data) } + + let(:data) { sample_trace_raw } + + context 'when write mode' do + let(:mode) { 'wb' } + + context 'when BUFFER_SIZE is smaller than file size', :clean_gitlab_redis_cache do + before do + set_smaller_buffer_size_than(size) + end + + it 'writes a trace' do + is_expected.to eq(data.length) + + Gitlab::Ci::Trace::ChunkedFile::ChunkedIO.open(job_id, size, 'rb') do |stream| + expect(stream.read).to eq(data) + expect(chunk_store.chunks_count(job_id)).to eq(stream.send(:chunks_count)) + expect(chunk_store.chunks_size(job_id)).to eq(data.length) + end + end + end + + context 'when BUFFER_SIZE is larger than file size', :clean_gitlab_redis_cache do + before do + set_larger_buffer_size_than(size) + end + + it 'writes a trace' do + is_expected.to eq(data.length) + + 
Gitlab::Ci::Trace::ChunkedFile::ChunkedIO.open(job_id, size, 'rb') do |stream| + expect(stream.read).to eq(data) + expect(chunk_store.chunks_count(job_id)).to eq(stream.send(:chunks_count)) + expect(chunk_store.chunks_size(job_id)).to eq(data.length) + end + end + end + + context 'when data is nil' do + let(:data) { nil } + + it 'writes a trace' do + expect { subject }.to raise_error('Could not write empty data') + end + end + end + + context 'when append mode' do + let(:original_data) { 'original data' } + let(:total_size) { original_data.length + data.length } + + context 'when BUFFER_SIZE is smaller than file size', :clean_gitlab_redis_cache do + before do + set_smaller_buffer_size_than(size) + fill_trace_to_chunks(original_data) + end + + it 'appends a trace' do + described_class.open(job_id, original_data.length, 'a+b') do |stream| + expect(stream.write(data)).to eq(data.length) + end + + described_class.open(job_id, total_size, 'rb') do |stream| + expect(stream.read).to eq(original_data + data) + expect(chunk_store.chunks_count(job_id)).to eq(stream.send(:chunks_count)) + expect(chunk_store.chunks_size(job_id)).to eq(total_size) + end + end + end + + context 'when BUFFER_SIZE is larger than file size', :clean_gitlab_redis_cache do + before do + set_larger_buffer_size_than(size) + fill_trace_to_chunks(original_data) + end + + it 'appends a trace' do + described_class.open(job_id, original_data.length, 'a+b') do |stream| + expect(stream.write(data)).to eq(data.length) + end + + described_class.open(job_id, total_size, 'rb') do |stream| + expect(stream.read).to eq(original_data + data) + expect(chunk_store.chunks_count(job_id)).to eq(stream.send(:chunks_count)) + expect(chunk_store.chunks_size(job_id)).to eq(total_size) + end + end + end + end + end + + describe '#truncate' do + context 'when data exists' do + context 'when BUFFER_SIZE is smaller than file size', :clean_gitlab_redis_cache do + before do + set_smaller_buffer_size_than(size) + 
fill_trace_to_chunks(sample_trace_raw) + end + + it 'truncates a trace' do + described_class.open(job_id, size, 'rb') do |stream| + expect(stream.read).to eq(sample_trace_raw) + end + + described_class.open(job_id, size, 'wb') do |stream| + stream.truncate(0) + end + + described_class.open(job_id, 0, 'rb') do |stream| + expect(stream.read).to be_empty + end + + expect(chunk_store.chunks_count(job_id)).to eq(0) + expect(chunk_store.chunks_size(job_id)).to eq(0) + end + + context 'when offset is negative', :clean_gitlab_redis_cache do + it 'raises an error' do + described_class.open(job_id, size, 'wb') do |stream| + expect { stream.truncate(-1) }.to raise_error('Offset is out of bound') + end + end + end + + context 'when offset is larger than file size', :clean_gitlab_redis_cache do + it 'raises an error' do + described_class.open(job_id, size, 'wb') do |stream| + expect { stream.truncate(size + 1) }.to raise_error('Offset is out of bound') + end + end + end + end + + context 'when BUFFER_SIZE is larger than file size', :clean_gitlab_redis_cache do + before do + set_larger_buffer_size_than(size) + fill_trace_to_chunks(sample_trace_raw) + end + + it 'truncates a trace' do + described_class.open(job_id, size, 'rb') do |stream| + expect(stream.read).to eq(sample_trace_raw) + end + + described_class.open(job_id, size, 'wb') do |stream| + stream.truncate(0) + end + + described_class.open(job_id, 0, 'rb') do |stream| + expect(stream.read).to be_empty + end + + expect(chunk_store.chunks_count(job_id)).to eq(0) + expect(chunk_store.chunks_size(job_id)).to eq(0) + end + end + end + + context 'when data does not exist' do + before do + set_smaller_buffer_size_than(size) + end + + it 'truncates a trace' do + described_class.open(job_id, size, 'wb') do |stream| + stream.truncate(0) + expect(stream.send(:tell)).to eq(0) + expect(stream.send(:size)).to eq(0) + end + end + end + end +end diff --git a/spec/support/chunked_io/chunked_io_helpers.rb 
b/spec/support/chunked_io/chunked_io_helpers.rb new file mode 100644 index 00000000000..d87483620e5 --- /dev/null +++ b/spec/support/chunked_io/chunked_io_helpers.rb @@ -0,0 +1,32 @@ +module ChunkedIOHelpers + def fill_trace_to_chunks(data) + stream = Gitlab::Ci::Trace::ChunkedFile::ChunkedIO.new(job_id, data.length, 'wb') + stream.write(data) + stream.close + end + + def sample_trace_raw + @sample_trace_raw ||= File.read(expand_fixture_path('trace/sample_trace')) + end + + def sample_trace_size + sample_trace_raw.length + end + + def stub_chunk_store_redis_get_failed + allow_any_instance_of(Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis) + .to receive(:get).and_return(nil) + end + + def set_smaller_buffer_size_than(file_size) + blocks = (file_size / 128) + new_size = (blocks / 2) * 128 + stub_const("Gitlab::Ci::Trace::ChunkedFile::ChunkedIO::BUFFER_SIZE", new_size) + end + + def set_larger_buffer_size_than(file_size) + blocks = (file_size / 128) + new_size = (blocks * 2) * 128 + stub_const("Gitlab::Ci::Trace::ChunkedFile::ChunkedIO::BUFFER_SIZE", new_size) + end +end