"""Index a tiny support corpus into Elasticsearch and run one kNN query.

The script embeds every document in ``CORPUS`` with a Sentence Transformers
model, recreates the target index with a ``dense_vector`` field sized to the
embeddings, bulk-indexes the documents, and then checks that a semantic query
for ``QUERY`` returns ``doc-001`` as the top match.

Environment variables read:
    ELASTICSEARCH_INDEX: index name (default ``support-docs-demo``).
    ELASTICSEARCH_URL: cluster URL (default ``http://localhost:9200``).
    ELASTICSEARCH_API_KEY: optional API key passed to the client.
    ELASTICSEARCH_CA_CERTS: optional CA bundle path passed to the client.
"""

import os

import numpy as np
from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer

INDEX_NAME = os.environ.get("ELASTICSEARCH_INDEX", "support-docs-demo")
QUERY = "password reset instructions"

CORPUS = [
    {
        "doc_id": "doc-001",
        "title": "Reset a forgotten password",
        "text": "Reset a forgotten password from account settings and confirm the email link.",
    },
    {
        "doc_id": "doc-002",
        "title": "Create an invoice receipt",
        "text": "Create a billing invoice and download a PDF receipt.",
    },
    {
        "doc_id": "doc-003",
        "title": "Rotate API tokens",
        "text": "Rotate API tokens before sharing a new integration with a teammate.",
    },
    {
        "doc_id": "doc-004",
        "title": "Store semantic vectors",
        "text": "Elasticsearch stores Sentence Transformers embeddings for vector search.",
    },
]


def elasticsearch_client() -> Elasticsearch:
    """Build an Elasticsearch client from environment configuration.

    The URL comes from ``ELASTICSEARCH_URL`` (default
    ``http://localhost:9200``). ``ELASTICSEARCH_API_KEY`` and
    ``ELASTICSEARCH_CA_CERTS`` are forwarded only when they are set to a
    non-empty value.

    Returns:
        A client configured with a 60 second request timeout.
    """
    api_key = os.environ.get("ELASTICSEARCH_API_KEY")
    ca_certs = os.environ.get("ELASTICSEARCH_CA_CERTS")

    # Only pass the optional settings that are actually configured, so an
    # unset variable never reaches the client as an explicit None.
    options = {}
    if api_key:
        options["api_key"] = api_key
    if ca_certs:
        options["ca_certs"] = ca_certs

    return Elasticsearch(
        os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200"),
        request_timeout=60,
        **options,
    )


client = elasticsearch_client()
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Embed the corpus. Embeddings are normalized here and the index below uses
# cosine similarity.
texts = [item["text"] for item in CORPUS]
document_embeddings = model.encode_document(
    texts,
    normalize_embeddings=True,
    convert_to_numpy=True,
    show_progress_bar=False,
)
document_embeddings = np.asarray(document_embeddings, dtype="float32")

# The mapping's vector size is taken from the model output rather than
# hard-coded, so it always agrees with the embeddings being indexed.
dimension = document_embeddings.shape[1]

# Start from a clean index on every run: drop any previous copy first.
if client.indices.exists(index=INDEX_NAME):
    client.indices.delete(index=INDEX_NAME)

client.indices.create(
    index=INDEX_NAME,
    mappings={
        "properties": {
            "doc_id": {"type": "keyword"},
            "title": {"type": "text"},
            "text": {"type": "text"},
            "embedding": {
                "type": "dense_vector",
                "dims": dimension,
                "index": True,
                "similarity": "cosine",
            },
        }
    },
)

# One bulk "index" action per document; the corpus doc_id doubles as the
# Elasticsearch _id, and the embedding is stored next to the original fields.
actions = [
    {
        "_op_type": "index",
        "_index": INDEX_NAME,
        "_id": item["doc_id"],
        "_source": {**item, "embedding": embedding.tolist()},
    }
    for item, embedding in zip(CORPUS, document_embeddings)
]

indexed_count, errors = helpers.bulk(client, actions)
if errors:
    raise SystemExit(errors)

# Refresh so the documents just written are visible to the search below.
client.indices.refresh(index=INDEX_NAME)

query_embedding = model.encode_query(
    QUERY,
    normalize_embeddings=True,
    convert_to_numpy=True,
    show_progress_bar=False,
).astype("float32")

# kNN search on the embedding field; only the human-readable fields are
# requested back, not the stored vectors.
response = client.search(
    index=INDEX_NAME,
    knn={
        "field": "embedding",
        "query_vector": query_embedding.tolist(),
        "k": 2,
        "num_candidates": 4,
    },
    source=["doc_id", "title", "text"],
)
hits = response["hits"]["hits"]
stored_count = client.count(index=INDEX_NAME)["count"]

print(f"index: {INDEX_NAME}")
print(f"embedding dimension: {dimension}")
print(f"indexed documents: {indexed_count}")
print(f"stored documents: {stored_count}")
print(f"query: {QUERY}")
print("top matches:")
for rank, hit in enumerate(hits, start=1):
    source = hit["_source"]
    print(
        f"{rank}. {source['doc_id']} score={hit['_score']:.4f} "
        f"title={source['title']}"
    )

# An empty result set is a verification failure too; report it explicitly
# instead of letting hits[0] raise a bare IndexError.
if not hits:
    raise SystemExit("unexpected top match: search returned no hits")

if hits[0]["_source"]["doc_id"] != "doc-001":
    raise SystemExit(f"unexpected top match: {hits[0]['_source']['doc_id']}")

print("verification: PASS query returned the password reset document")