clearwind

clearwind

首页
分类
登录 →
clearwind

clearwind

首页 分类
登录
  1. 首页
  2. 🚀AI
  3. 📚RAG
  4. Quick start of RAG

Quick start of RAG

0
  • 📚RAG
  • 发布于 2024-12-24
  • 27 次阅读
clearwind
clearwind

大模型固有的局限性

LLM所具有的能力是建立在预训练的基础上,因此,LLM并不具备实时或训练时没有使用的知识:

  1. 实时数据:时间等

  2. 私有领域/业务知识

RAG

Retrieval Augmented Generation,通过检索的方式增强生成模型的能力。

基本流程

  1. 文档加载,并按照一定的条件切割成片段

  2. 将切片灌入检索引擎

  3. 封装检索接口

  4. 调用流程:Query -> 检索 -> Prompt -> LLM -> Answer

依赖

pip install --upgrade openai
# pdf 解析库
pip install pdfminer.six
pip install numpy
pip install chromadb

向量检索

向量是一种有大小和方向的数学对象。它可以表示为从一个点到另一个点的有向线段。例如,二维空间中的向量可以表示为 (x,y),表示从原点 (0,0) 到点 (x,y)$的有向线段。可以用一组坐标 x_0, x_1, \ldots, x_{N-1}) 表示一个 N 维空间中的向量,N$叫向量的维度。

文本向量(Embedding)

  1. 将文本转成一组 N 维浮点数,即文本向量又叫 Embeddings

  2. 向量之间可以计算距离,距离远近对应语义相似度大小

向量间的相似度计算

import numpy as np
from numpy import dot
from numpy.linalg import norm

def cos_sim(a, b):
    '''余弦距离 -- 越大越相似'''
    return dot(a, b)/(norm(a)*norm(b))


def l2(a, b):
    '''欧氏距离 -- 越小越相似'''
    x = np.asarray(a)-np.asarray(b)
    return norm(x)

def get_embeddings(texts, model="text-embedding-ada-002", dimensions=None):
    '''封装 OpenAI 的 Embedding 模型接口'''
    if model == "text-embedding-ada-002":
        dimensions = None
    if dimensions:
        data = client.embeddings.create(
            input=texts, model=model, dimensions=dimensions).data
    else:
        data = client.embeddings.create(input=texts, model=model).data
    return [x.embedding for x in data]

向量数据库&Chroma

将数据量庞大的知识、新闻、文献、语料等先通过嵌入(embedding)算法转变为向量数据,然后存储在Chroma等向量数据库中。当用户在大模型输入问题后,将问题本身也embedding,转化为向量,在向量数据库中查找与之最匹配的相关知识,组成大模型的上下文,将其输入给大模型,最终返回大模型处理后的文本给用户。

import chromadb
from chromadb.config import Settings

class MyVectorDBConnector:
    def __init__(self, collection_name, embedding_fn):
        # 内存模式
        chroma_client = chromadb.Client(Settings(allow_reset=True))
        # 数据持久化
        # chroma_client = chromadb.PersistentClient(path="./chroma")

        # 注意:为了演示,实际不需要每次 reset(),并且是不可逆的!
        chroma_client.reset()

        # 创建一个 collection
        self.collection = chroma_client.get_or_create_collection(name=collection_name)
        self.embedding_fn = embedding_fn

    def add_documents(self, documents):
        '''向 collection 中添加文档与向量'''
        self.collection.add(
            embeddings=self.embedding_fn(documents),  # 每个文档的向量
            documents=documents,  # 文档的原文
            ids=[f"id{i}" for i in range(len(documents))]  # 每个文档的 id
        )

    def search(self, query, top_n):
        '''检索向量数据库'''
        results = self.collection.query(
            query_embeddings=self.embedding_fn([query]),
            n_results=top_n
        )
        return results

  • FAISS: Meta 开源的向量检索引擎 https://github.com/facebookresearch/faiss

  • Pinecone: 商用向量数据库,只有云服务 https://www.pinecone.io/

  • Milvus: 开源向量数据库,同时有云服务 https://milvus.io/

  • Weaviate: 开源向量数据库,同时有云服务 https://weaviate.io/

  • Qdrant: 开源向量数据库,同时有云服务 https://qdrant.tech/

  • PGVector: Postgres 的开源向量检索引擎 https://github.com/pgvector/pgvector

  • RediSearch: Redis 的开源向量检索引擎 https://github.com/RediSearch/RediSearch

  • ElasticSearch 也支持向量检索 https://www.elastic.co/enterprise-search/vector-search

文本切割粒度

1. 粒度太大可能导致检索不精准,粒度太小可能导致信息不全面

2. 问题的答案可能跨越两个片段

from nltk.tokenize import sent_tokenize
import json


# chunk_size 一般根据文档内容或大小来设置
# overlap_size 一般设置 chunk_size 大小的10%-20%之间
def split_text(paragraphs, chunk_size=300, overlap_size=100):
    '''按指定 chunk_size 和 overlap_size 交叠割文本'''
    sentences = [s.strip() for p in paragraphs for s in sent_tokenize(p)]
    chunks = []
    i = 0
    while i < len(sentences):
        chunk = sentences[i]
        overlap = ''
        prev_len = 0
        prev = i - 1
        # 向前计算重叠部分
        while prev >= 0 and len(sentences[prev])+len(overlap) <= overlap_size:
            overlap = sentences[prev] + ' ' + overlap
            prev -= 1
        chunk = overlap+chunk
        next = i + 1
        # 向后计算当前chunk
        while next < len(sentences) and len(sentences[next])+len(chunk) <= chunk_size:
            chunk = chunk + ' ' + sentences[next]
            next += 1
        chunks.append(chunk)
        i = next
    return chunks

检索后排序

最合适的答案不一定排在检索的最前面

1. 检索时过招回一部分文本

2. 通过一个排序模型对 query 和 document 重新打分排序

from sentence_transformers import CrossEncoder

model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', max_length=512) # 英文,模型较小
# model = CrossEncoder('BAAI/bge-reranker-large', max_length=512) # 多语言,国产,模型较大

user_query = "how safe is llama 2"
# user_query = "llama 2安全性如何"
scores = model.predict([(user_query, doc) for doc in search_results['documents'][0]])
# 按得分排序
sorted_list = sorted(zip(scores, search_results['documents'][0]), key=lambda x: x[0], reverse=True)
for score, doc in sorted_list:
    print(f"{score}\t{doc}\n")

Rerank 的 API 服务

  • [Cohere Rerank](https://cohere.com/rerank):支持多语言

  • [Jina Rerank](https://jina.ai/reranker/):目前只支持英文

混合检索

传统的关键字检索(稀疏表示)与向量检索(稠密表示)各有优劣。 举个具体例子,比如文档中包含很长的专有名词,关键字检索往往更精准而向量检索容易引入概念混淆。

1. 基于关键字检索的排序

import time


class MyEsConnector:
    def __init__(self, es_client, index_name, keyword_fn):
        self.es_client = es_client
        self.index_name = index_name
        self.keyword_fn = keyword_fn

    def add_documents(self, documents):
        '''文档灌库'''
        if self.es_client.indices.exists(index=self.index_name):
            self.es_client.indices.delete(index=self.index_name)
        self.es_client.indices.create(index=self.index_name)
        actions = [
            {
                "_index": self.index_name,
                "_source": {
                    "keywords": self.keyword_fn(doc),
                    "text": doc,
                    "id": f"doc_{i}"
                }
            }
            for i, doc in enumerate(documents)
        ]
        helpers.bulk(self.es_client, actions)
        time.sleep(1)

    def search(self, query_string, top_n=3):
        '''检索'''
        search_query = {
            "match": {
                "keywords": self.keyword_fn(query_string)
            }
        }
        res = self.es_client.search(
            index=self.index_name, query=search_query, size=top_n)
        return {
            hit["_source"]["id"]: {
                "text": hit["_source"]["text"],
                "rank": i,
            }
            for i, hit in enumerate(res["hits"]["hits"])
        }

from chinese_utils import to_keywords  # 使用中文的关键字提取函数

# 引入配置文件
ELASTICSEARCH_BASE_URL = os.getenv('ELASTICSEARCH_BASE_URL')
ELASTICSEARCH_PASSWORD = os.getenv('ELASTICSEARCH_PASSWORD')
ELASTICSEARCH_NAME= os.getenv('ELASTICSEARCH_NAME')

es = Elasticsearch(
    hosts=[ELASTICSEARCH_BASE_URL],
    http_auth=(ELASTICSEARCH_NAME, ELASTICSEARCH_PASSWORD),  # 用户名,密码
)


# 创建 ES 连接器
es_connector = MyEsConnector(es, "demo_es_rrf", to_keywords)

# 文档灌库
es_connector.add_documents(documents)

# 关键字检索
keyword_search_results = es_connector.search(query, 3)

print(json.dumps(keyword_search_results, indent=4, ensure_ascii=False))

2. 基于向量检索的排序

# 创建向量数据库连接器
vecdb_connector = MyVectorDBConnector("demo_vec_rrf", get_embeddings)

# 文档灌库
vecdb_connector.add_documents(documents)

# 向量检索
vector_search_results = {
    "doc_"+str(documents.index(doc)): {
        "text": doc,
        "rank": i
    }
    for i, doc in enumerate(
        vecdb_connector.search(query, 3)["documents"][0]
    )
}  # 把结果转成跟上面关键字检索结果一样的格式

print(json.dumps(vector_search_results, indent=4, ensure_ascii=False))

3. 基于 RRF 的融合排序

参考资料:https://learn.microsoft.com/zh-cn/azure/search/hybrid-search-ranking

def rrf(ranks, k=1):
    ret = {}
    # 遍历每次的排序结果
    for rank in ranks:
        # 遍历排序中每个元素
        for id, val in rank.items():
            if id not in ret:
                ret[id] = {"score": 0, "text": val["text"]}
            # 计算 RRF 得分
            ret[id]["score"] += 1.0/(k+val["rank"])
    # 按 RRF 得分排序,并返回
    return dict(sorted(ret.items(), key=lambda item: item[1]["score"], reverse=True))

import json

# 融合两次检索的排序结果
reranked = rrf([keyword_search_results, vector_search_results])

print(json.dumps(reranked, indent=4, ensure_ascii=False))

基于向量检索的RAG

文档加载于切割

from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer

def extract_text_from_pdf(filename, page_numbers=None, min_line_length=1):
    '''从 PDF 文件中(按指定页码)提取文字'''
    paragraphs = []
    buffer = ''
    full_text = ''
    # 提取全部文本
    for i, page_layout in enumerate(extract_pages(filename)):
        # 如果指定了页码范围,跳过范围外的页
        if page_numbers is not None and i not in page_numbers:
            continue
        for element in page_layout:
            if isinstance(element, LTTextContainer):
                full_text += element.get_text() + '\n'
    # 按空行分隔,将文本重新组织成段落
    lines = full_text.split('\n')
    for text in lines:
        if len(text) >= min_line_length:
            buffer += (' '+text) if not text.endswith('-') else text.strip('-')
        elif buffer:
            paragraphs.append(buffer)
            buffer = ''
    if buffer:
        paragraphs.append(buffer)
    return paragraphs

LLM接口封装

from openai import OpenAI
import os

# 加载环境变量
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv(), verbose=True)  # 读取本地 .env 文件,里面定义了 OPENAI_API_KEY

client = OpenAI()

def get_completion(prompt, model="gpt-4o"):
    '''封装 openai 接口'''
    messages = [{"role": "user", "content": prompt}]
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0,  # 模型输出的随机性,0 表示随机性最小
    )
    return response.choices[0].message.content

Prompt模版

def build_prompt(prompt_template, **kwargs):
    '''将 Prompt 模板赋值'''
    inputs = {}
    for k, v in kwargs.items():
        if isinstance(v, list) and all(isinstance(elem, str) for elem in v):
            val = '\n\n'.join(v)
        else:
            val = v
        inputs[k] = val
    return prompt_template.format(**inputs)

prompt_template = """
你是一个问答机器人。
你的任务是根据下述给定的已知信息回答用户问题。

已知信息:
{context} # 检索出来的原始文档

用户问:
{query} # 用户的提问

如果已知信息不包含用户问题的答案,或者已知信息不足以回答用户的问题,请直接回复"我无法回答您的问题"。
请不要输出已知信息中不包含的信息或答案。
请用中文回答用户问题。
"""

检索

class RAG_Bot:
    def __init__(self, vector_db, llm_api, n_results=2):
        self.vector_db = vector_db
        self.llm_api = llm_api
        self.n_results = n_results

    def chat(self, user_query):
        # 1. 检索
        search_results = self.vector_db.search(user_query, self.n_results)

        # 2. 构建 Prompt
        prompt = build_prompt(
            prompt_template, context=search_results['documents'][0], query=user_query)

        # 3. 调用 LLM
        response = self.llm_api(prompt)
        return response
# 创建一个RAG机器人
bot = RAG_Bot(
    vector_db,
    llm_api=get_completion
)

user_query = "llama 2有多少参数?"

response = bot.chat(user_query)

print(response)

标签: #LLM 9 #AI 7 #RAG 3
相关文章
基于 internlm2 和 LangChain 搭建你的知识库

基于 internlm2 和 LangChain 搭建你的知识库 2025-02-27 14:25

环境配置 internlm2 模型部署 创建虚拟环境 conda create -n deepseek_rag python=3.10 -y conda activate deepseek_rag 并在环境中安装运行 demo 所需要的依赖 # 升级pip python -m pip install

Quick start of LangChain

Quick start of LangChain 2024-12-24 23:39

本文内容 LangChain(大模型能力封装框架) 的基本使用 基于LangChain探索AGI时代原型 需提前安装环境依赖,以及设置环境变量,如果选择openai开放接口需要会上网 export OPENAI_API_KEY="b233095ff.00gIXhXyE8yNc3Hx" export

Quick start of RAG

Quick start of RAG 2024-12-24 20:29

大模型固有的局限性 LLM所具有的能力是建立在预训练的基础上,因此,LLM并不具备实时或训练时没有使用的知识: 实时数据:时间等 私有领域/业务知识 RAG Retrieval Augmented Generation,通过检索的方式增强生成模型的能力。 基本流程

订单引擎 2024-09-17 20:31

订单状态流转是交易系统的最为核心的功能,复杂的订单系统会存在状态多、链路长、逻辑复杂的特点,针对不同的商品、业务、发货方式还存在多场景、多类型、多业务维度等业务特性。在保证订单状态流转稳定性的前提下、可扩展性和可维护性由是需要重点关注和解决的问题。 以公司目前的订单系统为例,订单状态有待支付、支付成

敏捷开发

敏捷开发 2024-08-21 10:00

一、敏捷宣言 12原则 最重要的目标是通过持续不断尽早交付有价值的软件使客户满意; 欣然面对需求变更即使在开发后期。为了客户的竞争优势。敏捷拥抱变化; 经常的交付可工作的软件,倾向于采取较短的周期; 业务人员与研发人员每天要一起工作; 激发个体斗志,以他们为核心搭建项目。提供所需环境和支持,辅以信任

产品出生指南

产品出生指南 2024-07-10 00:39

01启动 1.行业调研 工作内容:调研行业的发展现状以及趋势,目的是为产品的发展方向提供依据。 工作产出:BRD 2. 市场调研 工作内容:在行业调研的

目录
  • clearwind
  • 微信小程序

导航菜单

  • 首页
  • 分类
Copyright © 2024 your company All Rights Reserved. Powered by clearwind.
皖ICP备19023482号