# Sync to Async Migration Guide

This guide helps you migrate existing sync Pulse SDK code to the new async/await functionality.
## Overview

The Pulse SDK's async API provides the same functionality as the sync API, with these key benefits:

- **Non-blocking operations**: well suited to web applications and concurrent processing
- **Better resource utilization**: efficient handling of I/O-bound operations
- **Concurrent processing**: run multiple operations simultaneously
- **Scalability**: handle higher loads without adding threads or processes
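To make the concurrency benefit concrete, here is a minimal, SDK-free sketch; `fake_api_call` is a hypothetical stand-in for an I/O-bound Pulse call:

```python
import asyncio
import time

async def fake_api_call(dataset: str) -> str:
    # Stand-in for an I/O-bound SDK call such as a sentiment request
    await asyncio.sleep(0.1)
    return f"result for {dataset}"

async def main() -> list[str]:
    datasets = ["a", "b", "c", "d", "e"]
    start = time.perf_counter()
    # The five 0.1 s "calls" overlap, so wall time is ~0.1 s, not ~0.5 s
    results = await asyncio.gather(*(fake_api_call(d) for d in datasets))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
    return results

if __name__ == "__main__":
    asyncio.run(main())
```

The same `gather` pattern applies unchanged once `fake_api_call` is replaced with a real async SDK call.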
## Component Migration Map

| Sync Component | Async Component | Import Change |
|---|---|---|
| `CoreClient` | `AsyncCoreClient` | `pulse.core.async_client` |
| `Analyzer` | `AsyncAnalyzer` | `pulse.analysis.async_analyzer` |
| `Workflow` | `AsyncWorkflow` | `pulse.async_dsl` |
| `ClientCredentialsAuth` | `AsyncClientCredentialsAuth` | `pulse.async_auth` |
| `AuthorizationCodePKCEAuth` | `AsyncAuthorizationCodePKCEAuth` | `pulse.async_auth` |
| Starter functions | Async starter functions | `pulse.async_starters` |
## Step-by-Step Migration

### 1. Update Imports

**Before (Sync):**

```python
from pulse.core.client import CoreClient
from pulse.analysis.analyzer import Analyzer
from pulse.dsl import Workflow
from pulse.starters import sentiment_analysis, generate_themes
from pulse.auth import ClientCredentialsAuth
```

**After (Async):**

```python
from pulse.core.async_client import AsyncCoreClient
from pulse.analysis.async_analyzer import AsyncAnalyzer
from pulse.async_dsl import AsyncWorkflow
from pulse.async_starters import sentiment_analysis_async, generate_themes_async
from pulse.async_auth import AsyncClientCredentialsAuth
```
### 2. Update Function Signatures

**Before (Sync):**

```python
def analyze_customer_feedback(reviews):
    with CoreClient.with_client_credentials() as client:
        request = SentimentRequest(inputs=reviews, fast=True)
        return client.analyze_sentiment(request)
```

**After (Async):**

```python
async def analyze_customer_feedback(reviews):
    async with AsyncCoreClient.with_client_credentials_async() as client:
        request = SentimentRequest(inputs=reviews, fast=True)
        return await client.analyze_sentiment(request)
```
### 3. Update Client Creation

**Before (Sync):**

```python
# Method 1: Context manager
with CoreClient.with_client_credentials() as client:
    result = client.analyze_sentiment(request)

# Method 2: Manual creation
auth = ClientCredentialsAuth(client_id="...", client_secret="...")
client = CoreClient(auth=auth)
result = client.analyze_sentiment(request)
client.close()
```

**After (Async):**

```python
# Method 1: Async context manager
async with AsyncCoreClient.with_client_credentials_async() as client:
    result = await client.analyze_sentiment(request)

# Method 2: Manual creation
auth = AsyncClientCredentialsAuth(client_id="...", client_secret="...")
client = AsyncCoreClient(auth=auth)
result = await client.analyze_sentiment(request)
await client.close()
```
### 4. Update Starter Functions

**Before (Sync):**

```python
def process_reviews(reviews):
    # Generate themes
    themes = generate_themes(reviews, min_themes=2, max_themes=5)

    # Analyze sentiment
    sentiment = sentiment_analysis(reviews)

    # Allocate to themes
    allocation = theme_allocation(reviews, themes=[t.shortLabel for t in themes.themes])

    return themes, sentiment, allocation
```

**After (Async):**

```python
async def process_reviews(reviews):
    # Generate themes
    themes = await generate_themes_async(reviews, min_themes=2, max_themes=5)

    # Analyze sentiment
    sentiment = await sentiment_analysis_async(reviews)

    # Allocate to themes
    allocation = await theme_allocation_async(reviews, themes=[t.shortLabel for t in themes.themes])

    return themes, sentiment, allocation
```
### 5. Update Analyzer Workflows

**Before (Sync):**

```python
def run_analysis_workflow(dataset):
    processes = [
        ThemeGeneration(min_themes=2, max_themes=4),
        SentimentProcess(),
        ThemeAllocation(),
    ]

    with Analyzer(dataset=dataset, processes=processes) as analyzer:
        return analyzer.run()
```

**After (Async):**

```python
async def run_analysis_workflow(dataset):
    processes = [
        ThemeGeneration(min_themes=2, max_themes=4),
        SentimentProcess(),
        ThemeAllocation(),
    ]

    async with AsyncAnalyzer(dataset=dataset, processes=processes) as analyzer:
        return await analyzer.run()
```
### 6. Update DSL Workflows

**Before (Sync):**

```python
def create_workflow(texts):
    with Workflow() as workflow:
        workflow.source("texts", texts)
        workflow.theme_generation(source="texts", min_themes=2, max_themes=4)
        workflow.sentiment(source="texts")
        workflow.theme_allocation(inputs="texts")
        return workflow.run()
```

**After (Async):**

```python
async def create_workflow(texts):
    async with AsyncWorkflow() as workflow:
        workflow.source("texts", texts)
        workflow.theme_generation(source="texts", min_themes=2, max_themes=4)
        workflow.sentiment(source="texts")
        workflow.theme_allocation(inputs="texts")
        return await workflow.run()
```
## Advanced Migration Patterns

### 1. Concurrent Processing

One of the biggest benefits of async is the ability to process multiple operations concurrently.

**Before (Sync - Sequential):**

```python
def process_multiple_datasets(datasets):
    results = []
    for dataset in datasets:
        result = sentiment_analysis(dataset)
        results.append(result)
    return results
```

**After (Async - Concurrent):**

```python
import asyncio

async def process_multiple_datasets(datasets):
    # Process all datasets concurrently
    tasks = [sentiment_analysis_async(dataset) for dataset in datasets]
    results = await asyncio.gather(*tasks)
    return results
```
### 2. Manual Job Control

Async provides finer control over job management.

**Before (Sync):**

```python
def process_with_job_control(texts):
    with CoreClient.with_client_credentials() as client:
        request = SentimentRequest(inputs=texts, fast=False)
        # Sync client automatically waits for job completion
        return client.analyze_sentiment(request)
```

**After (Async):**

```python
async def process_with_job_control(texts):
    async with AsyncCoreClient.with_client_credentials_async() as client:
        request = SentimentRequest(inputs=texts, fast=False)

        # Option 1: Automatic waiting (same behavior as sync)
        result = await client.analyze_sentiment(request)

        # Option 2 (alternative): submit the job, then wait explicitly
        job = await client.analyze_sentiment(request, await_job_result=False)
        print(f"Job submitted: {job.id}")
        result = await job.wait(timeout=120.0)

        return result
```
### 3. Error Handling

**Before (Sync):**

```python
def safe_analysis(texts):
    try:
        with CoreClient.with_client_credentials() as client:
            request = SentimentRequest(inputs=texts, fast=True)
            return client.analyze_sentiment(request)
    except Exception as e:
        print(f"Analysis failed: {e}")
        return None
```

**After (Async):**

```python
import asyncio

async def safe_analysis(texts):
    try:
        async with AsyncCoreClient.with_client_credentials_async() as client:
            request = SentimentRequest(inputs=texts, fast=True)
            # Add a timeout for better control
            return await asyncio.wait_for(
                client.analyze_sentiment(request),
                timeout=60.0,
            )
    except asyncio.TimeoutError:
        print("Analysis timed out")
        return None
    except Exception as e:
        print(f"Analysis failed: {e}")
        return None
```
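Timeouts also combine naturally with retries. The sketch below is a generic retry-on-timeout wrapper, not part of the Pulse API; `flaky_operation` is a hypothetical stand-in for a slow SDK call:

```python
import asyncio

async def flaky_operation() -> str:
    # Hypothetical stand-in for a slow SDK call
    await asyncio.sleep(0.2)
    return "done"

async def analyze_with_retries(attempts: int = 3, timeout: float = 0.05):
    # Retry a timed-out call a few times before giving up
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(flaky_operation(), timeout=timeout)
        except asyncio.TimeoutError:
            print(f"attempt {attempt} timed out")
    return None
```

With a generous timeout the first attempt succeeds; with a tight one the wrapper returns `None` after exhausting its attempts.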
### 4. Batch Processing

**Before (Sync):**

```python
def process_large_dataset(texts, batch_size=10):
    results = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        result = sentiment_analysis(batch)
        results.append(result)
    return results
```

**After (Async):**

```python
from pulse.core.async_concurrent import submit_and_gather_jobs

async def process_large_dataset(texts, batch_size=10, max_concurrent=3):
    async with AsyncCoreClient.with_client_credentials_async() as client:
        # Build a job submitter for each batch; the factory binds each
        # batch into its submitter's closure
        def create_batch_submitter(batch):
            async def submitter():
                request = SentimentRequest(inputs=batch, fast=True)
                return await client.analyze_sentiment(request, await_job_result=False)
            return submitter

        submitters = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            submitters.append(create_batch_submitter(batch))

        # Process with controlled concurrency
        results = await submit_and_gather_jobs(
            submitters,
            max_concurrent=max_concurrent,
            rate_limit_delay=0.1,
        )
        return results
```
## Web Framework Integration

### FastAPI

**Before (Sync with thread pool):**

```python
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor
import asyncio

app = FastAPI()
executor = ThreadPoolExecutor()

@app.post("/analyze")
async def analyze_endpoint(texts: list[str]):
    # Run the sync SDK call in a thread pool so it doesn't block the event loop
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor,
        lambda: sentiment_analysis(texts),
    )
    return result
```

**After (Native Async):**

```python
from fastapi import FastAPI

app = FastAPI()

@app.post("/analyze")
async def analyze_endpoint(texts: list[str]):
    # Native async - no thread pool needed
    result = await sentiment_analysis_async(texts)
    return result
```
### Django (Async Views)

**Before (Sync):**

```python
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import json

@csrf_exempt
def analyze_view(request):
    data = json.loads(request.body)
    texts = data.get('texts', [])
    result = sentiment_analysis(texts)
    return JsonResponse({'result': result.dict()})
```

**After (Async):**

```python
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import json

@csrf_exempt
async def analyze_view(request):
    data = json.loads(request.body)
    texts = data.get('texts', [])
    result = await sentiment_analysis_async(texts)
    return JsonResponse({'result': result.dict()})
```
## Performance Considerations

### Memory Usage

Async operations can use more memory because many coroutines are kept alive at once. Use these strategies:

```python
# ✅ Good - Control concurrency
from pulse.core.async_concurrent import gather_jobs

async def memory_efficient_processing(large_dataset):
    async with AsyncCoreClient.with_client_credentials_async() as client:
        # Submit jobs
        jobs = []
        for batch in large_dataset:
            request = SentimentRequest(inputs=batch, fast=True)
            job = await client.analyze_sentiment(request, await_job_result=False)
            jobs.append(job)

        # Process with limited concurrency
        results = await gather_jobs(jobs, max_concurrent=5)
        return results

# ❌ Bad - Unlimited concurrency
async def memory_intensive_processing(large_dataset):
    tasks = [sentiment_analysis_async(batch) for batch in large_dataset]
    return await asyncio.gather(*tasks)  # May use too much memory
```
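If you prefer not to depend on an SDK helper, the same bounded-concurrency pattern can be built from plain `asyncio` with a semaphore; `analyze_batch` below is a hypothetical stand-in for the real async call:

```python
import asyncio

async def analyze_batch(batch: list[str]) -> int:
    # Stand-in for an async SDK call on one batch
    await asyncio.sleep(0.01)
    return len(batch)

async def bounded_gather(batches: list[list[str]], max_concurrent: int = 5) -> list[int]:
    # The semaphore caps how many analyze_batch coroutines run at once,
    # bounding memory and open connections
    sem = asyncio.Semaphore(max_concurrent)

    async def limited(batch: list[str]) -> int:
        async with sem:
            return await analyze_batch(batch)

    return await asyncio.gather(*(limited(b) for b in batches))
```

All tasks are still created up front, but only `max_concurrent` of them are inside the semaphore (and therefore issuing requests) at any moment.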
### Connection Pooling

```python
from pulse.core.async_concurrent import AsyncConnectionPoolManager

async def optimized_connection_example():
    # Configure an optimized connection pool sized for the expected concurrency
    config = AsyncConnectionPoolManager.get_recommended_config(concurrent_jobs=10)
    optimized_client = AsyncConnectionPoolManager.create_optimized_client(
        "https://pulse.researchwiseai.com/v1",
        config=config,
    )

    async with AsyncCoreClient(client=optimized_client) as client:
        # Use the optimized client for better performance
        pass
```
## Testing Migration

### Unit Tests

**Before (Sync):**

```python
import unittest
from pulse.starters import sentiment_analysis

class TestAnalysis(unittest.TestCase):
    def test_sentiment_analysis(self):
        texts = ["Great product!", "Poor quality"]
        result = sentiment_analysis(texts)
        self.assertEqual(len(result.results), 2)
```

**After (Async):**

```python
import unittest
import asyncio
from pulse.async_starters import sentiment_analysis_async

class TestAnalysis(unittest.TestCase):
    def test_sentiment_analysis(self):
        async def run_test():
            texts = ["Great product!", "Poor quality"]
            result = await sentiment_analysis_async(texts)
            self.assertEqual(len(result.results), 2)

        asyncio.run(run_test())
```
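On Python 3.8+ the standard library's `unittest.IsolatedAsyncioTestCase` removes the manual `asyncio.run()` wrapper entirely; the `analyze_stub` coroutine here is a hypothetical stand-in for `sentiment_analysis_async`:

```python
import unittest

async def analyze_stub(texts: list[str]) -> list[str]:
    # Hypothetical stand-in for sentiment_analysis_async
    return ["positive" if "Great" in t else "negative" for t in texts]

class TestAnalysisAsync(unittest.IsolatedAsyncioTestCase):
    # Each async test method runs in its own event loop
    async def test_sentiment_analysis(self):
        result = await analyze_stub(["Great product!", "Poor quality"])
        self.assertEqual(result, ["positive", "negative"])
```

`IsolatedAsyncioTestCase` also supports `asyncSetUp`/`asyncTearDown`, which is convenient for opening and closing an async client per test.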
### pytest-asyncio

```python
import pytest
from pulse.async_starters import sentiment_analysis_async

@pytest.mark.asyncio
async def test_sentiment_analysis():
    texts = ["Great product!", "Poor quality"]
    result = await sentiment_analysis_async(texts)
    assert len(result.results) == 2
```
## Common Migration Issues

### 1. Event Loop Already Running

**Problem:**

```python
# This fails in Jupyter notebooks or any context where an event loop
# is already running
asyncio.run(sentiment_analysis_async(texts))
```

**Solution:**

```python
async def jupyter_solution():
    # In Jupyter or any async context, use await directly
    result = await sentiment_analysis_async(texts)
    return result

# Or, from plain sync code where no loop is running in the current
# thread, manage an event loop manually
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(sentiment_analysis_async(texts))
loop.close()
```
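A helper that copes with both situations can check for a running loop and, if one exists, hand the coroutine to a fresh loop in a worker thread. This is a common workaround, sketched here with stdlib pieces only; `sample_coro` is a stand-in for any SDK coroutine:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def sample_coro() -> str:
    # Stand-in for any coroutine, e.g. sentiment_analysis_async(texts)
    await asyncio.sleep(0.01)
    return "ok"

def run_coro_anywhere(coro):
    # Works whether or not an event loop is already running in this thread
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)  # no running loop: the normal path
    # A loop is running here, so execute the coroutine on a fresh loop
    # in a worker thread and block until it finishes
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
```

Note that calling this from inside a coroutine still blocks the caller's loop while the worker runs, so prefer `await` directly whenever you are already in async code.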
### 2. Mixing Sync and Async

**Problem:**

```python
async def mixed_function():
    # This won't work well - the sync call blocks the event loop
    sync_result = sentiment_analysis(texts)  # Blocks the event loop
    async_result = await sentiment_analysis_async(texts)
    return sync_result, async_result
```

**Solution:**

```python
async def async_only_function():
    # Use only async functions
    result1 = await sentiment_analysis_async(texts1)
    result2 = await sentiment_analysis_async(texts2)
    return result1, result2

# Or run sync code in a thread pool if necessary
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def mixed_function_fixed():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        sync_result = await loop.run_in_executor(
            executor,
            lambda: sentiment_analysis(texts),
        )
    async_result = await sentiment_analysis_async(texts)
    return sync_result, async_result
```
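On Python 3.9+ the executor boilerplate can be replaced with `asyncio.to_thread`, which offloads a blocking call to a worker thread; `legacy_sync_analysis` is a hypothetical stand-in for the blocking sync SDK call:

```python
import asyncio
import time

def legacy_sync_analysis(texts: list[str]) -> int:
    # Hypothetical stand-in for a blocking sync SDK call
    time.sleep(0.01)
    return len(texts)

async def bridge_with_to_thread(texts: list[str]) -> int:
    # asyncio.to_thread (Python 3.9+) runs the blocking function in a
    # worker thread so the event loop stays responsive
    return await asyncio.to_thread(legacy_sync_analysis, texts)
```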
### 3. Resource Cleanup

**Problem:**

```python
# Forgetting to close resources
client = AsyncCoreClient.with_client_credentials_async()
result = await client.analyze_sentiment(request)
# Client never closed - resource leak
```

**Solution:**

```python
# Always use context managers
async with AsyncCoreClient.with_client_credentials_async() as client:
    result = await client.analyze_sentiment(request)
# Client automatically closed on exit
```
## Migration Checklist

- [ ] Update all imports to async versions
- [ ] Add the `async` keyword to function definitions
- [ ] Add the `await` keyword before all API calls
- [ ] Replace `with` with `async with` for context managers
- [ ] Update error handling for async patterns
- [ ] Add timeout handling where appropriate
- [ ] Consider concurrent processing opportunities
- [ ] Update tests to use async patterns
- [ ] Verify resource cleanup with context managers
- [ ] Test performance improvements
## Gradual Migration Strategy

You can migrate gradually by running both sync and async code in the same application:

```python
import asyncio

# Keep existing sync functions
def legacy_analysis(texts):
    return sentiment_analysis(texts)

# Add new async functions
async def new_analysis(texts):
    return await sentiment_analysis_async(texts)

# Bridge function to call async code from a sync context
def bridge_analysis(texts):
    return asyncio.run(new_analysis(texts))

# Use the bridge function during migration
result = bridge_analysis(texts)
```
## Performance Benefits

After migration, you should see:

- Reduced latency for concurrent operations
- Better resource utilization in I/O-bound applications
- Improved scalability for web applications
- More responsive user interfaces

Example performance improvement:

```python
import time

# Before (Sync - Sequential)
start = time.time()
results = []
for dataset in datasets:  # 5 datasets
    result = sentiment_analysis(dataset)  # ~2s each
    results.append(result)
# Total time: ~10 seconds

# After (Async - Concurrent)
async def concurrent_processing():
    start = time.time()
    tasks = [sentiment_analysis_async(dataset) for dataset in datasets]
    results = await asyncio.gather(*tasks)
    # Total time: ~2-3 seconds (depending on concurrency limits)
    return results
```

The async version can be 3-5x faster for concurrent operations while using the same API interface.