From 3ca6b75c868c1986ec011a35e8a886138bee70a1 Mon Sep 17 00:00:00 2001 From: Michael Hale Date: Thu, 2 Jul 2015 15:23:25 -0400 Subject: [PATCH] Add split_shard request --- lib/fog/aws/kinesis.rb | 31 ++++++- lib/fog/aws/requests/kinesis/create_stream.rb | 4 +- lib/fog/aws/requests/kinesis/put_record.rb | 2 +- lib/fog/aws/requests/kinesis/put_records.rb | 2 +- lib/fog/aws/requests/kinesis/split_shard.rb | 83 +++++++++++++++++++ tests/requests/kinesis/stream_tests.rb | 80 ++++++++++++------ 6 files changed, 169 insertions(+), 33 deletions(-) create mode 100644 lib/fog/aws/requests/kinesis/split_shard.rb diff --git a/lib/fog/aws/kinesis.rb b/lib/fog/aws/kinesis.rb index 36c3fddc1..7b6d15c07 100644 --- a/lib/fog/aws/kinesis.rb +++ b/lib/fog/aws/kinesis.rb @@ -22,6 +22,11 @@ module Fog request :put_records request :put_record request :get_records + request :split_shard + # request :merge_shards + # request :add_tags_to_stream + # request :list_tags_for_stream + # request :remove_tags_from_stream class Real include Fog::AWS::CredentialFetcher::ConnectionMethods @@ -114,6 +119,11 @@ module Fog end class Mock + def self.mutex + @mutex ||= Mutex.new + end + def mutex; self.class.mutex; end + def self.data @data ||= Hash.new do |hash, region| hash[region] = Hash.new do |region_hash, key| @@ -132,7 +142,6 @@ module Fog @account_id = Fog::AWS::Mock.owner_id @aws_access_key_id = options[:aws_access_key_id] @region = options[:region] || 'us-east-1' - @sequence_number = 0 unless ['ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2', 'eu-central-1', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1', 'us-west-2'].include?(@region) raise ArgumentError, "Unknown region: #{@region.inspect}" @@ -147,10 +156,24 @@ module Fog self.class.data[@region].delete(@aws_access_key_id) end - def next_sequence_number - @sequence_number += 1 - @sequence_number + def self.next_sequence_number + mutex.synchronize do + @sequence_number ||= -1 + @sequence_number += 1 + @sequence_number.to_s + end end + def next_sequence_number; self.class.next_sequence_number; end + + def self.next_shard_id + mutex.synchronize do + @shard_id ||= -1 + @shard_id += 1 + "shardId-#{@shard_id.to_s.rjust(12, "0")}" + end + end + def next_shard_id; self.class.next_shard_id; end + end end diff --git a/lib/fog/aws/requests/kinesis/create_stream.rb b/lib/fog/aws/requests/kinesis/create_stream.rb index 596f7a41e..bfa2fb420 100644 --- a/lib/fog/aws/requests/kinesis/create_stream.rb +++ b/lib/fog/aws/requests/kinesis/create_stream.rb @@ -44,9 +44,9 @@ module Fog "StartingHashKey"=>"0" }, "SequenceNumberRange"=>{ - "StartingSequenceNumber"=> next_sequence_number.to_s + "StartingSequenceNumber"=> next_sequence_number }, - "ShardId"=>"shardId-#{shard.to_s.rjust(12, "0")}", + "ShardId"=>next_shard_id, "Records" => [] } end diff --git a/lib/fog/aws/requests/kinesis/put_record.rb b/lib/fog/aws/requests/kinesis/put_record.rb index 76c13e5c3..26e2963fa 100644 --- a/lib/fog/aws/requests/kinesis/put_record.rb +++ b/lib/fog/aws/requests/kinesis/put_record.rb @@ -41,7 +41,7 @@ module Fog raise 'unknown stream' end - sequence_number = next_sequence_number.to_s + sequence_number = next_sequence_number data = options.delete("Data") partition_key = options.delete("PartitionKey") diff --git a/lib/fog/aws/requests/kinesis/put_records.rb b/lib/fog/aws/requests/kinesis/put_records.rb index f404d16c2..3c7648550 100644 --- a/lib/fog/aws/requests/kinesis/put_records.rb +++ b/lib/fog/aws/requests/kinesis/put_records.rb @@ -41,7 +41,7 @@ module Fog records = options.delete("Records") record_results = records.map { |r| - sequence_number = next_sequence_number.to_s + sequence_number = next_sequence_number sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample shard_id = stream["Shards"].send(sample_method)["ShardId"] diff --git a/lib/fog/aws/requests/kinesis/split_shard.rb b/lib/fog/aws/requests/kinesis/split_shard.rb new file mode 100644 index 000000000..b47a654cd --- /dev/null +++ b/lib/fog/aws/requests/kinesis/split_shard.rb @@ -0,0 +1,83 @@ +module Fog + module AWS + class Kinesis + class Real + # Splits a shard into two new shards in the stream, to increase the stream's capacity to ingest and transport data. + # + # ==== Options + # * NewStartingHashKey<~String>: A hash key value for the starting hash key of one of the child shards created by the split. + # * ShardToSplit<~String>: The shard ID of the shard to split. + # * StreamName<~String>: The name of the stream for the shard split. + # ==== Returns + # * response<~Excon::Response>: + # + # ==== See Also + # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SplitShard.html + # + def split_shard(options={}) + body = { + "NewStartingHashKey" => options.delete("NewStartingHashKey"), + "ShardToSplit" => options.delete("ShardToSplit"), + "StreamName" => options.delete("StreamName") + }.reject{ |_,v| v.nil? } + + response = request({ + 'X-Amz-Target' => "Kinesis_#{@version}.SplitShard", + :body => body, + }.merge(options)) + response + end + end + + class Mock + def split_shard(options={}) + stream_name = options.delete("StreamName") + shard_id = options.delete("ShardToSplit") + stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name } + + unless shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id } + raise 'unknown shard' + end + + # Close original shard (set an EndingSequenceNumber on it) + shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number + + # Calculate new shard ranges + parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"] + parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"] + new_starting_hash_key = options.delete("NewStartingHashKey") + + # Create two new shards using contiguous hash space based on the original shard + stream["Shards"] << { + "HashKeyRange"=> { + "EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s, + "StartingHashKey" => parent_starting_hash_key + }, + "SequenceNumberRange" => { + "StartingSequenceNumber" => next_sequence_number + }, + "ShardId" => next_shard_id, + "ParentShardId" => shard_id + } + stream["Shards"] << { + "HashKeyRange" => { + "EndingHashKey" => parent_ending_hash_key, + "StartingHashKey" => new_starting_hash_key + }, + "SequenceNumberRange" =>{ + "StartingSequenceNumber" => next_sequence_number + }, + "ShardId" => next_shard_id, + "ParentShardId" => shard_id + } + + response = Excon::Response.new + response.status = 200 + response.body = "" + response + + end + end + end + end +end diff --git a/tests/requests/kinesis/stream_tests.rb b/tests/requests/kinesis/stream_tests.rb index 262d20765..a86027be3 100644 --- a/tests/requests/kinesis/stream_tests.rb +++ b/tests/requests/kinesis/stream_tests.rb @@ -2,9 +2,9 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do @stream_id = 'fog-test-stream' tests('success') do - wait_for_delete = lambda { + wait_for_status = lambda { |status| begin - while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] == "DELETING" + while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] != status sleep 1 print '.' end @@ -13,10 +13,10 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do # ensure we start from a clean slate if Fog::AWS[:kinesis].list_streams.body["StreamNames"].include?(@stream_id) - wait_for_delete.call + wait_for_status.call("ACTIVE") begin Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id) - wait_for_delete.call + wait_for_status.call("ACTIVE") rescue Excon::Errors::BadRequest; end end @@ -77,21 +77,18 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do "MillisBehindLatest" => Integer, "NextShardIterator" => String, "Records" => [ - { - "Data" => String, - "PartitionKey" => String, - "SequenceNumber" => String - } - ] - } + { + "Data" => String, + "PartitionKey" => String, + "SequenceNumber" => String + } + ] + } tests("#create_stream").returns("") do - result = Fog::AWS[:kinesis].create_stream("StreamName" => @stream_id).body - while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] != "ACTIVE" - sleep 1 - print '.' + Fog::AWS[:kinesis].create_stream("StreamName" => @stream_id).body.tap do + wait_for_status.call("ACTIVE") end - result end tests("#list_streams").formats(@list_streams_format, false) do @@ -108,15 +105,15 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do tests("#put_records").formats(@put_records_format, false) do records = [ - { - "Data" => Base64.encode64("foo").chomp!, - "PartitionKey" => "1" - }, - { - "Data" => Base64.encode64("bar").chomp!, - "PartitionKey" => "1" - } - ] + { + "Data" => Base64.encode64("foo").chomp!, + "PartitionKey" => "1" + }, + { + "Data" => Base64.encode64("bar").chomp!, + "PartitionKey" => "1" + } + ] Fog::AWS[:kinesis].put_records("StreamName" => @stream_id, "Records" => records).body end @@ -150,6 +147,39 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do data end + tests("#split_shard").formats("") do + shard = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"].first + shard_id = shard["ShardId"] + ending_hash_key = shard["HashKeyRange"]["EndingHashKey"] + new_starting_hash_key = (ending_hash_key.to_i / 2).to_s + + result = Fog::AWS[:kinesis].split_shard("StreamName" => @stream_id, "ShardToSplit" => shard_id, "NewStartingHashKey" => new_starting_hash_key).body + + wait_for_status.call("ACTIVE") + shards = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"] + parent_shard = shards.detect{ |shard| shard["ShardId"] == shard_id } + child_shards = shards.select{ |shard| shard["ParentShardId"] == shard_id }.sort_by{ |shard| shard["ShardId"] } + + returns(3) { shards.size } + returns(2) { child_shards.size } + # parent is closed + returns(false) { parent_shard["SequenceNumberRange"]["EndingSequenceNumber"].nil? } + + # ensure new ranges are what we expect (mostly for testing the mock) + returns([ + { + "StartingHashKey" => "0", + "EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s + }, + { + "StartingHashKey" => new_starting_hash_key, + "EndingHashKey" => ending_hash_key + } + ]) { child_shards.map{ |shard| shard["HashKeyRange"] } } + + result + end + tests("#delete_stream").returns("") do Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id).body end