File size: 7,281 Bytes
4aec76b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""
# Files.py
- This module handles file uploads, storage, and management of user files
- This deals with disk storage of files.
"""

import os
import time
import shutil
import base64
import fitz  # PyMuPDF
from io import BytesIO
from typing import Tuple

from logger import get_logger
log = get_logger(name="FILES", log_to_console=False)

UPLOADS_PATH = os.path.abspath("./user_uploads/")


def check_create_uploads_folder() -> str:
    """Ensure the root uploads directory exists and return its path.

    The directory is created directly and an already-existing path is
    detected from the resulting ``FileExistsError`` rather than from a prior
    ``os.path.exists`` check. A separate check leaves a window in which
    another caller can create the folder first and make ``os.makedirs`` raise.

    Returns:
        str: The uploads directory path (``UPLOADS_PATH``).
    """

    try:
        os.makedirs(UPLOADS_PATH)
    except FileExistsError:
        log.info(f"Uploads folder already exists at: {UPLOADS_PATH}")
    else:
        log.info(f"Uploads folder created at: {UPLOADS_PATH}")
    return UPLOADS_PATH


def delete_empty_user_folders() -> None:
    """Delete empty user folders in the uploads directory.

    Each direct child of ``UPLOADS_PATH`` that is a directory with no entries
    is removed. Failures are logged and never raised; a failure on one folder
    does not stop the remaining folders from being processed.
    """

    try:
        user_folders = os.listdir(UPLOADS_PATH)
    except Exception as e:
        log.error(f"Error deleting empty user folders: {repr(e)}")
        return

    for user_folder in user_folders:
        user_path = os.path.join(UPLOADS_PATH, user_folder)
        try:
            if os.path.isdir(user_path) and not os.listdir(user_path):
                # os.rmdir only removes a directory that is still empty, so a
                # file saved between the check above and this call is never
                # deleted (a recursive delete would silently remove it).
                os.rmdir(user_path)
                log.info(f"Deleted empty user folder: {user_path}")
        except Exception as e:
            log.error(f"Error deleting empty user folders: {repr(e)}")
        

def create_user_uploads_folder(user_id: str) -> bool:
    """Create a user-specific uploads directory if it doesn't exist.

    Args:
        user_id (str): Name of the sub-folder to create under ``UPLOADS_PATH``.
    Returns:
        bool: True if the folder was created or already exists, False if any
            error occurred (the error is logged, not raised).
    """

    try:
        user_upload_path = os.path.join(UPLOADS_PATH, user_id)
        try:
            # Create first and treat "already exists" as success. A separate
            # exists() check would let a concurrent caller create the folder
            # in between and turn a harmless situation into a failure.
            os.makedirs(user_upload_path)
        except FileExistsError:
            log.info(f"Uploads folder already exists for user {user_id} at: {user_upload_path}")
        else:
            log.info(f"Uploads folder created for user {user_id} at: {user_upload_path}")
        return True

    except Exception as e:
        log.error(f"Error creating uploads folder for user {user_id}: {repr(e)}")
        return False


def delete_file(user_id: str, file_name: str) -> bool:
    """Remove one file from a user's folder inside the uploads directory.

    Args:
        user_id (str): Name of the user folder that holds the file.
        file_name (str): Name of the file to remove.
    Returns:
        bool: True when the file existed and was removed; False when it was
            not found or the removal raised an error (both cases are logged).
    """

    target = os.path.join(UPLOADS_PATH, user_id, file_name)

    # Guard clause: nothing to do when the path is not an existing regular file.
    if not os.path.isfile(target):
        log.warning(f"User {user_id} - File not found for deletion: {file_name}")
        return False

    try:
        os.remove(target)
        log.info(f"User {user_id} - File deleted: {file_name}")
        return True
    except Exception as err:
        log.error(f"User {user_id} - Error deleting file {file_name}: {repr(err)}")
        return False


def save_file(user_id: str, file_value_binary: bytes, file_name: str) -> Tuple[bool, str]:
    """Save the streamlit uploaded user file to the uploads directory.

    The stored name is derived from ``file_name``: only the last path
    component is kept, and spaces and dots in the part before the final dot
    are replaced with underscores. If a file with that name already exists in
    the user's folder, ``"n_"`` is prepended until the name is free.

    Args:
        user_id (str): The name of the user uploading the file.
        file_value_binary (bytes): The binary content of the uploaded file.
        file_name (str): The original name of the uploaded file.
    Returns:
        Tuple[bool, str]: A tuple containing a success flag and the saved file name or failure message.
    """

    try:
        # Create user directory if it doesn't exist
        if not create_user_uploads_folder(user_id):
            return False, "Error creating user uploads folder!"

        # Keep only the final path component. os.path.join discards everything
        # before an absolute component, so an uploaded name such as
        # "/etc/x.conf" would otherwise be written outside the user's folder.
        # Backslashes are normalised first so Windows-style paths are stripped
        # on every platform.
        safe_name = os.path.basename(file_name.replace("\\", "/"))
        if not safe_name:
            log.error(f"User {user_id} - Invalid file name: {file_name!r}")
            return False, "Invalid file name!"

        # Split on the LAST dot only; a name without any dot has no extension
        # and is sanitised as a whole.
        stem, dot, suffix = safe_name.rpartition(".")
        if dot:
            new_file_name = stem.replace(" ", "_").replace(".", "_") + dot + suffix
        else:
            new_file_name = safe_name.replace(" ", "_")

        # check if same name already exists:
        user_upload_path = os.path.join(UPLOADS_PATH, user_id)
        while os.path.exists(os.path.join(user_upload_path, new_file_name)):
            new_file_name = "n_" + new_file_name

        # Save the file
        file_path = os.path.join(user_upload_path, new_file_name)

        # "x" (exclusive create) refuses to overwrite a file that appeared
        # after the existence check above; that case is reported as a failure
        # by the handler below instead of silently replacing the other file.
        with open(file_path, "xb") as f:
            f.write(file_value_binary)
        log.info(f"User {user_id} - File saved: {new_file_name}")

        return True, new_file_name

    except Exception as e:
        log.error(f"User {user_id} - Error saving file {file_name}: {repr(e)}")
        return False, "Error saving file!"


def get_pdf_iframe(user_id: str, file_name: str, num_pages: int = 5) -> Tuple[bool, str]:
    """Return first n pages of asked pdf in an iframe.

    The first ``num_pages`` pages (or fewer, if the document is shorter) are
    copied into a new in-memory PDF, which is base64-encoded and embedded as a
    ``data:`` URL in the returned ``<iframe>`` markup.

    Args:
        user_id (str): The name of the user whose file is to be loaded.
        file_name (str): The name of the PDF file to be loaded.
        num_pages (int): The number of pages to include in the iframe.
    Returns:
        Tuple[bool, str]: A tuple containing a success flag and the HTML iframe string or an error message.
    """
    # Only the text after the last dot is inspected; the content is not sniffed.
    ext = file_name.split('.')[-1].lower()
    if ext != 'pdf':
        log.info(f"Requested iframe for non-PDF file: {file_name}")
        return False, "Unsupported File Format. [Only PDF previews are supported for now, Others will be added soon.]"

    log.info(f"Loading PDF: {user_id}/{file_name}")
    abs_path = os.path.join(UPLOADS_PATH, user_id, file_name)

    if not os.path.isfile(abs_path):
        log.error(f"File not found at {abs_path}!")
        return False, "File not found!"

    try:
        # Open original PDF
        doc = fitz.open(abs_path)
        try:
            # Create new PDF in memory
            new_pdf = fitz.open()
            try:
                for i in range(min(num_pages, len(doc))):
                    new_pdf.insert_pdf(doc, from_page=i, to_page=i)

                # Write to in-memory buffer and base64 encode the trimmed version
                with BytesIO() as pdf_buffer:
                    new_pdf.save(pdf_buffer)
                    base64_pdf = base64.b64encode(pdf_buffer.getvalue()).decode("utf-8")
            finally:
                # Close both documents even when copying or saving fails, so a
                # broken PDF does not leave them open.
                new_pdf.close()
        finally:
            doc.close()

        log.info(f"PDF iframe loaded successfully: {user_id}/{file_name}")

        return True, f"""
            <iframe
                src="data:application/pdf;base64,{base64_pdf}#toolbar=0&navpanes=0&scrollbar=1&page=1&view=FitH"
                width=100%
                height=300rem
                type="application/pdf"
            ></iframe>
        """

    except Exception as e:
        log.error(f"Error loading PDF {file_name} for user {user_id}: {repr(e)}")
        return False, "Some error occurred while loading the PDF!"


def get_file_path(user_id: str, file_name: str) -> str:
    """Build the path of a user's file inside the uploads directory.

    The path is only composed from its parts; no check is made that the file
    or the user folder actually exists.

    Args:
        user_id (str): Name of the user folder under the uploads directory.
        file_name (str): Name of the file inside that folder.
    Returns:
        str: ``UPLOADS_PATH``, ``user_id`` and ``file_name`` joined into one path.
    """
    path_parts = (UPLOADS_PATH, user_id, file_name)
    return os.path.join(*path_parts)


if __name__ == "__main__":
    # Manual smoke test: runs only when this module is executed directly.
    demo_user = "test_user"

    # Make sure the root uploads folder and the demo user's folder exist.
    check_create_uploads_folder()
    create_user_uploads_folder(demo_user)

    # Store a small sample payload under a name containing dots and a space
    # and show the (success flag, saved name or message) result.
    save_result = save_file(demo_user, b"This is a sample file content.", "sample.a.b c.d.pdf")
    print(save_result)

    # Ask for the name of a file placed in the demo user's folder and print
    # the preview result for its first page.
    prompt = f"Paste one file here, `{UPLOADS_PATH}/{demo_user}/` and paste just the file name: "
    chosen_name = input(prompt)
    print(get_pdf_iframe(demo_user, chosen_name, num_pages=1))