"""Hybrid retrieval demo.

Ranks a small in-memory corpus against one query twice -- once with dense
sentence embeddings and once with an IDF-weighted keyword overlap -- fuses the
two rankings with reciprocal rank fusion, prints all three rankings, and exits
with an error if the fused top result is not the expected document.
"""

import math
import re
from collections import Counter

import numpy as np
from sentence_transformers import SentenceTransformer, util

# Each record carries an "id" used for reporting and the "text" that is scored.
corpus = [
    {
        "id": "doc-001",
        "text": "Dense embeddings retrieve answers that use different words from the question.",
    },
    {
        "id": "doc-002",
        "text": "Sparse keyword scores keep exact terms such as refund, invoice, and policy visible.",
    },
    {
        "id": "doc-003",
        "text": "Hybrid search fuses semantic vectors with keyword retrieval for refund policy answers.",
    },
    {
        "id": "doc-004",
        "text": "Cross-encoders rerank a short candidate list after the first retrieval stage.",
    },
    {
        "id": "doc-005",
        "text": "FAISS indexes dense vectors for local nearest-neighbor search.",
    },
]

query = "semantic keyword retrieval for refund policy"

# Compiled once: tokenize() runs for the query and again for every document.
_TOKEN_PATTERN = re.compile(r"[a-z0-9]+")


def tokenize(text: str) -> list[str]:
    """Lowercase *text* and return its runs of ASCII letters and digits.

    Every other character (whitespace, punctuation, hyphens, non-ASCII
    letters) acts as a separator and is dropped.
    """
    return _TOKEN_PATTERN.findall(text.lower())


def sparse_scores(query_text, documents) -> np.ndarray:
    """Score every document against the query with an IDF-weighted term overlap.

    Args:
        query_text: Raw query string; it is tokenized with ``tokenize``.
        documents: Sequence of records whose text is stored under ``"text"``.

    Returns:
        A NumPy array holding one score per document, in input order. A
        document that shares no term with the query scores ``0.0``.
    """
    query_terms = Counter(tokenize(query_text))
    document_terms = [Counter(tokenize(item["text"])) for item in documents]
    document_count = len(documents)

    # Document frequency: the number of documents containing each term at
    # least once (keys only, so repeated occurrences are not double counted).
    document_frequency = Counter()
    for terms in document_terms:
        document_frequency.update(terms.keys())

    scores = []
    for terms in document_terms:
        score = 0.0
        for term, query_count in query_terms.items():
            if term not in terms:
                continue
            # Smoothed IDF: the +1 terms keep the weight positive even when a
            # term occurs in every document.
            idf = math.log((document_count + 1) / (document_frequency[term] + 1)) + 1
            # Credit a term at most as often as it appears on both sides.
            score += min(query_count, terms[term]) * idf
        scores.append(score)
    return np.array(scores)


def ranked_results(scores) -> list[tuple[int, float]]:
    """Return ``(index, score)`` pairs ordered from highest to lowest score.

    Entries whose score is not strictly positive are dropped. Note that this
    applies to whatever scores are passed in, so a dense similarity of zero or
    below is filtered out just like a sparse score of zero.

    Args:
        scores: One-dimensional array of scores, one per document.

    Returns:
        A list of ``(index, score)`` tuples converted to plain ``int`` and
        ``float``. Equal scores are ordered by descending index.
    """
    # A stable sort makes the order of tied scores deterministic; NumPy's
    # default sort kind gives no such guarantee. Reversing the stable
    # ascending order puts the higher index first among ties.
    order = np.argsort(scores, kind="stable")[::-1]
    return [(int(index), float(scores[index])) for index in order if scores[index] > 0]


def reciprocal_rank_fusion(dense_hits, sparse_hits, k: int = 60) -> list[tuple]:
    """Fuse two ranked hit lists with reciprocal rank fusion.

    Each list contributes ``1 / (k + rank)`` to every index it contains, with
    ranks starting at 1, so an index present in both lists receives the sum of
    both contributions.

    Args:
        dense_hits: Ranked ``(index, score)`` pairs from dense retrieval.
        sparse_hits: Ranked ``(index, score)`` pairs from sparse retrieval.
        k: Constant added to every rank before taking the reciprocal.

    Returns:
        A list of ``(index, fused_score, dense_rank, sparse_rank)`` tuples in
        descending fused-score order. A rank is the string ``"-"`` when the
        index did not appear in the corresponding list. The input scores are
        ignored; only the positions matter.
    """
    fused = Counter()
    dense_rank = {}
    sparse_rank = {}
    for rank, (index, _) in enumerate(dense_hits, start=1):
        fused[index] += 1 / (k + rank)
        dense_rank[index] = rank
    for rank, (index, _) in enumerate(sparse_hits, start=1):
        fused[index] += 1 / (k + rank)
        sparse_rank[index] = rank
    return [
        (index, score, dense_rank.get(index, "-"), sparse_rank.get(index, "-"))
        for index, score in fused.most_common()
    ]


def _print_hits(heading, hits, records) -> None:
    """Print *heading*, then one numbered line per ``(index, score)`` hit."""
    print(heading)
    for rank, (index, score) in enumerate(hits, start=1):
        record = records[index]
        print(f"{rank}. {record['id']} score={score:.4f} text={record['text']}")


model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
documents = [item["text"] for item in corpus]
document_embeddings = model.encode_document(
    documents,
    normalize_embeddings=True,
    convert_to_tensor=True,
    show_progress_bar=False,
)
query_embedding = model.encode_query(
    query,
    normalize_embeddings=True,
    convert_to_tensor=True,
    show_progress_bar=False,
)
# Both sides are requested as normalized embeddings, so the dot product is
# used as the similarity; [0] selects the row belonging to the single query.
dense_scores = util.dot_score(query_embedding, document_embeddings)[0].cpu().numpy()
sparse_scores_array = sparse_scores(query, corpus)

# Keep the three best candidates from each retriever, then the three best
# fused results.
dense_hits = ranked_results(dense_scores)[:3]
sparse_hits = ranked_results(sparse_scores_array)[:3]
fused_hits = reciprocal_rank_fusion(dense_hits, sparse_hits)[:3]

print(f"query: {query}")
_print_hits("dense hits:", dense_hits, corpus)
_print_hits("sparse hits:", sparse_hits, corpus)

print("fused ranking:")
for rank, (index, score, dense_rank, sparse_rank) in enumerate(fused_hits, start=1):
    record = corpus[index]
    print(
        f"{rank}. {record['id']} rrf={score:.4f} "
        f"dense_rank={dense_rank} sparse_rank={sparse_rank}"
    )

# Fail with a clear message instead of an IndexError when neither retriever
# produced a positive-scoring candidate.
if not fused_hits:
    raise SystemExit("no fused results: dense and sparse retrieval returned no hits")

top_record = corpus[fused_hits[0][0]]
if top_record["id"] != "doc-003":
    raise SystemExit(f"unexpected top fused result: {top_record['id']}")
print("verification: PASS hybrid ranking keeps semantic and keyword evidence together")