diff --git a/lib/fog/aws/kinesis.rb b/lib/fog/aws/kinesis.rb index 0a46db15b..c80272aee 100644 --- a/lib/fog/aws/kinesis.rb +++ b/lib/fog/aws/kinesis.rb @@ -9,6 +9,9 @@ module Fog request_path 'fog/aws/requests/kinesis' request :list_streams + request :describe_stream + request :create_stream + request :delete_stream class Real include Fog::AWS::CredentialFetcher::ConnectionMethods diff --git a/lib/fog/aws/requests/kinesis/create_stream.rb b/lib/fog/aws/requests/kinesis/create_stream.rb new file mode 100644 index 000000000..d7eaf9220 --- /dev/null +++ b/lib/fog/aws/requests/kinesis/create_stream.rb @@ -0,0 +1,38 @@ +module Fog + module AWS + class Kinesis + class Real + # Creates a Amazon Kinesis stream. + # + # ==== Options + # * ShardCount<~Number>: The number of shards that the stream will use. + # * StreamName<~String>: A name to identify the stream. + # ==== Returns + # * response<~Excon::Response>: + # + # ==== See Also + # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html + # + def create_stream(options={}) + body = { + "ShardCount" => options.delete("ShardCount") || 1, + "StreamName" => options.delete("StreamName") + }.reject{ |_,v| v.nil? } + + response = request({ + 'X-Amz-Target' => 'Kinesis_20131202.CreateStream', + :body => body, + }.merge(options)) + # response.body = Fog::JSON.decode(response.body) unless response.body.nil? + response + end + end + + class Mock + def create_streams(options={}) + raise Fog::Mock::NotImplementedError + end + end + end + end +end diff --git a/lib/fog/aws/requests/kinesis/delete_stream.rb b/lib/fog/aws/requests/kinesis/delete_stream.rb new file mode 100644 index 000000000..66977b74e --- /dev/null +++ b/lib/fog/aws/requests/kinesis/delete_stream.rb @@ -0,0 +1,36 @@ +module Fog + module AWS + class Kinesis + class Real + # Deletes a stream and all its shards and data. + # + # ==== Options + # * StreamName<~String>: A name to identify the stream. + # ==== Returns + # * response<~Excon::Response>: + # + # ==== See Also + # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html + # + def delete_stream(options={}) + body = { + "StreamName" => options.delete("StreamName") + }.reject{ |_,v| v.nil? } + + response = request({ + 'X-Amz-Target' => 'Kinesis_20131202.DeleteStream', + :body => body, + }.merge(options)) + # response.body = Fog::JSON.decode(response.body) unless response.body.nil? + response + end + end + + class Mock + def delete_streams(options={}) + raise Fog::Mock::NotImplementedError + end + end + end + end +end diff --git a/lib/fog/aws/requests/kinesis/describe_stream.rb b/lib/fog/aws/requests/kinesis/describe_stream.rb new file mode 100644 index 000000000..26fd17a81 --- /dev/null +++ b/lib/fog/aws/requests/kinesis/describe_stream.rb @@ -0,0 +1,40 @@ +module Fog + module AWS + class Kinesis + class Real + # Describes the specified stream. + # + # ==== Options + # * ExclusiveStartShardId<~String>: The shard ID of the shard to start with. + # * Limit<~Number>: The maximum number of shards to return. + # * StreamName<~String>: The name of the stream to describe. + # ==== Returns + # * response<~Excon::Response>: + # + # ==== See Also + # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html + # + def describe_stream(options={}) + body = { + "ExclusiveStartShardId" => options.delete("ExclusiveStartShardId"), + "Limit" => options.delete("Limit"), + "StreamName" => options.delete("StreamName") + }.reject{ |_,v| v.nil? } + + response = request({ + 'X-Amz-Target' => 'Kinesis_20131202.DescribeStream', + :body => body, + }.merge(options)) + response.body = Fog::JSON.decode(response.body) unless response.body.nil? + response + end + end + + class Mock + def describe_streams(options={}) + raise Fog::Mock::NotImplementedError + end + end + end + end +end diff --git a/tests/requests/kinesis/stream_tests.rb b/tests/requests/kinesis/stream_tests.rb index e5ac5518d..aceca5f0f 100644 --- a/tests/requests/kinesis/stream_tests.rb +++ b/tests/requests/kinesis/stream_tests.rb @@ -1,5 +1,25 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do + @stream_id = 'fog-test-stream' + tests('success') do + wait_for_delete = lambda { + begin + while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] == "DELETING" + sleep 1 + print '.' + end + rescue Excon::Errors::BadRequest; end + } + + # ensure we start from a clean slate + if Fog::AWS[:kinesis].list_streams.body["StreamNames"].include?(@stream_id) + wait_for_delete.() + begin + Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id) + wait_for_delete.() + rescue Excon::Errors::BadRequest; end + end + @list_streams_format = { "HasMoreStreams" => Fog::Boolean, "StreamNames" => [ @@ -7,8 +27,46 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do ] } - tests("#list_streams").formats(@list_streams_format) do + # optional keys are commented out + @describe_stream_format = { + "StreamDescription" => { + "HasMoreShards" => Fog::Boolean, + "Shards" => [ + { + #"AdjacentParentShardId" => String, + "HashKeyRange" => { + "EndingHashKey" => String, + "StartingHashKey" => String + }, + #"ParentShardId" => String, + "SequenceNumberRange" => { + # "EndingSequenceNumber" => String, + "StartingSequenceNumber" => String + }, + "ShardId" => String + } + ], + "StreamARN" => String, + "StreamName" => String, + "StreamStatus" => String + } + } + + tests("#create_stream").returns("") do + Fog::AWS[:kinesis].create_stream("StreamName" => @stream_id).body + end + + tests("#list_streams").formats(@list_streams_format, false) do Fog::AWS[:kinesis].list_streams.body end + + tests("#describe_stream").formats(@describe_stream_format) do + Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body + end + + tests("#delete_stream").returns("") do + Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id).body + end + end end