Python

pip加速

# 命令行指定仓库地址
# --break-system-packages 用于在安装包时忽略系统级别的包。
# 这意味着,即使系统级别已经安装了某个包,pip 也会尝试从其指定的源(本例中为豆瓣镜像)重新安装该包,而不是使用系统级别的版本。
python3 -m pip install bs4 -i https://pypi.doubanio.com/simple/ --break-system-packages

Zabbix API

import requests
import json

url = "http://wuxk.tpddns.cn:12003/zabbix/api_jsonrpc.php"

payload = json.dumps({
  "jsonrpc": "2.0",
  "method": "user.login",
  "params": {
    "user": "Admin",
    "password": "Esxi0000.."
  },
  "id": 1,
  "auth": None
})
headers = {
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

命令行传参

#行内输入
import sys

dir = sys.argv[1:]
get_all_lvm_info(dir[0])

#交互式传参
dir = input("LVM Mount Point: ")
dir = dir.replace("/","")
get_all_lvm_info("/" + dir)

#选项传参
import argparse

    parser = argparse.ArgumentParser(description="A simple argument parser.")
    
    # 添加参数
    parser.add_argument('--name', type=str, help='Your name')
    parser.add_argument('--age', type=int, help='Your age')

    # 解析参数
    args = parser.parse_args()
    
    print(f"Name: {args.name}")
    print(f"Age: {args.age}")

lvm2py模块

#lvs
        Probes the system for volume groups and returns a list of VolumeGroup
        instances::

            from lvm2py import *

            lvm = LVM()
            vgs = lvm.vgscan()

#remove vg
            from lvm2py import *

            lvm = LVM()
            vg = lvm.get_vg("myvg", "w")
            lvm.remove_vg(vg)

#create vg
            from lvm2py import *

            lvm = LVM()
            vg = lvm.create_vg("myvg", ["/dev/sdb1", "/dev/sdb2"])

#get vg
            from lvm2py import *

            lvm = LVM()
            vg = lvm.get_vg("myvg")

#get pv
        from lvm2py import *

        lvm = LVM()
        vg = lvm.get_vg("myexistingvg")
        # get pv from pvscan
        pv1 = vg.pvscan()[0]
        # get it by name
        pv2 = vg.get_pv("/dev/sdb1")
        # or just instantiate it
        pv3 = PhysicalVolume(vg, name="/dev/sdb1")

        Probes the volume group for physical volumes and returns a list of
        PhysicalVolume instances::

            from lvm2py import *

            lvm = LVM()
            vg = lvm.get_vg("myvg")
            pvs = vg.pvscan()

#add pv
        Initializes a device as a physical volume and adds it to the volume group::

            from lvm2py import *

            lvm = LVM()
            vg = lvm.get_vg("myvg", "w")
            vg.add_pv("/dev/sdbX")

#get pv
        Returns the physical volume associated with the given device::

            from lvm2py import *

            lvm = LVM()
            vg = lvm.get_vg("myvg", "w")
            vg.get_pv("/dev/sdb1")

#get lv
        Returns a LogicalVolume instance given an existin logical volume name::

            from lvm2py import *

            lvm = LVM()
            vg = lvm.get_vg("myvg", "w")
            vg.get_lv("mylv")
#remove pv 
        Removes a physical volume from the volume group::

            from lvm2py import *

            lvm = LVM()
            vg = lvm.get_vg("myvg", "w")
            pv = vg.pvscan()[0]
            vg.remove_pv(pv)

setup.py

# 当前目录任何时候都能作为程序的家目录

C:\USERS\WUXK\WUXK\setup.py

C:\USERS\WUXK\WUXK\wuxk_jaina
│  __init__.py
│  
└─ run
   │  main.py
   │
   ├─config
   │      pod5.ini
   │      pod52.ini
   │      pod6.ini
   │      test.ini
   │
   └─tools
      │  addrule.py
      │  check_rule.py
      │  chkip.py
      │  chkpod.py
      │  chkvip.py
      │  ConfigUtils.py
      │  create_security_group.py
      │  create_security_group_rule.py
      │  delete_security_group_rule.py
      │  get_token.py
      │  port_add_security_group.py
      └─ verify_token.py

# setup.py
from setuptools import setup, find_packages

setup(
    name='wuxk_jaina',
    version='0.0.1',
    packages=find_packages(),
    package_data={
        '':[
            'run/tools/add*.py',
            'run/tools/chk*.py',
            'run/tools/check*.py',
            'run/tools/create_*.py',
            'run/tools/delete_*.py',
            'run/tools/get_*.py',
            'run/tools/ConfigUtils.py',
            'run/tools/port_add_security_group.py',
            'run/tools/port_add_security_group.py',
            'run/tools/verify_token.py',
            'run/main.py',
            'run/config/test.ini',
            'run/config/pod*.ini'
        ]
    },
    python_requires='>=3',
    install_requires=['pymysql>=1.1.1','flask>=3.0.2', 'requests>=2.31.0'],
    url='',
    license='',
    author='Wuxk',
    author_email='1218304973@qq.com',
    description='',
)

# build 测试 (打包前执行一下,防止新修改不生效)
python setup.py build

# 生成pip安装包(wuxk_jaina-0.0.1-py3-none-any.whl)
python setup.py bdist_wheel

# 生成压缩包文件(.tar.gz)
python setup.py sdist

# 安装
python -m pip install .\dist\wuxk_jaina-0.0.1-py3-none-any.whl

字符串操作

# 输出变量 f-strings(格式化字符串字面量)
f"Created security group rule with result: {result}"

# 截取字符
text = "This is a sample string for demonstration."
first_11_chars = text[:11]  # 截取前 11 个字符
print(first_11_chars)


# 分割字符(分割为数组)
    if len(ports.split(':')) == 1:
        port_min = port_max = ports
    else:
        port_min,port_max = ports.split(':')


# 忽略大小写
str = "Wuxk"
# 转换为小写
str.lower()
# 大写
str.upper()


# 保留两位小数
round(size, 2)

#去除字符串两端的空白字符(包括空格、制表符和换行符)
text = "   Hello, World!   "
stripped_text = text.strip()
print(stripped_text)  # 输出: "Hello, World!"

判断对象类型

type()函数
print(type(sec))

字典类型(dict)

#创建
list = {
	"name": "xiaoming",
	"age": 17
}

#添加键值对
list["key_1"] = v
list["key_1"] = { v }
list["key_1"] = [ v ]

#读取
value = list["key_1"]

securitys = chkip(ip)["security_groups"]
for sec in securitys:
    for k, v in sec.items():
        if "open" in k:
            print("Key:", k)
            print("Value:", v)
	

列表推导式

[expression for item in iterable if condition]


# 普通循环
squares = []
for x in range(10):
    squares.append(x ** 2)

# 列表推导式
squares = [x ** 2 for x in range(10)]
或者带条件
even_squares = [x ** 2 for x in range(10) if x % 2 == 0]

with

# python3
# with 语句在 Python 中用于简化资源管理和异常处理。它确保了即使出现错误,资源也能被正确释放。
# 进入上下文:执行 __enter__ 方法,并将其返回值绑定到 as 后的变量。
# 执行代码块:在 with 语句块内运行代码。
# 退出上下文:执行 __exit__ 方法,处理任何异常(如果有)并清理资源。__exit__ 方法的参数包括异常类型、异常值和 traceback。如果 __exit__ 返回 True,异常会被抑制;如果返回 False,异常将会被重新抛出。

# 文件操作
with open('file.txt', 'r') as file:
    content = file.read()
# 文件在此自动关闭

# 数据库连接
with database.connect() as conn:
    # 使用conn进行数据库操作
# 连接在此自动关闭

日志 logging

# 根据主函数中的日志level设置全局日志输出等级
# 主函数中配置日志
logging.basicConfig(
    filename='../log/wuxk-' + date.today().strftime('%Y-%m-%d') + '.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s'
)
# 其他函数配置 创建一个日志记录器
logger = logging.getLogger(__name__)
logger.info('Checking IP: %s', ip)
logger.debug('Checking IP: %s', ip)





# 简单案例
import configparser
import os
import logging
from datetime import date

# 配置日志记录
logging.basicConfig(
    filename='app-' + date.today().__str__()   + '.log',  # 日志文件名
    level=logging.DEBUG,  # 日志级别
    format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s'  # 日志格式 时间 - 等级 - 函数名 - 信息
)

def getUserPasswd():
    conf_dir = "./config/test.conf"
    config = configparser.ConfigParser()

    if os.path.exists(conf_dir):
        config.read(conf_dir)
        username = config.get("database", "username")
        passwd = config.get("database", "passwd")
        logging.info("username, passwd = " + username + "," + passwd)
        return username, passwd
    else:
        raise Exception("ConfigName: '" + conf_dir + "' Config file may not exists!")

try:
    username, passwd = getUserPasswd()
except Exception as e:
    logging.exception(e)

读写配置文件 configparser

"""
[Database]
host = localhost
port = 3306
user = admin
"""

import configparser
import os

def getUserPasswd():
    conf_dir = "./config/test.conf"
    config = configparser.ConfigParser()

    if os.path.exists(conf_dir):
        config.read(conf_dir)
        username = config.get("database", "username")
        passwd = config.get("database", "passwd")
        print(username, passwd)
        return username, passwd
    else:
        raise Exception("ConfigName: '" + conf_dir + "' Config file may not exists!")

读取文件

#逐行读取文件内容
with open('filename.txt', 'r') as file:  
    line = file.readline()  
    while line:  
        print(line, end='')  # end='' 用于避免在每行末尾打印额外的换行符  
        line = file.readline()

#读取全部内容
with open('filename.txt', 'r') as file:  
    content = file.read()  
print(content)

#覆盖写入文件
with open(file_path, 'w', encoding='utf-8') as f:
	f.write(addresses)

#读取全部内容为列表
with open(dir,'r') as f:
    lines = f.readlines()

获取时间

#获取日期和时间
from datetime import datetime, date
  
now = datetime.now()  
print("当前日期和时间:", now)

today = date.today()  
print("今天的日期:", today)

current_time = datetime.now().time()  
print("当前时间:", current_time)

#获取时间戳
time.time()

# 时间转时间戳
from datetime import datetime

# 将字符串转换为 datetime 对象
date_str = "2020-07-28 00:00:00"
date_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")

# 将 datetime 对象转换为时间戳
timestamp = date_obj.timestamp()

print("时间戳:", timestamp)

requests

get

# 导入 requests 包
import requests

kw = {'s': 'python 教程'}

# 设置请求头
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36"
}

# params 接收一个字典或者字符串的查询参数,字典类型自动转换为url编码,不需要urlencode()
response = requests.get("https://v1.hitokoto.cn", params=kw, headers=headers)

# 查看响应状态码
print(response.status_code)

# 查看响应头部字符编码
print(response.encoding)

# 查看完整url地址
print(response.url)

# 查看响应内容,response.text 返回的是Unicode格式的数据
print(response.text)
print(response.json())

post

# 导入 requests 包
import requests

headers = {'User-Agent': 'Mozilla/5.0'}  # 设置请求头
params = {'key1': 'value1', 'key2': 'value2'}  # 设置查询参数
data = {'username': 'example', 'password': '123456'}  # 设置请求体

# 方法一
response = requests.post('https://v1.hitokoto.cn', headers=headers, params=params, data=data)

#方法二
response = requests.request("POST", url, headers=headers, data=data)
response.raise_for_status()

# 查看响应状态码
print(response.status_code)

# 查看响应头部字符编码
print(response.encoding)

# 查看完整url地址
print(response.url)

# 查看响应内容,response.text 返回的是Unicode格式的数据
print(response.text)
print(response.json())

pymysql

import pymysql
import json

# 打开数据库连接
db = pymysql.connect(host='10.11.0.100',
                     user='root',
                     password='zPKXiuOdHiyIdK9e6uNAQVXDIs9gVRD1HFuDHQEP',
                     database='nova')

# 使用cursor()方法获取操作游标
cursor = db.cursor()

#sql语句
nova = ("select volume_id,instance_uuid,boot_index from block_device_mapping where uuid=%s")

try:
    # 执行SQL语句
    cursor.execute(nova, ("4daabc55-d533-4f03-be45-7a539f5b95f8", ))
    results = cursor.fetchall()

    results_list = []
    for row in results:
        results_list.append({
            'volume_id': row[0],
            'instance_uuid': row[1],
            'boot_index': row[2]
        })
    #会使输出的 JSON 格式化为每级缩进 4 个空格
    print(json.dumps(results_list, indent=4))

    # 提交修改
    # db.commit()

except Exception as e:
    raise Exception("Select Error: " + e)

    # 发生错误时回滚
    # db.rollback()

finally:
    # 关闭连接
    db.close()

实时返回结果

#flask接口利用stream_with_context实时返回内容
from flask import Flask, request, jsonify, Response, stream_with_context

@app.route('/', methods=['POST'])
def main():
    # Main operation
    if op == "cmd":
        return Response(stream_with_context(ssh_command_stream()), content_type='text/plain')

def ssh_command_stream():
    sshChannel = sshLink("192.168.72.129", "22", "root", "8080", 100, 7200)
    sshChannel.send("ping -c 5 www.baidu.com" + "\n")
    while True:
        output = sshChannel.recv(65535)
        if output.__str__() == "b'[root@wuxk ~]# '":
            break
        yield output
        print("result: " + output.__str__())
        time.sleep(1)  # Optionally add some delay to mimic long-polling behavior

OpenStack 2025-07-15
openssh打包rpm脚本 2025-07-15

评论区