Mirror of https://github.com/fog/fog-aws.git, synced 2022-11-09 13:50:52 -05:00
Merge pull request #578 from stanhu/sh-add-multipart-copy-support
Add support for multipart Fog::AWS::Storage::File#copy
This commit is contained in: commit 9e11357c85
5 changed files with 224 additions and 3 deletions
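
In practice, a caller would opt into the new behaviour roughly like this (a minimal sketch; bucket and object names are placeholders, and credentials are assumed to come from ~/.fog or explicit options):

    require 'fog/aws'

    storage   = Fog::Storage.new(:provider => 'AWS')   # assumes credentials are configured
    directory = storage.directories.get('my-bucket')    # placeholder bucket
    file      = directory.files.get('big-file.bin')     # placeholder object

    # Setting a chunk size routes the copy through the new multipart path once the
    # object is at least one chunk long; objects over 5 GB switch automatically.
    file.multipart_chunk_size = Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE
    file.copy('my-bucket', 'big-file-copy.bin')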
Changes to the Fog::AWS::Storage::File model:

@@ -4,6 +4,10 @@ module Fog
   module AWS
     class Storage
       class File < Fog::Model
+        MIN_MULTIPART_CHUNK_SIZE = 5242880
+        MAX_SINGLE_PUT_SIZE = 5368709120
+        MULTIPART_COPY_THRESHOLD = 15728640
+
         # @see AWS Object docs http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectOps.html

         identity :key, :aliases => 'Key'
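
The three constants are the S3 limits expressed in bytes; as a quick sanity check of the figures (plain Ruby arithmetic, not part of the diff):

    5 * 1024 * 1024    # => 5242880      5 MiB  minimum part size  (MIN_MULTIPART_CHUNK_SIZE)
    5 * 1024 ** 3      # => 5368709120   5 GiB  single-PUT ceiling  (MAX_SINGLE_PUT_SIZE)
    15 * 1024 * 1024   # => 15728640     15 MiB copy threshold      (MULTIPART_COPY_THRESHOLD)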
@@ -31,7 +35,7 @@ module Fog
         # Use small chunk sizes to minimize memory. E.g. 5242880 = 5mb
         attr_reader :multipart_chunk_size
         def multipart_chunk_size=(mp_chunk_size)
-          raise ArgumentError.new("minimum multipart_chunk_size is 5242880") if mp_chunk_size < 5242880
+          raise ArgumentError.new("minimum multipart_chunk_size is #{MIN_MULTIPART_CHUNK_SIZE}") if mp_chunk_size < MIN_MULTIPART_CHUNK_SIZE
           @multipart_chunk_size = mp_chunk_size
         end
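
The only change here is that the guard and its error message reuse the new constant; the setter behaves as before. For illustration (file being any Fog::AWS::Storage::File instance):

    file.multipart_chunk_size = 1024
    # => ArgumentError: minimum multipart_chunk_size is 5242880

    file.multipart_chunk_size = Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE  # accepted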
@@ -99,7 +103,17 @@ module Fog
         #
         def copy(target_directory_key, target_file_key, options = {})
           requires :directory, :key
-          service.copy_object(directory.key, key, target_directory_key, target_file_key, options)
+
+          # With a single PUT operation you can upload objects up to 5 GB in size. Automatically set MP for larger objects.
+          self.multipart_chunk_size = MIN_MULTIPART_CHUNK_SIZE if !multipart_chunk_size && self.content_length.to_i > MAX_SINGLE_PUT_SIZE
+
+          if multipart_chunk_size && self.content_length.to_i >= multipart_chunk_size
+            upload_part_options = options.merge({ 'x-amz-copy-source' => "#{directory.key}/#{key}" })
+            multipart_copy(options, upload_part_options, target_directory_key, target_file_key)
+          else
+            service.copy_object(directory.key, key, target_directory_key, target_file_key, options)
+          end
+
           target_directory = service.directories.new(:key => target_directory_key)
           target_directory.files.head(target_file_key)
         end
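
With this change #copy picks its strategy from the object's size: below the chunk size it issues a single copy_object request as before; at or above it (or automatically above the 5 GB single-PUT limit, where the chunk size is set for you) it goes through multipart_copy. A rough sketch of the two paths, with placeholder names:

    small = directory.files.get('small.txt')          # content_length < chunk size
    small.copy('target-bucket', 'small-copy.txt')     # one copy_object request

    big = directory.files.get('big.bin')
    big.multipart_chunk_size = Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE
    big.copy('target-bucket', 'big-copy.bin')         # initiate + upload_part_copy parts + complete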
@@ -214,7 +228,7 @@ module Fog
           options.merge!(encryption_headers)

           # With a single PUT operation you can upload objects up to 5 GB in size. Automatically set MP for larger objects.
-          self.multipart_chunk_size = 5242880 if !multipart_chunk_size && Fog::Storage.get_body_size(body) > 5368709120
+          self.multipart_chunk_size = MIN_MULTIPART_CHUNK_SIZE if !multipart_chunk_size && Fog::Storage.get_body_size(body) > MAX_SINGLE_PUT_SIZE

           if multipart_chunk_size && Fog::Storage.get_body_size(body) >= multipart_chunk_size && body.respond_to?(:read)
             data = multipart_save(options)
@@ -294,6 +308,38 @@ module Fog
           service.complete_multipart_upload(directory.key, key, upload_id, part_tags)
         end

+        def multipart_copy(options, upload_part_options, target_directory_key, target_file_key)
+          # Initiate the upload
+          res = service.initiate_multipart_upload(target_directory_key, target_file_key, options)
+          upload_id = res.body["UploadId"]
+
+          # Store ETags of upload parts
+          part_tags = []
+
+          current_pos = 0
+
+          # Upload each part
+          # TODO: optionally upload chunks in parallel using threads
+          # (may cause network performance problems with many small chunks)
+          while current_pos < self.content_length do
+            start_pos = current_pos
+            end_pos = [current_pos + self.multipart_chunk_size, self.content_length - 1].min
+            range = "bytes=#{start_pos}-#{end_pos}"
+
+            upload_part_options['x-amz-copy-source-range'] = range
+            part_upload = service.upload_part_copy(target_directory_key, target_file_key, upload_id, part_tags.size + 1, upload_part_options)
+            part_tags << part_upload.body['ETag']
+            current_pos = end_pos + 1
+          end
+
+        rescue => e
+          # Abort the upload & reraise
+          service.abort_multipart_upload(target_directory_key, target_file_key, upload_id) if upload_id
+          raise
+        else
+          # Complete the upload
+          service.complete_multipart_upload(target_directory_key, target_file_key, upload_id, part_tags)
+        end
+
         def encryption_headers
           if encryption && encryption_key
             encryption_customer_key_headers
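
The loop builds inclusive x-amz-copy-source-range values, ending each part at [current_pos + chunk_size, content_length - 1].min and starting the next part one byte later. The same arithmetic in isolation, for a 12 MiB object copied with 5 MiB chunks (a standalone sketch, not part of the diff):

    content_length = 12 * 1024 * 1024
    chunk_size     = 5 * 1024 * 1024

    ranges = []
    pos = 0
    while pos < content_length
      end_pos = [pos + chunk_size, content_length - 1].min
      ranges << "bytes=#{pos}-#{end_pos}"
      pos = end_pos + 1
    end
    ranges
    # => ["bytes=0-5242880", "bytes=5242881-10485761", "bytes=10485762-12582911"]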
lib/fog/aws/parsers/storage/upload_part_copy_object.rb (new file, 18 lines)

@@ -0,0 +1,18 @@
module Fog
  module Parsers
    module AWS
      module Storage
        class UploadPartCopyObject < Fog::Parsers::Base
          def end_element(name)
            case name
            when 'ETag'
              @response[name] = value.gsub('"', '')
            when 'LastModified'
              @response[name] = Time.parse(value)
            end
          end
        end
      end
    end
  end
end
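
S3 answers UploadPartCopy with a small XML body; the parser keeps only the ETag (quotes stripped) and the LastModified timestamp. An illustrative mapping — the XML below is a typical response shape, not taken from this PR:

    xml = <<~XML
      <CopyPartResult>
        <LastModified>2016-01-15T10:30:00.000Z</LastModified>
        <ETag>"b54357faf0632cce46e942fa68356b38"</ETag>
      </CopyPartResult>
    XML

    # Parsed response body as exposed to callers:
    # { 'ETag'         => 'b54357faf0632cce46e942fa68356b38',   # quotes stripped
    #   'LastModified' => Time.parse('2016-01-15T10:30:00.000Z') }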
lib/fog/aws/requests/storage/upload_part_copy.rb (new file, 92 lines)

@@ -0,0 +1,92 @@
module Fog
  module AWS
    class Storage
      class Real
        require 'fog/aws/parsers/storage/upload_part_copy_object'

        # Upload a part for a multipart copy
        #
        # @param target_bucket_name [String] Name of bucket to create copy in
        # @param target_object_name [String] Name for new copy of object
        # @param upload_id [String] Id of upload to add part to
        # @param part_number [String] Index of part in upload
        # @param options [Hash]:
        # @option options [String] x-amz-metadata-directive Specifies whether to copy metadata from source or replace with data in request. Must be in ['COPY', 'REPLACE']
        # @option options [String] x-amz-copy_source-if-match Copies object if its etag matches this value
        # @option options [Time] x-amz-copy_source-if-modified_since Copies object if it has been modified since this time
        # @option options [String] x-amz-copy_source-if-none-match Copies object if its etag does not match this value
        # @option options [Time] x-amz-copy_source-if-unmodified-since Copies object if it has not been modified since this time
        # @option options [Time] x-amz-copy-source-range Specifies the range of bytes to copy from the source object
        #
        # @return [Excon::Response]
        #   * body [Hash]:
        #     * ETag [String] - etag of new object
        #     * LastModified [Time] - date object was last modified
        #
        # @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
        #
        def upload_part_copy(target_bucket_name, target_object_name, upload_id, part_number, options = {})
          headers = options
          request({
            :expects     => 200,
            :idempotent  => true,
            :headers     => headers,
            :bucket_name => target_bucket_name,
            :object_name => target_object_name,
            :method      => 'PUT',
            :query       => {'uploadId' => upload_id, 'partNumber' => part_number},
            :parser      => Fog::Parsers::AWS::Storage::UploadPartCopyObject.new,
          })
        end
      end # Real

      class Mock # :nodoc:all
        require 'fog/aws/requests/storage/shared_mock_methods'
        include Fog::AWS::Storage::SharedMockMethods

        def upload_part_copy(target_bucket_name, target_object_name, upload_id, part_number, options = {})
          copy_source = options['x-amz-copy-source']
          copy_range  = options['x-amz-copy-source-range']

          raise 'No x-amz-copy-source header provided' unless copy_source
          raise 'No x-amz-copy-source-range header provided' unless copy_range

          source_bucket_name, source_object_name = copy_source.split('/', 2)
          verify_mock_bucket_exists(source_bucket_name)

          source_bucket = self.data[:buckets][source_bucket_name]
          source_object = source_bucket && source_bucket[:objects][source_object_name] && source_bucket[:objects][source_object_name].first
          upload_info = get_upload_info(target_bucket_name, upload_id)

          response = Excon::Response.new

          if source_object
            start_pos, end_pos = byte_range(copy_range, source_object[:body].length)
            upload_info[:parts][part_number] = source_object[:body][start_pos..end_pos]

            response.status = 200
            response.body = {
              # just use the part number as the ETag, for simplicity
              'ETag' => part_number.to_i,
              'LastModified' => Time.parse(source_object['Last-Modified'])
            }
            response
          else
            response.status = 404
            raise(Excon::Errors.status_error({:expects => 200}, response))
          end
        end

        def byte_range(range, size)
          matches = range.match(/bytes=(\d*)-(\d*)/)

          return nil unless matches

          end_pos = [matches[2].to_i, size].min

          [matches[1].to_i, end_pos]
        end
      end # Mock
    end # Storage
  end # AWS
end # Fog
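
The request can also be driven directly, outside File#copy. A hedged sketch of the full initiate / copy-part / complete sequence, with placeholder bucket and object names and no error handling:

    storage = Fog::Storage[:aws]

    init      = storage.initiate_multipart_upload('target-bucket', 'copy.bin')
    upload_id = init.body['UploadId']

    part = storage.upload_part_copy(
      'target-bucket', 'copy.bin', upload_id, 1,
      'x-amz-copy-source'       => 'source-bucket/original.bin',
      'x-amz-copy-source-range' => 'bytes=0-5242880'
    )

    storage.complete_multipart_upload('target-bucket', 'copy.bin', upload_id, [part.body['ETag']])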
Changes to the Fog::AWS::Storage service (registering the new request):

@@ -112,6 +112,7 @@ module Fog
     request :put_request_payment
     request :sync_clock
     request :upload_part
+    request :upload_part_copy

     module Utils
       attr_accessor :region
tests/requests/storage/multipart_copy_tests.rb (new file, 64 lines)

@@ -0,0 +1,64 @@
require 'securerandom'

Shindo.tests('Fog::Storage[:aws] | copy requests', ["aws"]) do

  @directory = Fog::Storage[:aws].directories.create(:key => uniq_id('fogmultipartcopytests'))

  tests('copies an empty object') do
    Fog::Storage[:aws].put_object(@directory.identity, 'empty_object', '')

    file = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('empty_object')
    file.multipart_chunk_size = Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE

    tests("#copy_object('#{@directory.identity}', 'empty_copied_object'").succeeds do
      file.copy(@directory.identity, 'empty_copied_object')
    end

    copied = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('empty_copied_object')
    test("copied is the same") { copied.body == file.body }
  end

  tests('copies a small object') do
    Fog::Storage[:aws].put_object(@directory.identity, 'fog_object', lorem_file)

    file = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('fog_object')

    tests("#copy_object('#{@directory.identity}', 'copied_object'").succeeds do
      file.copy(@directory.identity, 'copied_object')
    end

    copied = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('copied_object')
    test("copied is the same") { copied.body == file.body }
  end

  tests('copies a file needing a single part') do
    data = '*' * Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE
    Fog::Storage[:aws].put_object(@directory.identity, '1_part_object', data)

    file = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('1_part_object')
    file.multipart_chunk_size = Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE

    tests("#copy_object('#{@directory.identity}', '1_part_copied_object'").succeeds do
      file.copy(@directory.identity, '1_part_copied_object')
    end

    copied = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('1_part_copied_object')
    test("copied is the same") { copied.body == file.body }
  end

  tests('copies a file with many parts') do
    data = SecureRandom.hex * 19 * 1024 * 1024
    Fog::Storage[:aws].put_object(@directory.identity, 'large_object', data)

    file = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('large_object')
    file.multipart_chunk_size = Fog::AWS::Storage::File::MIN_MULTIPART_CHUNK_SIZE

    tests("#copy_object('#{@directory.identity}', 'large_copied_object'").succeeds do
      file.copy(@directory.identity, 'large_copied_object')
    end

    copied = Fog::Storage[:aws].directories.new(key: @directory.identity).files.get('large_copied_object')

    test("copied is the same") { copied.body == file.body }
  end
end