29 may 2026

VectorWords - Búsqueda semántica sobre múltiples archivos en una carpeta

mi primera base de datos vetorial   :)

Vivimos tiempos interesantes en cuanto al estado de la técnica en el terreno de las tecnologías de la información. Con esa manía capitalista de convertir todo en mercancía, ciertos avances técnicos se engalanan en entidades abstractas pseudomágicas a la vez que se ofusca su verdadero funcionamiento interior. Y la inteligencia artificial no deja de sufrir esta tergiversación de la realidad.

El caso es que la única manera que encuentro de desmitificar y bajar tierra las ideas preconcebidas que podamos tener sobre una materia, y especialmente sobre su aplicación práctica, es pasar a la acción, que en este caso consiste en hacer correr algo de código en mi máquina local. Así que tras una corta búsqueda sobre bases de datos vectoriales, un asombro infinito ante las posibilidades que Euclides aró en la mente humana y sus aplicaciones en la arquitectura de la información, ordené codificar en python un script que se dedica a:


1.- Buscar archivos pdf (y de texto) en una determinada carpeta, extrayendo el texto.


2.- Hacer chunks (o cachos) de texto con solape de 50 tokenes.


3.- Indexar vectores con FAISS (Facebook AI Similarity Search), que es una biblioteca de código abierto creada por Meta (antes Facebook) para buscar y agrupar vectores de alta dimensión rápidamente.


4.- Guardar los metadatos y el mapa de vectores en una base de datos, dando persistencia al trabajo.


5.- Montar un servidor local para accionar el indexado de datos y realizar búsquedas (interfaz de usuario).


El archivo indexer.py consta del siguiente código



# indexer.py

"""

Minimal local vector indexer + tiny Flask UI (modificado).


Minimal local vector indexer + tiny Flask UI.

- Walks a folder, extracts text (plain + PDF),

- Chunks text with overlap (50 tokens default), embeds with SentenceTransformers (local model),

- Indexes vectors with FAISS / FAISS HNSW (or fallback brute-force),

- Persists metadata and vector-id mapping in SQLite, PRAGMA WAL for better concurrency.

- Provides a simple web UI for indexing and search.


Usage:

python indexer.py --data_dir "C:\path\to\folder" --port 5000

"""

import os

import sqlite3

import argparse

import threading

import time

from pathlib import Path

from datetime import datetime


from flask import Flask, request, render_template_string

import numpy as np


# Try import faiss, otherwise set fallback

try:

import faiss

HAVE_FAISS = True

except Exception:

faiss = None

HAVE_FAISS = False


from sentence_transformers import SentenceTransformer

from pdfminer.high_level import extract_text as pdf_extract

import magic

import tiktoken


# ---------- Config defaults ----------

MODEL_NAME = "all-MiniLM-L6-v2" # compact local model (384d)

EMBED_DIM = 384

CHUNK_TOKENS = 256

CHUNK_OVERLAP = 50 # tokens of overlap between chunks

BATCH_SIZE = 64


# DB / index files

DEFAULT_DB = "vector_store_pdf.db"

DEFAULT_INDEX = "faiss.index_pdf" # used only if faiss present


# Simple HTML (file links and longer snippet)

HTML = """<!doctype html>

<title>Local Vector Indexer</title>

<h2>Folder: {{data_dir}}</h2>

<form action="/index" method="post"><button type="submit">Run indexing</button></form>

<form action="/search" method="get">

Query: <input name="q" size="60"/>

K: <input name="k" value="50" size="2"/>

<button type="submit">Search</button>

</form>

{% if msg %}<p><b>{{msg}}</b></p>{% endif %}

{% if results %}

<h3>Results</h3>

<ul>

{% for r in results %}

<li><b>score:</b>{{"%.4f"|format(r.score)}} — <b>file:</b><a href="file://{{r.path}}" target="_blank">{{r.path}}</a> — {{r.snippet}}</li>

{% endfor %}

</ul>

{% endif %}

"""


# ---------- tokenization ----------

ENC = tiktoken.get_encoding("cl100k_base")

def num_tokens(text: str) -> int:

return len(ENC.encode(text))


def chunk_text(text: str, max_tokens: int = CHUNK_TOKENS, overlap: int = CHUNK_OVERLAP):

"""

Chunk text into pieces with approximate token counts, using token-counted words,

and applying a sliding-window overlap (in tokens).

"""

words = text.split()

chunks = []

cur = []

cur_tokens = 0

word_tokens = []

# Precompute token length per word (including trailing space)

for w in words:

t = num_tokens(w + " ")

word_tokens.append((w, t))

i = 0

n = len(word_tokens)

while i < n:

cur = []

cur_tokens = 0

j = i

while j < n:

w, t = word_tokens[j]

if cur_tokens + t > max_tokens and cur:

break

cur.append(w)

cur_tokens += t

j += 1

chunks.append(" ".join(cur))

if j >= n:

break

# advance i by chunk size minus overlap (in tokens) -> find new i index

# we need to move back overlap tokens worth of words

# compute how many words to step such that last `overlap` tokens are kept

if overlap <= 0:

i = j

else:

# walk back from j-1 adding tokens until we reach overlap

back = 0

k = j - 1

while k >= i and back < overlap:

back += word_tokens[k][1]

k -= 1

# next start is k+1 (one after where back accumulation stopped)

i = max(i + 1, k + 1) # ensure forward progress at least 1

return chunks


def extract_text_from_file(path: Path):

try:

m = magic.from_file(str(path), mime=True)

except Exception:

m = None

if m == "application/pdf" or path.suffix.lower() == ".pdf":

try:

return pdf_extract(str(path))

except Exception:

return ""

else:

try:

return path.read_text(encoding="utf8")

except Exception:

try:

return path.read_text(encoding="latin1")

except Exception:

return ""


# ---------- SQLite schema ----------

def init_db(conn: sqlite3.Connection):

cur = conn.cursor()

cur.execute("PRAGMA journal_mode=WAL;")

cur.execute("""

CREATE TABLE IF NOT EXISTS files (

id INTEGER PRIMARY KEY,

path TEXT UNIQUE,

mtime REAL,

size INTEGER

)""")

cur.execute("""

CREATE TABLE IF NOT EXISTS chunks (

id INTEGER PRIMARY KEY,

file_id INTEGER,

chunk_index INTEGER,

text_snippet TEXT,

vector_id INTEGER UNIQUE,

created_at REAL,

FOREIGN KEY(file_id) REFERENCES files(id)

)""")

cur.execute("""

CREATE TABLE IF NOT EXISTS meta (

k TEXT PRIMARY KEY,

v TEXT

)""")

conn.commit()


# ---------- Index wrapper (FAISS or fallback) ----------

class VectorIndex:

def __init__(self, dim: int, index_path: str = DEFAULT_INDEX, faiss_hnsw: bool = True):

self.dim = dim

self.index_path = index_path

self.lock = threading.Lock()

self.faiss_hnsw = faiss_hnsw

if HAVE_FAISS:

if os.path.exists(index_path):

try:

print(f"Loading FAISS index from {index_path}")

self.index = faiss.read_index(index_path)

except Exception as e:

print("Failed to read FAISS index, recreating. Error:", e)

self._create_faiss_index()

else:

self._create_faiss_index()

self.ntotal = int(self.index.ntotal)

else:

# fallback: store vectors in-memory and brute-force search

self.vectors = [] # list of np arrays

self.ntotal = 0


def _create_faiss_index(self):

# prefer HNSW (approx, fast) for scalable similarity; fallback to IndexFlatIP

try:

if self.faiss_hnsw:

print("Creating FAISS HNSW index (IndexHNSWFlat, inner-product).")

# HNSW on inner product needs normalization at query and add.

self.index = faiss.IndexHNSWFlat(self.dim, 32) # M=32

# For better inner-product behavior, wrap with IndexNormalized... not required here

else:

raise Exception("HNSW disabled")

except Exception:

print("Falling back to IndexFlatIP (exact).")

self.index = faiss.IndexFlatIP(self.dim)

# ensure index is on CPU (default). ntotal will be 0 initially.

self.ntotal = int(self.index.ntotal)


def add(self, vecs: np.ndarray):

# vecs: (n,d) float32

if vecs.ndim == 1:

vecs = vecs.reshape(1, -1)

# normalize for cosine (IP)

norms = np.linalg.norm(vecs, axis=1, keepdims=True)

norms[norms==0] = 1.0

vecs = vecs / norms

with self.lock:

start_id = self.ntotal

if HAVE_FAISS:

# FAISS expects float32

vecs = vecs.astype(np.float32)

try:

self.index.add(vecs)

except Exception as e:

# if index was incompatible (e.g., dimension mismatch), recreate and re-add not possible here

print("Error adding to FAISS index:", e)

raise

self.ntotal = int(self.index.ntotal)

else:

# store each vector row-wise

if len(self.vectors) == 0:

self.vectors.append(vecs.copy())

else:

self.vectors.append(vecs.copy())

self.ntotal += vecs.shape[0]

ids = list(range(start_id, start_id + vecs.shape[0]))

return ids


def search(self, qvec: np.ndarray, k: int = 5):

# qvec: (1,d) float32

# returns distances, indices arrays

q = qvec.astype(np.float32)

norms = np.linalg.norm(q, axis=1, keepdims=True)

norms[norms==0] = 1.0

q = q / norms

with self.lock:

if HAVE_FAISS:

try:

D, I = self.index.search(q, k)

return D, I

except Exception as e:

print("FAISS search error:", e)

return np.array([[]]), np.array([[]])

else:

if self.ntotal == 0:

return np.array([[]]), np.array([[]])

V = np.vstack(self.vectors) if len(self.vectors) > 1 else self.vectors[0]

scores = (V @ q.T).squeeze(-1) # inner product

idx = np.argsort(-scores)[:k]

return scores[idx].reshape(1, -1), idx.reshape(1, -1)


def save(self):

if HAVE_FAISS:

try:

faiss.write_index(self.index, self.index_path)

print(f"FAISS index saved to {self.index_path}")

except Exception as e:

print("Failed to save FAISS index:", e)


# ---------- Indexer logic ----------

class Indexer:

def __init__(self, data_dir: str, db_path: str, model_name: str, chunk_tokens: int, batch_size: int, overlap_tokens: int):

self.data_dir = Path(data_dir)

self.db_path = db_path

self.conn = sqlite3.connect(db_path, check_same_thread=False)

init_db(self.conn)

print(f"Inicializando modelo SentenceTransformer: {model_name}")

self.model = SentenceTransformer(model_name)

self.dim = self.model.get_sentence_embedding_dimension()

print(f"Modelo cargado. Dimensión de embeddings: {self.dim}")

self.index = VectorIndex(self.dim)

self.chunk_tokens = chunk_tokens

self.batch_size = batch_size

self.overlap_tokens = overlap_tokens

self.lock = threading.Lock()


def scan_and_index(self):

print(f"[{datetime.now().isoformat()}] Escaneando carpeta: {self.data_dir}")

to_process = []

for root, _, files in os.walk(self.data_dir):

for fn in files:

p = Path(root) / fn

try:

st = p.stat()

except Exception:

continue

cur = self.conn.execute("SELECT mtime FROM files WHERE path=?", (str(p),)).fetchone()

if cur is None or cur[0] < st.st_mtime:

to_process.append((p, st))

print(f"Detectado cambio: {p} (mtime={st.st_mtime})")

if not to_process:

print("No hay archivos nuevos o modificados para indexar.")

for p, st in to_process:

print("Indexando:", p)

text = extract_text_from_file(p)

if not text or not text.strip():

print(f"Archivo vacío o no legible: {p}")

continue

chunks = chunk_text(text, self.chunk_tokens, self.overlap_tokens)

print(f"Extraído {len(chunks)} chunks (tokens por chunk≈{self.chunk_tokens}, overlap={self.overlap_tokens})")

# ensure file row

cur = self.conn.cursor()

cur.execute("INSERT OR REPLACE INTO files(path, mtime, size) VALUES (?,?,?)",

(str(p), st.st_mtime, st.st_size))

self.conn.commit()

file_id = self.conn.execute("SELECT id FROM files WHERE path=?", (str(p),)).fetchone()[0]

# batch embeddings

for i in range(0, len(chunks), self.batch_size):

batch = chunks[i:i+self.batch_size]

batch_no = i // self.batch_size + 1

print(f"Procesando batch {batch_no}: {len(batch)} elementos (chunks {i}..{i+len(batch)-1})")

embs = self.model.encode(batch, convert_to_numpy=True, show_progress_bar=False)

if embs.dtype != np.float32:

embs = embs.astype(np.float32)

ids = self.index.add(embs)

print(f"Añadidos {len(ids)} vectores, ids {ids[0]}–{ids[-1]} (ntotal={self.index.ntotal})")

now = time.time()

# insert chunk rows mapping vector ids

cur = self.conn.cursor()

for j, vid in enumerate(ids):

snippet = batch[j][:600] # doble tamaño ~ 600 chars

cur.execute("INSERT INTO chunks(file_id, chunk_index, text_snippet, vector_id, created_at) VALUES (?,?,?,?,?)",

(file_id, i + j, snippet, int(vid), now))

self.conn.commit()

print(f"Guardados metadatos en DB para file_id={file_id} (batch {batch_no})")

# persist index if faiss

self.index.save()

print(f"[{datetime.now().isoformat()}] Indexado completado: archivos procesados {len(to_process)}")


def query(self, q: str, k: int = 5):

print(f"Buscando: '{q}' (k={k})")

emb = self.model.encode([q], convert_to_numpy=True)

if emb.dtype != np.float32:

emb = emb.astype(np.float32)

D, I = self.index.search(emb, k)

results = []

if I.size == 0:

print("No hay vectores en el índice o búsqueda fallida.")

return results

for score, idx in zip(D[0], I[0]):

if int(idx) < 0:

continue

row = self.conn.execute(

"SELECT files.path, chunks.text_snippet FROM chunks JOIN files ON chunks.file_id=files.id WHERE chunks.vector_id=?",

(int(idx),)

).fetchone()

if row:

results.append({"score": float(score), "path": row[0], "snippet": row[1]})

print(f"Encontrados {len(results)} resultados")

return results


# ---------- Flask App ----------

def create_app(data_dir, db_path, model_name, chunk_tokens, batch_size, overlap_tokens):

app = Flask(__name__)

idx = Indexer(data_dir, db_path, model_name, chunk_tokens, batch_size, overlap_tokens)


@app.route("/")

def home():

return render_template_string(HTML, data_dir=str(data_dir), msg=None, results=None)


@app.route("/index", methods=["POST"])

def do_index():

t = threading.Thread(target=idx.scan_and_index, daemon=True)

t.start()

return render_template_string(HTML, data_dir=str(data_dir), msg="Indexing started", results=None)


@app.route("/search")

def search():

q = request.args.get("q", "")

k = int(request.args.get("k", 5))

res = idx.query(q, k)

return render_template_string(HTML, data_dir=str(data_dir), msg=None, results=res)


return app


# ---------- CLI ----------

if __name__ == "__main__":

parser = argparse.ArgumentParser()

parser.add_argument("--data_dir", required=True, help="Folder to index")

parser.add_argument("--db", default=DEFAULT_DB, help="SQLite DB path")

parser.add_argument("--model", default=MODEL_NAME, help="SentenceTransformer model name")

parser.add_argument("--chunk_tokens", type=int, default=CHUNK_TOKENS)

parser.add_argument("--overlap_tokens", type=int, default=CHUNK_OVERLAP, help="Tokens overlap between chunks")

parser.add_argument("--batch_size", type=int, default=BATCH_SIZE)

parser.add_argument("--port", type=int, default=5000)

args = parser.parse_args()


app = create_app(args.data_dir, args.db, args.model, args.chunk_tokens, args.batch_size, args.overlap_tokens)

print(f"Inicializando app en 0.0.0.0:{args.port} (data_dir={args.data_dir})")

app.run(host="0.0.0.0", port=args.port)



Hay que instalar las dependencias necesarias, así como las variables de entorno para python. Luego, correr el siguiente comando:


python indexer.py --data_dir "C:\ruta\a\tus\archivos\" --port 5000


Una vez monta el servidor flask, en nuestro navegador favorito buscamos la siguiente url:


http://127.0.0.1:5000/


Ya en la interfaz de usuario, debemos darle al botón Run indexing


interfaz de usuario y backend con proceso de indexado en curso


En ese momento comienza a indexar los datos y a desarrollar todo el proceso de creación de la base de datos. Los datos que contenga la carpeta objetivo de nuestro índice. Este proceso puede tardar mas o menos en función de los datos a indexar. Yo mantuve el perfil bajo en el sentido de no indexar mucha información, limitándome al maravilloso archivo disponible para la formación de cualquier persona en https://www.marxists.org/ . Además, si arrastras mas archivos a esta carpeta seran indexados de forma automática y añadidos a la base de datos existente sin tener que reindexarlo todo de nuevo cuando hagas correr el script desde el principio.


 

Para disponer de la información descargué el sitio web en peso mediante wget en GNU/Linux.


torify wget -r -nH --user-agent="Mozilla/5.0" --wait=2 --random-wait https://your.target.goes.here

(este comando usará tor para acceder desde un nodo de salida de la susodicha red, Descargará los datos de la página web objeto de nuestra investigación de forma recursiva, sin descargar los headers, como si viniéramos desde Mozilla, esperan dos segundos y algo de tiempo random para evitar ser clasificados como robots y poco mas)


Luego, de vuelta a windows, extraje exclusivamente los archivos .pdf de la carpeta con todo el contenido mediante el archivo pdfrescue.bat , también aquí compartido cuyo código es el que sigue:



@echo off

setlocal enabledelayedexpansion


set "src=C:\ruta\a\tu\carpeta\porEjemplo_marxists.org"

set "dst=C:\ruta\a\tu\carpeta\porEjemplo_archivoSoloPdf"


if not exist "%dst%" mkdir "%dst%"


set /a copied=0

set /a skipped=0


for /R "%src%" %%G in (*.pdf) do (

rem ruta completa de origen

set "full=%%~G"

rem ruta relativa: quita el prefijo %src%\

set "rel=!full:%src%\=!"

rem carpeta destino completa donde recrear la estructura

for %%I in ("!rel!") do set "reldir=%%~dpI"

set "reldir=!reldir:~0,-1!" rem quitar la barra final

if not "!reldir!"=="" (

set "targetdir=%dst%\!reldir!"

) else (

set "targetdir=%dst%"

)

if not exist "!targetdir!" mkdir "!targetdir!"

rem nombre de archivo

set "name=%%~nxG"

set "targetfile=!targetdir!\!name!"


rem si ya existe, agregar sufijo numerico

if exist "!targetfile!" (

set /a n=1

:loopname

set "tname=!targetdir!\%%~nG_!n!%%~xG"

if exist "!tname!" (

set /a n+=1

goto loopname

)

copy /Y "%%~G" "!tname!" >nul 2>&1 && (set /a copied+=1) || (set /a skipped+=1)

) else (

copy /Y "%%~G" "!targetfile!" >nul 2>&1 && (set /a copied+=1) || (set /a skipped+=1)

)

)


echo Archivos copiados: %copied%

echo Archivos con error: %skipped%


endlocal


A modo orientativo, para indexar una carpeta con casi 1 GB de información en pdf, la base de datos .db creada ocupó 146 MB, y el archivo de índice faiss unos 387 MB aproximadamente.

Ahora disponemos pues de los datos indexados en la base de datos vectorial y podemos comenzar a ver como funciona el asunto: Buscamos en la terminal y nos devuelve, a modo de puntuación, un score de similitud en lo que tiene indexado, dejando un enlace al archivo y un extracto de la zona del documento señalada, que te puede permitir localizarlo en el documento mediante una sencilla búsqueda literal. 

búsqueda de un termino y resultados por puntuacion de cercanía de indice de vectores.

El resultado, aun siendo pronto para emitir un juicio, es espectacular. Indudablemente depende mucho del o los términos que emplees en tu búsqueda, pero los resultados que arroja son bastante certeros si uno quiere hacer una búsqueda semántica sobre multiples archivos locales agrupados en una carpeta.

resultados de busqueda por similitud

Las bases de datos vectoriales parecen ser uno de los pilares de la nueva generación de buscadores 3.0 (los sabelotodo de esta época, los oráculos de una generación, que ahora ya no dan enlaces para que tu mismo emitas un juicio sino que categóricamente afirman como si fueran autoridad, y sin duda lo son…) consiste precisamente en esta estructura lógica, esta arquitectura en la cual se crea una capa que fagocita el contenido para ubicarlo en un sistema de vectores (¡de 384 dimensiones en este ejemplo!), la clave de toda la magia. El espacio Euclidiano expresando su máximo potencial. Ahora comprendo la carrera tecnológica para monopolizar la producción de tarjetas gráficas. El pipeline rendering es vital para esta industria.

 

Explicación sobre vectores y su uso en este código (concisa) [duck.ai]

  • Qué es un embedding: un embedding es un vector numérico (aquí de dimensión 384) que representa el significado de un texto en un espacio continuo; textos semánticamente parecidos producen vectores cercanos.

  • Normalización y similitud: el código normaliza vectores y usa producto interno (inner product) como medida; con vectores normalizados, el producto interno equivale a la similitud coseno.

  • Dimensión (EMBED_DIM): define cuántas coordenadas tiene cada vector (aquí el modelo devuelve 384).

  • Almacenamiento:

    • Con FAISS: se guarda en un índice optimizado para búsquedas rápidas (IndexFlatIP para inner-product). FAISS permite búsquedas muy rápidas incluso con millones de vectores.

    • Fallback: si no hay FAISS, se guarda una lista de arrays y se hace búsqueda por fuerza bruta (matmul + ordenación).

  • Mapeo vector ↔ chunk: cada vector recibe un vector_id (aquí el índice del vector en el índice). En la tabla chunks se guarda vector_id para luego recuperar el archivo y snippet asociado a un resultado.

  • Búsqueda: se toma la consulta, se convierte a vector, se normaliza y se buscan los k vectores con mayor producto interno (más cercanos). Luego, por cada vector_id devuelto se consulta SQLite para obtener el snippet y la ruta del archivo.

  • Ventajas: permite búsquedas semánticas (no sólo coincidencia de palabras). Limitaciones: requiere almacenamiento para vectores, coste de embeddings y precisión depende del modelo. 

     

búsqueda convencional en un documento individual

Los resultados mas relevantes aparecen abajo del todo. Una vez accedes al documento puedes buscar literalmente alguna de las palabras o frases dentro del documento e ir a la parte que mas te pueda interesar. Ahora ya todo depende de lo que busques, como siempre es.

¿Te interesa lo que ves? Si quieres saber mas acerca de esta experiencia, contacta:

habitainer (arroba) gmail (punto) com

Creative Commons Licensed Work.

Mayo 2026 – Luís Rodríguez Alonso.