# app_fullbody_pretrained.py β Full-Body X-ray Analysis (Pretrained) + PDF + Suspect Boxes
# -----------------------------------------------------------------------------
# Run: python -m uvicorn app_fullbody_pretrained:api --host 0.0.0.0 --port 7860
# -----------------------------------------------------------------------------
import os
os.system("pip uninstall -y google-generativeai google-api-core googleapis-common-protos grpcio || true")
os.system("pip install --upgrade google-generativeai==0.8.5 google-api-core protobuf grpcio --quiet")
import os, io, json, tempfile, hashlib, datetime, pathlib
from io import BytesIO
from typing import Optional, Dict, List, Any, Tuple
import numpy as np
from PIL import Image
import cv2
import unicodedata
import torch
import torch.nn.functional as F
import base64
# FastAPI imports
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse, Response
from fastapi.staticfiles import StaticFiles
from transformers import DPTFeatureExtractor, DPTForDepthEstimation
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D # noqa: F401
import os
#import openai
from fastapi import FastAPI, UploadFile, File
from pydantic import BaseModel
# Hugging Face Space base URL
HF_SPACE_URL = os.getenv("HF_SPACE_URL", "https://abbhy123ghh-x-ray-analysis-2.hf.space").rstrip("/")
# β
Create app
api = FastAPI()
os.makedirs("static", exist_ok=True)
api.mount("/static", StaticFiles(directory="static"), name="static")
from transformers import DPTFeatureExtractor, DPTForDepthEstimation
# Optional dependencies
try: import pydicom; HAVE_DICOM=True
except Exception: HAVE_DICOM=False
try: import torchxrayvision as xrv; HAVE_XRV=True
except Exception: HAVE_XRV=False
try:
from transformers import (AutoProcessor, CLIPModel, AutoImageProcessor, AutoModelForImageClassification)
HAVE_TRF=True
except Exception: HAVE_TRF=False
try: from skimage.segmentation import slic; from skimage.util import img_as_float; HAVE_SKIMG=True
except Exception: HAVE_SKIMG=False
try: from fpdf import FPDF; HAVE_PDF=True
except Exception: HAVE_PDF=False
import uvicorn
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# -----------------------------
# KEEP all your existing utility functions + configs
# -----------------------------
# (to_uint8, read_any_xray, chest_disease_probs, analyze_fullbody, pdf_report_bytes, etc.)
# I did not touch any model logic.
# -----------------------------
# Root
# -----------------------------
@api.get("/")
async def root():
return {"ok": True, "message": "X-ray API is running"}
def to_base64(img: np.ndarray) -> str:
pil = Image.fromarray(img).convert("RGB")
buf = io.BytesIO()
pil.save(buf, format="PNG")
return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode()
# ========================================================================
# β
FastAPI endpoints β updated for multi-region analysis
# ========================================================================
@api.post("/analyze")
async def analyze(file: UploadFile = File(...)):
"""Analyze an uploaded X-ray and return meta + annotated overlays (JSON)."""
try:
suffix = (file.filename or "").lower()
raw = await file.read()
# Read DICOM or normal image
if suffix.endswith(".dcm") and HAVE_DICOM:
ds = pydicom.dcmread(io.BytesIO(raw))
arr = ds.pixel_array.astype(np.float32)
arr -= arr.min(); arr /= (arr.max() - arr.min() + 1e-6)
gray_u8 = (arr * 255.0).clip(0, 255).astype(np.uint8)
else:
gray_u8 = np.array(Image.open(io.BytesIO(raw)).convert("L"))
gray_u8 = to_uint8(gray_u8)
# Run full-body multi-region analysis
res = analyze_fullbody(gray_u8)
# π Flatten meta_pred so frontend can show Age/View/Gender/CTR
meta_pred = res.get("meta_pred", {})
res["age"] = meta_pred.get("age", {}).get("label", "N/A")
res["view"] = meta_pred.get("view", {}).get("label", "N/A")
res["gender"] = meta_pred.get("sex", {}).get("label", "N/A")
res["ctr"] = "--" # placeholder
# Convert original to base64
res["original_img"] = to_base64(gray_u8)
# π Overlay with boxes + labels
# --- Try segmentation overlay ---
overlay_img, overlay_meta = findings_overlay(gray_u8, res)
# --- β
Fallback if overlay is None (always show something)
if overlay_img is None:
print("β οΈ Overlay not generated β using edge-based overlay fallback.")
overlay_img = edges_overlay(gray_u8)
overlay_meta = {"label": "Edge-based fallback", "boxes": []}
res["overlay_img"] = to_base64(overlay_img)
res["overlay_meta"] = overlay_meta
# β
Generate structured AI radiology report
doctor_report_html = generate_medical_report(res)
return JSONResponse({
"ok": True,
"meta": {
"age": res["age"],
"view": res["view"],
"gender": res["gender"],
"ctr": res["ctr"]
},
"original_img": res["original_img"],
"overlay_img": res["overlay_img"],
"overlay_meta": res["overlay_meta"],
"tasks": res.get("tasks", []),
"region": res.get("region", "unknown"),
"region_conf": res.get("region_conf", 0.0),
"meta_pred": res.get("meta_pred", {}),
"doctor_report_html": doctor_report_html # π©» formatted HTML report
})
except Exception as e:
print("β Analyze failed:", e)
return JSONResponse({"error": str(e)}, status_code=400)
from fastapi import UploadFile, File
from fastapi.responses import Response, JSONResponse
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage
import io, datetime, json
from PIL import Image
import numpy as np
import pydicom
# ===========================================================
# π©» Unified PDF Generator (Drlogy-style Layout for All Regions)
# ===========================================================
def generate_clinical_pdf(gray_img, overlay_img, res, doctor_report):
buffer = io.BytesIO()
doc = SimpleDocTemplate(
buffer, pagesize=A4, rightMargin=36, leftMargin=36, topMargin=60, bottomMargin=36
)
styles = getSampleStyleSheet()
normal = ParagraphStyle('Normal', parent=styles['Normal'], fontSize=10, leading=13)
bold = ParagraphStyle('Bold', parent=normal, fontName='Helvetica-Bold')
header = ParagraphStyle('Header', parent=normal, alignment=1, fontName='Helvetica-Bold', fontSize=14)
footer = ParagraphStyle('Footer', parent=normal, fontSize=8, textColor=colors.gray)
story = []
# --- HEADER ---
story.append(Paragraph("DRLOGY IMAGING CENTER", header))
story.append(Paragraph("X-Ray | CT-Scan | MRI | USG", bold))
story.append(Paragraph("105-108, SMART VISION COMPLEX, HEALTHCARE ROAD, MUMBAI - 689578", normal))
story.append(Paragraph("π 0123456789 | βοΈ drlogyimaging@drlogy.com", normal))
story.append(Spacer(1, 10))
# --- DYNAMIC TITLE ---
region_title = res.get("region", "General Region").replace("_", " ").title()
story.append(Paragraph(f"AI RADIOLOGY REPORT β {region_title.upper()}", header))
story.append(Spacer(1, 10))
# --- PATIENT INFO ---
patient_table = [
["Name:", res.get("patient_name", "N/A"), "Age:", f"{res.get('age', 'N/A')} yrs"],
["Sex:", res.get("gender", "N/A"), "Study Date:", datetime.datetime.now().strftime("%d %b %Y, %I:%M %p")],
["Region:", region_title, "AI Confidence:", f"{res.get('region_conf', 0):.2f}"],
]
table = Table(patient_table, colWidths=[70, 160, 90, 150])
table.setStyle(TableStyle([
("BOX", (0, 0), (-1, -1), 0.5, colors.gray),
("INNERGRID", (0, 0), (-1, -1), 0.25, colors.gray),
("FONTNAME", (0, 0), (-1, -1), "Helvetica"),
("FONTSIZE", (0, 0), (-1, -1), 9),
("BACKGROUND", (0, 0), (-1, 0), colors.whitesmoke)
]))
story.append(table)
story.append(Spacer(1, 12))
# --- TECHNICAL DETAILS ---
story.append(Paragraph("TECHNICAL DETAILS", bold))
story.append(Paragraph(
"AI-assisted analysis performed using a multi-region deep learning model. "
"Single-view radiograph analyzed for anatomical abnormalities.", normal))
story.append(Spacer(1, 10))
# --- FINDINGS ---
story.append(Paragraph("AI FINDINGS", bold))
story.append(Paragraph(
"Below are the observed findings as per AI-based analysis of the selected body part.", normal))
story.append(Spacer(1, 5))
story.append(Paragraph(doctor_report.replace("\n", "
"), normal))
story.append(Spacer(1, 10))
# --- IMPRESSION ---
story.append(Paragraph("AI IMPRESSION", bold))
story.append(Paragraph(
"Based on the AI modelβs interpretation, no acute abnormality was confidently detected unless otherwise stated above.",
normal))
story.append(Spacer(1, 10))
# --- RECOMMENDATIONS ---
story.append(Paragraph("RECOMMENDATIONS", bold))
story.append(Paragraph(
"Clinical correlation is advised. If clinical suspicion persists, further radiographic or advanced imaging is recommended.",
normal))
story.append(Spacer(1, 10))
# --- IMAGES ---
story.append(Paragraph("IMAGES:", bold))
img_buf = io.BytesIO()
Image.fromarray(gray_img).save(img_buf, format="PNG")
img_buf.seek(0)
story.append(RLImage(img_buf, width=200, height=200))
if overlay_img is not None:
overlay_buf = io.BytesIO()
Image.fromarray(overlay_img).save(overlay_buf, format="PNG")
overlay_buf.seek(0)
story.append(RLImage(overlay_buf, width=200, height=200))
story.append(Spacer(1, 20))
# --- FOOTER ---
story.append(Paragraph("**** End of Report ****", footer))
story.append(Spacer(1, 6))
story.append(Paragraph("Dr. AI Radiologist (MD)", footer))
story.append(Paragraph(f"Generated on: {datetime.datetime.now():%d %b %Y, %I:%M %p}", footer))
story.append(Paragraph("This AI report is supportive, not a substitute for radiologist diagnosis.", footer))
doc.build(story)
buffer.seek(0)
return buffer.getvalue()
# ===========================================================
# β
Main Endpoint β Analyze & Generate PDF
# ===========================================================
from fastapi import UploadFile, File
from fastapi.responses import Response, JSONResponse
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import (
SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage
)
from reportlab.lib.units import mm
import io, datetime, json, re
from PIL import Image
import numpy as np
import pydicom
@api.post("/analyze_pdf")
async def analyze_pdf(file: UploadFile = File(...)):
"""Generate a professional AI radiology report PDF."""
try:
suffix = (file.filename or "").lower()
raw = await file.read()
# --- Read DICOM / Image ---
if suffix.endswith(".dcm") and HAVE_DICOM:
ds = pydicom.dcmread(io.BytesIO(raw))
arr = ds.pixel_array.astype(np.float32)
arr -= arr.min(); arr /= (arr.max()-arr.min()+1e-6)
gray_u8 = (arr*255).clip(0,255).astype(np.uint8)
else:
gray_u8 = np.array(Image.open(io.BytesIO(raw)).convert("L"))
gray_u8 = to_uint8(gray_u8)
# --- Model Analysis ---
res = analyze_fullbody(gray_u8)
overlay_img, _ = findings_overlay(gray_u8, res)
if overlay_img is None:
overlay_img = edges_overlay(gray_u8)
# --- Gemini AI Report ---
try:
prompt = f"""
You are a senior radiologist. Generate a structured radiology report.
Data: {json.dumps(res, indent=2)}
Include clear prose for: Technical Details, Findings, Impression, Recommendations.
Avoid markdown, bullets, or extra headings.
"""
gemini_response = gemini_model.generate_content(prompt)
doctor_report = getattr(gemini_response, "text", "AI report unavailable.").strip()
except Exception:
doctor_report = "AI medical report could not be generated."
clean_report = re.sub(r"[#*_]+", "", doctor_report)
clean_report = re.sub(r"\n{2,}", "\n", clean_report.strip())
# --- PDF Template ---
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=A4, leftMargin=35, rightMargin=35, topMargin=35, bottomMargin=40)
styles = getSampleStyleSheet()
normal = ParagraphStyle("NormalCustom", parent=styles["Normal"], fontSize=10, leading=14)
heading = ParagraphStyle("Heading", parent=styles["Heading2"], fontSize=13, textColor=colors.HexColor("#004C99"))
center_heading = ParagraphStyle("CenterHeading", parent=styles["Heading2"], fontSize=14,
textColor=colors.white, alignment=1, spaceAfter=10)
footer = ParagraphStyle("Footer", parent=styles["Normal"], fontSize=9, textColor=colors.gray, alignment=1)
story = []
# === HEADER BAR ===
story.append(Table(
[[Paragraph("DRLOGY IMAGING CENTER", center_heading)]],
colWidths=[500],
style=[
('BACKGROUND', (0,0), (-1,-1), colors.HexColor("#004C99")),
('BOTTOMPADDING', (0,0), (-1,-1), 6),
('TOPPADDING', (0,0), (-1,-1), 6)
]
))
story.append(Paragraph("X-Ray | CT-Scan | MRI | USG", normal))
story.append(Paragraph("105-108, SMART VISION COMPLEX, HEALTHCARE ROAD, MUMBAI - 689578", normal))
story.append(Paragraph("π 0123456789 β drlogyimaging@drlogy.com", normal))
story.append(Spacer(1, 10))
story.append(Table([[ '', '' ]], colWidths=[480],
style=[('LINEBELOW',(0,0),(-1,-1),1,colors.HexColor("#004C99"))]))
story.append(Spacer(1, 8))
# === TITLE SECTION ===
title_table = Table(
[[Paragraph("AI RADIOLOGY REPORT", heading),
Paragraph(datetime.datetime.now().strftime("%d %b %Y, %I:%M %p"), normal)]],
colWidths=[340, 140],
style=[
("VALIGN", (0,0), (-1,-1), "MIDDLE"),
("ALIGN", (1,0), (1,0), "RIGHT")
]
)
story.append(title_table)
story.append(Spacer(1, 12))
# === PATIENT INFO ===
story.append(Paragraph("PATIENT INFORMATION", heading))
patient_info = [
["Name", "N/A"],
["Age", f"{res.get('meta_pred', {}).get('age', {}).get('label', 'N/A')}"],
["Sex", f"{res.get('meta_pred', {}).get('sex', {}).get('label', 'N/A')}"],
["Region", res.get("region", "Unknown").title()],
["AI Confidence", f"{res.get('region_conf', 0):.2f}"],
]
table = Table(patient_info, colWidths=[120, 360])
table.setStyle(TableStyle([
("BOX", (0,0), (-1,-1), 0.5, colors.grey),
("INNERGRID", (0,0), (-1,-1), 0.25, colors.lightgrey),
("BACKGROUND", (0,0), (0,-1), colors.HexColor("#EAF1FB")),
]))
story.append(table)
story.append(Spacer(1, 10))
# === MAIN BODY ===
story.append(Paragraph("CLINICAL REPORT SUMMARY", heading))
story.append(Spacer(1, 4))
story.append(Paragraph(clean_report.replace("\n", "
"), normal))
story.append(Spacer(1, 15))
# === IMAGES ===
story.append(Paragraph("REFERENCE IMAGES", heading))
story.append(Spacer(1, 4))
img_buf = io.BytesIO()
Image.fromarray(gray_u8).save(img_buf, format="PNG")
img_buf.seek(0)
story.append(RLImage(img_buf, width=180, height=180))
story.append(Spacer(1, 6))
overlay_buf = io.BytesIO()
Image.fromarray(overlay_img).save(overlay_buf, format="PNG")
overlay_buf.seek(0)
story.append(RLImage(overlay_buf, width=180, height=180))
story.append(Spacer(1, 20))
# === FOOTER ===
story.append(Table([[ '', '' ]], colWidths=[480],
style=[('LINEBELOW',(0,0),(-1,-1),1,colors.HexColor("#004C99"))]))
story.append(Spacer(1, 6))
story.append(Paragraph("*** End of Report ***", footer))
story.append(Spacer(1, 3))
story.append(Paragraph("Generated on: " + datetime.datetime.now().strftime("%d %b %Y, %I:%M %p"), footer))
story.append(Paragraph("This report is AI-generated and intended for research/clinical support only.", footer))
story.append(Paragraph("Β© 2025 Drlogy Imaging Center | 24x7 Services", footer))
doc.build(story)
buffer.seek(0)
return Response(
content=buffer.read(),
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=ai_xray_report.pdf"}
)
except Exception as e:
import traceback; traceback.print_exc()
return JSONResponse({"error": str(e)}, status_code=400)
# ===============================================
# # β
Runway Gen-2 β Convert X-ray to 3D Video
# # ===============================================
# import requests
# import io
# import base64
# import time
# from fastapi import FastAPI, UploadFile, File
# from fastapi.responses import JSONResponse
# from PIL import Image
# app = FastAPI()
# PIXVERSE_API_KEY = "sk-4720d9d2ae7bd362bcece9906ffc6848"
# @api.post("/xray_to_3dvideo")
# async def xray_to_3dvideo(file: UploadFile = File(...)):
# try:
# raw = await file.read()
# img = Image.open(io.BytesIO(raw)).convert("RGB")
# buf = io.BytesIO()
# img.save(buf, format="PNG")
# base64_img = base64.b64encode(buf.getvalue()).decode()
# # β
FIXED PAYLOAD & HEADER
# payload = {
# "image": base64_img, # was 'image_base64'
# "prompt": "A grayscale rotating 3D visualization of a medical X-ray scan, showing bones with realistic depth and cinematic lighting.",
# "duration": 4,
# "motion": "rotate", # was 'motion_mode'
# "quality": "540p"
# }
# headers = {
# "Authorization": f"Bearer {PIXVERSE_API_KEY}", # was 'x-api-key'
# "Content-Type": "application/json"
# }
# # 1οΈβ£ Start PixVerse job
# start_response = requests.post(
# "https://api.segmind.com/v1/pixverse-image2video",
# json=payload,
# headers=headers
# )
# start_data = start_response.json()
# print("PixVerse start_data:", start_data)
# # 2οΈβ£ Extract task ID or instant video URL
# task_id = start_data.get("task_id") or start_data.get("id")
# video_url = (
# start_data.get("video_url")
# or start_data.get("video")
# or start_data.get("output", {}).get("video_url")
# )
# if not task_id and not video_url:
# return JSONResponse({
# "error": start_data,
# "message": "β PixVerse did not return a task ID or video URL (check API key or payload)"
# }, status_code=400)
# # If instant video available
# if video_url:
# return JSONResponse({
# "ok": True,
# "video_url": video_url,
# "message": "β
3D X-ray video generated instantly via PixVerse"
# })
# # 3οΈβ£ Poll task status
# poll_url = f"https://api.segmind.com/v1/tasks/{task_id}"
# for attempt in range(30): # 2.5 minutes max
# time.sleep(5)
# status_response = requests.get(poll_url, headers=headers)
# status_data = status_response.json()
# status = status_data.get("status", "").lower()
# if status == "succeeded":
# output = status_data.get("output", {})
# video_url = (
# output.get("video_url")
# or output.get("video")
# or output.get("assets", {}).get("video")
# )
# if video_url:
# return JSONResponse({
# "ok": True,
# "video_url": video_url,
# "message": "β
3D X-ray video generated successfully via PixVerse"
# })
# else:
# return JSONResponse({
# "error": "Video URL missing in succeeded task",
# "data": status_data
# }, status_code=500)
# if status == "failed":
# return JSONResponse({
# "error": "β Video generation task failed",
# "data": status_data
# }, status_code=500)
# return JSONResponse({
# "error": "β³ Video generation timed out",
# "task_status": status,
# "task_id": task_id
# }, status_code=202)
# except Exception as e:
# print("3D video backend error:", str(e))
# return JSONResponse({"error": str(e)}, status_code=500)
# ===============================================
# β
Fully Fixed Version β 3D Local Video Generator (with Overlay)
# ===============================================
from fastapi import UploadFile, File
from fastapi.responses import JSONResponse
import io, os, torch, numpy as np, imageio, cv2
#from torchvision.models.video import r3d_18
from PIL import Image
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
HF_SPACE_URL = os.getenv("HF_SPACE_URL", "https://abbhy123ghh-x-ray-analysis-2.hf.space").rstrip("/")
@api.post("/generate_3d_local")
async def generate_3d_local(file: UploadFile = File(...)):
"""
Generate a pseudo-3D rotating video from a 2D X-ray image.
Adds AI overlay and realistic depth-based surface projection.
"""
try:
# 1οΈβ£ Read and preprocess input image
raw = await file.read()
img = Image.open(io.BytesIO(raw)).convert("L").resize((224, 224))
gray_u8 = np.array(img, dtype=np.uint8)
img_np = np.array(img, dtype=np.float32) / 255.0
# 2οΈβ£ Try AI overlay from your model
try:
res = analyze_fullbody(gray_u8)
overlay_img, _ = findings_overlay(gray_u8, res)
if overlay_img is not None:
base_img = overlay_img
else:
base_img = np.stack([gray_u8] * 3, axis=-1)
except Exception as overlay_error:
print("β οΈ Overlay generation failed:", overlay_error)
base_img = np.stack([gray_u8] * 3, axis=-1)
blended = cv2.cvtColor(base_img, cv2.COLOR_BGR2RGB)
# 3οΈβ£ Create pseudo depth volume
depth_slices = [img_np * (i / 10) for i in range(10)]
volume = np.stack(depth_slices, axis=0) # [D,H,W]
volume = torch.tensor(volume).unsqueeze(0).unsqueeze(0)
if volume.shape[1] == 1:
volume = volume.repeat(1, 3, 1, 1, 1)
print("β
Final volume shape:", volume.shape)
# # 4οΈβ£ Dummy forward pass for consistency (no inference)
# model = r3d_18(weights=None)
# model.eval()
# with torch.no_grad():
# _ = model(volume)
# 5οΈβ£ Generate 3D surface rotation frames
frames = []
H, W = gray_u8.shape
x, y = np.meshgrid(np.linspace(0, 1, W), np.linspace(0, 1, H))
depth = cv2.GaussianBlur(gray_u8.astype(np.float32), (9, 9), 0)
for angle in range(0, 360, 5):
fig = plt.figure(figsize=(3, 3))
ax = fig.add_subplot(111, projection="3d")
ax.view_init(30, angle)
ax.plot_surface(
x, y, depth / 255.0,
facecolors=blended / 255.0,
rstride=2, cstride=2,
linewidth=0, antialiased=False
)
ax.axis("off")
fig.canvas.draw()
frame = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
frame = frame.reshape(fig.canvas.get_width_height()[::-1] + (3,))
frames.append(frame)
plt.close(fig)
# 6οΈβ£ Save output video (MP4 preferred, fallback to GIF)
os.makedirs("static", exist_ok=True)
out_path = "static/xray_3d_local.mp4"
try:
import imageio_ffmpeg
writer = imageio.get_writer(
out_path, format="ffmpeg", mode="I",
fps=12, codec="libx264", quality=8
)
for frame in frames:
writer.append_data(frame)
writer.close()
video_url = f"{HF_SPACE_URL}/static/xray_3d_local.mp4"
msg = "β
3D MP4 video (with overlay) generated successfully"
except Exception as e:
print("β οΈ FFmpeg failed, saving GIF instead:", e)
gif_path = out_path.replace(".mp4", ".gif")
imageio.mimsave(gif_path, frames, duration=0.08)
video_url = f"{HF_SPACE_URL}/static/xray_3d_local.gif"
msg = "β
3D GIF (with overlay) generated successfully"
return JSONResponse({
"ok": True,
"message": msg,
"video_url": video_url
})
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({"error": str(e)}, status_code=500)
# ============================================================
# β
MONAI + PyVista 3D Full-Body Visualization (Final HF Safe)
# ============================================================
from fastapi import UploadFile, File
from fastapi.responses import JSONResponse
import io, os, numpy as np, torch
from PIL import Image
from monai.networks.nets import UNet
import torch.nn.functional as F
import pyvista as pv
from pyvista import start_xvfb
import imageio.v3 as iio
HF_SPACE_URL = os.getenv("HF_SPACE_URL", "https://abbhy123ghh-x-ray-analysis-2.hf.space").rstrip("/")
@api.post("/generate_3d_monai")
async def generate_3d_monai(file: UploadFile = File(...)):
"""
Generate a realistic 3D volumetric visualization for any X-ray region
(skull, chest, limb, etc.) using MONAI for enhancement and PyVista for
volumetric rendering. Fully CPU-compatible for Hugging Face Spaces.
"""
try:
# --- Step 1: Read and preprocess image ---
raw = await file.read()
img = Image.open(io.BytesIO(raw)).convert("L").resize((256, 256))
gray = np.array(img, dtype=np.float32) / 255.0
# --- Step 2: Build synthetic 3D volume (simulate multiple slices) ---
depth_slices = 16
volume = np.stack([gray * (1 - i / depth_slices) for i in range(depth_slices)], axis=0)
volume = np.expand_dims(volume, axis=0) # [1, D, H, W]
# --- Step 3: MONAI lightweight 3D UNet (shallow for CPU) ---
x = torch.tensor(volume, dtype=torch.float32).unsqueeze(0) # [1, 1, D, H, W]
pad_d = (16 - x.shape[2] % 16) % 16
pad_h = (16 - x.shape[3] % 16) % 16
pad_w = (16 - x.shape[4] % 16) % 16
x = F.pad(x, (0, pad_w, 0, pad_h, 0, pad_d))
model = UNet(
spatial_dims=3,
in_channels=1,
out_channels=1,
channels=(8, 16, 32),
strides=(2, 2),
)
model.eval()
with torch.no_grad():
y = torch.sigmoid(model(x))
seg = y[0, 0].cpu().numpy()
seg = (seg - seg.min()) / (seg.max() - seg.min() + 1e-8)
seg = np.clip(seg * 255, 0, 255).astype(np.uint8)
# --- Step 4: Create PyVista grid (Hugging Face compatible) ---
start_xvfb()
grid = pv.ImageData(dimensions=seg.shape)
grid.spacing = (1, 1, 1)
grid.origin = (0, 0, 0)
grid.point_data["values"] = seg.flatten(order="F")
# --- Step 5: Enhanced 3D visualization with lighting ---
plotter = pv.Plotter(off_screen=True, window_size=[512, 512])
plotter.add_volume(
grid,
cmap="bone",
opacity="sigmoid_5",
shade=True,
diffuse=1.0,
specular=0.3,
specular_power=15,
)
plotter.set_background("black")
# Add realistic light
light = pv.Light(
position=(seg.shape[2]*2, seg.shape[1]*2, seg.shape[0]),
color='white',
intensity=1.5,
)
plotter.add_light(light)
frames = []
for angle in range(0, 360, 5):
plotter.camera_position = [
(seg.shape[2]*2, 0, seg.shape[0]/2),
(seg.shape[2]/2, seg.shape[1]/2, seg.shape[0]/2),
(0, 0, 1),
]
plotter.camera.azimuth = angle
img_frame = plotter.screenshot(return_img=True)
frames.append(img_frame)
plotter.close()
# --- Step 6: Save rotating MP4 video ---
os.makedirs("static", exist_ok=True)
out_path = "static/xray_monai_3d_fullbody.mp4"
iio.imwrite(out_path, frames, fps=12, codec="libx264")
return JSONResponse({
"ok": True,
"message": "β
MONAI + PyVista 3D full-body visualization generated successfully",
"video_url": f"{HF_SPACE_URL}/{out_path}"
})
except Exception as e:
import traceback; traceback.print_exc()
return JSONResponse({"error": str(e)}, status_code=500)
# -----------------------------
# Config
# -----------------------------
CLIP_CANDIDATES = [
"microsoft/BiomedCLIP-PubMedBERT_256-vit_base_patch16_224",
"openai/clip-vit-base-patch32",
]
CLIP_TEMP = float(os.getenv("CLIP_TEMP", "1.0"))
CHEST_TEMP = float(os.getenv("CHEST_TEMP", "1.2"))
CAL_TAU = float(os.getenv("CAL_TAU", "1.0")) # <-- ADDED: site calibration temperature
TB_REPO = os.getenv("TB_REPO", "").strip()
TB_LABEL = os.getenv("TB_LABEL", "Tuberculosis")
HF_TOKEN = os.getenv("HF_TOKEN", None)
DEFAULT_THRESHOLDS: Dict[str, float] = {
# chest
"Pneumonia": 0.70,
"Pulmonary Fibrosis": 0.75,
"COPD (proxy)": 0.75,
"Lung Cancer (proxy)": 0.70,
"Tuberculosis": 0.70,
# non-chest
"Compression fracture": 0.65,
"Scoliosis (proxy)": 0.65,
"Hip fracture": 0.65,
"OA (proxy)": 0.65,
"Fracture (upper_ext)": 0.65,
"Fracture (lower_ext)": 0.65,
"Implant/Hardware": 0.65,
"Skull fracture": 0.65,
}
ABSTAIN_MARGIN = float(os.getenv("ABSTAIN_MARGIN", "0.08"))
REGION_CONF_MIN = float(os.getenv("REGION_CONF_MIN", "0.40"))
REGION_NAMES = ["skull","chest","spine","pelvis","upper_ext","lower_ext"]
REGION_PROMPTS = {
"skull": ["X-ray of skull", "Head/skull radiograph"],
"chest": ["Chest X-ray", "Thorax radiograph"],
"spine": ["X-ray of spine", "Spine radiograph"],
"pelvis": ["Pelvis or hip X-ray", "Pelvic radiograph"],
"upper_ext": ["Upper extremity X-ray (shoulder to hand)", "Arm/wrist/hand X-ray"],
"lower_ext": ["Lower extremity X-ray (hip to foot)", "Leg/ankle/foot X-ray"],
}
REGION_DISPLAY = {
"chest":"chest",
"skull":"skull",
"spine":"spine",
"pelvis":"pelvis/hip",
"upper_ext":"upper extremity",
"lower_ext":"lower extremity",
"unknown":"unknown",
}
# Tasks (non-chest)
TASKS: Dict[str, List[Dict[str, Any]]] = {
"spine": [
{"name": "Compression fracture", "zs": ("X-ray of spine with compression fracture", "X-ray of spine without fracture")},
{"name": "Scoliosis (proxy)", "zs": ("X-ray of spine with scoliosis", "X-ray of a straight spine")},
],
"pelvis": [
{"name": "Hip fracture", "zs": ("Pelvis or hip X-ray with hip fracture", "Pelvis or hip X-ray without fracture")},
{"name": "OA (proxy)", "zs": ("Pelvis or hip X-ray with osteoarthritis", "Pelvis or hip X-ray with normal joint space")},
],
"upper_ext": [
{"name": "Fracture (upper_ext)", "zs": ("Upper extremity X-ray with fracture", "Upper extremity X-ray without fracture")},
{"name": "Implant/Hardware", "zs": ("X-ray with orthopedic implant or hardware", "X-ray without orthopedic implant")},
],
"lower_ext": [
{"name": "Fracture (lower_ext)", "zs": ("Lower extremity X-ray with fracture", "Lower extremity X-ray without fracture")},
{"name": "Implant/Hardware", "zs": ("X-ray with orthopedic implant or hardware", "X-ray without orthopedic implant")},
],
"skull": [
{"name": "Skull fracture", "zs": ("Skull X-ray with fracture", "Skull X-ray without fracture")},
],
}
# Sub-label descriptions for every screening item
CHEST_DESCRIPTIONS = {
"Pneumonia": "Patchy/lobar opacities suggesting infection/inflammation; confirm clinically Β± labs.",
"Pulmonary Fibrosis": "Chronic interstitial changes; consider HRCT for characterization.",
"COPD (proxy)": "Proxy via emphysema/hyperinflation; COPD diagnosis needs spirometry/history.",
"Lung Cancer (proxy)": "Proxy via mass/nodule; suspicious lesions need CT Β± biopsy.",
"Tuberculosis": "CXR overlaps with other diseases; definitive Dx needs microbiology/NAAT.",
"Asthma": "Typically not detectable on CXR; diagnosis is clinical with spirometry.",
}
TASK_DESCRIPTIONS = {
"Compression fracture": "Height loss of vertebral body; triage to CT/MRI if clinical concern.",
"Scoliosis (proxy)": "Curve or rotation; clinical significance depends on Cobb angle (not provided).",
"Hip fracture": "Disruption of trabeculae/cortical lines; confirm with dedicated views/CT.",
"OA (proxy)": "Joint space narrowing, osteophytes, subchondral changes; clinical correlation.",
"Fracture (upper_ext)": "Cortical break or lucent line; immobilize and obtain dedicated views.",
"Fracture (lower_ext)": "Cortical break or lucent line; weight-bearing precautions and follow-up.",
"Implant/Hardware": "Presence of orthopedic material; assess alignment/loosening if applicable.",
"Skull fracture": "Linear or depressed lucency; correlate with neuro status/CT.",
}
CHEST_TARGETS = {
"Pneumonia": ["Pneumonia"],
"Pulmonary Fibrosis": ["Fibrosis"],
"COPD (proxy)": ["Emphysema"],
"Lung Cancer (proxy)": ["Mass", "Nodule"],
"Tuberculosis": ["Tuberculosis", "TB"],
"Asthma": [],
}
# -----------------------------
# Utilities
# -----------------------------
def to_uint8(img):
if isinstance(img, Image.Image):
img = np.array(img)
if img.dtype == np.uint8:
return img
img = img.astype(np.float32)
img -= img.min(); rng = img.max() - img.min()
if rng > 1e-6:
img = img / (img.max() + 1e-6)
return (img * 255.0).clip(0, 255).astype(np.uint8)
def read_any_xray(path_or_buffer) -> np.ndarray:
"""
Reads a DICOM (.dcm) or image (PNG/JPG) and returns an 8-bit grayscale (uint8) array.
DICOM handling:
- Applies RescaleSlope/Intercept if present.
- Applies Window Center/Width (first value if multiple).
- Correctly inverts MONOCHROME1 (after VOI windowing).
"""
name = getattr(path_or_buffer, "name", str(path_or_buffer)).lower()
# Non-DICOM: use PIL and convert to 8-bit grayscale
if not name.endswith(".dcm"):
return np.array(Image.open(path_or_buffer).convert("L"))
if not HAVE_DICOM:
raise RuntimeError("Install pydicom to read DICOM files.")
ds = pydicom.dcmread(path_or_buffer)
# Raw pixel array -> float
arr = ds.pixel_array.astype(np.float32)
# 1) Modality LUT (Rescale Slope/Intercept)
slope = float(getattr(ds, "RescaleSlope", 1.0))
intercept = float(getattr(ds, "RescaleIntercept", 0.0))
if slope != 1.0 or intercept != 0.0:
arr = arr * slope + intercept
# Helper: coerce WindowCenter/Width to float (first value if multi)
def _first_float(x):
if x is None:
return None
try:
# pydicom may give MultiValue/Sequence/DSfloat
return float(np.atleast_1d(x)[0])
except Exception:
try:
return float(x)
except Exception:
return None
# 2) VOI LUT: Windowing (if available)
wc = _first_float(getattr(ds, "WindowCenter", None))
ww = _first_float(getattr(ds, "WindowWidth", None))
if ww is not None and ww > 0:
lo = wc - ww / 2.0
hi = wc + ww / 2.0
arr = np.clip(arr, lo, hi)
arr = (arr - lo) / (hi - lo + 1e-6) # -> 0..1
else:
# Fallback min-max
mn = float(np.min(arr))
mx = float(np.max(arr))
arr = (arr - mn) / (mx - mn + 1e-6)
# 3) Photometric interpretation (invert MONOCHROME1)
phot = str(getattr(ds, "PhotometricInterpretation", "")).upper()
if phot == "MONOCHROME1":
arr = 1.0 - arr
# Return uint8 image
return (arr * 255.0).clip(0, 255).astype(np.uint8)
def edges_overlay(gray_u8: np.ndarray) -> np.ndarray:
try:
edges = cv2.Canny(gray_u8, 50, 150)
overlay = np.stack([gray_u8]*3, axis=-1)
overlay[edges>0] = [255, 80, 80]
return overlay
except Exception:
return np.stack([gray_u8]*3, axis=-1)
def edges_overlay(gray_u8: np.ndarray) -> np.ndarray:
try:
edges = cv2.Canny(gray_u8, 50, 150)
overlay = np.stack([gray_u8]*3, axis=-1)
overlay[edges>0] = [255, 80, 80]
return overlay
except Exception:
return np.stack([gray_u8]*3, axis=-1)
# β
Add your pseudo-3D function here
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D # noqa: F401
def pseudo3d_surface(gray_u8: np.ndarray) -> np.ndarray:
"""Generate a pseudo-3D surface plot from a 2D X-ray."""
try:
H, W = gray_u8.shape
X, Y = np.meshgrid(np.arange(W), np.arange(H))
Z = gray_u8.astype(np.float32)
fig = plt.figure(figsize=(6, 6))
ax = fig.add_subplot(111, projection='3d')
ax.plot_surface(X, Y, Z, cmap="bone", linewidth=0, antialiased=True)
ax.set_axis_off()
fig.tight_layout(pad=0)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight", pad_inches=0)
plt.close(fig)
buf.seek(0)
img = Image.open(buf).convert("RGB")
return np.array(img)
except Exception as e:
print("Pseudo-3D generation failed:", e)
return np.stack([gray_u8]*3, axis=-1)
def xray_to_depth_3d(gray_u8: np.ndarray) -> np.ndarray:
"""Convert 2D X-ray into pseudo-3D surface using Intel/dpt-hybrid-midas."""
if not HAVE_DEPTH:
return np.stack([gray_u8]*3, axis=-1)
try:
pil = Image.fromarray(gray_u8).convert("RGB")
inputs = DEPTH_EXTRACTOR(images=pil, return_tensors="pt").to(DEVICE)
with torch.no_grad():
outputs = DEPTH_MODEL(**inputs)
predicted_depth = outputs.predicted_depth.squeeze().cpu().numpy()
# Normalize depth
depth_min, depth_max = predicted_depth.min(), predicted_depth.max()
depth_norm = (predicted_depth - depth_min) / (depth_max - depth_min + 1e-6)
# Render pseudo-3D
H, W = depth_norm.shape
X, Y = np.meshgrid(np.arange(W), np.arange(H))
Z = depth_norm * 255
fig = plt.figure(figsize=(6, 6))
ax = fig.add_subplot(111, projection="3d")
ax.plot_surface(X, Y, Z, cmap="bone", linewidth=0, antialiased=True)
ax.set_axis_off()
fig.tight_layout(pad=0)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight", pad_inches=0)
plt.close(fig)
buf.seek(0)
img = Image.open(buf).convert("RGB")
return np.array(img)
except Exception as e:
print("3D depth rendering failed:", e)
return np.stack([gray_u8]*3, axis=-1)
def risk_level(prob: Optional[float]) -> str:
if prob is None: return "N/A"
if prob >= 0.70: return "High"
if prob >= 0.50: return "Moderate"
return "Low"
def risk_tag(prob: Optional[float]) -> str:
level = risk_level(prob)
color = {"High":"#E03131","Moderate":"#F59F00","Low":"#2F9E44","N/A":"#868E96"}[level]
pct = "N/A" if prob is None else f"{prob:.0%}"
return f"{level} {pct}"
def percent_bar(prob: Optional[float]) -> str:
if prob is None: return ""
width = int(round(prob*100))
return (
"
No report generated.
" text = re.sub(r'\n{2,}', '\n\n', text.strip()) # normalize spacing html_text = markdown.markdown(text) # convert Markdown to HTML html_text = html_text.replace('\n', '