1
0
Fork 0
mirror of https://github.com/fog/fog-aws.git synced 2022-11-09 13:50:52 -05:00

Kinesis create/delete/describe

This commit is contained in:
Michael Hale 2015-06-19 16:55:32 -04:00
parent 98fa5997e5
commit 16922f0f39
5 changed files with 176 additions and 1 deletions

View file

@ -9,6 +9,9 @@ module Fog
request_path 'fog/aws/requests/kinesis'
request :list_streams
request :describe_stream
request :create_stream
request :delete_stream
class Real
include Fog::AWS::CredentialFetcher::ConnectionMethods

View file

@ -0,0 +1,38 @@
module Fog
module AWS
class Kinesis
class Real
# Creates a Amazon Kinesis stream.
#
# ==== Options
# * ShardCount<~Number>: The number of shards that the stream will use.
# * StreamName<~String>: A name to identify the stream.
# ==== Returns
# * response<~Excon::Response>:
#
# ==== See Also
# https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html
#
def create_stream(options={})
body = {
"ShardCount" => options.delete("ShardCount") || 1,
"StreamName" => options.delete("StreamName")
}.reject{ |_,v| v.nil? }
response = request({
'X-Amz-Target' => 'Kinesis_20131202.CreateStream',
:body => body,
}.merge(options))
# response.body = Fog::JSON.decode(response.body) unless response.body.nil?
response
end
end
class Mock
def create_streams(options={})
raise Fog::Mock::NotImplementedError
end
end
end
end
end

View file

@ -0,0 +1,36 @@
module Fog
module AWS
class Kinesis
class Real
# Deletes a stream and all its shards and data.
#
# ==== Options
# * StreamName<~String>: A name to identify the stream.
# ==== Returns
# * response<~Excon::Response>:
#
# ==== See Also
# https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html
#
def delete_stream(options={})
body = {
"StreamName" => options.delete("StreamName")
}.reject{ |_,v| v.nil? }
response = request({
'X-Amz-Target' => 'Kinesis_20131202.DeleteStream',
:body => body,
}.merge(options))
# response.body = Fog::JSON.decode(response.body) unless response.body.nil?
response
end
end
class Mock
def delete_streams(options={})
raise Fog::Mock::NotImplementedError
end
end
end
end
end

View file

@ -0,0 +1,40 @@
module Fog
module AWS
class Kinesis
class Real
# Describes the specified stream.
#
# ==== Options
# * ExclusiveStartShardId<~String>: The shard ID of the shard to start with.
# * Limit<~Number>: The maximum number of shards to return.
# * StreamName<~String>: The name of the stream to describe.
# ==== Returns
# * response<~Excon::Response>:
#
# ==== See Also
# https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html
#
def describe_stream(options={})
body = {
"ExclusiveStartShardId" => options.delete("ExclusiveStartShardId"),
"Limit" => options.delete("Limit"),
"StreamName" => options.delete("StreamName")
}.reject{ |_,v| v.nil? }
response = request({
'X-Amz-Target' => 'Kinesis_20131202.DescribeStream',
:body => body,
}.merge(options))
response.body = Fog::JSON.decode(response.body) unless response.body.nil?
response
end
end
class Mock
def describe_streams(options={})
raise Fog::Mock::NotImplementedError
end
end
end
end
end

View file

@ -1,5 +1,25 @@
Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
@stream_id = 'fog-test-stream'
tests('success') do
wait_for_delete = lambda {
begin
while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] == "DELETING"
sleep 1
print '.'
end
rescue Excon::Errors::BadRequest; end
}
# ensure we start from a clean slate
if Fog::AWS[:kinesis].list_streams.body["StreamNames"].include?(@stream_id)
wait_for_delete.()
begin
Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id)
wait_for_delete.()
rescue Excon::Errors::BadRequest; end
end
@list_streams_format = {
"HasMoreStreams" => Fog::Boolean,
"StreamNames" => [
@ -7,8 +27,46 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
]
}
tests("#list_streams").formats(@list_streams_format) do
# optional keys are commented out
@describe_stream_format = {
"StreamDescription" => {
"HasMoreShards" => Fog::Boolean,
"Shards" => [
{
#"AdjacentParentShardId" => String,
"HashKeyRange" => {
"EndingHashKey" => String,
"StartingHashKey" => String
},
#"ParentShardId" => String,
"SequenceNumberRange" => {
# "EndingSequenceNumber" => String,
"StartingSequenceNumber" => String
},
"ShardId" => String
}
],
"StreamARN" => String,
"StreamName" => String,
"StreamStatus" => String
}
}
tests("#create_stream").returns("") do
Fog::AWS[:kinesis].create_stream("StreamName" => @stream_id).body
end
tests("#list_streams").formats(@list_streams_format, false) do
Fog::AWS[:kinesis].list_streams.body
end
tests("#describe_stream").formats(@describe_stream_format) do
Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body
end
tests("#delete_stream").returns("") do
Fog::AWS[:kinesis].delete_stream("StreamName" => @stream_id).body
end
end
end