From b5a1a3576cc26377827dd65ee33058913ba71daf Mon Sep 17 00:00:00 2001 From: Michael Hale Date: Thu, 2 Jul 2015 15:57:29 -0400 Subject: [PATCH] Implement merge_shards --- lib/fog/aws/kinesis.rb | 2 +- lib/fog/aws/requests/kinesis/merge_shards.rb | 86 ++++++++++++++++++++ lib/fog/aws/requests/kinesis/split_shard.rb | 5 +- tests/requests/kinesis/stream_tests.rb | 37 +++++++-- 4 files changed, 122 insertions(+), 8 deletions(-) create mode 100644 lib/fog/aws/requests/kinesis/merge_shards.rb diff --git a/lib/fog/aws/kinesis.rb b/lib/fog/aws/kinesis.rb index 7b6d15c07..5cbb01d07 100644 --- a/lib/fog/aws/kinesis.rb +++ b/lib/fog/aws/kinesis.rb @@ -23,7 +23,7 @@ module Fog request :put_record request :get_records request :split_shard - # request :merge_shards + request :merge_shards # request :add_tags_to_stream # request :list_tags_for_stream # request :remove_tags_from_stream diff --git a/lib/fog/aws/requests/kinesis/merge_shards.rb b/lib/fog/aws/requests/kinesis/merge_shards.rb new file mode 100644 index 000000000..93ce8e60a --- /dev/null +++ b/lib/fog/aws/requests/kinesis/merge_shards.rb @@ -0,0 +1,86 @@ +module Fog + module AWS + class Kinesis + class Real + # Merges two adjacent shards in a stream and combines them into a single shard to reduce the stream's capacity to ingest and transport data. + # + # ==== Options + # * AdjacentShardToMerge<~String>: The shard ID of the adjacent shard for the merge. + # * ShardToMerge<~String>: The shard ID of the shard to combine with the adjacent shard for the merge. + # * StreamName<~String>: The name of the stream for the merge. + # ==== Returns + # * response<~Excon::Response>: + # + # ==== See Also + # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html + # + def merge_shards(options={}) + body = { + "AdjacentShardToMerge" => options.delete("AdjacentShardToMerge"), + "ShardToMerge" => options.delete("ShardToMerge"), + "StreamName" => options.delete("StreamName") + }.reject{ |_,v| v.nil? } + + response = request({ + 'X-Amz-Target' => "Kinesis_#{@version}.MergeShards", + :body => body, + }.merge(options)) + response + end + end + + class Mock + def merge_shards(options={}) + stream_name = options.delete("StreamName") + shard_to_merge_id = options.delete("ShardToMerge") + adjacent_shard_to_merge_id = options.delete("AdjacentShardToMerge") + + unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } + raise 'unknown stream' + end + + unless shard_to_merge = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_to_merge_id } + raise 'unknown shard' + end + + unless adjacent_shard_to_merge = stream["Shards"].detect{ |shard| shard["ShardId"] == adjacent_shard_to_merge_id } + raise 'unknown shard' + end + + # Close shards (set an EndingSequenceNumber on them) + shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number + adjacent_shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number + + new_starting_hash_key = [ + shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i, + adjacent_shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i + ].min.to_s + + new_ending_hash_key = [ + shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i, + adjacent_shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i + ].max.to_s + + # create a new shard with ParentShardId and AdjacentParentShardID + stream["Shards"] << { + "HashKeyRange"=> { + "EndingHashKey" => new_ending_hash_key, + "StartingHashKey" => new_starting_hash_key + }, + "SequenceNumberRange" => { + "StartingSequenceNumber" => next_sequence_number + }, + "ShardId" => next_shard_id, + "ParentShardId" => shard_to_merge_id, + "AdjacentParentShardId" => adjacent_shard_to_merge_id + } + + response = Excon::Response.new + response.status = 200 + response.body = "" + response + end + end + end + end +end diff --git a/lib/fog/aws/requests/kinesis/split_shard.rb b/lib/fog/aws/requests/kinesis/split_shard.rb index b47a654cd..44dfc805b 100644 --- a/lib/fog/aws/requests/kinesis/split_shard.rb +++ b/lib/fog/aws/requests/kinesis/split_shard.rb @@ -35,6 +35,10 @@ module Fog shard_id = options.delete("ShardToSplit") stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } + unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } + raise 'unknown stream' + end + unless shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id } raise 'unknown shard' end @@ -75,7 +79,6 @@ module Fog response.status = 200 response.body = "" response - end end end diff --git a/tests/requests/kinesis/stream_tests.rb b/tests/requests/kinesis/stream_tests.rb index a86027be3..0c3710d87 100644 --- a/tests/requests/kinesis/stream_tests.rb +++ b/tests/requests/kinesis/stream_tests.rb @@ -92,11 +92,11 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do end tests("#list_streams").formats(@list_streams_format, false) do - Fog::AWS[:kinesis].list_streams.body - end - - tests("#list_streams").returns(true) do - Fog::AWS[:kinesis].list_streams.body["StreamNames"].include?(@stream_id) + Fog::AWS[:kinesis].list_streams.body.tap do + returns(true) { + Fog::AWS[:kinesis].list_streams.body["StreamNames"].include?(@stream_id) + } + end end tests("#describe_stream").formats(@describe_stream_format) do @@ -147,7 +147,7 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do data end - tests("#split_shard").formats("") do + tests("#split_shard").returns("") do shard = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"].first shard_id = shard["ShardId"] ending_hash_key = shard["HashKeyRange"]["EndingHashKey"] @@ -180,6 +180,31 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do result end + tests("#merge_shards").returns("") do + shards = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"] + child_shard_ids = shards.reject{ |shard| shard["SequenceNumberRange"].has_key?("EndingSequenceNumber") }.map{ |shard| shard["ShardId"] }.sort + result = Fog::AWS[:kinesis].merge_shards("StreamName" => @stream_id, "ShardToMerge" => child_shard_ids[0], "AdjacentShardToMerge" => child_shard_ids[1]).body + + wait_for_status.call("ACTIVE") + shards = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"] + parent_shards = shards.select{ |shard| child_shard_ids.include?(shard["ShardId"]) } + child_shard = shards.detect{ |shard| + shard["ParentShardId"] == child_shard_ids[0] && + shard["AdjacentParentShardId"] == child_shard_ids[1] + } + + returns(2) { parent_shards.size } + returns(false) { child_shard.nil? } + returns({ + "EndingHashKey" => "340282366920938463463374607431768211455", + "StartingHashKey" => "0" + }) { + child_shard["HashKeyRange"] + } + + result + end + tests("#delete_stream").returns("") do Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id).body end