编辑推荐: |
本文来自于www.cnblogs.com,文章案例说明,从配置文件,路由系统,视图,模板语言等介绍详细。 |
|
前言:
Django:1个重武器,包含了web开发中常用的功能、组件的框架;(ORM、Session、Form、Admin、分页、中间件、信号、缓存、ContenType....);
Tornado:2大特性就是异步非阻塞、原生支持WebSocket协议;
Flask:封装功能不及Django完善,性能不及Tornado,但是Flask的第三方开源组件比丰富;http://flask.pocoo.org/extensions/
Bottle:比较简单;
总结:
都不是我写的!!!不论优劣,不同的工具而已;
小型web应用设计的功能点不多使用Flask;
大型web应用设计的功能点比较多使用的组件也会比较多,使用Django(自带功能多不用去找插件);
如果追求性能可以考虑Tornado;
Flask的socket是基于Werkzeug 实现的,模板语言依赖jinja2模板,在使用Flask之前需要安装一下;
pip3 install flask
#安装flask |
from werkzeug.wrappers
import Request, Response # Flask的socket使用werkzeug实现,所以要导入
werkzeug
@Request.application
def hellow(request):
return Response('Hello World')
if __name__ == '__main__':
from werkzeug.serving import run_simple
run_simple('localhost',400,hellow)
|
Flask简单使用
from flask import
Flask
app=Flask(__name__) #创建1个Flask实例
@app.route('/') #路由系统生成 视图对应url,1. decorator=app.route()
2. decorator(first_flask)
def first_flask(): #视图函数
return 'Hello World' #response
if __name__ == '__main__':
app.run() #启动socket
|
一、配置文件
app=Flask (__name__, template_folder ='templates', static_url_path ='/static/',static_path ='/zhanggen') |
模板路径: template_folder='templates'
静态文件路径:static_url_path='/static/'
静态文件引入别名:static_path='/zhanggen'
设置为调试环境:app.debug=True (代码修改自动更新)
设置json编码格式 如果为False 就不使用ascii编码:app.config['JSON_AS_ASCII']=False
设置响应头信息Content-Type app.config['JSONIFY_MIMETYPE']
="application/json;charset=utf-8" (注意 ;charset=utf-8)
二、路由系统
1.动态路由(url传参)
@app.route('/user/<name>')
from flask import
Flask
app=Flask(__name__)
@app.route('/<name>') #设置url传参数 http://127.0.0.1:5000/zhanggen
def first_flask(name): #视图必须有对应接收参数
print(name)
return 'Hello World' #response
if __name__ == '__main__':
app.run()
|
@app.route('/post/<int:age>'))
接收整型数字参数
app=Flask(__name__)
@app.route('/<int:age>/') #设置url传参数 http://127.0.0.1:5000/18/
def first_flask(age): #视图必须有对应接收参数
print(age)
return 'Hello World' #response
if __name__ == '__main__':
app.run() |
@app.route('/post/<float:salary>')
#接收浮点型型数字参数
app=Flask(__name__)
@app.route('/<float:salary>/') #设置url传参数http://127.0.0.1:5000/2345555.8889/
def first_flask(salary): #视图必须有对应接收参数
print(salary)
return 'Hello World' #response
if __name__ == '__main__':
app.run() |
@app.route('/post/<path:path>')
# 接收URL链接类型参数
app=Flask(__name__)
@app.route('/<path:url>/') #设置url传参数:http://127.0.0.1:5000/http://www.baiu.com/
def first_flask(url): #视图必须有对应接收参数
print(url)
return 'Hello World' #response
if __name__ == '__main__':
app.run() |
2、指定允许的请求方法
@app.route('/login', methods=['GET',
'POST'])
# 指定允许的请求方法
app=Flask(__name__)
@app.route('/<path:url>/',methods=['get'])
#只允许get请求
def first_flask(url):
print(url)
return 'Hello World' #response
if __name__ == '__main__':
app.run() |
3、通过别名反向生成url
#反向生成url
from flask import Flask,url_for
app=Flask(__name__)
@app.route('/<path:url>',endpoint='name1')
def first_flask(url):
print(url_for('name1',url=url)) #如果设置了url参数,url_for(别名,加参数)
return 'Hello World'
if __name__ == '__main__':
app.run() |
4、通过app.add_url_rule()调用路由
#方式2通过app.add_url_rule()方法的方式调用路由
app=Flask(__name__)
def first_flask():
return 'Hello World'
app.add_url_rule(rule='/index/', endpoint='name1', view_func=first_flask, methods=['GET'])
#app.add_url_rule(rule=访问的url, endpoint=路由别名, view_func=视图名称, methods=[允许访问的方法])
if __name__ == '__main__':
app.run() |
5、扩展路由功能:正则匹配url
如果需要一些复杂的匹配规则可以自定义正则匹配url
from flask import
Flask, views, url_for
from werkzeug.routing import BaseConverter
app = Flask(import_name=__name__)
class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex
def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
:param value:
:return:
"""
return int(value)
def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
:param value:
:return:
"""
val = super(RegexConverter, self).to_url(value)
return val
# 添加到flask中
app.url_map.converters['regex'] = RegexConverter
@app.route('/index/<regex("\d+"):nid>')
def index(nid):
print(url_for('index', nid='888'))
return 'Index'
if __name__ == '__main__':
app.run()
|
四、视图 1、给Flask视图函数加装饰器
注意如果要给视图函数加装饰器增加新功能,一点要加在路由装饰器下面,才会被路由装饰器装饰,才能生生成url关系;
#给Flask视图加装饰器
#1、定义1个装饰器
def auth(func):
print('我在上面')
def inner(*args,**kwargs):
return func(*args,**kwargs)
return inner
app=Flask(__name__)
@app.route('/',methods=['GET'])
@auth #注意如果要给视图函数加装饰器,一点要加在路由装饰器下面,才会被路由装饰器装饰
def first_flask():
print('ffff')
return 'Hello World'
if __name__ == '__main__':
app.run() |
2、request和response
a.请求相关信息
request.method: 获取请求方法
request.json
request.json.get("json_key"):获取json数据 **较常用
request.argsget('name') :获取get请求参数
request.form.get('name') :获取POST请求参数
request.form.getlist('name_list'):获取POST请求参数列表(多个)
request.values.get('age') :获取GET和POST请求携带的所有参数(GET/POST通用)
request.cookies.get('name'):获取cookies信息
request.headers.get('Host'):获取请求头相关信息
request.path:获取用户访问的url地址,例如(/,/login/,/ index/);
request.full_path:获取用户访问的完整url地址+参数 例如(/login/?age=18)
request.script_root: 抱歉,暂未理解其含义;
request.url:获取访问url地址,例如http://127.0.0.1:5000/?age=18;
request.base_url:获取访问url地址,例如 http://127.0.0.1:5000/;
request.url_root
request.host_url
request.host:获取主机地址
request.files:获取用户上传的文件
obj = request.files['the_file_name']
obj.save('/var/www/uploads/' + secure_filename(f.filename))
直接保存
b、响应相关信息
return "字符串" :响应字符串
return render_template('html模板路径',**{}):响应模板
return redirect('/index.html'):跳转页面
响应json数据
方式1: return jsonify(user_list)
app.config['JSON_AS_ASCII']= False
#指定json编码格式 如果为False 就不使用ascii编码,
app.config['JSONIFY_MIMETYPE'] = "application/json;charset =utf-8"
#指定浏览器渲染的文件类型,和解码格式; |
方式2:
return Response(data,mimetype="application/json;charset=utf-8",)
如果需要设置响应头就需要借助make_response()方法
from flask import
Flask, request,make_response |
response = make_response(render_template('index.html'))
response是flask.wrappers.Response类型
response.delete_cookie('key')
response.set_cookie('key', 'value')
response.headers['X-Something'] = 'A value'
return respons
3 、Flask之CBV视图
#CBV视图
from flask import Flask,url_for,views
app=Flask(__name__) #装饰器
def auth(func):
print('我在上面')
def inner(*args,**kwargs):
return func(*args,**kwargs)
return inner
class IndexView(views.MethodView): #CBV视图
methods=['GET'] #允许的http请求方法(改CBV只允许GET方法)
decorators = [auth,] #每次请求过来都加auth装饰器
def get(self):
return 'Index.GET'
def post(self):
return 'Index.POST'
app.add_url_rule('/index/', view_func= IndexView.as_view (name='name1'))
# (name='name1'反向生成url别名
if __name__ == '__main__':
app.run()
|
五、模板语言
Flask使用的是Jinja2模板,所以其语法和Django无差别(Django的模板语言参考Jinja2)
1.引用静态文件
方式1:别名引入
<link rel="stylesheet"
href="/zhanggen/commons.css"> |
方式2:url_for()方法引入
<link rel="stylesheet"
href="{{ url_for('static', filename='commons.css')
}}"> |
2.模板语言引用上下文对象
变量
<h1>{{user_list}}</h1>
<!--变量 --> |
循环、索引取值
<ul>
{% for user in user_list %} <!--循环 -->
<li>{{user}}</li>
{% endfor %}
{{user_list.0}} <!-- 索引取值-->
</ul> |
Flask的Jinjia2可以通过Context 把视图中的函数传递把模板语言中执行,这就是Django中的simple_tag和simple_fifter;
simple_tag(只能传2个参数,支持for、if)
@app.template_global()
#simple_tag
def foo(arg):
return '<input type="text">' |
simple_fifter(对参数个数无限制,不支持for、if)
@app.template_filter()
#simple_fifter
def foo1(arg1,arg2,arg3):
return arg1+arg2+arg3 |
<h1> {{
'alex'|foo1('s ','b',) }} </h1> <!--
simple_fifter --> |
3.wtform(flask表单验证插件)
3.0.简介
wtforms WTForms是一个支持多个web框架的form组件,主要对用户请求数据 进行表单验证。
3.1. 安装
pip install wtforms
#安装wtfroms插件 |
3.2.简单使用
wtforms和Django自带的form验证插件功能相同,使用起来大同小异;
用户登录页面验证
#!/usr/bin/env
python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request,
redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app=Flask(__name__,template_folder='templates')
#知道模板文件
app.debug=True
#登录验证实例
class LoginForm(Form):
#不同的字段 内部包含正则表达式 html5.EmailField | html5.DateTimeField...
name=simple.StringField(
label='用户名',
validators=[ #验证规则和错误提示信息
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(), #前端页面显示的插件.TextArea
render_kw={'class': 'form-control'} #设置form标签的class信息
)
# 不同的字段 内部包含正则表达式 html5.EmailField | html5.DateTimeField...
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
#自定义验证规则
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
@app.route('/login/', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
form = LoginForm() #实例化 form验证类
return render_template('login.html', form=form)
else:
form = LoginForm(formdata=request.form)
if form.validate(): #判断是否验证成功?
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('login.html', form=form)
if __name__ == '__main__':
app.run() |
<!DOCTYPE
html>
<html lang="en">
<head> <meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post" novalidate>
<!--<input type="text" name="name">-->
<p>{{form.name.label}} {{form.name}} {{form.name.errors[0]
}}</p>
<!--<input type="password"
name="pwd">-->
<p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0]
}}</p>
<input type="submit" value="提交">
</form>
</body>
</html>
login.html |
用户注册页面验证
#用户注册
from flask import Flask, render_template, request,
redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='张根' #设置input标签中默认值
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField( #第二次输入密码
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
#验证2次输入的密码是否一致?
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误')
],
widget=widgets.TextInput(input_type='email'),
#生成email input标签
render_kw={'class': 'form-control'}
)
gender = core.RadioField(
label='性别',
choices=( #choice radio选项
(1, '男'),
(2, '女'),
),
coerce=int #讲用户提交过来的 '4' 强制转成 int 4
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
hobby = core.SelectMultipleField( #select
下拉框多选框
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
#生成Checkbox 多选框
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
def __init__(self, *args, **kwargs): #重写form验证类的__init__方法可以实时同步数据中数据
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'),
(3, '羽毛球'))
def validate_pwd_confirm(self, field): #wtforms验证
钩子函数
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致")
# 继续后续验证
raise validators.StopValidation("密码不一致")
# 不再继续后续验证
@app.route('/register/', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 1}) #默认值
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run() |
3.2.wtforms源码 猜想....
A.自动生成html标签
先来分析一下form验证类的结构
LoginForm类中包含了2个字段: name 和 pwd,而name / pwd字段 = 对象,所以LoginForm
类包含了2个对象;
如果实例化了obj=LoginForm() 就等于 在 这1个对象中嵌套了 2个对象;
前端使用Form验证插件:
那如果在前端for循环LoginForm对象,就等于调用LoginForm对象的__iter__方法,把每个字段(对象)封装的数据
返回
如果前端{{ obj }}= 直接调用了字段对象的__str__方法;
class InputText(object):
#插件
def __str__(self):
return '<input type="text" />'
class InputPassword(object):
def __str__(self):
return '<input type="password"
/>'
class StringField(object): #字段
def __init__(self,wg):
self.widget=wg
def __str__(self): #调用插件的__str__
return str(self.widget)
class DateField(object):
def __init__(self, wg):
self.widget = wg
def __str__(self):
return str(self.widget)
class LoginForm(object): #统一 灵活接口 (对象嵌套对象,多层封装)
name=StringField(wg=InputText()) #wg=InputText()
对象 StringField(wg=InputText())对象
pwd=DateField(wg=InputPassword())
l_obj=LoginForm()
print(l_obj.name)
print(l_obj.pwd)
复制代码 |
B.数据校验
后台定义好正则
用户发来数据
对数据进行校验
3.3.源码流程
生成HTML标签并显示
1.验证类(LogibForm)生成
1.1.由于 metaclass=FormMeta,所以LoginForm是由FormMeta创建的
'''
class BaseForm():
pass
class NewBase(BaseForm,metaclass=FormMeta,):
pass
class Form(NewBase):
pass
class LoginForm(Form):
pass
'''
class Form(with_metaclass(FormMeta,BaseForm)): |
1.2.执行FormMeta 的__init__方法,在LoginForm中添加2个静态字段
class FormMeta(type):
"""
The metaclass for `Form` and any subclasses of
`Form`.
`FormMeta`'s responsibility is to create the
`_unbound_fields` list, which
is a list of `UnboundField` instances sorted
by their order of
instantiation. The list is created at the first
instantiation of the form.
If any fields are added/removed from the form,
the list is cleared to be
re-generated on the next instantiation.
Any properties which begin with an underscore
or are not `UnboundField`
instances are ignored by the metaclass.
"""
def __init__(cls, name, bases, attrs):
type.__init__(cls, name, bases, attrs) #继承type的功能
cls._unbound_fields = None #在LoginForm中添加1个静态字段
cls._wtforms_meta = None #在LoginForm中添加1个静态字段 |
1.3.开始解释LoginForm中的 实例化字段对象name=simple.StringField()simple.PasswordField()
StringField/PasswordField开始实例化(提到实例化就应该想到:指定元类的__call__、自己/父类的__new__、__init__):
StringField/PasswordField是默认元类,自己没有__new__和__init__方法;
但父类Field类中有__new__方法,所以执行父类的__new__(Field.__new__)返回UnboundField对象
def __new__(cls, *args, **kwargs):#执行__new__方法
if '_form' in kwargs and '_name' in kwargs:
return super(Field, cls).__new__(cls)
else:
#我x 没想到 ! __new__既然返回了1个 UnboundField()而不是StringField/PasswordField对象;狸猫换了太子
?
return UnboundField(cls, *args, **kwargs) |
由于Field.__new__方法返回了 1个 UnboundField对象,来看
UnboundField的__init__方法
class UnboundField(object):
_formfield = True
creation_counter = 0 #静态字段 设置计数器
def __init__(self, field_class, *args, **kwargs):
#field_class=.StringField / PasswordField
#获取到field_class 的 参数封装到 UnboundField对象中,并且设置
排序 'creation_counter': 2
UnboundField.creation_counter += 1 #每实例化1个
UnboundField对象 计数器+1
self.field_class = field_class
self.args = args
self.kwargs = kwargs #{'label': '用户名', 'validators':
[<wtforms.validators.DataRequired object
at 0x00000000038EF080>, <wtforms.validators.Length
object at 0x00000000038EF0F0>], 'widget':
<wtforms.widgets.core.TextInput object at
0x00000000038EF0B8>, 'render_kw': {'class':
'form-control'}}
self.creation_counter = UnboundField.creation_counter#
'''
print(self.__dict__)
{
'field_class': <class 'wtforms.fields.simple.PasswordField'>,
'args': (),
'kwargs': {'label': '密码', 'validators': [<wtforms.validators.DataRequired
object at 0x00000000038EF198>, <wtforms.validators.Length
object at 0x00000000038EF1D0>, <wtforms.validators.Regexp
object at 0x00000000038EF208>], 'widget':
<wtforms.widgets.core.PasswordInput object
at 0x00000000038EF2B0>,
'render_kw': {'class': 'form-control'}},
'creation_counter': 2
}
''' |
UnboundField的__init__方法在 UnboundField对象中封装了Field类的参数和计数器,所以现在LoginForml类中封装数据如下
"""
print(LoginForm.__dict__)
LoginForm ={
'__module__': '__main__',
'name': <1 UnboundField(StringField, (),{'creation_counter':
1, 'label': '用户名', 'validators': [<wtforms.validators.DataRequired
object at 0x00000000037DAEB8>, <wtforms.validators.Length
object at 0x000000000382B048>], 'widget': <wtforms.widgets.core.TextInput
object at 0x000000000382B080>, 'render_kw':
{'class': 'form-control'} })>,
'pwd': <2 UnboundField(PasswordField, (),{'creation_counter':
2,'label': '密码', 'validators': [<wtforms.validators.DataRequired
object at 0x000000000382B0F0>, <wtforms.validators.Length
object at 0x000000000382B128>, <wtforms.validators.Regexp
object at 0x000000000382B160>], 'widget': <wtforms.widgets.core.PasswordInput
object at 0x000000000382B208>, 'render_kw':
{'class': 'form-control'}})>,
'__doc__': None,
'_unbound_fields': None,
'_wtforms_meta': None,
}
""" |
启发:
不一定要把代码都写在当前类中,如过多个类和类之间有同性方法、属性可以抽出来集中到父类之中;子类继承父类所以子类实例化对象之后,继承享有2者的属性和方法;所以看源码遇到继承一点要注意
观察父类;
每个对象实例化(在排除MetaClass的情况下)都会执行 父类的__new__方法,再去执行__init__方法;而__new__实质性决定了实例化出来的对象是神马?
class Queen(object):
def __new__(cls, *args, **kwargs): #类中__new__方法决定了类(),实例化出什么对象;
return Cat('狸猫','男','太子')
def __init__(self,name): #由于__nwe__方法返回了其他对象,所以不会执行Queen的__init__方法
print('ok')
self.name=name
Prince=Queen('王子')
print(Prince.name)
print(Prince.gender)
print(Prince.identity) |
2.LoginForm实例化
谈到类实例化应该先检查该类是否指定了 Meta类,如果指定了Meta类, 就需要先执行 (指定元类的__call__、自己/父类的__new__、__init__)
21.执行FormMeta的__call__方法,赋值LoginForm的_unbound_fields
和 _wtforms_meta属性;
根据unbound对象的creation_counter属性对 LoginForm中的字段进行排序,并填充到
LoginForm的_unbound_fields属性中
根据 LoginForm的__mro__继承顺序:获取当前类(FormLogin)所有父类,并在每个父类中
提取Meta属性添加到列表,转成元组,最后创建Meta类让其继承,赋值LoginForm._wtforms_meta属性
def __call__(cls,
*args, **kwargs):
if cls._unbound_fields is None: #在创建类时 已经设置LoginForm的_unbound_fields为空
fields = []
# 获取LoginForm类中,中所有属性的key:[ '_get_translations',
'_unbound_fields', '_wtforms_met,'name', 'populate_obj',
'process', 'pwd', 'validate'..... ]
for name in dir(cls):
if not name.startswith('_'): #排除__下划线的私有属性 name.
pwd
unbound_field = getattr(cls, name) #cls =LoginForm类
#根据key 获取unbound_field 对象
if hasattr(unbound_field, '_formfield'): #检查unbound_field
对象是否包含_formfield = True
fields.append((name, unbound_field))
# '''
# fields = [
# (name,name的unbound对象),
# (pwd,pwd的unbound对象),
# ]
# '''
#对fields 按照定义顺序 进行排序
fields.sort(key=lambda x: (x[1].creation_counter,
x[0])) #根据unbound对象的creation_counter进行字段排序
cls._unbound_fields = fields
if cls._wtforms_meta is None:
bases = [] #bases = [DefaultMeta],
# 按照继承顺序:获取当前类(FormLogin)所有父类
for mro_class in cls.__mro__:
if 'Meta' in mro_class.__dict__: #去每个父类(mro_class)获取
Meta = DefaultMeta
bases.append(mro_class.Meta) #bases = [DefaultMeta],
'''
class Meta(DefaultMeta):
pass
'''
cls._wtforms_meta = type('Meta', tuple(bases),
{}) #cls._wtforms_meta=Meta(DefaultMeta)类:
return type.__call__(cls, *args, **kwargs) |
执行完了指定元类 FormMeta.__call__()方法之后的LoginForm类中封装的数据
print(LoginForm.__dict__)
LoginForm ={
'__module__': '__main__',
'name': <1 UnboundField(StringField, (),{'creation_counter':
1, 'label': '用户名', 'validators': [<wtforms.validators.DataRequired
object at 0x00000000037DAEB8>, <wtforms.validators.Length
object at 0x000000000382B048>], 'widget':
<wtforms.widgets.core.TextInput object at
0x000000000382B080>, 'render_kw': {'class':
'form-control'} })>,
'pwd': <2 UnboundField(PasswordField, (),{'creation_counter':
2,'label': '密码', 'validators': [<wtforms.validators.DataRequired
object at 0x000000000382B0F0>, <wtforms.validators.Length
object at 0x000000000382B128>, <wtforms.validators.Regexp
object at 0x000000000382B160>], 'widget':
<wtforms.widgets.core.PasswordInput object
at 0x000000000382B208>, 'render_kw': {'class':
'form-control'}})>,
'__doc__': None,
'_unbound_fields': [
(name, UnboundField对象(1,simple.StringField,参数)),
(pwd, UnboundField对象(2,simple.PasswordField,参数)),
],,
'_wtforms_meta': Meta(DefaultMeta)类,
}
"""
|
启发:
#sort排序
v1=[
(11,'Martin11',18),
(121,'Martin121',19),
(311,'Martin311',25),
(311, 'Martin311', 26) #按元素1排序,如果元素1相同按照 元素3排序
]
v1.sort(key=lambda x:(x[0],x[2])) #列表的sort方法,根据
列表中的元组元素 进行排序
print(v1)
'''
[(11, 'Martin11', 18), (121, 'Martin121', 19),
(311, 'Martin311', 25), (311, 'Martin311', 26)]
''' |
class F1(object):
pass
class F2(object):
pass
class F3(F1):
pass
class F4(F2,F3):
pass
print(F4.__mro__) #打印F4 的继承关系
'''
(
<class '__main__.F4'>,
<class '__main__.F2'>,
<class '__main__.F3'>,
<class '__main__.F1'>,
<class 'object'>)
'''
'''
|
2.2.执行LoginForm的__new__方法
没有__new__方法 pass
2.3.执行LoginForm的__init__方法实例化form对象
def __init__(self,
formdata=None, obj=None, prefix='', data=None,
meta=None, **kwargs):
# 实例化LoginForm中封装的 Meta类进行实例化,以后用于生成CSRF Tocken
标签
meta_obj = self._wtforms_meta()
#meta是 form = LoginForm(meta={'csrf':'true'})传过来的参数,封装到meta_obj中
if meta is not None and isinstance(meta, dict):
meta_obj.update_values(meta)
#执行父类的构造方法,参数
# self._unbound_fields
'''
'_unbound_fields'=[
(name, UnboundField对象(1,simple.StringField,参数)),
(pwd, UnboundField对象(2,simple.PasswordField,参数)),
],
'''
# meta_ob=Meta(DefaultMeta)对象
super(Form, self).__init__(self._unbound_fields,
meta=meta_obj, prefix=prefix)
#给 form对象 中的_fields字段赋值如下;
'''
_fields: {
name: StringField对象(),
pwd: PasswordField对象(),
}
name: StringField对象(widget=widgets.TextInput()),
pwd: PasswordField对象(widget=widgets.PasswordInput())
'''
#循环form对象 中的_fields字段(字典),给form对象赋值 form.name/form.pwd
for name, field in iteritems(self._fields):
setattr(self, name, field)
'''
_fields: {
name: StringField对象(),
pwd: PasswordField对象(),
}
name: StringField对象(widget=widgets.TextInput()),
pwd: PasswordField对象(widget=widgets.PasswordInput())
'''
self.process(formdata, obj, data=data, **kwargs)
复制代码 |
执行Form父类BaseForm.__init__方法,把UnboundField对象转换成StringField对象,并赋值到form对象的_fields:{}字典中;
class BaseForm(object):
def __init__(self, fields, prefix='', meta=DefaultMeta()):
'''
参数
fields=[
(name, UnboundField对象(1,simple.StringField,参数)),
(pwd, UnboundField对象(2,simple.PasswordField,参数)),
],
meta=Meta(DefaultMeta)对象
'''
if prefix and prefix[-1] not in '-_;:/.':
prefix += '-'
self.meta = meta
self._prefix = prefix
self._errors = None
self._fields = OrderedDict()
if hasattr(fields, 'items'):
fields = fields.items()
translations = self._get_translations()
extra_fields = []
#------------------------------------------------------
if meta.csrf: #生成 CSRF tocken隐藏标签
self._csrf = meta.build_csrf(self)
extra_fields.extend(self._csrf.setup_form(self))
for name, unbound_field in itertools.chain(fields,
extra_fields):#
#(name, UnboundField对象(1,simple.StringField,参数))
#(pwd, UnboundField对象(2,simple.PasswordField,参数))
options = dict(name=name, prefix=prefix, translations=translations)
#(name, UnboundField对象(1,simple.StringField,参数))
#真正实例化 simple.StringField(参数)
field = meta.bind_field(self, unbound_field,
options)
#UnboundField对象转换成StringField对象
self._fields[name] = field |
form = {
_fields: {
name: StringField对象(),
pwd: PasswordField对象(),
} |
循环form对象 中的_fields字段(字典),分别赋值到form对象,这样就可以通过form.name/form.pwd直接获取到Field对象了
,无需form._fields['name'] / form._fields['name']
代码:
for name, field
in iteritems(self._fields):
setattr(self, name, field) |
form对象封装数据就变成以下内容喽
form = {
_fields: {
name: StringField对象(),
pwd: PasswordField对象(),
}
name: StringField对象(widget=widgets.TextInput()),
pwd: PasswordField对象(widget=widgets.PasswordInput())
} |
3. 当form对象生成之后 print(form.name) = 执行StringField对象的__str__方法;
StringField类中没有__str__方法,所以去执行基类Field的,Field.__str__方法返回了:
self() = StringFieldObj.__call__()
def __str__(self):
return self() #执行LoginForm的__call__方法 |
StringField没有__call__所以执行其基类Field.__call__方法,调用了self.meta.render_field(self,
kwargs)
def __call__(self,
**kwargs): # self=StringField对象
return self.meta.render_field(self, kwargs) #把StringField对象传传入meta.render_field方法 |
下面来看self.meta.render_field(self, kwargs)做了什么?
def render_field(self,
field, render_kw):
other_kw = getattr(field, 'render_kw', None)
if other_kw is not None:
render_kw = dict(other_kw, **render_kw)
# StringField对象.widget(field, **render_kw)
#插件.__call__()
'''
#field =StringField对象
StringField对象.widget对象()=调用widget对象的.__call__方法
'''
return field.widget(field, **render_kw) |
来看widget对象=TextInput()的__call__方法,最终打印了obj.name的结果
def __call__(self,
field, **kwargs):
kwargs.setdefault('id', field.id)
kwargs.setdefault('type', self.input_type)
if 'value' not in kwargs:
kwargs['value'] = field._value()
if 'required' not in kwargs and 'required' in
getattr(field, 'flags', []):
kwargs['required'] = True
return HTMLString('<input %s>' % self.html_params(name=field.name,
**kwargs)) |
"""
0. Form.__iter__: 返回所有字段对象
1. StringField对象.__str__
2. StringField对象.__call__
3. meta.render_field(StringField对象,)
4. StringField对象.widget(field, **render_kw)
5. 插件.__call__() """ |
4.执行for iteam in form对象的执行流程
执行form对象基类BaseForm的__inter__方法,变量self._fields字典中的内容
def __iter__(self):
"""Iterate form fields in creation
order."""
return iter(itervalues(self._fields))
_fields: {
name: StringField对象(),
pwd: PasswordField对象(),
} |
用户输入数据的校验验证流程form = LoginForm(formdata=request.form)
# 请求发过来的值
form = LoginForm(formdata=request.form) # 值.getlist('name')
# 实例:编辑
# # 从数据库对象
# form = LoginForm(obj='值') # 值.name/值.pwd
#
# # 字典 {}
# form = LoginForm(data=request.form) # 值['name']
# 1. 循环所有的字段
# 2. 获取每个字段的钩子函数
# 3. 为每个字段执行他的验证流程 字段.validate(钩子函数+内置验证规则) |
六、session功能
1. Flask自带的session功能
from flask import
session
import json
app=Flask(__name__, template_folder='templates', static_path='/static/',static_url_path='/static/')
app.debug=True
app.secret_key='sjehfjeefrjewth43u' #设置session加密
app.config['JSON_AS_ASCII'] =False #指定json编码格式
如果为False 就不使用ascii编码,
app.config['JSONIFY_MIMETYPE'] = "application/json;charset= utf-8"
#指定浏览器渲染的文件类型,和解码格式;
@app.route('/login/',methods=['GET','POST'])
def login():
msg = ''
if request.method=='POST':
name=request.values.get('user')
pwd=request.values.get('pwd')
if name =='zhanggen' and pwd=='123.com':
session['user']=name #设置session的key value
return redirect('/index/')
else:
msg='用户名或者密码错误'
return render_template('login.html',msg=msg)
@app.route('/index/',methods=['GET','POST'])
def index():
user_list = ['张根', 'egon', 'eric']
user=session.get('user') #获取session
if user:
user=['alex','egon','eric']
return jsonify(user_list)
else:
return redirect('/login/')
if __name__ == '__main__':
app.run() |
2.第三方session组件(Session)
安装 pip install flask-session
from flask import
session, Flask,request, make_response,render_template, redirect,jsonify,Response
from flask.ext.session import Session #引入第三方session
import json
app=Flask(__name__,template_folder= 'templates',static_path= '/static/',static_url_path='/static/')
app.debug=True
app.secret_key='sjehfjeefrjewth43u' #设置session加密
app.config['JSON_AS_ASCII']=False #指定json编码格式
如果为False 就不使用ascii编码,
app.config['JSONIFY_MIMETYPE'] =& quot;application/json; charset=utf-8"
#指定浏览器渲染的文件类型,和解码格式;
app.config['SESSION_TYPE']='redis'
from redis import Redis #引入连接 redis模块
app.config['SESSION_REDIS']= Redis(host='192.168.0.94',port=6379)
#连接redis
Session(app)
@app.route ('/login/',methods=['GET','POST'])
def login():
msg = ''
if request.method=='POST':
name=request.values.get('user')
pwd=request.values.get('pwd')
if name == 'zhanggen' and pwd=='123.com':
session['user']= name #设置session的key value
return redirect('/index/')
else:
msg='用户名或者密码错误'
return render_template('login.html',msg=msg)
@app.route('/index/',methods=['GET','POST'])
def index():
user_list = ['张根', 'egon', 'eric']
user=session.get('user') #获取session
if user:
user=['alex','egon','eric']
return jsonify(user_list)
else:
return redirect('/login/')
if __name__ == '__main__':
app.run() |
不仅可以把session存放到redis还可放到文件、内存、memcache...
def _get_interface(self,
app):
config = app.config.copy()
config.setdefault('SESSION_TYPE', 'null')
config.setdefault('SESSION_PERMANENT', True)
config.setdefault('SESSION_USE_SIGNER', False)
config.setdefault('SESSION_KEY_PREFIX', 'session:')
config.setdefault('SESSION_REDIS', None)
config.setdefault('SESSION_MEMCACHED', None)
config.setdefault('SESSION_FILE_DIR',
os.path.join(os.getcwd(), 'flask_session'))
config.setdefault('SESSION_FILE_THRESHOLD', 500)
config.setdefault('SESSION_FILE_MODE', 384)
config.setdefault('SESSION_MONGODB', None)
config.setdefault('SESSION_MONGODB_DB', 'flask_session')
config.setdefault('SESSION_MONGODB_COLLECT', 'sessions')
config.setdefault('SESSION_SQLALCHEMY', None)
config.setdefault('SESSION_SQLALCHEMY_TABLE',
'sessions')
if config['SESSION_TYPE'] == 'redis':
session_interface = RedisSessionInterface(
config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'memcached':
session_interface = MemcachedSessionInterface(
config['SESSION_MEMCACHED'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'filesystem':
session_interface = FileSystemSessionInterface(
config['SESSION_FILE_DIR'], config['SESSION_FILE_THRESHOLD'],
config['SESSION_FILE_MODE'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'mongodb':
session_interface = MongoDBSessionInterface(
config['SESSION_MONGODB'], config['SESSION_MONGODB_DB'],
config['SESSION_MONGODB_COLLECT'],
config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'sqlalchemy':
session_interface = SqlAlchemySessionInterface(
app, config['SESSION_SQLALCHEMY'],
config['SESSION_SQLALCHEMY_TABLE'],
config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
config['SESSION_PERMANENT'])
else:
session_interface = NullSessionInterface()
return session_interface |
3.自定义session组件
#!/usr/bin/env
python
# -*- coding:utf-8 -*-
import uuid
import json
from flask.sessions import SessionInterface
from flask.sessions import SessionMixin
from itsdangerous import Signer, BadSignature,
want_bytes
class MySession(dict, SessionMixin):
def __init__(self, initial=None, sid=None):
self.sid = sid
self.initial = initial
super(MySession, self).__init__(initial or ())
def __setitem__(self, key, value):
super(MySession, self).__setitem__(key, value)
def __getitem__(self, item):
return super(MySession, self).__getitem__(item)
def __delitem__(self, key):
super(MySession, self).__delitem__(key)
class MySessionInterface(SessionInterface):
session_class = MySession
container = {}
def __init__(self):
import redis
self.redis = redis.Redis()
def _generate_sid(self):
return str(uuid.uuid4())
def _get_signer(self, app):
if not app.secret_key:
return None
return Signer(app.secret_key, salt='flask-session',
key_derivation='hmac')
def open_session(self, app, request):
"""
程序刚启动时执行,需要返回一个session对象
"""
sid = request.cookies.get(app.session_cookie_name)
if not sid:
sid = self._generate_sid()
return self.session_class(sid=sid)
signer = self._get_signer(app)
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid)
# session保存在redis中
# val = self.redis.get(sid)
# session保存在内存中
val = self.container.get(sid)
if val is not None:
try:
data = json.loads(val)
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid)
return self.session_class(sid=sid)
def save_session(self, app, session, response):
"""
程序结束前执行,可以保存session中所有的值
如:
保存到resit
写入到用户cookie
"""
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
val = json.dumps(dict(session))
# session保存在redis中
# self.redis.setex(name=session.sid, value=val,
time=app.permanent_session_lifetime)
# session保存在内存中
self.container.setdefault(session.sid, val)
session_id = self._get_signer(app).sign(want_bytes(session.sid))
response.set_cookie(app.session_cookie_name,
session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure) |
from flask import
Flask
from flask import session
from my_session import MySessionInterface
app = Flask(__name__)
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
app.session_interface = MySessionInterface()
@app.route('/login/', methods=['GET', "POST"])
def login():
print(session)
session['user1'] = 'alex'
session['user2'] = 'alex'
del session['user2']
return "内容"
if __name__ == '__main__':
app.run()
|
七、蓝图
使用Flask自带Blueprintmuk模块,帮助我们做代码目录结构的归类
import luffy #导入luffy包就会执行luffy包中__init__.py文件
luffy.app.run() |
from flask import
Flask
app=Flask(__name__,template_folder= 'templates',static_path= '/static/',static_url_path='/static/')
app.debug=True
from .views import login
from .views import index
#把文件中蓝图对象注册到app里
app.register_blueprint (login.login,url_prefix='/login')
#访问login蓝图必须以url_prefix开头
app.register_blueprint (index.index,url_prefix='/index')
if __name__ == '__main__':
app.run() |
from flask import
Blueprint #导入蓝图
login=Blueprint('login',__name__) #在本模块实例化1个蓝图
@login.route('/login/',methods=['GET','POST'])
def login1():
return '登录页面' |
from flask import
Blueprint
index=Blueprint('index',__name__)
@index.route('/index/',methods=['GET','POST'])
def index1():
return '首页' |
八、message (闪现)
message是一个基于Session实现的用于保存数据的集合,其特点是:一次性。
特点:和labada匿名函数一样不长期占用内存
from flask import
Flask,request,flash,get_flashed_messages
app = Flask(__name__)
app.secret_key = 'some_secret'
@app.route('/set/')
def index2():
flash('Disposable') #在message中设置1个个值
return 'ok'
@app.route('/')
def index1():
messages = get_flashed_messages() #获取message中设置的值,只能获取1次。(1次性)
print(messages)
return "Index1"
if __name__ == "__main__":
app.run()
|
九、中间件
flask也有中间件功能和Django类似,不同的是使用的是使用3个装饰器来实现的;
1.@app.before_first_request :请求第1次到来执行1次,之后都不执行;
2.@app.before_request:请求到达视图之前执行;(改函数不能有返回值,否则直接在当前返回)
3.@app.after_request:请求 经过视图之后执行;(最下面的先执行)
#!/usr/bin/env
python
# -*- coding:utf-8 -*-
from flask import Flask, Request, render_template
app = Flask(__name__, template_folder='templates')
app.debug = True
@app.before_first_request #第1个请求到来执行
def before_first_request1():
print('before_first_request1')
@app.before_request #中间件2
def before_request1():
Request.nnn = 123
print('before_request1') #不能有返回值,一旦有返回值在当前返回
@app.before_request
def before_request2():
print('before_request2')
@app.errorhandler(404)
def page_not_found(error):
return 'This page does not exist', 404
@app.route('/')
def hello_world():
return "Hello World"
@app.after_request #中间件 执行视图之后
def after_request1(response):
print('after_request1', response)
return response
@app.after_request #中间件 执行视图之后 先执行 after_request2
def after_request2(response):
print('after_request2', response)
return response
if __name__ == '__main__':
app.run() |
十、Flask相关组件
1、flask-sqlchemy
2、flask-script组件
flask-script组件:用于通过脚本的形式,启动 flask;(实现类似Django的python
manager.py runserver 0.0.0.0:8001)
pip install
flask-script #安装 |
#!/usr/bin/env
python
# -*- coding:utf-8 -*-
from sansa import create_app
from flask_script import Manager #导入
app = create_app()
manager=Manager(app) #实例化Manager对象
if __name__ == '__main__':
manager.run() |
python run.py
runserver -h 0.0.0.0 -p 8001
* Running on http://0.0.0.0:8001/ (Press CTRL+C
to quit) |
3.flask-migrate组件
在线修改、迁移数据库(Django的 migrate 。
#!/usr/bin/env
python
# -*- coding:utf-8 -*-
from sansa import create_app,db
from flask_script import Manager #导入
from flask_migrate import Migrate,MigrateCommand
app = create_app()
manager=Manager(app) #实例化Manager对象
migrate=Migrate(app,db)
manager.add_command('db',MigrateCommand) #注册命令
if __name__ == '__main__':
manager.run() |
pip install
flask-migrate #安装 |
3.1.初始化数据库:python run.py db init
3.2.迁移数据: python run.py db migrate
3.3.生成表: python run.py db upgrade
ps:修改表结构 first 直接注释静态字段代码,second 执行
python run.py db upgrade.
D:\Flask练习\sansa>python
run.py db init
Creating directory D:\Flask练习\sansa\migrations
... done
Creating directory D:\Flask练习\sansa\migrations\versions
... done
Generating D:\Flask练习\sansa\migrations\alembic.ini
... done
Generating D:\Flask练习\sansa\migrations\env.py
... done
Generating D:\Flask练习\sansa\migrations\README
... done
Generating D:\Flask练习\sansa\migrations\script.py.mako
... done
Please edit configuration/connection/logging settings
in 'D:\\Flask练习\\sansa\\migrations\\alembic.ini'
before proceeding.
D:\Flask练习\sansa>python run.py db migrate
INFO [alembic.runtime.migration] Context impl
MySQLImpl.
INFO [alembic.runtime.migration] Will assume
non-transactional DDL.
INFO [alembic.autogenerate.compare] Detected
added table 'users666'
Generating D:\Flask练习\sansa\migrations \versions\a7f412a8146f_.py
... done
D:\Flask练习\sansa>python run.py db upgrade
INFO [alembic.runtime.migration] Context impl
MySQLImpl.
INFO [alembic.runtime.migration] Will assume
non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade
-> a7f412a8146f, empty message
D:\Flask练习\sansa> |
|