最近学习了下用户权限划分的数据库结构,并且结合到了 Flask 和 SQLAlchemy 中

首先是数据库的整体结构图(简化版)

### 基础表
#### 用户表
class UserModel(db.Model):
    __tablename__ = 'user'
    username = db.Column(db.String(50))
    password = db.Column(db.String(128))
    email = db.Column(db.String(128))
    mobile = db.Column(db.String(11))
    name = db.Column(db.String(50))
    gender = db.Column(db.SmallInteger)  # 0 未知, 1 男 2 女
#### 角色表
class RoleModel(db.Model):
    __tablename__ = 'role'
    name = db.Column(db.String(20))
#### 权限表
class PermissionModel(db.Model):
    __tablename__ = 'permission'
    name = db.Column(db.String(50))
    action = db.Column(db.String(250), unique=True)
#### 菜单表
class MenuModel(db.Model):
    __tablename__ = 'menu'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50))
    icon = db.Column(db.String(50))
    url = db.Column(db.String(250))
    order = db.Column(db.SmallInteger, default=0)
    bg_color = db.Column(db.String(50))
### 关联表

基础表完了就是关联表了

#### 用户角色表

用户跟角色,肯定是多对多的关系,按照 Flask-SQLAlchemy 里的 Many-to-Many Relationships

user_role = db.Table('user_role',  # 用户角色关联表
    db.Column('user_id', db.Integer, db.ForeignKey('user.id')),
    db.Column('role_id', db.Integer, db.ForeignKey('role.id')),
    db.Column('created_at', db.DateTime, default=datetime.now),
)
#### 角色权限表

这里把权限挂在了角色下面,其实也可以去掉角色,直接跟用户挂钩,但是如果后期在后台分配用户权限,估计会累死。这里角色跟权限也是多对多

role_permission = db.Table('role_permission',  # 角色权限关联表
    db.Column('permission_id', db.Integer, db.ForeignKey('permission.id')),
    db.Column('role_id', db.Integer, db.ForeignKey('role.id')),
    db.Column('created_at', db.DateTime, default=datetime.now),
)
#### 角色菜单表

同上,也是多对多

role_menu = db.Table('role_menu',  # 用户菜单关联表
    db.Column('role_id', db.Integer, db.ForeignKey('role.id')),
    db.Column('menu_id', db.Integer, db.ForeignKey('menu.id')),
    db.Column('created_at', db.DateTime, default=datetime.now),
    db.Column('is_delete', db.Boolean, default=False),
)

SQLAlchemy

如果需要获取一个用户的角色,可以利用relationship,关联到角色表上

class UserModel(db.Model):
    # ...
    roles = db.relationship(
        'RoleModel',
        secondary=user_role,
        backref=db.backref(
            'users',
            lazy='dynamic'
        )
    )

获取用户的所有权限可以用property

class UserModel(db.Model):
    # ...
    @property
    def permissions(self):
        permissions = PermissionModel.query.join(role_permission).join(RoleModel).join(user_role).join(UserModel).\
            filter(
            UserModel.id == self.id
        )
        return permissions

同理菜单

class UserModel(db.Model):
    # ...
    @property
    def menus(self):
        menus = MenuModel.query.join(role_menu).join(RoleModel).join(user_role).join(UserModel).\
            filter(
            UserModel.id == self.id
        ).order_by(MenuModel.type_, MenuModel.order).all()
        return menus

这样就可以用user.permissionsuser.menus来获得用户的权限和菜单了

### 与 Flask 结合

数据库表结构设计好了,下面就是跟 Flask 的结合了

在 Python 中,用 decorator 可以用来做用户验证,比如下面

def auth(method):
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        user_id = session.get('user_id')
        if not user_id:
            return abort(403)
        return method(*args, **kwargs)
    return wrapper

@app.router('/user/info')
@auth
def user_info():
    return render_template('user/info.html')

上面就是利用 Python 的 decorator 来认证用户,其实也是简单的权限划分

因为在 Flask 中,每个 view 就是一个函数,所以在权限表中,用action来表示每个 view 的函数名,那么每个 view 就是一个最小的权限单位,如果一个角色拥有这个权限,那么他就可以请求这个 view 的操作。所以可以这样验证权限

class UserModel(db.Model):
    # ...
    def check(self, action):
        permission = self.permissions.filter(PermissionModel.action == action).first()
        return bool(permissions)

然后把这个权限验证写到 decorator 里去

permissions = list()

class Permission(object):

    def __init__(self, module=None, action=None):
        self.module = module
        self.action = action

    def check(self, module, func):
        if not self.current_user:
            return False
        return self.current_user.check('{module}.{action}'.format(
            module=module,
            action=func
        ))

    def deny(self):
        return fail(4003, u'无权访问')

    def __call__(self, func):
        permissions.append({
            'action': '{}.{}'.format(func.__module__, func.__name__),
            'name': func.__doc__
        })
        @wraps(func)
        def decorator(*args, **kwargs):
            if not self.check(func.__module__, func.__name__):
                return self.deny()
            return func(*args, **kwargs)
        return decorator

    def __enter__(self):
        if not self.check(self.module, self.action):
            try:
                self.deny()
            except Exception as e:
                raise e
            else:
                raise PermissionDeniedException

    def __exit(self):
        pass

    @property
    def current_user(self):
        return g.user

permission = Permission()

这里参考了 hustazp 的 permission

使用 func.__module__func.__name__ 结合作为权限中的 action,如果单独用 func.__name,肯定会出现相同的函数名,如果加上 func.__module__ 就在一定程度上避免了重合,并且将 func.__doc__ 来作为权限种的 name,还没想到更好的办法来自动加入 name

那么上面的用户认证换成 permission 就是下面

@app.router('/user/info')
@permission
def user_info():
    """用户信息"""
    return render_template('user/info.html')

在开发的过程中,如果写了一个权限就要加到数据库里该有多累,于是就加了一个 permissions,这里把所有的 view 都加到这里面来,然后通过下面的脚本来加入权限

from application.models.user import PermissionModel, RoleModel, role_permission
from application.utils.permissions import permissions
for permission in permissions:
    p = PermissionModel.query.filter_by(action=permission['action']).first()
    if not p:
        p = PermissionModel(
            name=permission['name'],
            action=permission['action']
        )
        db.session.add(p)
        db.session.commit()

role = RoleModel.query.first()  # 这里默认获取一个角色,并且赋予权限
for p in PermissionModel.query.filter_by(is_delete=False):
    r = db.session.query(role_permission).join(RoleModel).join(PermissionModel).\
        filter(
            RoleModel.id == role.id,
            RoleModel.is_delete == False,
            PermissionModel.id == p.id,
            PermissionModel.is_delete == False,
            role_permission.c.is_delete == False,
        ).first()
    if not r:
        role.permissions.append(p)

role.save()