我在这个领域做了不少尝试,然后最近做出来一个自己还比较满意的东西,coredumpy
https://github.com/gaogaotiantian/coredumpy
这是个解决 python 的 post-mortem debugging 的工具。它的核心点其实就是一个,可以对程序当前运行状态做一个 snapshot ,然后保存 dump ,在进程结束后用这个 dump 文件去重建。
这个概念本身并不算新,在一些 native 语言里是存在的。python 这边也有人做过一些尝试,但是效果都不好。一个核心原因就是 Python 对象之间的引用关系比较复杂,不太好重建。而之前大量的尝试基本都是基于 pickle 的,pickle 有几个核心问题导致它上不了 production:
所以 coredumpy 是完全抛弃了 pickle ,用另一种方式来试图解决这个问题——我可以没办法重建全部数据,但是我一定要能 dump 出来,并且保留更多的信息。
拿到这个 dump 之后,可以用 vscode 直接打开(会调用 vscode 自身的 debugger ),也可以用 pdb 打开。同时还做了个 github actions ,可以在 github actions 里直接产生一个 vscode link ,然后直接复制到浏览器就可以在 vscode 打开这个 dump 了,连手动下载都不需要。
项目本身还在做,感兴趣的小伙伴可以试一下,如果有反馈也可以直接在 github 提 issue~
]]>输入 import sys print(sys.executable),它显示正常 C:\ProgramData\anaconda3\python.exe
输入 import sys !{sys.executable} -m pip install python-binance 它又显示 Defaulting to user installation because normal site-packages is not writeable
]]>之前尝试用 Selenium 写过一个自动预约健身房的小项目,结果一直过不了网页的反爬机制,打开网页一片空白... 不知道用这个能不能轻松搞成。
]]>岗位职责 1.负责 Coze 、LangChain 等 AIGC Framework 的工作流搭建,优化 AI 应用的开发流程。 2.熟练使用 主流大语言模型( LLM )(如 DS 、OpenAI GPT-4 、Claude 、Gemini 、Mistral 、Llama 2 等),并进行 API 集成与优化。 3.负责应用架构设计与开发,确保 AI 生成应用的高效运行与稳定性。 4.参与 AIGC 解决方案的技术选型与落地,推动 AI 生成应用的创新与优化。 5.通过远程协作,与团队成员沟通项目进展,确保按时交付高质量成果。
任职要求 1.熟悉 AIGC 开发框架(如 Coze 、LangChain 、LlamaIndex 、Haystack ),能够搭建 LLM 工作流。 2.熟练使用主流 LLM ,具备 API 调用、微调( Fine-tuning )、Prompt 设计等实践经验。 3.有丰富的后端开发经验,熟练掌握 Python ( FastAPI / Flask )或 Node.js ( Express / NestJS ) 。 4.熟悉数据库与缓存优化(如 PostgreSQL / MySQL / Redis / MongoDB ),具备良好的架构设计能力。 5.熟练使用 Docker 、Kubernetes 、Cloud 服务( AWS/GCP/Azure ) 进行部署与优化。 6.具备良好的远程协作与自我管理能力,能够独立完成任务并高效沟通。
加分项 1.有完整的 AIGC 应用开发经验,如 AI 客服、助手、AI 生成内容平台、智能对话系统等。 2.了解私有化 LLM 部署,能优化 AI 模型的推理速度与成本。 3.具备分布式系统开发经验,能够优化 AIGC 应用的高并发处理能力,熟悉本地化 AI 应用方向。
]]>def generate_state():
"""
生成一个随机的 state 参数
"""
return secrets.token_urlsafe(16) # 生成一个 16 字节的随机字符串
def wechat_login(request):
"""
微信登录入口,根据设备类型选择不同的授权方式
"""
user_agent = request.META.get('HTTP_USER_AGENT', '').lower()
# 判断是否为微信浏览器
is_wechat_browser = 'micromessenger' in user_agent
# 判断是否为手机端
is_mobile = any(keyword in user_agent for keyword in ['mobile', 'android', 'iphone', 'ipod'])
# 生成随机的 state 参数
state = generate_state()
request.session['wechat_state'] = state # 将 state 存储在会话中
if is_wechat_browser:
# 微信浏览器内登录(使用微信公众号的 OAuth2.0 授权)
wechat_auth_url = (
f"https://open.weixin.qq.com/connect/oauth2/authorize"
f"?appid={settings.WECHAT_MP_APP_ID}" # 使用微信公众号的 AppID
f"&redirect_uri={settings.WECHAT_MP_REDIRECT_URI}" # 微信公众号的回调地址
f"&response_type=code"
f"&scope=snsapi_userinfo" # 使用 snsapi_userinfo 或 snsapi_base
f"&state={state}#wechat_redirect"
)
elif is_mobile:
# 手机端浏览器登录(使用微信开放平台的 H5 登录)
wechat_auth_url = (
f"https://open.weixin.qq.com/connect/oauth2/authorize"
f"?appid={settings.WECHAT_OPEN_APP_ID}" # 使用微信开放平台的 AppID
f"&redirect_uri={settings.WECHAT_OPEN_REDIRECT_URI}" # 微信开放平台的回调地址
f"&response_type=code"
f"&scope=snsapi_userinfo" # 使用 snsapi_login
f"&state={state}#wechat_redirect"
)
else:
# 电脑端浏览器登录(使用微信开放平台的扫码登录)
wechat_auth_url = (
f"https://open.weixin.qq.com/connect/qrconnect"
f"?appid={settings.WECHAT_OPEN_APP_ID}" # 使用微信开放平台的 AppID
f"&redirect_uri={settings.WECHAT_OPEN_REDIRECT_URI}" # 微信开放平台的回调地址
f"&response_type=code"
f"&scope=snsapi_login" # 使用 snsapi_login
f"&state={state}#wechat_redirect"
)
return redirect(wechat_auth_url)
def get_wechat_h5_login_url(request):
"""
生成微信开放平台的 H5 登录 URL
"""
# 生成随机的 state 参数
state = generate_state()
request.session['wechat_state'] = state
# 微信开放平台的 H5 登录 URL
wechat_h5_login_url = (
f"https://open.weixin.qq.com/connect/oauth2/authorize"
f"?appid={settings.WECHAT_OPEN_APP_ID}" # 微信开放平台的 AppID
f"&redirect_uri={settings.WECHAT_OPEN_REDIRECT_URI}" # 回调地址
f"&response_type=code"
f"&scope=snsapi_login" # 使用 snsapi_login
f"&state={state}" # 可选参数,用于防止 CSRF 攻击
f"#wechat_redirect"
)
return JsonResponse({
'success': True,
'login_url': wechat_h5_login_url,
})
def filter_username(nickname):
"""
过滤掉不允许的字符,只保留字母、数字、_、-
"""
# 只保留字母、数字、_、-
filtered = re.sub(r'[^\w-]', '', nickname)
return filtered
def generate_unique_username(nickname):
"""
根据微信昵称生成唯一的用户名
"""
# 过滤特殊字符
base_username = filter_username(nickname)
# 如果过滤后的用户名为空,使用默认用户名
if not base_username:
base_username = 'wechat_user'
# 添加随机后缀
random_suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
username = f"{base_username}_{random_suffix}"
# 检查用户名是否已存在
while User.objects.filter(username=username).exists():
random_suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
username = f"{base_username}_{random_suffix}"
return username
def wechat_callback(request):
"""
微信登录回调处理
"""
code = request.GET.get('code')
state = request.GET.get('state')
if not code:
return HttpResponse('授权失败,未获取到 code')
# 验证 state 参数
if state != request.session.get('wechat_state'):
return HttpResponse('非法请求,state 参数不匹配')
# 清除会话中的 state
request.session.pop('wechat_state', None)
# 判断是否为微信浏览器
user_agent = request.META.get('HTTP_USER_AGENT', '').lower()
is_wechat_browser = 'micromessenger' in user_agent
# 根据设备类型选择 appid 和 secret
if is_wechat_browser:
# 微信浏览器内登录(使用微信公众号的 appid 和 secret )
appid = settings.WECHAT_MP_APP_ID
secret = settings.WECHAT_MP_APP_SECRET
else:
# 非微信浏览器登录(使用微信开放平台的 appid 和 secret )
appid = settings.WECHAT_OPEN_APP_ID
secret = settings.WECHAT_OPEN_APP_SECRET
# 通过 code 获取 access_token
token_url = (
f"https://api.weixin.qq.com/sns/oauth2/access_token"
f"?appid={appid}"
f"&secret={secret}"
f"&code={code}"
f"&grant_type=authorization_code"
)
response = requests.get(token_url)
data = response.json()
access_token = data.get('access_token')
openid = data.get('openid')
unionid = data.get('unionid') # 获取 unionid
if not access_token or not openid:
return HttpResponse('获取 access_token 失败')
# 获取用户信息
user_info_url = (
f"https://api.weixin.qq.com/sns/userinfo"
f"?access_token={access_token}"
f"&openid={openid}"
)
response = requests.get(user_info_url)
response.encoding = 'utf-8'
user_info = response.json()
# 根据 unionid 查找或创建用户
user, created = User.objects.get_or_create(unionid=unionid)
if created:
# 生成唯一的用户名
nickname = user_info.get('nickname', '微信用户')
user.username = generate_unique_username(nickname)
user.set_unusable_password() # 微信登录用户不需要密码
user.nickname = user_info.get('nickname', '') # 更新昵称
user.avatar = user_info.get('headimgurl', '') # 更新头像
user.wechat_nickname = user_info.get('nickname', '') # 更新微信昵称
user.save()
# 登录用户
login(request, user)
return redirect('/') # 登录成功后跳转到首页
def get_js_sdk_config(request):
"""
获取 JS-SDK 配置
"""
try:
access_token = get_access_token(settings.WECHAT_MP_APP_ID, settings.WECHAT_MP_APP_SECRET)
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
# 获取 JSAPI Ticket
ticket_url = f"https://api.weixin.qq.com/cgi-bin/ticket/getticket?access_token={access_token}&type=jsapi"
response = requests.get(ticket_url)
jsapi_ticket = response.json().get('ticket')
# 生成签名参数
nonce_str = generate_nonce_str()
timestamp = int(time.time())
url = request.build_absolute_uri()
# 生成签名
signature = generate_signature(jsapi_ticket, nonce_str, timestamp, url)
# 返回 JS-SDK 配置
return JsonResponse({
'appId': settings.WECHAT_MP_APP_ID, # 公众号的 AppID
'timestamp': timestamp,
'nonceStr': nonce_str,
'signature': signature,
'wechatOpenAppId': settings.WECHAT_OPEN_APP_ID, # 微信开放平台的 AppID
'wechatOpenRedirectUri': settings.WECHAT_OPEN_REDIRECT_URI, # 微信开放平台回调地址
'wechatMpRedirectUri': settings.WECHAT_MP_REDIRECT_URI # 微信开放平台回调地址
})
def get_access_token(appid, appsecret):
"""
获取微信 access_token ,并缓存
"""
# 先从缓存中获取 access_token
access_token = cache.get('wechat_access_token')
if access_token:
return access_token
# 缓存中没有,则从微信接口获取
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={appid}&secret={appsecret}"
response = requests.get(url)
data = response.json()
if 'access_token' in data:
access_token = data['access_token']
# 将 access_token 缓存 7000 秒(微信的有效期是 7200 秒)
cache.set('wechat_access_token', access_token, 7000)
return access_token
else:
raise Exception(f"获取 access_token 失败: {data}")
path('wechat/login/', wechat_login, name='wechat_login'),
path('wechat/callback/', wechat_callback, name='wechat_callback'),
path('get_js_sdk_config/', get_js_sdk_config, name='get_js_sdk_config'), # 获取 JS-SDK 配置
path('wechat/get_wechat_h5_login_url/', get_wechat_h5_login_url, name='get_wechat_h5_login_url'),#手机端非微信浏览器登录
login.html 方式 1
<script src="/load/view.php?a=aHR0cHM6Ly9yZXMud3gucXEuY29tL29wZW4vanMvandlaXhpbi0xLjYuMC5qcw"></script>
<div class="container reg">
<!-- 微信登录按钮 -->
<button id="wechat-login">微信登录</button>
</div>
<script>
document.getElementById('wechat-login').addEventListener('click', function () {
// 判断是否为微信浏览器
var userAgent = navigator.userAgent.toLowerCase();
var isWechatBrowser = userAgent.indexOf('micromessenger') !== -1;
var isMobile = /mobile|android|iphone/i.test(userAgent);
if (isWechatBrowser || !isMobile) {
// 微信浏览器内或电脑端,直接跳转到后端生成的微信登录 URL
window.location.href = "/user/wechat/login/"; // 这里假设 Django 反向解析后的实际 URL
} else {
// 手机端非微信浏览器,使用微信开放平台的 H5 登录
// 向后端请求微信开放平台的 H5 登录 URL
var xhr = new XMLHttpRequest();
xhr.open('GET', '/user/wechat/get_wechat_h5_login_url/', true); // 后端生成 H5 登录 URL 的接口
xhr.onreadystatechange = function () {
if (xhr.readyState === 4 && xhr.status === 200) {
var response = JSON.parse(xhr.responseText);
if (response.success) {
// 跳转到微信开放平台的 H5 登录页面
window.location.href = response.login_url;
} else {
alert('获取微信登录链接失败,请稍后重试。');
}
}
};
xhr.send();
}
});
</script>
login.html 方式 2
<script src="/load/view.php?a=aHR0cHM6Ly9yZXMud3gucXEuY29tL29wZW4vanMvandlaXhpbi0xLjYuMC5qcw"></script>
<div class="container reg">
<!-- 微信登录按钮 -->
<button id="wechat-login">微信登录</button>
</div>
<script>
document.getElementById('wechat-login').addEventListener('click', function () {
// 判断是否为微信浏览器
var userAgent = navigator.userAgent.toLowerCase();
var isWechatBrowser = userAgent.indexOf('micromessenger')!== -1;
var isMobile = /mobile|android|iphone/i.test(userAgent);
if (isWechatBrowser ||!isMobile) {
// 微信浏览器内或电脑端,直接跳转到后端生成的微信登录 URL
window.location.href = "/user/wechat/login/"; // 这里假设 Django 反向解析后的实际 URL
} else {
// 获取 JS-SDK 配置
var xhr = new XMLHttpRequest();
xhr.open('GET', '/user/get_js_sdk_config/', true);
xhr.onreadystatechange = function () {
if (xhr.readyState === 4 && xhr.status === 200) {
var config = JSON.parse(xhr.responseText);
wx.config({
debug: true, // 调试模式
appId: config.appId, // 公众号的 AppID
timestamp: config.timestamp, // 时间戳
nonceStr: config.nonceStr, // 随机字符串
signature: config.signature, // 签名
jsApiList: ['launchApplication'] // 需要使用的 JS 接口
});
wx.ready(function () {
// 尝试唤醒微信客户端
wx.launchApplication({
appId: config.appId, // 这里需要替换为实际的微信 AppID
extraData: '',
success: function () {
// 唤醒成功,跳转到微信登录 URL
window.location.href = "/user/wechat_login/"; // 这里假设 Django 反向解析后的实际 URL
},
fail: function (res) {
alert('唤醒微信客户端失败,请检查您的微信是否安装或尝试其他登录方式。');
}
});
});
wx.error(function (res) {
alert('JS-SDK 配置出错,请刷新页面重试。');
});
}
};
xhr.send();
}
});
</script>
setting.py 里面也配置好了上面的 appid 相关参数与回调地址,并且在电脑 pc 端,微信浏览器内都可以成功登录了,就是在手机非浏览器端,不管怎么操作,都是提示:请在微信客户端内打开链接
麻烦各位大佬看看是哪里的原因? 一直无法用 h5 的 js-sdk 在手机端唤起微信客户端登录?谢谢
]]>整体开发的体验非常丝滑,NiceGUI 框架提供了丰富的前端组件。
同时支持非常自然地将前端状态绑定至 python 的属性或变量:
demo = Demo()
ui.slider(min=1, max=3).bind_value(demo, 'number')
甚至直接绑定至用户一次 session:
ui.textarea('This note is kept between visits')
.classes('w-full').bind_value(app.storage.user, 'note')
优雅的背后是 tradeoff ,就如同 GIL 一刀切一般,在 NiceGUI 框架中所有的逻辑都在后端服务器处理(美名称之为 backend-first philosophy )。
举个例子,在前端界面点击菜单下拉框,也需要一来一回的 websocket 交互。如果网络状况一般,可能会严重影响用户体验,这也是为什么个人项目主打自部署~
总而言之,NiceGUI 对于没有丰富前端经验的 Python 程序员,应该会是一个不错的选择 :)
]]>Powered by FFI from rquest.
A new type of interpreter based on tail calls has been added to CPython. For certain newer compilers, this interpreter provides significantly better performance. Preliminary numbers on our machines suggest anywhere from -3% to 30% faster Python code, and a geometric mean of 9-15% faster on
pyperformance
depending on platform and architecture. The baseline is Python 3.14 built with Clang 19 without this new interpreter.
其实我很想知道,这种性能测试的结果到底准不准?熟悉这方面的大佬来聊聊。
]]>楼主从业开始用 Python 七八年了,这几年也看到了行业的不景气,
自己这几年一直很努力学习 Python 技术+领域驱动设计+需求分析,
以前的梦想就是当一个合格的 Python 软件工程师,可现实吧......哎.....
不知道自己能在这个行业还能多久,过年期间突然想着给心爱的 Python 留点儿什么,
于是开始了翻译(其实英文原文也很通俗易懂,楼主翻译完全是自己太闲了)
基本上是 AI+人工复制粘贴,这几天正在人工核对一些翻译不准确的地方...
仓库的 Readme.md 里可以看所有的文字,也没有什么商业目的,单纯希望 Python 越来越好,
这是翻译后的 github: https://github.com/fushall/cosmicpython-book
这是原作者的 github: https://github.com/cosmicpython/book
下面书的目录:
Chapter 章节 |
---|
Preface 前言(已翻译) |
Introduction: Why do our designs go wrong? 引言:为什么我们的设计会出问题?(已翻译) |
Part 1 Intro 第一部分简介(已翻译) |
Chapter 1: Domain Model 第一章:领域模型(已翻译) |
Chapter 2: Repository 第二章:仓储(已翻译) |
Chapter 3: Interlude: Abstractions 第三章:插曲:抽象(已翻译) |
Chapter 4: Service Layer (and Flask API) 第四章:服务层(和 Flask API )(已翻译) |
Chapter 5: TDD in High Gear and Low Gear 第五章:高速档与低速档中的测试驱动开发( TDD )(已翻译) |
Chapter 6: Unit of Work 第六章:工作单元(已翻译) |
Chapter 7: Aggregates 第七章:聚合(已翻译) |
Part 2 Intro 第二部分简介(已翻译) |
Chapter 8: Domain Events and a Simple Message Bus 第八章:领域事件与简单消息总线(已翻译) |
Chapter 9: Going to Town on the MessageBus 第九章:深入探讨消息总线(已翻译) |
Chapter 10: Commands 第十章:命令(已翻译) |
Chapter 11: External Events for Integration 第十一章:集成外部事件(已翻译) |
Chapter 12: CQRS 第十二章:命令查询责任分离(已翻译) |
Chapter 13: Dependency Injection 第十三章:依赖注入(已翻译) |
Epilogue: How do I get there from here? 尾声:我该如何开始?(已翻译) |
Appendix A: Recap table 附录 A:总结表格(已翻译) |
Appendix B: Project Structure 附录 B:项目结构(已翻译) |
Appendix C: A major infrastructure change, made easy 附录 C:轻松替换重要的基础设施(已翻译) |
Appendix D: Django 附录 D:Django (已翻译) |
Appendix F: Validation 附录 F:校验(已翻译) |
我目前打算使用 arq(异步消息框架) spider 抓到的 item 直接扔给 arq,扔之前查看任务队列是否过多,可以暂停发布任务
各位有好的想法吗
]]>问题并不是打包失败或者运行报错,而是有时候可以有时候不行。不行的时候也不报错,而是就是好像跳过那一行直接返回了。
]]>class FancyLogger:
"""日志类:支持向文件、Redis 、ES 等服务输出日志"""
_redis_max_length = 1024
def __init__(self, output_type=OutputType.FILE):
self.output_type = output_type
...
def log(self, message):
"""打印日志"""
if self.output_type == OutputType.FILE:
...
elif self.output_type == OutputType.REDIS:
...
elif self.output_type == OutputType.ES:
...
else:
raise TypeError('output type invalid')
def pre_process(self, message):
"""预处理日志"""
# Redis 对日志最大长度有限制,需要进行裁剪
if self.output_type == OutputType.REDIS:
return message[:self._redis_max_length]
FancyLogger 类在日志输出类型不同时,需要有不同的行为。因此,我们完全可以为“输出日志”行为建模一个新的类型:LogWriter ,然后把每个类型的不同逻辑封装到各自的 Writer 类中。
class FileWriter:
def write(self, message):
...
class RedisWriter:
max_length = 1024
def write(self, message):
message = self._pre_process(message)
...
def _pre_process(self, message):
"""Redis 对日志最大长度有限制,需要进行裁剪"""
return message[:self.max_length]
class EsWriter:
def write(self, message):
...
基于这些不同的 Writer 类,FancyLogger 可以简化成下面这样:
class FancyLogger:
"""日志类:支持向文件、Redis 、ES 等服务输出日志"""
def __init__(self, output_writer=None):
self._writer = output_writer or FileWriter()
...
def log(self, message):
self._writer.write(message)
文中对这样的写法好处解释为 代码利用多态特性,完全消除了原来的条件判断语句。另外你会发现,新代码的扩展性也远比旧代码好。
但是在我看来, 你要传什么 output_writer 不还是要通过 if 来选择吗, 只是把一个地方的 if 换到了另外一个地方,
扩展性 这个模块看起来确实好了, 但是总感觉和上面一样, 这里提高了, 但是其他地方就要更多 if, TVT, 面向对象还是没有入门
]]>但是担心这样大规模发邮件验证码,会被丢进垃圾箱,所以想问一下有没有提供类似于发送短信服务的,可以确保发送邮件验证码不会被扔进垃圾箱的服务。上网搜索了一下,貌似没有看到相关信息。
]]>主要是帮助练习和学习 Torch 的一些基本概念和基本操作,希望能对大家有帮助!
]]>宝塔版本:9.0.0
系统:Ubuntu 24.04.1 LTS (Noble Numbat) x86_64(Py3.7.16)
发现证书居然有 25 天的,宝塔自动续签摆设的???
/www/server/panel/pyenv/bin/python3 -u /www/server/panel/class/acme_v2.py --renew=1
经过测试,发现 p_key 多了个换行。 因此删除换行即可:
if pkey[-1] == '\n':
pkey = pkey[:-1]
(换行是宝塔加的,因为证书都是从宝塔点击申请的)
结果:
.proto
文件和生成代码,这对开发者来说是一项重复且容易出错的任务。FastGRPC 的出现,通过其简洁直观的设计理念,让我们能够用更少的代码和更高的效率构建 gRPC 服务。
首先,确保你的环境支持 Python 3.9 及以上版本。然后通过以下命令安装 FastGRPC:
pip install python-fast-grpc
以下是使用 FastGRPC 创建一个简单服务的代码示例:
from pydantic import BaseModel
from fast_grpc import FastGRPC
# 初始化服务
app = FastGRPC(service_name="Greeter", proto="greeter.proto")
# 定义请求和响应的数据模型
class HelloRequest(BaseModel):
name: str
class HelloReply(BaseModel):
message: str
# 定义一个 gRPC 方法
@app.unary_unary()
async def say_hello(request: HelloRequest) -> HelloReply:
return HelloReply(message=f"Greeter SayHello {request.name}")
# 启动服务
if __name__ == '__main__':
app.run()
代码特点:
.proto
文件,无需额外工具或命令,也支持根据 proto 自动生成 pydantic 版本的 client 。通过以上几行代码,我们快速构建了一个基于 gRPC 的 Python 微服务。更多功能,请访问 FastGRPC。
]]>tmp = '{"{\"username\":\"juheso\",\"is_cache\":1}":3}'
print(tmp)
print(repr(tmp))
tmp_1 = r'{"{\"username\":\"juheso\",\"is_cache\":1}":3}'
print(tmp_1)
我运行之后的结果为
{"{"username":"juheso","is_cache":1}":3}
'{"{"username":"juheso","is_cache":1}":3}'
{"{\"username\":\"juheso\",\"is_cache\":1}":3}
我想要的结果其实是{"{\"username\":\"juheso\",\"is_cache\":1}":3}
但是为什么 repr() 没有效果呢?
我的 python 版本为 3.10
]]>使用 pynupt 执行键盘模拟,通过 automator 配置服务、系统快捷键绑定脚本。代码如下,正常在编辑器里看执行是正常的,但是打开 chiaki ,快捷键调起脚本后就没反应了,但是如果多调用几次脚本会偶尔还会执行一次按键调用。不知道有没有大佬能指导下?
或者还有其他推荐的自动化工具么?主要需求就是模拟键盘,可以获取图像进行分析比如获取像素点 rgb 或者用 opencv 做图像识别就更好了。
import time from pynput.keyboard import Key, Controller keyboard = Controller() time1=0.1 for num in range(0,120): keyboard.tap("k"); time.sleep(time1); keyboard.tap(Key.left); time.sleep(time1); keyboard.tap("k"); time.sleep(time1); keyboard.tap(Key.down); time.sleep(time1);
]]># -*- coding: UTF-8 -*-
from flask import Flask, request
import json
import time
import re
import sys
import asyncio
from tornado.ioloop import IOLoop
from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///database.db?check_same_thread=False')
Base = declarative_base()
class ClientLog(Base):
__tablename__ = 'clientlog'
log_id = Column(Integer, primary_key=True, autoincrement=True)
message = Column(String(512))
def __repr__(self):
return "A"
from sqlalchemy.ext.declarative import DeclarativeMeta
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# an SQLAlchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
try:
json.dumps(data) # this will fail on non-encodable values, like other classes
fields[field] = data
except TypeError:
fields[field] = None
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
Base.metadata.create_all(engine, checkfirst=True)
Session = sessionmaker(bind=engine)
session = Session()
app = Flask("app")
@app.route('/api', methods=['GET', 'POST'])
@app.route('/api/', methods=['GET', 'POST'])
def api1():
session.add(ClientLog(message="test"))
session.commit()
return "test"
def launch_server():
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
http_server = HTTPServer(WSGIContainer(app))
http_server.listen(8080)
IOLoop.current().start()
launch_server()
在阿里云的 Windows 2019 模板下 100 并发就会崩溃退出,报错
Exception in thread Tornado selector:
Traceback (most recent call last):
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1045, in _bootstrap_inner
self.run()
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python311\Lib\site-packages\tornado\platform\asyncio.py", line 574, in _run_select
rs, ws, xs = select.select(to_read, to_write, to_write)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: too many file descriptors in select()
但是我在 Windows 10 的电脑上测试并发没问题 之前测试换了另外一个对 Windows 更友好的 web framework (想不起来名字,找了半天没找到)可以是可以,但是估计它用了不止一个进程写 SQLite ,导致冲突无法提交更改 业务需求调用一个 Windows Only 的 pip 包 ,无法更换 Linux 服务器
]]>from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello_world():
return "Hello, World!"
以上代码使用 wrk 压测(wrk -t2 -c100 -d10s http://localhost:5000/
),第三次 rps 就会掉到 0 ,是什么原因导致的?是 GIL 锁导致的吗?
我的 settings:
# djangoQ 配置
Q_CLUSTER = {
'name': 'myDjangoQ', # 启动服务名
'workers': 2, # 多少个 workers 的数量直接影响了任务处理的并发能力
'label': 'myDjangoQ_label',
'orm': 'default', # 使用 Django ORM 作为后端
# 'recycle': 4, # 工作进程在处理完指定数量的任务后,将自动重启自身以释放内存资源
'timeout': 10, # 超时
# 'recycle_frequency': 4, # 重启频率
'compress': False, # 是否将任务包压缩后发送到代理
'save_limit': 250, # 保存成功任务结果
'sync': False, # 是否同步执行任务
'queue_limit': 2, # 集群中的任务数量
'cpu_affinity': 1, # 单个任务使用 cpu 核心
"redis": {
"host": config.get("redis", 'host'),
"port": config.get("redis", 'port'),
"db": 3,
"password": config.get("redis", 'password'),
"socket_timeout": 30,
"charset": "utf-8",
"decode_responses": True,
"max_connections": 1000,
}
}
触发函数和具体执行函数:
def test(request):
from django_q.tasks import async_task, result
info = {'code': 0, 'data': [], 'msg': ''}
try:
task_ids = []
for i in range(700):
task_id = async_task(my_function, i)
task_ids.append(task_id)
except:
info['msg'] = 'test error'
logger.error(traceback.format_exc())
return JsonResponse(info, safe=False)
def my_function(i):
logger.info('i:{} 开始开始开始开始开始'.format(i))
logger.info('i:{} 开始'.format(i))
time.sleep(random.randint(3, 5))
logger.info('i:{} 结束'.format(i))
return i
实际输出:
11:15:31 [Q] INFO Process-1:1 processing [undress-hot-october-high]
2025-01-08 11:15:31.602 | INFO | deploy_queue.views:my_function:1637 - i:20 开始开始开始开始开始
2025-01-08 11:15:31.602 | INFO | deploy_queue.views:my_function:1643 - i:20 开始
2025-01-08 11:15:31.610 | INFO | deploy_queue.views:my_function:1645 - i:19 结束
11:15:31 [Q] INFO Process-1:2 processing [network-pizza-sink-emma]
2025-01-08 11:15:31.611 | INFO | deploy_queue.views:my_function:1637 - i:21 开始开始开始开始开始
2025-01-08 11:15:31.611 | INFO | deploy_queue.views:my_function:1643 - i:21 开始
11:15:31 [Q] INFO Processed [tennessee-sierra-timing-michigan]
11:15:31 [Q] INFO Processed [arkansas-muppet-charlie-orange]
2025-01-08 11:15:35.615 | INFO | deploy_queue.views:my_function:1645 - i:21 结束
2025-01-08 11:15:36.607 | INFO | deploy_queue.views:my_function:1645 - i:20 结束
11:15:41 [Q] WARNING reincarnated worker Process-1:1 after timeout
11:15:41 [Q] INFO Process-1:5 ready for work at 30020
11:15:41 [Q] INFO Process-1:5 processing [zulu-october-green-berlin]
2025-01-08 11:15:41.873 | INFO | deploy_queue.views:my_function:1637 - i:22 开始开始开始开始开始
11:15:42 [Q] WARNING reincarnated worker Process-1:2 after timeout
可以看到前面 21 个任务执行正常,但是第 22 个开始就全是 timeout 退出,完全没什么头绪,令人头大。
]]>但实际上我以前没啥需求, 没怎么实践过
请推荐下实际生产项目中好用的爬虫框架或方案
]]>想请教一下,打包 exe 的方案有哪些?
目前在使用 pyinstaller ,但是打包过程比较慢,打包后的 exe 感觉也挺大的
有没有更好的方案或者优化教程?
]]>