291 lines
12 KiB
Python
291 lines
12 KiB
Python
import time
|
|
import signal
|
|
import socketio
|
|
print(dir(socketio))
|
|
import openpyxl
|
|
import eventlet
|
|
from threading import Lock
|
|
import os
|
|
import csv
|
|
import pexpect
|
|
import sys
|
|
import re
|
|
from PIL import Image
|
|
import pytesseract
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException, NoSuchElementException, ElementClickInterceptedException
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
|
|
# Initialize Socket.IO server
|
|
sio = socketio.Server(cors_allowed_origins='*')
|
|
app = socketio.WSGIApp(sio)
|
|
|
|
# Global variables
|
|
ssh_tunnel = None
|
|
ssh_lock = Lock()
|
|
driver = None
|
|
stop_requested = False
|
|
processing_log = "/Users/macos/Documents/SINDIGILIVE/CLIENTS/ICON+/Project-HES/Update_DCU/processing_log.txt"
|
|
|
|
# Signal handler for terminating processes
|
|
def handle_stop_signal(signum, frame):
|
|
global stop_requested
|
|
stop_requested = True
|
|
log_processing("Stop signal received, stopping process...", processing_log)
|
|
stop_ssh_tunnel()
|
|
|
|
signal.signal(signal.SIGTERM, handle_stop_signal)
|
|
|
|
# Function to log processing details
|
|
def log_processing(message, log_file):
|
|
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
|
with open(log_file, "a") as log:
|
|
log.write(f"{timestamp} - {message}\n")
|
|
|
|
# Function to create an SSH tunnel
|
|
def create_ssh_tunnel(dcu_ip, ssh_user="callysta_icon", ssh_host="10.232.4.113", ssh_port=21112, ssh_password="Callysta_icon2024!"):
|
|
global ssh_tunnel
|
|
ssh_command = f"ssh -L 8888:{dcu_ip}:80 {ssh_user}@{ssh_host} -p {ssh_port}"
|
|
log_processing(f"Starting SSH tunnel to {dcu_ip}", processing_log)
|
|
try:
|
|
child = pexpect.spawn(ssh_command, timeout=60)
|
|
child.expect(f"{ssh_user}@{ssh_host}'s password: ")
|
|
child.sendline(ssh_password)
|
|
ssh_tunnel = child
|
|
log_processing(f"SSH tunnel to {dcu_ip} established.", processing_log)
|
|
return True
|
|
except pexpect.exceptions.TIMEOUT as e:
|
|
log_processing(f"SSH connection timeout to {dcu_ip}: {str(e)}", processing_log)
|
|
except pexpect.exceptions.EOF as e:
|
|
log_processing(f"SSH connection error to {dcu_ip}: {str(e)}", processing_log)
|
|
return False
|
|
|
|
# Function to terminate the SSH tunnel and WebDriver
|
|
def stop_ssh_tunnel():
|
|
global ssh_tunnel, driver
|
|
if ssh_tunnel and ssh_tunnel.isalive():
|
|
ssh_tunnel.terminate()
|
|
log_processing("SSH tunnel terminated successfully.", processing_log)
|
|
ssh_tunnel = None
|
|
else:
|
|
log_processing("No active SSH tunnel to terminate.", processing_log)
|
|
if driver:
|
|
try:
|
|
driver.quit()
|
|
log_processing("Driver process quit successfully.", processing_log)
|
|
except Exception as e:
|
|
log_processing(f"Error quitting driver: {str(e)}", processing_log)
|
|
finally:
|
|
driver = None
|
|
|
|
# Socket.IO events for starting and stopping the tunnel
|
|
@sio.event
|
|
def connect(sid, environ):
|
|
log_processing(f"Client connected: {sid}", processing_log)
|
|
|
|
@sio.event
|
|
def disconnect(sid):
|
|
log_processing(f"Client disconnected: {sid}", processing_log)
|
|
stop_ssh_tunnel() # Stop SSH tunnel if the client disconnects
|
|
|
|
@sio.event
|
|
def stop_tunnel(sid):
|
|
with ssh_lock:
|
|
if stop_ssh_tunnel():
|
|
sio.emit("tunnel_status", {"status": "terminated"}, room=sid)
|
|
else:
|
|
sio.emit("tunnel_status", {"status": "not_active"}, room=sid)
|
|
|
|
# Placeholder functions for Selenium operations
|
|
def wait_for_page_to_load(driver, timeout=30, processing_log="processing_log.txt"):
|
|
"""Wait for the page to completely load."""
|
|
log_processing("Waiting for the page to load", processing_log)
|
|
try:
|
|
WebDriverWait(driver, timeout).until(
|
|
lambda d: d.execute_script("return document.readyState") == "complete"
|
|
)
|
|
log_processing("Page fully loaded.", processing_log)
|
|
except TimeoutException:
|
|
log_processing("Timeout waiting for page load.", processing_log)
|
|
raise Exception("Timeout waiting for page load.")
|
|
|
|
def click_running_state_tab(driver, processing_log="processing_log.txt"):
|
|
"""Ensure the 'Running State' tab is clickable and click it."""
|
|
log_processing("Attempting to click 'Running State' tab", processing_log)
|
|
try:
|
|
running_state_tab = WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located((By.ID, "running-state"))
|
|
)
|
|
driver.execute_script("arguments[0].scrollIntoView(true);", running_state_tab)
|
|
WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.ID, "running-state")))
|
|
|
|
if "sun-navbar_item is-active" not in running_state_tab.get_attribute("class"):
|
|
running_state_tab.click()
|
|
log_processing("Clicked 'Running State' tab.", processing_log)
|
|
else:
|
|
log_processing("'Running State' tab already active.", processing_log)
|
|
except TimeoutException:
|
|
log_processing("Timeout waiting for 'Running State' tab.", processing_log)
|
|
raise Exception("Timeout waiting for 'Running State' tab.")
|
|
except Exception as e:
|
|
log_processing(f"Error clicking 'Running State' tab: {e}", processing_log)
|
|
raise
|
|
|
|
def extract_text_and_verify(dcu_ip, expected_ip, processing_log="processing_log.txt"):
|
|
screenshot_path = f"/Users/macos/Documents/SINDIGILIVE/CLIENTS/ICON+/Project-HES/Update_DCU/evidence/{dcu_ip}.png"
|
|
status = "Failed"
|
|
dcu_id = "Not found"
|
|
|
|
log_processing(f"Starting text extraction for {dcu_ip}", processing_log)
|
|
try:
|
|
image = Image.open(screenshot_path)
|
|
log_processing(f"Screenshot opened for {dcu_ip}", processing_log)
|
|
|
|
extracted_text = pytesseract.image_to_string(image)
|
|
log_processing(f"Extracted text: {extracted_text}", processing_log)
|
|
|
|
dcu_id_match = re.search(r'\b\d{14}\b', extracted_text)
|
|
if dcu_id_match:
|
|
dcu_id = dcu_id_match.group(0)
|
|
log_processing(f"DCU ID found: {dcu_id}", processing_log)
|
|
else:
|
|
log_processing("DCU ID not found in text.", processing_log)
|
|
|
|
if expected_ip in extracted_text and "Success" in extracted_text:
|
|
status = "Success"
|
|
log_processing(f"Expected IP and Success found for {dcu_ip}", processing_log)
|
|
else:
|
|
log_processing(f"Expected IP or 'Success' not found in text for {dcu_ip}", processing_log)
|
|
|
|
except FileNotFoundError:
|
|
log_processing(f"Screenshot not found for {dcu_ip} at {screenshot_path}", processing_log)
|
|
status = "Screenshot not found"
|
|
|
|
return dcu_id, status
|
|
|
|
def verify_dcu_update(driver, expected_ip, dcu_ip, retries=5, processing_log="processing_log.txt"):
|
|
save_directory = "/Users/macos/Documents/SINDIGILIVE/CLIENTS/ICON+/Project-HES/Update_DCU/evidence"
|
|
os.makedirs(save_directory, exist_ok=True)
|
|
screenshot_path = os.path.join(save_directory, f"{dcu_ip}.png")
|
|
|
|
log_processing(f"Starting verification for {dcu_ip}", processing_log)
|
|
|
|
for attempt in range(retries):
|
|
try:
|
|
log_processing(f"Attempt {attempt + 1} for {dcu_ip}", processing_log)
|
|
WebDriverWait(driver, 15).until(lambda d: d.execute_script("return document.readyState") == "complete")
|
|
|
|
iframe = driver.find_element(By.ID, 'home_content_iframe')
|
|
driver.switch_to.frame(iframe)
|
|
|
|
element = WebDriverWait(driver, 30).until(
|
|
EC.presence_of_element_located((By.XPATH, '/html/body/div[1]/div/div[2]/div[2]/div[1]/div/div[1]/div[2]'))
|
|
)
|
|
|
|
actual_text = element.text
|
|
log_processing(f"Text found for {dcu_ip}: {actual_text}", processing_log)
|
|
|
|
if actual_text == expected_ip:
|
|
log_processing(f"Verification successful for {dcu_ip}", processing_log)
|
|
driver.save_screenshot(screenshot_path)
|
|
return
|
|
else:
|
|
log_processing(f"Expected: {expected_ip}, found: {actual_text} for {dcu_ip}", processing_log)
|
|
raise Exception("Verification failed: Text does not match expected IP.")
|
|
|
|
except Exception as e:
|
|
log_processing(f"Error during attempt {attempt + 1} for {dcu_ip}: {e}", processing_log)
|
|
driver.switch_to.default_content()
|
|
driver.refresh()
|
|
time.sleep(5)
|
|
|
|
raise Exception(f"Verification failed for {dcu_ip} after {retries} attempts.")
|
|
|
|
# Function to update DCU IP
|
|
def update_dcu_ip(updated_log, ip_list, processing_log):
|
|
global stop_requested, driver
|
|
log_processing("Starting DCU IP update process", processing_log)
|
|
for ip in ip_list:
|
|
if stop_requested:
|
|
log_processing("Stop requested, terminating process.", processing_log)
|
|
break
|
|
|
|
log_processing(f"Processing DCU IP: {ip}", processing_log)
|
|
|
|
if not create_ssh_tunnel(ip):
|
|
log_processing(f"Failed to create SSH tunnel for {ip}", processing_log)
|
|
continue
|
|
|
|
with open(updated_log, "a", newline="") as log:
|
|
writer = csv.writer(log)
|
|
if log.tell() == 0:
|
|
writer.writerow(["DCU IP", "DCU ID", "Updated IP", "Update Status"])
|
|
|
|
chrome_options = webdriver.ChromeOptions()
|
|
chrome_options.binary_location = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
|
|
chrome_options.add_argument("--headless")
|
|
driver_path = "/Users/macos/Documents/SINDIGILIVE/CLIENTS/ICON+/Project-HES/Update_DCU/chromedriver/chromedriver"
|
|
driver = webdriver.Chrome(service=Service(driver_path), options=chrome_options)
|
|
|
|
try:
|
|
driver.get("http://localhost:8888")
|
|
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "username")))
|
|
driver.find_element(By.ID, "username").send_keys("admin")
|
|
driver.find_element(By.ID, "password").send_keys("jh1296")
|
|
driver.find_element(By.ID, "submit_btn").click()
|
|
|
|
log_processing("Login to HES successful", processing_log)
|
|
WebDriverWait(driver, 20).until(EC.element_to_be_clickable((By.ID, "parameter-setting"))).click()
|
|
|
|
iframes = driver.find_elements(By.TAG_NAME, "iframe")
|
|
if iframes:
|
|
driver.switch_to.frame(iframes[0])
|
|
|
|
new_ip = "10.232.107.250:9032"
|
|
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "main_ip")))
|
|
ip_field = driver.find_element(By.ID, "main_ip")
|
|
dcu_id = "Not found"
|
|
|
|
for _ in range(3):
|
|
ip_field.clear()
|
|
current_value = ip_field.get_attribute("value").strip()
|
|
if current_value == "":
|
|
break
|
|
time.sleep(2)
|
|
else:
|
|
raise Exception("Failed to clear the IP field!")
|
|
|
|
ip_field.send_keys(new_ip)
|
|
log_processing("Updated IP in Parameter Setting page", processing_log)
|
|
|
|
driver.find_element(By.ID, "main_host_submit").click()
|
|
log_processing("Saved changes", processing_log)
|
|
|
|
driver.back()
|
|
wait_for_page_to_load(driver)
|
|
click_running_state_tab(driver)
|
|
verify_dcu_update(driver, expected_ip=new_ip, dcu_ip=ip)
|
|
|
|
dcu_id, status = extract_text_and_verify(ip, new_ip)
|
|
writer.writerow([ip, dcu_id, new_ip, status])
|
|
|
|
except Exception as e:
|
|
log_processing(f"Error processing {ip}: {e}", processing_log)
|
|
writer.writerow([ip, "Not found", "Not found", "Failed - Error"])
|
|
finally:
|
|
stop_ssh_tunnel()
|
|
|
|
# Main function to start the server
|
|
if __name__ == "__main__":
|
|
updated_log = "/Users/macos/Documents/SINDIGILIVE/CLIENTS/ICON+/Project-HES/Update_DCU/update_log.csv"
|
|
processing_log = "/Users/macos/Documents/SINDIGILIVE/CLIENTS/ICON+/Project-HES/Update_DCU/processing_log.txt"
|
|
ip_list = sys.argv[1:]
|
|
eventlet.wsgi.server(eventlet.listen(('localhost', 5000)), app)
|
|
update_dcu_ip(updated_log, ip_list, processing_log)
|