篇首语:本文由编程笔记#小编为大家整理,主要介绍了drf-jwt认证组件权限组件频率组件的使用相关的知识,希望对你有一定的参考价值。
目录
三大认证流程图:
在restframework中自带认证组件,而其自带的认证组件是如何认证校验的呢:
class BaseAuthentication:
"""
All authentication classes should extend BaseAuthentication.
所有身份验证类都应扩展BaseAuthentication。
"""
def authenticate(self, request):
"""
Authenticate the request and return a two-tuple of (user, token).
验证请求并返回(user, token)的二元组。
"""
raise NotImplementedError(".authenticate() must be overridden.")
def authenticate_header(self, request):
"""
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response, or `None` if the
authentication scheme should return `403 Permission Denied` responses.
返回一个字符串,用作“ 401 Unauthenticated”响应中的“ WWW-Authenticate”标头的值;如果身份验证方案应返回“ 403 Permission Denied”响应,则返回“ None”。
"""
pass
class BasicAuthentication(BaseAuthentication):
"""
HTTP Basic authentication against username/password.
针对用户名/密码的HTTP基本身份验证。
"""
www_authenticate_realm = 'api'
def authenticate(self, request):
"""
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
如果使用HTTP Basic身份验证提供了正确的用户名和密码,则返回“用户”。 否则返回“无”。
"""
auth = get_authorization_header(request).split()#按空格拆分,拆分的结果长度为2才合法
if not auth or auth[0].lower() != b'basic':#auth的结构大致是“basic abc.def.hig ”
#如果没有token,认证方法直接返回None,代表游客方式访问
return None
#判断auth是否被切成了两份
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
#作用:当提交的token格式有误则抛出异常,(非法用户访问,可用于反爬)
try:
#反解token(auth是被查分的列表,0是头,1是token)
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)
#如果反解失败则抛出异常,代表该用户是非法用户
userid, password = auth_parts[0], auth_parts[2]
return self.authenticate_credentials(userid, password, request)
#token反解成功,返回(user,token)组成的元组,代表合法用户
#元组0位的user会被存储到request.user中
#元组1位的token会被存储到request.auth中,通常也可以不保存,所以可以用none填充。
def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
针对用户名和密码对用户标识和密码进行身份验证,并提供可选的上下文请求。
"""
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
user = authenticate(request=request, **credentials)
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
return (user, None)
def authenticate_header(self, request):
return 'Basic realm="%s"' % self.www_authenticate_realm
这里我们不采用rest-framework的身份认证组件而是采用drf-jwt框架的认证组件,下面看一下相关源码:
class BaseJSONWebTokenAuthentication(BaseAuthentication):
"""
Token based authentication using the JSON Web Token standard.
"""
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
jwt_value = self.get_jwt_value(request)
#从请求中拿出token
if jwt_value is None:
return None
#如果没有token则返回None,以游客方式访问
#反解token,反解失败,抛出异常,用户为非法用户;反解成功则反解的数据就是载荷,存放在payload中
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
#载荷校验得到登录用户
user = self.authenticate_credentials(payload)
#得到登录用户,返回(user,token)
return (user, jwt_value)
def authenticate_credentials(self, payload):
"""
Returns an active user that matches the payload's user id and email.
"""
User = get_user_model()
username = jwt_get_username_from_payload(payload)
if not username:
msg = _('Invalid payload.')
raise exceptions.AuthenticationFailed(msg)
try:
user = User.objects.get_by_natural_key(username)
except User.DoesNotExist:
msg = _('Invalid signature.')
raise exceptions.AuthenticationFailed(msg)
if not user.is_active:
msg = _('User account is disabled.')
raise exceptions.AuthenticationFailed(msg)
return user
class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
"""
Clients should authenticate by passing the token key in the "Authorization"
HTTP header, prepended with the string specified in the setting
`JWT_AUTH_HEADER_PREFIX`. For example:
Authorization: JWT eyJhbGciOiAiSFMyNTYiLCAidHlwIj
"""
www_authenticate_realm = 'api'
def get_jwt_value(self, request):
auth = get_authorization_header(request).split()
auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
#这里的校验和rest-framework类似
if not auth:
if api_settings.JWT_AUTH_COOKIE:
return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
return None
if smart_text(auth[0].lower()) != auth_header_prefix:
return None
if len(auth) == 1:
msg = _('Invalid Authorization header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid Authorization header. Credentials string '
'should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
return auth[1]
def authenticate_header(self, request):
"""
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response, or `None` if the
authentication scheme should return `403 Permission Denied` responses.
"""
return '{0} realm="{1}"'.format(api_settings.JWT_AUTH_HEADER_PREFIX, self.www_authenticate_realm)
我们还可以自定义认证组件,方法是自己新建一个authentications文件,然后重写校验代码,并在settings中进行配置,但是我们一般不采用自定义的方法而是使用drf-jwt认证组件进行身份认证。由于身份认证只是判断一下来访问的客户端是什么身份,并不做其他的处理,所以我们一般需要对其进行全局配置,因为所有来访问的人是什么身份,以便权限组件进行用户权限的处理。
认证组件的全局配置:
'''
1.settings文件全局配置drf-jwt框架的认证类
REST_FRAMEWORK = {
# 认证组件
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework_jwt.authentication.JSONWebTokenAuthentication'
],
}
2.在自定义的authentications文件中导入认证组件
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
class MyAuthentication(JSONWebTokenAuthentication):
def authenticate(self, request):
"""
1) 从请求头中拿到前台提交的token(一般从HTTP_AUTHORIZATION中拿,也可以与前台约定)
-- 如果设置了反爬等措施,校验一下反爬(头 token)
2)没有token,返回None,代表游客
3)有token,进入校验
-- 不通过:抛AuthenticationFailed异常,代表非法用户
-- 通过:返回 (user, token),代表合法用户
"""
pass#这里的pass相当于return None
'''
自定义认证类的步骤:
# 自定义认证类
# 1) 如果使用session认证,drf默认提供了SessionAuthentication
# 2) 如果使用drf-jwt认证框架,drf-jwt框架提供了JSONWebTokenAuthentication
# 3) 如果是自定义签发与校验token,才需要将校验token的算法封装到自定义的认证类中
from rest_framework.authentication import BaseAuthentication
class MyAuthentication(BaseAuthentication):
def authenticate(self, request):
pass#这里的pass相当于return None
权限组件的校验套路都是一样的,如果验证通过就return True,如不通过就return False。
class AllowAny(BasePermission):
"""
Allow any access.
This isn't strictly required, since you could use an empty
permission_classes list, but it's useful because it makes the intention
more explicit.
"""
def has_permission(self, request, view):
return True
drf默认提供了一些权限类:
AllowAny:游客和登录用户有全权限
IsAuthenticated:只有登录用户有全权限
IsAdminUser:只有后台用户(admin用户)有全权限
IsAuthenticatedOrReadOnly:游客有读权限,登录用户有全权限
权限组件为我们提供的校验功能是有限的,我们可以自定义自己的校验功能。自定义权限类的步骤如下:
1.新建自己的permissions文件,定义自己的校验类并继承BasePermission类
from rest_framework.permissions import BasePermission
"""如果有特殊需要,需要自定义权限类
如:只有superuser有权限、只有vip用户有权限、只有某ip网段用户有权限、只有某个视图及其子类有权限
"""
class MyPermission(BasePermission):
def has_permission(self, request, view):
"""
1) 根据需求,request和view的辅助,制定权限规则判断条件
2)如果条件通过,返回True
3)如果条件不通过,返回False
"""
print(request.user, request.auth)
return False
# VIP用户权限
class VIPUserPermission(BasePermission):
def has_permission(self, request, view):
for group in request.user.groups.all():
if group.name.lower() == 'vip':#如果用户的group是vip则校验通过,否则不通过。
return True
return False
我们还可结合权限组件的权限类使用,方法:
from rest_framework.permissions import IsAuthenticated, IsAdminUser, AllowAny, IsAuthenticatedOrReadOnly
permission_classes = [IsAuthenticated]
permission_classes = [IsAdminUser]#这里是在重写APIView的权限校验属性,属于局部配置。如果我们需要添加多个权限类则直接在中括号内添加。
直接将对应的类导入进行属性添加即可。
# 频率组件:频率类一般做局部配置,但是频率调节在settings中配置
'DEFAULT_THROTTLE_RATES': {
'user': '5/min',
'anon': '3/min',
'mobile': '1/min'
},
# drf-jwt自定义配置
import datetime
JWT_AUTH = {
# 过期时间
'JWT_EXPIRATION_DELTA': datetime.timedelta(secOnds=300),
# 是否允许刷新
'JWT_ALLOW_REFRESH': False,
# 最大刷新的过期时间
'JWT_REFRESH_EXPIRATION_DELTA': datetime.timedelta(days=7),
}
首先我们来分析一下rest-framework-jwt签发token的代码:
class JSONWebTokenSerializer(Serializer):
"""
Serializer class used to validate a username and password.
'username' is identified by the custom UserModel.USERNAME_FIELD.
Returns a JSON Web Token that can be used to authenticate later calls.
"""
def __init__(self, *args, **kwargs):
"""
Dynamically add the USERNAME_FIELD to self.fields.
"""
super(JSONWebTokenSerializer, self).__init__(*args, **kwargs)
#jwt的校验规则只校验username和password
self.fields[self.username_field] = serializers.CharField()
self.fields['password'] = PasswordField(write_Only=True)
@property
def username_field(self):
return get_username_field()
def validate(self, attrs):
#将username和password包装到字典中
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
#用auth组件的authenticate方法进行校验,该方法也只能校验username和password
if all(credentials.values()):
user = authenticate(**credentials)
#判断用户是否激活
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
#将激活的用户添加到载荷中
payload = jwt_payload_handler(user)
#将token和user存放在serializer对象中
return {
'token': jwt_encode_handler(payload),#签发token字符串
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
通过上面源码我们可以总结drf-jwt签发token的步骤如下:
username、password通过auth组件的authenticate方法得到user对象
user对象通过drf-jwt框架的jwt_payload_handler函数包装payload载荷
payload载荷通过drf-jwt框架的jwt_encode_handler函数签发token字符串
注:我们可以借助jwt_payload_handler和jwt_encode_handler两个函数完成自定义jws-token的签发。
# 多方式登录
#视图类
from rest_framework.views import APIView
class LoginAPIView(APIView):
""" 重点
1)token只能由登录接口签发
2)登录接口也是APIView的子类,使用一定会进行认证、权限组件的校验
结论:不管系统默认、或是全局settings配置的是何认证与权限组件,登录接口不用参与任何认证与权限的校验
所以,登录接口一定要进行认证与权限的局部禁用
"""
authentication_classes = []
pagination_class = []
def post(self, request, *args, **kwargs):
serializer = serializers.LoginModelSerializer(data=request.data)
serializer.is_valid(raise_exception=True) # 内部在全局钩子中完成token的签发
return APIResponse(results={
'username': serializer.content.get('user').username,
'token': serializer.content.get('token')
})
#序列化类
from rest_framework_jwt.serializers import jwt_payload_handler, jwt_encode_handler
import re
class LoginModelSerializer(serializers.ModelSerializer):
# post请求,序列化默认当做create动作进行校验,需要校验数据库,create动作username会抛用户已存在异常
# 抛用户已存在异常是多余的,所以自定义系统校验规则即可
username = serializers.CharField(min_length=3, max_length=16)
password = serializers.CharField(min_length=3, max_length=16)
class Meta:
model = models.User
fields = ('username', 'password')
# 用全局钩子,完成token的签发
def validate(self, attrs):
# 1)通过 username 和 password 完成多方式登录校验,得到user对象
user = self._validate_user(attrs)
# 2)user对象包装payload载荷
payload = jwt_payload_handler(user)
# 3)payload载荷签发token
token = jwt_encode_handler(payload)
# 4)将user与token存储到serializer对象中,方便在视图类中使用
self.cOntent= {
'user': user,
'token': token
}
return attrs
def _validate_user(self, attrs):
username = attrs.get('username')
password = attrs.get('password')
if re.match(r'.*@.*', username): # 邮箱
user = models.User.objects.filter(email=username).first() # type: models.User
elif re.match(r'^1[3-9][0-9]{9}$', username): # 电话
user = models.User.objects.filter(mobile=username).first()
else: # 用户名
user = models.User.objects.filter(username=username).first()
if not user or not user.check_password(password):
raise serializers.ValidationError({'message': '用户信息异常'})
"""
AnonRateThrottle:只对游客进行频率限制
UserRateThrottle:对所有用户进行频率限制
看一下频率组件的源码:
class AnonRateThrottle(SimpleRateThrottle):
"""
Limits the rate of API calls that may be made by a anonymous users.
The IP address of the request will be used as the unique cache key.
限制匿名用户可能进行的API调用的速率。
???? 请求的IP地址将用作唯一的缓存密钥。
"""
scope = 'anon'
def get_cache_key(self, request, view):
if request.user.is_authenticated:
#登录用户返回none代表登录用户可以无限次访问,我们可以重新在settings中配置需要限制的频率
return None # Only throttle unauthenticated requests.
return self.cache_format % {
#匿名用户,返回一个与当前匿名用户有关的字符串
'scope': self.scope,
'ident': self.get_ident(request)
}
# 配置drf自带的频率类
'DEFAULT_THROTTLE_CLASSES': [],
#在settings文件中全局配置
from rest_framework.throttling import AnonRateThrottle, UserRateThrottle
throttle_classes = [UserRateThrottle]
#在视图类中局部配置
#在我们的settings文件中的REST_FRAMEWORK中进行如下频率配置
'DEFAULT_THROTTLE_RATES': {
'user': '5/min',#自定义已登录用户每分钟访问次数
'anon': '3/min',#自定义匿名用户每分钟访问次数
'mobile': '1/min'
},
在实际应用中如对ip进行限次、对电话进行限次、对视图某些信息进行限次我们需要自定义频率类。
以手机号为例:
from rest_framework.throttling import SimpleRateThrottle
class MobileRateThrottle(SimpleRateThrottle):
"""
1)设置scope字符串类属性,同时在settings中进行drf配置DEFAULT_THROTTLE_RATES
eg: DEFAULT_THROTTLE_RATES = {'mobile': '1/min'}
2)重写get_catch_key方法:
返回与限制条件有关的字符串,表示限制
返回None,表示不限制
"""
scope = 'mobile'
def get_cache_key(self, request, view):
if not request.user.is_authenticated or not request.user.mobile:
return None # 匿名用户 或 没有电话号的用户 都不限制
# 只要有电话号的用户踩进行限制
return self.cache_format % {
'scope': self.scope,
'ident': request.user.mobile
}