尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

多线程+asyncio端口扫描器

多线程+asyncio端口扫描器
📅 发布时间:2026/6/20 5:20:48
#!/usr/bin/env python3
"""
多线程+asyncio端口扫描器
结合了异步IO和多线程技术,实现高效的端口扫描功能
"""import asyncio
import socket
import threading
import time
import argparse
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Dictclass AsyncPortScanner:"""异步端口扫描器类使用asyncio和ThreadPoolExecutor实现高效的并发端口扫描"""def __init__(self, target: str, start_port: int, end_port: int, max_threads: int = 100, timeout: float = 1.0):"""初始化端口扫描器Args:target: 目标IP地址或域名start_port: 起始端口号end_port: 结束端口号max_threads: 最大线程数timeout: 连接超时时间(秒)"""self.target = targetself.start_port = start_portself.end_port = end_portself.max_threads = max_threadsself.timeout = timeoutself.open_ports = []  # 存储发现的开放端口self.lock = threading.Lock()  # 线程锁,保护共享资源self.total_ports = end_port - start_port + 1self.scanned_ports = 0async def scan_port(self, port: int) -> bool:"""异步扫描单个端口Args:port: 要扫描的端口号Returns:bool: 端口是否开放"""try:# 使用asyncio的run_in_executor在线程池中执行同步的socket操作# 这样可以在不阻塞事件循环的情况下执行耗时的网络IO操作loop = asyncio.get_event_loop()with ThreadPoolExecutor(max_workers=1) as executor:result = await loop.run_in_executor(executor, self._sync_connect, port)return resultexcept Exception as e:return Falsedef _sync_connect(self, port: int) -> bool:"""同步连接测试方法在线程池中执行,测试指定端口是否开放Args:port: 要测试的端口号Returns:bool: 连接是否成功"""try:# 创建TCP socketsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(self.timeout)# connect_ex()方法返回0表示连接成功result = sock.connect_ex((self.target, port))sock.close()return result == 0except Exception:return Falseasync def scan_port_range_chunk(self, start: int, end: int) -> List[int]:"""扫描端口范围块将大范围的端口分成小块进行并发扫描Args:start: 起始端口end: 结束端口Returns:List[int]: 该端口范围内的开放端口列表"""tasks = []# 为每个端口创建异步任务for port in range(start, end + 1):task = asyncio.create_task(self.scan_port_with_update(port))tasks.append(task)# 并发执行所有端口扫描任务results = await asyncio.gather(*tasks)# 筛选出开放的端口open_ports = [start + i for i, is_open in enumerate(results) if is_open]return open_portsasync def scan_port_with_update(self, port: int) -> bool:"""扫描端口并更新进度显示在扫描过程中实时更新进度条和发现的开放端口Args:port: 要扫描的端口号Returns:bool: 端口是否开放"""is_open = await self.scan_port(port)# 使用线程锁保护共享变量的更新
        with self.lock:self.scanned_ports += 1progress = (self.scanned_ports / self.total_ports) * 100print(f"\r扫描进度: {self.scanned_ports}/{self.total_ports} ({progress:.1f}%)", end="")# 如果发现开放端口,立即显示if is_open:with self.lock:self.open_ports.append(port)print(f"\n[+] 发现开放端口: {port}")return is_openasync def run_scan(self) -> List[int]:"""运行完整的端口扫描流程Returns:List[int]: 所有发现的开放端口列表"""print(f"开始扫描 {self.target} 的端口 {self.start_port}-{self.end_port}")print(f"使用 {self.max_threads} 个线程,超时时间: {self.timeout}秒")start_time = time.time()# 将端口范围分成多个块,每个块由一个线程处理# 这种分块策略可以更好地利用系统资源chunk_size = max(1, self.total_ports // self.max_threads)tasks = []# 为每个端口块创建异步任务for i in range(self.start_port, self.end_port + 1, chunk_size):chunk_end = min(i + chunk_size - 1, self.end_port)task = asyncio.create_task(self.scan_port_range_chunk(i, chunk_end))tasks.append(task)# 等待所有端口块扫描完成chunk_results = await asyncio.gather(*tasks)# 合并所有块的结果for chunk_ports in chunk_results:self.open_ports.extend(chunk_ports)end_time = time.time()elapsed_time = end_time - start_time# 显示扫描统计信息print(f"\n\n扫描完成!")print(f"扫描耗时: {elapsed_time:.2f}秒")print(f"发现 {len(self.open_ports)} 个开放端口")return sorted(self.open_ports)class PortScannerService:"""多目标扫描服务类管理多个目标的并发扫描任务"""def __init__(self):"""初始化扫描服务"""self.scan_results: Dict[str, List[int]] = {}  # 存储所有目标的扫描结果
    async def scan_multiple_targets(self, targets: List[str], start_port: int, end_port: int,max_threads: int = 100) -> Dict[str, List[int]]:"""并发扫描多个目标Args:targets: 目标IP或域名列表start_port: 起始端口end_port: 结束端口max_threads: 每个目标的最大线程数Returns:Dict[str, List[int]]: 每个目标对应的开放端口字典"""print(f"开始扫描 {len(targets)} 个目标...")# 为每个目标创建独立的扫描器实例tasks = []for target in targets:scanner = AsyncPortScanner(target, start_port, end_port, max_threads)task = asyncio.create_task(scanner.run_scan())tasks.append((target, task))# 并发执行所有扫描任务for target, task in tasks:try:open_ports = await taskself.scan_results[target] = open_portsexcept Exception as e:print(f"扫描 {target} 时出错: {e}")self.scan_results[target] = []return self.scan_resultsdef display_results(self):"""格式化显示所有目标的扫描结果提供清晰的汇总报告"""print("\n" + "="*50)print("扫描结果汇总:")print("="*50)for target, ports in self.scan_results.items():print(f"\n目标: {target}")if ports:print(f"开放端口 ({len(ports)}个): {', '.join(map(str, ports))}")else:print("未发现开放端口")async def main():"""主函数处理命令行参数并启动端口扫描"""# 创建命令行参数解析器parser = argparse.ArgumentParser(description="多线程+asyncio端口扫描器")parser.add_argument("target", help="目标IP或域名")parser.add_argument("-s", "--start", type=int, default=1, help="起始端口 (默认: 1)")parser.add_argument("-e", "--end", type=int, default=1024, help="结束端口 (默认: 1024)")parser.add_argument("-t", "--threads", type=int, default=100, help="最大线程数 (默认: 100)")parser.add_argument("--timeout", type=float, default=1.0, help="连接超时时间(秒) (默认: 1.0)")# 解析命令行参数args = parser.parse_args()# 创建并配置扫描器scanner = AsyncPortScanner(target=args.target,start_port=args.start,end_port=args.end,max_threads=args.threads,timeout=args.timeout)# 运行扫描open_ports = await scanner.run_scan()# 显示最终结果if open_ports:print(f"\n{args.target} 的开放端口: {', '.join(map(str, open_ports))}")else:print(f"\n{args.target} 没有发现开放端口")if __name__ == "__main__":# 启动异步主程序asyncio.run(main())

 

#!/usr/bin/env python3
import asyncio
from port_scanner import PortScannerServiceasync def demo():"""演示多目标扫描功能"""service = PortScannerService()# 扫描多个目标targets = ["127.0.0.1", "localhost"]start_port = 1end_port = 1000max_threads = 50print(f"开始扫描 {len(targets)} 个目标的端口 {start_port}-{end_port}")results = await service.scan_multiple_targets(targets=targets,start_port=start_port,end_port=end_port,max_threads=max_threads)# 显示结果
    service.display_results()if __name__ == "__main__":asyncio.run(demo())

 

相关新闻

  • U635735 Treap=Tree+Heap
  • Docker客户端控制局域网服务器 - a-cool
  • U635734 神机

最新新闻

  • DeepTutor:你的智能学习伙伴,让AI辅导无处不在
  • 鸿蒙 Next 相亲防骗雷达 App 开发实战:防骗教育 + 交互式自测 + 内容驱动设计
  • 免熏蒸木箱个性化方案哪家好? - 工业品牌热点
  • 嵌入式音频设计:I2S/SAI时序解析与低功耗模式实战
  • 呼伦贝尔市2026年最新黄金回收+白银回收+铂金回收+彩金回收门店TOP排行榜+推荐及联系方式+地址+电话+靠谱店铺指南 - 大熊猫898989
  • Codex 如何使用更高效:一篇讲透实战方法的博文

日新闻

  • 信任的进化:技术实现详解——如何用JavaScript构建博弈论模拟器
  • Terrakube自定义工作流:如何集成OPA、Infracost等工具扩展IaC能力
  • grunt-concurrent快速入门:5分钟学会并行运行Grunt任务

周新闻

  • 3步解锁iOS设备:applera1n激活锁绕过完全指南
  • 39 2026 人工智能证书终极盘点,普通人选 AI 证书可以从这些方向入手
  • Redis 暴露公网有多危险?从端口检查到补救步骤

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号