From 301a642abd34016eb2001073173496f197fc3337 Mon Sep 17 00:00:00 2001 From: Michael Hale Date: Fri, 19 Jun 2015 17:30:10 -0400 Subject: [PATCH] Kinesis add put_records --- lib/fog/aws/kinesis.rb | 1 + lib/fog/aws/requests/kinesis/put_records.rb | 42 +++++++++++++++++++++ tests/requests/kinesis/stream_tests.rb | 22 ++++++++++- 3 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 lib/fog/aws/requests/kinesis/put_records.rb diff --git a/lib/fog/aws/kinesis.rb b/lib/fog/aws/kinesis.rb index c0ffeda88..97dc1ad02 100644 --- a/lib/fog/aws/kinesis.rb +++ b/lib/fog/aws/kinesis.rb @@ -13,6 +13,7 @@ module Fog request :create_stream request :delete_stream request :get_shard_iterator + request :put_records class Real include Fog::AWS::CredentialFetcher::ConnectionMethods diff --git a/lib/fog/aws/requests/kinesis/put_records.rb b/lib/fog/aws/requests/kinesis/put_records.rb new file mode 100644 index 000000000..206ecbd1a --- /dev/null +++ b/lib/fog/aws/requests/kinesis/put_records.rb @@ -0,0 +1,42 @@ +module Fog + module AWS + class Kinesis + class Real + # Writes multiple data records from a producer into an Amazon Kinesis stream in a single call (also referred to as a PutRecords request). + # + # ==== Options + # * Records<~Array>: The records associated with the request. + # * Record<~Hash>: A record. + # * Data<~Blob>: The data blob to put into the record, which is base64-encoded when the blob is serialized. + # * ExplicitHashKey<~String>: The hash value used to determine explicitly the shard that the data record is assigned to by overriding the partition key hash. + # * PartitionKey<~String>: Determines which shard in the stream the data record is assigned to. + # * StreamName<~String>: The stream name associated with the request. + # ==== Returns + # * response<~Excon::Response>: + # + # ==== See Also + # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html + # + def put_records(options={}) + body = { + "Records" => options.delete("Records"), + "StreamName" => options.delete("StreamName") + }.reject{ |_,v| v.nil? } + + response = request({ + 'X-Amz-Target' => 'Kinesis_20131202.PutRecords', + :body => body, + }.merge(options)) + response.body = Fog::JSON.decode(response.body) unless response.body.nil? + response + end + end + + class Mock + def put_records(options={}) + raise Fog::Mock::NotImplementedError + end + end + end + end +end diff --git a/tests/requests/kinesis/stream_tests.rb b/tests/requests/kinesis/stream_tests.rb index 54b76e378..6105fb5c5 100644 --- a/tests/requests/kinesis/stream_tests.rb +++ b/tests/requests/kinesis/stream_tests.rb @@ -20,6 +20,7 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do rescue Excon::Errors::BadRequest; end end + # optional keys are commented out @list_streams_format = { "HasMoreStreams" => Fog::Boolean, "StreamNames" => [ @@ -27,7 +28,6 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do ] } - # optional keys are commented out @describe_stream_format = { "StreamDescription" => { "HasMoreShards" => Fog::Boolean, @@ -56,6 +56,18 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do "ShardIterator" => String } + @put_records_format = { + "FailedRecordCount" => Integer, + "Records" => [ + { + # "ErrorCode" => String, + # "ErrorMessage" => String, + "SequenceNumber" => String, + "ShardId" => String + } + ] + } + tests("#create_stream").returns("") do result = Fog::AWS[:kinesis].create_stream("StreamName" => @stream_id).body while Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["StreamStatus"] != "ACTIVE" @@ -73,6 +85,14 @@ Shindo.tests('AWS::Kinesis | stream requests', ['aws', 'kinesis']) do Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body end + tests("#put_records").formats(@put_records_format, false) do + records = [{ + "Data" => Base64.encode64("foo").chomp!, + "PartitionKey" => "foo" + }] + Fog::AWS[:kinesis].put_records("StreamName" => @stream_id, "Records" => records).body + end + tests("#get_shard_iterator").formats(@get_shard_iterator_format) do first_shard_id = Fog::AWS[:kinesis].describe_stream("StreamName" => @stream_id).body["StreamDescription"]["Shards"].first["ShardId"] Fog::AWS[:kinesis].get_shard_iterator("StreamName" => @stream_id, "ShardId" => first_shard_id, "ShardIteratorType" => "TRIM_HORIZON").body