Transaction manager refactoring #19889

Draft
wants to merge 15 commits into
base: dev
from

Conversation

mmaslankaprv
Member

@mmaslankaprv mmaslankaprv commented Jun 19, 2024

Major refactor of the Redpanda transaction coordinator. Every transaction state update is now replicated into the coordinator partition. The changes also fix the transaction state machine and provide a strict, centralised definition of the possible transaction state transitions. Replicating each transaction state update allowed us to drop the tm_stm_cache and tm_stm_cache_manager completely.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.1.x
  • v23.3.x
  • v23.2.x

Release Notes

  • none

@mmaslankaprv mmaslankaprv marked this pull request as draft June 19, 2024 09:09
@mmaslankaprv mmaslankaprv force-pushed the tx-tm-stm-refector branch 3 times, most recently from 1a1c84c to 169342f Compare June 25, 2024 12:30
@mmaslankaprv
Member Author

/dt

@mmaslankaprv mmaslankaprv force-pushed the tx-tm-stm-refector branch 2 times, most recently from a99252a to cb3023c Compare June 25, 2024 15:06
@mmaslankaprv
Member Author

/dt

@mmaslankaprv
Member Author

/dt

@mmaslankaprv mmaslankaprv force-pushed the tx-tm-stm-refector branch 2 times, most recently from a7ebc00 to 70d07d1 Compare June 28, 2024 13:43
@mmaslankaprv
Member Author

/dt

Previously the test did not test what it was meant to, because the
error it validated against was incorrect. The test passed only because
the transaction had already timed out by the time the group was finally
rebalanced.

Signed-off-by: Michał Maślanka <[email protected]>

When producers are evicted from the `rm_stm`, the timer fires based on
the requested eviction timeout. Added an upper bound on how often the
timer can fire to prevent busy loops.

Signed-off-by: Michał Maślanka <[email protected]>

The epoch is exhausted if it is equal to the max possible epoch - 1.

Signed-off-by: Michał Maślanka <[email protected]>
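The exhaustion rule from the commit above can be written as a one-line check. This is a minimal Python sketch rather than the Redpanda code: it assumes the producer epoch is the signed 16-bit value used by the Kafka protocol, and `MAX_PRODUCER_EPOCH` and `is_epoch_exhausted` are hypothetical names chosen for illustration.

```python
# Assumption: the producer epoch is a signed 16-bit integer, as in the
# Kafka protocol, so the largest representable epoch is 2**15 - 1.
MAX_PRODUCER_EPOCH = 2**15 - 1


def is_epoch_exhausted(epoch: int) -> bool:
    """Return True when the epoch equals the max possible epoch - 1."""
    return epoch == MAX_PRODUCER_EPOCH - 1
```

Under this assumption the check becomes true one step before the largest representable epoch, so the last value is never handed out as a regular epoch.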
Introduced an error code indicating that another transaction is running
concurrently with the one the client is interested in. The concurrent
transactions error tells the client to retry without requesting a
metadata update.

Signed-off-by: Michał Maślanka <[email protected]>
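From the client's point of view, the distinction this commit draws could look roughly like the sketch below. It is a simplified Python model of client behaviour, not code from this PR or from any Kafka client: `ClientAction` and `action_for` are hypothetical names, the numeric values are the Kafka protocol error codes, and treating every other code as fatal is a simplification made only to keep the example short.

```python
from enum import Enum


class ClientAction(Enum):
    RETRY = "retry"  # resend the same request after a backoff
    REFRESH_AND_RETRY = "refresh_and_retry"  # look up the coordinator again first
    FAIL = "fail"  # surface the error to the application


# Kafka protocol error codes used in this sketch.
UNKNOWN_SERVER_ERROR = -1
NOT_COORDINATOR = 16
CONCURRENT_TRANSACTIONS = 51


def action_for(error_code: int) -> ClientAction:
    """Pick a client reaction for a transactional error (illustrative only)."""
    if error_code == CONCURRENT_TRANSACTIONS:
        # The coordinator is correct but busy with another transaction,
        # so no metadata update is needed before retrying.
        return ClientAction.RETRY
    if error_code == NOT_COORDINATOR:
        # The request reached the wrong node, so routing must be refreshed.
        return ClientAction.REFRESH_AND_RETRY
    # Simplification: everything else is treated as fatal here.
    return ClientAction.FAIL
```

The point of a dedicated error code is the first branch: the client keeps its cached coordinator and only backs off.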
Unified the mapping of transactional error codes to Kafka protocol error
codes.

Signed-off-by: Michał Maślanka <[email protected]>

The dependency on `cluster::controller` is not required, as it is enough
for the gateway to know the current node id.

Signed-off-by: Michał Maślanka <[email protected]>

Replaced absl flat hash maps with the chunked version to prevent memory
fragmentation.

Signed-off-by: Michał Maślanka <[email protected]>

When a transaction expires on one of the partitions, the coordinator is
requested to send abort requests to the other transaction participants.
Changed the test to account for that logic instead of requesting each
data partition separately to abort the transaction.

Signed-off-by: Michał Maślanka <[email protected]>

Transaction state validation is now stricter, so the coordinator sends
more appropriate error codes.

Signed-off-by: Michał Maślanka <[email protected]>

Added a function that validates state transitions of the transaction FSM.

Signed-off-by: Michał Maślanka <[email protected]>
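A centralised transition check of this kind can be modelled as a table from each target status to the statuses it may be entered from. The Python sketch below only illustrates the technique: the table is an assumption loosely modelled on Kafka's transaction states, not the table in this PR, and `ongoing` is a hypothetical status name that does not appear in this conversation. The function name mirrors `is_state_transition_valid` from the diff, but takes a plain status instead of `tx_metadata`.

```python
from enum import Enum


class TxStatus(Enum):
    EMPTY = "empty"
    ONGOING = "ongoing"  # hypothetical name, not taken from the PR
    PREPARING_COMMIT = "preparing_commit"
    COMPLETED_COMMIT = "completed_commit"
    PREPARING_ABORT = "preparing_abort"
    COMPLETED_ABORT = "completed_abort"


S = TxStatus

# Target status -> statuses it may be entered from. Illustrative only.
# Every target lists itself, so a self-transition is treated as valid.
VALID_PREVIOUS = {
    S.EMPTY: {S.EMPTY},
    S.ONGOING: {S.ONGOING, S.EMPTY, S.COMPLETED_COMMIT, S.COMPLETED_ABORT},
    S.PREPARING_COMMIT: {S.PREPARING_COMMIT, S.ONGOING},
    S.COMPLETED_COMMIT: {S.COMPLETED_COMMIT, S.PREPARING_COMMIT},
    S.PREPARING_ABORT: {S.PREPARING_ABORT, S.ONGOING},
    S.COMPLETED_ABORT: {S.COMPLETED_ABORT, S.PREPARING_ABORT},
}


def is_state_transition_valid(current: TxStatus, target: TxStatus) -> bool:
    """Return True if moving from `current` to `target` is allowed."""
    return current in VALID_PREVIOUS[target]
```

Keeping all allowed transitions in one table means every caller rejects an invalid update the same way, which is what makes the validation strict.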
Previously the add-partition-to-transaction and add-group-to-transaction
data batches were not persisted in the transaction manager log. This made
the transactions implementation vulnerable to coordinator leadership
changes. Changed the code to replicate each transaction state update.

This commit includes a major refactoring of the transactions subsystem,
including:
- `tx_status` values rename
- dropping the tm_stm_cache and moving the state to tm_stm
- refining the logic of transaction coordinator and gateway

Signed-off-by: Michał Maślanka <[email protected]>
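Why replicating every update protects against leadership changes can be shown with a toy model. The Python sketch below is not the `tm_stm` implementation: `CoordinatorLog`, `TxState`, `update_tx` and `recover` are hypothetical names, the log is an in-memory list standing in for the replicated coordinator partition, and the update kinds are invented for the example.

```python
from dataclasses import dataclass, field


@dataclass
class CoordinatorLog:
    """Stand-in for the replicated coordinator partition."""

    entries: list = field(default_factory=list)

    def replicate(self, update: tuple) -> None:
        self.entries.append(update)


@dataclass
class TxState:
    status: str = "empty"
    partitions: set = field(default_factory=set)

    def apply(self, update: tuple) -> None:
        kind, value = update
        if kind == "add_partition":
            self.partitions.add(value)
        elif kind == "set_status":
            self.status = value


def update_tx(log: CoordinatorLog, state: TxState, update: tuple) -> None:
    # Replicate first, then apply, so in-memory state never runs ahead
    # of what a future leader can read from the log.
    log.replicate(update)
    state.apply(update)


def recover(log: CoordinatorLog) -> TxState:
    # A new leader rebuilds the state purely by replaying the log.
    state = TxState()
    for update in log.entries:
        state.apply(update)
    return state
```

In this model an added partition that was only kept in memory would be missing after `recover`, which is the failure mode the commit message describes for the old behaviour.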
@mmaslankaprv
Member Author

/dt

case cluster::tx::errc::request_rejected:
case cluster::tx::errc::unknown_server_error:
return error_code::unknown_server_error;
break;
Contributor

nit: redundant break.

Comment on lines +151 to +154
case cluster::tx::errc::preparing_rebalance:
case cluster::tx::errc::rebalance_in_progress:
case cluster::tx::errc::coordinator_load_in_progress:
case cluster::tx::errc::request_rejected:
Contributor

These seem to be retriable at a high level, but they are mapped to unknown_server_error (USE), which is a terminal state and requires a producer reset?

@@ -113,4 +114,48 @@ constexpr error_code map_topic_error_code(cluster::errc code) {
return error_code::unknown_server_error;
}

constexpr error_code map_tx_errc(cluster::tx::errc ec) {
Contributor

wonder if this belongs in tx_err.h? kafka/server/errors.h depending on (internal) tx_err seems a bit odd.

@@ -29,7 +29,7 @@ class CompactionE2EIdempotencyTest(RedpandaTest):
     def __init__(self, test_context):
         extra_rp_conf = {}

-        self.segment_size = 5 * 1024 * 1024
+        self.segment_size = 1 * 1024 * 1024
Contributor

curious why?

@@ -481,7 +481,7 @@ def expired_tx_test(self):
                 assert False, "tx is expected to be expired"
             except ck.cimpl.KafkaException as e:
                 kafka_error = e.args[0]
-                assert kafka_error.code() == ck.cimpl.KafkaError._FENCED
+                assert kafka_error.code() == ck.cimpl.KafkaError.INVALID_TXN_STATE
Contributor

is this commit supposed to come after your (deeper) changes? or is it because of changes in error code mapping commit?

Contributor

@bharathv bharathv left a comment

still looking at the last two (main) commits, fleshed out the remaining comments.

return "PrepareAbort";
case tx_status::killed:
case tx_status::expired:
Contributor

https://redpandadata.slack.com/archives/C01ND4SVB6Z/p1719592501245829

I'm not sure this mapping is 100% correct; at the least, it is confusing to users.

preparing_abort = 3,
/**
* The same as the preparing_abort but when transaction is aborted by
* internals of Redpand f.e. timeout or transactional id limit overflow
Contributor

typo Redpanda

@@ -15,16 +15,63 @@
namespace cluster {

enum tx_status : int32_t {
Contributor

this fsm is a great candidate for ascii art :D

preparing_commit = 1,
/**
* Transaction is completed commit when its participants committed the
* transaction. This state is not replicated as committing data on
Contributor

> This state is not replicated

I see finish_transaction() is replicating these, did I miss something or is this comment stale?

, to(to) {}

bool is_state_transition_valid(
const tx_metadata& current, tx_status requested_status) {
Contributor

nit: requested_status -> target_status?

*/
completed_abort = 7,
/**
* Transaction is about to be removed
Contributor

nit: a little more detail here may be useful, e.g. in what context this state is possible.

case empty:
// ready is an initial state a transaction can never go back to that
// state
return is_one_of(current.status, tx_status::empty);
Contributor

nit: I think one confusing bit here is that the transition from a state to itself (current == target) is a valid transition. Is that intentional? That seems to be the case for all the states.

Comment on lines +46 to +47
tx_status::completed_abort,
tx_status::completed_commit);
Contributor

this transition is a bit non-intuitive IMO, especially when we say empty is the begin state.

case expired:
return is_one_of(
current.status,
tx_status::empty,
Contributor

While empty -> expired makes sense, I wonder if it is actually possible. Currently auto expiration is only possible if there is a partition added (it is initiated by the rm_stm); without a partition, no one triggers it?
