Merge pull request #579 from stanhu/sh-add-mulithreaded-multipart-copy

Add multi-threaded support for File#copy

Commit: d64f77583d

3 changed files with 129 additions and 18 deletions
README.md | 20
@@ -105,6 +105,26 @@ directory.files
 directory.files.new(key: 'user/1/Gemfile').url(Time.now + 60)
 ```
+
+#### Copying a file
+
+```ruby
+directory = s3.directories.new(key: 'gaudi-portal-dev')
+file = directory.files.get('user/1/Gemfile')
+file.copy("target-bucket", "user/2/Gemfile.copy")
+```
+
+To speed transfers of large files, the `concurrency` option can be used
+to spawn multiple threads. Note that the file must be at least 5 MB for
+multipart uploads to work. For example:
+
+```ruby
+directory = s3.directories.new(key: 'gaudi-portal-dev')
+file = directory.files.get('user/1/Gemfile')
+file.multipart_chunk_size = 10 * 1024 * 1024
+file.concurrency = 10
+file.copy("target-bucket", "user/2/Gemfile.copy")
+```
 
 ## Documentation
 
 See the [online documentation](http://www.rubydoc.info/github/fog/fog-aws) for a complete API reference.
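To get a feel for how those two knobs interact: the copy is split into roughly `content_length / multipart_chunk_size` ranged parts, and the threads work through that list concurrently. A quick back-of-the-envelope sketch (the 100 MB object size here is an assumed example, not something from the PR):

```ruby
content_length       = 100 * 1024 * 1024 # assumed 100 MB source object
multipart_chunk_size = 10 * 1024 * 1024  # 10 MB parts, as in the README example above
concurrency          = 10

part_count = (content_length.to_f / multipart_chunk_size).ceil
puts part_count                    # => 10 part-copy requests
puts [concurrency, part_count].min # => 10 threads can be kept busy at once
```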
@@ -31,6 +31,35 @@ module Fog
         attribute :kms_key_id, :aliases => 'x-amz-server-side-encryption-aws-kms-key-id'
         attribute :tags, :aliases => 'x-amz-tagging'
 
+        UploadPartData = Struct.new(:part_number, :upload_options, :etag)
+
+        class PartList
+          def initialize(parts = [])
+            @parts = parts
+            @mutex = Mutex.new
+          end
+
+          def push(part)
+            @mutex.synchronize { @parts.push(part) }
+          end
+
+          def shift
+            @mutex.synchronize { @parts.shift }
+          end
+
+          def clear!
+            @mutex.synchronize { @parts.clear }
+          end
+
+          def size
+            @mutex.synchronize { @parts.size }
+          end
+
+          def to_a
+            @mutex.synchronize { @parts.dup }
+          end
+        end
+
         # @note Chunk size to use for multipart uploads.
         #   Use small chunk sizes to minimize memory. E.g. 5242880 = 5mb
         attr_reader :multipart_chunk_size
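`PartList` is simply a `Mutex`-guarded array used as a shared work queue between the copy threads: workers `shift` pending parts off one list and `push` finished parts onto another. A trimmed-down, standalone sketch of that access pattern (illustrative names only, not code from the gem):

```ruby
# Standalone illustration of the PartList access pattern (not gem code).
Part = Struct.new(:part_number, :etag)

class ThreadSafeList
  def initialize(parts = [])
    @parts = parts
    @mutex = Mutex.new
  end

  def shift
    @mutex.synchronize { @parts.shift }
  end

  def push(part)
    @mutex.synchronize { @parts.push(part) }
  end

  def to_a
    @mutex.synchronize { @parts.dup }
  end
end

pending   = ThreadSafeList.new((1..4).map { |n| Part.new(n, nil) })
completed = ThreadSafeList.new

threads = 2.times.map do
  Thread.new do
    # Because shift is mutex-protected, no two threads ever receive the same part.
    while part = pending.shift
      part.etag = "etag-#{part.part_number}" # stand-in for the real upload_part_copy call
      completed.push(part)
    end
  end
end
threads.each(&:join)

p completed.to_a.sort_by(&:part_number).map(&:etag)
# => ["etag-1", "etag-2", "etag-3", "etag-4"]
```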
@@ -39,6 +68,17 @@ module Fog
           @multipart_chunk_size = mp_chunk_size
         end
 
+        # @note Number of threads used to copy files.
+        def concurrency=(concurrency)
+          raise ArgumentError.new('minimum concurrency is 1') if concurrency.to_i < 1
+
+          @concurrency = concurrency.to_i
+        end
+
+        def concurrency
+          @concurrency || 1
+        end
+
         def acl
           requires :directory, :key
           service.get_object_acl(directory.key, key).body['AccessControlList']
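A small usage sketch of the new accessor pair, using Fog's mock mode so it runs without real credentials (the bucket and object names are made up):

```ruby
require 'fog/aws'

Fog.mock!
storage   = Fog::Storage.new(provider: 'AWS', aws_access_key_id: 'key', aws_secret_access_key: 'secret')
directory = storage.directories.create(key: 'example-bucket')
file      = directory.files.create(key: 'example-object', body: 'hello')

puts file.concurrency # => 1, the sequential default
file.concurrency = "4"
puts file.concurrency # => 4 (the setter coerces with to_i)

begin
  file.concurrency = 0
rescue ArgumentError => e
  puts e.message # => "minimum concurrency is 1"
end
```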
@@ -105,7 +145,7 @@ module Fog
           requires :directory, :key
 
           # With a single PUT operation you can upload objects up to 5 GB in size. Automatically set MP for larger objects.
-          self.multipart_chunk_size = MIN_MULTIPART_CHUNK_SIZE if !multipart_chunk_size && self.content_length.to_i > MAX_SINGLE_PUT_SIZE
+          self.multipart_chunk_size = MIN_MULTIPART_CHUNK_SIZE * 2 if !multipart_chunk_size && self.content_length.to_i > MAX_SINGLE_PUT_SIZE
 
           if multipart_chunk_size && self.content_length.to_i >= multipart_chunk_size
             upload_part_options = options.merge({ 'x-amz-copy-source' => "#{directory.key}/#{key}" })
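For scale, assuming the gem's usual 5 MB minimum part size (5242880 bytes, per the comment earlier in this file) and the 5 GB single-PUT limit mentioned in the comment above, the doubled default works out roughly as follows:

```ruby
# Assumed constant values for illustration; see MIN_MULTIPART_CHUNK_SIZE and
# MAX_SINGLE_PUT_SIZE in the gem for the authoritative numbers.
min_chunk  = 5_242_880     # 5 MB
auto_chunk = min_chunk * 2 # => 10_485_760 bytes (10 MB) per part
six_gb     = 6 * 1024**3   # a copy just over the 5 GB single-PUT limit

puts auto_chunk                      # => 10485760
puts (six_gb / auto_chunk.to_f).ceil # => 615 parts, instead of 1229 with 5 MB parts
```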
@@ -315,22 +355,14 @@ module Fog
 
           # Store ETags of upload parts
           part_tags = []
+          pending = PartList.new(create_part_list(upload_part_options))
+          thread_count = self.concurrency
+          completed = PartList.new
+          errors = upload_in_threads(target_directory_key, target_file_key, upload_id, pending, completed, thread_count)
 
-          current_pos = 0
+          raise errors.first if errors.any?
 
-          # Upload each part
-          # TODO: optionally upload chunks in parallel using threads
-          # (may cause network performance problems with many small chunks)
-          while current_pos < self.content_length do
-            start_pos = current_pos
-            end_pos = [current_pos + self.multipart_chunk_size, self.content_length - 1].min
-            range = "bytes=#{start_pos}-#{end_pos}"
-
-            upload_part_options['x-amz-copy-source-range'] = range
-            part_upload = service.upload_part_copy(target_directory_key, target_file_key, upload_id, part_tags.size + 1, upload_part_options)
-            part_tags << part_upload.body['ETag']
-            current_pos = end_pos + 1
-          end
+          part_tags = completed.to_a.sort_by { |part| part.part_number }.map(&:etag)
         rescue => e
           # Abort the upload & reraise
           service.abort_multipart_upload(target_directory_key, target_file_key, upload_id) if upload_id
@@ -364,6 +396,49 @@ module Fog
             'x-amz-server-side-encryption-customer-key-md5' => Base64.encode64(OpenSSL::Digest::MD5.digest(encryption_key.to_s)).chomp!
           }
         end
+
+        def create_part_list(upload_part_options)
+          current_pos = 0
+          count = 0
+          pending = []
+
+          while current_pos < self.content_length do
+            start_pos = current_pos
+            end_pos = [current_pos + self.multipart_chunk_size, self.content_length - 1].min
+            range = "bytes=#{start_pos}-#{end_pos}"
+            part_options = upload_part_options.dup
+            part_options['x-amz-copy-source-range'] = range
+            pending << UploadPartData.new(count + 1, part_options, nil)
+            count += 1
+            current_pos = end_pos + 1
+          end
+
+          pending
+        end
+
+        def upload_in_threads(target_directory_key, target_file_key, upload_id, pending, completed, thread_count)
+          threads = []
+
+          thread_count.times do
+            thread = Thread.new do
+              begin
+                while part = pending.shift
+                  part_upload = service.upload_part_copy(target_directory_key, target_file_key, upload_id, part.part_number, part.upload_options)
+                  part.etag = part_upload.body['ETag']
+                  completed.push(part)
+                end
+              rescue => error
+                pending.clear!
+                error
+              end
+            end
+
+            thread.abort_on_exception = true
+            threads << thread
+          end
+
+          threads.map(&:value).compact
+        end
       end
     end
   end
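To make the range arithmetic in `create_part_list` concrete, here is the same loop run standalone with assumed sizes (a 12 MB object and 5 MB chunks). The byte ranges are inclusive, so each part actually covers `multipart_chunk_size + 1` bytes, with the final part absorbing whatever remains:

```ruby
# Standalone rerun of the create_part_list range loop with assumed sizes.
content_length       = 12 * 1024 * 1024 # assumed 12 MB source object
multipart_chunk_size = 5 * 1024 * 1024  # assumed 5 MB chunk size

ranges = []
current_pos = 0
while current_pos < content_length
  end_pos = [current_pos + multipart_chunk_size, content_length - 1].min
  ranges << "bytes=#{current_pos}-#{end_pos}"
  current_pos = end_pos + 1
end

p ranges
# => ["bytes=0-5242880", "bytes=5242881-10485761", "bytes=10485762-12582911"]
```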
@@ -3,6 +3,8 @@ require 'securerandom'
 Shindo.tests('Fog::Storage[:aws] | copy requests', ["aws"]) do
 
   @directory = Fog::Storage[:aws].directories.create(:key => uniq_id('fogmultipartcopytests'))
+  @large_data = SecureRandom.hex * 19 * 1024 * 1024
+  @large_blob = Fog::Storage[:aws].put_object(@directory.identity, 'large_object', @large_data)
 
   tests('copies an empty object') do
     Fog::Storage[:aws].put_object(@directory.identity, 'empty_object', '')
@@ -47,9 +49,6 @@ Shindo.tests('Fog::Storage[:aws] | copy requests', ["aws"]) do
   end
 
   tests('copies a file with many parts') do
-    data = SecureRandom.hex * 19 * 1024 * 1024
-    Fog::Storage[:aws].put_object(@directory.identity, 'large_object', data)
-
     file = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('large_object')
     file.multipart_chunk_size = Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE
 
@@ -59,6 +58,23 @@ Shindo.tests('Fog::Storage[:aws] | copy requests', ["aws"]) do
 
     copied = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('large_copied_object')
 
+    test("concurrency defaults to 1") { file.concurrency == 1 }
     test("copied is the same") { copied.body == file.body }
   end
+
+  tests('copies a file with many parts with 10 threads') do
+    file = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('large_object')
+    file.multipart_chunk_size = Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE
+    file.concurrency = 10
+
+    test("concurrency is set to 10") { file.concurrency == 10 }
+
+    tests("#copy_object('#{@directory.identity}', 'copied_object_with_10_threads')").succeeds do
+      file.copy(@directory.identity, 'copied_object_with_10_threads')
+    end
+
+    copied = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('copied_object_with_10_threads')
+
+    test("copied is the same") { copied.body == file.body }
+  end
 end