原型效果
接口设计
# 获取短信验证码
/app/sms/codes/<mobile># 请求方式
GET# 请求参数 路径参数
mobile 手机号响应数据 json
{"message": "ok","data": {"mobile": 135xxxxxxxx }
}
接口实现
# app/resources/user/passport.pyfrom flask_restful import Resource
import random
from app import redis_client
from utils.constants import SMS_CODE_EXPIREclass SMSCodeResource(Resource):"""获取短信验证码"""def get(self, mobile):# 生成短信验证码rand_num &#61; &#39;%06d&#39; % random.randint(0, 999999)# 保存验证码(redis) app:code:18912341234 123456key &#61; &#39;app:code:{}&#39;.format(mobile)redis_client.set(key, rand_num, ex&#61;SMS_CODE_EXPIRE)# 发送短信 第三方短信平台 celeryprint(&#39;短信验证码: "mobile": {}, "code": {}&#39;.format(mobile, rand_num))# 返回结果return {&#39;mobile&#39;: mobile}
# common/utils/constants.py...SMS_CODE_EXPIRE &#61; 300 # 短信验证码有效期
配置URL
# app/resources/user/__init__.py...user_api.add_resource(SMSCodeResource, &#39;/sms/codes/
# common/utils/constants.py...BASE_URL_PRIFIX &#61; &#39;/app&#39; # 基础URL的前缀
# app/resources/user/__init__.py...from utils.constants import BASE_URL_PRIFIX...user_bp &#61; Blueprint(&#39;user&#39;, __name__, url_prefix&#61;BASE_URL_PRIFIX)
配置路由转化器
# common/utils/converters.pyfrom werkzeug.routing import BaseConverterclass MobileConverter(BaseConverter):"""手机号格式"""regex &#61; r&#39;1[3-9]\d{9}&#39;def register_converters(app):"""向Flask app中添加转换器:param app: Flask app对象"""app.url_map.converters[&#39;mob&#39;] &#61; MobileConverter
在 app包的初始化文件中注册路由转换器
# app/__init__.pydef register_extensions(app):"""组件初始化"""...# 添加转换器from utils.converters import register_convertersregister_converters(app)
在 user包的初始化文件中, 给类视图的路径参数添加转换器
# app/resources/user/__init__.py...user_api.add_resource(SMSCodeResource, &#39;/sms/codes/
# 注册登录
/app/authorizations# 请求方式
POST# 请求参数 json
mobile 手机号
code 短信验证码响应数据 json
{"message": "ok","data": {"token": "xxxxxxxx"}
}
模型设计
# common/models/user.pyfrom app import dbclass User(db.Model):"""用户基本信息"""__tablename__ &#61; &#39;user_basic&#39;id &#61; db.Column(db.Integer, primary_key&#61;True, doc&#61;&#39;用户ID&#39;)mobile &#61; db.Column(db.String(11), doc&#61;&#39;手机号&#39;)name &#61; db.Column(db.String(20), doc&#61;&#39;昵称&#39;)last_login &#61; db.Column(db.DateTime, doc&#61;&#39;最后登录时间&#39;)introduction &#61; db.Column(db.String(50), doc&#61;&#39;简介&#39;)article_count &#61; db.Column(db.Integer, default&#61;0, doc&#61;&#39;作品数&#39;)following_count &#61; db.Column(db.Integer, default&#61;0, doc&#61;&#39;关注的人数&#39;)fans_count &#61; db.Column(db.Integer, default&#61;0, doc&#61;&#39;粉丝数&#39;)profile_photo &#61; db.Column(db.String(130), doc&#61;&#39;头像&#39;)def to_dict(self):"""模型转字典, 用于序列化处理"""return {&#39;id&#39;: self.id,&#39;name&#39;: self.name,&#39;photo&#39;: self.profile_photo,&#39;intro&#39;: self.introduction,&#39;art_count&#39;: self.article_count,&#39;follow_count&#39;: self.following_count,&#39;fans_count&#39;: self.fans_count}
数据迁移
在 app包的初始化文件的 register_extensions函数中, 对数据迁移组件进行初始化
# app/__init__.py ...from flask_migrate import Migrate...def register_extensions(app):"""组件初始化"""...# 数据迁移组件初始化Migrate(app, db)# 导入模型类from models import user
执行数据迁移命令
export FLASK_APP&#61;app.main # 设置环境变量指定启动文件
flask db init # 生成迁移文件夹
flask db migrate # ⽣成迁移版本, 保存到迁移文件夹中
flask db upgrade # 执行迁移
# .gitignore*.py[cod]
.idea
migration
配置请求校验函数
# common/utils/parser.pyimport re
import base64
import imghdr
from datetime import datetimedef email(email_str):"""检验邮箱格式:param email_str: str 被检验字符串:return: email_str"""if re.match(r&#39;^([A-Za-z0-9_\-\.\u4e00-\u9fa5])&#43;\&#64;([A-Za-z0-9_\-\.])&#43;\.([A-Za-z]{2,8})$&#39;, email_str):return email_strelse:raise ValueError(&#39;{} is not a valid email&#39;.format(email_str))def mobile(mobile_str):"""检验手机号格式:param mobile_str: str 被检验字符串:return: mobile_str"""if re.match(r&#39;^1[3-9]\d{9}$&#39;, mobile_str):return mobile_strelse:raise ValueError(&#39;{} is not a valid mobile&#39;.format(mobile_str))def id_number(value):"""检查是否为身份证号"""id_number_pattern &#61; r&#39;(^[1-9]\d{5}(18|19|([23]\d))\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{3}[0-9Xx]$)|(^[1-9]\d{5}\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{2}$)&#39;if re.match(id_number_pattern, value):return value.upper()else:raise ValueError(&#39;Invalid id number.&#39;)
接口实现
# app/resources/user/passport.py...from datetime import datetime, timedelta
from flask import current_app
from flask_restful.inputs import regex
from flask_restful.reqparse import RequestParser
from sqlalchemy.orm import load_only
from app import db
from utils.parser import mobile as mobile_type
from models.user import User...class LoginResource(Resource):"""注册登录"""def post(self):# 获取参数parser &#61; RequestParser()parser.add_argument(&#39;mobile&#39;, required&#61;True, location&#61;&#39;json&#39;, type&#61;mobile_type)parser.add_argument(&#39;code&#39;, required&#61;True, location&#61;&#39;json&#39;, type&#61;regex(r&#39;^\d{6}$&#39;))args &#61; parser.parse_args()mobile &#61; args.mobilecode &#61; args.code# 校验短信验证码key &#61; &#39;app:code:{}&#39;.format(mobile)real_code &#61; redis_client.get(key)if not real_code or real_code !&#61; code:return {&#39;message&#39;: &#39;Invalid Code&#39;, &#39;data&#39;: None}, 400# 删除验证码# redis_client.delete(key)# 校验成功, 查询数据库user &#61; User.query.options(load_only(User.id)).filter(User.mobile &#61;&#61; mobile).first()if user: # 如果有, 取出用户id, 更新最后登录时间user.last_login &#61; datetime.now()else: # 如果没有, 创建新用户user &#61; User(mobile&#61;mobile, name&#61;mobile, last_login&#61;datetime.now())db.session.add(user)db.session.commit()# 返回结果return {&#39;userid&#39;: user.id}, 201
注意点
配置URL
在 user包的初始化文件中设置类视图的URL
# app/resources/user/__init__.pyfrom .passport import SMSCodeResource, LoginResource# 添加类视图
user_api.add_resource(LoginResource, &#39;/authorizations&#39;)
状态保持
PyJWT
base64.b64encode(os.urandom(40)).decode()
配置JWT工具函数
# common/utils/jwt_util.pyimport jwt
from flask import current_appdef generate_jwt(payload, expiry, secret&#61;None):"""生成jwt:param payload: dict 载荷:param expiry: datetime 有效期:param secret: 密钥:return: jwt"""_payload &#61; {&#39;exp&#39;: expiry}_payload.update(payload)if not secret:secret &#61; current_app.config[&#39;JWT_SECRET&#39;]token &#61; jwt.encode(_payload, secret, algorithm&#61;&#39;HS256&#39;)return token.decode()def verify_jwt(token, secret&#61;None):"""检验jwt:param token: jwt:param secret: 密钥:return: dict: payload"""if not secret:secret &#61; current_app.config[&#39;JWT_SECRET&#39;]try:payload &#61; jwt.decode(token, secret, algorithm&#61;[&#39;HS256&#39;])except jwt.PyJWTError:payload &#61; Nonereturn payload
# app/settings/config.pyclass DefaultConfig:"""默认配置"""...# JWTJWT_SECRET &#61; &#39;TPmi4aLWRbyVq8zu9v82dWYW17/z&#43;UvRnYTt4P6fAXA&#39; # 秘钥JWT_EXPIRE_DAYS &#61; 14 # JWT过期时间14天
接口实现
# app/resources/user/passport.py...from utils.jwt_util import generate_jwt...class LoginResource(Resource):"""注册登录"""def post(self):...db.session.commit()# 生成jwttoken &#61; generate_jwt({&#39;userid&#39;: user.id}, expiry&#61;datetime.utcnow() &#43; timedelta(days&#61;current_app.config[&#39;JWT_EXPIRE_DAYS&#39;]))# 返回结果return {&#39;token&#39;: token}, 201
接口设计
# 获取当前用户信息
/app/user# 请求方式
GET# 请求头
Authorization 用户token响应数据 json
{"message": "OK","data": {"id": 1155,"name": "18912341234","photo": "xxxxx","intro": "xxx","art_count": 0,"follow_count": 0,"fans_count": 0}
}
相关模型类
# common/models/user.pyfrom app import dbclass User(db.Model):"""用户基本信息"""__tablename__ &#61; &#39;user_basic&#39;id &#61; db.Column(db.Integer, primary_key&#61;True, doc&#61;&#39;用户ID&#39;)mobile &#61; db.Column(db.String(11), doc&#61;&#39;手机号&#39;)name &#61; db.Column(db.String(20), doc&#61;&#39;昵称&#39;)last_login &#61; db.Column(db.DateTime, doc&#61;&#39;最后登录时间&#39;)introduction &#61; db.Column(db.String(50), doc&#61;&#39;简介&#39;)article_count &#61; db.Column(db.Integer, default&#61;0, doc&#61;&#39;作品数&#39;)following_count &#61; db.Column(db.Integer, default&#61;0, doc&#61;&#39;关注的人数&#39;)fans_count &#61; db.Column(db.Integer, default&#61;0, doc&#61;&#39;粉丝数&#39;)profile_photo &#61; db.Column(db.String(130), doc&#61;&#39;头像&#39;)def to_dict(self):"""模型转字典, 用于序列化处理"""return {&#39;id&#39;: self.id,&#39;name&#39;: self.name,&#39;photo&#39;: self.profile_photo,&#39;intro&#39;: self.introduction,&#39;art_count&#39;: self.article_count,&#39;follow_count&#39;: self.following_count,&#39;fans_count&#39;: self.fans_count}
代码实现
实现权限控制
获取用户信息接口有访问权限要求: 用户登录才能访问, 所以需要实现权限控制, 需要实现以下两步:
定义钩子函数: 获取用户信息, 并使用g变量传递数据
定义装饰器: 根据用户信息进行访问限制
# common/utils/middlewares.pyfrom flask import request, g
from utils.jwt_util import verify_jwtdef get_userinfo():"""获取用户信息"""# 获取请求头中的tokentoken &#61; request.headers.get(&#39;Authorization&#39;)g.userid &#61; None # 如果未登录, userid&#61;Noneif token: # 如果传递了token# 校验tokendata &#61; verify_jwt(token)if data: # 校验成功g.userid &#61; data.get(&#39;userid&#39;) # 如果已登录, userid&#61;11
# app/__init__.pydef register_extensions(app):"""组件初始化"""...# 添加请求钩子from utils.middlewares import get_userinfoapp.before_request(get_userinfo)
# common/utils/decorators.pyfrom flask import g
from functools import wrapsdef login_required(f):&#64;wraps(f)def wrapper(*args, **kwargs):# 如果用户已登录, 正常访问if g.userid:return f(*args, **kwargs)else:return {&#39;message&#39;: &#39;Invalid Token&#39;, &#39;data&#39;: None}, 401return wrapper
接口实现
# app/resources/user/profile.pyfrom flask import g
from flask_restful import Resource
from sqlalchemy.orm import load_onlyfrom models.user import User
from utils.decorators import login_requiredclass CurrentUserResource(Resource):"""个人中心-当前用户"""method_decorators &#61; {&#39;get&#39;: [login_required]}def get(self):# 获取用户iduserid &#61; g.userid# 查询用户数据user &#61; User.query.options(load_only(User.id, User.name, User.profile_photo, User.introduction, User.article_count, User.following_count, User.fans_count)).filter(User.id &#61;&#61; userid).first()return user.to_dict()
配置URL
# app/resources/user/__init__.pyfrom .profile import CurrentUserResource# 添加类视图
user_api.add_resource(CurrentUserResource, &#39;/user&#39;)