
Add split_shard request

Michael Hale 2015-07-02 15:23:25 -04:00
parent 44d05ede46
commit 3ca6b75c86
6 changed files with 169 additions and 33 deletions

View file

@@ -22,6 +22,11 @@ module Fog
request :put_records
request :put_record
request :get_records
request :split_shard
# request :merge_shards
# request :add_tags_to_stream
# request :list_tags_for_stream
# request :remove_tags_from_stream
class Real
include Fog::AWS::CredentialFetcher::ConnectionMethods
@@ -114,6 +119,11 @@ module Fog
end
class Mock
def self.mutex
@mutex ||= Mutex.new
end
def mutex; self.class.mutex; end
def self.data
@data ||= Hash.new do |hash, region|
hash[region] = Hash.new do |region_hash, key|
@@ -132,7 +142,6 @@ module Fog
@account_id = Fog::AWS::Mock.owner_id
@aws_access_key_id = options[:aws_access_key_id]
@region = options[:region] || 'us-east-1'
- @sequence_number = 0
unless ['ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2', 'eu-central-1', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1', 'us-west-2'].include?(@region)
raise ArgumentError, "Unknown region: #{@region.inspect}"
@@ -147,10 +156,24 @@ module Fog
self.class.data[@region].delete(@aws_access_key_id)
end
- def next_sequence_number
-   @sequence_number += 1
-   @sequence_number
def self.next_sequence_number
mutex.synchronize do
@sequence_number ||= -1
@sequence_number += 1
@sequence_number.to_s
end
end
def next_sequence_number; self.class.next_sequence_number; end
def self.next_shard_id
mutex.synchronize do
@shard_id ||= -1
@shard_id += 1
"shardId-#{@shard_id.to_s.rjust(12, "0")}"
end
end
def next_shard_id; self.class.next_shard_id; end
end
end
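The hunk above moves sequence-number and shard-id generation from per-instance state onto mutex-guarded class-level counters, so every Mock instance in a process hands out unique, monotonically increasing values. A minimal standalone sketch of that pattern (the class name CounterMock is illustrative only, not part of the commit):

  class CounterMock
    # a single Mutex guards the class-level counters
    def self.mutex
      @mutex ||= Mutex.new
    end

    def self.next_shard_id
      mutex.synchronize do
        @shard_id ||= -1
        @shard_id += 1
        "shardId-#{@shard_id.to_s.rjust(12, "0")}"  # zero-padded, AWS-style shard id
      end
    end
  end

  CounterMock.next_shard_id  # => "shardId-000000000000"
  CounterMock.next_shard_id  # => "shardId-000000000001"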

View file

@@ -44,9 +44,9 @@ module Fog
"StartingHashKey"=>"0"
},
"SequenceNumberRange"=>{
"StartingSequenceNumber"=> next_sequence_number.to_s
"StartingSequenceNumber"=> next_sequence_number
},
"ShardId"=>"shardId-#{shard.to_s.rjust(12, "0")}",
"ShardId"=>next_shard_id,
"Records" => []
}
end

View file

@@ -41,7 +41,7 @@ module Fog
raise 'unknown stream'
end
- sequence_number = next_sequence_number.to_s
+ sequence_number = next_sequence_number
data = options.delete("Data")
partition_key = options.delete("PartitionKey")

View file

@@ -41,7 +41,7 @@ module Fog
records = options.delete("Records")
record_results = records.map { |r|
- sequence_number = next_sequence_number.to_s
+ sequence_number = next_sequence_number
sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample
shard_id = stream["Shards"].send(sample_method)["ShardId"]
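The RUBY_VERSION ternary above exists because Array#sample was introduced in Ruby 1.9; Ruby 1.8.7 shipped the same behaviour under the name Array#choice. A tiny illustration (the shard hashes are made up):

  shards = [{ "ShardId" => "shardId-000000000000" }, { "ShardId" => "shardId-000000000001" }]
  sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample
  shards.send(sample_method)["ShardId"]  # => one of the two shard ids, chosen at random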

View file

@@ -0,0 +1,83 @@
module Fog
module AWS
class Kinesis
class Real
# Splits a shard into two new shards in the stream, to increase the stream's capacity to ingest and transport data.
#
# ==== Options
# * NewStartingHashKey<~String>: A hash key value for the starting hash key of one of the child shards created by the split.
# * ShardToSplit<~String>: The shard ID of the shard to split.
# * StreamName<~String>: The name of the stream for the shard split.
# ==== Returns
# * response<~Excon::Response>:
#
# ==== See Also
# https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SplitShard.html
#
def split_shard(options={})
body = {
"NewStartingHashKey" => options.delete("NewStartingHashKey"),
"ShardToSplit" => options.delete("ShardToSplit"),
"StreamName" => options.delete("StreamName")
}.reject{ |_,v| v.nil? }
response = request({
'X-Amz-Target' => "Kinesis_#{@version}.SplitShard",
:body => body,
}.merge(options))
response
end
end
class Mock
def split_shard(options={})
stream_name = options.delete("StreamName")
shard_id = options.delete("ShardToSplit")
stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
unless shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id }
raise 'unknown shard'
end
# Close original shard (set an EndingSequenceNumber on it)
shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number
# Calculate new shard ranges
parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"]
parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"]
new_starting_hash_key = options.delete("NewStartingHashKey")
# Create two new shards using contiguous hash space based on the original shard
stream["Shards"] << {
"HashKeyRange"=> {
"EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s,
"StartingHashKey" => parent_starting_hash_key
},
"SequenceNumberRange" => {
"StartingSequenceNumber" => next_sequence_number
},
"ShardId" => next_shard_id,
"ParentShardId" => shard_id
}
stream["Shards"] << {
"HashKeyRange" => {
"EndingHashKey" => parent_ending_hash_key,
"StartingHashKey" => new_starting_hash_key
},
"SequenceNumberRange" =>{
"StartingSequenceNumber" => next_sequence_number
},
"ShardId" => next_shard_id,
"ParentShardId" => shard_id
}
response = Excon::Response.new
response.status = 200
response.body = ""
response
end
end
end
end
end
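Putting the new request to work, a short usage sketch against the mocked service (the stream name, credentials, and midpoint hash key below are placeholders, not part of the commit):

  require 'fog/aws'

  Fog.mock!  # exercise the Mock implementation added above

  kinesis = Fog::AWS::Kinesis.new(
    :aws_access_key_id     => "fake",
    :aws_secret_access_key => "fake",
    :region                => "us-east-1"
  )

  kinesis.create_stream("StreamName" => "example-stream")
  shard = kinesis.describe_stream("StreamName" => "example-stream").
            body["StreamDescription"]["Shards"].first

  # Split at (roughly) the midpoint of the parent shard's hash key range
  kinesis.split_shard(
    "StreamName"         => "example-stream",
    "ShardToSplit"       => shard["ShardId"],
    "NewStartingHashKey" => (shard["HashKeyRange"]["EndingHashKey"].to_i / 2).to_s
  )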

View file

@@ -2,9 +2,9 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
@stream_id = 'fog-test-stream'
tests('success') do
- wait_for_delete = lambda {
+ wait_for_status = lambda { |status|
begin
- while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] == "DELETING"
+ while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] != status
sleep 1
print '.'
end
@@ -13,10 +13,10 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
# ensure we start from a clean slate
if Fog::AWS[:kinesis].list_streams.body["StreamNames"].include?(@stream_id)
- wait_for_delete.call
+ wait_for_status.call("ACTIVE")
begin
Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id)
- wait_for_delete.call
+ wait_for_status.call("ACTIVE")
rescue Excon::Errors::BadRequest; end
end
@@ -77,21 +77,18 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
"MillisBehindLatest" => Integer,
"NextShardIterator" => String,
"Records" => [
- {
-   "Data" => String,
-   "PartitionKey" => String,
-   "SequenceNumber" => String
- }
- ]
- }
+ {
+   "Data" => String,
+   "PartitionKey" => String,
+   "SequenceNumber" => String
+ }
+ ]
+ }
tests("#create_stream").returns("") do
- result = Fog::AWS[:kinesis].create_stream("StreamName" => @stream_id).body
- while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] != "ACTIVE"
-   sleep 1
-   print '.'
+ Fog::AWS[:kinesis].create_stream("StreamName" => @stream_id).body.tap do
+   wait_for_status.call("ACTIVE")
end
- result
end
tests("#list_streams").formats(@list_streams_format, false) do
@@ -108,15 +105,15 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
tests("#put_records").formats(@put_records_format, false) do
records = [
- {
-   "Data" => Base64.encode64("foo").chomp!,
-   "PartitionKey" => "1"
- },
- {
-   "Data" => Base64.encode64("bar").chomp!,
-   "PartitionKey" => "1"
- }
- ]
+ {
+   "Data" => Base64.encode64("foo").chomp!,
+   "PartitionKey" => "1"
+ },
+ {
+   "Data" => Base64.encode64("bar").chomp!,
+   "PartitionKey" => "1"
+ }
+ ]
Fog::AWS[:kinesis].put_records("StreamName" => @stream_id, "Records" => records).body
end
@@ -150,6 +147,39 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
data
end
tests("#split_shard").formats("") do
shard = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"].first
shard_id = shard["ShardId"]
ending_hash_key = shard["HashKeyRange"]["EndingHashKey"]
new_starting_hash_key = (ending_hash_key.to_i / 2).to_s
result = Fog::AWS[:kinesis].split_shard("StreamName" => @stream_id, "ShardToSplit" => shard_id, "NewStartingHashKey" => new_starting_hash_key).body
wait_for_status.call("ACTIVE")
shards = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"]
parent_shard = shards.detect{ |shard| shard["ShardId"] == shard_id }
child_shards = shards.select{ |shard| shard["ParentShardId"] == shard_id }.sort_by{ |shard| shard["ShardId"] }
returns(3) { shards.size }
returns(2) { child_shards.size }
# parent is closed
returns(false) { parent_shard["SequenceNumberRange"]["EndingSequenceNumber"].nil? }
# ensure new ranges are what we expect (mostly for testing the mock)
returns([
{
"StartingHashKey" => "0",
"EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s
},
{
"StartingHashKey" => new_starting_hash_key,
"EndingHashKey" => ending_hash_key
}
]) { child_shards.map{ |shard| shard["HashKeyRange"] } }
result
end
tests("#delete_stream").returns("") do
Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id).body
end