mi primera base de datos vetorial :)
Vivimos tiempos interesantes en cuanto al estado de la técnica en el terreno de las tecnologías de la información. Con esa manía capitalista de convertir todo en mercancía, ciertos avances técnicos se engalanan en entidades abstractas pseudomágicas a la vez que se ofusca su verdadero funcionamiento interior. Y la inteligencia artificial no deja de sufrir esta tergiversación de la realidad.
El caso es que la única manera que encuentro de desmitificar y bajar tierra las ideas preconcebidas que podamos tener sobre una materia, y especialmente sobre su aplicación práctica, es pasar a la acción, que en este caso consiste en hacer correr algo de código en mi máquina local. Así que tras una corta búsqueda sobre bases de datos vectoriales, un asombro infinito ante las posibilidades que Euclides aró en la mente humana y sus aplicaciones en la arquitectura de la información, ordené codificar en python un script que se dedica a:
1.- Buscar archivos pdf (y de texto) en una determinada carpeta, extrayendo el texto.
2.- Hacer chunks (o cachos) de texto con solape de 50 tokenes.
3.- Indexar vectores con FAISS (Facebook AI Similarity Search), que es una biblioteca de código abierto creada por Meta (antes Facebook) para buscar y agrupar vectores de alta dimensión rápidamente.
4.- Guardar los metadatos y el mapa de vectores en una base de datos, dando persistencia al trabajo.
5.- Montar un servidor local para accionar el indexado de datos y realizar búsquedas (interfaz de usuario).
El archivo indexer.py consta del siguiente código
# indexer.py
"""
Minimal local vector indexer + tiny Flask UI (modificado).
Minimal local vector indexer + tiny Flask UI.
- Walks a folder, extracts text (plain + PDF),
- Chunks text with overlap (50 tokens default), embeds with SentenceTransformers (local model),
- Indexes vectors with FAISS / FAISS HNSW (or fallback brute-force),
- Persists metadata and vector-id mapping in SQLite, PRAGMA WAL for better concurrency.
- Provides a simple web UI for indexing and search.
Usage:
python indexer.py --data_dir "C:\path\to\folder" --port 5000
"""
import os
import sqlite3
import argparse
import threading
import time
from pathlib import Path
from datetime import datetime
from flask import Flask, request, render_template_string
import numpy as np
# Try import faiss, otherwise set fallback
try:
import faiss
HAVE_FAISS = True
except Exception:
faiss = None
HAVE_FAISS = False
from sentence_transformers import SentenceTransformer
from pdfminer.high_level import extract_text as pdf_extract
import magic
import tiktoken
# ---------- Config defaults ----------
MODEL_NAME = "all-MiniLM-L6-v2" # compact local model (384d)
EMBED_DIM = 384
CHUNK_TOKENS = 256
CHUNK_OVERLAP = 50 # tokens of overlap between chunks
BATCH_SIZE = 64
# DB / index files
DEFAULT_DB = "vector_store_pdf.db"
DEFAULT_INDEX = "faiss.index_pdf" # used only if faiss present
# Simple HTML (file links and longer snippet)
HTML = """<!doctype html>
<title>Local Vector Indexer</title>
<h2>Folder: {{data_dir}}</h2>
<form action="/index" method="post"><button type="submit">Run indexing</button></form>
<form action="/search" method="get">
Query: <input name="q" size="60"/>
K: <input name="k" value="50" size="2"/>
<button type="submit">Search</button>
</form>
{% if msg %}<p><b>{{msg}}</b></p>{% endif %}
{% if results %}
<h3>Results</h3>
<ul>
{% for r in results %}
<li><b>score:</b>{{"%.4f"|format(r.score)}} — <b>file:</b><a href="file://{{r.path}}" target="_blank">{{r.path}}</a> — {{r.snippet}}</li>
{% endfor %}
</ul>
{% endif %}
"""
# ---------- tokenization ----------
ENC = tiktoken.get_encoding("cl100k_base")
def num_tokens(text: str) -> int:
return len(ENC.encode(text))
def chunk_text(text: str, max_tokens: int = CHUNK_TOKENS, overlap: int = CHUNK_OVERLAP):
"""
Chunk text into pieces with approximate token counts, using token-counted words,
and applying a sliding-window overlap (in tokens).
"""
words = text.split()
chunks = []
cur = []
cur_tokens = 0
word_tokens = []
# Precompute token length per word (including trailing space)
for w in words:
t = num_tokens(w + " ")
word_tokens.append((w, t))
i = 0
n = len(word_tokens)
while i < n:
cur = []
cur_tokens = 0
j = i
while j < n:
w, t = word_tokens[j]
if cur_tokens + t > max_tokens and cur:
break
cur.append(w)
cur_tokens += t
j += 1
chunks.append(" ".join(cur))
if j >= n:
break
# advance i by chunk size minus overlap (in tokens) -> find new i index
# we need to move back overlap tokens worth of words
# compute how many words to step such that last `overlap` tokens are kept
if overlap <= 0:
i = j
else:
# walk back from j-1 adding tokens until we reach overlap
back = 0
k = j - 1
while k >= i and back < overlap:
back += word_tokens[k][1]
k -= 1
# next start is k+1 (one after where back accumulation stopped)
i = max(i + 1, k + 1) # ensure forward progress at least 1
return chunks
def extract_text_from_file(path: Path):
try:
m = magic.from_file(str(path), mime=True)
except Exception:
m = None
if m == "application/pdf" or path.suffix.lower() == ".pdf":
try:
return pdf_extract(str(path))
except Exception:
return ""
else:
try:
return path.read_text(encoding="utf8")
except Exception:
try:
return path.read_text(encoding="latin1")
except Exception:
return ""
# ---------- SQLite schema ----------
def init_db(conn: sqlite3.Connection):
cur = conn.cursor()
cur.execute("PRAGMA journal_mode=WAL;")
cur.execute("""
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY,
path TEXT UNIQUE,
mtime REAL,
size INTEGER
)""")
cur.execute("""
CREATE TABLE IF NOT EXISTS chunks (
id INTEGER PRIMARY KEY,
file_id INTEGER,
chunk_index INTEGER,
text_snippet TEXT,
vector_id INTEGER UNIQUE,
created_at REAL,
FOREIGN KEY(file_id) REFERENCES files(id)
)""")
cur.execute("""
CREATE TABLE IF NOT EXISTS meta (
k TEXT PRIMARY KEY,
v TEXT
)""")
conn.commit()
# ---------- Index wrapper (FAISS or fallback) ----------
class VectorIndex:
def __init__(self, dim: int, index_path: str = DEFAULT_INDEX, faiss_hnsw: bool = True):
self.dim = dim
self.index_path = index_path
self.lock = threading.Lock()
self.faiss_hnsw = faiss_hnsw
if HAVE_FAISS:
if os.path.exists(index_path):
try:
print(f"Loading FAISS index from {index_path}")
self.index = faiss.read_index(index_path)
except Exception as e:
print("Failed to read FAISS index, recreating. Error:", e)
self._create_faiss_index()
else:
self._create_faiss_index()
self.ntotal = int(self.index.ntotal)
else:
# fallback: store vectors in-memory and brute-force search
self.vectors = [] # list of np arrays
self.ntotal = 0
def _create_faiss_index(self):
# prefer HNSW (approx, fast) for scalable similarity; fallback to IndexFlatIP
try:
if self.faiss_hnsw:
print("Creating FAISS HNSW index (IndexHNSWFlat, inner-product).")
# HNSW on inner product needs normalization at query and add.
self.index = faiss.IndexHNSWFlat(self.dim, 32) # M=32
# For better inner-product behavior, wrap with IndexNormalized... not required here
else:
raise Exception("HNSW disabled")
except Exception:
print("Falling back to IndexFlatIP (exact).")
self.index = faiss.IndexFlatIP(self.dim)
# ensure index is on CPU (default). ntotal will be 0 initially.
self.ntotal = int(self.index.ntotal)
def add(self, vecs: np.ndarray):
# vecs: (n,d) float32
if vecs.ndim == 1:
vecs = vecs.reshape(1, -1)
# normalize for cosine (IP)
norms = np.linalg.norm(vecs, axis=1, keepdims=True)
norms[norms==0] = 1.0
vecs = vecs / norms
with self.lock:
start_id = self.ntotal
if HAVE_FAISS:
# FAISS expects float32
vecs = vecs.astype(np.float32)
try:
self.index.add(vecs)
except Exception as e:
# if index was incompatible (e.g., dimension mismatch), recreate and re-add not possible here
print("Error adding to FAISS index:", e)
raise
self.ntotal = int(self.index.ntotal)
else:
# store each vector row-wise
if len(self.vectors) == 0:
self.vectors.append(vecs.copy())
else:
self.vectors.append(vecs.copy())
self.ntotal += vecs.shape[0]
ids = list(range(start_id, start_id + vecs.shape[0]))
return ids
def search(self, qvec: np.ndarray, k: int = 5):
# qvec: (1,d) float32
# returns distances, indices arrays
q = qvec.astype(np.float32)
norms = np.linalg.norm(q, axis=1, keepdims=True)
norms[norms==0] = 1.0
q = q / norms
with self.lock:
if HAVE_FAISS:
try:
D, I = self.index.search(q, k)
return D, I
except Exception as e:
print("FAISS search error:", e)
return np.array([[]]), np.array([[]])
else:
if self.ntotal == 0:
return np.array([[]]), np.array([[]])
V = np.vstack(self.vectors) if len(self.vectors) > 1 else self.vectors[0]
scores = (V @ q.T).squeeze(-1) # inner product
idx = np.argsort(-scores)[:k]
return scores[idx].reshape(1, -1), idx.reshape(1, -1)
def save(self):
if HAVE_FAISS:
try:
faiss.write_index(self.index, self.index_path)
print(f"FAISS index saved to {self.index_path}")
except Exception as e:
print("Failed to save FAISS index:", e)
# ---------- Indexer logic ----------
class Indexer:
def __init__(self, data_dir: str, db_path: str, model_name: str, chunk_tokens: int, batch_size: int, overlap_tokens: int):
self.data_dir = Path(data_dir)
self.db_path = db_path
self.conn = sqlite3.connect(db_path, check_same_thread=False)
init_db(self.conn)
print(f"Inicializando modelo SentenceTransformer: {model_name}")
self.model = SentenceTransformer(model_name)
self.dim = self.model.get_sentence_embedding_dimension()
print(f"Modelo cargado. Dimensión de embeddings: {self.dim}")
self.index = VectorIndex(self.dim)
self.chunk_tokens = chunk_tokens
self.batch_size = batch_size
self.overlap_tokens = overlap_tokens
self.lock = threading.Lock()
def scan_and_index(self):
print(f"[{datetime.now().isoformat()}] Escaneando carpeta: {self.data_dir}")
to_process = []
for root, _, files in os.walk(self.data_dir):
for fn in files:
p = Path(root) / fn
try:
st = p.stat()
except Exception:
continue
cur = self.conn.execute("SELECT mtime FROM files WHERE path=?", (str(p),)).fetchone()
if cur is None or cur[0] < st.st_mtime:
to_process.append((p, st))
print(f"Detectado cambio: {p} (mtime={st.st_mtime})")
if not to_process:
print("No hay archivos nuevos o modificados para indexar.")
for p, st in to_process:
print("Indexando:", p)
text = extract_text_from_file(p)
if not text or not text.strip():
print(f"Archivo vacío o no legible: {p}")
continue
chunks = chunk_text(text, self.chunk_tokens, self.overlap_tokens)
print(f"Extraído {len(chunks)} chunks (tokens por chunk≈{self.chunk_tokens}, overlap={self.overlap_tokens})")
# ensure file row
cur = self.conn.cursor()
cur.execute("INSERT OR REPLACE INTO files(path, mtime, size) VALUES (?,?,?)",
(str(p), st.st_mtime, st.st_size))
self.conn.commit()
file_id = self.conn.execute("SELECT id FROM files WHERE path=?", (str(p),)).fetchone()[0]
# batch embeddings
for i in range(0, len(chunks), self.batch_size):
batch = chunks[i:i+self.batch_size]
batch_no = i // self.batch_size + 1
print(f"Procesando batch {batch_no}: {len(batch)} elementos (chunks {i}..{i+len(batch)-1})")
embs = self.model.encode(batch, convert_to_numpy=True, show_progress_bar=False)
if embs.dtype != np.float32:
embs = embs.astype(np.float32)
ids = self.index.add(embs)
print(f"Añadidos {len(ids)} vectores, ids {ids[0]}–{ids[-1]} (ntotal={self.index.ntotal})")
now = time.time()
# insert chunk rows mapping vector ids
cur = self.conn.cursor()
for j, vid in enumerate(ids):
snippet = batch[j][:600] # doble tamaño ~ 600 chars
cur.execute("INSERT INTO chunks(file_id, chunk_index, text_snippet, vector_id, created_at) VALUES (?,?,?,?,?)",
(file_id, i + j, snippet, int(vid), now))
self.conn.commit()
print(f"Guardados metadatos en DB para file_id={file_id} (batch {batch_no})")
# persist index if faiss
self.index.save()
print(f"[{datetime.now().isoformat()}] Indexado completado: archivos procesados {len(to_process)}")
def query(self, q: str, k: int = 5):
print(f"Buscando: '{q}' (k={k})")
emb = self.model.encode([q], convert_to_numpy=True)
if emb.dtype != np.float32:
emb = emb.astype(np.float32)
D, I = self.index.search(emb, k)
results = []
if I.size == 0:
print("No hay vectores en el índice o búsqueda fallida.")
return results
for score, idx in zip(D[0], I[0]):
if int(idx) < 0:
continue
row = self.conn.execute(
"SELECT files.path, chunks.text_snippet FROM chunks JOIN files ON chunks.file_id=files.id WHERE chunks.vector_id=?",
(int(idx),)
).fetchone()
if row:
results.append({"score": float(score), "path": row[0], "snippet": row[1]})
print(f"Encontrados {len(results)} resultados")
return results
# ---------- Flask App ----------
def create_app(data_dir, db_path, model_name, chunk_tokens, batch_size, overlap_tokens):
app = Flask(__name__)
idx = Indexer(data_dir, db_path, model_name, chunk_tokens, batch_size, overlap_tokens)
@app.route("/")
def home():
return render_template_string(HTML, data_dir=str(data_dir), msg=None, results=None)
@app.route("/index", methods=["POST"])
def do_index():
t = threading.Thread(target=idx.scan_and_index, daemon=True)
t.start()
return render_template_string(HTML, data_dir=str(data_dir), msg="Indexing started", results=None)
@app.route("/search")
def search():
q = request.args.get("q", "")
k = int(request.args.get("k", 5))
res = idx.query(q, k)
return render_template_string(HTML, data_dir=str(data_dir), msg=None, results=res)
return app
# ---------- CLI ----------
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data_dir", required=True, help="Folder to index")
parser.add_argument("--db", default=DEFAULT_DB, help="SQLite DB path")
parser.add_argument("--model", default=MODEL_NAME, help="SentenceTransformer model name")
parser.add_argument("--chunk_tokens", type=int, default=CHUNK_TOKENS)
parser.add_argument("--overlap_tokens", type=int, default=CHUNK_OVERLAP, help="Tokens overlap between chunks")
parser.add_argument("--batch_size", type=int, default=BATCH_SIZE)
parser.add_argument("--port", type=int, default=5000)
args = parser.parse_args()
app = create_app(args.data_dir, args.db, args.model, args.chunk_tokens, args.batch_size, args.overlap_tokens)
print(f"Inicializando app en 0.0.0.0:{args.port} (data_dir={args.data_dir})")
app.run(host="0.0.0.0", port=args.port)
Hay que instalar las dependencias necesarias, así como las variables de entorno para python. Luego, correr el siguiente comando:
python indexer.py --data_dir "C:\ruta\a\tus\archivos\" --port 5000
Una vez monta el servidor flask, en nuestro navegador favorito buscamos la siguiente url:
http://127.0.0.1:5000/
Ya en la interfaz de usuario, debemos darle al botón Run indexing
| interfaz de usuario y backend con proceso de indexado en curso |
En ese momento comienza a indexar los datos y a desarrollar todo el proceso de creación de la base de datos. Los datos que contenga la carpeta objetivo de nuestro índice. Este proceso puede tardar mas o menos en función de los datos a indexar. Yo mantuve el perfil bajo en el sentido de no indexar mucha información, limitándome al maravilloso archivo disponible para la formación de cualquier persona en https://www.marxists.org/ . Además, si arrastras mas archivos a esta carpeta seran indexados de forma automática y añadidos a la base de datos existente sin tener que reindexarlo todo de nuevo cuando hagas correr el script desde el principio.
Para disponer de la información descargué el sitio web en peso mediante wget en GNU/Linux.
torify wget -r -nH --user-agent="Mozilla/5.0" --wait=2 --random-wait https://your.target.goes.here
(este comando usará tor para acceder desde un nodo de salida de la susodicha red, Descargará los datos de la página web objeto de nuestra investigación de forma recursiva, sin descargar los headers, como si viniéramos desde Mozilla, esperan dos segundos y algo de tiempo random para evitar ser clasificados como robots y poco mas)
Luego, de vuelta a windows, extraje exclusivamente los archivos .pdf de la carpeta con todo el contenido mediante el archivo pdfrescue.bat , también aquí compartido cuyo código es el que sigue:
@echo off
setlocal enabledelayedexpansion
set "src=C:\ruta\a\tu\carpeta\porEjemplo_marxists.org"
set "dst=C:\ruta\a\tu\carpeta\porEjemplo_archivoSoloPdf"
if not exist "%dst%" mkdir "%dst%"
set /a copied=0
set /a skipped=0
for /R "%src%" %%G in (*.pdf) do (
rem ruta completa de origen
set "full=%%~G"
rem ruta relativa: quita el prefijo %src%\
set "rel=!full:%src%\=!"
rem carpeta destino completa donde recrear la estructura
for %%I in ("!rel!") do set "reldir=%%~dpI"
set "reldir=!reldir:~0,-1!" rem quitar la barra final
if not "!reldir!"=="" (
set "targetdir=%dst%\!reldir!"
) else (
set "targetdir=%dst%"
)
if not exist "!targetdir!" mkdir "!targetdir!"
rem nombre de archivo
set "name=%%~nxG"
set "targetfile=!targetdir!\!name!"
rem si ya existe, agregar sufijo numerico
if exist "!targetfile!" (
set /a n=1
:loopname
set "tname=!targetdir!\%%~nG_!n!%%~xG"
if exist "!tname!" (
set /a n+=1
goto loopname
)
copy /Y "%%~G" "!tname!" >nul 2>&1 && (set /a copied+=1) || (set /a skipped+=1)
) else (
copy /Y "%%~G" "!targetfile!" >nul 2>&1 && (set /a copied+=1) || (set /a skipped+=1)
)
)
echo Archivos copiados: %copied%
echo Archivos con error: %skipped%
endlocal
A modo orientativo, para indexar una carpeta con casi 1 GB de información en pdf, la base de datos .db creada ocupó 146 MB, y el archivo de índice faiss unos 387 MB aproximadamente.
Ahora disponemos pues de los datos indexados en la base de datos vectorial y podemos comenzar a ver como funciona el asunto: Buscamos en la terminal y nos devuelve, a modo de puntuación, un score de similitud en lo que tiene indexado, dejando un enlace al archivo y un extracto de la zona del documento señalada, que te puede permitir localizarlo en el documento mediante una sencilla búsqueda literal.
| búsqueda de un termino y resultados por puntuacion de cercanía de indice de vectores. |
El resultado, aun siendo pronto para emitir un juicio, es espectacular. Indudablemente depende mucho del o los términos que emplees en tu búsqueda, pero los resultados que arroja son bastante certeros si uno quiere hacer una búsqueda semántica sobre multiples archivos locales agrupados en una carpeta.
| resultados de busqueda por similitud |
Las bases de datos vectoriales parecen ser uno de los pilares de la nueva generación de buscadores 3.0 (los sabelotodo de esta época, los oráculos de una generación, que ahora ya no dan enlaces para que tu mismo emitas un juicio sino que categóricamente afirman como si fueran autoridad, y sin duda lo son…) consiste precisamente en esta estructura lógica, esta arquitectura en la cual se crea una capa que fagocita el contenido para ubicarlo en un sistema de vectores (¡de 384 dimensiones en este ejemplo!), la clave de toda la magia. El espacio Euclidiano expresando su máximo potencial. Ahora comprendo la carrera tecnológica para monopolizar la producción de tarjetas gráficas. El pipeline rendering es vital para esta industria.
Explicación sobre vectores y su uso en este código (concisa) [duck.ai]
Qué es un embedding: un embedding es un vector numérico (aquí de dimensión 384) que representa el significado de un texto en un espacio continuo; textos semánticamente parecidos producen vectores cercanos.
Normalización y similitud: el código normaliza vectores y usa producto interno (inner product) como medida; con vectores normalizados, el producto interno equivale a la similitud coseno.
Dimensión (EMBED_DIM): define cuántas coordenadas tiene cada vector (aquí el modelo devuelve 384).
Almacenamiento:
Con FAISS: se guarda en un índice optimizado para búsquedas rápidas (IndexFlatIP para inner-product). FAISS permite búsquedas muy rápidas incluso con millones de vectores.
Fallback: si no hay FAISS, se guarda una lista de arrays y se hace búsqueda por fuerza bruta (matmul + ordenación).
Mapeo vector ↔ chunk: cada vector recibe un vector_id (aquí el índice del vector en el índice). En la tabla chunks se guarda vector_id para luego recuperar el archivo y snippet asociado a un resultado.
Búsqueda: se toma la consulta, se convierte a vector, se normaliza y se buscan los k vectores con mayor producto interno (más cercanos). Luego, por cada vector_id devuelto se consulta SQLite para obtener el snippet y la ruta del archivo.
Ventajas: permite búsquedas semánticas (no sólo coincidencia de palabras). Limitaciones: requiere almacenamiento para vectores, coste de embeddings y precisión depende del modelo.
| búsqueda convencional en un documento individual |
Los resultados mas relevantes aparecen abajo del todo. Una vez accedes al documento puedes buscar literalmente alguna de las palabras o frases dentro del documento e ir a la parte que mas te pueda interesar. Ahora ya todo depende de lo que busques, como siempre es.
¿Te interesa lo que ves? Si quieres saber mas acerca de esta experiencia, contacta:
habitainer (arroba) gmail (punto) com
Creative Commons Licensed Work.
Mayo 2026 – Luís Rodríguez Alonso.