Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import shutil | |
| from zipfile import ZipFile | |
| import logging | |
| import psutil | |
| import subprocess | |
| from flask import Flask, request, jsonify, render_template, send_file | |
| from mpi4py import MPI | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Initialize Flask app | |
| app = Flask(__name__) | |
| connected_cpus = {"localhost": {"cpu_count": psutil.cpu_count(logical=False), "usage": psutil.cpu_percent(interval=1)}} | |
| # Define the target function for MPI | |
| def target_function(script_path, folder_path): | |
| output_log = tempfile.TemporaryFile(mode='w+t') | |
| try: | |
| result = subprocess.run(['python', script_path], cwd=folder_path, stdout=output_log, stderr=subprocess.STDOUT) | |
| output_log.seek(0) | |
| log_output = output_log.read() | |
| except Exception as e: | |
| log_output = str(e) | |
| finally: | |
| output_log.close() | |
| return log_output | |
| # Endpoint to handle file uploads and script execution | |
| def handle_upload(): | |
| try: | |
| if 'file' not in request.files or 'script_content' not in request.form: | |
| return jsonify({"status": "error", "message": "File or script content not provided"}), 400 | |
| files = request.files.getlist('file') | |
| script_content = request.form['script_content'] | |
| # Create a temporary directory to store uploaded files | |
| temp_dir = tempfile.mkdtemp() | |
| logger.info(f"Temporary directory created at {temp_dir}") | |
| # Save the uploaded files to the temporary directory | |
| folder_path = os.path.join(temp_dir, 'uploaded_folder') | |
| os.makedirs(folder_path, exist_ok=True) | |
| for file_obj in files: | |
| file_path = os.path.join(folder_path, file_obj.filename) | |
| file_obj.save(file_path) | |
| logger.info(f"File saved to {file_path}") | |
| # Save the script content to a file | |
| script_path = os.path.join(folder_path, 'user_script.py') | |
| with open(script_path, 'w') as script_file: | |
| script_file.write(script_content) | |
| logger.info(f"Script content saved to {script_path}") | |
| # Run the script using MPI | |
| log_output = run_script_with_mpi(script_path, folder_path) | |
| # Create a zip file of the entire folder | |
| zip_path = os.path.join(temp_dir, 'output_folder.zip') | |
| with ZipFile(zip_path, 'w') as zipf: | |
| for root, _, files in os.walk(folder_path): | |
| for file in files: | |
| zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), folder_path)) | |
| logger.info(f"Output folder zipped at {zip_path}") | |
| return jsonify({"status": "success", "log_output": log_output, "download_url": f"/download/{os.path.basename(zip_path)}"}) | |
| except Exception as e: | |
| logger.error(f"Error in handle_upload: {e}") | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| def download_file(filename): | |
| try: | |
| return send_file(os.path.join(tempfile.gettempdir(), filename), as_attachment=True) | |
| except Exception as e: | |
| logger.error(f"Error in download_file: {e}") | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| # Endpoint to get connected CPUs information | |
| def get_cpu_info(): | |
| try: | |
| info = [] | |
| for host, data in connected_cpus.items(): | |
| usage = psutil.cpu_percent(interval=1) if host == "localhost" else data['usage'] | |
| info.append(f"{host}: {data['cpu_count']} CPUs, {usage}% usage") | |
| return jsonify({"status": "success", "cpu_info": "\n".join(info)}) | |
| except Exception as e: | |
| logger.error(f"Error in get_cpu_info: {e}") | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| # Endpoint to execute commands | |
| def execute_command(): | |
| try: | |
| command = request.form['command'] | |
| if not command: | |
| return jsonify({"status": "error", "message": "No command provided"}), 400 | |
| # Ensure commands are executed in a safe environment | |
| allowed_commands = ['pip install'] | |
| if not any(command.startswith(cmd) for cmd in allowed_commands): | |
| return jsonify({"status": "error", "message": "Command not allowed"}), 400 | |
| output_log = tempfile.TemporaryFile(mode='w+t') | |
| try: | |
| result = subprocess.run(command.split(), stdout=output_log, stderr=subprocess.STDOUT) | |
| output_log.seek(0) | |
| log_output = output_log.read() | |
| except Exception as e: | |
| log_output = str(e) | |
| finally: | |
| output_log.close() | |
| return jsonify({"status": "success", "log_output": log_output}) | |
| except Exception as e: | |
| logger.error(f"Error in execute_command: {e}") | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| # Endpoint to donate CPU resources | |
| def donate_cpu(): | |
| try: | |
| data = request.json | |
| host = data['host'] | |
| cpu_count = data['cpu_count'] | |
| connected_cpus[host] = {"cpu_count": cpu_count, "usage": 0.0} | |
| return jsonify({"status": "success", "message": f"CPU resources from {host} donated successfully."}) | |
| except Exception as e: | |
| logger.error(f"Error in donate_cpu: {e}") | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| # Endpoint to update CPU usage | |
| def update_cpu_usage(): | |
| try: | |
| data = request.json | |
| host = data['host'] | |
| usage = data['usage'] | |
| if host in connected_cpus: | |
| connected_cpus[host]['usage'] = usage | |
| return jsonify({"status": "success", "message": f"CPU usage from {host} updated successfully."}) | |
| else: | |
| return jsonify({"status": "error", "message": f"Host {host} not found."}), 404 | |
| except Exception as e: | |
| logger.error(f"Error in update_cpu_usage: {e}") | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| # Main interface | |
| def index(): | |
| return render_template('index.html') | |
| def run_script_with_mpi(script_path, folder_path): | |
| # Create a temporary directory for MPI processes | |
| mpi_temp_dir = tempfile.mkdtemp() | |
| mpi_script_path = os.path.join(mpi_temp_dir, 'mpi_script.py') | |
| # Write the MPI script to the temporary directory | |
| with open(mpi_script_path, 'w') as mpi_script_file: | |
| mpi_script_file.write(f""" | |
| import os | |
| import tempfile | |
| import subprocess | |
| from mpi4py import MPI | |
| def target_function(script_path, folder_path): | |
| output_log = tempfile.TemporaryFile(mode='w+t') | |
| try: | |
| result = subprocess.run(['python', script_path], cwd=folder_path, stdout=output_log, stderr=subprocess.STDOUT) | |
| output_log.seek(0) | |
| log_output = output_log.read() | |
| except Exception as e: | |
| log_output = str(e) | |
| finally: | |
| output_log.close() | |
| return log_output | |
| def run_script(script_path, folder_path): | |
| comm = MPI.COMM_WORLD | |
| rank = comm.Get_rank() | |
| size = comm.Get_size() | |
| if rank == 0: | |
| # Master process | |
| log_outputs = [] | |
| for i in range(1, size): | |
| log_output = comm.recv(source=i, tag=11) | |
| log_outputs.append(log_output) | |
| with open(os.path.join(folder_path, 'mpi_log_output.txt'), 'w') as log_file: | |
| log_file.write('\\n'.join(log_outputs)) | |
| else: | |
| # Worker process | |
| log_output = target_function(script_path, folder_path) | |
| comm.send(log_output, dest=0, tag=11) | |
| if __name__ == "__main__": | |
| run_script('{script_path}', '{folder_path}') | |
| """) | |
| # Run the MPI script using subprocess | |
| result = subprocess.run(['mpiexec', '-n', str(psutil.cpu_count(logical=False)), 'python', mpi_script_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| # Read the log output from the file | |
| log_output_path = os.path.join(folder_path, 'mpi_log_output.txt') | |
| with open(log_output_path, 'r') as log_file: | |
| log_output = log_file.read() | |
| # Clean up the temporary directory | |
| shutil.rmtree(mpi_temp_dir) | |
| return log_output | |
| if __name__ == "__main__": | |
| app.run(host='0.0.0.0', port=7860, threaded=True) |