From 2fac77b0819fc951bb9e896d2615f8a550093707 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Trzci=C5=84ski?=
Date: Wed, 4 Apr 2018 12:19:17 +0200
Subject: [PATCH] Simpler chunking :)

---
 app/models/ci/build.rb                        |   2 +
 app/models/ci/job_trace_chunk.rb              | 116 ++++++++
 ...180326202229_create_ci_job_trace_chunks.rb |   3 +-
 db/schema.rb                                  |   3 +-
 lib/gitlab/ci/trace.rb                        |  28 +-
 lib/gitlab/ci/trace/chunked_io.rb             | 247 ++++++++++++++++++
 lib/gitlab/ci/trace/http_io.rb                |   2 +-
 lib/gitlab/ci/trace/stream.rb                 |   3 +-
 8 files changed, 384 insertions(+), 20 deletions(-)
 create mode 100644 lib/gitlab/ci/trace/chunked_io.rb

diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb
index 4aa65bf4273..b471fb80536 100644
--- a/app/models/ci/build.rb
+++ b/app/models/ci/build.rb
@@ -25,6 +25,8 @@ module Ci
     has_one :job_artifacts_metadata, -> { where(file_type: Ci::JobArtifact.file_types[:metadata]) }, class_name: 'Ci::JobArtifact', inverse_of: :job, foreign_key: :job_id
     has_one :job_artifacts_trace, -> { where(file_type: Ci::JobArtifact.file_types[:trace]) }, class_name: 'Ci::JobArtifact', inverse_of: :job, foreign_key: :job_id
 
+    has_many :chunks, class_name: 'Ci::JobTraceChunk', foreign_key: :job_id, dependent: :destroy # rubocop:disable Cop/ActiveRecordDependent
+
     has_one :metadata, class_name: 'Ci::BuildMetadata'
     delegate :timeout, to: :metadata, prefix: true, allow_nil: true
 
diff --git a/app/models/ci/job_trace_chunk.rb b/app/models/ci/job_trace_chunk.rb
index 8998ed920a5..85b67997d1e 100644
--- a/app/models/ci/job_trace_chunk.rb
+++ b/app/models/ci/job_trace_chunk.rb
@@ -3,5 +3,121 @@ module Ci
     extend Gitlab::Ci::Model
 
     belongs_to :job, class_name: "Ci::Build", foreign_key: :job_id
+
+    after_destroy :redis_delete_data, if: :redis?
+
+    default_value_for :data_store, :redis
+
+    # Maximum number of bytes stored in a single chunk.
+    # NOTE(review): 8 bytes looks like a development value -- confirm before release.
+    CHUNK_SIZE = 8
+    CHUNK_REDIS_TTL = 1.month
+
+    enum data_store: {
+      redis: 1,
+      db: 2,
+    }
+
+    # Returns the payload from whichever store currently holds it.
+    def data
+      case
+      when redis?
+        redis_data
+      when db?
+        raw_data
+      else
+        raise 'Unsupported data store'
+      end
+    end
+
+    # Replaces the payload. CHUNK_SIZE is a byte limit, hence bytesize.
+    def set_data(value)
+      raise 'too much data' if value.bytesize > CHUNK_SIZE
+
+      case
+      when redis?
+        redis_set_data(value)
+      when db?
+        self.raw_data = value
+      else
+        raise 'Unsupported data store'
+      end
+
+      # save! surfaces a failed write instead of silently dropping it
+      save! if changed?
+      schedule_to_db if fullfilled?
+    end
+
+    # Keeps only the first `offset` bytes of the payload.
+    def truncate(offset = 0)
+      self.append("", offset)
+    end
+
+    # Overwrites the payload from byte `offset` onwards with `new_data`.
+    def append(new_data, offset)
+      current_data = self.data || ""
+      raise 'Offset is out of range' if offset > current_data.bytesize
+
+      self.set_data(current_data.byteslice(0, offset) + new_data)
+    end
+
+    def size
+      data&.bytesize.to_i
+    end
+
+    def start_offset
+      chunk_index * CHUNK_SIZE
+    end
+
+    def end_offset
+      start_offset + size
+    end
+
+    # Byte range of the whole trace covered by this chunk.
+    def range
+      (start_offset...end_offset)
+    end
+
+    # Moves the payload from Redis into the database column.
+    def use_database!
+      return if db?
+
+      self.update!(raw_data: data, data_store: :db)
+      redis_delete_data
+    end
+
+    private
+
+    def schedule_to_db
+      return if db?
+
+      self.use_database!
+    end
+
+    def fullfilled?
+      size == CHUNK_SIZE
+    end
+
+    def redis_data
+      Gitlab::Redis::SharedState.with do |redis|
+        redis.get(redis_key)
+      end
+    end
+
+    def redis_set_data(data)
+      Gitlab::Redis::SharedState.with do |redis|
+        redis.set(redis_key, data, ex: CHUNK_REDIS_TTL)
+      end
+    end
+
+    def redis_delete_data
+      Gitlab::Redis::SharedState.with do |redis|
+        redis.del(redis_key)
+      end
+    end
+
+    def redis_key
+      "gitlab:ci:trace:#{job_id}:chunks:#{chunk_index}"
+    end
   end
 end
diff --git a/db/migrate/20180326202229_create_ci_job_trace_chunks.rb b/db/migrate/20180326202229_create_ci_job_trace_chunks.rb
index f7548cd766e..70b230a7978 100644
--- a/db/migrate/20180326202229_create_ci_job_trace_chunks.rb
+++ b/db/migrate/20180326202229_create_ci_job_trace_chunks.rb
@@ -7,7 +7,8 @@ class CreateCiJobTraceChunks < ActiveRecord::Migration
     create_table :ci_job_trace_chunks do |t|
       t.integer :job_id, null: false
       t.integer :chunk_index, null: false
-      t.text :data
+      t.integer :data_store, null: false
+      t.text :raw_data
 
       t.foreign_key :ci_builds, column: :job_id, on_delete: :cascade
       t.index [:chunk_index, :job_id], unique: true
diff --git a/db/schema.rb b/db/schema.rb
index 54346dadad2..efad5bd6b1c 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -373,7 +373,8 @@ ActiveRecord::Schema.define(version: 20180327101207) do
   create_table "ci_job_trace_chunks", force: :cascade do |t|
     t.integer "job_id", null: false
     t.integer "chunk_index", null: false
-    t.text "data"
+    t.integer "data_store", null: false
+    t.text "raw_data"
   end
 
   add_index "ci_job_trace_chunks", ["chunk_index", "job_id"], name: "index_ci_job_trace_chunks_on_chunk_index_and_job_id", unique: true, using: :btree
diff --git a/lib/gitlab/ci/trace.rb b/lib/gitlab/ci/trace.rb
index 3dc4848c23d..e2061096356 100644
--- a/lib/gitlab/ci/trace.rb
+++ b/lib/gitlab/ci/trace.rb
@@ -54,15 +54,15 @@ module Gitlab
       end
 
       def exist?
-        trace_artifact&.exists? || ChunkedFile::LiveTrace.exist?(job.id) || current_path.present? || old_trace.present?
+        trace_artifact&.exists? || job.chunks.any? || current_path.present? || old_trace.present?
       end
 
       def read
         stream = Gitlab::Ci::Trace::Stream.new do
           if trace_artifact
             trace_artifact.open
-          elsif ChunkedFile::LiveTrace.exist?(job.id)
-            ChunkedFile::LiveTrace.new(job.id, nil, "rb")
+          elsif job.chunks.any?
+            Gitlab::Ci::Trace::ChunkedIO.new(job)
           elsif current_path
             File.open(current_path, "rb")
           elsif old_trace
@@ -77,12 +77,10 @@ module Gitlab
 
       def write
         stream = Gitlab::Ci::Trace::Stream.new do
-          if Feature.enabled?('ci_enable_live_trace')
-            if current_path
-              current_path
-            else
-              ChunkedFile::LiveTrace.new(job.id, nil, "a+b")
-            end
+          if current_path
+            File.open(current_path, "a+b")
+          elsif Feature.enabled?('ci_enable_live_trace')
+            Gitlab::Ci::Trace::ChunkedIO.new(job)
           else
             File.open(ensure_path, "a+b")
           end
@@ -102,6 +100,7 @@ module Gitlab
           FileUtils.rm(trace_path, force: true)
         end
 
+        job.chunks.destroy_all
         job.erase_old_trace!
       end
 
@@ -109,13 +108,10 @@ module Gitlab
         raise ArchiveError, 'Already archived' if trace_artifact
         raise ArchiveError, 'Job is not finished yet' unless job.complete?
 
-        if ChunkedFile::LiveTrace.exist?(job.id)
-          ChunkedFile::LiveTrace.new(job.id, nil, 'a+b') do |live_trace_stream|
-            StringIO.new(live_trace_stream.read, 'rb').tap do |stream|
-              archive_stream!(stream)
-            end
-
-            live_trace_stream.delete
+        if job.chunks.any?
+          Gitlab::Ci::Trace::ChunkedIO.new(job) do |stream|
+            archive_stream!(stream)
+            stream.destroy!
           end
         elsif current_path
           File.open(current_path) do |stream|
diff --git a/lib/gitlab/ci/trace/chunked_io.rb b/lib/gitlab/ci/trace/chunked_io.rb
new file mode 100644
index 00000000000..9a27c849449
--- /dev/null
+++ b/lib/gitlab/ci/trace/chunked_io.rb
@@ -0,0 +1,247 @@
+##
+# This class is compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html)
+# source: https://gitlab.com/snippets/1685610
+module Gitlab
+  module Ci
+    class Trace
+      class ChunkedIO
+        CHUNK_SIZE = ::Ci::JobTraceChunk::CHUNK_SIZE
+
+        FailedToGetChunkError = Class.new(StandardError)
+
+        attr_reader :job
+        attr_reader :tell, :size
+        attr_reader :chunk, :chunk_range
+
+        alias_method :pos, :tell
+
+        # Yields itself when a block is given (like File.open), because
+        # Trace#archive! passes a block and expects it to be executed.
+        def initialize(job)
+          @job = job
+          @chunks_cache = []
+          @tell = 0
+          @size = job_chunks.order(:chunk_index).last.try(&:end_offset).to_i
+
+          yield self if block_given?
+        end
+
+        def close
+          # no-op
+        end
+
+        def binmode
+          # no-op
+        end
+
+        def binmode?
+          true
+        end
+
+        def path
+          nil
+        end
+
+        def url
+          nil
+        end
+
+        def seek(pos, where = IO::SEEK_SET)
+          new_pos =
+            case where
+            when IO::SEEK_END
+              size + pos
+            when IO::SEEK_SET
+              pos
+            when IO::SEEK_CUR
+              tell + pos
+            else
+              -1
+            end
+
+          raise 'new position is outside of file' if new_pos < 0 || new_pos > size
+
+          @tell = new_pos
+        end
+
+        def eof?
+          tell == size
+        end
+
+        def each_line
+          until eof?
+            line = readline
+            break if line.nil?
+
+            yield(line)
+          end
+        end
+
+        # Reads up to `length` bytes (everything until EOF when nil) and
+        # advances the position by exactly the number of bytes returned.
+        def read(length = nil)
+          out = ""
+
+          until eof? || (length && out.bytesize >= length)
+            data = chunk_slice_from_offset
+            break if data.empty?
+
+            # never consume more than the caller asked for
+            data = data.byteslice(0, length - out.bytesize) if length
+
+            out << data
+            @tell += data.bytesize
+          end
+
+          out
+        end
+
+        # Reads up to and including the next "\n", or until EOF.
+        def readline
+          out = ""
+
+          until eof?
+            data = chunk_slice_from_offset
+            # an empty slice before EOF would otherwise loop forever
+            raise FailedToGetChunkError if data.empty?
+
+            new_line = data.index("\n")
+
+            if new_line
+              out << data[0..new_line]
+              @tell += new_line + 1
+              break
+            else
+              out << data
+              @tell += data.bytesize
+            end
+          end
+
+          out
+        end
+
+        # Writes `data` at the current position, spilling over into
+        # following chunks, and returns the number of bytes written.
+        def write(data)
+          start_pos = @tell
+
+          while @tell < start_pos + data.bytesize
+            # get slice from current offset till the end where it falls into chunk
+            chunk_bytes = CHUNK_SIZE - chunk_offset
+            chunk_data = data.byteslice(@tell - start_pos, chunk_bytes)
+
+            # append data to chunk, overwriting from that point
+            ensure_chunk.append(chunk_data, chunk_offset)
+
+            # advance by what was actually written: the last slice is
+            # usually shorter than the space left in the chunk
+            @tell += chunk_data.bytesize
+            @size = [@size, @tell].max
+          end
+
+          @tell - start_pos
+        ensure
+          # chunk contents changed, so drop cached rows and read buffer
+          invalidate_chunk_cache
+        end
+
+        # Shrinks the trace to `offset` bytes and moves the position there.
+        def truncate(offset)
+          raise 'Outside of file' if offset > size
+
+          @tell = offset
+          @size = offset
+          invalidate_chunk_cache
+
+          if chunk_offset.zero?
+            # on a chunk boundary the chunk starting here must go as well
+            job_chunks.where('chunk_index >= ?', chunk_index).destroy_all
+          else
+            # remove all next chunks
+            job_chunks.where('chunk_index > ?', chunk_index).destroy_all
+
+            # truncate current chunk
+            current_chunk.truncate(chunk_offset)
+          end
+        end
+
+        def flush
+          # no-op
+        end
+
+        def present?
+          true
+        end
+
+        # Removes every chunk of the job and resets position and size.
+        def destroy!
+          job_chunks.destroy_all
+          @tell = @size = 0
+          invalidate_chunk_cache
+        end
+
+        private
+
+        ##
+        # The below methods are not implemented in IO class
+        #
+        def in_range?
+          @chunk_range&.include?(tell)
+        end
+
+        def chunk_slice_from_offset
+          unless in_range?
+            data = current_chunk&.data
+            # the row may be missing, or its payload may be nil (e.g. no Redis key)
+            raise FailedToGetChunkError unless data
+
+            @chunk = data.force_encoding(Encoding::BINARY)
+            @chunk_range = current_chunk.range
+          end
+
+          @chunk.byteslice(chunk_offset, CHUNK_SIZE)
+        end
+
+        def chunk_offset
+          tell % CHUNK_SIZE
+        end
+
+        def chunk_index
+          tell / CHUNK_SIZE
+        end
+
+        def chunk_start
+          chunk_index * CHUNK_SIZE
+        end
+
+        def chunk_end
+          [chunk_start + CHUNK_SIZE, size].min
+        end
+
+        def invalidate_chunk_cache
+          @chunks_cache = []
+          # also drop the read buffer so stale bytes are never served
+          @chunk = nil
+          @chunk_range = nil
+        end
+
+        def current_chunk
+          @chunks_cache[chunk_index] ||= job_chunks.find_by(chunk_index: chunk_index)
+        end
+
+        def build_chunk
+          @chunks_cache[chunk_index] = ::Ci::JobTraceChunk.new(job: job, chunk_index: chunk_index)
+        end
+
+        def ensure_chunk
+          current_chunk || build_chunk
+        end
+
+        # Leading :: is required: a bare Ci resolves to Gitlab::Ci in this scope.
+        def job_chunks
+          ::Ci::JobTraceChunk.where(job: job)
+        end
+      end
+    end
+  end
+end
diff --git a/lib/gitlab/ci/trace/http_io.rb b/lib/gitlab/ci/trace/http_io.rb
index ac4308f4e2c..df32693ca9f 100644
--- a/lib/gitlab/ci/trace/http_io.rb
+++ b/lib/gitlab/ci/trace/http_io.rb
@@ -161,7 +161,7 @@ module Gitlab
             @chunk_range ||= (chunk_start...(chunk_start + @chunk.length))
           end
 
-          @chunk[chunk_offset..BUFFER_SIZE]
+          @chunk.byteslice(chunk_offset, BUFFER_SIZE)
         end
 
         def request
diff --git a/lib/gitlab/ci/trace/stream.rb b/lib/gitlab/ci/trace/stream.rb
index b3fe3ef1c4d..6cd791df42b 100644
--- a/lib/gitlab/ci/trace/stream.rb
+++ b/lib/gitlab/ci/trace/stream.rb
@@ -40,8 +40,9 @@ module Gitlab
         end
 
         def set(data)
-          truncate(0)
+          stream.seek(0, IO::SEEK_SET)
+          stream.truncate(0)
           stream.write(data)
           stream.flush()
         end
 