Mirror of https://github.com/fog/fog-aws.git
Kinesis add put_records

parent d2b1d8623b
commit 301a642abd

3 changed files with 64 additions and 1 deletion
@@ -13,6 +13,7 @@ module Fog
       request :create_stream
       request :delete_stream
       request :get_shard_iterator
+      request :put_records

       class Real
         include Fog::AWS::CredentialFetcher::ConnectionMethods
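Registering the request on the service definition is what makes put_records loadable and callable on a Kinesis connection, alongside the existing requests. A minimal call sketch follows; the credentials, region, stream name and payload are placeholder assumptions, not values from this commit:

    require 'fog/aws'
    require 'base64'

    # Placeholder credentials/region; any valid fog-aws connection options work here.
    kinesis = Fog::AWS::Kinesis.new(
      :aws_access_key_id     => 'AKIA...',
      :aws_secret_access_key => 'SECRET',
      :region                => 'us-east-1'
    )

    # Data is sent base64-encoded, mirroring the commit's own test below.
    kinesis.put_records(
      "StreamName" => "my-stream",
      "Records"    => [{ "Data" => Base64.encode64("payload").chomp, "PartitionKey" => "key-1" }]
    )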
lib/fog/aws/requests/kinesis/put_records.rb (new file, 42 lines)

@@ -0,0 +1,42 @@
+module Fog
+  module AWS
+    class Kinesis
+      class Real
+        # Writes multiple data records from a producer into an Amazon Kinesis stream in a single call (also referred to as a PutRecords request).
+        #
+        # ==== Options
+        # * Records<~Array>: The records associated with the request.
+        # * Record<~Hash>: A record.
+        # * Data<~Blob>: The data blob to put into the record, which is base64-encoded when the blob is serialized.
+        # * ExplicitHashKey<~String>: The hash value used to determine explicitly the shard that the data record is assigned to by overriding the partition key hash.
+        # * PartitionKey<~String>: Determines which shard in the stream the data record is assigned to.
+        # * StreamName<~String>: The stream name associated with the request.
+        # ==== Returns
+        # * response<~Excon::Response>:
+        #
+        # ==== See Also
+        # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
+        #
+        def put_records(options={})
+          body = {
+            "Records" => options.delete("Records"),
+            "StreamName" => options.delete("StreamName")
+          }.reject{ |_,v| v.nil? }
+
+          response = request({
+                               'X-Amz-Target' => 'Kinesis_20131202.PutRecords',
+                               :body => body,
+                             }.merge(options))
+          response.body = Fog::JSON.decode(response.body) unless response.body.nil?
+          response
+        end
+      end
+
+      class Mock
+        def put_records(options={})
+          raise Fog::Mock::NotImplementedError
+        end
+      end
+    end
+  end
+end
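Because the method strips the recognised keys from the options hash, drops nils, merges anything left into the request, and decodes the JSON body before returning, callers can read FailedRecordCount and the per-record results directly off response.body. A usage sketch building on the hypothetical connection above (payloads and partition keys are illustrative):

    records = [
      { "Data" => Base64.encode64("event-1").chomp, "PartitionKey" => "user-1" },
      { "Data" => Base64.encode64("event-2").chomp, "PartitionKey" => "user-2" }
    ]

    body = kinesis.put_records("StreamName" => "my-stream", "Records" => records).body

    puts "failed: #{body['FailedRecordCount']}"
    body["Records"].each do |record|
      # Successful entries carry SequenceNumber/ShardId; failed entries carry ErrorCode/ErrorMessage.
      puts(record["ErrorCode"] ? record["ErrorMessage"] : "#{record['ShardId']} #{record['SequenceNumber']}")
    end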
@@ -20,6 +20,7 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
     rescue Excon::Errors::BadRequest; end
   end

+  # optional keys are commented out
   @list_streams_format = {
     "HasMoreStreams" => Fog::Boolean,
     "StreamNames" => [
@@ -27,7 +28,6 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
     ]
   }

-  # optional keys are commented out
   @describe_stream_format = {
     "StreamDescription" => {
       "HasMoreShards" => Fog::Boolean,
@@ -56,6 +56,18 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
     "ShardIterator" => String
   }

+  @put_records_format = {
+    "FailedRecordCount" => Integer,
+    "Records" => [
+      {
+        # "ErrorCode" => String,
+        # "ErrorMessage" => String,
+        "SequenceNumber" => String,
+        "ShardId" => String
+      }
+    ]
+  }
+
   tests("#create_stream").returns("") do
     result = Fog::AWS[:kinesis].create_stream("StreamName" => @stream_id).body
     while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] != "ACTIVE"
@@ -73,6 +85,14 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do
       Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body
     end

+  tests("#put_records").formats(@put_records_format, false) do
+    records = [{
+      "Data" => Base64.encode64("foo").chomp!,
+      "PartitionKey" => "foo"
+    }]
+    Fog::AWS[:kinesis].put_records("StreamName" => @stream_id, "Records" => records).body
+  end
+
   tests("#get_shard_iterator").formats(@get_shard_iterator_format) do
     first_shard_id = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"].first["ShardId"]
     Fog::AWS[:kinesis].get_shard_iterator("StreamName" => @stream_id, "ShardId" => first_shard_id, "ShardIteratorType" => "TRIM_HORIZON").body
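The test format leaves ErrorCode and ErrorMessage commented out because they only appear on entries that failed; PutRecords can succeed partially, reporting the count in FailedRecordCount. A hedged sketch of the retry loop a caller might layer on top of the new request (not part of fog-aws; kinesis is the hypothetical connection from the earlier sketches, and no backoff is applied):

    # Resubmit only the request entries whose matching response entries carry an ErrorCode.
    def put_records_with_retry(kinesis, stream_name, records, attempts = 3)
      attempts.times do
        body = kinesis.put_records("StreamName" => stream_name, "Records" => records).body
        return body if body["FailedRecordCount"].to_i.zero?

        records = records.zip(body["Records"]).
          select { |_request_entry, result| result["ErrorCode"] }.
          map    { |request_entry, _result| request_entry }
      end
      raise "put_records still failing after #{attempts} attempts"
    end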