从文档到知识库:RAG系统的自动化数据处理与管理方案
一、背景介绍
检索增强生成(RAG)系统已成为人工智能领域的一个重要发展方向,它结合了大规模语言模型的生成能力和外部知识库的精确信息,以提供更准确、更可靠的回答。然而,构建和维护RAG系统的知识库一直是一个耗时且复杂的过程,特别是在处理大量非结构化文档时。最近,我们正在为一个检索增强生成(RAG)系统开发一个自动化的问答(QA)生成工具。这个项目旨在缓解上述挑战,通过自动化流程将各种格式的文档转化为结构化的问答对,并将它们无缝集成到RAG系统的知识库中。
二、提出动机
本项目源于实际RAG系统开发中遇到的挑战,其中大致的动机有以下几点:
- 提高效率:传统方法要么效果不佳,要么耗时过多,我们需要一种能够快速处理大量文档的方法。
- 提升质量:利用大模型的智能性,我们希望生成的问答对能够更加贴合文本内容,提高知识库的质量。
- 减少人工干预:通过自动化流程,我们旨在最小化人工参与,从而降低人为错误和主观偏差。
- 灵活适应:我们需要一个系统能够处理各种格式的文档,并适应不同领域的知识需求。
- 用户友好:即使是非技术人员也应该能够轻松使用这个系统,参与到知识库的构建和管理中。
三、技术方案
特别的,我们整体的技术方案可以归结为下面的几个部分:
- 文档处理:使用unstructured库的partition函数来处理各种格式的文档(txt、pdf、docx),将其分割成适当大小的文本块。
- AI驱动的QA生成:利用OpenAI的API(在本案例中使用qwen1.5-72b模型)自动生成高质量的问答对。通过精心设计的prompt,确保生成的问答对紧密围绕文本内容。
- 知识库管理:实现了一个灵活的集合管理系统,允许创建新的集合或选择现有集合来存储生成的QA对。使用RESTful API与后端数据库进行交互,实现数据的存储和检索。
- 用户界面:基于Streamlit构建了一个直观、用户友好的Web界面。该界面提供了文件上传、QA对生成预览、知识库管理等功能,使整个过程变得简单明了。
- 进度跟踪和错误处理:实现了详细的进度显示和错误处理机制,确保用户能够实时了解处理进展,并在出现问题时得到及时反馈。
- 缓存优化:使用Streamlit的@st.cache_data装饰器来优化性能,特别是在QA对生成过程中。
- 安全性考虑:使用临时文件处理上传的文档,处理后立即删除,以确保数据安全。
四、使用步骤
4.1 启动应用
运行Python脚本以启动Streamlit应用:
1
streamlit run rag_admin_interface.py
4.2 页面概览
应用界面分为两个主要部分:
- 左侧边栏:用于选择操作(上传文件或管理知识库)
- 主界面:显示当前操作的详细内容和交互元素
RAG管理主页面
4.3 上传文件
- 在左侧边栏选择“上传文件”操作。
- 在主界面中,使用文件上传器上传非结构化文件(支持txt、pdf、docx格式)。
- 文件上传成功后,点击“处理文件并生成QA对”按钮。
- 系统将处理文件并生成QA对,显示进度条和结果摘要。
- 生成完成后,可以预览前3个QA对。
4.4 管理知识库
- 在左侧边栏选择“管理知识库”操作。
- 选择“插入现有Collection”或“创建新Collection”。
- 插入现有Collection:
- 从下拉列表中选择一个现有的Collection。
- 创建新Collection:
- 输入新Collection的名称。
- 设置Collection的容量(1-1000之间)。
- 点击“创建新Collection”按钮。
- 插入现有Collection:
4.5 插入QA对到Collection
- 确保已经上传文件并生成了QA对。
- 在知识库管理界面,选择或创建一个Collection。
- 点击“插入QA对到选定的Collection”按钮。
- 系统将显示插入进度和结果摘要。
五、技术实现
5.1 配置和初始化
首先,我们设置了必要的配置和初始化:
1
2
3
4
5
6
7
8
base_url = 'your_knowledgebase_base_url'
api_key = 'your_knowledgebase_api_key'
headers = {"Authorization": f"Bearer {api_key}"}
client = OpenAI(
api_key="your_llm_api_key",
base_url="your_llm_base_url",
)
这部分设置了API的基础URL和认证信息,以及OpenAI客户端的配置。
5.2 核心功能函数
5.2.1 get_completion()
这个函数用于调用AI模型生成内容:
1
2
3
4
5
6
7
8
9
10
11
def get_completion(prompt, model="qwen1.5-72b"):
try:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=0,
)
return response.choices[0].message.content
except Exception as e:
st.error(f"调用API时发生错误: {e}")
return None
5.2.2 list_collections(), create_collection(), create_chunk()
这些函数用于管理知识库集合和数据块:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def list_collections():
try:
response = requests.get(f"{base_url}collections", headers=headers)
response.raise_for_status()
return response.json()['data']
except requests.RequestException as e:
st.error(f"获取集合列表失败: {e}")
return []
def create_collection(name, embedding_model_id, capacity):
data = {
"name": name,
"embedding_model_id": embedding_model_id,
"capacity": capacity
}
try:
response = requests.post(f"{base_url}collections", headers=headers, json=data)
response.raise_for_status()
return response.json()['data']
except requests.RequestException as e:
st.error(f"创建集合失败: {e}")
return None
def create_chunk(collection_id, content):
data = {
"collection_id": collection_id,
"content": content
}
endpoints = [
f"{base_url}chunks",
f"{base_url}collections/{collection_id}/chunks",
f"{base_url}embeddings"
]
for endpoint in endpoints:
try:
response = requests.post(endpoint, headers=headers, json=data)
response.raise_for_status()
return response.json()['data']
except requests.RequestException:
continue
st.error("创建chunk失败: 所有端点都失败了")
return None
5.2.3 process_file()
这个函数用于处理上传的文件,将其分割成文本块:
1
2
3
4
5
6
7
8
9
10
11
12
13
def process_file(uploaded_file):
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as tmp_file:
tmp_file.write(uploaded_file.getvalue())
tmp_file_path = tmp_file.name
try:
elements = partition(filename=tmp_file_path)
text_chunks = [str(element) for element in elements if not str(element).strip() == '']
return text_chunks
except Exception as e:
st.error(f"处理文件时发生错误: {e}")
return []
finally:
os.unlink(tmp_file_path)
5.2.4 generate_qa_pairs_with_progress()
这个函数基于文本块生成QA对:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@st.cache_data
def generate_qa_pairs_with_progress(text_chunks):
qa_pairs = []
progress_bar = st.progress(0)
for i, chunk in enumerate(text_chunks):
prompt = f"""基于以下给定的文本,生成一组高质量的问答对。请遵循以下指南:
...
"""
response = get_completion(prompt)
if response:
try:
parts = response.split("A:", 1)
if len(parts) == 2:
question = parts[0].replace("Q:", "").strip()
answer = parts[1].strip()
qa_pairs.append({"question": question, "answer": answer})
else:
st.warning(f"无法解析响应: {response}")
except Exception as e:
st.warning(f"处理响应时出错: {str(e)}")
progress = (i + 1) / len(text_chunks)
progress_bar.progress(progress)
return qa_pairs
5.2.5 insert_qa_pairs_to_database()
这个函数将生成的QA对插入到选定的集合中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def insert_qa_pairs_to_database(collection_id):
progress_bar = st.progress(0)
status_text = st.empty()
success_count = 0
fail_count = 0
for i, qa_pair in enumerate(st.session_state.qa_pairs):
try:
if "question" in qa_pair and "answer" in qa_pair:
content = f"问题:{qa_pair['question']}\n答案:{qa_pair['answer']}"
if create_chunk(collection_id=collection_id, content=content):
success_count += 1
else:
fail_count += 1
st.warning(f"插入QA对 {i+1} 失败")
else:
fail_count += 1
st.warning(f"QA对 {i+1} 格式无效")
except Exception as e:
st.error(f"插入QA对 {i+1} 时发生错误: {str(e)}")
fail_count += 1
progress = (i + 1) / len(st.session_state.qa_pairs)
progress_bar.progress(progress)
status_text.text(f"进度: {progress:.2%} | 成功: {success_count} | 失败: {fail_count}")
return success_count, fail_count
5.3 主页面结构
主界面结构在main()函数中定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def main():
st.set_page_config(page_title="RAG管理员界面", layout="wide")
st.title("RAG管理员界面")
# 侧边栏
st.sidebar.title("操作面板")
operation = st.sidebar.radio("选择操作", ["上传文件", "管理知识库"])
if operation == "上传文件":
# 文件上传和处理逻辑
...
elif operation == "管理知识库":
# 知识库管理逻辑
...
if __name__ == "__main__":
main()
5.4 文件上传和处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if operation == "上传文件":
st.header("文件上传与QA对生成")
uploaded_file = st.file_uploader("上传非结构化文件", type=["txt", "pdf", "docx"])
if uploaded_file is not None:
st.success("文件上传成功!")
if st.button("处理文件并生成QA对"):
with st.spinner("正在处理文件..."):
text_chunks = process_file(uploaded_file)
if not text_chunks:
st.error("文件处理失败,请检查文件格式是否正确。")
return
st.info(f"文件已分割成 {len(text_chunks)} 个文本段")
with st.spinner("正在生成QA对..."):
st.session_state.qa_pairs = generate_qa_pairs_with_progress(text_chunks)
st.success(f"已生成 {len(st.session_state.qa_pairs)} 个QA对")
# QA对预览逻辑
...
5.5 知识库管理
知识库管理的代码片段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
elif operation == "管理知识库":
st.header("知识库管理")
option = st.radio("选择操作", ("插入现有Collection", "创建新Collection"))
if option == "插入现有Collection":
# 选择现有Collection的逻辑
...
elif option == "创建新Collection":
# 创建新Collection的逻辑
...
# 插入QA对到选定的Collection
if hasattr(st.session_state, 'qa_pairs') and st.session_state.qa_pairs:
if st.button("插入QA对到选定的Collection"):
# 插入QA对的逻辑
...
else:
st.warning("没有可用的QA对。请先上传文件并生成QA对。")
六、其他事项
6.1 注意事项
- 确保在使用前正确配置了API密钥和基础URL。
- 大文件处理和QA对生成可能需要一些时间,请耐心等待。
- 插入大量QA对到Collection时可能需要较长时间,系统会显示进度。
6.2 错误处理
- 如果遇到API调用错误或文件处理错误,系统会在界面上显示相应的错误消息。
- 对于插入失败的QA对,系统会显示警告信息。
6.3 性能考虑
- 应用使用了Streamlit的缓存机制来优化性能,特别是在QA对生成过程中。
- 对于大型文件或大量QA对,处理时间可能会较长。
6.4 安全性
- 请确保妥善保管API密钥和其他敏感信息。
- 上传的文件会被临时存储并在处理后删除。
This post is licensed under CC BY 4.0 by the author.