aws_kinesis input: shards are not processed if they are closed #2649

Open
j0shthomas opened this issue Jun 11, 2024 · 4 comments
Labels: bug, inputs (Any tasks or issues relating specifically to inputs)

Comments

@j0shthomas (Contributor) commented Jun 11, 2024

When a Kinesis shard is split (e.g. when scaling), the parent shard is marked as CLOSED and client writes start going to the child shards.
Benthos does not appear to read from CLOSED shards (unless shards are explicitly set on the input) because of this line: https://github.com/redpanda-data/connect/blob/main/internal/impl/aws/input_kinesis.go#L630.

There may still be unprocessed data in the parent shards, so this data will be "lost". With Kinesis you can tell once all the data has been read from a closed shard by checking its EndingSequenceNumber: https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html
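
For reference, a closed shard can be identified from the ListShards response: its SequenceNumberRange carries an EndingSequenceNumber. Below is a minimal sketch using the AWS SDK for Go v2 (the stream name is a placeholder and this is not the plugin's code):

```go
// Sketch: list shards and flag the ones that are CLOSED, i.e. whose
// SequenceNumberRange has an EndingSequenceNumber set.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := kinesis.NewFromConfig(cfg)

	// "my-stream" is a placeholder stream name.
	out, err := client.ListShards(ctx, &kinesis.ListShardsInput{
		StreamName: aws.String("my-stream"),
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, shard := range out.Shards {
		// A non-nil EndingSequenceNumber means the shard is closed: no new
		// records will be written to it, but it may still hold unread data.
		if shard.SequenceNumberRange != nil && shard.SequenceNumberRange.EndingSequenceNumber != nil {
			fmt.Printf("shard %s is CLOSED, ending sequence %s\n",
				*shard.ShardId, *shard.SequenceNumberRange.EndingSequenceNumber)
		}
	}
}
```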

@j0shthomas changed the title from "aws_kinesis input: shards are not processed once they are closed" to "aws_kinesis input: shards are not processed if they are closed" Jun 11, 2024
@Jeffail added the bug and inputs labels Jun 11, 2024
@Jeffail (Collaborator) commented Jun 11, 2024

Hey @j0shthomas, just to confirm my understanding, would the correct behaviour here be to continue reading the shard until our current sequence number matches that of the EndingSequenceNumber?

@j0shthomas (Contributor, Author) commented

Yes, that is my understanding: that would mean all the data has been read from the shard, and no new data will be written to it.
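
For illustration, here is a hedged sketch of that check (the helper name and the comparison approach are assumptions, not the plugin's implementation). Kinesis sequence numbers are large decimal integers encoded as strings, so they are compared numerically rather than lexically:

```go
package main

import (
	"fmt"
	"log"
	"math/big"
)

// shardFullyConsumed reports whether the last checkpointed sequence number
// has reached the closed shard's EndingSequenceNumber.
func shardFullyConsumed(checkpointSeq, endingSeq string) (bool, error) {
	cp, ok := new(big.Int).SetString(checkpointSeq, 10)
	if !ok {
		return false, fmt.Errorf("invalid checkpoint sequence: %q", checkpointSeq)
	}
	end, ok := new(big.Int).SetString(endingSeq, 10)
	if !ok {
		return false, fmt.Errorf("invalid ending sequence: %q", endingSeq)
	}
	return cp.Cmp(end) >= 0, nil
}

func main() {
	// Placeholder sequence numbers for demonstration only.
	done, err := shardFullyConsumed(
		"49590338271490256608559692538361571095",
		"49590338271490256608559692540925702759",
	)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("fully consumed:", done)
}
```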

@Jeffail (Collaborator) commented Jun 11, 2024

Okay, I've had a little refresher course on the code. It looks as though this is specifically an issue when bootstrapping shard allocation after one of these splits. Currently, we check for the presence of that ending sequence number as a way of detecting finished shards, as otherwise we would be constantly busy-looping allocations to those finished shards.

Unfortunately, when we're scanning for any unallocated shards we don't have the context as to what the final sequence was from our checkpointers, as that's stored in DynamoDB. A naive approach would be to call DynamoDB for every single unallocated and "ended" shard each time we rebalance in order to determine whether it's fully consumed or not. Since that would be extremely wasteful, we would instead need to do some in-memory caching of that information so that subsequent shard listings could reuse it.
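
A hedged sketch of what that in-memory cache could look like (type and method names are illustrative, not taken from the plugin):

```go
package main

import "sync"

// finishedShardCache remembers which closed shards the checkpointer has
// already confirmed as fully consumed, so rebalancing does not have to
// re-query DynamoDB for them on every shard listing.
type finishedShardCache struct {
	mu   sync.Mutex
	done map[string]struct{}
}

func newFinishedShardCache() *finishedShardCache {
	return &finishedShardCache{done: map[string]struct{}{}}
}

// isFinished reports whether the shard is already known to be fully consumed.
func (c *finishedShardCache) isFinished(shardID string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.done[shardID]
	return ok
}

// markFinished records that a closed shard has been fully consumed.
func (c *finishedShardCache) markFinished(shardID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.done[shardID] = struct{}{}
}
```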

To reproduce this issue one would need to:

  1. Populate one or more shards with data
  2. Split the shards (without any benthos consumers running)
  3. Run the benthos consumers

If you have benthos instances running during the splitting then it's likely the data would still be fully consumed (unless there's a severe backlog), which is probably why this hasn't been noticed in the wild.
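
For anyone wanting to script those reproduction steps, here is a hedged sketch using the AWS SDK for Go v2 (stream name, record contents, and shard count are placeholders):

```go
// Sketch: write a few records, then double the shard count while no benthos
// consumer is running. Doubling the count closes every parent shard.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := kinesis.NewFromConfig(cfg)
	stream := aws.String("my-stream")

	// Step 1: populate the existing shard(s) with data.
	for i := 0; i < 10; i++ {
		if _, err := client.PutRecord(ctx, &kinesis.PutRecordInput{
			StreamName:   stream,
			PartitionKey: aws.String(fmt.Sprintf("key-%d", i)),
			Data:         []byte(fmt.Sprintf("record-%d", i)),
		}); err != nil {
			log.Fatal(err)
		}
	}

	// Step 2: split the shards (with no consumers running). The parent
	// shards become CLOSED and child shards are opened.
	if _, err := client.UpdateShardCount(ctx, &kinesis.UpdateShardCountInput{
		StreamName:       stream,
		TargetShardCount: aws.Int32(2),
		ScalingType:      types.ScalingTypeUniformScaling,
	}); err != nil {
		log.Fatal(err)
	}

	// Step 3: start the benthos consumers and check whether the records
	// written in step 1 (now sitting in closed parent shards) are delivered.
}
```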

It still continues to amaze me just how much logic a consumer of kinesis needs to implement just for the most basic use cases, and how outstandingly inadequate the documentation is for doing exactly that. If anyone is willing to take on this work then please let us know in this issue, otherwise I'll try and tackle it soon.

@j0shthomas (Contributor, Author) commented

If the shard has ended but still has a claim on it and enough time has passed since the LeaseTimeout, it will be kept as an unclaimed shard.

However, if the service is shut down, the client IDs are removed from DynamoDB, so the shard won't be picked up the next time the service starts.
Does the service need to remove the client IDs when shutting down? If they were kept, I don't think we would run into this issue.

Once a shard is fully consumed it is deleted from DynamoDB (it would help debugging if this were a soft delete), so RunExplicitShards will re-consume it from the beginning but RunBalancedShards will ignore it.
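
As an illustration of the soft-delete idea, a hedged sketch that flags the checkpoint item as finished instead of deleting it (the table name, key schema, and attribute names are assumptions, not the plugin's actual DynamoDB layout):

```go
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := dynamodb.NewFromConfig(cfg)

	// Mark the shard's checkpoint as finished rather than removing it, so the
	// record remains visible for debugging and later rebalances.
	_, err = client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
		TableName: aws.String("benthos_kinesis_checkpoints"), // placeholder table name
		Key: map[string]types.AttributeValue{
			"StreamID": &types.AttributeValueMemberS{Value: "my-stream"},
			"ShardID":  &types.AttributeValueMemberS{Value: "shardId-000000000000"},
		},
		UpdateExpression: aws.String("SET Finished = :t"),
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":t": &types.AttributeValueMemberBOOL{Value: true},
		},
	})
	if err != nil {
		log.Fatal(err)
	}
}
```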
