[CORE-2841] Transforms: Start consuming from an arbitrary offset (numeric from start/end or timestamp) #19975
base: dev
Conversation
Force-pushed b177085 → 78e09c0
Force-pushed 78e09c0 → 612cd16
/ci-repeat 1

Thin proxy for `kafka::partition_proxy::start_offset`
Signed-off-by: Oren Leiman <[email protected]>

Force-pushed 612cd16 → 2d34443
@@ -154,6 +155,7 @@ The --var flag can be repeated to specify multiple variables like so:
 	cmd.Flags().StringVar(&fc.functionName, "name", "", "The name of the transform")
 	cmd.Flags().Var(&fc.env, "var", "Specify an environment variable in the form of KEY=VALUE")
 	cmd.Flags().StringVar(&fc.compression, "compression", "", "Output batch compression type")
+	cmd.Flags().StringVar(&fc.from_offset, "from-offset", "", "Process an input topic partition from this offset")
This needs docs in the long form help text. How does this work since offsets are per partition and deploys are for topics?
There seems to be a string that is parsed on the backend. Generally I would recommend the parsing to be in rpk so the API is structured data only.
We should also note that this only works on the first deploy.
But also is there a use case for arbitrary start offsets? Or is it just beginning?
> docs in the long form help text

Good catch, that's my oversight.

> how does this work since offsets are per partition

What I have currently is the ability to specify a positive delta from the start or a negative delta from the end. I would expect the most common usage to be "+00", meaning "begin processing from the start offset of each partition", so we can always add a "start" alias or something.
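For illustration, a rough sketch (in C++, with hypothetical names; the real flag parsing lives in rpk, which is Go) of how a `--from-offset` value like `+N`, `-N`, or `@T` might be decomposed:

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <variant>

// Hypothetical sketch only: decompose an rpk-style --from-offset value into
// a delta from the start ("+N"), a delta from the end ("-N"), or a unix
// timestamp in ms ("@T"). Names do not match the actual rpk code.
struct from_start { int64_t delta; };
struct from_end { int64_t delta; };
struct at_timestamp { int64_t ms; };
using start_spec = std::variant<from_start, from_end, at_timestamp>;

std::optional<start_spec> parse_from_offset(const std::string& s) {
    if (s.size() < 2) {
        return std::nullopt; // need a sign/format prefix plus digits
    }
    int64_t v = 0;
    try {
        v = std::stoll(s.substr(1));
    } catch (...) {
        return std::nullopt; // not a number
    }
    if (v < 0) {
        return std::nullopt; // magnitude only; the prefix carries the sign
    }
    switch (s[0]) {
    case '+': return from_start{v};
    case '-': return from_end{v};
    case '@': return at_timestamp{v};
    default: return std::nullopt;
    }
}
```

Under this sketch, `+00` parses as `from_start{0}`, i.e. "begin at each partition's start offset".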
> also is there a use case for arbitrary start offsets?

In practice, I don't know, though I suppose the same question applies to "start from timestamp". The motivating event for exposing this was user error leading to nominal data loss; so if the general vibe is around minor disaster recovery, I think the added flexibility is probably good?

> recommend the parsing to be in rpk

Fair enough. I had something more structured in a previous iteration. Moved to this while working through the timequery bug and sort of liked it, but if it's particularly unsavory I can wind it back.
put the structure back. looks similar to env vars parsing now.
Force-pushed 2d34443 → 6ba341a
nice
src/v/model/transform.h
Outdated
@@ -102,14 +102,15 @@ struct transform_offset_options
 	//
 	// When a timestamp is used, a timequery is used to resolve the offset for
 	// each partition.
-	serde::variant<latest_offset, model::timestamp> position;
+	serde::variant<latest_offset, model::timestamp, model::offset> position;
nit: offset_delta is a better fit? And please update the docs
Ya I'm not exactly sure. I balked initially because the on-label use for offset_delta is translating between logical and physical offsets. It's all ints at the end of the day, so it felt a bit better to use the more generic option. Will consider.
I think it's conceptually a `kafka::offset_delta` (rather than a `model::offset_delta` or `kafka|model::offset`) since we're adding it onto a kafka start/end offset.
Near as I can tell `model::offset_delta` is explicitly meant to hit overloads that perform the model::offset <-> kafka::offset conversion automatically, e.g.

```cpp
/// \brief conversion from kafka offset to redpanda offset
inline constexpr model::offset
operator+(kafka::offset o, model::offset_delta d) {
    return model::offset{o() + d()};
}
```

So yeah, I agree with Gellert. Maybe just add `kafka::offset_delta` for specifically the purpose of staying w/in kafka land.
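A minimal sketch of that idea, with simplified stand-in structs rather than the real strongly-typed offsets: a delta whose `operator+` keeps the result in kafka-offset land instead of converting to a model offset:

```cpp
#include <cstdint>

// Simplified stand-ins for kafka::offset and the proposed kafka::offset_delta.
// Unlike model::offset_delta (whose overloads convert kafka <-> model
// offsets), adding this delta yields the same offset type back.
struct kafka_offset { int64_t v; };
struct kafka_offset_delta { int64_t v; };

constexpr kafka_offset operator+(kafka_offset o, kafka_offset_delta d) {
    return kafka_offset{o.v + d.v}; // stays within "kafka land"
}
```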
    } else if (format == "from_end") {
        offset_opts.position = model::offset{-value};
    }
}
else 400?
Strictly speaking, covered by the validator, but ya probably wise
src/v/transform/api.cc
Outdated
// transform metadata (i.e. legacy deployments) won't traverse this code.
// Otherwise, respect whatever offset was included in the request.
if (
  std::get_if<model::transform_offset_options::latest_offset>(
holds_alternative?
yeah of course 🙃
// Set the transform to start processing new records starting now,
// this is the default expectation for developers, as once deploy
// completes, they should be able to produce without waiting for the
// vm to start. If we start from the end of the log, then records
// produced between now and the vm start would be skipped.
I'm wondering if by the same logic, we should be resolving the offset delta to a `model::offset` here instead of doing it in `transform::processor::load_latest_committed()`. For example, "offset from the end `-0`" is equivalent to `latest` yet they behave slightly differently. But maybe it's not a big deal since the processor is started "soon enough"?

Would it make sense to move the offset delta resolution and latest resolution up to the admin API handler and change the `transform_offset_options` to hold a fixed position `serde::variant<model::timestamp, model::offset> position`?
The trouble is that offsets are committed per transform + partition, so we actually need to compute a concrete offset when each vm spins up. So I think `load_latest_committed` is the right place for the calculation - "give me the last committed offset on this partition, and if none exists calculate one based on my config".

To the similarity between `latest` and `-0`, it's true, but `latest` isn't exposed at the API layer. As I understand it, it was meant primarily as a compatibility default for deployed transforms that predate the options field. So "-oo from end" is inclusive of that behavior but intended as a user-facing knob.
Ah, I didn't think this through carefully. `-0` will not give the same result as `latest`. `offset{"format": "from_end", "value": 0}` winds up as a named int64, so we will eventually interpret it as plain old `0` and start processing from the beginning of the partition.

We should cover for this somehow - probably just by rejecting `--from-offset -0` in rpk. I don't think it's ever particularly useful compared to the default behavior, i.e. "commit offset at deploy time" vs "commit offset at VM start time".
Force-pushed 6ba341a → 0fcb3a5
force push contents:
- model::offset_delta resolves arithmetic operator overloads that perform automatic conversions between kafka and model offsets. This commit introduces an offset_delta that is used specifically for applying a numeric delta to an existing kafka::offset without adjusting its type. Useful for transform start offset calculations.
- { "format": enum[timestamp, from_start, from_end], "value": int64 }
- --from-offset to start from this offset: @t: start from UNIX timestamp (ms from epoch); +oo: start offset + oo; -oo: latest offset - oo
- Tests: consume records that were produced before the deploy; specify offsets that run off the end of the input topic; ill-formed offsets

(Signed-off-by: Oren Leiman <[email protected]>)
Force-pushed 0fcb3a5 → 17b827b (empty diff, signoff only)
LGTM, I think we can clarify the data model and make this slightly easier to reason about
//
// When an offset_delta is used, offset resolution depends on its sign:
// - if delta < 0 - start at latest - abs(delta) (for each partition)
// - if delta >= 0 - start at earliest + delta (for each partition)
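The resolution rule in the comment can be sketched as a small helper (illustrative names; `earliest`/`latest` stand for the partition's log bounds):

```cpp
#include <cstdint>

// Sketch of the sign-based rule above, evaluated per partition:
//   delta <  0 -> latest   - abs(delta)
//   delta >= 0 -> earliest + delta
int64_t resolve_start(int64_t delta, int64_t earliest, int64_t latest) {
    if (delta < 0) {
        return latest - (-delta); // i.e. latest - abs(delta)
    }
    return earliest + delta;
}
```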
Not to be a pain but - we could get rid of the awkwardness in rpk with -0 if we represented things more similarly to the API (which is more verbose and leans into the sumtype more): make the latest_offset struct have a delta field meaning the delta from the end, and add a new struct for earliest offset with a delta field there too.
We could translate latest_offset delta=0 to freeze offset like we do now, or we could leave latest offset and just have two new variants for from_beginning and from_end with (positive) deltas
Not a pain...I thought about doing that, but it seemed a bit over complex at the time. I'm not opposed. You could argue that the current approach just pushes the complexity into code comments and across the api boundary anyway, which is not great.
auto offset = doc["offset"].GetObject();
auto format = ss::sstring{
  offset["format"].GetString(), offset["format"].GetStringLength()};
auto value = offset["value"].GetInt64();
Should we validate this is positive?
Yup
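The check being agreed on might look something like this hypothetical helper (not the actual handler, which parses the request with rapidjson and would respond with a 400 on failure):

```cpp
#include <cstdint>
#include <string>

// Hypothetical admin-side validation sketch: the value must be non-negative
// (direction is carried by the format, not the value's sign), and the format
// must be one of the known enumerators; anything else should yield a 400.
bool valid_offset_request(const std::string& format, int64_t value) {
    if (value < 0) {
        return false;
    }
    return format == "timestamp" || format == "from_start"
           || format == "from_end";
}
```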
This PR wires up the ability to configure the start offset of a transform at deploy time. This can be either a unix timestamp (ms since epoch) or an offset delta (+oo from the start offset or -oo from the end).
Includes rpk experience.
Backports Required
Release Notes
Improvements