Mirror of https://github.com/fog/fog-aws.git (synced 2022-11-09 13:50:52 -05:00)
Add initial mock support for kinesis

commit 7024576bb2 (parent 561dcded5b)
8 changed files with 179 additions and 12 deletions

@@ -113,8 +113,42 @@ module Fog
       end

       class Mock
+        def self.data
+          @data ||= Hash.new do |hash, region|
+            hash[region] = Hash.new do |region_hash, key|
+              region_hash[key] = {
+                :kinesis_streams => {}
+              }
+            end
+          end
+        end
+
+        def self.reset
+          @data = nil
+        end
+
         def initialize(options={})
-          raise Fog::Mock::NotImplementedError
+          @account_id = Fog::AWS::Mock.owner_id
+          @aws_access_key_id = options[:aws_access_key_id]
+          @region = options[:region] || 'us-east-1'
+          @sequence_number = 0
+
+          unless ['ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2', 'eu-central-1', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1', 'us-west-2'].include?(@region)
+            raise ArgumentError, "Unknown region: #{@region.inspect}"
+          end
         end
+
+        def data
+          self.class.data[@region][@aws_access_key_id]
+        end
+
+        def reset_data
+          self.class.data[@region].delete(@aws_access_key_id)
+        end
+
+        def next_sequence_number
+          @sequence_number += 1
+          @sequence_number
+        end
       end

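The class-level store above uses nested Hash.new default blocks, so a bucket for each region, and a per-access-key bucket inside it, is only created on first lookup. A minimal standalone sketch of the same pattern (the region and key values below are illustrative, not part of the diff):

    # Nested default-proc hashes: entries materialize on first lookup.
    data = Hash.new do |hash, region|
      hash[region] = Hash.new do |region_hash, key|
        region_hash[key] = { :kinesis_streams => {} }
      end
    end

    data["us-east-1"]["AKIDEXAMPLE"]  # => {:kinesis_streams=>{}}
    data.keys                         # => ["us-east-1"]

Because the store lives on the class, separate Mock instances that share a region and access key also share stream state, while reset_data only clears the bucket for one access key.
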
@@ -29,8 +29,42 @@ module Fog
       end

       class Mock
-        def create_streams(options={})
-          raise Fog::Mock::NotImplementedError
+        def create_stream(options={})
+          stream_name = options.delete("StreamName")
+          shard_count = options.delete("ShardCount") || 1
+          stream_arn = "arn:aws:kinesis:#{@region}:#{@account_id}:stream/#{stream_name}"
+
+          response = Excon::Response.new
+          response.status = 200
+
+          if data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
+            raise 'stream already exists'
+          end
+
+          shards = (0...shard_count).map do |shard|
+            {
+              "HashKeyRange"=>{
+                "EndingHashKey"=>"340282366920938463463374607431768211455",
+                "StartingHashKey"=>"0"
+              },
+              "SequenceNumberRange"=>{
+                "StartingSequenceNumber"=> next_sequence_number.to_s
+              },
+              "ShardId"=>"shardId-#{shard.to_s.rjust(12, "0")}",
+              "Records" => []
+            }
+          end
+
+          data[:kinesis_streams] = [{
+            "HasMoreShards" => false,
+            "StreamARN" => stream_arn,
+            "StreamName" => stream_name,
+            "StreamStatus" => "ACTIVE",
+            "Shards" => shards,
+          }]
+
+          response.body = ""
+          response
         end
       end
     end

@@ -27,8 +27,19 @@ module Fog
       end

       class Mock
-        def delete_streams(options={})
-          raise Fog::Mock::NotImplementedError
+        def delete_stream(options={})
+          stream_name = options.delete("StreamName")
+
+          unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
+            raise 'unknown stream'
+          end
+
+          data[:kinesis_streams].delete(stream)
+
+          response = Excon::Response.new
+          response.status = 200
+          response.body = ""
+          response
         end
       end
     end

@@ -27,13 +27,26 @@ module Fog
             :body => body,
           }.merge(options))
           response.body = Fog::JSON.decode(response.body) unless response.body.nil?
-          response.body
+          response
         end
       end

       class Mock
-        def describe_streams(options={})
-          raise Fog::Mock::NotImplementedError
+        def describe_stream(options={})
+          stream_name = options.delete("StreamName")
+
+          unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
+            raise 'unknown stream'
+          end
+
+          # Strip Records key out of shards for response
+          shards = stream["Shards"].reject{ |k,_| k == "Records" }
+
+          response = Excon::Response.new
+          response.status = 200
+          response.body = { "StreamDescription" => stream.dup.merge("Shards" => shards) }
+          response
         end
       end
     end

@@ -15,7 +15,7 @@ module Fog
         #
         def get_records(options={})
           body = {
-            "Limit" => options.delete("Limit"),
+            "Limit" => options.delete("Limit") || 1,
             "ShardIterator" => options.delete("ShardIterator")
           }.reject{ |_,v| v.nil? }

@@ -30,7 +30,41 @@ module Fog

       class Mock
         def get_records(options={})
-          raise Fog::Mock::NotImplementedError
+          shard_iterator = Fog::JSON.decode(options.delete("ShardIterator"))
+          limit = options.delete("Limit") || 1
+          stream_name = shard_iterator["StreamName"]
+          shard_id = shard_iterator["ShardId"]
+          starting_sequence_number = (shard_iterator["StartingSequenceNumber"] || 1).to_i
+
+          unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
+            raise 'unknown stream'
+          end
+
+          unless shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id }
+            raise 'unknown shard'
+          end
+
+          records = []
+          shard["Records"].each do |record|
+            next if record["SequenceNumber"].to_i < starting_sequence_number
+            records << record
+            break if records.size == limit
+          end
+
+          shard_iterator["StartingSequenceNumber"] = if records.empty?
+            starting_sequence_number.to_s
+          else
+            (records.last["SequenceNumber"].to_i + 1).to_s
+          end
+
+          response = Excon::Response.new
+          response.status = 200
+          response.body = {
+            "MillisBehindLatest"=> 0,
+            "NextShardIterator"=> Fog::JSON.encode(shard_iterator),
+            "Records"=> records
+          }
+          response
         end
       end
     end

@@ -34,7 +34,18 @@ module Fog

       class Mock
         def get_shard_iterator(options={})
-          raise Fog::Mock::NotImplementedError
+          stream_name = options["StreamName"]
+
+          unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
+            raise 'unknown stream'
+          end
+
+          response = Excon::Response.new
+          response.status = 200
+          response.body = {
+            "ShardIterator" => Fog::JSON.encode(options) # just encode the options that were given, we decode them in get_records
+          }
+          response
         end
       end
     end

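A design note on the two hunks above: the mocked shard iterator is nothing more than the JSON-encoded request options. get_records decodes it, reads StreamName, ShardId and StartingSequenceNumber back out, and returns a re-encoded copy with the sequence number advanced past the last record it handed back. A tiny illustration of that round trip, using the stdlib JSON module in place of Fog::JSON and made-up values:

    require 'json'

    options  = { "StreamName" => "my-stream", "ShardId" => "shardId-000000000000" }
    iterator = JSON.generate(options)     # what the mock returns as "ShardIterator"

    state = JSON.parse(iterator)          # what the get_records mock recovers
    state["StartingSequenceNumber"]       # => nil, so reading starts at sequence number 1
    state["StartingSequenceNumber"] = "3" # advanced past the records already returned
    JSON.generate(state)                  # shape of the "NextShardIterator" handed back
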
@@ -26,7 +26,13 @@ module Fog

       class Mock
         def list_streams(options={})
-          raise Fog::Mock::NotImplementedError
+          response = Excon::Response.new
+          response.status = 200
+          response.body = {
+            "HasMoreStreams" => false,
+            "StreamNames" => []
+          }
+          response
         end
       end
     end

@@ -34,7 +34,31 @@ module Fog

       class Mock
         def put_records(options={})
-          raise Fog::Mock::NotImplementedError
+          stream_name = options.delete("StreamName")
+          unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
+            raise 'unknown stream'
+          end
+
+          records = options.delete("Records")
+          record_results = records.map { |r|
+            sequence_number = next_sequence_number.to_s
+            shard_id = stream["Shards"].sample["ShardId"]
+            shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id }
+            # store the records on the shard(s)
+            shard["Records"] << r.merge("SequenceNumber" => sequence_number)
+            {
+              "SequenceNumber" => sequence_number,
+              "ShardId" => shard_id
+            }
+          }
+
+          response = Excon::Response.new
+          response.status = 200
+          response.body = {
+            "FailedRecordCount" => 0,
+            "Records" => record_results
+          }
+          response
         end
       end
     end

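Taken together, the mocks are enough to drive a basic produce/consume loop in tests without touching AWS. A rough usage sketch, assuming the service class is Fog::AWS::Kinesis and that fake credentials are acceptable in mock mode; the stream name, partition key and payload are placeholders:

    require 'fog/aws'
    require 'base64'

    Fog.mock!

    kinesis = Fog::AWS::Kinesis.new(
      :aws_access_key_id     => 'fake',
      :aws_secret_access_key => 'fake',
      :region                => 'us-east-1'
    )

    kinesis.create_stream("StreamName" => "test-stream", "ShardCount" => 1)

    kinesis.put_records(
      "StreamName" => "test-stream",
      "Records"    => [{ "Data" => Base64.encode64("hello"), "PartitionKey" => "pk" }]
    )

    # The mocked iterator is just the encoded options, so StreamName and ShardId
    # are all get_records needs to find the stored records again.
    shard_id = kinesis.describe_stream("StreamName" => "test-stream").
                 body["StreamDescription"]["Shards"].first["ShardId"]
    iterator = kinesis.get_shard_iterator("StreamName" => "test-stream",
                                          "ShardId"    => shard_id).body["ShardIterator"]
    kinesis.get_records("ShardIterator" => iterator, "Limit" => 10).body["Records"]
    # => the record put above, tagged with its mock "SequenceNumber"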