Async Job Management and Concurrency¶
This guide covers advanced async job management patterns and concurrency best practices for the Pulse SDK.
Overview¶
The Pulse SDK's async job management system provides:
- Manual job control: Submit jobs without waiting for completion
- Concurrent processing: Handle multiple jobs simultaneously with controlled concurrency
- Advanced monitoring: Track job progress with callbacks and status updates
- Error handling: Robust error recovery and retry mechanisms
- Resource optimization: Efficient connection pooling and rate limiting
Job Lifecycle¶
Basic Job Flow¶
import asyncio
from pulse.core.async_client import AsyncCoreClient
from pulse.core.models import SentimentRequest
async def basic_job_flow():
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=["Sample text"], fast=True)
# Submit job without waiting
job = await client.analyze_sentiment(request, await_job_result=False)
print(f"Job submitted: {job.id} (status: {job.status})")
# Manual polling
while job.status in ["pending", "queued", "running"]:
await asyncio.sleep(1) # Wait 1 second
await job.refresh()
print(f"Job status: {job.status}")
# Get final result
if job.status == "completed":
result = await job.get_result()
return result
else:
raise Exception(f"Job failed with status: {job.status}")
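The examples in this guide define coroutines, so they need an event loop to run. From a synchronous entry point, asyncio.run is the usual way to execute one; a minimal sketch, assuming your client-credential environment is already configured:

import asyncio

if __name__ == "__main__":
    result = asyncio.run(basic_job_flow())
    print(result)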
Automatic Job Waiting¶
async def automatic_job_waiting():
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=["Sample text"], fast=True)
# Automatic waiting (default behavior)
result = await client.analyze_sentiment(request)
# Job is automatically polled until completion
return result
Advanced Job Control¶
Job Monitoring with Callbacks¶
async def job_with_monitoring():
async def status_callback(status: str):
print(f"Job status changed to: {status}")
if status == "running":
print("Job is now processing...")
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=["Sample text"], fast=False)
job = await client.analyze_sentiment(request, await_job_result=False)
# Wait with status callbacks
result = await job.wait_with_callback(
callback=status_callback,
timeout=120.0,
poll_interval=2.0
)
return result
Waiting for Specific Status¶
async def wait_for_specific_status():
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=["Sample text"], fast=False)
job = await client.analyze_sentiment(request, await_job_result=False)
# Wait for job to start running
await job.wait_for_status("running", timeout=30.0)
print("Job is now running!")
# Wait for completion
result = await job.wait(timeout=120.0)
return result
Job Cancellation¶
async def job_cancellation_example():
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=["Sample text"], fast=False)
job = await client.analyze_sentiment(request, await_job_result=False)
try:
# Wait with short timeout
result = await job.wait(timeout=5.0)
return result
except asyncio.TimeoutError:
print("Job taking too long, cancelling...")
cancelled = await job.cancel()
if cancelled:
print("Job cancelled successfully")
else:
print("Job could not be cancelled (may have completed)")
return None
Concurrent Job Processing¶
Basic Concurrent Jobs¶
from pulse.core.async_concurrent import gather_jobs
async def basic_concurrent_jobs():
async with AsyncCoreClient.with_client_credentials_async() as client:
# Submit multiple jobs
jobs = []
texts_batches = [["text1", "text2"], ["text3", "text4"], ["text5", "text6"]]
for batch in texts_batches:
request = SentimentRequest(inputs=batch, fast=True)
job = await client.analyze_sentiment(request, await_job_result=False)
jobs.append(job)
print(f"Submitted job: {job.id}")
# Gather results with controlled concurrency
print("Gathering results...")
results = await gather_jobs(jobs, max_concurrent=2)
print(f"Completed {len(results)} jobs")
return results
Submit and Gather Pattern¶
from pulse.core.async_concurrent import submit_and_gather_jobs
async def submit_and_gather_example():
async with AsyncCoreClient.with_client_credentials_async() as client:
# Create job submitters
async def create_job_submitter(batch_id: int, texts: list):
async def submitter():
request = SentimentRequest(inputs=texts, fast=True)
return await client.analyze_sentiment(request, await_job_result=False)
return submitter
# Create submitters for multiple batches
submitters = []
for i, batch in enumerate([["text1"], ["text2"], ["text3"], ["text4"]]):
submitter = await create_job_submitter(i, batch)
submitters.append(submitter)
# Submit and gather with rate limiting
results = await submit_and_gather_jobs(
submitters,
max_concurrent=2,
rate_limit_delay=0.5, # 500ms between submissions
timeout=60.0
)
return results
Wait for Any Job Completion¶
from pulse.core.async_concurrent import wait_for_any_job
async def wait_for_any_example():
async with AsyncCoreClient.with_client_credentials_async() as client:
# Submit multiple jobs with different processing times
jobs = []
job_descriptions = [
(["Short text"], "quick job"),
(["This is a much longer text that will take more time to process"], "slow job"),
(["Medium length text"], "medium job"),
]
for texts, description in job_descriptions:
request = SentimentRequest(inputs=texts, fast=False)
job = await client.analyze_sentiment(request, await_job_result=False)
jobs.append(job)
print(f"Submitted {description}: {job.id}")
# Wait for first completion
print("Waiting for first job to complete...")
completed_job, result = await wait_for_any_job(jobs, timeout=60.0)
print(f"First completed job: {completed_job.id}")
# Cancel remaining jobs if needed
for job in jobs:
if job.id != completed_job.id and job.status in ["pending", "queued", "running"]:
await job.cancel()
return result
Advanced Concurrency Patterns¶
Job Manager with Retry Logic¶
from pulse.core.async_concurrent import AsyncJobManager, AsyncConnectionPoolManager
async def advanced_job_manager():
# Create optimized configuration
config = AsyncConnectionPoolManager.get_recommended_config(concurrent_jobs=10)
manager = AsyncJobManager(config)
# Create optimized client
optimized_client = AsyncConnectionPoolManager.create_optimized_client(
"https://pulse.researchwiseai.com/v1",
config=config
)
async with AsyncCoreClient(client=optimized_client) as client:
# Create job submitters
async def create_batch_submitter(batch_id: int):
async def submitter():
texts = [f"Batch {batch_id} text {i}" for i in range(3)]
request = SentimentRequest(inputs=texts, fast=True)
return await client.analyze_sentiment(request, await_job_result=False)
return submitter
# Create multiple batch submitters
submitters = [await create_batch_submitter(i) for i in range(8)]
# Process with retry logic
try:
results = await manager.batch_process_with_retry(
submitters,
max_retries=2,
retry_delay=1.0,
timeout=120.0
)
print(f"Successfully processed {len(results)} batches")
return results
except Exception as e:
print(f"Batch processing failed: {e}")
return []
Producer-Consumer Pattern¶
import asyncio
from asyncio import Queue
async def producer_consumer_pattern():
# Create a queue for job coordination (each consumer collects its own results)
job_queue = Queue(maxsize=20)
async def producer(texts: list):
"""Submit jobs to the queue."""
async with AsyncCoreClient.with_client_credentials_async() as client:
for i, text in enumerate(texts):
request = SentimentRequest(inputs=[text], fast=True)
job = await client.analyze_sentiment(request, await_job_result=False)
await job_queue.put((i, job))
print(f"Produced job {i}: {job.id}")
# Signal completion
await job_queue.put(None)
async def consumer(consumer_id: int):
"""Process jobs from the queue."""
results = []
while True:
item = await job_queue.get()
if item is None:
# Put sentinel back for other consumers
await job_queue.put(None)
break
job_index, job = item
try:
result = await job.wait(timeout=60.0)
results.append((job_index, result))
print(f"Consumer {consumer_id} completed job {job_index}")
except Exception as e:
print(f"Consumer {consumer_id} failed job {job_index}: {e}")
finally:
job_queue.task_done()
return results
# Sample data
texts = [f"Sample text {i}" for i in range(10)]
# Start producer and multiple consumers
producer_task = asyncio.create_task(producer(texts))
consumer_tasks = [
asyncio.create_task(consumer(i))
for i in range(3) # 3 consumers
]
# Wait for producer to finish
await producer_task
# job_queue.join() is deliberately not used here: the sentinel is re-queued by
# each consumer without task_done(), so join() would never return. Gathering
# the consumer tasks below is what waits for all jobs to be processed.
# Gather results from all consumers
consumer_results = await asyncio.gather(*consumer_tasks)
# Combine and sort results
all_results = []
for consumer_result in consumer_results:
all_results.extend(consumer_result)
all_results.sort(key=lambda x: x[0]) # Sort by job index
return [result for _, result in all_results]
Rate-Limited Processing¶
import asyncio
from asyncio import Semaphore
async def rate_limited_processing():
# Control concurrent operations
semaphore = Semaphore(3) # Max 3 concurrent operations
async def process_with_rate_limit(text: str, delay: float = 0.1):
async with semaphore:
# Add delay between operations
await asyncio.sleep(delay)
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=[text], fast=True)
return await client.analyze_sentiment(request)
# Process multiple texts with rate limiting
texts = [f"Text {i}" for i in range(10)]
tasks = [process_with_rate_limit(text) for text in texts]
results = await asyncio.gather(*tasks)
return results
Error Handling and Recovery¶
Robust Error Handling¶
async def robust_job_processing():
async def safe_job_execution(job_data: dict):
"""Execute a job with comprehensive error handling."""
try:
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=job_data['texts'], fast=True)
# Submit job
job = await client.analyze_sentiment(request, await_job_result=False)
# Wait with timeout
result = await asyncio.wait_for(job.wait(), timeout=60.0)
return {
'job_id': job_data['id'],
'status': 'success',
'result': result,
'error': None
}
except asyncio.TimeoutError:
return {
'job_id': job_data['id'],
'status': 'timeout',
'result': None,
'error': 'Job timed out after 60 seconds'
}
except Exception as e:
return {
'job_id': job_data['id'],
'status': 'error',
'result': None,
'error': str(e)
}
# Sample job data
job_data_list = [
{'id': 1, 'texts': ['Great product!']},
{'id': 2, 'texts': ['Poor quality']},
{'id': 3, 'texts': ['Average experience']},
]
# Process all jobs with error handling
results = await asyncio.gather(*[
safe_job_execution(job_data)
for job_data in job_data_list
])
# Analyze results
successful = [r for r in results if r['status'] == 'success']
failed = [r for r in results if r['status'] != 'success']
print(f"Successful jobs: {len(successful)}")
print(f"Failed jobs: {len(failed)}")
for failure in failed:
print(f"Job {failure['job_id']} failed: {failure['error']}")
return successful, failed
Retry Logic with Exponential Backoff¶
import random
async def job_with_retry():
async def execute_with_retry(
operation,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0
):
"""Execute operation with exponential backoff retry."""
for attempt in range(max_retries + 1):
try:
return await operation()
except Exception as e:
if attempt == max_retries:
raise e
# Calculate delay with exponential backoff and jitter
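# For example, with base_delay=1.0 the delays are roughly 1s, 2s, 4s, ..., capped at max_delay, plus up to 10% jitter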
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {total_delay:.2f} seconds...")
await asyncio.sleep(total_delay)
async def unreliable_operation():
"""Simulate an operation that might fail."""
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=["Sample text"], fast=True)
return await client.analyze_sentiment(request)
# Execute with retry logic
try:
result = await execute_with_retry(unreliable_operation)
print("Operation succeeded!")
return result
except Exception as e:
print(f"Operation failed after all retries: {e}")
return None
Performance Optimization¶
Connection Pool Optimization¶
from pulse.core.async_concurrent import AsyncConnectionPoolManager
async def optimized_concurrent_processing():
# Get recommended configuration for high concurrency
config = AsyncConnectionPoolManager.get_recommended_config(
concurrent_jobs=20,
target_latency_ms=500
)
print(f"Using configuration:")
print(f" Max concurrent jobs: {config.max_concurrent_jobs}")
print(f" Rate limit delay: {config.rate_limit_delay}s")
print(f" Connection pool size: {config.connection_pool_size}")
# Create optimized HTTP client
optimized_client = AsyncConnectionPoolManager.create_optimized_client(
"https://pulse.researchwiseai.com/v1",
config=config
)
async with AsyncCoreClient(client=optimized_client) as client:
# Create many concurrent jobs
jobs = []
for i in range(50):
request = SentimentRequest(inputs=[f"Text {i}"], fast=True)
job = await client.analyze_sentiment(request, await_job_result=False)
jobs.append(job)
# Process with optimized settings
results = await gather_jobs(
jobs,
max_concurrent=config.max_concurrent_jobs
)
print(f"Processed {len(results)} jobs with optimized configuration")
return results
Memory-Efficient Batch Processing¶
async def memory_efficient_processing(large_dataset: list, batch_size: int = 100):
"""Process large datasets without loading everything into memory."""
async def process_batch(batch: list):
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=batch, fast=True)
return await client.analyze_sentiment(request)
results = []
# Process in batches to control memory usage
for i in range(0, len(large_dataset), batch_size):
batch = large_dataset[i:i + batch_size]
print(f"Processing batch {i // batch_size + 1}/{(len(large_dataset) + batch_size - 1) // batch_size}")
# Process current batch
batch_result = await process_batch(batch)
results.append(batch_result)
# Optional: Add delay between batches to avoid rate limits
await asyncio.sleep(0.1)
return results
Monitoring and Observability¶
Job Progress Tracking¶
import time
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class JobMetrics:
submitted: int = 0
completed: int = 0
failed: int = 0
cancelled: int = 0
start_time: float = 0
end_time: float = 0
async def monitored_job_processing():
metrics = JobMetrics(start_time=time.time())
async def track_job(job, job_id: str):
"""Track individual job progress."""
try:
result = await job.wait(timeout=120.0)
metrics.completed += 1
print(f"✅ Job {job_id} completed")
return result
except asyncio.TimeoutError:
await job.cancel()
metrics.cancelled += 1
print(f"⏰ Job {job_id} cancelled (timeout)")
return None
except Exception as e:
metrics.failed += 1
print(f"❌ Job {job_id} failed: {e}")
return None
async with AsyncCoreClient.with_client_credentials_async() as client:
# Submit jobs
jobs = []
for i in range(10):
request = SentimentRequest(inputs=[f"Text {i}"], fast=True)
job = await client.analyze_sentiment(request, await_job_result=False)
jobs.append((job, f"job_{i}"))
metrics.submitted += 1
print(f"📤 Submitted {metrics.submitted} jobs")
# Track all jobs
tracking_tasks = [track_job(job, job_id) for job, job_id in jobs]
results = await asyncio.gather(*tracking_tasks)
metrics.end_time = time.time()
# Print final metrics
duration = metrics.end_time - metrics.start_time
print(f"\n📊 Final Metrics:")
print(f" Submitted: {metrics.submitted}")
print(f" Completed: {metrics.completed}")
print(f" Failed: {metrics.failed}")
print(f" Cancelled: {metrics.cancelled}")
print(f" Duration: {duration:.2f}s")
print(f" Success Rate: {metrics.completed / metrics.submitted * 100:.1f}%")
return [r for r in results if r is not None]
Real-time Progress Updates¶
async def real_time_progress_monitoring():
"""Monitor job progress with real-time updates."""
class ProgressMonitor:
def __init__(self, total_jobs: int):
self.total_jobs = total_jobs
self.completed_jobs = 0
self.failed_jobs = 0
self.start_time = time.time()
async def update_progress(self, job_id: str, status: str):
if status == "completed":
self.completed_jobs += 1
elif status == "failed":
self.failed_jobs += 1
elapsed = time.time() - self.start_time
progress = (self.completed_jobs + self.failed_jobs) / self.total_jobs * 100
print(f"Progress: {progress:.1f}% ({self.completed_jobs + self.failed_jobs}/{self.total_jobs}) "
f"- Elapsed: {elapsed:.1f}s - Job {job_id}: {status}")
monitor = ProgressMonitor(total_jobs=5)
async def monitored_job(job_id: str, texts: list):
try:
async with AsyncCoreClient.with_client_credentials_async() as client:
request = SentimentRequest(inputs=texts, fast=True)
result = await client.analyze_sentiment(request)
await monitor.update_progress(job_id, "completed")
return result
except Exception as e:
await monitor.update_progress(job_id, "failed")
raise e
# Create and run monitored jobs
job_data = [
("job_1", ["Great product!"]),
("job_2", ["Poor quality"]),
("job_3", ["Average experience"]),
("job_4", ["Excellent service"]),
("job_5", ["Terrible support"]),
]
tasks = [monitored_job(job_id, texts) for job_id, texts in job_data]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
Best Practices Summary¶
1. Job Submission¶
- Use await_job_result=False for manual job control
- Submit jobs in batches to avoid overwhelming the API
- Implement proper error handling for job submission (a short sketch follows this list)
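A minimal sketch that pulls these points together, assuming the same client-credentials setup used throughout this guide; submit_batches_safely is a hypothetical helper name:

import asyncio

from pulse.core.async_client import AsyncCoreClient
from pulse.core.models import SentimentRequest

async def submit_batches_safely(batches: list):
    """Submit each batch without waiting and record submission failures separately."""
    jobs, failures = [], []
    async with AsyncCoreClient.with_client_credentials_async() as client:
        for i, batch in enumerate(batches):
            try:
                request = SentimentRequest(inputs=batch, fast=True)
                job = await client.analyze_sentiment(request, await_job_result=False)
                jobs.append((i, job))
            except Exception as e:
                # The submission itself failed (validation, network, rate limit, ...)
                failures.append((i, e))
            await asyncio.sleep(0.2)  # brief pause between submissions
    return jobs, failures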
2. Concurrency Control¶
- Always use the max_concurrent parameter to limit concurrent operations
- Use gather_jobs() for controlled job gathering
- Implement rate limiting with rate_limit_delay
3. Error Handling¶
- Implement timeout handling for all job operations
- Use retry logic with exponential backoff for transient failures
- Handle job cancellation gracefully (see the sketch after this list)
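For graceful cancellation, a minimal sketch; it assumes a job handle obtained with await_job_result=False as in the examples above, and wait_or_cancel is a hypothetical helper name:

import asyncio

async def wait_or_cancel(job, timeout: float = 60.0):
    """Wait for a job; on timeout or local task cancellation, try to cancel the remote job too."""
    try:
        return await job.wait(timeout=timeout)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        try:
            await job.cancel()  # best-effort remote cleanup
        except Exception:
            pass  # the job may already have finished
        raise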
4. Resource Management¶
- Always use async context managers for client lifecycle
- Configure connection pools for high-concurrency scenarios
- Monitor memory usage with large datasets (see the sketch after this list)
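One way to watch memory while batching is the standard library's tracemalloc; a rough sketch, assuming a process_batch coroutine like the one defined in the memory-efficient example above:

import asyncio
import tracemalloc

async def process_with_memory_tracking(dataset: list, batch_size: int = 100):
    """Batch-process a dataset while reporting current and peak traced memory."""
    tracemalloc.start()
    results = []
    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i + batch_size]
        results.append(await process_batch(batch))  # process_batch as defined earlier
        current, peak = tracemalloc.get_traced_memory()
        print(f"Batch {i // batch_size + 1}: current={current / 1e6:.1f} MB, peak={peak / 1e6:.1f} MB")
    tracemalloc.stop()
    return results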
5. Monitoring¶
- Track job metrics for performance analysis
- Implement progress monitoring for long-running operations
- Log job failures for debugging (see the sketch after this list)
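For failure logging, a small sketch using the standard logging module instead of print; wait_and_log is a hypothetical helper and job is the usual handle returned with await_job_result=False:

import asyncio
import logging

logger = logging.getLogger("pulse.jobs")

async def wait_and_log(job, job_id: str):
    """Wait for a job and log the outcome rather than printing it."""
    try:
        return await job.wait(timeout=120.0)
    except asyncio.TimeoutError:
        logger.warning("Job %s timed out; cancelling", job_id)
        await job.cancel()
    except Exception:
        logger.exception("Job %s failed", job_id)
    return None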
6. Performance Optimization¶
- Use optimized connection pool configurations
- Process data in batches for memory efficiency
- Consider producer-consumer patterns for complex workflows
These patterns provide a solid foundation for building robust, scalable async applications with the Pulse SDK.