group/transactions: compaction fixes #19931
Conversation
/dt
new failures in https://buildkite.com/redpanda/redpanda/builds/50483#01903499-4b14-4281-8fe3-3ff604ca2c45:
new failures in https://buildkite.com/redpanda/redpanda/builds/50483#01903499-4b16-46f1-8744-bc44f9f0834e:
new failures in https://buildkite.com/redpanda/redpanda/builds/50483#01903499-4b18-47bc-b679-07b621e5740a:
new failures in https://buildkite.com/redpanda/redpanda/builds/50498#01903792-80c0-4daa-944f-8c8e5e87664a:
new failures in https://buildkite.com/redpanda/redpanda/builds/50498#01903792-80c2-4d61-a546-f2c6ec6bc20b:
new failures in https://buildkite.com/redpanda/redpanda/builds/50509#01903998-bf65-426e-b1a1-611a9db57536:
new failures in https://buildkite.com/redpanda/redpanda/builds/50509#01903998-bf67-47c0-8473-c9523c31fcdf:
/dt
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/50498#01903790-d830-4921-8d52-a33e3366e545
/dt
/dt
/ci-repeat 3
Force-pushed from cb72c7f to 8e293de.
        return std::tie(begin_offsets, producer_to_begin);
    }
};
using all_txs_t = absl::btree_map<kafka::group_id, per_group_state>;
any reason not to use chunked_hash_map?
chunked_hash_map is not copyable because of the backing frag_vector, so the snapshot code would have required manual iteration; given that the scale here is very small, I used the absl variants.
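For context, a minimal stdlib-only sketch of the snapshot-by-copy pattern the reply describes. std::map stands in for absl::btree_map, and per_group_state here is a hypothetical stand-in for the real struct:

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for the real per_group_state.
struct per_group_state {
    int64_t begin_offset{0};
};

// The PR keys by kafka::group_id and uses absl::btree_map; std::map is
// used here so the sketch stays stdlib-only. Both are copyable, which
// makes snapshotting a one-liner; chunked_hash_map's backing frag_vector
// makes it move-only, so a snapshot would need manual iteration.
using all_txs_t = std::map<std::string, per_group_state>;

// Taking a snapshot is just a deep copy of the map.
all_txs_t take_snapshot(const all_txs_t& live) { return live; }
```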
template<class Base>
class group_data_parser {
protected:
    ss::future<> parse(model::record_batch b) {
it looks to me that we do not have to return a future from all the handle methods?
It's needed for handle_raft_data but not for the rest of them; I just kept it consistent as 'future' proofing, in case some 'future' code wants to do something more interesting than adding to some in-memory state.
Last force push is a rebase to fix conflicts from recent group changes.
bool group_tx_tracker_stm_factory::is_applicable_for(
  const storage::ntp_config& config) const {
    auto nt = model::topic_namespace{config.ntp().ns, config.ntp().tp.topic};
here we copy the topic and namespace for every single batch; maybe we can compare the topic and namespace directly, or use model::topic_namespace_view
public:
    group_data_parser() {
        static_assert(
          GroupDataParserBase<Base>,
can't we just write template<GroupDataParserBase Base> instead of a static_assert?
The concept is checked when the template is instantiated, and at that point the non-virtual members are not yet declared, so the type is still incomplete and the compiler complains. By the time the constructor body is instantiated that is no longer the case, hence the static_assert.
src/v/kafka/server/group.h
Outdated
@@ -884,6 +884,11 @@ class group final : public ss::enable_lw_shared_from_this<group> {
    std::vector<model::topic_partition>
    get_expired_offsets(std::chrono::seconds retention_period);

    bool use_new_fence_batch_type() const {
nit: I'm always suspicious of using "new" when naming anything, because after a year the thing is not so new anymore and it is less clear what "new" refers to :)
renamed and clarified in a comment.
src/v/features/feature_table.h
Outdated
@@ -77,6 +77,7 @@ enum class feature : std::uint64_t {
    cluster_topic_manifest_format_v2 = 1ULL << 45U,
    node_local_core_assignment = 1ULL << 46U,
    unified_tx_state = 1ULL << 47U,
    group_fence_batch_type_switch = 1ULL << 48U,
nit: a bit unclear to me what "type switch" means here? I guess the incompatible change is that we are using a new batch type, and it would be nice to consistently use a single name in all the places referring to it.
model::offset group_tx_tracker_stm::max_collectible_offset() {
    auto result = last_applied_offset();
    for (const auto& [_, group_state] : _all_txs) {
hmm I wonder what the scale here is? Maybe we should just recalculate it once in maybe_end_tx?
A few hundred to at most ~1000 at the higher end, I'd think. Also note this is in-flight state, i.e. groups with an open transaction (not historical groups in the log), so it is very unlikely this even reaches double digits IMO. On top of that, this is only called from the compaction patch when computing the compaction boundary, which is not a hot path.
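A rough, self-contained sketch of the clamping logic under discussion (the types and the exact off-by-one convention are assumptions for illustration, not the actual implementation):

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>

using offset = int64_t;

// Hypothetical analogue of group_tx_tracker_stm::max_collectible_offset():
// start from the last applied offset and clamp it below the earliest begin
// offset of any open transaction, so compaction never crosses an in-flight
// group transaction. The map holds only in-flight state, so the linear
// scan is over at most a handful of entries.
offset max_collectible_offset(
  offset last_applied, const std::map<std::string, offset>& open_tx_begin) {
    offset result = last_applied;
    for (const auto& [group, begin] : open_tx_begin) {
        result = std::min(result, begin - 1); // keep the begin batch itself
    }
    return result;
}
```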
ok, sounds good
Generally the compaction stuff looks good. I don't fully understand the nuances of the new batch type, so deferring to others there, but IIUC we are free to remove the new group tx fences, right?
src/v/model/record_batch_types.h
Outdated
@@ -50,6 +50,7 @@ enum class record_batch_type : int8_t {
    = 29, // place holder for last batch in a segment that was aborted
    role_management_cmd = 30, // role management command
    client_quota = 31, // client quota command
    group_fence_tx = 32, // fence batch in group transactions
Should this go into offset_translator_batch_types? Is the offsets topic consumable with Kafka offsets?
This is in the consumer offsets topic, which doesn't have Kafka-based consumption (it is not a data partition).
// Wait until all segments are compacted and only two remain
co_await log->flush();
co_await log->force_roll(ss::default_priority_class());
co_await tests::cooperative_spin_wait_with_timeout(30s, [&]() {
nit: might be able to use RPTEST_REQUIRE_EVENTUALLY_CORO here?
ya I was trying to remember this name but couldn't, thanks, will switch.
@@ -367,105 +367,50 @@ ss::future<> index_rebuilder_reducer::do_index(model::record_batch&& b) {
    });
}

void tx_reducer::consume_aborted_txs(model::offset upto) {
void tx_reducer::refresh_ongoing_aborted_txs(const model::record_batch& b) {
prepare batches are no longer used
What does this mean for old topics that have legacy prepare batches?
They were only written by pre-transactions-GA code (> 2 years ago), so we don't really support them anymore (after 22.3.0).
.. into a separate utility so it can be shared by multiple code paths that parse this log data.
This helps clamp the max collectible offset on group partitions so it does not exceed the earliest open transaction.
Prior to this change, group_tx_fence shared the fence record batch type with data partitions (tx_fence). This made compaction logic complicated, particularly because different compaction rules applied to fence batches in groups and data partitions. With the new feature, the group fence has a separate dedicated batch type, so it is easy to disambiguate the two fence types.
Remove all control batch parsing. Control batches cannot be compacted (after redpanda-data#15404) and prepare batches are no longer used, so most branches here are dead code. No logical changes.
We will add another transactional type for consumer offsets in a later commit. Renaming to disambiguate the different transactional stms.
Enables compaction of group transaction markers, which was broken prior to this change. Group transaction batches are omitted in the compaction reducer pass; check the code comments for more details.
The test generates interleavings of transactions across multiple groups in the same consumer offsets partition and repeatedly compacts and checks for some invariants.
// committed data has already been rewritten as separate raft_data batches,
// so no need to retain originally written group_prepare_tx batches while
// the transaction is in progress.
return is_compactible_control_batch(b.header().type);
So simple!
I wonder why we needed a separate batch type - if we know that we are in the consumer offsets topic, can't we just compact all fence batch types? Is this because in theory ordinary user transactions are also possible for the consumer offsets topic?
Right, this is more of a code-structuring problem: we don't have the "topic context" in all places. That is fixable with some (ugly) refactoring, but even otherwise there is no reason to use the same batch type for fences in data and consumer-offsets partitions, especially when every other batch type (data, commit/abort) is different. This change simplifies things because we can now look at just the batch header to decide whether it is compactible, and all the batch types are neatly grouped by topic (no overlap).
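The non-overlap the reply describes can be sketched as a header-only check. Only the group_fence_tx = 32 value comes from this PR's diff; the other enum names and values below are made-up placeholders:

```cpp
#include <cstdint>

// Illustrative subset of model::record_batch_type. Values other than
// group_fence_tx = 32 are placeholders, not the real assignments.
enum class record_batch_type : int8_t {
    raft_data = 1,
    tx_fence = 10,        // data-partition fence: must not be compacted away
    group_commit_tx = 20, // group transaction control batches
    group_abort_tx = 21,
    group_fence_tx = 32,  // dedicated group fence type added by this PR
};

// With disjoint batch types per topic kind, compactibility is decidable
// from the batch header alone; no topic context is required.
bool is_compactible_control_batch(record_batch_type t) {
    switch (t) {
    case record_batch_type::group_fence_tx:
    case record_batch_type::group_commit_tx:
    case record_batch_type::group_abort_tx:
        return true;
    default:
        return false;
    }
}
```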
}

private:
    ss::future<> parse_fence(model::record_batch b) {
would probably be worth trying to move some of these implementations out of the header and into .cc files.
Fixes two main issues related to compaction of group transactions:
These issues are addressed by adding a new state machine that enforces max_collectible_offset and then allowing compaction of control batch types from group transactions.
Backports Required
Release Notes
Bug Fixes