Ksql interprets enums in ProtoBuf file incorrectly #10358

Open
ja-softdevel opened this issue Jun 4, 2024 · 0 comments

Versions

cp-ksqldb-server           7.6.0-2-ubi8 
cp-schema-registry         7.6.0-2-ubi8 

Describe the bug
When using a Protobuf schema from Schema Registry, ksqlDB interprets enum fields as VARCHAR(STRING) instead of BIGINT/INTEGER.
This causes a lot of issues. I also have another topic/stream whose Kafka messages are based on a rather large Protobuf file, so writing out the column list in CREATE STREAM by hand is not practical.

Sample of protobuf file:

message Message {
    Command command = 1;
    ....
    Category category = 22;
}
enum Command {
    UPDATE = 0;
    DELETE = 1;
}
enum Category {
    UNK = 0;
    ....
}
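
As a stopgap for large schemas, the explicit column list could be generated instead of typed by hand. The following is a minimal Python sketch, not a tested tool: it assumes a flat message (no nested messages, `repeated`, `map` or `oneof` fields), maps every enum-typed field to BIGINT to mirror the explicit-column stream in this report, and uses an assumed scalar type mapping. The names `SCALAR_TO_KSQL`, `enum_names`, `message_fields` and `create_stream_sql` are hypothetical helpers, and `SAMPLE_PROTO` is the sample above with the elided fields left out.

```python
import re

# Assumed proto3 scalar -> ksqlDB column type mapping; extend as needed.
SCALAR_TO_KSQL = {
    "int32": "INTEGER",
    "int64": "BIGINT",
    "bool": "BOOLEAN",
    "double": "DOUBLE",
    "string": "STRING",
}

# The sample from this report, without the elided ("....") fields.
SAMPLE_PROTO = """
message Message {
    Command command = 1;
    Category category = 22;
}
enum Command {
    UPDATE = 0;
    DELETE = 1;
}
enum Category {
    UNK = 0;
}
"""


def enum_names(proto_text):
    """Return the names of all enums declared in the proto text."""
    return set(re.findall(r"\benum\s+(\w+)\s*\{", proto_text))


def message_fields(proto_text, message_name):
    """Return (type, field_name) pairs for a flat message (no nested braces)."""
    match = re.search(
        r"\bmessage\s+" + re.escape(message_name) + r"\s*\{(.*?)\}",
        proto_text,
        re.S,
    )
    if match is None:
        raise ValueError(f"message not found: {message_name}")
    return re.findall(r"^\s*([\w.]+)\s+(\w+)\s*=\s*\d+\s*;", match.group(1), re.M)


def create_stream_sql(proto_text, message_name, stream_name, topic):
    """Build a CREATE STREAM statement with enum fields declared as BIGINT."""
    enums = enum_names(proto_text)
    columns = []
    for type_name, field_name in message_fields(proto_text, message_name):
        # Strip a package prefix such as "test." before the enum lookup.
        if type_name.split(".")[-1] in enums:
            ksql_type = "BIGINT"
        elif type_name in SCALAR_TO_KSQL:
            ksql_type = SCALAR_TO_KSQL[type_name]
        else:
            raise ValueError(f"unsupported field type: {type_name}")
        columns.append(f"{field_name.upper()} {ksql_type}")
    return (
        f"CREATE STREAM {stream_name} ({', '.join(columns)}) "
        f"WITH (KAFKA_TOPIC='{topic}', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF_NOSR');"
    )


if __name__ == "__main__":
    print(create_stream_sql(SAMPLE_PROTO, "Message", "INPUT_STREAM", "input"))
```

Raising on unknown types is deliberate: a field the sketch cannot map should stop the generation rather than be silently dropped from the column list.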

Describing the stream where I give CREATE STREAM all the columns and types explicitly (this stream returns Kafka messages):

ksql> describe input_stream extended;

Name                 : INPUT_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : PROTOBUF_NOSR
Kafka topic          : input (partitions: 10, replication: 1)
Statement            : CREATE STREAM INPUT_STREAM (COMMAND BIGINT, ... CATEGORY BIGINT) WITH (CLEANUP_POLICY='delete', KAFKA_TOPIC='input', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF_NOSR');


 Field       | Type
-------------------------------
 COMMAND     | BIGINT
 ....
 CATEGORY    | BIGINT
-------------------------------

Describing the stream that I create with the statement below (this stream does not return Kafka messages):

CREATE STREAM INPUT_STREAM_2 WITH (KAFKA_TOPIC='input', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF', VALUE_SCHEMA_FULL_NAME='Message');

ksql> describe input_stream_2 extended;

Name                 : INPUT_STREAM_2
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : PROTOBUF
Kafka topic          : input (partitions: 10, replication: 1)
Statement            : CREATE STREAM INPUT_STREAM_2 (COMMAND STRING, .... CATEGORY STRING) WITH (CLEANUP_POLICY='delete', KAFKA_TOPIC='input', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF', VALUE_SCHEMA_FULL_NAME='Message');


 Field       | Type
-------------------------------
 COMMAND     | VARCHAR(STRING)
 ....
 CATEGORY    | VARCHAR(STRING)
-------------------------------

Schema Registry (the schema was POSTed with normalize=true):

> curl --silent -X GET http://localhost:8081/subjects/input-value/versions/8 | jq
{
  "subject": "input-value",
  "version": 8,
  "id": 10,
  "schemaType": "PROTOBUF",
  "schema": "syntax = \"proto3\";\npackage test;\n\nmessage Message {\n  test.Command command = 1; ....  test.Category category = 22;\n}\nenum Command {\n  UPDATE = 0;\n  DELETE = 1;\n} .... \nenum Category {\n  UNK = 0;\n  .... }\n"
}
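
To see which columns are affected without reading the whole schema, the enum-typed fields can be listed from a response like the one above. This is a rough Python sketch that assumes the response JSON has a `schema` string as shown and that fields are simple `type name = number;` lines. `enum_typed_fields` is a hypothetical helper, and `sample_response` is a trimmed stand-in for the real response, with the elided parts left out.

```python
import json
import re


def enum_typed_fields(registry_response_json):
    """Return (field_name, type_name) pairs for fields whose type is an enum."""
    schema_text = json.loads(registry_response_json)["schema"]
    enums = set(re.findall(r"\benum\s+(\w+)\s*\{", schema_text))
    # Matches "type name = number;" lines; enum values such as "UPDATE = 0;"
    # have only one token before "=" and are therefore skipped.
    fields = re.findall(r"^\s*([\w.]+)\s+(\w+)\s*=\s*\d+\s*;", schema_text, re.M)
    return [
        (field_name, type_name)
        for type_name, field_name in fields
        if type_name.split(".")[-1] in enums  # drop a package prefix like "test."
    ]


# Trimmed stand-in for the Schema Registry response (elided parts omitted).
sample_response = json.dumps({
    "subject": "input-value",
    "schemaType": "PROTOBUF",
    "schema": (
        'syntax = "proto3";\npackage test;\n\n'
        "message Message {\n  test.Command command = 1;\n"
        "  test.Category category = 22;\n}\n"
        "enum Command {\n  UPDATE = 0;\n  DELETE = 1;\n}\n"
        "enum Category {\n  UNK = 0;\n}\n"
    ),
})

if __name__ == "__main__":
    for field_name, type_name in enum_typed_fields(sample_response):
        print(f"{field_name.upper()}: {type_name}")
```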
