File size: 7,281 Bytes
4aec76b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
"""
# Files.py
- This module handles file uploads, storage, and management of user files
- This deals with disk storage of files.
"""
import os
import time
import shutil
import base64
import fitz # PyMuPDF
from io import BytesIO
from typing import Tuple
from logger import get_logger
log = get_logger(name="FILES", log_to_console=False)
UPLOADS_PATH = os.path.abspath("./user_uploads/")
def check_create_uploads_folder() -> str:
"""Check and create the uploads directory if it doesn't exist."""
if not os.path.exists(UPLOADS_PATH):
os.makedirs(UPLOADS_PATH)
log.info(f"Uploads folder created at: {UPLOADS_PATH}")
else:
log.info(f"Uploads folder already exists at: {UPLOADS_PATH}")
return UPLOADS_PATH
def delete_empty_user_folders() -> None:
"""Delete empty user folders in the uploads directory."""
try:
for user_folder in os.listdir(UPLOADS_PATH):
user_path = os.path.join(UPLOADS_PATH, user_folder)
if os.path.isdir(user_path) and not os.listdir(user_path):
shutil.rmtree(user_path)
log.info(f"Deleted empty user folder: {user_path}")
except Exception as e:
log.error(f"Error deleting empty user folders: {repr(e)}")
def create_user_uploads_folder(user_id: str) -> bool:
"""Create a user-specific uploads directory if it doesn't exist."""
try:
user_upload_path = os.path.join(UPLOADS_PATH, user_id)
if not os.path.exists(user_upload_path):
os.makedirs(user_upload_path)
log.info(f"Uploads folder created for user {user_id} at: {user_upload_path}")
else:
log.info(f"Uploads folder already exists for user {user_id} at: {user_upload_path}")
return True
except Exception as e:
log.error(f"Error creating uploads folder for user {user_id}: {repr(e)}")
return False
def delete_file(user_id: str, file_name: str) -> bool:
"""Delete a user file from the uploads directory.
Args:
user_id (str): The name of the user whose file is to be deleted.
file_name (str): The name of the file to be deleted.
Returns:
bool: True if the file was successfully deleted, False otherwise.
"""
file_path = os.path.join(UPLOADS_PATH, user_id, file_name)
if os.path.isfile(file_path):
try:
os.remove(file_path)
log.info(f"User {user_id} - File deleted: {file_name}")
return True
except Exception as e:
log.error(f"User {user_id} - Error deleting file {file_name}: {repr(e)}")
return False
else:
log.warning(f"User {user_id} - File not found for deletion: {file_name}")
return False
def save_file(user_id: str, file_value_binary: bytes, file_name: str) -> Tuple[bool, str]:
"""Save the streamlit uploaded user file to the uploads directory.
Args:
user_id (str): The name of the user uploading the file.
file_value_binary (bytes): The binary content of the uploaded file.
file_name (str): The original name of the uploaded file.
Returns:
Tuple[bool, str]: A tuple containing a success flag and the saved file name or failure message.
"""
try:
# Create user directory if it doesn't exist
if not create_user_uploads_folder(user_id):
return False, "Error creating user uploads folder!"
base_name = file_name[:file_name.rfind(".")]
ext = file_name[file_name.rfind("."):]
new_file_name = base_name.replace(" ", "_").replace(".", "_") + ext
# check if same name already exists:
user_upload_path = os.path.join(UPLOADS_PATH, user_id)
while os.path.exists(os.path.join(user_upload_path, new_file_name)):
new_file_name = "n_" + new_file_name
# Save the file
file_path = os.path.join(user_upload_path, new_file_name)
with open(file_path, "wb") as f:
f.write(file_value_binary)
log.info(f"User {user_id} - File saved: {new_file_name}")
return True, new_file_name
except Exception as e:
log.error(f"User {user_id} - Error saving file {file_name}: {repr(e)}")
return False, "Error saving file!"
def get_pdf_iframe(user_id: str, file_name: str, num_pages: int = 5) -> Tuple[bool, str]:
"""Return first n pages of asked pdf in an iframe.
Args:
user_id (str): The name of the user whose file is to be loaded.
file_name (str): The name of the PDF file to be loaded.
num_pages (int): The number of pages to include in the iframe.
Returns:
Tuple[bool, str]: A tuple containing a success flag and the HTML iframe string or an error message.
"""
ext = file_name.split('.')[-1].lower()
if ext != 'pdf':
log.info(f"Requested iframe for non-PDF file: {file_name}")
return False, "Unsupported File Format. [Only PDF previews are supported for now, Others will be added soon.]"
log.info(f"Loading PDF: {user_id}/{file_name}")
abs_path = os.path.join(UPLOADS_PATH, user_id, file_name)
if not os.path.isfile(abs_path):
log.error(f"File not found at {abs_path}!")
return False, "File not found!"
try:
# Open original PDF
doc = fitz.open(abs_path)
# Create new PDF in memory
new_pdf = fitz.open()
for i in range(min(num_pages, len(doc))):
new_pdf.insert_pdf(doc, from_page=i, to_page=i)
# Write to in-memory buffer
pdf_buffer = BytesIO()
new_pdf.save(pdf_buffer)
new_pdf.close()
doc.close()
# Base64 encode the trimmed version
base64_pdf = base64.b64encode(pdf_buffer.getvalue()).decode("utf-8")
# Close the buffer
pdf_buffer.close()
log.info(f"PDF iframe loaded successfully: {user_id}/{file_name}")
return True, f"""
<iframe
src="data:application/pdf;base64,{base64_pdf}#toolbar=0&navpanes=0&scrollbar=1&page=1&view=FitH"
width=100%
height=300rem
type="application/pdf"
></iframe>
"""
except Exception as e:
log.error(f"Error loading PDF {file_name} for user {user_id}: {repr(e)}")
return False, "Some error occurred while loading the PDF!"
def get_file_path(user_id: str, file_name: str) -> str:
"""Get the absolute path of a user's file in the uploads directory.
Args:
user_id (str): The name of the user whose file path is to be retrieved.
file_name (str): The name of the file.
Returns:
str: The absolute path of the file.
"""
return os.path.join(UPLOADS_PATH, user_id, file_name)
if __name__ == "__main__":
user = "test_user"
# Example usage
check_create_uploads_folder()
# Create a user uploads folder
create_user_uploads_folder(user)
# Save a sample file
sample_file_content = b"This is a sample file content."
print(save_file(user, sample_file_content, "sample.a.b c.d.pdf"))
# Get PDF iframe:
file = input(f"Paste one file here, `{UPLOADS_PATH}/{user}/` and paste just the file name: ")
print(get_pdf_iframe(user, file, num_pages=1))
|