```python
# If file_paths is provided, ensure it matches the number of documents
if file_paths is not None:
    if isinstance(file_paths, str):
        file_paths = [file_paths]
    if len(file_paths) != len(input):
        raise ValueError(
            "Number of file paths must match the number of documents"
        )

# 1. Validate ids if provided or generate MD5 hash IDs
if ids is not None:
    # Check if the number of IDs matches the number of documents
    if len(ids) != len(input):
        raise ValueError("Number of IDs must match the number of documents")

    # Check if IDs are unique
    if len(ids) != len(set(ids)):
        raise ValueError("IDs must be unique")

    # Generate contents dict of IDs provided by user and documents
    contents = {
        id_: {"content": doc, "file_path": path}
        for id_, doc, path in zip(ids, input, file_paths)
    }
else:
    # Clean input text and remove duplicates
    cleaned_input = [
        (clean_text(doc), path) for doc, path in zip(input, file_paths)
    ]
    unique_content_with_paths = {}

    # Keep track of unique content and their paths
    for content, path in cleaned_input:
        if content not in unique_content_with_paths:
            unique_content_with_paths[content] = path

    # Generate contents dict of MD5 hash IDs and documents with paths
    contents = {
        compute_mdhash_id(content, prefix="doc-"): {
            "content": content,
            "file_path": path,
        }
        for content, path in unique_content_with_paths.items()
    }
```
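compute_mdhash_id is what turns document content into a stable ID. For orientation, here is a minimal sketch of a content-hash ID helper; this is an assumption about its behavior, not necessarily LightRAG's exact implementation:

```python
from hashlib import md5


def compute_mdhash_id_sketch(content: str, prefix: str = "") -> str:
    """Deterministic content-hash ID: identical text always maps to the
    same ID, so re-inserting the same document cannot create a new entry."""
    return prefix + md5(content.encode()).hexdigest()


print(compute_mdhash_id_sketch("hello", prefix="doc-"))
# doc-5d41402abc4b2a76b9719d911017c592
```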
Next, deduplication is applied within the input itself; in other words, duplicate documents in the input are removed.
```python
# 2. Remove duplicate contents
unique_contents = {}
for id_, content_data in contents.items():
    content = content_data["content"]
    file_path = content_data["file_path"]
    if content not in unique_contents:
        unique_contents[content] = (id_, file_path)

# Reconstruct contents with unique content
contents = {
    id_: {"content": content, "file_path": file_path}
    for content, (id_, file_path) in unique_contents.items()
}
```
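A standalone toy run of the pattern above (not LightRAG code) shows the side effect worth noticing: when two different IDs carry the same content, only the first ID survives.

```python
contents = {
    "doc-1": {"content": "same text", "file_path": "a.txt"},
    "doc-2": {"content": "same text", "file_path": "b.txt"},
    "doc-3": {"content": "other text", "file_path": "c.txt"},
}

unique_contents = {}
for id_, content_data in contents.items():
    # The first ID seen for a given content wins; later duplicates are dropped.
    if content_data["content"] not in unique_contents:
        unique_contents[content_data["content"]] = (id_, content_data["file_path"])

contents = {
    id_: {"content": content, "file_path": file_path}
    for content, (id_, file_path) in unique_contents.items()
}
print(list(contents))  # ['doc-1', 'doc-3'] -- "doc-2" was a duplicate of "doc-1"
```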
A status record is then created for each document, which makes it easy to track progress later (including the update time).

Note that content_summary here is not an LLM-generated summary; it is merely a truncation of the content.
```python
# 3. Generate document initial status
new_docs: dict[str, Any] = {
    id_: {
        "status": DocStatus.PENDING,
        "content": content_data["content"],
        "content_summary": get_content_summary(content_data["content"]),
        "content_length": len(content_data["content"]),
        "created_at": datetime.now().isoformat(),
        "updated_at": datetime.now().isoformat(),
        "file_path": content_data[
            "file_path"
        ],  # Store file path in document status
    }
    for id_, content_data in contents.items()
}
```
```python
def get_content_summary(content: str, max_length: int = 250) -> str:
    """Get summary of document content

    Args:
        content: Original document content
        max_length: Maximum length of summary

    Returns:
        Truncated content with ellipsis if needed
    """
    content = content.strip()
    if len(content) <= max_length:
        return content
    return content[:max_length] + "..."
```
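A quick check of the truncation behavior (illustrative only):

```python
print(get_content_summary("short text"))      # "short text" -- returned as is
print(len(get_content_summary("x" * 1000)))   # 253 -- 250 characters plus "..."
```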
```python
# 4. Filter out already processed documents
# Get docs ids
all_new_doc_ids = set(new_docs.keys())
# Exclude IDs of documents that are already in progress
unique_new_doc_ids = await self.doc_status.filter_keys(all_new_doc_ids)

# Log ignored document IDs
ignored_ids = [
    doc_id for doc_id in unique_new_doc_ids if doc_id not in new_docs
]
if ignored_ids:
    logger.warning(
        f"Ignoring {len(ignored_ids)} document IDs not found in new_docs"
    )
    for doc_id in ignored_ids:
        logger.warning(f"Ignored document ID: {doc_id}")

# Filter new_docs to only include documents with unique IDs
new_docs = {
    doc_id: new_docs[doc_id]
    for doc_id in unique_new_doc_ids
    if doc_id in new_docs
}

if not new_docs:
    logger.info("No new unique documents were found.")
    return
```
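filter_keys is the storage-side dedup: it is expected to return only the keys that do not already exist in the doc status store. A minimal in-memory version might look like the sketch below; this is an assumption about the contract, not the actual storage backend:

```python
import asyncio


class InMemoryDocStatusSketch:
    """Toy stand-in for the doc status storage, only to illustrate filter_keys."""

    def __init__(self) -> None:
        self._store: dict[str, dict] = {}

    async def filter_keys(self, keys: set[str]) -> set[str]:
        # Return only the IDs that have never been stored before.
        return {k for k in keys if k not in self._store}

    async def upsert(self, docs: dict[str, dict]) -> None:
        self._store.update(docs)


async def demo() -> None:
    status = InMemoryDocStatusSketch()
    await status.upsert({"doc-a": {"status": "processed"}})
    print(await status.filter_keys({"doc-a", "doc-b"}))  # {'doc-b'}


asyncio.run(demo())
```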
Finally, the filtered documents are inserted into the document status store (doc_status). At last! :)
```python
# 5. Store status document
await self.doc_status.upsert(new_docs)
logger.info(f"Stored {len(new_docs)} new unique documents")
```
Actually processing the documents
.apipeline_process_enqueue_documents() is broadly structured into an async with part and a try ... finally part, corresponding to the logic for "fetching all pending documents" and for "processing the documents" respectively; a rough skeleton is sketched below.
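A self-contained toy sketch of that shape (not LightRAG's actual code; the real method also gathers pending/failed documents, batches chunks, and updates many more status fields):

```python
import asyncio

pipeline_status = {"busy": False}
pipeline_status_lock = asyncio.Lock()


async def process_enqueued_docs_sketch(to_process_docs: dict) -> None:
    # Part 1: under the shared lock, decide whether this worker may start.
    async with pipeline_status_lock:
        if pipeline_status["busy"] or not to_process_docs:
            return
        pipeline_status["busy"] = True

    # Part 2: do the actual processing; always clear the busy flag afterwards.
    try:
        for doc_id in to_process_docs:
            print(f"processing {doc_id}")  # stand-in for chunking/extraction/merging
    finally:
        pipeline_status["busy"] = False


asyncio.run(process_enqueued_docs_sketch({"doc-1": {}, "doc-2": {}}))
```

The real gating logic at the start of the method is excerpted below.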
```python
# (excerpt: this branch runs when no other process currently holds the pipeline;
#  the enclosing check is omitted here)
    if not to_process_docs:
        logger.info("No documents to process")
        return

    pipeline_status.update(
        {
            "busy": True,
            "job_name": "Default Job",
            "job_start": datetime.now().isoformat(),
            "docs": 0,
            "batchs": 0,  # Total number of files to be processed
            "cur_batch": 0,  # Number of files already processed
            "request_pending": False,  # Clear any previous request
            "latest_message": "",
        }
    )
    # Cleaning history_messages without breaking it as a shared list object
    del pipeline_status["history_messages"][:]
else:
    # Another process is busy, just set request flag and return
    pipeline_status["request_pending"] = True
    logger.info(
        "Another process is already processing the document queue. Request queued."
    )
    return
```
After chunk_results has been collected, the per-chunk results are merged into all_nodes and all_edges simply by calling list's extend() method.

Merging chunks
```python
# Collect all nodes and edges from all chunks
all_nodes = defaultdict(list)
all_edges = defaultdict(list)

for maybe_nodes, maybe_edges in chunk_results:
    # Collect nodes
    for entity_name, entities in maybe_nodes.items():
        all_nodes[entity_name].extend(entities)

    # Collect edges with sorted keys for undirected graph
    for edge_key, edges in maybe_edges.items():
        sorted_edge_key = tuple(sorted(edge_key))
        all_edges[sorted_edge_key].extend(edges)
```
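Why sort the edge key? The extracted graph is undirected, so ("B", "A") and ("A", "B") must land in the same bucket. A standalone toy example:

```python
from collections import defaultdict

all_edges = defaultdict(list)
for edge_key, relation in [(("B", "A"), "rel-1"), (("A", "B"), "rel-2")]:
    # Normalizing the key orientation merges both directions of the same edge.
    all_edges[tuple(sorted(edge_key))].append(relation)

print(dict(all_edges))  # {('A', 'B'): ['rel-1', 'rel-2']}
```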
Next, each record is processed (the regex pattern shows that every record is wrapped in a pair of parentheses); inside each record, the tuple delimiter is then used to split out the entity and its attributes.
```python
for record in records:
    record = re.search(r"\((.*)\)", record)
    if record is None:
        continue
    record = record.group(1)
    record_attributes = split_string_by_multi_markers(
        record, [context_base["tuple_delimiter"]]
    )
```
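To make that concrete, here is a standalone toy run, assuming a tuple delimiter of "<|>" and using re.split as a stand-in for split_string_by_multi_markers:

```python
import re

record = '("entity"<|>"Alan Turing"<|>"person"<|>"Pioneer of theoretical computer science")'

match = re.search(r"\((.*)\)", record)  # keep only what sits inside the parentheses
if match is not None:
    record_attributes = re.split(re.escape("<|>"), match.group(1))
    print(record_attributes)
    # ['"entity"', '"Alan Turing"', '"person"', '"Pioneer of theoretical computer science"']
```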
```python
# Process gleaning result separately with file path
glean_nodes, glean_edges = await _process_extraction_result(
    glean_result, chunk_key, file_path
)

# Merge results - only add entities and edges with new names
for entity_name, entities in glean_nodes.items():
    if (
        entity_name not in maybe_nodes
    ):  # Only accept entities with new names in the gleaning stage
        maybe_nodes[entity_name].extend(entities)
for edge_key, edges in glean_edges.items():
    if (
        edge_key not in maybe_edges
    ):  # Only accept edges with new names in the gleaning stage
        maybe_edges[edge_key].extend(edges)
```
```python
# Centralized processing of all nodes and edges
entities_data = []
relationships_data = []

# Use graph database lock to ensure atomic merges and updates
async with graph_db_lock:
    # Process and update all entities at once
    for entity_name, entities in all_nodes.items():
        entity_data = await _merge_nodes_then_upsert(
            entity_name,
            entities,
            knowledge_graph_inst,
            global_config,
            pipeline_status,
            pipeline_status_lock,
            llm_response_cache,
        )
        entities_data.append(entity_data)

    # Process and update all relationships at once
    for edge_key, edges in all_edges.items():
        edge_data = await _merge_edges_then_upsert(
            edge_key[0],
            edge_key[1],
            edges,
            knowledge_graph_inst,
            global_config,
            pipeline_status,
            pipeline_status_lock,
            llm_response_cache,
        )
        if edge_data is not None:
            relationships_data.append(edge_data)
```
Next, the entity vector database is updated with the merged data.

Updating the entity vector database
```python
# Update vector databases with all collected data
if entity_vdb is not None and entities_data:
    data_for_vdb = {
        compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
            "entity_name": dp["entity_name"],
            "entity_type": dp["entity_type"],
            "content": f"{dp['entity_name']}\n{dp['description']}",
            "source_id": dp["source_id"],
            "file_path": dp.get("file_path", "unknown_source"),
        }
        for dp in entities_data
    }
    await entity_vdb.upsert(data_for_vdb)
```
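One detail worth noting: the vector ID is derived deterministically from the entity name (compute_mdhash_id with the "ent-" prefix), so upserting the same entity again overwrites its existing vector entry instead of creating a duplicate.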