Pipeline System API Reference
The EasilyAI pipeline system allows you to chain multiple AI operations together, creating complex workflows that combine text generation, image creation, and text-to-speech.
Core Pipeline Classes
EasilyAIPipeline
Main pipeline class for orchestrating multiple AI tasks.
class EasilyAIPipeline:
def __init__(name: str)
Parameters:
name
(str): A unique identifier for the pipeline
Attributes:
name
(str): Pipeline nametasks
(list): List of pipeline tasksresults
(list): Results from executed tasks
Methods
add_task()
Add a task to the pipeline.
def add_task(
app: EasyAIApp,
task_type: str,
prompt: str,
**kwargs
) -> None
Parameters:
app
(EasyAIApp): The AI application to use for this tasktask_type
(str): Type of task ('generate_text', 'generate_image', 'text_to_speech')prompt
(str): The prompt for this task (supports variable substitution)**kwargs
: Additional parameters for the specific task
Variable Substitution:
{previous_result}
: Result from the immediately previous task{result_0}
,{result_1}
, etc.: Results from specific task indices
Example:
from easilyai import create_app
from easilyai.pipeline import EasilyAIPipeline
# Create apps
text_app = create_app("Writer", "openai", "your-key", "gpt-4")
image_app = create_app("Artist", "openai", "your-key", "dall-e-3")
# Create pipeline
pipeline = EasilyAIPipeline("ContentCreator")
# Add tasks with variable substitution
pipeline.add_task(text_app, "generate_text", "Write a story about dragons")
pipeline.add_task(image_app, "generate_image", "Create an illustration for: {previous_result}")
run()
Execute all tasks in the pipeline sequentially.
def run() -> list
Returns:
list
: Results from all tasks in order of execution
Example:
# Execute pipeline
results = pipeline.run()
print("Story:", results[0])
print("Image URL:", results[1])
clear()
Clear all tasks and results from the pipeline.
def clear() -> None
Example:
pipeline.clear()
print(f"Tasks: {len(pipeline.tasks)}") # Output: Tasks: 0
get_task_count()
Get the number of tasks in the pipeline.
def get_task_count() -> int
Returns:
int
: Number of tasks in the pipeline
get_results()
Get results from executed tasks.
def get_results() -> list
Returns:
list
: Results from all executed tasks
Pipeline Task Class
PipelineTask
Internal class representing a single task in the pipeline.
class PipelineTask:
def __init__(
app: EasyAIApp,
task_type: str,
prompt: str,
**kwargs
)
Attributes:
app
(EasyAIApp): The AI application for this tasktask_type
(str): Type of taskprompt
(str): Task promptkwargs
(dict): Additional parameters
Advanced Pipeline Features
Conditional Tasks
Create pipelines with conditional execution:
from easilyai import create_app
from easilyai.pipeline import EasilyAIPipeline
class ConditionalPipeline(EasilyAIPipeline):
def __init__(self, name: str):
super().__init__(name)
self.conditions = []
def add_conditional_task(self, app, task_type, prompt, condition_func, **kwargs):
"""Add a task that only executes if condition is met"""
task = {
"app": app,
"task_type": task_type,
"prompt": prompt,
"condition": condition_func,
"kwargs": kwargs
}
self.tasks.append(task)
def run(self):
"""Execute tasks with conditional logic"""
self.results = []
for i, task in enumerate(self.tasks):
# Check if this is a conditional task
if "condition" in task:
# Evaluate condition based on previous results
if not task["condition"](self.results):
self.results.append(None) # Skip task
continue
# Substitute variables in prompt
prompt = self._substitute_variables(task["prompt"], i)
# Execute task
try:
if hasattr(task, "app"): # Regular task
result = task["app"].request(prompt, task_type=task["task_type"], **task.get("kwargs", {}))
else: # Conditional task
result = task["app"].request(prompt, task_type=task["task_type"], **task.get("kwargs", {}))
self.results.append(result)
except Exception as e:
self.results.append(f"Error: {e}")
return self.results
# Example usage
def should_create_image(results):
"""Only create image if text is longer than 100 characters"""
if not results:
return False
return len(results[-1]) > 100
conditional_pipeline = ConditionalPipeline("ConditionalContent")
conditional_pipeline.add_task(text_app, "generate_text", "Write about AI")
conditional_pipeline.add_conditional_task(
image_app,
"generate_image",
"Create image for: {previous_result}",
should_create_image
)
Parallel Execution
Execute multiple tasks in parallel:
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ParallelPipeline(EasilyAIPipeline):
def __init__(self, name: str, max_workers: int = 3):
super().__init__(name)
self.max_workers = max_workers
def add_parallel_group(self, tasks: list):
"""Add a group of tasks to execute in parallel"""
self.tasks.append({"type": "parallel", "tasks": tasks})
def run(self):
"""Execute pipeline with parallel task groups"""
self.results = []
for task_group in self.tasks:
if isinstance(task_group, dict) and task_group.get("type") == "parallel":
# Execute parallel group
parallel_results = self._execute_parallel_group(task_group["tasks"])
self.results.extend(parallel_results)
else:
# Execute single task
prompt = self._substitute_variables(task_group.prompt, len(self.results))
result = task_group.app.request(
prompt,
task_type=task_group.task_type,
**task_group.kwargs
)
self.results.append(result)
return self.results
def _execute_parallel_group(self, tasks):
"""Execute a group of tasks in parallel"""
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = []
for task in tasks:
prompt = self._substitute_variables(task.prompt, len(self.results))
future = executor.submit(
task.app.request,
prompt,
task_type=task.task_type,
**task.kwargs
)
futures.append(future)
# Collect results
results = []
for future in futures:
try:
result = future.result()
results.append(result)
except Exception as e:
results.append(f"Error: {e}")
return results
# Example usage
parallel_pipeline = ParallelPipeline("ParallelContent")
# Add parallel tasks
parallel_tasks = [
PipelineTask(text_app, "generate_text", "Write about AI"),
PipelineTask(text_app, "generate_text", "Write about machine learning"),
PipelineTask(text_app, "generate_text", "Write about data science")
]
parallel_pipeline.add_parallel_group(parallel_tasks)
Error Handling in Pipelines
Robust error handling for pipeline execution:
class RobustPipeline(EasilyAIPipeline):
def __init__(self, name: str, fail_fast: bool = False):
super().__init__(name)
self.fail_fast = fail_fast
self.errors = []
def run(self):
"""Execute pipeline with robust error handling"""
self.results = []
self.errors = []
for i, task in enumerate(self.tasks):
try:
# Substitute variables
prompt = self._substitute_variables(task.prompt, i)
# Execute task with retry logic
result = self._execute_with_retry(task, prompt)
self.results.append(result)
except Exception as e:
error_info = {
"task_index": i,
"task_type": task.task_type,
"error": str(e)
}
self.errors.append(error_info)
if self.fail_fast:
raise e
else:
# Continue with placeholder result
self.results.append(f"[Error in task {i}: {e}]")
return self.results
def _execute_with_retry(self, task, prompt, max_retries=3):
"""Execute task with retry logic"""
for attempt in range(max_retries):
try:
return task.app.request(
prompt,
task_type=task.task_type,
**task.kwargs
)
except Exception as e:
if attempt == max_retries - 1:
raise e
else:
# Wait before retry
import time
time.sleep(2 ** attempt)
def get_error_summary(self):
"""Get summary of errors encountered"""
return {
"total_errors": len(self.errors),
"errors": self.errors,
"success_rate": (len(self.tasks) - len(self.errors)) / len(self.tasks) * 100
}
# Example usage
robust_pipeline = RobustPipeline("RobustContent", fail_fast=False)
robust_pipeline.add_task(text_app, "generate_text", "Write about AI")
robust_pipeline.add_task(image_app, "generate_image", "Invalid prompt that might fail")
results = robust_pipeline.run()
error_summary = robust_pipeline.get_error_summary()
print(f"Success rate: {error_summary['success_rate']:.1f}%")
print(f"Errors: {error_summary['total_errors']}")
Pipeline Templates
Content Creation Pipeline
def create_content_pipeline(text_app, image_app, tts_app, topic):
"""Create a complete content creation pipeline"""
pipeline = EasilyAIPipeline("ContentCreation")
# Generate article
pipeline.add_task(
text_app,
"generate_text",
f"Write a comprehensive 500-word article about {topic}"
)
# Create summary
pipeline.add_task(
text_app,
"generate_text",
"Create a 2-sentence summary of this article: {previous_result}"
)
# Generate image
pipeline.add_task(
image_app,
"generate_image",
"Create a professional illustration for this article: {result_1}"
)
# Create audio narration
pipeline.add_task(
tts_app,
"text_to_speech",
"{result_0}", # Use the full article
voice="nova",
output_file=f"{topic.replace(' ', '_')}_narration.mp3"
)
return pipeline
# Usage
content_pipeline = create_content_pipeline(text_app, image_app, tts_app, "renewable energy")
results = content_pipeline.run()
Analysis Pipeline
def create_analysis_pipeline(analyst_app, visualizer_app, data):
"""Create a data analysis pipeline"""
pipeline = EasilyAIPipeline("DataAnalysis")
# Initial analysis
pipeline.add_task(
analyst_app,
"generate_text",
f"Analyze this data and identify key trends: {data}"
)
# Generate insights
pipeline.add_task(
analyst_app,
"generate_text",
"Based on this analysis, provide 3 actionable business insights: {previous_result}"
)
# Create visualization prompt
pipeline.add_task(
analyst_app,
"generate_text",
"Create a detailed prompt for visualizing this analysis: {result_0}"
)
# Generate visualization
pipeline.add_task(
visualizer_app,
"generate_image",
"{previous_result}"
)
return pipeline
Multi-Language Pipeline
def create_translation_pipeline(translator_app, languages, source_text):
"""Create a multi-language translation pipeline"""
pipeline = EasilyAIPipeline("MultiLanguageTranslation")
# Add source text as first result
pipeline.results = [source_text]
# Add translation tasks for each language
for lang in languages:
pipeline.add_task(
translator_app,
"generate_text",
f"Translate this text to {lang}: {source_text}"
)
return pipeline
Pipeline Utilities
Variable Substitution
Internal method for replacing variables in prompts:
def _substitute_variables(self, prompt: str, current_index: int) -> str
Supported Variables:
{previous_result}
: Result from previous task{result_N}
: Result from task at index N{current_index}
: Current task index{task_count}
: Total number of tasks
Pipeline Validation
Validate pipeline before execution:
def validate_pipeline(pipeline: EasilyAIPipeline) -> dict:
"""Validate pipeline configuration"""
issues = []
if not pipeline.tasks:
issues.append("Pipeline has no tasks")
for i, task in enumerate(pipeline.tasks):
# Check for circular references
if "{result_" + str(i) + "}" in task.prompt:
issues.append(f"Task {i} references its own result")
# Check for forward references
import re
forward_refs = re.findall(r'\{result_(\d+)\}', task.prompt)
for ref in forward_refs:
if int(ref) >= i:
issues.append(f"Task {i} references future result {ref}")
return {
"valid": len(issues) == 0,
"issues": issues
}
# Usage
validation = validate_pipeline(pipeline)
if not validation["valid"]:
print("Pipeline issues:")
for issue in validation["issues"]:
print(f" - {issue}")
Best Practices
1. Design for Failure
# Use robust pipelines for production
robust_pipeline = RobustPipeline("Production", fail_fast=False)
# Always check for errors
results = robust_pipeline.run()
if robust_pipeline.errors:
print("Pipeline completed with errors")
2. Optimize Task Order
# Place fast tasks first, expensive tasks last
pipeline.add_task(fast_app, "generate_text", "Quick summary")
pipeline.add_task(expensive_app, "generate_image", "Detailed illustration")
3. Use Meaningful Variable Names
# Good: Clear variable usage
pipeline.add_task(image_app, "generate_image", "Illustration for story: {result_0}")
# Avoid: Unclear references
pipeline.add_task(image_app, "generate_image", "Make image: {previous_result}")
4. Validate Inputs
def safe_pipeline_run(pipeline):
"""Safely run a pipeline with validation"""
validation = validate_pipeline(pipeline)
if not validation["valid"]:
raise ValueError(f"Invalid pipeline: {validation['issues']}")
return pipeline.run()
The pipeline system provides powerful capabilities for creating complex AI workflows while maintaining flexibility and robustness.