Working with Multiple Teams in Apache Airflow

Sunho Song
8 min read · Sep 22, 2021


  1. Airflow authentication with RBAC and Keycloak
  2. Airflow multi-tenant guide

Airflow multi-tenancy?

Airflow consists of a Web Server, a Scheduler, and Workers, and the default roles it provides are Admin, Op, User, Viewer, and Public.

https://en.wikipedia.org/wiki/Apache_Airflow

This doesn’t matter much if only one group connects to Airflow and runs DAGs, but if there are many users or groups, applying a group permission policy is recommended. Without one, more time ends up being spent on operations than on development, and in our experience the repetition of these routines shortens the life cycle of both development and operations. At first it seems that only a little extra time is needed to prepare upgrades and patches, but eventually the system can no longer be upgraded or patched at all, and the work shifts to a purely operational focus.

https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html

In this article, following on from applying Airflow authentication with RBAC and Keycloak, I am going to explain how to implement multi-tenancy using RBAC. The article assumes that Airflow is shared by a Data Engineering group and a Data Science group. As with the previous article, related material was hard to find, so the overall implementation and testing took considerable time.

Prerequisite

  • Airflow >= 2.1.3
  • Keycloak >= 14.0.0
  • Python > 3.7

Configuring Keycloak

First, create roles that map to the multi-tenancy groups to be used in Airflow. In this article, data_engineering and data_science roles are created, since we assume the Data Engineering and Data Science teams will use them.

Roles can be created by clicking the Add Role button in the Roles tab of the airflow client under Realm -> Configure -> Clients. After creation, the two roles are displayed as shown below.

When the role creation is completed, create a group corresponding to the role. Groups can be created by clicking the New button in Realm -> Manage -> Groups. If creation is completed normally, the added group is displayed in the Groups tree.

Next, let’s map roles to groups. You can bind a role directly to a user, but for easier management it is more convenient to map a role to a group and then manage membership by adding or removing users from the group. Go to the Role Mappings tab of the two groups created above and map the roles. If the role is bound correctly, it is displayed under Assigned Roles.

Now that the mapping of roles to groups is complete, it is time to add users to the groups. The role only appears in the JWT (JSON Web Token) once the user has been added to the group.
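The roles arrive inside the `resource_access` claim of the access token's payload. As an illustration (with a made-up payload; real tokens are signed by Keycloak and should be signature-verified), the payload segment of a JWT can be inspected with only the standard library:

```python
import base64
import json

# Hypothetical (unsigned) token payload, mimicking Keycloak's claim layout.
payload = {
    "preferred_username": "alice",
    "resource_access": {"airflow": {"roles": ["data_engineering"]}},
}

# Build the middle (payload) segment the way a JWT encoder would:
# base64url-encode the JSON with padding stripped.
segment = base64.urlsafe_b64encode(json.dumps(payload).encode()).rstrip(b"=")

# Decoding: re-pad the base64url segment and parse the JSON claims.
padded = segment + b"=" * (-len(segment) % 4)
claims = json.loads(base64.urlsafe_b64decode(padded))

roles = claims["resource_access"]["airflow"]["roles"]
print(roles)  # → ['data_engineering']
```

If the user was not added to any group, this list is empty, which is why the webserver_config.py below falls back to a public role in that case.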

Configuring Airflow

After setting up the roles and groups in Keycloak, configure the permissions for those roles in Airflow. Since the final goal of this article is Airflow multi-tenancy, once the setup is complete the Data Engineering team should only see data_engineering_k8spodoperator, and the Data Science team should only see data_science_k8spodoperator. For now, logging in with an Admin role account shows both.

First, create the role in Airflow. Its permissions are seeded by copying the User role: after selecting the User role in Security -> List Roles, click Copy Role under Actions.

When the copy is complete, select the copied role, click the Edit Record button, rename it to the data_engineering role created above, and remove “can read on DAGs” and “can edit on DAGs” from its permissions.

When deletion is complete, grant read and edit permission for the data_engineering_k8spodoperator DAG by adding “can read on DAG:data_engineering_k8spodoperator” and “can edit on DAG:data_engineering_k8spodoperator”.

Once the data_engineering role is set, repeat the same steps for the data_science role. Since the procedure is identical, the screenshots are omitted.
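The two DAG-level permissions per team follow a fixed naming pattern (`can read on DAG:<dag_id>` and `can edit on DAG:<dag_id>`). As a sketch, assuming this article's `<team>_k8spodoperator` DAG naming convention (which is a convention of this setup, not an Airflow API), the entries could be generated rather than typed by hand:

```python
# Sketch: generate the per-team DAG permission names entered above.
# The team names and the "<team>_k8spodoperator" DAG naming convention
# are assumptions from this article.
TEAMS = ["data_engineering", "data_science"]

def dag_permissions(team):
    """Return the permission strings to add to the team's Airflow role."""
    dag_id = "{0}_k8spodoperator".format(team)
    return [
        "can read on DAG:{0}".format(dag_id),
        "can edit on DAG:{0}".format(dag_id),
    ]

for team in TEAMS:
    for perm in dag_permissions(team):
        print(perm)
```

As an alternative to clicking through the UI, Airflow also supports declaring these per-DAG permissions in the DAG definition itself via the `access_control` argument (e.g. `access_control={"data_engineering": {"can_read", "can_edit"}}`), which syncs the same permission entries to the role.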

Edit webserver_config.py

As in the previous article on Airflow authentication with RBAC and Keycloak, the webserver_config.py file needs to be modified so that the roles defined in Keycloak are mapped onto Airflow roles. An auth_user_oauth function has been added, which looks up the user's roles and assigns them. The full source is below.

(This is a test configuration; the code contains duplication and needs additional review before being applied to a production environment.)

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
webserver_config
References
- https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
"""
import os
import logging
import jwt
from flask import redirect, session
from flask_appbuilder import expose
from flask_appbuilder.security.manager import AUTH_OAUTH
from flask_appbuilder.security.views import AuthOAuthView
from flask_appbuilder.security.sqla.models import Role
from airflow.www.security import AirflowSecurityManager

basedir = os.path.abspath(os.path.dirname(__file__))
log = logging.getLogger(__name__)

MY_PROVIDER = 'keycloak'

AUTH_TYPE = AUTH_OAUTH
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Public"
AUTH_ROLES_SYNC_AT_LOGIN = True
AUTH_ROLE_PREFIX = 'aiplatform'
PERMANENT_SESSION_LIFETIME = 1800

AUTH_ROLES_MAPPING = {
    "airflow_admin": ["Admin"],
    "airflow_op": ["Op"],
    "airflow_user": ["User"],
    "airflow_viewer": ["Viewer"],
    "airflow_public": ["Public"],
}

OAUTH_PROVIDERS = [
    {
        'name': 'keycloak',
        'icon': 'fa-circle-o',
        'token_key': 'access_token',
        'remote_app': {
            'client_id': 'airflow',
            'client_secret': 'dfd9c0d6-ac73-41c1-a97f-5b05948cee0e',
            'client_kwargs': {
                'scope': 'email profile'
            },
            'api_base_url': 'https://cumulus.koreacentral.cloudapp.azure.com/auth/realms/homelab/protocol/openid-connect/',
            'request_token_url': None,
            'access_token_url': 'https://cumulus.koreacentral.cloudapp.azure.com/auth/realms/homelab/protocol/openid-connect/token',
            'authorize_url': 'https://cumulus.koreacentral.cloudapp.azure.com/auth/realms/homelab/protocol/openid-connect/auth',
        },
    },
]


class CustomAuthRemoteUserView(AuthOAuthView):
    @expose("/logout/")
    def logout(self):
        """Delete access token before logging out."""
        return super().logout()


class CustomSecurityManager(AirflowSecurityManager):
    authoauthview = CustomAuthRemoteUserView

    def oauth_user_info(self, provider, response):
        if provider == MY_PROVIDER:
            token = response["access_token"]
            # Signature verification is skipped here (PyJWT 2.x syntax);
            # verify the token signature in a production environment.
            me = jwt.decode(token, algorithms=["RS256"],
                            options={"verify_signature": False})
            # {
            #   "resource_access": { "airflow": { "roles": ["airflow_admin"] }}
            # }
            groups = me["resource_access"]["airflow"]["roles"]  # unsafe
            if len(groups) < 1:
                groups = ["airflow_public"]
            userinfo = {
                "username": me.get("preferred_username"),
                "email": me.get("email"),
                "first_name": me.get("given_name"),
                "last_name": me.get("family_name"),
                "role_keys": groups,
            }
            log.info("user info: {0}".format(userinfo))
            return userinfo
        else:
            return {}

    def auth_user_oauth(self, userinfo):
        """
        Method for authenticating user with OAuth.

        :userinfo: dict with user information
                   (keys are the same as User model columns)
        """
        # extract the username from `userinfo`
        if "username" in userinfo:
            username = userinfo["username"]
        elif "email" in userinfo:
            username = userinfo["email"]
        else:
            log.error(
                "OAUTH userinfo does not have username or email {0}".format(userinfo)
            )
            return None

        # If username is empty, go away
        if (username is None) or username == "":
            return None

        # Search the DB for this user
        user = self.find_user(username=username)

        # If user is not active, go away
        if user and (not user.is_active):
            return None

        # If user is not registered, and not self-registration, go away
        if (not user) and (not self.auth_user_registration):
            return None

        # Sync the user's roles
        if user and self.auth_roles_sync_at_login:
            user_role_objects = set()
            user_role_objects.add(self.find_role(AUTH_USER_REGISTRATION_ROLE))
            for item in userinfo.get("role_keys", []):
                fab_role = self.find_role(item)
                if fab_role:
                    user_role_objects.add(fab_role)
            user.roles = list(user_role_objects)
            log.debug(
                "Calculated new roles for user='{0}' as: {1}".format(
                    username, user.roles
                )
            )

        # If the user is new, register them
        if (not user) and self.auth_user_registration:
            user_role_objects = set()
            user_role_objects.add(self.find_role(AUTH_USER_REGISTRATION_ROLE))
            for item in userinfo.get("role_keys", []):
                fab_role = self.find_role(item)
                if fab_role:
                    user_role_objects.add(fab_role)
            user = self.add_user(
                username=username,
                first_name=userinfo.get("first_name", ""),
                last_name=userinfo.get("last_name", ""),
                email=userinfo.get("email", "") or f"{username}@email.notfound",
                role=list(user_role_objects),
            )
            log.debug("New user registered: {0}".format(user))

            # If user registration failed, go away
            if not user:
                log.error("Error creating a new OAuth user {0}".format(username))
                return None

        # LOGIN SUCCESS (only if user is now registered)
        if user:
            self.update_user_auth_stat(user)
            return user
        else:
            return None


SECURITY_MANAGER_CLASS = CustomSecurityManager
APP_THEME = "simplex.css"

After changing the source, restart the Airflow webserver. If you log in with a Data Engineering account, only data_engineering_k8spodoperator is displayed.

Conclusion

I have now covered linking OIDC to Airflow across two articles. Organizing this content took more time than expected, because material was hard to find on Google and Stack Overflow. I hope it helps those looking for related content. And if this article has saved you a lot of time, please consider donating a cup of coffee. (It helps me keep writing while sipping an iced americano at a local cafe.)

And I am looking for a job. If you are interested, please comment.

https://buymeacoffee.com/7ov2xm5



Written by Sunho Song

I have developed an AI platform for semiconductor defect analysis. I am very interested in MLOps and love to learn new skills.
