Featured image of post Building a Generative AI Search Engine with PlanAI

Building a Generative AI Search Engine with PlanAI

Learn how to implement a Perplexity-style generative AI search engine using PlanAI, a Python framework for building AI workflows

Building a Generative AI Search Engine with PlanAI

PlanAI is an open-source Python framework that simplifies building complex AI workflows. In this tutorial, we’ll implement a generative AI search engine similar to Perplexity using PlanAI’s task-based architecture and integrations.

This tutorial is aimed at developers with a basic understanding of Python and general familiarity with AI concepts. We’ll be building a search engine that can answer complex questions by synthesizing information from multiple web sources. It’s “Perplexity-style” in that it provides a concise, AI-generated answer along with cited sources, much like the search engine Perplexity.ai. PlanAI makes building this type of application much easier by handling the complexities of task dependencies, data flow, caching, and integrating with various Large Language Models (LLMs). It even allows for human-in-the-loop input when automated methods fail, making it robust for real-world scenarios.

Prerequisites

  • Basic understanding of Python
  • OpenAI API key
  • Serper API key for Google Search integration
  • Python 3.10 or later
  • PlanAI library - latest version from github

Architecture Overview

Our search engine implements a Perplexity-style workflow:

  1. Convert a user question into specific search queries
  2. Execute searches using Google (via Serper)
  3. Filter the most relevant results
  4. Fetch and process webpage content
  5. Consolidate all fetched pages
  6. Generate a comprehensive answer
graph TD A[User Question] --> B[Generate Queries] B --> C[Execute Searches] C --> D[Filter Results] D --> E[Fetch Content] H --> F[Generate Answer] E <--> G[Ask User for Content] E --> H[Consolidate Sources] F --> I[Print Results]

Each step is implemented as a specialized TaskWorker in PlanAI’s graph-based architecture.

Defining Data Models with Pydantic

PlanAI uses Pydantic for type-safe data handling. Let’s define our core data models:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pydantic import Field
from planai import Task

class Question(Task):
    question: str = Field(..., description="The question to be answered")

class SearchQueries(Task):
    queries: List[str] = Field(..., description="The search queries to execute")

class SearchResult(Task):
    title: str
    link: str
    snippet: str

class FilteredResults(Task):
    results: List[SearchResult]
    explanation: str = Field(
        ..., description="Explanation for why these results were selected"
    )

class PageResult(Task):
    url: str
    title: str
    content: Optional[str]

class ConsolidatedPages(Task):
    pages: List[PageResult]


class LLMAnswer(Task):
    answer: str

class Answer(Task):
    question: str
    answer: str
    sources: List[str]

Generating Search Queries

The first step is converting a user question into targeted search queries:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class QuestionToQueries(CachedLLMTaskWorker):
    """
    Converts user questions into search queries using an LLM.
    Uses caching to avoid redundant LLM calls.
    """
    output_types: List[Type[Task]] = [SearchQueries]
    llm_input_type: Type[Task] = Question
    prompt: str = dedent(
        """
        Generate two distinct search queries to find information to answer this question:
        {question}

        Guidelines:
        - Queries should be specific and focused
        - Use different angles/approaches in each query
        - Return exactly 2 queries
    """
    ).strip()

    def pre_process(self, task: Question):
        return None

    def format_prompt(self, task: Question) -> str:
        return self.prompt.format(question=task.question)

Executing Searches

PlanAI provides a Serper integration for Google searches:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class SearchExecutor(CachedTaskWorker):
    """
    Exectures a web search for each query and combines all the unique results
    """
    output_types: List[Type[Task]] = [SearchResults]
    max_results: int = Field(10, description="Maximum number of results per query")

    def consume_work(self, task: SearchQueries):
        all_results = []
        for query in task.queries:
            results = SerperGoogleSearchTool().search_internet(
                query, num_results=self.max_results, print_func=self.print
            )
            all_results.extend(
                [SearchResult(title=r["title"], link=r["link"], snippet=r["snippet"])
                    for r in results]
            )

Filtering Results

We ask an LLM to cut the results down to the ones most likely to answer the question.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class ResultFilter(CachedLLMTaskWorker):
    output_types: List[Type[Task]] = [FilteredResults]
    llm_input_type: Type[Task] = SearchResults
    prompt: str = dedent(
        """
        Analyze these search results to find the 5 most promising sources to answer this question:
        {question}

        Guidelines:
        - Select credible and relevant sources
        - Prefer sources that directly address the question
        - Avoid duplicates and similar content
    """
    ).strip()

    def format_prompt(self, task: SearchResults) -> str:
        question = task.find_input_task(Question)
        if not question:
            raise ValueError("No question found in input tasks")
        return self.prompt.format(question=question.question)

Content Fetching with Playwright

The WebBrowser integration uses Playwright to fetch webpage content:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def extract_markdown_from_pdf(pdf_path: str, print_func: callable = print) -> str:
    try:
        md_text = pymupdf4llm.to_markdown(pdf_path, show_progress=False)
        return md_text
    except Exception as e:
        print_func(f"Error extracting text from PDF: {e}")
        return None

class PageFetcher(CachedTaskWorker):
    output_types: List[Type[Task]] = [PageResult]

    def consume_work(self, task: SearchResult):
        content = WebBrowser.get_markdown_from_page(
            task.link,
            extract_markdown_from_pdf=extract_markdown_from_pdf,
            print_func=self.print,
        )

        # code to ask the user for data on fetch failures removed

        if content:
            # Remove markdown links while preserving the link text
            content = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", content)

        result = PageResult(url=task.link, title=task.title, content=content)
        self.publish_work(task=result, input_task=task)

Consolidating All Sources

We used a JoinedTaskWorker to wait until all sources have been fetched and pass them on to the question answering component.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class PageConsolidator(JoinedTaskWorker):
    output_types: List[Type[Task]] = [ConsolidatedPages]
    join_type: Type[TaskWorker] = InitialTaskWorker

    def consume_work_joined(self, task: List[PageResult]):
        pages = []
        for entry in task:
            pages.append(entry)

        self.publish_work(task=ConsolidatedPages(pages=pages), input_task=task[0])

Generating the Final Answer

Finally, we use a more powerful LLM to synthesize the information:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class QuestionAnswerer(CachedLLMTaskWorker):
    output_types: List[Type[Task]] = [Answer]
    llm_input_type: Type[Task] = ConsolidatedPages
    llm_output_type: Type[Task] = LLMAnswer
    prompt: str = dedent(
        """
        Answer this question using the provided source materials:
        Question: {question}

        Guidelines:
        - Be specific and direct in your answer
        - Use information only from the provided sources
        - Cite sources when making specific claims
        - If the sources don't contain enough information, acknowledge the limitations
    """
    ).strip()

    def format_prompt(self, task: ConsolidatedPages) -> str:
        question = task.find_input_task(Question)
        if not question:
            raise ValueError("No question found in input tasks")
        return self.prompt.format(question=question.question)

Putting It All Together

PlanAI connects these components into a processing graph:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def get_question_graph(llm_small, llm_reasoning):
    graph = Graph(name="Question Answering")
    graph.add_workers(
        question_to_queries,
        search_executor,
        result_filter,
        search_result_splitter,
        page_fetcher,
        page_consolidator,
        question_answerer,
    )
    graph.set_dependency(question_to_queries, search_executor).next(result_filter)
        .next(search_result_splitter).next(page_fetcher)
        .next(page_consolidator).next(question_answerer).sink(Answer)
    return graph

Running the Search Engine

The engine can process multiple questions in parallel:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def main():
    llm_small = llm_from_config(provider="openai", model_name="gpt-4o-mini")
    llm_reasoning = llm_from_config(provider="openai", model_name="gpt-4o")
    graph = get_question_graph(llm_small, llm_reasoning=llm_reasoning)
    
    initial_task = Question(question="When will Friedman's doctrine end?")
    graph.run(
        initial_tasks=[(graph.get_worker_by_input_type(Question), initial_task)],
        run_dashboard=True,
    )

The output is markdown-formatted text that includes the answer and sources, which can be further processed or displayed as needed. As seen in the following example case.

Example Case

Question: When will Friedman’s doctrine end?

Answer: The materials suggest that the influence of Milton Friedman’s shareholder doctrine, which promotes the primacy of profit maximization for shareholders, is already perceived as declining in many spheres of thought and practice. For instance, the shift towards stakeholder capitalism, reflected in changes to corporate priorities, criticisms of shareholder primacy, and new considerations for corporate governance, is evident in the sources provided. However, no specific conclusion was reached in these articles about when Friedman’s doctrine might fully cease being relevant or influential. Its complete conclusion may depend on ongoing societal, political, and economic shifts.

Sources:

Extensions

We could make the GenAI search engine more powerful and accurate by adding another LLMTaskWorker that determines whether a page source has sufficient information to answer the question. If we don’t get enough relevant sources, we could ask for additional searches and page fetches.

Conclusion

PlanAI’s task-based architecture and built-in integrations make it straightforward to implement complex AI workflows. This example demonstrates how to build a generative AI search engine, but the same principles can be applied to many other AI applications.

The views expressed on these pages are my own and do not represent the views of anyone else.
Built with Hugo - Theme Stack designed by Jimmy