Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6368] Flink engine supports user impersonation #6383

Open
wants to merge 12 commits into
base: master
Choose a base branch
from

Conversation

wForget
Copy link
Member

@wForget wForget commented May 10, 2024

🔍 Description

Issue References 🔗

This pull request fixes #6368

Describe Your Solution 🔧

Support impersonation mode for flink sql engine.

Types of changes 🔖

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Test Plan 🧪

Behavior Without This Pull Request ⚰️

Behavior With This Pull Request 🎉

Test in hadoop-testing env.

Connection:

beeline -u "jdbc:hive2://hadoop-master1.orb.local:10009/default;hive.server2.proxy.user=spark;principal=kyuubi/[email protected]?kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application;kyuubi.engine.share.level=CONNECTION;kyuubi.engine.flink.doAs.enabled=true;"

sql:

select 1;

result:

image

launch engine command:

2024-06-12 03:22:10.242 INFO KyuubiSessionManager-exec-pool: Thread-62 org.apache.kyuubi.engine.EngineRef: Launching engine:
/opt/flink-1.18.1/bin/flink run-application \
	-t yarn-application \
	-Dyarn.ship-files=/opt/flink/opt/flink-sql-client-1.18.1.jar;/opt/flink/opt/flink-sql-gateway-1.18.1.jar;/etc/hive/conf/hive-site.xml \
	-Dyarn.application.name=kyuubi_CONNECTION_FLINK_SQL_spark_6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
	-Dyarn.tags=KYUUBI,6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
	-Dcontainerized.master.env.FLINK_CONF_DIR=. \
	-Dcontainerized.master.env.HIVE_CONF_DIR=. \
	-Dyarn.security.appmaster.delegation.token.services=kyuubi \
	-Dsecurity.delegation.token.provider.HiveServer2.enabled=false \
	-Dsecurity.delegation.token.provider.hbase.enabled=false \
	-Dexecution.target=yarn-application \
	-Dsecurity.module.factory.classes=org.apache.flink.runtime.security.modules.JaasModuleFactory;org.apache.flink.runtime.security.modules.ZookeeperModuleFa
ctory \
	-Dsecurity.delegation.token.provider.hadoopfs.enabled=false \
	-c org.apache.kyuubi.engine.flink.FlinkSQLEngine /opt/apache-kyuubi-1.10.0-SNAPSHOT-bin/externals/engines/flink/kyuubi-flink-sql-engine_2.12-1.10.0-SNAPS
HOT.jar \
	--conf kyuubi.session.user=spark \
	--conf kyuubi.client.ipAddress=172.20.0.5 \
	--conf kyuubi.engine.credentials=SERUUwACJnRocmlmdDovL2hhZG9vcC1tYXN0ZXIxLm9yYi5sb2NhbDo5MDgzRQAFc3BhcmsEaGl2ZShreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2Fs
QFRFU1QuT1JHigGQCneevIoBkC6EIrwWDxSg03pnAB8dA295wh+Dim7Fx4FNxhVISVZFX0RFTEVHQVRJT05fVE9LRU4ADzE3Mi4yMC4wLjU6ODAyMEEABXNwYXJrAChreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiL
mxvY2FsQFRFU1QuT1JHigGQCneekIoBkC6EIpBHHBSket0SQnlXT5EIMN0U2fUKFRIVvBVIREZTX0RFTEVHQVRJT05fVE9LRU4PMTcyLjIwLjAuNTo4MDIwAA== \
	--conf kyuubi.engine.flink.doAs.enabled=true \
	--conf kyuubi.engine.hive.extra.classpath=/opt/hadoop/share/hadoop/client/*:/opt/hadoop/share/hadoop/mapreduce/* \
	--conf kyuubi.engine.share.level=CONNECTION \
	--conf kyuubi.engine.submit.time=1718162530017 \
	--conf kyuubi.engine.type=FLINK_SQL \
	--conf kyuubi.frontend.protocols=THRIFT_BINARY,REST \
	--conf kyuubi.ha.addresses=hadoop-master1.orb.local:2181 \
	--conf kyuubi.ha.engine.ref.id=6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
	--conf kyuubi.ha.namespace=/kyuubi_1.10.0-SNAPSHOT_CONNECTION_FLINK_SQL/spark/6170b9aa-c690-4b50-938f-d59cca9aa2d6 \
	--conf kyuubi.server.ipAddress=172.20.0.5 \
	--conf kyuubi.session.connection.url=hadoop-master1.orb.local:10009 \
	--conf kyuubi.session.engine.startup.waitCompletion=false \
	--conf kyuubi.session.real.user=spark

launch engine log:

image

jobmanager job:

2024-06-12 03:22:26,400 INFO  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Loading delegation token providers
2024-06-12 03:22:26,992 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenProvider [] - Renew delegation token with engine credentials: SERUUwACJnRocmlmdDovL2hhZG9vcC1tYXN0ZXIxLm9yYi5sb2NhbDo5MDgzRQAFc3BhcmsEaGl2ZShreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneevIoBkC6EIrwWDxSg03pnAB8dA295wh+Dim7Fx4FNxhVISVZFX0RFTEVHQVRJT05fVE9LRU4ADzE3Mi4yMC4wLjU6ODAyMEEABXNwYXJrAChreXV1YmkvaGFkb29wLW1hc3RlcjEub3JiLmxvY2FsQFRFU1QuT1JHigGQCneekIoBkC6EIpBHHBSket0SQnlXT5EIMN0U2fUKFRIVvBVIREZTX0RFTEVHQVRJT05fVE9LRU4PMTcyLjIwLjAuNTo4MDIwAA==
2024-06-12 03:22:27,100 INFO  org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Add new unknown token Kind: HIVE_DELEGATION_TOKEN, Service: , Ident: 00 05 73 70 61 72 6b 04 68 69 76 65 28 6b 79 75 75 62 69 2f 68 61 64 6f 6f 70 2d 6d 61 73 74 65 72 31 2e 6f 72 62 2e 6c 6f 63 61 6c 40 54 45 53 54 2e 4f 52 47 8a 01 90 0a 77 9e bc 8a 01 90 2e 84 22 bc 16 0f
2024-06-12 03:22:27,104 WARN  org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Ignore token with earlier issue date: Kind: HDFS_DELEGATION_TOKEN, Service: 172.20.0.5:8020, Ident: (token for spark: HDFS_DELEGATION_TOKEN owner=spark, renewer=, realUser=kyuubi/[email protected], issueDate=1718162529936, maxDate=1718767329936, sequenceNumber=71, masterKeyId=28)
2024-06-12 03:22:27,104 INFO  org.apache.kyuubi.engine.flink.FlinkEngineUtils              [] - Update delegation tokens. The number of tokens sent by the server is 2. The actual number of updated tokens is 1.
......
4-06-12 03:22:29,414 INFO  org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Starting tokens update task
2024-06-12 03:22:29,415 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - New delegation tokens arrived, sending them to receivers
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Updating delegation tokens for current user
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service: Identifier:[10, 13, 10, 9, 8, 10, 16, -78, -36, -49, -17, -5, 49, 16, 1, 16, -100, -112, -60, -127, -8, -1, -1, -1, -1, 1]
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service: Identifier:[0, 5, 115, 112, 97, 114, 107, 4, 104, 105, 118, 101, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -68, -118, 1, -112, 46, -124, 34, -68, 22, 15]
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service:172.20.0.5:8020 Identifier:[0, 5, 115, 112, 97, 114, 107, 0, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -112, -118, 1, -112, 46, -124, 34, -112, 71, 28]
2024-06-12 03:22:29,422 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Updated delegation tokens for current user successfully

taskmanager log:

2024-06-12 03:45:06,622 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive initial delegation tokens from resource manager
2024-06-12 03:45:06,627 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - New delegation tokens arrived, sending them to receivers
2024-06-12 03:45:06,628 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Updating delegation tokens for current user
2024-06-12 03:45:06,629 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service: Identifier:[10, 13, 10, 9, 8, 10, 16, -78, -36, -49, -17, -5, 49, 16, 1, 16, -100, -112, -60, -127, -8, -1, -1, -1, -1, 1]
2024-06-12 03:45:06,630 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service: Identifier:[0, 5, 115, 112, 97, 114, 107, 4, 104, 105, 118, 101, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -68, -118, 1, -112, 46, -124, 34, -68, 22, 15]
2024-06-12 03:45:06,630 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Token Service:172.20.0.5:8020 Identifier:[0, 5, 115, 112, 97, 114, 107, 0, 40, 107, 121, 117, 117, 98, 105, 47, 104, 97, 100, 111, 111, 112, 45, 109, 97, 115, 116, 101, 114, 49, 46, 111, 114, 98, 46, 108, 111, 99, 97, 108, 64, 84, 69, 83, 84, 46, 79, 82, 71, -118, 1, -112, 10, 119, -98, -112, -118, 1, -112, 46, -124, 34, -112, 71, 28]
2024-06-12 03:45:06,636 INFO  org.apache.kyuubi.engine.flink.security.token.KyuubiDelegationTokenReceiver [] - Updated delegation tokens for current user successfully
2024-06-12 03:45:06,636 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Delegation tokens sent to receivers

Related Unit Tests


Checklist 📝

Be nice. Be informative.

@wForget wForget self-assigned this May 10, 2024
@codecov-commenter
Copy link

codecov-commenter commented May 10, 2024

Codecov Report

Attention: Patch coverage is 0% with 38 lines in your changes missing coverage. Please review.

Project coverage is 0.00%. Comparing base (4cbecdc) to head (02ab7af).
Report is 30 commits behind head on master.

Files Patch % Lines
...ache/kyuubi/engine/flink/FlinkProcessBuilder.scala 0.00% 32 Missing ⚠️
...in/scala/org/apache/kyuubi/config/KyuubiConf.scala 0.00% 6 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #6383       +/-   ##
============================================
- Coverage     58.37%   0.00%   -58.38%     
============================================
  Files           656     675       +19     
  Lines         40267   41674     +1407     
  Branches       5498    5689      +191     
============================================
- Hits          23507       0    -23507     
- Misses        14259   41674    +27415     
+ Partials       2501       0     -2501     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@wForget wForget added this to the v1.10.0 milestone May 14, 2024
@wForget wForget marked this pull request as draft May 24, 2024 07:44
@pan3793 pan3793 changed the title [KYUUBI #6368] Support impersonation mode for flink sql engine [KYUUBI #6368] Flink engine supports user impersonation May 24, 2024
@@ -68,12 +68,10 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with
}.toMap

try {
// Renewer is not needed. But setting a renewer can avoid potential NPE.
val renewer = UserGroupInformation.getCurrentUser.getUserName
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change is worth a dedicated PR.

@@ -58,14 +61,30 @@ class FlinkProcessBuilder(
// flink.execution.target are required in Kyuubi conf currently
val executionTarget: Option[String] = conf.getOption("flink.execution.target")

private lazy val proxyUserEnable: Boolean = {
doAsEnabled && conf.get(ENGINE_FLINK_DOAS_ENABLED) &&
conf.getOption(s"$FLINK_CONF_PREFIX.$FLINK_SECURITY_KEYTAB_KEY").isEmpty &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in Spark, when doAs is enabled, we actually have a constraint to ensure that the principal should always be the session user.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of flink proxy user is different from the built-in --proxy-user of spark. It relies on HADOOP_PROXY_USER and has some limitations. It is difficult for us to control the behavior of the client well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should unify the concept on the Kyuubi layer as much as possible.

private def generateTokenFile(): Option[(String, String)] = {
// We disabled `hadoopfs` token service, which may cause yarn client to miss hdfs token.
// So we generate a hadoop token file to pass kyuubi engine tokens to submit process.
// TODO: Removed this after FLINK-35525, delegation tokens will be passed by `kyuubi` provider
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitant to mix two approaches up, maybe we need a switch

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FLINK-35525 has not yet been introduced in stable version. the hadoop token file way is currently only effective one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flink 1.20 is on the way.

We can simply make the decision based on the Flink version, but considering vendors are likely to backport patches to their internal distributions, an explicit switch is preferred.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an explicit switch is preferred.

Do you mean add a configuration to control this behavior?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a new configuration. Given the current approach is hacky and will be eventually removed, we can mark the configuration as internal.

@wForget wForget marked this pull request as ready for review June 12, 2024 06:04

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary

val ENGINE_FLINK_DOAS_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.engine.flink.doAs.enabled")
.doc("Whether to enable using hadoop proxy user to run flink engine. Only takes effect" +
s" in kerberos environment and when `${ENGINE_DO_AS_ENABLED.key}` is set to `true`.")
Copy link
Member

@pan3793 pan3793 Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this configuration should be independent with kyuubi.engine.doAs.enabled. And we should fail the Flink engine bootstrap when user enables the configuration on a non-Kerberized environment.

the docs might be: "When enabled, the session user is used as the proxy user to launch the Flink engine, otherwise, the server user. Note, due to the limitation of Apache Flink, it can only be enabled on Kerberized environment."

@@ -58,14 +61,30 @@ class FlinkProcessBuilder(
// flink.execution.target are required in Kyuubi conf currently
val executionTarget: Option[String] = conf.getOption("flink.execution.target")

private lazy val proxyUserEnable: Boolean = {
doAsEnabled && conf.get(ENGINE_FLINK_DOAS_ENABLED) &&
conf.getOption(s"$FLINK_CONF_PREFIX.$FLINK_SECURITY_KEYTAB_KEY").isEmpty &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should unify the concept on the Kyuubi layer as much as possible.

private def generateTokenFile(): Option[(String, String)] = {
// We disabled `hadoopfs` token service, which may cause yarn client to miss hdfs token.
// So we generate a hadoop token file to pass kyuubi engine tokens to submit process.
// TODO: Removed this after FLINK-35525, delegation tokens will be passed by `kyuubi` provider
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a new configuration. Given the current approach is hacky and will be eventually removed, we can mark the configuration as internal.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE] Support impersonation mode for flink sql engine
3 participants