EmbeddingGemma-300m保姆级教程手把手教你搭建语义搜索系统1. 引言从关键词匹配到语义理解你是否遇到过这样的困扰在公司的知识库里搜索“如何重置密码”却找不到那篇名为“账户安全与密码修改指南”的文档。传统的搜索系统依赖关键词匹配就像只认识字面意思的机器人——它不懂“重置密码”和“修改密码”其实是同一回事。这就是语义搜索要解决的问题。它让计算机理解文字背后的含义而不仅仅是表面的词汇。今天我要带你用EmbeddingGemma-300m这个轻量级但强大的模型从零开始搭建一个真正“懂你”的语义搜索系统。EmbeddingGemma-300m是谷歌推出的开源嵌入模型只有3亿参数却继承了构建Gemini系列模型的先进技术。它最大的魅力在于能在你的笔记本电脑上流畅运行不需要昂贵的GPU服务器。无论是为个人文档建立智能检索还是为小型企业构建客服知识库它都是一个绝佳的选择。在这篇教程里我不会只教你如何调用API而是带你完整走一遍从环境搭建到系统部署的全过程。你会学到如何将一段段文字转换成有意义的数字向量如何计算它们之间的“语义距离”以及如何构建一个实用的搜索服务。让我们开始吧。2. 环境准备与模型部署2.1 理解嵌入模型文本如何变成数字在开始动手之前我们先花几分钟理解核心概念。嵌入模型就像一个“语义翻译器”它把人类能理解的文字转换成计算机能处理的数字。想象一下你有一个包含300个维度的空间可以理解为有300个特征方向。模型会把每个词、每句话映射到这个空间中的一个点也就是一个300维的向量。神奇的是意思相近的文本在这个空间里的位置也很接近。举个例子“我喜欢机器学习” → 在空间的某个位置“我爱人工智能” → 在非常接近的位置“今天天气真好” → 在很远的位置通过计算这些点之间的距离通常是余弦相似度我们就能判断两段文字在语义上是否相似。这就是语义搜索的基础原理。2.2 三步完成模型部署现在我们来实际部署EmbeddingGemma-300m。整个过程非常简单只需要三个步骤。第一步安装OllamaOllama是一个本地运行大模型的工具它让模型部署变得像安装普通软件一样简单。macOS/Linux用户打开终端运行以下命令curl -fsSL https://ollama.com/install.sh | sh安装完成后Ollama会自动启动服务。Windows用户直接访问Ollama官网下载安装包双击运行即可。安装完成后打开终端或命令提示符输入ollama --version如果看到版本号说明安装成功。第二步拉取模型模型文件大约1.2GB下载需要一些时间。在终端中输入ollama pull embeddinggemma:300m你会看到下载进度条。这个过程可能需要几分钟到十几分钟取决于你的网络速度。可以先去泡杯茶回来时应该就下载完成了。第三步验证部署下载完成后检查模型是否可用ollama list你应该能看到类似这样的输出NAME ID SIZE MODIFIED embeddinggemma:300m abc123def456 1.2 GB 2 minutes ago重要提醒很多人会习惯性地输入ollama run embeddinggemma:300m来测试但这是错误的EmbeddingGemma是嵌入模型不是聊天模型它不支持直接对话。正确的使用方式是通过API调用我们马上就会讲到。3. 核心原理从文本到向量的魔法3.1 第一次API调用让文字“变身”现在模型已经准备好了让我们来第一次“施法”——把文字变成向量。打开终端输入以下命令确保Ollama服务正在运行curl -X POST http://localhost:11434/api/embeddings \ -H Content-Type: application/json \ -d { model: embeddinggemma:300m, prompt: 今天天气真好适合出去散步 }几秒钟后你会看到类似这样的响应{ embedding: [ 0.0231456, -0.0451289, 0.118347, -0.203491, 0.0567321, 0.189234, -0.102345, 0.0789123, -0.0456712, 0.123456, ... // 总共300个数字 ] }这一长串数字300个就是“今天天气真好适合出去散步”的向量表示。每个数字都代表了文本在某个语义维度上的“强度”或“特征”。3.2 用Python封装API调用在实际应用中我们通常用程序来调用。下面是一个简单的Python封装import requests import json class EmbeddingClient: def __init__(self, base_urlhttp://localhost:11434, modelembeddinggemma:300m): self.base_url base_url self.model model self.api_url f{base_url}/api/embeddings def get_embedding(self, text): 获取单段文本的向量表示 payload { model: self.model, prompt: text } try: response requests.post(self.api_url, jsonpayload, timeout30) response.raise_for_status() # 如果状态码不是200抛出异常 result response.json() return result[embedding] except requests.exceptions.RequestException as e: print(fAPI调用失败: {e}) return None except KeyError: print(响应格式错误未找到embedding字段) return None def get_embeddings_batch(self, texts): 批量获取文本向量注意Ollama API本身不支持批量这里用循环实现 embeddings [] for text in texts: embedding self.get_embedding(text) if embedding is not None: embeddings.append(embedding) else: embeddings.append(None) # 或者根据需求处理错误 return embeddings # 使用示例 if __name__ __main__: client EmbeddingClient() # 测试单个文本 text 人工智能正在改变世界 vector client.get_embedding(text) if vector: print(f文本: {text}) print(f向量长度: {len(vector)}) # 应该是300 print(f前5个值: {vector[:5]}) # 查看前几个值这个类封装了基本的API调用你可以直接在自己的项目中使用。3.3 理解向量的“语义空间”让我们做个小实验看看向量如何反映语义关系import numpy as np from numpy.linalg import norm def cosine_similarity(vec1, vec2): 计算两个向量的余弦相似度范围[-1, 1]越接近1越相似 # 转换为numpy数组方便计算 vec1 np.array(vec1) vec2 np.array(vec2) # 计算点积 dot_product np.dot(vec1, vec2) # 计算模长 norm1 norm(vec1) norm2 norm(vec2) # 避免除零错误 if norm1 0 or norm2 0: return 0.0 return dot_product / (norm1 * norm2) # 测试不同文本对的相似度 test_pairs [ (机器学习, 人工智能), # 高度相关 (苹果手机, iPhone), # 同义不同词 (编程学习, 烹饪技巧), # 完全不相关 (今天天气很好, 阳光明媚的日子), # 同义表达 ] client EmbeddingClient() print(语义相似度测试:) print( * 50) for text1, text2 in test_pairs: vec1 client.get_embedding(text1) vec2 client.get_embedding(text2) if vec1 and vec2: similarity cosine_similarity(vec1, vec2) print(f{text1} vs {text2}) print(f相似度: {similarity:.4f}) print(f解释: {高度相似 if similarity 0.7 else 中等相似 if similarity 0.4 else 不太相似}) print(- * 30)运行这段代码你会看到“机器学习”和“人工智能”相似度很高可能0.8以上“苹果手机”和“iPhone”相似度也很高“编程学习”和“烹饪技巧”相似度很低可能0.2以下“今天天气很好”和“阳光明媚的日子”相似度较高这就是语义搜索的核心通过向量距离找到语义相近的内容而不是仅仅匹配关键词。4. 实战构建语义搜索系统4.1 设计搜索系统架构现在我们来构建一个完整的语义搜索系统。系统的主要组件包括文档处理模块将原始文档转换为向量向量存储保存文档向量和元数据查询处理模块将用户查询转换为向量相似度计算模块找到最相关的文档结果排序模块按相关性排序返回对于小型系统我们可以用内存存储对于大型系统建议使用专门的向量数据库如Chroma、Pinecone等。4.2 实现基础搜索系统下面是一个完整的内存版语义搜索系统实现import numpy as np from typing import List, Dict, Tuple, Optional import json import time from dataclasses import dataclass from pathlib import Path dataclass class SearchResult: 搜索结果数据类 doc_id: str text: str similarity: float metadata: Dict None class SemanticSearchEngine: 语义搜索引擎核心类 def __init__(self, model_nameembeddinggemma:300m): self.model_name model_name self.documents [] # 存储原始文档 self.embeddings [] # 存储文档向量 self.metadata [] # 存储文档元数据 self.client EmbeddingClient(modelmodel_name) # 性能统计 self.stats { total_searches: 0, avg_search_time: 0, total_docs: 0 } def add_document(self, text: str, doc_id: Optional[str] None, metadata: Optional[Dict] None) - str: 添加单个文档到搜索库 if not text.strip(): raise ValueError(文档内容不能为空) # 生成文档ID如果未提供 if doc_id is None: doc_id fdoc_{len(self.documents)}_{int(time.time())} # 获取文档向量 print(f正在处理文档: {doc_id[:50]}...) vector self.client.get_embedding(text) if vector is None: raise RuntimeError(f无法获取文档{doc_id}的向量) # 存储文档信息 self.documents.append(text) self.embeddings.append(np.array(vector)) # 存储元数据 if metadata is None: metadata {} metadata.update({ id: doc_id, added_time: time.time(), text_length: len(text) }) self.metadata.append(metadata) self.stats[total_docs] 1 print(f文档添加成功: {doc_id}) return doc_id def add_documents_batch(self, texts: List[str], ids: Optional[List[str]] None, metadata_list: Optional[List[Dict]] None) - List[str]: 批量添加文档 if ids is None: ids [None] * len(texts) if metadata_list is None: metadata_list [None] * len(texts) added_ids [] for i, (text, doc_id, metadata) in enumerate(zip(texts, ids, metadata_list)): try: added_id self.add_document(text, doc_id, metadata) added_ids.append(added_id) except Exception as e: print(f添加文档{i}失败: {e}) added_ids.append(None) return added_ids def search(self, query: str, top_k: int 5, threshold: float 0.0) - List[SearchResult]: 执行语义搜索 start_time time.time() # 获取查询向量 query_vector self.client.get_embedding(query) if query_vector is None: raise RuntimeError(无法获取查询向量) query_vector np.array(query_vector) # 如果没有文档返回空结果 if len(self.embeddings) 0: return [] # 计算相似度 similarities [] for doc_vector in self.embeddings: # 计算余弦相似度 dot_product np.dot(query_vector, doc_vector) norm_query np.linalg.norm(query_vector) norm_doc np.linalg.norm(doc_vector) if norm_query 0 or norm_doc 0: similarity 0.0 else: similarity dot_product / (norm_query * norm_doc) similarities.append(similarity) # 获取top_k结果 top_indices np.argsort(similarities)[::-1][:top_k] # 构建结果 results [] for idx in top_indices: similarity similarities[idx] if similarity threshold: # 应用阈值过滤 result SearchResult( doc_idself.metadata[idx][id], textself.documents[idx], similaritysimilarity, metadataself.metadata[idx] ) results.append(result) # 更新统计信息 search_time time.time() - start_time self.stats[total_searches] 1 self.stats[avg_search_time] ( (self.stats[avg_search_time] * (self.stats[total_searches] - 1) search_time) / self.stats[total_searches] ) return results def save_index(self, filepath: str): 保存搜索索引到文件 data { documents: self.documents, embeddings: [vec.tolist() for vec in self.embeddings], metadata: self.metadata, stats: self.stats } with open(filepath, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) print(f索引已保存到: {filepath}) def load_index(self, filepath: str): 从文件加载搜索索引 with open(filepath, r, encodingutf-8) as f: data json.load(f) self.documents data[documents] self.embeddings [np.array(vec) for vec in data[embeddings]] self.metadata data[metadata] self.stats data[stats] print(f索引已加载共{len(self.documents)}个文档) def get_stats(self) - Dict: 获取系统统计信息 return self.stats.copy() # 使用示例构建客服知识库搜索 def build_customer_service_kb(): 构建客服知识库示例 search_engine SemanticSearchEngine() # 添加常见问题文档 faq_documents [ { text: 如何重置账户密码请登录后进入安全设置页面点击修改密码按照提示操作即可。, id: faq_001, metadata: {category: 账户安全, views: 0} }, { text: 订单取消流程在订单页面找到待取消的订单点击取消按钮选择取消原因并提交。, id: faq_002, metadata: {category: 订单管理, views: 0} }, { text: 退款处理时间一般为3-7个工作日具体到账时间取决于银行处理速度。, id: faq_003, metadata: {category: 支付退款, views: 0} }, { text: 修改收货地址在个人中心-地址管理中添加或修改地址下单时选择即可。, id: faq_004, metadata: {category: 物流配送, views: 0} }, { text: 会员等级说明普通会员、白银会员、黄金会员、钻石会员等级越高享受权益越多。, id: faq_005, metadata: {category: 会员体系, views: 0} } ] # 批量添加文档 for doc in faq_documents: search_engine.add_document( textdoc[text], doc_iddoc[id], metadatadoc[metadata] ) return search_engine # 测试搜索功能 def test_search_functionality(): 测试搜索功能 print(构建客服知识库...) search_engine build_customer_service_kb() # 测试查询 test_queries [ 我忘记密码了怎么办, 想取消刚下的订单, 退款什么时候能到账, 怎么改送货地址, 会员有什么好处 ] print(\n语义搜索测试:) print( * 60) for query in test_queries: print(f\n查询: {query}) print(- * 40) results search_engine.search(query, top_k2) for i, result in enumerate(results, 1): print(f{i}. [相似度: {result.similarity:.3f}]) print(f 文档ID: {result.doc_id}) print(f 内容: {result.text[:80]}...) if result.metadata: print(f 分类: {result.metadata.get(category, 未知)}) print() # 显示统计信息 stats search_engine.get_stats() print(f\n系统统计:) print(f 文档总数: {stats[total_docs]}) print(f 搜索次数: {stats[total_searches]}) print(f 平均搜索时间: {stats[avg_search_time]:.3f}秒) if __name__ __main__: test_search_functionality()这个搜索系统已经具备了基本功能包括文档添加和向量化语义搜索和相似度计算结果排序和过滤索引保存和加载使用统计4.3 添加高级功能混合搜索在实际应用中我们往往需要结合语义搜索和关键词搜索。下面实现一个混合搜索系统class HybridSearchEngine(SemanticSearchEngine): 混合搜索引擎结合语义搜索和关键词搜索 def __init__(self, model_nameembeddinggemma:300m, keyword_weight0.3): super().__init__(model_name) self.keyword_weight keyword_weight # 关键词搜索权重 self.semantic_weight 1 - keyword_weight # 语义搜索权重 # 构建关键词索引 self.keyword_index {} # 词 - [文档ID列表] self.doc_keywords [] # 每个文档的关键词列表 def _extract_keywords(self, text: str) - List[str]: 简单关键词提取实际项目中可用TF-IDF等更复杂的方法 # 移除标点转换为小写分词 import re words re.findall(r\b\w\b, text.lower()) # 过滤停用词简单示例 stop_words {的, 了, 在, 是, 我, 有, 和, 就, 不, 人, 都, 一, 一个, 上, 也, 很, 到, 说, 要, 去, 你, 会, 着, 没有, 看, 好, 自己, 这} keywords [word for word in words if word not in stop_words and len(word) 1] # 取前10个作为关键词 return keywords[:10] def add_document(self, text: str, doc_id: Optional[str] None, metadata: Optional[Dict] None) - str: 重写添加文档方法同时构建关键词索引 # 调用父类方法添加文档 actual_id super().add_document(text, doc_id, metadata) # 提取关键词 keywords self._extract_keywords(text) self.doc_keywords.append(keywords) # 更新关键词索引 doc_index len(self.documents) - 1 for keyword in keywords: if keyword not in self.keyword_index: self.keyword_index[keyword] [] self.keyword_index[keyword].append(doc_index) return actual_id def keyword_search(self, query: str, top_k: int 5) - List[Tuple[int, float]]: 关键词搜索 query_keywords self._extract_keywords(query) # 计算文档得分 scores [0.0] * len(self.documents) for keyword in query_keywords: if keyword in self.keyword_index: # 简单计数包含关键词的文档得分1 for doc_idx in self.keyword_index[keyword]: scores[doc_idx] 1.0 # 归一化得分 max_score max(scores) if scores else 1.0 if max_score 0: scores [score / max_score for score in scores] # 获取top_k结果 top_indices np.argsort(scores)[::-1][:top_k] return [(idx, scores[idx]) for idx in top_indices] def hybrid_search(self, query: str, top_k: int 5, threshold: float 0.0) - List[SearchResult]: 混合搜索结合语义和关键词 # 语义搜索 semantic_results self.search(query, top_ktop_k*2, threshold0) # 关键词搜索 keyword_results self.keyword_search(query, top_ktop_k*2) # 合并结果 combined_scores {} # 处理语义搜索结果 for result in semantic_results: doc_idx self.documents.index(result.text) semantic_score result.similarity combined_scores[doc_idx] semantic_score * self.semantic_weight # 处理关键词搜索结果 for doc_idx, keyword_score in keyword_results: if doc_idx in combined_scores: combined_scores[doc_idx] keyword_score * self.keyword_weight else: combined_scores[doc_idx] keyword_score * self.keyword_weight # 排序并构建最终结果 sorted_indices sorted(combined_scores.items(), keylambda x: x[1], reverseTrue)[:top_k] results [] for doc_idx, final_score in sorted_indices: if final_score threshold: result SearchResult( doc_idself.metadata[doc_idx][id], textself.documents[doc_idx], similarityfinal_score, metadataself.metadata[doc_idx] ) results.append(result) return results # 测试混合搜索 def test_hybrid_search(): 测试混合搜索效果 print(构建混合搜索引擎...) engine HybridSearchEngine(keyword_weight0.3) # 添加测试文档 test_docs [ Python编程语言入门教程适合初学者学习, Java开发实战指南包含大量代码示例, 机器学习算法原理与Python实现, 深度学习在计算机视觉中的应用, Web前端开发技术HTML、CSS、JavaScript, 数据库设计与SQL优化技巧, 人工智能技术发展趋势分析 ] for i, doc in enumerate(test_docs): engine.add_document(doc, fdoc_{i}) # 测试不同搜索方式 queries [ Python学习, 人工智能技术, 代码编程 ] print(\n混合搜索对比测试:) print( * 60) for query in queries: print(f\n查询: {query}) print(- * 40) # 语义搜索 print(语义搜索结果:) semantic_results engine.search(query, top_k2) for i, result in enumerate(semantic_results, 1): print(f {i}. [{result.similarity:.3f}] {result.text}) # 关键词搜索 print(\n关键词搜索结果:) keyword_indices engine.keyword_search(query, top_k2) for i, (idx, score) in enumerate(keyword_indices, 1): print(f {i}. [{score:.3f}] {engine.documents[idx]}) # 混合搜索 print(\n混合搜索结果:) hybrid_results engine.hybrid_search(query, top_k2) for i, result in enumerate(hybrid_results, 1): print(f {i}. [{result.similarity:.3f}] {result.text}) print() if __name__ __main__: test_hybrid_search()混合搜索结合了两种方法的优点语义搜索理解查询的深层含义找到语义相关的内容关键词搜索确保重要的关键词不被忽略加权融合通过权重调整两者的重要性5. 性能优化与生产部署5.1 优化搜索速度当文档数量增多时线性搜索逐个计算相似度会变慢。下面介绍几种优化方法方法一使用向量索引对于大规模数据可以使用专门的向量数据库或者实现简单的索引结构import heapq from collections import defaultdict class OptimizedSearchEngine(SemanticSearchEngine): 优化版搜索引擎支持快速检索 def __init__(self, model_nameembeddinggemma:300m, num_clusters10): super().__init__(model_name) self.num_clusters num_clusters self.cluster_centers None self.doc_clusters None # 文档所属的簇 def build_clusters(self): 构建文档聚类简化版K-means from sklearn.cluster import KMeans if len(self.embeddings) self.num_clusters: print(文档数量太少跳过聚类) return # 转换为numpy数组 vectors np.array(self.embeddings) # 使用K-means聚类 kmeans KMeans(n_clustersmin(self.num_clusters, len(vectors)), random_state42) clusters kmeans.fit_predict(vectors) self.cluster_centers kmeans.cluster_centers_ self.doc_clusters clusters print(f文档聚类完成共{len(set(clusters))}个簇) def search_optimized(self, query: str, top_k: int 5) - List[SearchResult]: 优化搜索先找相关簇再在簇内搜索 if self.cluster_centers is None or len(self.documents) 100: # 文档少时使用普通搜索 return self.search(query, top_k) # 获取查询向量 query_vector self.client.get_embedding(query) if query_vector is None: return [] query_vector np.array(query_vector) # 找到最相关的簇 cluster_similarities [] for i, center in enumerate(self.cluster_centers): similarity cosine_similarity(query_vector, center) cluster_similarities.append((i, similarity)) # 按相似度排序 cluster_similarities.sort(keylambda x: x[1], reverseTrue) # 只在最相关的几个簇中搜索 top_clusters [idx for idx, _ in cluster_similarities[:3]] # 收集候选文档 candidate_indices [] for doc_idx, cluster_idx in enumerate(self.doc_clusters): if cluster_idx in top_clusters: candidate_indices.append(doc_idx) # 在候选文档中计算相似度 results [] for doc_idx in candidate_indices: similarity cosine_similarity(query_vector, self.embeddings[doc_idx]) if similarity 0: # 可以设置阈值 results.append((doc_idx, similarity)) # 排序并返回top_k results.sort(keylambda x: x[1], reverseTrue) results results[:top_k] # 构建SearchResult对象 search_results [] for doc_idx, similarity in results: result SearchResult( doc_idself.metadata[doc_idx][id], textself.documents[doc_idx], similaritysimilarity, metadataself.metadata[doc_idx] ) search_results.append(result) return search_results方法二批量处理和多线程import concurrent.futures from threading import Lock class BatchEmbeddingClient(EmbeddingClient): 支持批量处理的嵌入客户端 def __init__(self, base_urlhttp://localhost:11434, modelembeddinggemma:300m, max_workers4): super().__init__(base_url, model) self.max_workers max_workers self.lock Lock() def get_embeddings_parallel(self, texts: List[str]) - List[List[float]]: 多线程并行获取嵌入向量 embeddings [None] * len(texts) def process_text(idx, text): try: embedding self.get_embedding(text) with self.lock: embeddings[idx] embedding except Exception as e: print(f处理文本{idx}失败: {e}) with concurrent.futures.ThreadPoolExecutor(max_workersself.max_workers) as executor: futures [] for i, text in enumerate(texts): future executor.submit(process_text, i, text) futures.append(future) # 等待所有任务完成 concurrent.futures.wait(futures) return embeddings # 性能对比测试 def performance_comparison(): 对比不同方法的性能 import time client EmbeddingClient() batch_client BatchEmbeddingClient(max_workers4) # 测试数据 test_texts [f测试文本{i}: 人工智能和机器学习技术 for i in range(10)] print(性能对比测试:) print( * 50) # 方法1: 顺序处理 print(\n1. 顺序处理:) start_time time.time() embeddings_seq [] for text in test_texts: embedding client.get_embedding(text) embeddings_seq.append(embedding) seq_time time.time() - start_time print(f 时间: {seq_time:.2f}秒) print(f 平均每个文本: {seq_time/len(test_texts):.2f}秒) # 方法2: 并行处理 print(\n2. 并行处理(4线程):) start_time time.time() embeddings_parallel batch_client.get_embeddings_parallel(test_texts) parallel_time time.time() - start_time print(f 时间: {parallel_time:.2f}秒) print(f 平均每个文本: {parallel_time/len(test_texts):.2f}秒) print(f 加速比: {seq_time/parallel_time:.2f}倍) # 验证结果一致性 print(\n3. 结果验证:) all_match True for i, (emb1, emb2) in enumerate(zip(embeddings_seq, embeddings_parallel)): if emb1 is not None and emb2 is not None: # 检查前5个值是否相同允许浮点误差 for j in range(min(5, len(emb1), len(emb2))): if abs(emb1[j] - emb2[j]) 0.0001: all_match False break print(f 结果一致性: {通过 if all_match else 失败}) if __name__ __main__: performance_comparison()5.2 生产环境部署建议部署架构对于生产环境建议采用以下架构用户请求 → Web服务器 → 语义搜索服务 → Ollama API → 返回结果 ↓ 向量数据库 ↓ 缓存层(Redis)使用向量数据库对于大规模应用建议使用专门的向量数据库# 使用Chroma向量数据库的示例 try: import chromadb from chromadb.config import Settings class ChromaSearchEngine: 基于Chroma的搜索引擎 def __init__(self, model_nameembeddinggemma:300m, persist_directory./chroma_db): self.client chromadb.Client(Settings( chroma_db_implduckdbparquet, persist_directorypersist_directory )) # 创建或获取集合 self.collection self.client.get_or_create_collection( namedocuments, metadata{hnsw:space: cosine} # 使用余弦相似度 ) self.embedding_client EmbeddingClient(modelmodel_name) def add_document(self, text: str, doc_id: str, metadata: dict None): 添加文档到Chroma # 获取向量 embedding self.embedding_client.get_embedding(text) if embedding is None: raise ValueError(无法获取文本向量) # 添加到集合 self.collection.add( documents[text], embeddings[embedding], metadatas[metadata] if metadata else [{}], ids[doc_id] ) def search(self, query: str, top_k: int 5): 在Chroma中搜索 # 获取查询向量 query_embedding self.embedding_client.get_embedding(query) if query_embedding is None: return [] # 搜索 results self.collection.query( query_embeddings[query_embedding], n_resultstop_k ) return results except ImportError: print(Chroma未安装请运行: pip install chromadb)API服务封装将搜索功能封装为REST API服务from flask import Flask, request, jsonify import threading app Flask(__name__) search_engine SemanticSearchEngine() # 全局锁防止并发问题 engine_lock threading.Lock() app.route(/api/search, methods[POST]) def search(): 搜索API data request.json if not data or query not in data: return jsonify({error: 缺少query参数}), 400 query data[query] top_k data.get(top_k, 5) threshold data.get(threshold, 0.0) try: with engine_lock: results search_engine.search(query, top_ktop_k, thresholdthreshold) # 格式化结果 formatted_results [] for result in results: formatted_results.append({ id: result.doc_id, text: result.text, similarity: float(result.similarity), metadata: result.metadata }) return jsonify({ query: query, results: formatted_results, count: len(formatted_results) }) except Exception as e: return jsonify({error: str(e)}), 500 app.route(/api/add_document, methods[POST]) def add_document(): 添加文档API data request.json if not data or text not in data: return jsonify({error: 缺少text参数}), 400 text data[text] doc_id data.get(doc_id) metadata data.get(metadata, {}) try: with engine_lock: actual_id search_engine.add_document(text, doc_id, metadata) return jsonify({ success: True, doc_id: actual_id, message: 文档添加成功 }) except Exception as e: return jsonify({error: str(e)}), 500 app.route(/api/stats, methods[GET]) def get_stats(): 获取统计信息API stats search_engine.get_stats() return jsonify(stats) if __name__ __main__: # 初始化一些示例数据 with engine_lock: search_engine.add_document(机器学习是人工智能的一个重要分支, doc_001) search_engine.add_document(深度学习基于神经网络模型, doc_002) search_engine.add_document(自然语言处理让计算机理解人类语言, doc_003) print(搜索服务启动中...) print(API端点:) print( POST /api/search - 执行搜索) print( POST /api/add_document - 添加文档) print( GET /api/stats - 获取统计信息) print(\n服务地址: http://localhost:5000) app.run(host0.0.0.0, port5000, debugFalse)运行这个服务后你就可以通过HTTP API来使用搜索功能了。6. 总结6.1 关键要点回顾通过这篇教程我们完整地走过了搭建语义搜索系统的全过程模型理解EmbeddingGemma-300m是一个轻量级但强大的嵌入模型专门将文本转换为语义向量适合在普通设备上运行。环境部署使用Ollama可以一键部署模型通过简单的API调用即可获取文本向量。核心原理语义搜索基于向量相似度通过计算余弦相似度找到语义相近的内容突破了传统关键词匹配的局限。系统构建我们从零开始构建了一个完整的语义搜索系统包括文档处理、向量存储、相似度计算和结果排序。高级功能实现了混合搜索结合语义和关键词、性能优化聚类索引、批量处理、以及生产级部署方案。实际应用展示了如何在客服知识库、文档检索等场景中应用语义搜索显著提升搜索准确性和用户体验。6.2 最佳实践建议基于实际项目经验我总结了一些最佳实践文本预处理很重要对于长文档考虑分段处理每段200-500字添加适当的上下文信息避免歧义清理无关字符和格式相似度阈值需要调优严格匹配相似度 0.85相关推荐相似度 0.70话题聚类相似度 0.60根据具体场景调整阈值性能优化策略文档数量少1000使用内存搜索文档数量中等1000-10万使用向量数据库如Chroma文档数量大10万考虑分布式向量数据库如Milvus错误处理与监控实现API调用的重试机制添加超时和限流控制监控搜索准确率和响应时间6.3 扩展学习方向如果你已经掌握了基础可以进一步探索多语言支持EmbeddingGemma支持100多种语言尝试构建多语言搜索系统。模型微调在自己的专业领域数据上微调模型提升特定任务的准确率。多模态搜索结合文本、图像、音频的嵌入模型构建统一的多模态搜索。实时更新实现文档的实时增删改查保持搜索索引的时效性。用户反馈学习根据用户的点击和反馈优化搜索排序算法。EmbeddingGemma-300m的魅力在于它的平衡——既有不错的语义理解能力又足够轻量便于部署。无论是个人项目、创业公司还是企业内部应用它都能提供一个高性价比的语义搜索解决方案。记住技术只是工具真正的价值在于解决实际问题。现在你已经掌握了搭建语义搜索系统的全套技能接下来就是发挥创意用它来解决你遇到的实际问题了。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。