将 Airflow 更新至 1.9 后,所有变量都会被创建为加密状态。是否可以禁用加密?
将 Airflow 更新至 1.9 后,所有变量都会被创建为加密状态。是否可以禁用加密?
from airflow.models import get_fernet
from airflow.models import Variable
# Manually decrypt the stored value of the Airflow variable "var_name".
# NOTE(review): bytes(obj, 'utf-8') below only accepts a str; with
# deserialize_json=True the fetched value may not be a str, in which case the
# bytes() call raises TypeError -- confirm the flag is intended here.
var_to_decryp = Variable.get("var_name",deserialize_json=True)
# get_fernet is imported from airflow.models; its result provides decrypt().
fernet = get_fernet()
# Encode the fetched value as UTF-8 bytes, decrypt it, then call .decode() on
# the decrypted result.
decryp_value = fernet.decrypt(bytes(var_to_decryp, 'utf-8')).decode()
2-正确的方法是使用Variable模型中的get_val():
def get_val(self):
    """Return the stored value, decrypting it when it is flagged as encrypted.

    When ``self._val`` is falsy or ``self.is_encrypted`` is falsy, ``self._val``
    is returned unchanged. Otherwise the value is encoded as UTF-8 bytes and
    decrypted with the object returned by ``get_fernet()``. If decryption
    fails, an error is logged and ``None`` is returned.
    """
    logger = LoggingMixin().log

    needs_decryption = self._val and self.is_encrypted
    if not needs_decryption:
        return self._val

    try:
        cipher = get_fernet()
        token = bytes(self._val, 'utf-8')
        return cipher.decrypt(token).decode()
    except InvalidFernetToken:
        # The stored token could not be validated by Fernet.
        logger.error(
            "Can't decrypt _val for key={}, invalid token or value".format(self.key)
        )
    except Exception:
        # Any other failure is reported as a missing FERNET_KEY configuration.
        logger.error(
            "Can't decrypt _val for key={}, FERNET_KEY configuration missing".format(self.key)
        )
    return None
您可以在需要设置未加密变量的DAG中覆盖Variable类:
import json
from typing import Any, Optional
import airflow.models as models
from airflow.utils.session import provide_session
from sqlalchemy.orm import Session
class Variable(models.Variable):
    """Subclass of ``models.Variable`` that can store values without encryption."""

    def set_val_unencrypted(self, value):
        """Store ``value`` as-is in ``_val`` and mark the variable as not encrypted.

        A ``None`` value leaves the instance untouched.
        """
        if value is None:
            return
        self._val = value
        self.is_encrypted = False

    @classmethod
    @provide_session
    def set_unencrypted(
        cls,
        key: str,
        value: Any,
        description: Optional[str] = None,
        serialize_json: bool = False,
        session: Session = None,
    ):
        """Create the variable ``key`` with an unencrypted value.

        Args:
            key: Name of the variable to write.
            value: Value to store; converted with ``str()`` unless
                ``serialize_json`` is true.
            description: Optional description passed to the new variable.
            serialize_json: When true, ``value`` is stored as
                ``json.dumps(value, indent=2)``.
            session: SQLAlchemy session; passed to ``Variable.delete`` and
                used to add and flush the new row.
        """
        # check if the secret exists in the custom secrets backend.
        cls.check_for_write_conflict(key)

        payload = json.dumps(value, indent=2) if serialize_json else str(value)

        # Delete any entry under this key, then add a fresh row whose value
        # is written through the unencrypted setter.
        Variable.delete(key, session=session)
        record = Variable(key=key, description=description)
        record.set_val_unencrypted(payload)
        session.add(record)
        session.flush()
然后,当您需要设置未加密变量时,只需调用 Variable.set_unencrypted(<key>, <value>, ...)(而不是 Variable.set())。