Building BarGPT with CrewAI Flows

Learn how to build an autonomous cocktail content creator using CrewAI Flows


Building BarGPT with CrewAI Flows

In our previous articles, we explored building agents using CrewAI and looked at different approaches to creating autonomous agents. Today, we'll dive into a practical example using CrewAI's Flow feature to build BarGPT - an autonomous cocktail content creation agent.

What are CrewAI Flows?

CrewAI Flows is a powerful feature that allows you to create structured, state-machine-like workflows for your AI agents. It combines the flexibility of CrewAI's agent system with the predictability of defined process flows. This makes it perfect for building complex, multi-step autonomous applications.

BarGPT: An Autonomous Cocktail Content Creator

BarGPT is an AI agent that automatically creates and publishes cocktail-related content. It:

  1. Researches trending topics
  2. Creates themed cocktail recipes
  3. Writes articles about the cocktails
  4. Publishes the content
  5. Shares it on social media

Let's look at how we implemented this using CrewAI Flows.

Flow State Management

First, we define our flow state using Pydantic models:

class TrendingPostState(BaseModel):
    """State for tracking the trending post creation process."""
    recent_topics: List[str] = Field(default_factory=list)
    selected_topic: Optional[SelectedTrendingTopic] = None
    cocktails: List[Cocktail] = Field(default_factory=list)
    article: Optional[Article] = None
    publish_url: Optional[str] = None
    status: str = "pending"

Flow Implementation

The core of BarGPT is implemented as a Flow class that inherits from Flow[TrendingPostState]. Here's how the main steps are structured:

class TrendingPostFlow(Flow[TrendingPostState]):
    @start()
    def get_recent_topics(self):
        """Get recent topics to avoid duplication."""
        articles = get_articles()
        recent_article_topics = [
            f"{article.title} - {article.description}" 
            for article in articles
            if article.title and article.description
        ]
        self.state.recent_topics = recent_article_topics
        return self.state.recent_topics

    @listen(get_recent_topics)
    def research_topics(self):
        """Research potential trending topics for cocktail content."""
        research_agent = ResearchAgent()
        result = research_agent.run({
            "request": "What are the latest trending topics?", 
            "recent_topics": self.state.recent_topics
        })
        self.state.selected_topic = result.json_dict
        return result.json_dict

Status Updates and Progress Tracking

One of the key features of our implementation is real-time status updates. This is crucial for long-running autonomous processes:

def append_status_update(self, description: str, output: Any = None) -> None:
    """Helper method to send status updates via callback if set."""
    if self.status_callback:
        update = {
            "timestamp": datetime.utcnow().isoformat(),
            "description": description,
            "output": output if output is not None else description
        }
        self.status_callback(update)

Cocktail Creation and Content Publishing

The flow continues with cocktail creation and content publishing steps:

@listen(research_topics)
def create_cocktails(self):
    """Create cocktail recipes based on selected topic."""
    topic = self.state.selected_topic
    cocktails = []
    
    for prompt in topic['cocktailPrompts']:
        cocktail_response = create_cocktail(prompt)
        cocktail_data = json.loads(cocktail_response["cocktail"])
        cocktail = Cocktail(
            name=cocktail_data.get("name"),
            url=cocktail_data.get("cocktailid"),
            description=cocktail_data.get("description"),
            image_url=cocktail_data.get("imageurl")
        )
        cocktails.append(cocktail)
    
    self.state.cocktails = cocktails
    return cocktails

Running the Flow

The entire flow is wrapped in a CrewAgent class for easy integration with other systems:

class BarGPTTrendingPostFlow(CrewAgent):
    def __init__(self):
        super().__init__()
        initial_state = TrendingPostState()
        self._flow = TrendingPostFlow(initial_state=initial_state)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        result = self._flow.kickoff()
        return result.model_dump()

Key Benefits of Using CrewAI Flows

  1. State Management: The flow system maintains clear state throughout the process
  2. Error Handling: Each step can be monitored and errors handled appropriately
  3. Progress Tracking: Real-time status updates for long-running processes
  4. Modularity: Easy to add or modify steps in the workflow
  5. Visualization: Built-in support for visualizing the flow structure

Conclusion

CrewAI Flows provides a powerful framework for building complex, autonomous AI agents. The BarGPT implementation demonstrates how to create a practical application that combines multiple AI capabilities into a cohesive, automated workflow.

Whether you're building content creation systems, data processing pipelines, or other autonomous agents, CrewAI Flows offers a structured approach to managing complex AI workflows while maintaining flexibility and control.

Give it a try and see how CrewAI Flows can help streamline your AI agent development process!