Skip to content

Commit

Permalink
Add page-number info to the new structure; modify the pkl-structure tests accordingly
Browse files Browse the repository at this point in the history
  • Loading branch information
Yikang Yu authored and FranardoHuang committed Dec 27, 2024
1 parent a10f7ca commit d29d177
Show file tree
Hide file tree
Showing 30 changed files with 497 additions and 279 deletions.
18 changes: 9 additions & 9 deletions rag/file_conversion_router/classes/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@ class Chunk:
# Can they be combined into a single metadata field?
content: str
titles: str = "default_title"
chunk_url: List[str] = ["default_no_url"],
chunk_url: str = "default_no_url",
metadata: Dict[str, Any] = field(default_factory=dict)
page_num: Any = None

def __post_init__(self):
"""Ensure metadata is properly initialized."""
if self.metadata is None:
self.metadata = {}

# Ensure core properties are included in metadata
if not isinstance(self.metadata, dict):
raise TypeError(f"metadata must be a dictionary, got {type(self.metadata).__name__}")
self.metadata.update({
'titles': self.titles,
'chunk_url': self.chunk_url,
**self.metadata # Keep any existing metadata
'page_num': self.page_num
})

def __eq__(self, other):
Expand All @@ -48,7 +46,8 @@ def __eq__(self, other):
return (
self.titles == other.titles and
self.content == other.content and
self.chunk_url == other.chunk_url
self.chunk_url == other.chunk_url and
self.page_num == other.page_num
)

def update_metadata(self, new_metadata: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -87,5 +86,6 @@ def core_metadata(self) -> Dict[str, Any]:
"""
return {
'titles': self.titles,
'chunk_url': self.chunk_url
'chunk_url': self.chunk_url,
'page_num': self.page_num
}
254 changes: 155 additions & 99 deletions rag/file_conversion_router/classes/page.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import string
from typing import Optional
from rag.file_conversion_router.classes.chunk import Chunk
import tiktoken
import yaml
import pickle
import re
from pathlib import Path


class Page:

PAGE_LENGTH_THRESHOLD = 20

def __init__(self, pagename: str, content: dict, filetype: str, page_url: str = ""):
def __init__(self, pagename: str, content: dict, filetype: str, page_url: str = "", page_path: Optional[Path] = None):
"""
Initialize a Page instance.
Expand All @@ -24,6 +28,20 @@ def __init__(self, pagename: str, content: dict, filetype: str, page_url: str =
self.segments = []
self.tree_segments = []
self.chunks = []
self.page_numbers = self.load_metadata_page_numbers(
page_path) if page_path else None

def load_metadata_page_numbers(self, page_path: Path):
try:
with open(page_path, 'r', encoding='utf-8') as f:
page_data = yaml.safe_load(f)
loaded_page_numbers = [{'page_num': page_info.get('page_num'), 'start_line': page_info.get('start_line')}
for page_info in page_data.get('pages', [])]
print(f"Loaded page numbers: {loaded_page_numbers}")
return loaded_page_numbers
except Exception as e:
print(f"Error reading metadata: {e}")
return []

def recursive_separate(self, response: str, token_limit: int = 400) -> list:
"""
Expand Down Expand Up @@ -87,154 +105,191 @@ def count_consecutive_hashes(s):
break
return count

headers_content = [] # Stores tuples of ((header, level), content)
curheader = None # Current header, initially None
current_content = "" # Accumulates content for the current header
in_code_block = False # Indicates if inside a code block
md_content = md_content.split('\n')
for line in md_content:
headers_content = []
curheader = None
current_content = ""
in_code_block = False
md_lines = md_content.split('\n')

current_page_num = self.page_numbers[0]['page_num'] if self.page_numbers else None
page_num_index = 1 # Start from the second page since we've assigned the first
total_pages = len(self.page_numbers) if self.page_numbers else 0

for line_num, line in enumerate(md_lines, start=1):
stripped_line = line.strip()

# Update current_page_num based on start_line
if self.page_numbers:
while page_num_index < total_pages and line_num >= self.page_numbers[page_num_index]['start_line']:
current_page_num = self.page_numbers[page_num_index]['page_num']
page_num_index += 1
if "```" in stripped_line:
in_code_block = not in_code_block # Toggle state
in_code_block = not in_code_block

if in_code_block:
if curheader:
current_content += f"{line}\n" # Add to content within code blocks
current_content += f"{line}\n"
else:
if line.startswith('#'):
if curheader:
headers_content.append((curheader, current_content)) # Store previous header and its content
header = line
header_level = count_consecutive_hashes(header)
header = header.strip('#').strip()
curheader = (header, header_level) # Start new header context
current_content = "" # Reset content for new header
if line.startswith('#'): # Identify headers
if curheader: # Save the previous header and its content
headers_content.append(((curheader, current_page_num), current_content.strip()))
header_level = count_consecutive_hashes(line) # Count header level
header = line.strip('#').strip()
curheader = (header, header_level) # Save the header and level
current_content = "" # Reset content
else:
if curheader: # Only accumulate content if within a header
current_content += f"{line}\n"
current_content += f"{line}\n"

# Append the last header and its content, if there was any header encountered
if curheader:
headers_content.append((curheader, current_content))
headers_content.append(((curheader, current_page_num), current_content.strip()))

return headers_content

def page_seperate_to_segments(self) -> None:
self.segments = [i for i in self.extract_headers_and_content(self.content['text'])]
if not self.segments:
# LEVEL 0 for no header found
self.segments = [(("NO ANY HEADER DETECTED", 0),
self.content['text'])]
self.segments = [("(NO ANY HEADER DETECTED)", 0), self.content['text']]

def print_header_tree(self):

def print_header_tree(self) -> object:
result = ""
for (title, level), _ in self.segments:
indent = '--' * (level - 1)
header_tag = f"(h{level})"
result += f"{indent}{title} {header_tag}\n"
if level is not None:
indent = '--' * (level - 1)
result += f"{indent}{title}\n"
else:
result += f"{title} (hUnknown)\n"
return result

def tree_print(self):
new_filename = f"{self.pagename}_tree.txt" # No need to use this
top_header = []
counter = 1

for (header, level), content in self.segments:
page_toc = ""
page_path = ""
segment = ""
if len(top_header) < level:
for i in range(len(top_header), level - 1):
top_header.append(("", [], i + 1))
top_header.append((header, content, level))
for (header, page_num), content in self.segments:
level = header[1]
header_title = header[0]

# Adjust 'top_header' to match current level
if len(top_header) >= level:
# Truncate 'top_header' to the current level - 1
top_header = top_header[:level - 1]

# Append the current header with its page number (only if page number exists)
if page_num is not None:
top_header.append((header_title, content, level, page_num))

else:
# Table of Contents
page_toc += "(Table of Contents)\n"
page_toc += f"{self.print_header_tree()}\n"

# Page Path
page_path += "(Page path)\n"
first = True
for h, c, l in top_header:
if first:
page_path += f"(h{l}) {h}"
first = not first
else:
page_path += " > "
page_path += f"(h{l}) {h}"
# Segment Print
segment += f"(Segment {counter})\n"
header_list = [header[0] for header in top_header]
for h, c, l in top_header:
hash_symbols = '#' * l
top_header.append((header_title, content, level, None))

# Build the segment
segment = f"(Segment {counter})\n"
for h, c, l, p in top_header:
hash_symbols = '#' * l
if p is not None:
# segment += f"{hash_symbols}{h} (h{l}, Page {p})\n"
segment += f"{hash_symbols}{h} (h{l})\n"
else:
segment += f"{hash_symbols}{h} (h{l})\n"
segment += f"{c}\n"
# Store the information in tree_segments
self.tree_segments.append({'Page_table': page_toc, 'Page_path': header_list, 'Segment_print': segment})
top_header = top_header[:(level - 1)]
top_header.append((header, content, level))
counter += 1
segment += f"{c}\n"

# Build the Table of Contents
page_toc = "(Table of Contents)\n" + self.print_header_tree() + "\n"

# Build the Page Path
page_path = "(Page path)\n"
page_path += ' > '.join(
f"(h{l}) {h} (Page {p})" if p is not None else f"(h{l}) {h}" for h, c, l, p in top_header)

# Build header list
header_list = [h for h, c, l, p in top_header]

# Use the page number of the current header for the segment
segment_page_num = page_num if page_num is not None else None

# Add to `tree_segments`
tree_segment = {
'Page_table': self.print_header_tree(),
'Page_path': [h[0] for h in top_header],
'Segment_print': content,
'page_num': page_num
}

# Store the information in tree_segments
tree_segment = {
'Page_table': page_toc,
'Page_path': header_list,
'Segment_print': segment
}
if segment_page_num is not None:
tree_segment['page_num'] = segment_page_num

self.tree_segments.append(tree_segment)
counter += 1

# Handle the last segment
all_headers = [header[0] for header in self.segments]
if (header, level) == all_headers[-1]:
if top_header:
page_toc = ""
page_path = ""
segment = ""
# Table of Contents

page_toc += "(Table of Contents)\n"
page_toc += f"{self.print_header_tree()}\n"

# Page Path
page_path += "(Page path)\n"
first = True
for h, c, l in top_header:
for h, c, l, p in top_header:
if first:
page_path += f"(h{l}) {h}"
page_path += f"(h{l}) {h} (Page {p})" if p is not None else f"(h{l}) {h}"
first = not first
else:
page_path += " > "
page_path += f"(h{l}) {h}"
# Segment Print
page_path += f"(h{l}) {h} (Page {p})" if p is not None else f"(h{l}) {h}"

segment += f"(Segment {counter})\n"
header_list = [header[0] for header in top_header]
for h, c, l in top_header:
for h, c, l, p in top_header:
hash_symbols = '#' * l
segment += f"{hash_symbols}{h} (h{l})\n"
if p is not None:
segment += f"{hash_symbols}{h} (h{l}, Page {p})\n"
else:
segment += f"{hash_symbols}{h} (h{l})\n"
segment += f"{c}\n"
# Store the information in tree_segments
self.tree_segments.append({'Page_table': page_toc, 'Page_path': header_list, 'Segment_print': segment})
top_header = top_header[:(level - 1)]
top_header.append((header, content, level))
tree_segment = {
'Page_table': page_toc,
'Page_path': header_list,
'Segment_print': segment
}
if top_header[-1][3] is not None:
tree_segment['page_num'] = top_header[-1][3]

def tree_segments_to_chunks(self):
def generate_hyperlink_header(header_text):
"""
This function takes a header string, converts all characters to lowercase,
and replaces all spaces with dashes to create a hyperlink-friendly header.
Parameters:
header_text (str): The header string to be converted.
Returns:
str: The converted hyperlink-friendly header string.
"""
# Convert the string to lowercase
lower_text = header_text.lower()

# Replace spaces with dashes
hyperlink_header = lower_text.replace(' ', '-')

return hyperlink_header
self.tree_segments.append(tree_segment)

# seperate with recursive seperate
for i in self.tree_segments:
content_chunks = self.recursive_separate(i['Segment_print'], 400)
def tree_segments_to_chunks(self):
for segment in self.tree_segments:
content_chunks = self.recursive_separate(segment['Segment_print'], 400)
page_num = segment.get('page_num', None)
for count, content_chunk in enumerate(content_chunks):
headers = i['Page_path']
urls = [f"{self.page_url}#{generate_hyperlink_header(header)}" for header in headers]
page_path = ' > '.join(f"{item} (h{i + 1})" for i, item in enumerate(i['Page_path'])) + f" ({count})"
self.chunks.append(Chunk(page_path, content_chunk, urls))
headers = segment['Page_path']
if self.page_url and page_num:
urls = f"{self.page_url}#page={page_num}"
else:
urls = "URL_NOT_AVAILABLE"

page_path = ' > '.join(
f"{item} (h{i + 1})" for i, item in enumerate(segment['Page_path'])) + f" ({count})"
self.chunks.append(
Chunk(
content=content_chunk,
titles=headers[-1],
chunk_url=urls,
# metadata={"page_path": page_path}, # Include page_path in metadata
page_num=page_num
)
)

return self.chunks

def to_file(self, output_path: str) -> None:
Expand Down Expand Up @@ -267,3 +322,4 @@ def chunks_to_pkl(self, output_path: str) -> None:
"""
with open(output_path, "wb") as f:
pickle.dump(self.chunks, f)

Loading

0 comments on commit d29d177

Please sign in to comment.