pip加速
# 命令行指定仓库地址
# --break-system-packages 用于在安装包时忽略系统级别的包。
# 这意味着,即使系统级别已经安装了某个包,pip 也会尝试从其指定的源(本例中为豆瓣镜像)重新安装该包,而不是使用系统级别的版本。
python3 -m pip install bs4 -i https://pypi.doubanio.com/simple/ --break-system-packages
Zabbix API
import requests
import json
url = "http://wuxk.tpddns.cn:12003/zabbix/api_jsonrpc.php"
payload = json.dumps({
"jsonrpc": "2.0",
"method": "user.login",
"params": {
"user": "Admin",
"password": "Esxi0000.."
},
"id": 1,
"auth": None
})
headers = {
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
命令行传参
#行内输入
import sys
dir = sys.argv[1:]
get_all_lvm_info(dir[0])
#交互式传参
dir = input("LVM Mount Point: ")
dir = dir.replace("/","")
get_all_lvm_info("/" + dir)
#选项传参
import argparse
parser = argparse.ArgumentParser(description="A simple argument parser.")
# 添加参数
parser.add_argument('--name', type=str, help='Your name')
parser.add_argument('--age', type=int, help='Your age')
# 解析参数
args = parser.parse_args()
print(f"Name: {args.name}")
print(f"Age: {args.age}")
lvm2py模块
#lvs
Probes the system for volume groups and returns a list of VolumeGroup
instances::
from lvm2py import *
lvm = LVM()
vgs = lvm.vgscan()
#remove vg
from lvm2py import *
lvm = LVM()
vg = lvm.get_vg("myvg", "w")
lvm.remove_vg(vg)
#create vg
from lvm2py import *
lvm = LVM()
vg = lvm.create_vg("myvg", ["/dev/sdb1", "/dev/sdb2"])
#get vg
from lvm2py import *
lvm = LVM()
vg = lvm.get_vg("myvg")
#get pv
from lvm2py import *
lvm = LVM()
vg = lvm.get_vg("myexistingvg")
# get pv from pvscan
pv1 = vg.pvscan()[0]
# get it by name
pv2 = vg.get_pv("/dev/sdb1")
# or just instantiate it
pv3 = PhysicalVolume(vg, name="/dev/sdb1")
Probes the volume group for physical volumes and returns a list of
PhysicalVolume instances::
from lvm2py import *
lvm = LVM()
vg = lvm.get_vg("myvg")
pvs = vg.pvscan()
#add pv
Initializes a device as a physical volume and adds it to the volume group::
from lvm2py import *
lvm = LVM()
vg = lvm.get_vg("myvg", "w")
vg.add_pv("/dev/sdbX")
#get pv
Returns the physical volume associated with the given device::
from lvm2py import *
lvm = LVM()
vg = lvm.get_vg("myvg", "w")
vg.get_pv("/dev/sdb1")
#get lv
Returns a LogicalVolume instance given an existin logical volume name::
from lvm2py import *
lvm = LVM()
vg = lvm.get_vg("myvg", "w")
vg.get_lv("mylv")
#remove pv
Removes a physical volume from the volume group::
from lvm2py import *
lvm = LVM()
vg = lvm.get_vg("myvg", "w")
pv = vg.pvscan()[0]
vg.remove_pv(pv)
setup.py
# 当前目录任何时候都能作为程序的家目录
C:\USERS\WUXK\WUXK\setup.py
C:\USERS\WUXK\WUXK\wuxk_jaina
│ __init__.py
│
└─ run
│ main.py
│
├─config
│ pod5.ini
│ pod52.ini
│ pod6.ini
│ test.ini
│
└─tools
│ addrule.py
│ check_rule.py
│ chkip.py
│ chkpod.py
│ chkvip.py
│ ConfigUtils.py
│ create_security_group.py
│ create_security_group_rule.py
│ delete_security_group_rule.py
│ get_token.py
│ port_add_security_group.py
└─ verify_token.py
# setup.py
from setuptools import setup, find_packages
setup(
name='wuxk_jaina',
version='0.0.1',
packages=find_packages(),
package_data={
'':[
'run/tools/add*.py',
'run/tools/chk*.py',
'run/tools/check*.py',
'run/tools/create_*.py',
'run/tools/delete_*.py',
'run/tools/get_*.py',
'run/tools/ConfigUtils.py',
'run/tools/port_add_security_group.py',
'run/tools/port_add_security_group.py',
'run/tools/verify_token.py',
'run/main.py',
'run/config/test.ini',
'run/config/pod*.ini'
]
},
python_requires='>=3',
install_requires=['pymysql>=1.1.1','flask>=3.0.2', 'requests>=2.31.0'],
url='',
license='',
author='Wuxk',
author_email='1218304973@qq.com',
description='',
)
# build 测试 (打包前执行一下,防止新修改不生效)
python setup.py build
# 生成pip安装包(wuxk_jaina-0.0.1-py3-none-any.whl)
python setup.py bdist_wheel
# 生成压缩包文件(.tar.gz)
python setup.py sdist
# 安装
python -m pip install .\dist\wuxk_jaina-0.0.1-py3-none-any.whl
字符串操作
# 输出变量 f-strings(格式化字符串字面量)
f"Created security group rule with result: {result}"
# 截取字符
text = "This is a sample string for demonstration."
first_11_chars = text[:11] # 截取前 11 个字符
print(first_11_chars)
# 分割字符(分割为数组)
if len(ports.split(':')) == 1:
port_min = port_max = ports
else:
port_min,port_max = ports.split(':')
# 忽略大小写
str = "Wuxk"
# 转换为小写
str.lower()
# 大写
str.upper()
# 保留两位小数
round(size, 2)
#去除字符串两端的空白字符(包括空格、制表符和换行符)
text = " Hello, World! "
stripped_text = text.strip()
print(stripped_text) # 输出: "Hello, World!"
判断对象类型
type()函数
print(type(sec))
字典类型(dict)
#创建
list = {
"name": "xiaoming",
"age": 17
}
#添加键值对
list["key_1"] = v
list["key_1"] = { v }
list["key_1"] = [ v ]
#读取
value = list["key_1"]
securitys = chkip(ip)["security_groups"]
for sec in securitys:
for k, v in sec.items():
if "open" in k:
print("Key:", k)
print("Value:", v)
列表推导式
[expression for item in iterable if condition]
# 普通循环
squares = []
for x in range(10):
squares.append(x ** 2)
# 列表推导式
squares = [x ** 2 for x in range(10)]
或者带条件
even_squares = [x ** 2 for x in range(10) if x % 2 == 0]
with
# python3
# with 语句在 Python 中用于简化资源管理和异常处理。它确保了即使出现错误,资源也能被正确释放。
# 进入上下文:执行 __enter__ 方法,并将其返回值绑定到 as 后的变量。
# 执行代码块:在 with 语句块内运行代码。
# 退出上下文:执行 __exit__ 方法,处理任何异常(如果有)并清理资源。__exit__ 方法的参数包括异常类型、异常值和 traceback。如果 __exit__ 返回 True,异常会被抑制;如果返回 False,异常将会被重新抛出。
# 文件操作
with open('file.txt', 'r') as file:
content = file.read()
# 文件在此自动关闭
# 数据库连接
with database.connect() as conn:
# 使用conn进行数据库操作
# 连接在此自动关闭
日志 logging
# 根据主函数中的日志level设置全局日志输出等级
# 主函数中配置日志
logging.basicConfig(
filename='../log/wuxk-' + date.today().strftime('%Y-%m-%d') + '.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s'
)
# 其他函数配置 创建一个日志记录器
logger = logging.getLogger(__name__)
logger.info('Checking IP: %s', ip)
logger.debug('Checking IP: %s', ip)
# 简单案例
import configparser
import os
import logging
from datetime import date
# 配置日志记录
logging.basicConfig(
filename='app-' + date.today().__str__() + '.log', # 日志文件名
level=logging.DEBUG, # 日志级别
format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s' # 日志格式 时间 - 等级 - 函数名 - 信息
)
def getUserPasswd():
conf_dir = "./config/test.conf"
config = configparser.ConfigParser()
if os.path.exists(conf_dir):
config.read(conf_dir)
username = config.get("database", "username")
passwd = config.get("database", "passwd")
logging.info("username, passwd = " + username + "," + passwd)
return username, passwd
else:
raise Exception("ConfigName: '" + conf_dir + "' Config file may not exists!")
try:
username, passwd = getUserPasswd()
except Exception as e:
logging.exception(e)
读写配置文件 configparser
"""
[Database]
host = localhost
port = 3306
user = admin
"""
import configparser
import os
def getUserPasswd():
conf_dir = "./config/test.conf"
config = configparser.ConfigParser()
if os.path.exists(conf_dir):
config.read(conf_dir)
username = config.get("database", "username")
passwd = config.get("database", "passwd")
print(username, passwd)
return username, passwd
else:
raise Exception("ConfigName: '" + conf_dir + "' Config file may not exists!")
读取文件
#逐行读取文件内容
with open('filename.txt', 'r') as file:
line = file.readline()
while line:
print(line, end='') # end='' 用于避免在每行末尾打印额外的换行符
line = file.readline()
#读取全部内容
with open('filename.txt', 'r') as file:
content = file.read()
print(content)
#覆盖写入文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(addresses)
#读取全部内容为列表
with open(dir,'r') as f:
lines = f.readlines()
获取时间
#获取日期和时间
from datetime import datetime, date
now = datetime.now()
print("当前日期和时间:", now)
today = date.today()
print("今天的日期:", today)
current_time = datetime.now().time()
print("当前时间:", current_time)
#获取时间戳
time.time()
# 时间转时间戳
from datetime import datetime
# 将字符串转换为 datetime 对象
date_str = "2020-07-28 00:00:00"
date_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
# 将 datetime 对象转换为时间戳
timestamp = date_obj.timestamp()
print("时间戳:", timestamp)
requests
get
# 导入 requests 包
import requests
kw = {'s': 'python 教程'}
# 设置请求头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36"
}
# params 接收一个字典或者字符串的查询参数,字典类型自动转换为url编码,不需要urlencode()
response = requests.get("https://v1.hitokoto.cn", params=kw, headers=headers)
# 查看响应状态码
print(response.status_code)
# 查看响应头部字符编码
print(response.encoding)
# 查看完整url地址
print(response.url)
# 查看响应内容,response.text 返回的是Unicode格式的数据
print(response.text)
print(response.json())
post
# 导入 requests 包
import requests
headers = {'User-Agent': 'Mozilla/5.0'} # 设置请求头
params = {'key1': 'value1', 'key2': 'value2'} # 设置查询参数
data = {'username': 'example', 'password': '123456'} # 设置请求体
# 方法一
response = requests.post('https://v1.hitokoto.cn', headers=headers, params=params, data=data)
#方法二
response = requests.request("POST", url, headers=headers, data=data)
response.raise_for_status()
# 查看响应状态码
print(response.status_code)
# 查看响应头部字符编码
print(response.encoding)
# 查看完整url地址
print(response.url)
# 查看响应内容,response.text 返回的是Unicode格式的数据
print(response.text)
print(response.json())
pymysql
import pymysql
import json
# 打开数据库连接
db = pymysql.connect(host='10.11.0.100',
user='root',
password='zPKXiuOdHiyIdK9e6uNAQVXDIs9gVRD1HFuDHQEP',
database='nova')
# 使用cursor()方法获取操作游标
cursor = db.cursor()
#sql语句
nova = ("select volume_id,instance_uuid,boot_index from block_device_mapping where uuid=%s")
try:
# 执行SQL语句
cursor.execute(nova, ("4daabc55-d533-4f03-be45-7a539f5b95f8", ))
results = cursor.fetchall()
results_list = []
for row in results:
results_list.append({
'volume_id': row[0],
'instance_uuid': row[1],
'boot_index': row[2]
})
#会使输出的 JSON 格式化为每级缩进 4 个空格
print(json.dumps(results_list, indent=4))
# 提交修改
# db.commit()
except Exception as e:
raise Exception("Select Error: " + e)
# 发生错误时回滚
# db.rollback()
finally:
# 关闭连接
db.close()
实时返回结果
#flask接口利用stream_with_context实时返回内容
from flask import Flask, request, jsonify, Response, stream_with_context
@app.route('/', methods=['POST'])
def main():
# Main operation
if op == "cmd":
return Response(stream_with_context(ssh_command_stream()), content_type='text/plain')
def ssh_command_stream():
sshChannel = sshLink("192.168.72.129", "22", "root", "8080", 100, 7200)
sshChannel.send("ping -c 5 www.baidu.com" + "\n")
while True:
output = sshChannel.recv(65535)
if output.__str__() == "b'[root@wuxk ~]# '":
break
yield output
print("result: " + output.__str__())
time.sleep(1) # Optionally add some delay to mimic long-polling behavior