# Source code for aj.config

import os
import pwd
import stat
import yaml
import logging
from jadi import interface, component, service
import aj
from aj.util import public

class BaseConfig():
    """
    A base class for config implementations. Your implementation must be able to save
    arbitrary mixture of ``dict``, ``list``, and scalar values.

    .. py:attribute:: data

        currently loaded config content
    """

    def __init__(self):
        self.data = None

    def load(self):
        """
        Should load config content into :attr:`data`.
        """
        raise NotImplementedError()

    def save(self):
        """
        Should save config content from :attr:`data`.
        """
        raise NotImplementedError()

    def ensure_structure(self):
        """
        Fill :attr:`data` with a default for every option that is missing.

        Existing values are kept (``dict.setdefault`` is used throughout).
        :attr:`data` must already be a ``dict`` when this is called. If a
        legacy ``users`` entry is found under ``auth``, it is migrated to the
        dedicated users file.
        """
        # Global options
        self.data.setdefault('name', None)
        self.data.setdefault('trusted_domains', [])
        self.data.setdefault('trusted_proxies', [])
        self.data.setdefault('max_sessions', 99)
        self.data.setdefault('session_max_time', 3600)
        self.data.setdefault('language', 'en')
        self.data.setdefault('restricted_user', 'nobody')
        self.data.setdefault('logo', os.path.dirname(__file__) + '/static/images/Logo.png')

        # Main view
        view = self.data.setdefault('view', {})
        view.setdefault('plugin', 'core')
        view.setdefault('filepath', 'content/pages/index.html')

        # Authentication
        auth = self.data.setdefault('auth', {})
        auth.setdefault('emails', {})
        auth.setdefault('provider', 'os')
        auth.setdefault('users_file', '/etc/ajenti/users.yml')

        # SSL
        ssl = self.data.setdefault('ssl', {})
        ssl.setdefault('enable', False)
        ssl.setdefault('certificate', None)
        ssl.setdefault('fqdn_certificate', None)
        ssl.setdefault('force', False)
        client_auth = ssl.setdefault('client_auth', {})
        client_auth.setdefault('enable', False)
        client_auth.setdefault('force', False)
        client_auth.setdefault('certificates', [])
        # setdefault() keeps an explicit None (e.g. an empty YAML key), so
        # normalise it to an empty list here.
        if client_auth['certificates'] is None:
            client_auth['certificates'] = []

        # Emails
        email = self.data.setdefault('email', {})
        email.setdefault('enable', False)
        email.setdefault('templates', {})

        # Before Ajenti 2.1.38, the users were stored in config.yml
        if 'users' in auth:
            logging.warning(f"Users should be stored in {self.data['auth']['users_file']}, migrating it ...")
            # NOTE(review): this call was lost in the extracted source; it is
            # restored because the warning above announces the migration and
            # nothing else invokes the method - confirm against upstream.
            self.migrate_users_to_own_configfile()

    def migrate_users_to_own_configfile(self):
        """
        Move the legacy ``auth.users`` section into the file named by
        ``auth.users_file``.

        An existing users file is renamed to ``<path>.bak`` first. The users
        are then written as YAML under a top-level ``users`` key and removed
        from :attr:`data`.
        """
        users_path = self.data['auth']['users_file']

        if os.path.isfile(users_path):
            # NOTE(review): only the message survived extraction; the log
            # level (info) is an assumption.
            logging.info(f"{users_path} already existing, backing it up")
            os.rename(users_path, users_path + '.bak')

        to_write = {'users': self.data['auth']['users']}
        with open(users_path, 'w') as f:
            f.write(yaml.safe_dump(to_write, default_flow_style=False, encoding='utf-8', allow_unicode=True).decode('utf-8'))

        del self.data['auth']['users']
        # Persist the config without the legacy section; otherwise the same
        # migration would run (and overwrite the users file) on every start.
        # NOTE(review): statement not visible in the extracted source - confirm.
        self.save()
        logging.info(f"{users_path} correctly written")

    def get_non_sensitive_data(self):
        return {


class SmtpConfig(BaseConfig):
    """
    Class to handle the smtp config file in order to store credentials of the email
    server relay.
    Config file is located at /etc/ajenti/smtp.yml and should have the following
    structure :
    smtp:
      port: starttls or ssl
      server: mail server to use
      user: user to authenticate
      password: password of the mail user
    """

    def __init__(self):
        BaseConfig.__init__(self)
        self.data = {}
        self.path = '/etc/ajenti/smtp.yml'

    def ensure_structure(self):
        """
        Make sure :attr:`data` has an ``smtp`` section with the ``password``,
        ``port``, ``server`` and ``user`` keys, each defaulting to ``None``.
        """
        smtp = self.data.setdefault('smtp', {})
        smtp.setdefault('password', None)
        smtp.setdefault('port', None)
        smtp.setdefault('server', None)
        smtp.setdefault('user', None)

    def get_smtp_password(self):
        """
        Return the password stored in the smtp file.

        :attr:`data` never holds the real password (:meth:`load` blanks it),
        so the file is read again on every call. Returns ``''`` when the
        in-memory password is ``None``, which is the case when no smtp file
        was loaded.
        """
        # if smtp.yml is not provided
        if self.data['smtp']['password'] is None:
            return ''
        with open(self.path, 'r') as smtp:
            # An empty file loads as None; treat it like an empty mapping.
            content = yaml.load(smtp, Loader=yaml.SafeLoader) or {}
        smtp_config = content.get('smtp') or {}
        return smtp_config.get('password', None)

    def load(self):
        """
        Load the smtp file into :attr:`data`, with the password blanked.

        If the file does not exist an error is logged and :attr:`data` is left
        untouched. The file is only read when running as root; its mode is
        set to 0o600 first.
        """
        if not os.path.exists(self.path):
            logging.error(f'Smtp credentials file "{self.path}" not found')
            return

        # NOTE(review): the ``else:`` branch marker was lost in the extracted
        # source; the chmod/read below cannot belong to the "not found" case.
        if os.geteuid() == 0:
            os.chmod(self.path, 0o600)
            with open(self.path, 'r') as smtp:
                loaded = yaml.load(smtp, Loader=yaml.SafeLoader)
            # Tolerate an empty file or an empty ``smtp:`` key.
            if loaded is None:
                loaded = {}
            if loaded.get('smtp') is None:
                loaded['smtp'] = {}
            # Prevent password leak
            loaded['smtp']['password'] = ''
            self.data = loaded

    def save(self, data):
        """
        Write ``data`` to the smtp file as YAML.

        When ``data['smtp']['password']`` is empty, the password currently
        stored in the file is re-used so it is not wiped.
        """
        # Prevent emptying password from settings plugin
        if not data['smtp']['password']:
            data['smtp']['password'] = self.get_smtp_password()
        with open(self.path, 'w') as smtp:
            # NOTE(review): the body of this ``with`` was truncated in the
            # extracted source; it mirrors the other save() methods here.
            smtp.write(yaml.safe_dump(data, default_flow_style=False, encoding='utf-8', allow_unicode=True).decode('utf-8'))

class AjentiUsers(BaseConfig):
    """
    Class to handle the users config file for the auth-user plugin.
    Config file is located at /etc/ajenti/users.yml and should have the following
    structure :
    users:
      username:
        email: ...@...
        password: hash
        permissions: {}
        uid: int
        fs_root: file system root directory
    """

    def __init__(self, path):
        BaseConfig.__init__(self)
        self.data = None
        self.path = os.path.abspath(path)

    def __str__(self):
        return self.path

    def load(self):
        """
        Load the users file into :attr:`data`.

        If the file does not exist an error is logged and :attr:`data` becomes
        ``{'users': {}}``. When running as root the file mode is set to 0o600
        before reading. A missing or empty ``users`` section is normalised to
        an empty dict.
        """
        # Find default users file. __init__ always stores a non-empty absolute
        # path, so this only applies if the path was cleared afterwards.
        if not self.path:
            # The module header does not import sys, hence the local import.
            import sys

            # Check for users file in /etc/ajenti/users.yml, then try a local
            # users file next to the running script.
            for candidate in ('/etc/ajenti/users.yml', os.path.join(sys.path[0], 'users.yml')):
                if os.path.isfile(candidate):
                    self.path = candidate
                    break

        if not self.path or not os.path.exists(self.path):
            logging.error(f'Users file "{self.path}" not found')
            self.data = {'users': {}}
            return

        if os.geteuid() == 0:
            os.chmod(self.path, 0o600)
        with open(self.path, 'r') as users:
            self.data = yaml.load(users, Loader=yaml.SafeLoader)
        # An empty file loads as None and an empty ``users:`` key as None.
        if self.data is None:
            self.data = {}
        if self.data.get('users') is None:
            self.data['users'] = {}

    def save(self):
        """Write :attr:`data` to the users file as YAML."""
        with open(self.path, 'w') as f:
            f.write(yaml.safe_dump(self.data, default_flow_style=False, encoding='utf-8', allow_unicode=True).decode('utf-8'))

# NOTE(review): declared as a jadi interface because this module calls
# ``UserConfigProvider.all(context)`` and registers implementations with
# ``@component(UserConfigProvider)``; the decorator is not visible in the
# extracted source - confirm against upstream.
@interface
class UserConfigProvider():
    """
    Base class for user config providers.

    Implementations set :attr:`id` and :attr:`name` and override
    :meth:`load`, :meth:`harden` and :meth:`save`.
    """

    # Identifier compared against the ``auth.provider`` config value.
    id = None
    # Display name of the provider.
    name = None

    def __init__(self, context):
        # ``context`` is accepted for the component constructor signature but
        # is not used by the base class.
        self.data = None

    def load(self):
        """Should load the user config into ``data``."""
        raise NotImplementedError

    def harden(self):
        """Should restrict access to the stored user config."""
        raise NotImplementedError

    def save(self):
        """Should persist ``data``."""
        raise NotImplementedError

class UserConfigError(Exception):
    """
    Error raised for user config problems, e.g. an unavailable provider.

    :param message: human-readable description, also returned by ``str()``.
    """

    def __init__(self, message):
        # Chain to the base class so ``args`` is populated however the
        # exception is constructed (positionally or with ``message=``).
        Exception.__init__(self, message)
        self.message = message

    def __str__(self):
        return self.message

@public
@service
class UserConfigService():
    """Service giving access to the configured user config provider."""

    def __init__(self, context):
        self.context = context

    def get_provider(self):
        """
        Return the provider whose ``id`` matches the ``auth.provider`` config
        value (``'os'`` when unset).

        :raises UserConfigError: if no available provider has that id.
        """
        # NOTE(review): the object subscripted here was lost in the extracted
        # source; ``aj.config.data`` is assumed from the module's ``import aj``.
        provider_id = aj.config.data['auth'].get('provider', 'os')
        for provider in UserConfigProvider.all(self.context):
            if provider.id == provider_id:
                return provider
        raise UserConfigError(f'User config provider {provider_id} is unavailable')
@component(UserConfigProvider)
class UserConfig(UserConfigProvider):
    """
    User config provider storing the config in ``~/.config/ajenti.yml`` of the
    OS user running the process.
    """

    id = 'os'
    name = 'OS users'

    def __init__(self, context):
        UserConfigProvider.__init__(self, context)
        username = pwd.getpwuid(os.getuid())[0]
        _dir = os.path.expanduser(f'~{username}/.config')
        # exist_ok avoids failing if the directory appears between a check
        # and the creation.
        os.makedirs(_dir, exist_ok=True)
        self.path = os.path.join(_dir, 'ajenti.yml')
        if os.path.exists(self.path):
            self.load()
        else:
            self.data = {}

    def load(self):
        """Read the YAML file at ``self.path`` into ``self.data``."""
        # Use a context manager so the file handle is closed after parsing.
        with open(self.path) as f:
            # An empty file loads as None; fall back to the same empty dict
            # used when the file does not exist.
            self.data = yaml.load(f, Loader=yaml.SafeLoader) or {}

    def harden(self):
        """Restrict the config file to its owner (``stat.S_IRWXU``)."""
        os.chmod(self.path, stat.S_IRWXU)

    def save(self):
        """Write ``self.data`` to ``self.path`` as YAML, then harden the file."""
        with open(self.path, 'w') as f:
            f.write(yaml.safe_dump(
                self.data,
                default_flow_style=False,
                encoding='utf-8',
                allow_unicode=True
            ).decode('utf-8'))
        self.harden()


# comments powered by Disqus