Source code for as3ninja.vault

# -*- coding: utf-8 -*-
"""
HashiCorp Vault integration
"""

# pylint: disable=C0330 # Wrong hanging indentation before block
# pylint: disable=C0301 # Line too long
# pylint: disable=E0213 # Method should have "self" as first argument
# pylint: disable=R0201 # Method could be a function

from enum import Enum
from os import getenv
from pathlib import Path
from typing import Dict, Optional, Union

import hvac
from jinja2 import pass_context
from jinja2.runtime import Context
from pydantic import BaseModel, validator

from .jinja2.j2ninja import J2Ninja
from .settings import NINJASETTINGS
from .utils import dict_filter

__all__ = ["VaultSecretsEngines", "VaultSecret", "VaultClient", "vault"]


[docs]class VaultSecretsEngines(Enum): """Supported Vault Secret Engines""" kv1 = "kv1" kv2 = "kv2" """Aliases""" default = "kv2" kv = "kv1"
[docs]class VaultSecret(BaseModel): """Vault Secret configuration BaseModel. :param path: The secret path. If `mount_point` is not specified the first path element is assumed to be the `mount_point`. :param mount_point: The secrets engine path. Optional. :param engine: The secrets engine. Optional. :param filter: Optional Filter to select specific data from the secret, e.g. "data.privateKey". Filter automatically prepends "data." for kv2 to replicate the same behaviour for kv1 and kv2. :param version: The version of the secret. Only relevant for KV2 Secrets Engine. Optional. Default: 0 (latest secret version) """ path: str mount_point: str engine: Union[str, VaultSecretsEngines] = VaultSecretsEngines["default"] filter: Optional[str] version: int = 0 def __init__(self, *args, **kwargs): path = kwargs.pop("path", None) mount_point = kwargs.pop("mount_point", None) if path and not mount_point: (mount_point, path) = self._split_mount_point_path(path) super().__init__(*args, mount_point=mount_point, path=path, **kwargs)
[docs] @validator("version") def validate_version(cls, value): """Validate version""" if not value >= 0: raise ValueError("version must be >= 0") return value
[docs] @validator("engine") def validate_engine(cls, value): """Validate engine against VaultSecretsEngines""" return VaultSecretsEngines[value]
[docs] @validator("path", "mount_point") def validate_pathlike(cls, value): """Basic secrets path validation using pathlib.Path. This should work for most vault secrets paths. """ if value: if value[0] == "/": return str(Path(value)) return str(Path(f"/{value}")) return value
[docs] @staticmethod def _split_mount_point_path(path: str) -> tuple: """Splits mount_point from path. The first path element is treated as the mount_point. :param path: path parameter """ offset = 0 if str(path)[0] == "/": offset = 1 _path = str(Path(str(path))) # normalize path _path = _path.split(sep="/", maxsplit=1 + offset) if len(_path) > offset + 1: return (_path[offset], _path[1 + offset]) return (None, path)
[docs]@J2Ninja.registerfunction class VaultClient: """Vault Client object, returns a hvac.v1.Client object. :param addr: Vault Address (url, eg. `https://myvault:8200/`) :param token: Vault Token to use for authentication :param verify: If `True` Verify TLS Certificate of Vault (Default: `True`) """ _defaultClient = None def __init__( self, addr: str, token: Optional[str] = None, verify: Union[str, bool] = True ): self._client = hvac.Client(url=addr, verify=verify) if token: self._client.token = token if not self._client.is_authenticated(): raise hvac.exceptions.VaultError( message="Could not successfully authenticate." )
[docs] def Client(self) -> hvac.v1.Client: """Returns hvac.client callable based on VaultClient() initialization parameters.""" return self._client
[docs] @classmethod def defaultClient(cls, ctx: Context) -> hvac.v1.Client: """Returns a hvac.v1.Client based on system/environment settings. This is method is not intended to be used directly. First checks for existing authentication based on `vault` cli. If authenticated no further action is performed. Then check the Jinja2 Context for the namespace ``ninja.as3ninja.vault`` and use ``addr``, ``token`` and ``ssl_verify`` to establish a Vault connection. For any of the above variables that doesn't exist the respective environment variable will be used as a fallback: ``addr`` = ``VAULT_ADDR`` ``token`` = `VAULT_TOKEN` ``ssl_verify`` = ``VAULT_SKIP_VERIFY`` If ``VAULT_SKIP_VERIFY`` does not exist ``VAULT_SSL_VERIFY`` from the AS3 Ninja configuration file (`as3ninja.settings.json`) is used. :param ctx: Context: Jinja2 Context """ if not cls._defaultClient: client = hvac.Client() # client might be authenticated already, e.g. when run through CLI if not client.is_authenticated(): vaddr = ( ctx.parent.get("ninja") .get("as3ninja", {}) .get("vault", {}) .get("addr", getenv("VAULT_ADDR", None)) ) token = ( ctx.parent.get("ninja") .get("as3ninja", {}) .get("vault", {}) .get("token", getenv("VAULT_TOKEN", None)) ) verify = ( ctx.parent.get("ninja") .get("as3ninja", {}) .get("vault", {}) .get( "ssl_verify", getenv("VAULT_SKIP_VERIFY", NINJASETTINGS.VAULT_SSL_VERIFY), ) ) if isinstance(verify, str): if verify in ("true", "True", "TRUE", "1"): verify = False else: verify = True client = hvac.Client(url=vaddr, verify=verify) if token: client.token = token if not client.is_authenticated(): raise hvac.exceptions.VaultError( message="Could not successfully authenticate." ) cls._defaultClient = client return cls._defaultClient
[docs]@J2Ninja.registerfilter @J2Ninja.registerfunction @pass_context def vault( ctx: Context, secret: Dict, client: Optional[VaultClient] = None, filter: Optional[str] = None, version: Optional[int] = None, ) -> Dict: """Vault filter to retrieve a secret. The secret is returned as a dict. :param ctx: Context: Jinja2 Context (automatically provided by jinja2) :param secret: secret configuration statement, automatically passed by "piping" to the vault filter :param client: Optional Vault client :param filter: Optional Filter to select specific data from the secret, e.g. "data.privateKey". Filter automatically prepends "data." for kv2 to replicate the same behaviour for kv1 and kv2. :param version: Optional secret version (overrides version provided by secret configuration statement) """ _secret: VaultSecret = VaultSecret(**secret) if version: _secret.version = version if client: vault_client = client.Client() else: vault_client = VaultClient.defaultClient(ctx=ctx) if filter is None: filter = _secret.filter if _secret.engine == VaultSecretsEngines.kv2: if filter: # prepend "data." for kv2 to replicate behaviour of kv1 filter = "data." + filter return dict_filter( vault_client.secrets.kv.v2.read_secret_version( path=_secret.path, mount_point=_secret.mount_point, version=_secret.version, ), filter=filter, ) elif _secret.engine == VaultSecretsEngines.kv1: return dict_filter( vault_client.secrets.kv.v1.read_secret( path=_secret.path, mount_point=_secret.mount_point ), filter=filter, )