# -*- coding: utf-8 -*- """ Remote Indexer — Async (FAISS) for code repos. API: - POST /index -> {job_id} - GET /status/{job_id} -> JobState - POST /search -> {"results":[{path,text,score,...}, ...]} (file-level aggregation by default) - GET /artifacts/{project_id}/dataset -> tgz - GET /artifacts/{project_id}/faiss -> tgz Key improvements (v3.1.0): - Code-aware chunking by lines + code boundaries (instead of character tokenization) - Semantic chunk header (FILE/LANG/KIND/LINES) to improve embedding signal - store_text respected (embed always, store configurable) - Search groups by file (unique paths) with aggregation; still backward compatible with {path,text,score} """ from __future__ import annotations import os import io import re import json import time import tarfile import hashlib import logging from typing import Dict, Any, List, Tuple, Optional from concurrent.futures import ThreadPoolExecutor import numpy as np import faiss import gradio as gr from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel # ============================================================================= # LOGGING # ============================================================================= LOG = logging.getLogger("remote_indexer") DBG = logging.getLogger("remote_indexer.dbg") if not LOG.handlers: _h = logging.StreamHandler() _h.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) LOG.addHandler(_h) LOG.setLevel(os.getenv("LOG_LEVEL", "DEBUG").upper()) if not DBG.handlers: _h2 = logging.StreamHandler() _h2.setFormatter(logging.Formatter("[DEBUG] %(asctime)s - %(message)s")) DBG.addHandler(_h2) DBG.setLevel(os.getenv("DBG_LEVEL", "DEBUG").upper()) # ============================================================================= # CONFIG / ENV # ============================================================================= PORT = int(os.getenv("PORT", "7860")) DATA_ROOT = os.getenv("DATA_ROOT", "/data").rstrip("/") or "/data" os.makedirs(DATA_ROOT, exist_ok=True) EMB_PROVIDER = os.getenv("EMB_PROVIDER", "dummy").strip().lower() EMB_MODEL = os.getenv("EMB_MODEL", "sentence-transformers/all-mpnet-base-v2").strip() EMB_BATCH = int(os.getenv("EMB_BATCH", "32")) EMB_DIM = int(os.getenv("EMB_DIM", "128")) # dummy only MAX_WORKERS = int(os.getenv("MAX_WORKERS", "1")) # ============================================================================= # CACHE DIRECTORIES (avoid PermissionError on HF) # ============================================================================= def _setup_cache_dirs() -> Dict[str, str]: os.environ.setdefault("HOME", "/home/user") cache_root = os.getenv("CACHE_ROOT", "/tmp/.cache").rstrip("/") paths = { "root": cache_root, "hf_home": f"{cache_root}/huggingface", "hf_hub": f"{cache_root}/huggingface/hub", "hf_tf": f"{cache_root}/huggingface/transformers", "torch": f"{cache_root}/torch", "st": f"{cache_root}/sentence-transformers", "mpl": f"{cache_root}/matplotlib", } for p in paths.values(): try: os.makedirs(p, exist_ok=True) except Exception as e: LOG.warning("Impossible de créer %s : %s", p, e) os.environ["HF_HOME"] = paths["hf_home"] os.environ["HF_HUB_CACHE"] = paths["hf_hub"] os.environ["TRANSFORMERS_CACHE"] = paths["hf_tf"] os.environ["TORCH_HOME"] = paths["torch"] os.environ["SENTENCE_TRANSFORMERS_HOME"] = paths["st"] os.environ["MPLCONFIGDIR"] = paths["mpl"] os.environ.setdefault("HF_HUB_DISABLE_SYMLINKS_WARNING", "1") os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") LOG.info("Caches configurés: %s", json.dumps(paths, indent=2)) return paths CACHE_PATHS = _setup_cache_dirs() _ST_MODEL = None _HF_TOKENIZER = None _HF_MODEL = None # ============================================================================= # JOB STATE # ============================================================================= class JobState(BaseModel): job_id: str project_id: str stage: str = "pending" # pending -> queued -> chunking -> embedding -> indexing -> done/failed total_files: int = 0 total_chunks: int = 0 embedded: int = 0 indexed: int = 0 errors: List[str] = [] messages: List[str] = [] started_at: float = time.time() finished_at: Optional[float] = None JOBS: Dict[str, JobState] = {} def _now() -> str: return time.strftime("%H:%M:%S") def _proj_dirs(project_id: str) -> Tuple[str, str, str]: base = os.path.join(DATA_ROOT, project_id) ds_dir = os.path.join(base, "dataset") fx_dir = os.path.join(base, "faiss") os.makedirs(ds_dir, exist_ok=True) os.makedirs(fx_dir, exist_ok=True) return base, ds_dir, fx_dir def _add_msg(st: JobState, msg: str): st.messages.append(f"[{_now()}] {msg}") LOG.info("[%s] %s", st.job_id, msg) DBG.debug("[%s] %s", st.job_id, msg) def _set_stage(st: JobState, stage: str): st.stage = stage _add_msg(st, f"stage={stage}") # ============================================================================= # CHUNKING (code-aware) # ============================================================================= _RE_CODE_BOUNDARY = re.compile( r"""^\s*(export\s+)?(default\s+)?(async\s+)?(function|class|const|let|var)\b|^\s*interface\b|^\s*type\b""", re.IGNORECASE, ) def _infer_kind(path: str) -> str: p = (path or "").lower().replace("\\", "/") if "/hooks/" in p or p.endswith(("hook.ts", "hook.tsx")) or os.path.basename(p).startswith("use"): return "hook" if "/components/" in p: return "component" if "/pages/" in p or "/routes/" in p: return "page" if "/api/" in p: return "api" if p.endswith((".test.ts", ".test.tsx", ".spec.ts", ".spec.tsx", ".test.js", ".spec.js")): return "test" return "file" def _make_chunk_header(path: str, kind: str, start_line: int, end_line: int) -> str: ext = os.path.splitext(path)[1].lstrip(".").lower() return ( f"FILE: {path}\n" f"LANG: {ext or 'text'}\n" f"KIND: {kind}\n" f"LINES: {start_line}-{end_line}\n" ) def _chunk_text_codeaware(text: str, size: int = 1200, overlap: int = 150) -> List[Dict[str, Any]]: """ Chunking par lignes + frontières de code. size/overlap sont en caractères approximatifs. Retour: [{start_line,end_line,text}, ...] """ text = (text or "").replace("\r\n", "\n").replace("\r", "\n") lines = text.split("\n") out: List[Dict[str, Any]] = [] buf: List[str] = [] buf_len = 0 start_line = 1 def flush(end_line: int): nonlocal buf, buf_len, start_line chunk = "\n".join(buf).strip() if chunk: out.append({"start_line": start_line, "end_line": end_line, "text": chunk}) buf = [] buf_len = 0 start_line = end_line + 1 for i, line in enumerate(lines, start=1): if buf and buf_len >= size and _RE_CODE_BOUNDARY.match(line): flush(i - 1) buf.append(line) buf_len += len(line) + 1 if buf_len >= size: flush(i) if buf: flush(len(lines)) # overlap (approx): prepend tail of previous chunk if overlap > 0 and len(out) >= 2: new_out = [out[0]] for prev, cur in zip(out, out[1:]): tail = prev["text"][-overlap:] cur2 = dict(cur) cur2["text"] = (tail + "\n" + cur["text"]).strip() cur2["start_line"] = max(1, cur["start_line"] - 1) new_out.append(cur2) out = new_out return out # ============================================================================= # EMBEDDINGS # ============================================================================= def _emb_dummy(texts: List[str], dim: int = EMB_DIM) -> np.ndarray: vecs = np.zeros((len(texts), dim), dtype="float32") for i, t in enumerate(texts): h = hashlib.sha1((t or "").encode("utf-8")).digest() rng = np.random.default_rng(int.from_bytes(h[:8], "little", signed=False)) v = rng.standard_normal(dim).astype("float32") vecs[i] = v / (np.linalg.norm(v) + 1e-9) return vecs def _get_st_model(): global _ST_MODEL if _ST_MODEL is None: from sentence_transformers import SentenceTransformer _ST_MODEL = SentenceTransformer(EMB_MODEL, cache_folder=CACHE_PATHS["st"]) LOG.info("[st] modèle chargé: %s (cache=%s)", EMB_MODEL, CACHE_PATHS["st"]) return _ST_MODEL def _emb_st(texts: List[str], batch_size: int) -> np.ndarray: model = _get_st_model() vecs = model.encode( texts, batch_size=max(1, int(batch_size)), convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=False, ).astype("float32") return vecs def _get_hf_model(): global _HF_TOKENIZER, _HF_MODEL if _HF_MODEL is None or _HF_TOKENIZER is None: from transformers import AutoTokenizer, AutoModel _HF_TOKENIZER = AutoTokenizer.from_pretrained(EMB_MODEL, cache_dir=CACHE_PATHS["hf_tf"]) _HF_MODEL = AutoModel.from_pretrained(EMB_MODEL, cache_dir=CACHE_PATHS["hf_tf"]) _HF_MODEL.eval() LOG.info("[hf] modèle chargé: %s (cache=%s)", EMB_MODEL, CACHE_PATHS["hf_tf"]) return _HF_TOKENIZER, _HF_MODEL def _mean_pool(last_hidden_state: "np.ndarray", attention_mask: "np.ndarray") -> "np.ndarray": mask = attention_mask[..., None] s = (last_hidden_state * mask).sum(axis=1) d = mask.sum(axis=1) + 1e-9 return s / d def _l2_normalize(x: np.ndarray) -> np.ndarray: n = np.linalg.norm(x, axis=1, keepdims=True) + 1e-12 return x / n def _emb_hf(texts: List[str], batch_size: int) -> np.ndarray: tok, model = _get_hf_model() # Torch is required only here import torch outs: List[np.ndarray] = [] bs = max(1, int(batch_size)) with torch.no_grad(): for i in range(0, len(texts), bs): batch = texts[i:i+bs] enc = tok(batch, padding=True, truncation=True, return_tensors="pt") out = model(**enc) last = out.last_hidden_state.detach().cpu().numpy() mask = enc["attention_mask"].detach().cpu().numpy() pooled = _mean_pool(last, mask) outs.append(pooled.astype("float32")) xb = np.vstack(outs) if outs else np.zeros((0, 1), dtype="float32") xb = _l2_normalize(xb) return xb.astype("float32") # ============================================================================= # DATASET + FAISS IO # ============================================================================= def _save_dataset(ds_dir: str, rows: List[Dict[str, Any]]) -> None: os.makedirs(ds_dir, exist_ok=True) data_path = os.path.join(ds_dir, "data.jsonl") with open(data_path, "w", encoding="utf-8") as f: for r in rows: f.write(json.dumps(r, ensure_ascii=False) + "\n") def _load_dataset(ds_dir: str) -> List[Dict[str, Any]]: data_path = os.path.join(ds_dir, "data.jsonl") if not os.path.isfile(data_path): return [] out: List[Dict[str, Any]] = [] with open(data_path, "r", encoding="utf-8") as f: for line in f: try: out.append(json.loads(line)) except Exception: continue return out def _save_faiss(fx_dir: str, xb: np.ndarray, meta: Dict[str, Any]) -> None: os.makedirs(fx_dir, exist_ok=True) idx_path = os.path.join(fx_dir, "emb.faiss") index = faiss.IndexFlatIP(xb.shape[1]) # cosine ~ inner product if normalized index.add(xb) faiss.write_index(index, idx_path) with open(os.path.join(fx_dir, "meta.json"), "w", encoding="utf-8") as f: json.dump(meta, f, ensure_ascii=False, indent=2) def _load_faiss(fx_dir: str) -> faiss.Index: idx_path = os.path.join(fx_dir, "emb.faiss") if not os.path.isfile(idx_path): raise FileNotFoundError(f"FAISS index introuvable: {idx_path}") return faiss.read_index(idx_path) def _tar_dir_to_bytes(dir_path: str) -> bytes: bio = io.BytesIO() with tarfile.open(fileobj=bio, mode="w:gz") as tar: tar.add(dir_path, arcname=os.path.basename(dir_path)) bio.seek(0) return bio.read() # ============================================================================= # WORKER POOL (async) # ============================================================================= EXECUTOR = ThreadPoolExecutor(max_workers=max(1, MAX_WORKERS)) LOG.info("ThreadPoolExecutor initialisé : max_workers=%s", MAX_WORKERS) def _do_index_job( st: JobState, files: List[Dict[str, str]], chunk_size: int, overlap: int, batch_size: int, store_text: bool, ) -> None: """ Heavy task running in worker thread. Updates st during pipeline. """ try: _, ds_dir, fx_dir = _proj_dirs(st.project_id) # 1) Chunking _set_stage(st, "chunking") rows: List[Dict[str, Any]] = [] embed_texts: List[str] = [] st.total_files = len(files) for it in files: path = (it.get("path") or "unknown").strip() txt = it.get("text") or "" kind = _infer_kind(path) chunks = _chunk_text_codeaware(txt, size=int(chunk_size), overlap=int(overlap)) _add_msg(st, f"{path}: len(text)={len(txt)} chunks={len(chunks)}") for ci, ck in enumerate(chunks): header = _make_chunk_header(path, kind, int(ck["start_line"]), int(ck["end_line"])) merged = header + "\n" + (ck["text"] or "") # Always embed merged (full signal) embed_texts.append(merged) # Store text depending on store_text (but keep at least header) stored_text = merged if store_text else header rows.append({ "path": path, "text": stored_text, "chunk_id": int(ci), "kind": kind, "start_line": int(ck["start_line"]), "end_line": int(ck["end_line"]), }) st.total_chunks = len(rows) _add_msg(st, f"Total chunks = {st.total_chunks}") # 2) Embedding _set_stage(st, "embedding") if EMB_PROVIDER == "dummy": xb = _emb_dummy(embed_texts, dim=EMB_DIM) dim = xb.shape[1] elif EMB_PROVIDER == "st": xb = _emb_st(embed_texts, batch_size=batch_size or EMB_BATCH) dim = xb.shape[1] else: xb = _emb_hf(embed_texts, batch_size=batch_size or EMB_BATCH) dim = xb.shape[1] st.embedded = int(xb.shape[0]) _add_msg(st, f"Embeddings {st.embedded}/{st.total_chunks}") _add_msg(st, f"Embeddings dim={dim}") # 3) Save dataset (jsonl) _save_dataset(ds_dir, rows) _add_msg(st, f"Dataset sauvegardé dans {ds_dir} (store_text={store_text})") # 4) FAISS _set_stage(st, "indexing") faiss_meta = { "dim": int(dim), "count": int(xb.shape[0]), "provider": EMB_PROVIDER, "model": EMB_MODEL if EMB_PROVIDER != "dummy" else None, "chunking": "codeaware_lines_v1", "store_text": bool(store_text), } _save_faiss(fx_dir, xb, meta=faiss_meta) st.indexed = int(xb.shape[0]) _add_msg(st, f"FAISS écrit sur {os.path.join(fx_dir, 'emb.faiss')}") _add_msg(st, f"OK — dataset+index prêts (projet={st.project_id})") _set_stage(st, "done") st.finished_at = time.time() except Exception as e: LOG.exception("Job %s failed", st.job_id) st.errors.append(str(e)) _add_msg(st, f"❌ Exception: {e}") st.stage = "failed" st.finished_at = time.time() def _submit_job( project_id: str, files: List[Dict[str, str]], chunk_size: int, overlap: int, batch_size: int, store_text: bool ) -> str: job_id = hashlib.sha1(f"{project_id}{time.time()}".encode()).hexdigest()[:12] st = JobState(job_id=job_id, project_id=project_id, stage="pending", messages=[]) JOBS[job_id] = st _add_msg(st, f"Job {job_id} créé pour project {project_id}") _add_msg(st, f"Index start project={project_id} files={len(files)} chunk_size={chunk_size} overlap={overlap} batch_size={batch_size} store_text={store_text} provider={EMB_PROVIDER} model={EMB_MODEL if EMB_PROVIDER!='dummy' else '-'}") EXECUTOR.submit(_do_index_job, st, files, chunk_size, overlap, batch_size, store_text) _set_stage(st, "queued") return job_id # ============================================================================= # FASTAPI # ============================================================================= fastapi_app = FastAPI(title="remote-indexer-async", version="3.1.0") fastapi_app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class FileItem(BaseModel): path: str text: str class IndexRequest(BaseModel): project_id: str files: List[FileItem] chunk_size: int = 512 overlap: int = 50 batch_size: int = 32 store_text: bool = True class SearchRequest(BaseModel): project_id: str query: str k: int = 10 # new: fetch more chunks then aggregate by file group_by_file: bool = True fetch_k: int = 60 # number of chunks retrieved before aggregation @fastapi_app.get("/health") def health(): info = { "ok": True, "service": "remote-indexer-async", "provider": EMB_PROVIDER, "model": EMB_MODEL if EMB_PROVIDER != "dummy" else None, "cache_root": os.getenv("CACHE_ROOT", "/tmp/.cache"), "workers": MAX_WORKERS, "data_root": DATA_ROOT, "version": "3.1.0", } return info @fastapi_app.get("/") def root_redirect(): return {"ok": True, "service": "remote-indexer-async", "ui": "/ui"} @fastapi_app.post("/index") def index(req: IndexRequest): """ Async: returns immediately a job_id. Worker thread does the heavy job. """ try: files = [fi.model_dump() for fi in req.files] job_id = _submit_job( project_id=req.project_id, files=files, chunk_size=int(req.chunk_size), overlap=int(req.overlap), batch_size=int(req.batch_size), store_text=bool(req.store_text), ) return {"job_id": job_id} except Exception as e: LOG.exception("index failed (submit)") raise HTTPException(status_code=500, detail=str(e)) @fastapi_app.get("/status/{job_id}") def status(job_id: str): st = JOBS.get(job_id) if not st: raise HTTPException(status_code=404, detail="job inconnu") return JSONResponse(st.model_dump()) @fastapi_app.post("/search") def search(req: SearchRequest): _, ds_dir, fx_dir = _proj_dirs(req.project_id) idx_path = os.path.join(fx_dir, "emb.faiss") ds_path = os.path.join(ds_dir, "data.jsonl") if not (os.path.isfile(idx_path) and os.path.isfile(ds_path)): raise HTTPException(status_code=409, detail="Index non prêt (reviens plus tard)") rows = _load_dataset(ds_dir) if not rows: raise HTTPException(status_code=404, detail="dataset introuvable") # Query embedding with the SAME provider qtxt = (req.query or "").strip() if not qtxt: return {"results": []} bs = max(1, int(EMB_BATCH)) if EMB_PROVIDER == "dummy": q = _emb_dummy([qtxt], dim=EMB_DIM)[0:1, :] elif EMB_PROVIDER == "st": q = _emb_st([qtxt], batch_size=1)[0:1, :] else: q = _emb_hf([qtxt], batch_size=1)[0:1, :] index = _load_faiss(fx_dir) if index.d != q.shape[1]: raise HTTPException(status_code=500, detail=f"dim incompatibles: index.d={index.d} vs query={q.shape[1]}") k = int(max(1, req.k)) fetch_k = int(max(k, req.fetch_k or k)) scores, ids = index.search(q, fetch_k) ids = ids[0].tolist() scores = scores[0].tolist() raw_hits: List[Dict[str, Any]] = [] for idx, sc in zip(ids, scores): if idx < 0 or idx >= len(rows): continue r = rows[idx] raw_hits.append({ "path": r.get("path") or "", "text": r.get("text") or "", "score": float(sc), "chunk_id": r.get("chunk_id"), "kind": r.get("kind"), "start_line": r.get("start_line"), "end_line": r.get("end_line"), }) # Legacy mode: return chunk-level as-is if not bool(req.group_by_file): return {"results": raw_hits[:k]} # File-level aggregation (unique path) from collections import defaultdict by_path: Dict[str, List[Dict[str, Any]]] = defaultdict(list) for h in raw_hits: p = h.get("path") or "" if p: by_path[p].append(h) file_results: List[Dict[str, Any]] = [] for path, hits in by_path.items(): hits.sort(key=lambda x: x["score"], reverse=True) best = hits[0] top = hits[:3] avg_top = sum(h["score"] for h in top) / max(1, len(top)) file_score = float(best["score"] + 0.15 * avg_top) # Backward compat: keep {path,text,score} at top-level (text = best chunk stored) file_results.append({ "path": path, "text": best.get("text") or "", "score": file_score, # Extra diagnostics (optional for client) "chunk_id": best.get("chunk_id"), "kind": best.get("kind"), "start_line": best.get("start_line"), "end_line": best.get("end_line"), "hits": int(len(hits)), "top_chunks": [ { "chunk_id": h.get("chunk_id"), "score": float(h.get("score") or 0.0), "start_line": h.get("start_line"), "end_line": h.get("end_line"), } for h in top ], }) file_results.sort(key=lambda x: x["score"], reverse=True) return {"results": file_results[:k]} # ----------- ARTIFACTS EXPORT ----------- @fastapi_app.get("/artifacts/{project_id}/dataset") def download_dataset(project_id: str): _, ds_dir, _ = _proj_dirs(project_id) if not os.path.isdir(ds_dir): raise HTTPException(status_code=404, detail="Dataset introuvable") buf = _tar_dir_to_bytes(ds_dir) headers = {"Content-Disposition": f'attachment; filename="{project_id}_dataset.tgz"'} return StreamingResponse(io.BytesIO(buf), media_type="application/gzip", headers=headers) @fastapi_app.get("/artifacts/{project_id}/faiss") def download_faiss(project_id: str): _, _, fx_dir = _proj_dirs(project_id) if not os.path.isdir(fx_dir): raise HTTPException(status_code=404, detail="FAISS introuvable") buf = _tar_dir_to_bytes(fx_dir) headers = {"Content-Disposition": f'attachment; filename="{project_id}_faiss.tgz"'} return StreamingResponse(io.BytesIO(buf), media_type="application/gzip", headers=headers) # ============================================================================= # GRADIO UI (optional) # ============================================================================= def _ui_index(project_id: str, sample_text: str): files = [{"path": "sample.tsx", "text": sample_text}] try: req = IndexRequest(project_id=project_id, files=[FileItem(**f) for f in files]) res = index(req) return f"Job lancé: {res['job_id']}" except Exception as e: return f"Erreur index: {e}" def _ui_search(project_id: str, query: str, k: int): try: res = search(SearchRequest(project_id=project_id, query=query, k=int(k))) return json.dumps(res, ensure_ascii=False, indent=2) except Exception as e: return f"Erreur search: {e}" with gr.Blocks(title="Remote Indexer (Async FAISS)", analytics_enabled=False) as ui: gr.Markdown("## Remote Indexer — **Async** (API: `/index`, `/status/{job}`, `/search`, `/artifacts/.`).") gr.Markdown( f"**Provider**: `{EMB_PROVIDER}` — **Model**: `{EMB_MODEL if EMB_PROVIDER!='dummy' else '-'}` — " f"**Cache**: `{os.getenv('CACHE_ROOT', '/tmp/.cache')}` — **Workers**: `{MAX_WORKERS}`" ) with gr.Tab("Index"): pid = gr.Textbox(label="Project ID", value="DEEPWEB") sample = gr.Textbox( label="Texte d’exemple", value="export const AlbumCard = () => { return
Albums
}", lines=4 ) btn = gr.Button("Lancer index (sample)") out = gr.Textbox(label="Résultat") btn.click(_ui_index, inputs=[pid, sample], outputs=[out]) with gr.Tab("Search"): pid2 = gr.Textbox(label="Project ID", value="DEEPWEB") q = gr.Textbox(label="Query", value="améliorer la page albums") k = gr.Slider(1, 20, value=10, step=1, label="k") btn2 = gr.Button("Rechercher") out2 = gr.Code(label="Résultats") btn2.click(_ui_search, inputs=[pid2, q, k], outputs=[out2]) fastapi_app = gr.mount_gradio_app(fastapi_app, ui, path="/ui") # ============================================================================= # MAIN # ============================================================================= if __name__ == "__main__": import uvicorn LOG.info("Démarrage Uvicorn sur 0.0.0.0:%s (UI_PATH=/ui) — async index", PORT) uvicorn.run(fastapi_app, host="0.0.0.0", port=PORT)