#!/usr/bin/env python3
"""
Kubernetes CNI Hotfix Tool
-------------------------

Dependencies:
- docker-py: Docker API client
- psutil: Process and system utilities
- dbus-python: D-Bus interface for Python
- systemd-python: systemd integration
- python-daemon: Daemon process support
- pyinstaller: Binary packaging
- tqdm: For displaying progress bars, useful for indicating the progress of
  time-consuming operations like waiting for a container to restart.

To install dependencies, run:

    sudo yum install systemd-devel
    sudo yum install dbus-devel
    pip install docker tqdm psutil dbus-python systemd-python python-daemon pyinstaller

Build instructions:

    pip install pyinstaller
    pyinstaller --name=kubecni --onefile --hidden-import=systemd --hidden-import=dbus --hidden-import=shutil --hidden-import=daemon --hidden-import=psutil --hidden-import=tqdm kubecni.py

Service Type: forking (creates daemon process)

Example systemd manager calls over D-Bus:

    # Start a service
    systemd_manager.StartUnit('my_service.service', 'replace')

    # Stop a service
    systemd_manager.StopUnit('my_service.service', 'replace')

    # Enable a service
    systemd_manager.EnableUnitFiles(['my_service.service'], False, True)

    # Disable a service
    systemd_manager.DisableUnitFiles(['my_service.service'], False)

    # Reload the systemd manager
    systemd_manager.Reload()

    # List all units
    units = systemd_manager.ListUnits()
    for unit in units:
        print(unit)

    # Get a specific unit
    unit = systemd_manager.GetUnit('my_service.service')
"""
import docker
import psutil
import time
import os
import sys
import shutil
import logging
import dbus
import daemon
from daemon import pidfile
from systemd import journal, daemon as systemd_daemon
import subprocess
import socket# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# D-Bus coordinates of the systemd manager object.
SYSTEMD_BUS_NAME = 'org.freedesktop.systemd1'
SYSTEMD_OBJECT_PATH = '/org/freedesktop/systemd1'
SYSTEMD_MANAGER_IFACE = 'org.freedesktop.systemd1.Manager'
# Error name systemd returns when a unit is not loaded.
NO_SUCH_UNIT_ERROR = 'org.freedesktop.systemd1.NoSuchUnit'

# Installation layout of the kubecni service.
SERVICE_NAME = 'kubecni'
SERVICE_UNIT = f'{SERVICE_NAME}.service'
SERVICE_PATH = f'/etc/systemd/system/{SERVICE_NAME}.service'
PID_FILE = '/var/run/kubecni.pid'
INSTALL_PATH = '/usr/bin/kubecni'

# Unit file written by install_service(). Each section header must sit on its
# own line, otherwise systemd reads it as part of the preceding value.
SERVICE_CONTENT = f"""[Unit]
Description=Kubernetes CNI Hotfix Monitor
After=network.target docker.service kubelet.service
Requires=docker.service kubelet.service

[Service]
Type=forking
ExecStart=/usr/bin/kubecni --daemon=true
PIDFile={PID_FILE}
Restart=always

[Install]
WantedBy=multi-user.target
"""

# Seconds to wait after a systemd state change before re-reading unit state.
SETTLE_SECONDS = 10
# Number of flannel PIDs remembered by the monitoring loop.
PID_HISTORY_SIZE = 6
# Seconds the monitoring loop waits before trying to create a missing cni0.
CNI0_CREATE_DELAY_SECONDS = 180

USAGE = "Usage: kubecni [--install=true | --uninstall=true | --execute=true | --daemon=true | --help]"
VALID_ARGS = {'--install', '--uninstall', '--execute', '--daemon', '--help', '--force'}
# Flags that perform work and therefore need root privileges.
ACTION_ARGS = ('--install', '--uninstall', '--execute', '--daemon')


def _connect_systemd():
    """Return ``(system_bus, manager_interface)`` for the systemd manager."""
    bus = dbus.SystemBus()
    manager = dbus.Interface(
        bus.get_object(SYSTEMD_BUS_NAME, SYSTEMD_OBJECT_PATH),
        SYSTEMD_MANAGER_IFACE,
    )
    return bus, manager


def _get_active_state(bus, manager, unit_name):
    """Return the ``ActiveState`` property of *unit_name*.

    Raises:
        dbus.DBusException: If the unit cannot be resolved or queried.
    """
    unit_path = manager.GetUnit(unit_name)
    unit_proxy = bus.get_object(SYSTEMD_BUS_NAME, str(unit_path))
    properties = dbus.Interface(unit_proxy, 'org.freedesktop.DBus.Properties')
    return properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')


def _get_active_state_or_none(bus, manager, unit_name):
    """Like ``_get_active_state`` but return None when the unit is not loaded."""
    try:
        return _get_active_state(bus, manager, unit_name)
    except dbus.DBusException as e:
        if e.get_dbus_name() == NO_SUCH_UNIT_ERROR:
            return None
        raise


def _wait_until_not_active(bus, manager, unit_name, timeout=SETTLE_SECONDS):
    """Poll once per second until *unit_name* stops reporting 'active'.

    Returns:
        The last observed state (None if the unit is not loaded). The value is
        still 'active' only when *timeout* seconds elapsed without a change.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = _get_active_state_or_none(bus, manager, unit_name)
        if state != 'active' or time.monotonic() >= deadline:
            return state
        time.sleep(1)


class FixedSizeStack:
    """Bounded history of values; pushing onto a full stack drops the oldest."""

    def __init__(self, size=3):
        """Create an empty stack that holds at most *size* values."""
        self.size = size
        self.stack = []

    def push(self, value):
        """Append *value*, evicting the oldest entry first if the stack is full."""
        if len(self.stack) >= self.size:
            self.stack.pop(0)  # Remove the oldest value
        self.stack.append(value)

    def get_stack(self):
        """Return the underlying list, ordered oldest first."""
        return self.stack

    def __len__(self):
        return len(self.stack)


class FlannelMonitor:
    """Checks and repairs the flannel ``cni0`` network device on this host."""

    def __init__(self):
        """
        Initialize the FlannelMonitor class.

        Set up the Docker client and systemd manager to interact with
        containers and services.
        """
        self.docker_client = docker.from_env()
        self.logger = logging.getLogger('FlannelMonitor')
        self.system_bus, self.systemd_manager = _connect_systemd()

    def check_service(self, service_name):
        """
        Use D-Bus to check if the specified systemd service is running.

        Parameters:
            service_name (str): The name of the service to check (without the
                ``.service`` suffix).

        Returns:
            bool: True if the service is active, False otherwise (including
            when the D-Bus query fails).
        """
        try:
            active_state = _get_active_state(
                self.system_bus, self.systemd_manager, f"{service_name}.service"
            )
            self.logger.info(f"{service_name} active state: {active_state}")
            return active_state == 'active'
        except dbus.DBusException as e:
            self.logger.error(f"Error checking {service_name}: {e}")
            return False

    def check_kubelet_service(self):
        """
        Check if the kubelet, docker, and containerd services are running.

        Returns:
            bool: True if all services are running, False otherwise.
        """
        # `and` short-circuits, so later services are only queried while the
        # earlier ones are active.
        result = (
            self.check_service('kubelet')
            and self.check_service('docker')
            and self.check_service('containerd')
        )
        self.logger.info(f"Kubelet, Docker, and Containerd services running: {result}")
        return result

    def get_flannel_container_id(self):
        """
        Get the container ID and name of the flannel container.

        Both the ``k8s_kube-flannel`` container and its ``k8s_POD_kube-flannel``
        pod container must be running for a result to be returned.

        Returns:
            tuple: (container_id, container_name) if found, otherwise
            (None, None).
        """
        try:
            flannel_container = None
            pod_container = None
            containers = self.docker_client.containers.list(filters={'status': 'running'})
            for container in containers:
                if 'k8s_kube-flannel' in container.name:
                    flannel_container = container
                elif 'k8s_POD_kube-flannel' in container.name:
                    pod_container = container

            if flannel_container and pod_container:
                self.logger.info(f"Found flannel container: {flannel_container.name}")
                return flannel_container.id, flannel_container.name

            if not flannel_container:
                self.logger.error("No running k8s_kube-flannel container found")
            if not pod_container:
                self.logger.error("No running k8s_POD_kube-flannel container found")
            return None, None
        except Exception as e:
            self.logger.error(f"Error getting flannel container ID: {e}")
            return None, None

    def get_flannel_pid(self):
        """
        Get the process ID (PID) of the flannel process.

        Returns:
            int: The PID of the first process whose name contains 'flannel',
            otherwise None.
        """
        try:
            for proc in psutil.process_iter(['name', 'pid']):
                # The name may be None for a process psutil could not inspect;
                # skip it instead of aborting the whole scan with a TypeError.
                name = proc.info.get('name') or ''
                if 'flannel' in name:
                    self.logger.info(f"Found flannel process with PID: {proc.info['pid']}")
                    return proc.info['pid']
            self.logger.error("Flannel process not found")
            return None
        except Exception as e:
            self.logger.error(f"Error getting flannel PID: {e}")
            return None

    def check_cni0_device(self):
        """
        Check if the cni0 network device exists and log its properties.

        Returns:
            bool: True if the cni0 device exists, otherwise False.
        """
        try:
            # Take one snapshot so the existence check and the property dump
            # describe the same moment.
            cni0_info = psutil.net_if_addrs().get('cni0')
            if cni0_info is None:
                self.logger.info("cni0 device does not exist.")
                return False

            self.logger.info("cni0 device exists.")
            self.logger.info("cni0 device properties:")
            for addr in cni0_info:
                self.logger.info(f" - Address: {addr.address}, Family: {addr.family}, Netmask: {addr.netmask}, Broadcast: {addr.broadcast}")
            return True
        except Exception as e:
            self.logger.error(f"Error checking cni0 device: {e}")
            return False

    def check_cni0_ip(self):
        """
        Check if the cni0 network device has an IPv4 address.

        Returns:
            bool: True if cni0 has an IPv4 address, otherwise False.
        """
        try:
            cni0_info = psutil.net_if_addrs().get('cni0')
            if cni0_info is None:
                self.logger.info("cni0 device does not exist.")
                return False

            for addr in cni0_info:
                # Only AF_INET (IPv4) entries count as a configured address.
                if addr.family == socket.AF_INET:
                    self.logger.info(f"cni0 has an IP address: {addr.address}")
                    return True

            self.logger.error("cni0 does not have an IP address configured.")
            return False
        except Exception as e:
            self.logger.error(f"Error checking cni0 IP configuration: {e}")
            return False

    def create_cni0_device(self, ip_suffix, netmask, broadcast_suffix):
        """
        Create the cni0 network device using the flannel CNI binary.

        The binary is invoked with ``CNI_COMMAND=ADD`` and the contents of
        ``/etc/cni/net.d/10-flannel.conflist`` on stdin.

        Parameters:
            ip_suffix (str): Not used by this method; accepted so the
                signature matches fix_cni0_ip.
            netmask (str): Not used by this method.
            broadcast_suffix (str): Not used by this method.

        Returns:
            bool: True if the flannel binary exits with status 0, otherwise
            False.
        """
        try:
            flannel_bin_path = '/opt/cni/bin/flannel'
            if not os.path.exists(flannel_bin_path):
                self.logger.error(f"Flannel binary not found at {flannel_bin_path}")
                return False

            flannel_pid = self.get_flannel_pid()
            container_id, container_name = self.get_flannel_container_id()
            if not flannel_pid or not container_id:
                self.logger.error("Flannel process or container ID not found")
                return False

            cni_env = {
                'CNI_COMMAND': 'ADD',
                'CNI_CONTAINERID': container_id,
                'CNI_NETNS': f'/proc/{flannel_pid}/ns/net',
                'CNI_IFNAME': 'cni0',
                'CNI_PATH': '/opt/cni/bin',
            }
            env = os.environ.copy()
            env.update(cni_env)
            # Log only the CNI_* overrides: the inherited environment may
            # contain unrelated, possibly sensitive, variables.
            self.logger.info(f"Environment variables for cni0 creation: {cni_env}")

            config_path = '/etc/cni/net.d/10-flannel.conflist'
            if not os.path.exists(config_path):
                self.logger.error(f"Configuration file {config_path} not found")
                return False

            with open(config_path, 'r') as f:
                config_content = f.read()
            self.logger.info(f"Contents of {config_path}:\n{config_content}")

            result = subprocess.run(
                [flannel_bin_path],
                input=config_content,
                env=env,
                universal_newlines=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if result.returncode != 0:
                self.logger.error(f"Failed to create cni0 device: {result.stderr.strip()}")
                return False

            self.logger.info("cni0 device created successfully")
            return True
        except Exception as e:
            self.logger.error(f"Error creating cni0 device: {e}")
            return False

    @staticmethod
    def _parse_flannel_subnet(content):
        """Return the FLANNEL_SUBNET value from subnet.env text, or None."""
        for line in content.splitlines():
            if line.startswith('FLANNEL_SUBNET='):
                return line.split('=', 1)[1].strip()
        return None

    @staticmethod
    def _build_addr_command(subnet, ip_suffix, netmask, broadcast_suffix):
        """Build the ``ip addr add`` argv that assigns the cni0 address.

        The first three dot-separated fields of *subnet* are kept and the
        last field is replaced by *ip_suffix* / *broadcast_suffix*.
        """
        base_ip = '.'.join(subnet.split('.')[:-1])
        return [
            'ip', 'addr', 'add', f'{base_ip}.{ip_suffix}/{netmask}',
            'brd', f'{base_ip}.{broadcast_suffix}',
            'dev', 'cni0',
        ]

    def fix_cni0_ip(self, ip_suffix, netmask, broadcast_suffix):
        """
        Fix the IP address of the cni0 network device.

        The network prefix is read from FLANNEL_SUBNET in
        ``/run/flannel/subnet.env``.

        Parameters:
            ip_suffix (str): The last octet of the IP address for cni0.
            netmask (str): The prefix length passed to ``ip addr add``.
            broadcast_suffix (str): The last octet of the broadcast address.

        Returns:
            bool: True if the IP address is set successfully, otherwise False.
        """
        try:
            subnet_env_path = '/run/flannel/subnet.env'
            if not os.path.exists(subnet_env_path):
                self.logger.error(f"Subnet configuration file {subnet_env_path} not found")
                return False

            with open(subnet_env_path, 'r') as f:
                subnet_content = f.read()
            self.logger.info(f"Contents of {subnet_env_path}:\n{subnet_content}")

            subnet = self._parse_flannel_subnet(subnet_content)
            if not subnet:
                self.logger.error("FLANNEL_SUBNET configuration not found")
                return False

            # An argv list (no shell) keeps file-derived text from being
            # interpreted as shell syntax.
            cmd = self._build_addr_command(subnet, ip_suffix, netmask, broadcast_suffix)
            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            if result.returncode == 0:
                self.logger.info("cni0 IP address set successfully")
            else:
                self.logger.error(f"Failed to set cni0 IP address: {result.stderr}")
            return result.returncode == 0
        except Exception as e:
            self.logger.error(f"Error setting cni0 IP address: {e}")
            return False

    def delete_cni0_ip(self):
        """
        Delete all IP addresses of the cni0 network device.

        Returns:
            bool: True if ``ip addr flush dev cni0`` succeeds, otherwise False.
        """
        try:
            cmd = ['ip', 'addr', 'flush', 'dev', 'cni0']
            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            if result.returncode == 0:
                self.logger.info("Existing IP address for cni0 deleted successfully")
                return True
            self.logger.error(f"Failed to delete IP address for cni0: {result.stderr.strip()}")
            return False
        except Exception as e:
            self.logger.error(f"Error deleting IP address for cni0: {e}")
            return False

    def _monitor_once(self, flanneld_pids, ip_suffix, netmask, broadcast_suffix):
        """Run one monitoring pass: record the flannel PID and repair cni0."""
        if not self.check_kubelet_service():
            return

        current_flanneld_pid = self.get_flannel_pid()
        if not current_flanneld_pid:
            return
        flanneld_pids.push(current_flanneld_pid)

        if not self.check_cni0_device():
            self.logger.info("cni0 device not found, waiting for 180 seconds before attempting to create...")
            time.sleep(CNI0_CREATE_DELAY_SECONDS)
            if self.create_cni0_device(ip_suffix, netmask, broadcast_suffix):
                self.logger.info("cni0 device created successfully")
            else:
                self.logger.error("Failed to create cni0 device")
            return

        # NOTE(review): the source lost its indentation; this branch is
        # assumed to belong to the "history not yet full" case - confirm.
        if len(flanneld_pids) != PID_HISTORY_SIZE:
            self.logger.info(f"flanneld_pids {len(flanneld_pids)}")
            return

        history = flanneld_pids.get_stack()
        # Compares the second-newest PID with the oldest one, as the original
        # did with indices 4 and 0 of a six-entry history.
        # NOTE(review): the newest entry is not part of the comparison -
        # confirm that this is intended.
        if history[-2] == history[0]:
            return

        self.logger.info("Detected change in flanneld PIDs, checking cni0 device...")
        # Clear the existing address before assigning the new one.
        if not self.delete_cni0_ip():
            self.logger.error("Failed flush ip to delete existing IP address for cni0")
            return
        if not self.fix_cni0_ip(ip_suffix, netmask, broadcast_suffix):
            self.logger.error("Failed to set IP address for cni0")
            return
        self.logger.info("cni0 IP address set successfully.")

    def monitor_loop(self, ip_suffix, netmask, broadcast_suffix, log_interval=1):
        """
        Main monitoring loop that ensures the flannel CNI is correctly
        configured. Never returns.

        Parameters:
            ip_suffix (str): The last octet of the IP address for cni0.
            netmask (str): The prefix length passed to ``ip addr add``.
            broadcast_suffix (str): The last octet of the broadcast address.
            log_interval (int): Seconds to sleep between monitoring passes.
        """
        self.logger.info("CNI monitoring service started")
        # Notify systemd that the service is ready
        systemd_daemon.notify('READY=1')

        flanneld_pids = FixedSizeStack(size=PID_HISTORY_SIZE)
        while True:
            try:
                # Notify systemd that the service is alive
                systemd_daemon.notify('WATCHDOG=1')
                self.logger.info("Monitoring CNI status...")
                time.sleep(log_interval)
                self._monitor_once(flanneld_pids, ip_suffix, netmask, broadcast_suffix)
            except Exception as e:
                # Keep the daemon alive: log and retry on the next pass.
                self.logger.error(f"Error in monitoring loop: {e}")


def reload_systemd():
    """Reload systemd manager configuration using D-Bus.

    Errors are logged and not raised.
    """
    try:
        _bus, systemd_manager = _connect_systemd()
        systemd_manager.Reload()
        logging.info("Systemd manager configuration reloaded successfully.")
    except dbus.DBusException as e:
        logging.error(f"Error reloading systemd manager configuration: {e}")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")


def install_service(force=False):
    """Install, enable and start the kubecni systemd service.

    Parameters:
        force (bool): Reinstall even if the unit file already exists.

    Returns:
        bool: True if the service is already installed (and *force* is
        False) or is active after installation, otherwise False.
    """
    logging.info("Reloading systemd manager configuration...")
    reload_systemd()

    if os.path.exists(SERVICE_PATH) and not force:
        logging.info("Service is already installed. Use --force to reinstall.")
        return True

    # Stop a running instance first. Failures here are logged and do not
    # abort the installation, except when the service refuses to stop.
    try:
        bus, systemd_manager = _connect_systemd()
        active_state = _get_active_state_or_none(bus, systemd_manager, SERVICE_UNIT)
        if active_state is None:
            # Normal on a fresh install: there is no unit to stop yet.
            logging.info("Service is not currently loaded; nothing to stop.")
        elif active_state == 'active':
            logging.info("Stopping the running service before installation...")
            systemd_manager.StopUnit(SERVICE_UNIT, 'replace')
            # Wait for a moment to ensure the service has stopped
            time.sleep(SETTLE_SECONDS)
            systemd_manager.DisableUnitFiles([SERVICE_UNIT], False)
            time.sleep(SETTLE_SECONDS)
            if _get_active_state_or_none(bus, systemd_manager, SERVICE_UNIT) == 'active':
                logging.error("Service could not be stopped. Please stop it manually before reinstalling.")
                return False
        reload_systemd()
    except Exception as e:
        logging.error(f"Error checking or stopping service: {e}")

    try:
        # Remove a stale PID file left behind by a previous instance.
        if os.path.exists(PID_FILE):
            os.remove(PID_FILE)
            logging.info(f"Removed existing PID file: {PID_FILE}")

        try:
            shutil.copy2(sys.argv[0], INSTALL_PATH)
            logging.info("Executable copied to /usr/bin/kubecni")
        except shutil.SameFileError:
            # Reinstalling from the installed binary: nothing to copy.
            logging.info("Executable is already installed at /usr/bin/kubecni")
        except Exception as e:
            logging.error(f"Error copying executable: {e}")
            return False

        with open(SERVICE_PATH, 'w') as f:
            f.write(SERVICE_CONTENT)
        logging.info(f"Service file created at: {SERVICE_PATH}")

        reload_systemd()

        bus, systemd_manager = _connect_systemd()
        systemd_manager.EnableUnitFiles([SERVICE_UNIT], False, True)
        # Give systemd a moment to process the enable request.
        time.sleep(SETTLE_SECONDS)
        reload_systemd()

        systemd_manager.RestartUnit(SERVICE_UNIT, 'replace')
        # Give the service a moment to start before checking its state.
        time.sleep(SETTLE_SECONDS)

        if _get_active_state(bus, systemd_manager, SERVICE_UNIT) == 'active':
            logging.info("Service installed and running successfully")
            return True
        logging.error("Service installed and running failed to start")
        return False
    except Exception as e:
        logging.error(f"Error during installation: {e}")
        return False


def uninstall_service():
    """Disable, stop and remove the kubecni systemd service.

    Returns:
        bool: True if the service was stopped (or was not loaded) and its
        files were removed, otherwise False. Errors are logged, not raised.
    """
    try:
        bus, systemd_manager = _connect_systemd()

        # Reload systemd manager configuration before stopping the service
        reload_systemd()

        logging.info("Disabling the service...")
        systemd_manager.DisableUnitFiles([SERVICE_UNIT], False)
        logging.info("Service disabled.")

        logging.info("Service stopping in progress...")
        try:
            systemd_manager.StopUnit(SERVICE_UNIT, 'replace')
        except dbus.DBusException as e:
            # A unit that is not loaded is already stopped; still clean up.
            if e.get_dbus_name() != NO_SUCH_UNIT_ERROR:
                raise
            logging.info("Service is not loaded; nothing to stop.")

        # StopUnit only queues the stop job, so poll instead of reading the
        # state once immediately afterwards.
        if _wait_until_not_active(bus, systemd_manager, SERVICE_UNIT) == 'active':
            logging.error("Service failed to stop.")
            return False
        logging.info("Service has stopped successfully.")

        if os.path.exists(SERVICE_PATH):
            os.remove(SERVICE_PATH)
            logging.info(f"Service file removed: {SERVICE_PATH}")
        if os.path.exists(PID_FILE):
            os.remove(PID_FILE)
            logging.info(f"PID file removed: {PID_FILE}")

        # Reload systemd manager configuration after removing the service
        reload_systemd()
        return True
    except dbus.DBusException as e:
        logging.error(f"DBus error: {e}")
    except Exception as e:
        logging.error(f"Error uninstalling service: {e}")
    return False


def parse_cli_args(argv):
    """Parse ``--name=value`` tokens into a dict.

    A bare ``--name`` (no ``=``) is treated as ``--name=true`` so that the
    documented ``--help`` form works. Only the first ``=`` splits a token.

    Parameters:
        argv (list[str]): Command-line tokens, without the program name.

    Returns:
        dict[str, str]: Mapping of argument name to its string value.
    """
    parsed = {}
    for arg in argv:
        name, separator, value = arg.partition('=')
        parsed[name] = value if separator else 'true'
    return parsed


def _flag_enabled(provided_args, name):
    """Return True if *name* was given with the value 'true' (any case)."""
    return provided_args.get(name, 'false').lower() == 'true'


def _run_execute(ip_suffix, netmask, broadcast_suffix):
    """Run the one-shot check-and-repair pass; return the process exit code."""
    monitor = FlannelMonitor()

    if not monitor.check_kubelet_service():
        monitor.logger.error("System check failed. Exiting.")
        return 1

    if not monitor.check_cni0_device():
        monitor.logger.info("cni0 device not found, attempting to create...")
        if monitor.create_cni0_device(ip_suffix, netmask, broadcast_suffix):
            monitor.logger.info("cni0 device created successfully")
        else:
            monitor.logger.error("Failed to create cni0 device")
            return 1

    if not monitor.check_cni0_ip():
        monitor.logger.info("cni0 does not have a valid IP address configured, attempting to set IP...")
    elif not monitor.delete_cni0_ip():
        # An address exists: it must be flushed before a new one is set.
        monitor.logger.error("Failed flush ip to delete existing IP address for cni0")
        return 1

    if not monitor.fix_cni0_ip(ip_suffix, netmask, broadcast_suffix):
        monitor.logger.error("Failed to set IP address for cni0")
        return 1
    monitor.logger.info("cni0 IP address set successfully.")
    return 0


def main():
    """Command-line entry point; see USAGE for the accepted arguments.

    Exits with status 0 on success and 1 on invalid usage, missing root
    privileges, or a failed install/uninstall/execute action.
    """
    version = "2.0.0"
    logging.info(f"Kubecni version: {version}")

    # Configuration for IP settings
    ip_suffix = '1'
    netmask = '24'
    broadcast_suffix = '255'
    # Interval between monitoring passes in daemon mode (in seconds)
    log_interval = 5

    provided_args = parse_cli_args(sys.argv[1:])
    if not provided_args:
        logging.error("No arguments provided. Use --help for usage information.")
        sys.exit(1)
    if not all(arg in VALID_ARGS for arg in provided_args):
        logging.error("Invalid argument(s) provided. Use --help for usage information.")
        logging.info(USAGE)
        sys.exit(1)

    # Showing help does not touch the system, so it needs no privileges.
    wants_action = any(_flag_enabled(provided_args, name) for name in ACTION_ARGS)
    if not wants_action and '--help' in provided_args:
        logging.info(USAGE)
        sys.exit(0)

    if os.geteuid() != 0:
        logging.error("Root privileges are required")
        sys.exit(1)

    if _flag_enabled(provided_args, '--install'):
        force = _flag_enabled(provided_args, '--force')
        sys.exit(0 if install_service(force) else 1)

    if _flag_enabled(provided_args, '--uninstall'):
        sys.exit(0 if uninstall_service() else 1)

    if _flag_enabled(provided_args, '--execute'):
        sys.exit(_run_execute(ip_suffix, netmask, broadcast_suffix))

    if _flag_enabled(provided_args, '--daemon'):
        logging.info("Starting the daemon process...")
        # NOTE(review): the handler installed by logging.basicConfig writes
        # to stderr; confirm the log output is still reachable after
        # DaemonContext detaches the process.
        with daemon.DaemonContext(
            pidfile=pidfile.TimeoutPIDLockFile(PID_FILE),  # Manage the PID file
            working_directory='/',
            umask=0o022,
            detach_process=True,
        ):
            monitor = FlannelMonitor()
            monitor.monitor_loop(ip_suffix, netmask, broadcast_suffix, log_interval)
        sys.exit(0)

    if '--help' in provided_args:
        logging.info(USAGE)
        sys.exit(0)


if __name__ == "__main__":
    main()