
aws_kinesis_stream sink batching is not working #20575

Open
romkimchi1002 opened this issue May 29, 2024 · 6 comments
Labels
sink: aws_kinesis_streams (Anything `aws_kinesis_streams` sink related)
type: bug (A code related bug.)

Comments

@romkimchi1002

romkimchi1002 commented May 29, 2024

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

Records are not batched together before being sent to Kinesis; each record is sent to Kinesis individually.

Configuration

acknowledgements:
  enabled: true

sinks:
  kinesis:
    type: 'aws_kinesis_streams'
    inputs:
      - 'events_source'
    encoding:
      codec: 'json'
    auth:
      access_key_id: '#######'
      secret_access_key: '#######'
    stream_name: '#######'
    region: '######'
    # using default batching configuration

sources:
  events_source:
    type: 'http_server'
    address: '127.0.0.1:5000'
    auth: 
      username: 'user'
      password: 'pass'
    encoding: 'json'
    method: 'POST'
    path: '/data'
    response_code: 202

Version

vector 0.38.0 (aarch64-apple-darwin)

Debug Output

No response

Example Data

I tried debugging it myself and downloaded the Vector source.
I added the following print line to vector/src/sinks/aws_kinesis/streams/record.rs:

async fn send(
        &self,
        records: Vec<Self::T>,
        stream_name: String,
    ) -> Result<KinesisResponse, SdkError<Self::E, HttpResponse>> {
        let rec_count = records.len();
        let total_size = records
            .iter()
            .fold(0, |acc, record| acc + record.data().as_ref().len());

        println!("Sending {} records to stream {}. size: {}", rec_count, stream_name, total_size); // Debugging line

This line always reports a record count of 1, no matter what batch configuration I use.

After modifying the code, it seems that the partition_key is used for batching. That looks like a bug, since the partition key should only be a field used by Kinesis to spread data across shards.
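
For illustration, here is a minimal, self-contained sketch of the suspected behaviour (hypothetical types and function names, not the actual Vector code): if records are grouped by partition key before each PutRecords call, and every event gets a fresh random key, then every group, and therefore every request, ends up holding exactly one record.

use std::collections::HashMap;

// Hypothetical record type: a payload plus the partition key assigned to it.
#[derive(Debug, Clone)]
struct KinesisRecord {
    partition_key: String,
    data: String,
}

// Suspected behaviour: records are grouped by partition key before each
// PutRecords call, so each group becomes its own request.
fn batch_by_partition_key(records: Vec<KinesisRecord>) -> Vec<Vec<KinesisRecord>> {
    let mut groups: HashMap<String, Vec<KinesisRecord>> = HashMap::new();
    for record in records {
        groups
            .entry(record.partition_key.clone())
            .or_default()
            .push(record);
    }
    groups.into_values().collect()
}

fn main() {
    // A distinct (random-looking) key per event, as the sink does by default.
    let random_keys: Vec<KinesisRecord> = (0..5)
        .map(|i| KinesisRecord {
            partition_key: format!("key-{i}"),
            data: format!("event {i}"),
        })
        .collect();

    // A constant key for every event, for comparison.
    let fixed_key: Vec<KinesisRecord> = (0..5)
        .map(|i| KinesisRecord {
            partition_key: "fixed_partition_key".to_string(),
            data: format!("event {i}"),
        })
        .collect();

    // Prints: random keys -> 5 batches, fixed key -> 1 batch.
    println!("random keys -> {} batches", batch_by_partition_key(random_keys).len());
    println!("fixed key   -> {} batches", batch_by_partition_key(fixed_key).len());
}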

Additional Context

No response

References

No response

@romkimchi1002 romkimchi1002 added the type: bug A code related bug. label May 29, 2024
@jszwedko
Member

I think there may be some, understandable, confusion here. The batch settings for the aws_kinesis sink correspond to how the events are batched into AWS API PutRecords calls. That is, if the batch size is set to 500, it'll send 500 records in a single request. However, each event is always a single Kinesis record. It seems like you might be expecting it to batch within a Kinesis record.

Can you share more about your use-case for putting multiple events into a single Kinesis record?
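
A rough sketch of the semantics described here (illustrative only, not the sink's actual code): each Vector event stays a single Kinesis record, and the batch settings only decide how many of those records are packed into one PutRecords request (the API allows at most 500 records per call).

// Illustrative only: how a batch size maps onto PutRecords requests.
// Each event remains one Kinesis record; batching just controls how many
// records share a single API call.
fn plan_put_records_calls(event_count: usize, max_records_per_request: usize) -> Vec<usize> {
    let mut calls = Vec::new();
    let mut remaining = event_count;
    while remaining > 0 {
        let in_this_call = remaining.min(max_records_per_request);
        calls.push(in_this_call);
        remaining -= in_this_call;
    }
    calls
}

fn main() {
    // 1200 events with a batch size of 500 should mean three requests: 500 + 500 + 200.
    println!("{:?}", plan_put_records_calls(1200, 500));
}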

@romkimchi1002
Author

> I think there may be some, understandable, confusion here. The batch settings for the aws_kinesis sink correspond to how the events are batched into AWS API PutRecords calls. That is, if the batch size is set to 500, it'll send 500 records in a single request. However, each event is always a single Kinesis record. It seems like you might be expecting it to batch within a Kinesis record.
>
> Can you share more about your use-case for putting multiple events into a single Kinesis record?

Hi! I'm not trying to batch them into a single Kinesis record. The line I added in the Vector source shows that each PutRecords call sends only 1 Vector event instead of 500.

async fn send(
        &self,
        records: Vec<Self::T>,
        stream_name: String,
    ) -> Result<KinesisResponse, SdkError<Self::E, HttpResponse>> {
        let rec_count = records.len();
        let total_size = records
            .iter()
            .fold(0, |acc, record| acc + record.data().as_ref().len());

        println!("Sending {} records to stream {}. size: {}", rec_count, stream_name, total_size); // Debugging line

[screenshot of the debug output: each PutRecords call reports a single record]

I think the issue might be in the batched_partitioned function in vector/src/sinks/aws_kinesis/sink.rs, but I didn't find its definition anywhere. As a reminder, when I modified the Vector code to use a constant partition key, the batching works, so I assume the batched_partitioned function uses the partition key for batching when it shouldn't.

When using:

fn gen_partition_key(_number_of_shards: usize) -> String {
    "fixed_partition_key".to_string()
}

the debugging print gives:
[screenshot of the debug output showing batched records after the change]

@jszwedko
Member

jszwedko commented Jun 3, 2024

Oh I see, thanks for the additional detail @romkimchi1002! It does look like there is a bug here then, likely with the partition key as you noted.

@jszwedko jszwedko added the sink: aws_kinesis_streams Anything `aws_kinesis_streams` sink related label Jun 3, 2024
@romkimchi1002
Author

thanks!

@steven-aerts
Contributor

I can confirm that this problem also exists for the aws_kinesis_firehose sink.
We were able to work around it by selecting a (locally) constant field as partition_key_field:

[sinks.firehose_al]
  type = "aws_kinesis_firehose"
  inputs = ["input"]
  stream_name = "firehose-stream-name"
  encoding.codec = "json"
  partition_key_field = "hostname" # Workaround for https://github.com/vectordotdev/vector/issues/20575

steven-aerts added a commit to steven-aerts/vector that referenced this issue Jun 12, 2024
…ctordotdev#1407

Send batches to AWS Kinesis Data Streams and AWS Firehose independent of
their partition keys. In both APIs, batches of events do not need to
share the same partition key.
This makes the protocol more efficient, since by default the partition key
is a random key that is different for every event.