Building a Generative AI Search Engine with PlanAI
PlanAI is an open-source Python framework that simplifies building complex AI workflows. In this tutorial, we’ll implement a generative AI search engine similar to Perplexity using PlanAI’s task-based architecture and integrations.
This tutorial is aimed at developers with a basic understanding of Python and general familiarity with AI concepts. We’ll be building a search engine that can answer complex questions by synthesizing information from multiple web sources. It’s “Perplexity-style” in that it provides a concise, AI-generated answer along with cited sources, much like the search engine Perplexity.ai. PlanAI makes building this type of application much easier by handling the complexities of task dependencies, data flow, caching, and integrating with various Large Language Models (LLMs). It even allows for human-in-the-loop input when automated methods fail, making it robust for real-world scenarios.
Prerequisites
- Basic understanding of Python
- OpenAI API key
- Serper API key for Google Search integration
- Python 3.10 or later
- PlanAI library (latest version from GitHub)
Architecture Overview
Our search engine implements a Perplexity-style workflow:
- Convert a user question into specific search queries
- Execute searches using Google (via Serper)
- Filter the most relevant results
- Fetch and process webpage content
- Consolidate all fetched pages
- Generate a comprehensive answer
```mermaid
graph TD
    A[User Question] --> B[Generate Queries]
    B --> C[Execute Searches]
    C --> D[Filter Results]
    D --> E[Fetch Content]
    E <--> G[Ask User for Content]
    E --> H[Consolidate Sources]
    H --> F[Generate Answer]
    F --> I[Print Results]
```
Each step is implemented as a specialized TaskWorker in PlanAI’s graph-based architecture.
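Before diving into the individual steps, here is what the basic worker pattern looks like. The sketch below is not part of the search engine; the `EchoTask` and `EchoWorker` names are made up for illustration, but the `output_types` / `consume_work` / `publish_work` pattern is the same one every worker in this tutorial follows.

```python
from typing import List, Type

from planai import Task, TaskWorker


class EchoTask(Task):
    text: str


class EchoWorker(TaskWorker):
    # Declare which task types this worker may publish downstream.
    output_types: List[Type[Task]] = [EchoTask]

    def consume_work(self, task: EchoTask):
        # Process the incoming task and hand the result to the next worker.
        self.publish_work(task=EchoTask(text=task.text.upper()), input_task=task)
```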
Defining Data Models with Pydantic
PlanAI uses Pydantic for type-safe data handling. Let’s define our core data models:
```python
from typing import List, Optional

from pydantic import Field
from planai import Task


class Question(Task):
    question: str = Field(..., description="The question to be answered")


class SearchQueries(Task):
    queries: List[str] = Field(..., description="The search queries to execute")


class SearchResult(Task):
    title: str
    link: str
    snippet: str


# Container for the combined results of all queries; it is referenced by the
# SearchExecutor and ResultFilter workers below.
class SearchResults(Task):
    results: List[SearchResult]


class FilteredResults(Task):
    results: List[SearchResult]
    explanation: str = Field(
        ..., description="Explanation for why these results were selected"
    )


class PageResult(Task):
    url: str
    title: str
    content: Optional[str]


class ConsolidatedPages(Task):
    pages: List[PageResult]


class LLMAnswer(Task):
    answer: str


class Answer(Task):
    question: str
    answer: str
    sources: List[str]
```
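Because every Task is a Pydantic model, malformed data is rejected at construction time rather than deep inside the workflow. A quick illustration (the values are only examples):

```python
from pydantic import ValidationError

question = Question(question="When will Friedman's doctrine end?")
print(question.question)

try:
    # Wrong type: queries must be a list of strings, so this raises a ValidationError
    SearchQueries(queries="not a list")
except ValidationError as exc:
    print(exc)
```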
Generating Search Queries
The first step is converting a user question into targeted search queries:
```python
class QuestionToQueries(CachedLLMTaskWorker):
    """
    Converts user questions into search queries using an LLM.
    Uses caching to avoid redundant LLM calls.
    """

    output_types: List[Type[Task]] = [SearchQueries]
    llm_input_type: Type[Task] = Question
    prompt: str = dedent(
        """
        Generate two distinct search queries to find information to answer this question:
        {question}
        Guidelines:
        - Queries should be specific and focused
        - Use different angles/approaches in each query
        - Return exactly 2 queries
        """
    ).strip()

    def pre_process(self, task: Question):
        return None

    def format_prompt(self, task: Question) -> str:
        return self.prompt.format(question=task.question)
```
Executing Searches
PlanAI provides a Serper integration for Google searches:
```python
class SearchExecutor(CachedTaskWorker):
    """
    Executes a web search for each query and combines all the unique results.
    """

    output_types: List[Type[Task]] = [SearchResults]
    max_results: int = Field(10, description="Maximum number of results per query")

    def consume_work(self, task: SearchQueries):
        all_results = []
        seen_links = set()
        for query in task.queries:
            results = SerperGoogleSearchTool().search_internet(
                query, num_results=self.max_results, print_func=self.print
            )
            for r in results:
                # Skip results that already appeared for another query
                if r["link"] in seen_links:
                    continue
                seen_links.add(r["link"])
                all_results.append(
                    SearchResult(title=r["title"], link=r["link"], snippet=r["snippet"])
                )
        # Hand the combined result set to the next worker in the graph
        self.publish_work(task=SearchResults(results=all_results), input_task=task)
```
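`SerperGoogleSearchTool` returns plain dictionaries, which the worker converts into typed `SearchResult` tasks. Judging from the keys accessed in `consume_work`, each raw result has roughly this shape (the values here are invented for illustration):

```python
raw_result = {
    "title": "Fifty years after Friedman's essay",
    "link": "https://example.com/friedman-doctrine",
    "snippet": "Half a century on, the shareholder-primacy view is being questioned ...",
}

search_result = SearchResult(
    title=raw_result["title"],
    link=raw_result["link"],
    snippet=raw_result["snippet"],
)
```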
Filtering Results
We ask an LLM to cut the results down to the ones most likely to answer the question.
```python
class ResultFilter(CachedLLMTaskWorker):
    output_types: List[Type[Task]] = [FilteredResults]
    llm_input_type: Type[Task] = SearchResults
    prompt: str = dedent(
        """
        Analyze these search results to find the 5 most promising sources to answer this question:
        {question}
        Guidelines:
        - Select credible and relevant sources
        - Prefer sources that directly address the question
        - Avoid duplicates and similar content
        """
    ).strip()

    def format_prompt(self, task: SearchResults) -> str:
        # The original question is recovered from the task's provenance chain
        question = task.find_input_task(Question)
        if not question:
            raise ValueError("No question found in input tasks")
        return self.prompt.format(question=question.question)
```
Content Fetching with Playwright
The WebBrowser integration uses Playwright to fetch webpage content:
```python
import re

import pymupdf4llm


def extract_markdown_from_pdf(pdf_path: str, print_func: callable = print) -> Optional[str]:
    try:
        md_text = pymupdf4llm.to_markdown(pdf_path, show_progress=False)
        return md_text
    except Exception as e:
        print_func(f"Error extracting text from PDF: {e}")
        return None


class PageFetcher(CachedTaskWorker):
    output_types: List[Type[Task]] = [PageResult]

    def consume_work(self, task: SearchResult):
        content = WebBrowser.get_markdown_from_page(
            task.link,
            extract_markdown_from_pdf=extract_markdown_from_pdf,
            print_func=self.print,
        )
        # code to ask the user for data on fetch failures removed
        if content:
            # Remove markdown links while preserving the link text
            content = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", content)
            result = PageResult(url=task.link, title=task.title, content=content)
            self.publish_work(task=result, input_task=task)
```
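The link-stripping regular expression keeps the anchor text and discards the URL, which shortens the content we later feed to the LLM. A standalone check of that behavior:

```python
import re

text = "See the [shareholder doctrine](https://example.com/friedman) for background."
cleaned = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
print(cleaned)  # See the shareholder doctrine for background.
```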
Consolidating All Sources
We use a JoinedTaskWorker to wait until all pages have been fetched and then pass them on to the question-answering component. Setting join_type to InitialTaskWorker groups the join by provenance, so all PageResult tasks stemming from the same initial question are consolidated together.
```python
class PageConsolidator(JoinedTaskWorker):
    output_types: List[Type[Task]] = [ConsolidatedPages]
    join_type: Type[TaskWorker] = InitialTaskWorker

    def consume_work_joined(self, task: List[PageResult]):
        pages = []
        for entry in task:
            pages.append(entry)
        self.publish_work(task=ConsolidatedPages(pages=pages), input_task=task[0])
```
Generating the Final Answer
Finally, we use a more powerful LLM to synthesize the information:
```python
class QuestionAnswerer(CachedLLMTaskWorker):
    output_types: List[Type[Task]] = [Answer]
    llm_input_type: Type[Task] = ConsolidatedPages
    llm_output_type: Type[Task] = LLMAnswer
    prompt: str = dedent(
        """
        Answer this question using the provided source materials:
        Question: {question}
        Guidelines:
        - Be specific and direct in your answer
        - Use information only from the provided sources
        - Cite sources when making specific claims
        - If the sources don't contain enough information, acknowledge the limitations
        """
    ).strip()

    def format_prompt(self, task: ConsolidatedPages) -> str:
        question = task.find_input_task(Question)
        if not question:
            raise ValueError("No question found in input tasks")
        return self.prompt.format(question=question.question)
```
Putting It All Together
PlanAI connects these components into a processing graph:
```python
def get_question_graph(llm_small, llm_reasoning):
    graph = Graph(name="Question Answering")
    graph.add_workers(
        question_to_queries,
        search_executor,
        result_filter,
        search_result_splitter,
        page_fetcher,
        page_consolidator,
        question_answerer,
    )
    graph.set_dependency(question_to_queries, search_executor).next(result_filter).next(
        search_result_splitter
    ).next(page_fetcher).next(page_consolidator).next(question_answerer).sink(Answer)
    return graph
```
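The listing above references worker instances that are created elsewhere. A plausible wiring is sketched below; the `llm=` constructor argument and which model each worker receives are assumptions based on how `get_question_graph` is called. The `search_result_splitter` worker, which fans the filtered results out into individual SearchResult tasks for the PageFetcher, is not shown in this tutorial and is omitted here as well.

```python
# Hypothetical wiring: the smaller model handles query generation and filtering,
# the stronger model writes the final answer.
question_to_queries = QuestionToQueries(llm=llm_small)
search_executor = SearchExecutor()
result_filter = ResultFilter(llm=llm_small)
page_fetcher = PageFetcher()
page_consolidator = PageConsolidator()
question_answerer = QuestionAnswerer(llm=llm_reasoning)
```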
Running the Search Engine
The engine can process multiple questions in parallel; the entry point below dispatches a single one:
```python
def main():
    llm_small = llm_from_config(provider="openai", model_name="gpt-4o-mini")
    llm_reasoning = llm_from_config(provider="openai", model_name="gpt-4o")
    graph = get_question_graph(llm_small, llm_reasoning=llm_reasoning)
    initial_task = Question(question="When will Friedman's doctrine end?")
    graph.run(
        initial_tasks=[(graph.get_worker_by_input_type(Question), initial_task)],
        run_dashboard=True,
    )


if __name__ == "__main__":
    main()
```
The output is markdown-formatted text that includes the answer and sources, which can be further processed or displayed as needed, as shown in the following example case.
Example Case
Question: When will Friedman’s doctrine end?
Answer: The materials suggest that the influence of Milton Friedman’s shareholder doctrine, which promotes the primacy of profit maximization for shareholders, is already perceived as declining in many spheres of thought and practice. For instance, the shift towards stakeholder capitalism, reflected in changes to corporate priorities, criticisms of shareholder primacy, and new considerations for corporate governance, is evident in the sources provided. However, no specific conclusion was reached in these articles about when Friedman’s doctrine might fully cease being relevant or influential. Its complete conclusion may depend on ongoing societal, political, and economic shifts.
Sources:
Extensions
We could make the GenAI search engine more powerful and accurate by adding another LLMTaskWorker that determines whether a page source has sufficient information to answer the question. If we don’t get enough relevant sources, we could ask for additional searches and page fetches.
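As a rough sketch of that idea, a relevance-checking worker could sit between the PageFetcher and the PageConsolidator and drop pages that don't help answer the question. None of the code below comes from the original implementation; in particular it assumes that LLM workers expose a post_process hook for deciding what to publish:

```python
class PageRelevance(Task):
    is_relevant: bool = Field(..., description="Whether the page helps answer the question")
    reason: str = Field(..., description="Short justification for the decision")


class RelevanceChecker(CachedLLMTaskWorker):
    output_types: List[Type[Task]] = [PageResult]
    llm_input_type: Type[Task] = PageResult
    llm_output_type: Type[Task] = PageRelevance
    prompt: str = dedent(
        """
        Decide whether this page contains information that helps answer the question:
        {question}
        """
    ).strip()

    def format_prompt(self, task: PageResult) -> str:
        question = task.find_input_task(Question)
        return self.prompt.format(question=question.question)

    def post_process(self, response: PageRelevance, input_task: PageResult):
        # Forward only pages judged relevant; if too few remain, a later worker
        # could request additional searches and fetches.
        if response.is_relevant:
            self.publish_work(
                task=PageResult(
                    url=input_task.url, title=input_task.title, content=input_task.content
                ),
                input_task=input_task,
            )
```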
Conclusion
PlanAI’s task-based architecture and built-in integrations make it straightforward to implement complex AI workflows. This example demonstrates how to build a generative AI search engine, but the same principles can be applied to many other AI applications.