KIP 714 with compression support #4721

Open: wants to merge 70 commits into base: master
Conversation

@anchitj (Member) commented May 17, 2024

No description provided.

milindl and others added 30 commits July 25, 2023 12:06
* WIP:Push telemetry is being scheduled now.

* Push

* Compilation Fix

* Working

* Use UUID in PUSH

* Remove fprintf

* Changes

* Fix CONFIGURATION.md

* Fix size

* Update s2i bounds

---------

Co-authored-by: Milind L <[email protected]>
* Add broker selection and client termination

* Address review comments
* Serialise metrics using nanopb

* Move nanopb and opentelemetry inside src

* Add metrics.options file

* Remove unused includes

* Style fix

* Fix formatting

* Skip copyright check

* Add nanopb and opentelemetry in windows vcxproj

* Include headers directories in CMAKE

* Use flexver with PushTelemetry

* Fix memory leaks

* Change import path

* Use rd_bool_t everywhere

* Fix librdkafka.vcxproj

* Use rd_bool_t

* PR Feedback

* Add nanopb license

* Include opentelemetry license
* Support for delta temporality

* Style fix

* Fix bugs

* Fix memory leaks and formatting

* Fixes

* PR Feedback
* Add telemetry encode and decode unit tests

* Style fix

* Improve test

* PR Feedback
* Add max telemetry bytes

* Clear telemetry_max_bytes

* PR comments
@anchitj requested a review from a team as a code owner June 21, 2024 13:41
@emasab (Collaborator) left a comment:

First comments

src/rdkafka_telemetry_decode.c (outdated, resolved)
src/rdkafka_int.h (resolved)
src/rdkafka_request.c (outdated, resolved)
src/rdkafka_request.c (outdated, resolved)
src/rdkafka_telemetry_decode.c (outdated, resolved)
src/rdkafka_telemetry.c (outdated, resolved)
src/rdkafka.h (outdated, resolved)
src/rdkafka_telemetry_decode.c (outdated, resolved)
tests/0009-mock_cluster.c (outdated, resolved)
tests/0009-mock_cluster.c (outdated, resolved)
@anchitj force-pushed the dev_kip_714_mock_broker_integration_tests_master_merge_with_compression branch from 3accc47 to 604d18f on June 25, 2024 07:05
@emasab (Collaborator) left a comment:

First the segfault to fix

src/rdkafka_telemetry_encode.c (outdated, resolved)
src/rdkafka_telemetry_encode.c (outdated, resolved)
src/rdkafka_telemetry_encode.c (outdated, resolved)
Comment on lines +80 to +81
*rd_kafka_telemetry_metric_value_calculator_t)(rd_kafka_t *,
rd_kafka_broker_t *);
Collaborator:
Let's pass the timestamp so we use a uniform timestamp across all metrics.
Also, please add the param names; even if not required, they're useful for clarity.

Suggested change
*rd_kafka_telemetry_metric_value_calculator_t)(rd_kafka_t *,
rd_kafka_broker_t *);
typedef rd_kafka_telemetry_metric_value_t (
*rd_kafka_telemetry_metric_value_calculator_t)(
rd_kafka_t *rk,
rd_kafka_broker_t *rkb_selected,
rd_ts_t now_nanos);

Comment on lines +484 to +498
if (rk->rk_type == RD_KAFKA_PRODUCER &&
(metric_idx ==
RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_AVG ||
metric_idx ==
RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_MAX)) {
return true;
}
if (rk->rk_type == RD_KAFKA_CONSUMER &&
(metric_idx ==
RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_AVG ||
metric_idx ==
RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_MAX)) {
return true;
}
return false;
Collaborator:
This can be added to the metric definition

Suggested change
if (rk->rk_type == RD_KAFKA_PRODUCER &&
(metric_idx ==
RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_AVG ||
metric_idx ==
RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_MAX)) {
return true;
}
if (rk->rk_type == RD_KAFKA_CONSUMER &&
(metric_idx ==
RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_AVG ||
metric_idx ==
RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_MAX)) {
return true;
}
return false;
return info[metric_idx].is_per_broker;

@anchitj (Member Author):
Updated, and also fixed the bug in the initial loop where total_metrics_count is calculated: we were passing the loop index instead of metrics_to_encode[i].
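To illustrate the index-vs-element bug the reply describes, here is a minimal sketch; the metric ids and the per-broker lookup table are hypothetical stand-ins for librdkafka's real metric info array, and only the shape of the bug and its fix match the code under review:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical metric ids and per-broker lookup table, standing in for
 * the real metric definitions. */
enum { METRIC_A, METRIC_B_PER_BROKER, METRIC_C, METRIC__CNT };
static const bool metric_is_per_broker[METRIC__CNT] = {false, true, false};

static size_t total_metrics_count(const int *metrics_to_encode,
                                  size_t metrics_to_encode_count,
                                  int broker_cnt) {
        size_t total = metrics_to_encode_count, i;
        for (i = 0; i < metrics_to_encode_count; i++) {
                /* Bug was: metric_is_per_broker[i] used the loop index.
                 * Fix: look up the matched metric id instead. */
                if (metric_is_per_broker[metrics_to_encode[i]])
                        total += broker_cnt - 1;
        }
        return total;
}
```

With the buggy version, which metrics got the per-broker expansion depended on their position in the matched list rather than on which metrics were actually matched.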

src/rdkafka_telemetry_encode.c (resolved)

total.doubleValue = 0;
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
ts_last = rkb->rkb_c_historic.ts_last;
Collaborator:

ts_last should be outside the loop and taken from rk:

Suggested change
ts_last = rkb->rkb_c_historic.ts_last;
ts_last = rk->rk_telemetry.rk_historic_c.ts_last;

src/rdkafka_telemetry_encode.c (resolved)
(cnt_diff * RDKAFKA_TELEMETRY_NS_TO_MS_FACTOR);
}
}
avg_throttle.intValue = sum_value / broker_count;
@emasab commented Jun 27, 2024:

Use double_value instead of intValue; same for queue_time_avg. For the latter you have to divide by THREE_ORDERS_MAGNITUDE to go from µs to ms.

Suggested change
avg_throttle.intValue = sum_value / broker_count;
avg_throttle.double_value = avg;
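As a tiny sketch of the µs-to-ms conversion the comment asks for; the value 1000 for THREE_ORDERS_MAGNITUDE is an assumption based on the name, not taken from the source:

```c
#include <assert.h>

/* Assumed definition; the name suggests a factor of 10^3. */
#define THREE_ORDERS_MAGNITUDE 1000

/* Convert an average latency stored in microseconds to milliseconds,
 * using a double so the fractional part survives the division. */
static double avg_us_to_ms(long long avg_us) {
        return (double)avg_us / THREE_ORDERS_MAGNITUDE;
}
```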

src/rdkafka_telemetry_encode.c (resolved)
src/rdkafka_telemetry_encode.c (resolved)
src/rdkafka_telemetry_encode.c (resolved)
Comment on lines +222 to +238

TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rkb->rkb_c_historic.assigned_partitions = rkb->rkb_toppar_cnt;
rkb->rkb_c_historic.connects = rkb->rkb_c.connects.val;
rkb->rkb_c_historic.ts_last = rd_uclock() * 1000;
rkb->rkb_c_historic.connects = rkb->rkb_c.connects.val;
/* Only ra_v is being used to keep track of the metrics */
rkb->rkb_c_historic.rkb_avg_rtt.ra_v = rkb->rkb_avg_rtt.ra_v;
rd_atomic32_set(&rkb->rkb_avg_rtt.ra_v.maxv_reset, 1);
rkb->rkb_c_historic.rkb_avg_throttle.ra_v =
rkb->rkb_avg_throttle.ra_v;
rd_atomic32_set(&rkb->rkb_avg_throttle.ra_v.maxv_reset, 1);
rkb->rkb_c_historic.rkb_avg_outbuf_latency.ra_v =
rkb->rkb_avg_outbuf_latency.ra_v;
rd_atomic32_set(&rkb->rkb_avg_outbuf_latency.ra_v.maxv_reset,
1);
}
Collaborator:

Here it just needs to set the ts_last in rk and the historic connect count.

Suggested change
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rkb->rkb_c_historic.assigned_partitions = rkb->rkb_toppar_cnt;
rkb->rkb_c_historic.connects = rkb->rkb_c.connects.val;
rkb->rkb_c_historic.ts_last = rd_uclock() * 1000;
rkb->rkb_c_historic.connects = rkb->rkb_c.connects.val;
/* Only ra_v is being used to keep track of the metrics */
rkb->rkb_c_historic.rkb_avg_rtt.ra_v = rkb->rkb_avg_rtt.ra_v;
rd_atomic32_set(&rkb->rkb_avg_rtt.ra_v.maxv_reset, 1);
rkb->rkb_c_historic.rkb_avg_throttle.ra_v =
rkb->rkb_avg_throttle.ra_v;
rd_atomic32_set(&rkb->rkb_avg_throttle.ra_v.maxv_reset, 1);
rkb->rkb_c_historic.rkb_avg_outbuf_latency.ra_v =
rkb->rkb_avg_outbuf_latency.ra_v;
rd_atomic32_set(&rkb->rkb_avg_outbuf_latency.ra_v.maxv_reset,
1);
}
rk->rk_telemetry.rk_historic_c.ts_last = now_ns;
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rkb->rkb_telemetry.rkb_historic_c.connects =
rd_atomic32_get(&rkb->rkb_c.connects);
}

NULL;
rd_ts_t now_ns = rd_uclock() * 1000;

int resource_attributes_count =
Collaborator:

The first thing it needs to do is roll over the dedicated avgs; the ones used by the stats cb cannot be reused because they can be reset by the corresponding callback.
They must be incremented in the same places, for example:

                rd_avg_add(
                    &rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency,
                    rkbuf->rkbuf_ts_sent - rkbuf->rkbuf_ts_enq);
Suggested change
int resource_attributes_count =
if (rk->rk_telemetry.rk_historic_c.ts_start == 0) {
first_push = rd_true;
rk->rk_telemetry.rk_historic_c.ts_start = now_ns;
rk->rk_telemetry.rk_historic_c.ts_last = now_ns;
}
i = 0;
TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt,
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt);
rd_avg_rollover(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency,
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency);
rd_avg_rollover(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle,
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle);
if (first_push) {
rkb->rkb_telemetry.rkb_historic_c.connects =
rd_atomic32_get(&rkb->rkb_c.connects);
}
i++;
}
int resource_attributes_count =

src/rdkafka_telemetry_encode.c (resolved)
src/rdkafka_telemetry_encode.c (resolved)
Comment on lines +645 to +649
for (i = 0; i < metrics_to_encode_count; i++) {
if (is_per_broker_metric(rk, (int)i)) {
total_metrics_count += rk->rk_broker_cnt.val - 1;
}
}
Collaborator:

This goes inside the read lock, just before serializing the metrics.

@anchitj (Member Author):

Updated

src/rdkafka_telemetry_encode.c (resolved)
src/rdkafka_telemetry_encode.c (resolved)
@emasab (Collaborator) left a comment:

Comments about struct changes, compression functions and ApiVersion

/* TODO: Use rd_list_t to store the metrics */
int *matched_metrics;
size_t matched_metrics_cnt;
} rk_telemetry;
@emasab commented Jun 28, 2024:

Here you can store ts_start and ts_last

Suggested change
} rk_telemetry;
struct {
rd_ts_t ts_last; /**< Timestamp of last push */
rd_ts_t ts_start; /**< Timestamp from when collection
* started */
} rk_historic_c;
} rk_telemetry;

Comment on lines +196 to +210
struct {
rd_ts_t ts_last; /**< Timestamp of last push */
rd_ts_t ts_start; /**< Timestamp from when collection started */
int32_t assigned_partitions; /**< Current number of assigned
partitions. */
int32_t connects; /**< Connection attempts,
* successful or not. */
rd_avg_t rkb_avg_rtt; /* Current RTT period */
rd_avg_t rkb_avg_throttle; /* Current throttle period */
rd_avg_t rkb_avg_outbuf_latency; /**< Current latency
* between buf_enq0
* and writing to socket
*/

} rkb_c_historic;
Collaborator:

This is the struct I'm referring to in the other comments: only connects is left in historic_c for each broker, and we have two sets of avgs, one for incrementing and one for the rollover.

Suggested change
struct {
rd_ts_t ts_last; /**< Timestamp of last push */
rd_ts_t ts_start; /**< Timestamp from when collection started */
int32_t assigned_partitions; /**< Current number of assigned
partitions. */
int32_t connects; /**< Connection attempts,
* successful or not. */
rd_avg_t rkb_avg_rtt; /* Current RTT period */
rd_avg_t rkb_avg_throttle; /* Current throttle period */
rd_avg_t rkb_avg_outbuf_latency; /**< Current latency
* between buf_enq0
* and writing to socket
*/
} rkb_c_historic;
struct {
struct {
int32_t connects; /**< Connection attempts,
* successful or not. */
} rkb_historic_c;
struct {
rd_avg_t rkb_avg_rtt; /* Current RTT avg */
rd_avg_t rkb_avg_throttle; /* Current throttle avg */
rd_avg_t
rkb_avg_outbuf_latency; /**< Current latency
* between buf_enq0
* and writing to socket
*/
} rd_avg_current;
struct {
rd_avg_t rkb_avg_rtt; /**< Rolled over RTT avg */
rd_avg_t rkb_avg_throttle; /**< Rolled over throttle avg */
rd_avg_t
rkb_avg_outbuf_latency; /**< Rolled over outbuf
* latency avg */
} rd_avg_rollover;
} rkb_telemetry;

int16_t maxver,
int *featuresp) {
static int16_t
rd_kafka_broker_ApiVersion_supported_implementation(rd_kafka_broker_t *rkb,
Collaborator:

This can be rd_kafka_broker_ApiVersion_supported0, the function that takes all the parameters, without changing the naming convention for now.

Suggested change
rd_kafka_broker_ApiVersion_supported_implementation(rd_kafka_broker_t *rkb,
rd_kafka_broker_ApiVersion_supported0(rd_kafka_broker_t *rkb,

* @enum Telemetry States
*/
typedef enum {
/**< Initial state, awaiting telemetry broker to be assigned */
Collaborator:

It's /** , because /**< is used when the documentation comes after the field

Suggested change
/**< Initial state, awaiting telemetry broker to be assigned */
/** Initial state, awaiting telemetry broker to be assigned */

Comment on lines +251 to +252
/**< Client is being terminated and last PushTelemetry is scheduled to
send */
Collaborator:

Adding an asterisk at the beginning of the line is not required by the Doxygen format, but it helps readability.

Suggested change
/**< Client is being terminated and last PushTelemetry is scheduled to
send */
/**< Client is being terminated and last PushTelemetry is scheduled to
* send */

Comment on lines +840 to +850
rd_kafka_dbg(rk, TELEMETRY, "RD_KAFKA_TELEMETRY_METRICS_INFO",
"Encoding failed: %s", PB_GET_ERROR(&stream));
rd_free(buffer);
free_metrics(metrics, metric_names, data_points,
datapoint_attributes_key_values,
total_metrics_count);
free_resource_attributes(resource_attributes_key_values,
resource_attributes_struct,
resource_attributes_count);

return NULL;
Collaborator:

Once you do rd_buf_t changes and add the label:

Suggested change
rd_kafka_dbg(rk, TELEMETRY, "RD_KAFKA_TELEMETRY_METRICS_INFO",
"Encoding failed: %s", PB_GET_ERROR(&stream));
rd_free(buffer);
free_metrics(metrics, metric_names, data_points,
datapoint_attributes_key_values,
total_metrics_count);
free_resource_attributes(resource_attributes_key_values,
resource_attributes_struct,
resource_attributes_count);
return NULL;
rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Encoding failed: %s",
PB_GET_ERROR(&stream));
rd_buf_destroy_free(rbuf);
goto fail;

Comment on lines +852 to +856
free_metrics(metrics, metric_names, data_points,
datapoint_attributes_key_values, total_metrics_count);
free_resource_attributes(resource_attributes_key_values,
resource_attributes_struct,
resource_attributes_count);
Collaborator:

Move this after reset_historical_metrics with a rd_kafka_rdunlock

Suggested change
free_metrics(metrics, metric_names, data_points,
datapoint_attributes_key_values, total_metrics_count);
free_resource_attributes(resource_attributes_key_values,
resource_attributes_struct,
resource_attributes_count);
free_metrics(metrics, metric_names, data_points,
datapoint_attributes_key_values, total_metrics_count);
free_resource_attributes(resource_attributes_key_values,
resource_attributes_struct,
resource_attributes_count);
rd_kafka_rdunlock(rk);
return rbuf;
fail:
free_metrics(metrics, metric_names, data_points,
datapoint_attributes_key_values, total_metrics_count);
free_resource_attributes(resource_attributes_key_values,
resource_attributes_struct,
resource_attributes_count);
rd_kafka_rdunlock(rk);
return NULL;

Comment on lines +450 to +468
size_t metrics_payload_size = 0, compressed_metrics_payload_size = 0;
void *metrics_payload =
rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size),
*compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE;

if (rk->rk_telemetry.accepted_compression_types_cnt != 0) {
compression_used = rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, metrics_payload_size,
&compressed_metrics_payload,
&compressed_metrics_payload_size);
} else {
rd_kafka_dbg(rk, TELEMETRY, "PUSHSENT",
"No compression types accepted, sending "
"uncompressed payload");
compressed_metrics_payload = metrics_payload;
metrics_payload = NULL;
compressed_metrics_payload_size = metrics_payload_size;
}
Collaborator:

We can pass the rd_buf_t for compression here.

Suggested change
size_t metrics_payload_size = 0, compressed_metrics_payload_size = 0;
void *metrics_payload =
rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size),
*compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE;
if (rk->rk_telemetry.accepted_compression_types_cnt != 0) {
compression_used = rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, metrics_payload_size,
&compressed_metrics_payload,
&compressed_metrics_payload_size);
} else {
rd_kafka_dbg(rk, TELEMETRY, "PUSHSENT",
"No compression types accepted, sending "
"uncompressed payload");
compressed_metrics_payload = metrics_payload;
metrics_payload = NULL;
compressed_metrics_payload_size = metrics_payload_size;
}
rd_buf_t *metrics_payload = rd_kafka_telemetry_encode_metrics(rk);
size_t compressed_metrics_payload_size = 0;
void *compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used =
rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, &compressed_metrics_payload,
&compressed_metrics_payload_size);
if (compressed_metrics_payload_size >
(size_t)rk->rk_telemetry.telemetry_max_bytes) {
rd_kafka_log(rk, LOG_WARNING, "TELEMETRY",
"Metrics payload size %" PRIdsz
" exceeds telemetry_max_bytes %" PRId32
"specified by the broker.",
compressed_metrics_payload_size,
rk->rk_telemetry.telemetry_max_bytes);
}

0, RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_handle_PushTelemetry,
NULL);

rd_free(compressed_metrics_payload);
Collaborator:

We can free the rd_buf_t here and in case it was compressed, the compressed payload.

Suggested change
rd_free(compressed_metrics_payload);
rd_buf_destroy_free(metrics_payload);
if (compression_used != RD_KAFKA_COMPRESSION_NONE)
rd_free(compressed_metrics_payload);

Comment on lines +370 to +443
static rd_kafka_compression_t
rd_kafka_push_telemetry_payload_compress(rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
void *payload,
size_t payload_len,
void **compressed_payload,
size_t *compressed_payload_size) {
rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE;
int i, r = -1;

for (i = 0; i < (int)rk->rk_telemetry.accepted_compression_types_cnt;
i++) {
rd_kafka_compression_t compression_type =
rk->rk_telemetry.accepted_compression_types[i];

switch (compression_type) {
#if WITH_ZLIB
case RD_KAFKA_COMPRESSION_GZIP:
r = rd_kafka_compress_gzip(rkb, payload, payload_len,
compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_GZIP;
break;
#endif
case RD_KAFKA_COMPRESSION_LZ4:
// TODO: Using 0 for compression level for now.
r = rd_kafka_lz4_compress_direct(
rkb, 0, payload, payload_len, compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_LZ4;
break;
#if WITH_ZSTD
case RD_KAFKA_COMPRESSION_ZSTD:
// TODO: Using 0 for compression level for now.
r = rd_kafka_zstd_compress_direct(
rkb, 0, payload, payload_len, compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_ZSTD;
break;
#endif
#if WITH_SNAPPY
case RD_KAFKA_COMPRESSION_SNAPPY:
r = rd_kafka_compress_snappy(rkb, payload, payload_len,
compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_SNAPPY;
break;
#endif
default:
break;
}
if (compression_used != RD_KAFKA_COMPRESSION_NONE && r == 0) {
rd_kafka_dbg(
rk, TELEMETRY, "PUSH",
"Compressed payload of size %" PRIusz " to %" PRIusz
" using compression type "
"%s",
payload_len, *compressed_payload_size,
rd_kafka_compression2str(compression_used));
rd_free(payload);
return compression_used;
}
}
if (compression_used != RD_KAFKA_COMPRESSION_NONE && r == -1) {
rd_kafka_dbg(rk, TELEMETRY, "PUSHERR",
"Failed to compress payload with available "
"compression types");
}
rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Sending uncompressed payload");

*compressed_payload = payload;
*compressed_payload_size = payload_len;
return RD_KAFKA_COMPRESSION_NONE;
}
Collaborator:

You can use existing compression functions here, or set the pointers to payload->rbuf_wpos->seg_p and payload->rbuf_wpos->seg_of in case no compression was applied.

Suggested change
static rd_kafka_compression_t
rd_kafka_push_telemetry_payload_compress(rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
void *payload,
size_t payload_len,
void **compressed_payload,
size_t *compressed_payload_size) {
rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE;
int i, r = -1;
for (i = 0; i < (int)rk->rk_telemetry.accepted_compression_types_cnt;
i++) {
rd_kafka_compression_t compression_type =
rk->rk_telemetry.accepted_compression_types[i];
switch (compression_type) {
#if WITH_ZLIB
case RD_KAFKA_COMPRESSION_GZIP:
r = rd_kafka_compress_gzip(rkb, payload, payload_len,
compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_GZIP;
break;
#endif
case RD_KAFKA_COMPRESSION_LZ4:
// TODO: Using 0 for compression level for now.
r = rd_kafka_lz4_compress_direct(
rkb, 0, payload, payload_len, compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_LZ4;
break;
#if WITH_ZSTD
case RD_KAFKA_COMPRESSION_ZSTD:
// TODO: Using 0 for compression level for now.
r = rd_kafka_zstd_compress_direct(
rkb, 0, payload, payload_len, compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_ZSTD;
break;
#endif
#if WITH_SNAPPY
case RD_KAFKA_COMPRESSION_SNAPPY:
r = rd_kafka_compress_snappy(rkb, payload, payload_len,
compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_SNAPPY;
break;
#endif
default:
break;
}
if (compression_used != RD_KAFKA_COMPRESSION_NONE && r == 0) {
rd_kafka_dbg(
rk, TELEMETRY, "PUSH",
"Compressed payload of size %" PRIusz " to %" PRIusz
" using compression type "
"%s",
payload_len, *compressed_payload_size,
rd_kafka_compression2str(compression_used));
rd_free(payload);
return compression_used;
}
}
if (compression_used != RD_KAFKA_COMPRESSION_NONE && r == -1) {
rd_kafka_dbg(rk, TELEMETRY, "PUSHERR",
"Failed to compress payload with available "
"compression types");
}
rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Sending uncompressed payload");
*compressed_payload = payload;
*compressed_payload_size = payload_len;
return RD_KAFKA_COMPRESSION_NONE;
}
static rd_kafka_compression_t
rd_kafka_push_telemetry_payload_compress(rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_buf_t *payload,
void **compressed_payload,
size_t *compressed_payload_size) {
rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE;
rd_slice_t payload_slice;
size_t i;
rd_kafka_resp_err_t r = RD_KAFKA_RESP_ERR_NO_ERROR;
rd_slice_init_full(&payload_slice, payload);
for (i = 0; i < rk->rk_telemetry.accepted_compression_types_cnt; i++) {
rd_kafka_compression_t compression_type =
rk->rk_telemetry.accepted_compression_types[i];
switch (compression_type) {
#if WITH_ZLIB
case RD_KAFKA_COMPRESSION_GZIP:
/* TODO: Using 0 for compression level for now. */
r = rd_kafka_gzip_compress(rkb, 0, &payload_slice,
compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_GZIP;
break;
#endif
case RD_KAFKA_COMPRESSION_LZ4:
/* TODO: Using 0 for compression level for now. */
r = rd_kafka_lz4_compress(
rkb, rd_true, 0, &payload_slice, compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_LZ4;
break;
#if WITH_ZSTD
case RD_KAFKA_COMPRESSION_ZSTD:
/* TODO: Using 0 for compression level for now. */
r = rd_kafka_zstd_compress(rkb, 0, &payload_slice,
compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_ZSTD;
break;
#endif
#if WITH_SNAPPY
case RD_KAFKA_COMPRESSION_SNAPPY:
r = rd_kafka_snappy_compress(rkb, &payload_slice,
compressed_payload,
compressed_payload_size);
compression_used = RD_KAFKA_COMPRESSION_SNAPPY;
break;
#endif
default:
break;
}
if (compression_used != RD_KAFKA_COMPRESSION_NONE &&
r == RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_dbg(
rk, TELEMETRY, "PUSH",
"Compressed payload of size %" PRIusz " to %" PRIusz
" using compression type "
"%s",
payload->rbuf_size, *compressed_payload_size,
rd_kafka_compression2str(compression_used));
return compression_used;
}
}
if (compression_used != RD_KAFKA_COMPRESSION_NONE &&
r != RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_dbg(rk, TELEMETRY, "PUSH",
"Failed to compress payload with available "
"compression types");
}
rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Sending uncompressed payload");
*compressed_payload = payload->rbuf_wpos->seg_p;
*compressed_payload_size = payload->rbuf_wpos->seg_of;
return RD_KAFKA_COMPRESSION_NONE;
}
