In-Memory Batch Processing for Serverless¶
This guide demonstrates how to use Instructor's in-memory batch processing feature, which is perfect for serverless deployments and applications that need to avoid disk I/O.
Overview¶
In-memory batch processing allows you to create and submit batch requests without writing to disk, using BytesIO buffers instead of files. This is ideal for:
- Serverless environments (AWS Lambda, Google Cloud Functions, Azure Functions)
- Containerized applications with read-only file systems
- Security-sensitive applications that avoid temporary files
- High-performance applications that minimize I/O overhead
Quick Start¶
import time
from pydantic import BaseModel
from instructor.batch.processor import BatchProcessor


class User(BaseModel):
    """User model for extraction."""

    name: str
    age: int
    email: str


def main():
    # Initialize batch processor
    processor = BatchProcessor("openai/gpt-4o-mini", User)

    # Sample messages for batch processing
    messages_list = [
        [
            {"role": "system", "content": "Extract user information from the text."},
            {
                "role": "user",
                "content": "John Doe is 25 years old and his email is john@example.com",
            },
        ],
        [
            {"role": "system", "content": "Extract user information from the text."},
            {
                "role": "user",
                "content": "Jane Smith, age 30, can be reached at jane.smith@company.com",
            },
        ],
        [
            {"role": "system", "content": "Extract user information from the text."},
            {
                "role": "user",
                "content": "Bob Wilson (bob.wilson@email.com) is 28 years old",
            },
        ],
    ]

    # Create batch in memory (no file_path specified)
    batch_buffer = processor.create_batch_from_messages(
        messages_list,
        file_path=None,  # This triggers in-memory mode
        max_tokens=150,
        temperature=0.1,
    )

    print(f"Created batch buffer: {type(batch_buffer)}")
    print(f"Buffer size: {len(batch_buffer.getvalue())} bytes")

    # Submit the batch using the in-memory buffer
    batch_id = processor.submit_batch(
        batch_buffer, metadata={"description": "In-memory batch example"}
    )
    print(f"Batch submitted successfully! Batch ID: {batch_id}")

    # Poll for completion
    print("Waiting for batch to complete...")
    max_wait_time = 300  # 5 minutes max
    start_time = time.time()

    while time.time() - start_time < max_wait_time:
        status = processor.get_batch_status(batch_id)
        current_status = status.get("status", "unknown")
        print(f"Current status: {current_status}")

        if current_status in ["completed", "failed", "cancelled", "expired"]:
            break

        time.sleep(10)

    # Retrieve and process results
    if status.get("status") == "completed":
        print("Batch completed! Retrieving results...")
        results = processor.get_results(batch_id)

        successful_results = [r for r in results if hasattr(r, "result")]
        error_results = [r for r in results if hasattr(r, "error_message")]

        print(f"Total results: {len(results)}")
        print(f"Successful: {len(successful_results)}")
        print(f"Errors: {len(error_results)}")

        # Show successful extractions
        if successful_results:
            print("\nExtracted Users:")
            for result in successful_results:
                user = result.result
                print(f" - {user.name}, {user.age} years old, {user.email}")

        # Show any errors
        if error_results:
            print("\nErrors encountered:")
            for error in error_results:
                print(f" - {error.custom_id}: {error.error_message}")


if __name__ == "__main__":
    main()
File vs In-Memory Comparison¶
Traditional File-Based Approach¶
# File-based approach
processor = BatchProcessor("openai/gpt-4o-mini", User)
# Creates file on disk
file_path = processor.create_batch_from_messages(
messages_list,
file_path="temp_batch.jsonl", # Specify file path
max_tokens=150,
temperature=0.1,
)
# Submit using file path
batch_id = processor.submit_batch(file_path)
# Remember to clean up
import os
if os.path.exists(file_path):
    os.remove(file_path)
New In-Memory Approach¶
# In-memory approach
processor = BatchProcessor("openai/gpt-4o-mini", User)
# Creates BytesIO buffer in memory
buffer = processor.create_batch_from_messages(
messages_list,
file_path=None, # No file path = in-memory
max_tokens=150,
temperature=0.1,
)
# Submit using buffer
batch_id = processor.submit_batch(buffer)
# No cleanup required - buffer is automatically garbage collected
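If you do want to inspect the generated payload while developing locally (see the "No debugging files" limitation below), you can always write the buffer out yourself. The helper below is only an illustrative sketch, not part of Instructor's API; it assumes the buffer is the standard io.BytesIO returned by create_batch_from_messages.

from pathlib import Path

def dump_buffer(buffer, path: str = "batch_debug.jsonl") -> None:
    """Write the in-memory JSONL payload to a file without consuming the buffer."""
    data = buffer.getvalue()  # getvalue() copies the bytes without moving the read position
    Path(path).write_bytes(data)
    print(f"Wrote {len(data)} bytes to {path}")

dump_buffer(buffer)  # Optional, purely for local debugging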
Benefits of In-Memory Processing¶
✅ Perfect for Serverless¶
# AWS Lambda example
import json
def lambda_handler(event, context):
    """AWS Lambda function using in-memory batch processing."""
    # Extract data from event
    messages_list = event.get("messages", [])

    # Process in memory - no disk I/O
    processor = BatchProcessor("openai/gpt-4o-mini", User)

    buffer = processor.create_batch_from_messages(
        messages_list,
        file_path=None,  # Essential for Lambda
    )

    batch_id = processor.submit_batch(buffer)

    return {
        'statusCode': 200,
        'body': json.dumps({
            'batch_id': batch_id,
            'message': 'Batch submitted successfully'
        })
    }
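Because batches complete asynchronously, the handler above returns before any results exist. A common pattern is a second function, invoked later (for example on a schedule or via a callback) with the batch ID. The sketch below reuses the User model, json import, and the get_batch_status/get_results calls shown earlier; check_batch_handler is an illustrative name, not part of Instructor, and it assumes Pydantic v2's model_dump().

def check_batch_handler(event, context):
    """Companion Lambda: check a previously submitted batch and return its results."""
    batch_id = event["batch_id"]

    processor = BatchProcessor("openai/gpt-4o-mini", User)
    status = processor.get_batch_status(batch_id)

    if status.get("status") != "completed":
        # Not done yet - report the current status so the caller can retry later
        return {"statusCode": 202, "body": json.dumps({"status": status.get("status")})}

    results = processor.get_results(batch_id)
    users = [r.result.model_dump() for r in results if hasattr(r, "result")]
    return {"statusCode": 200, "body": json.dumps({"users": users})}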
✅ Memory Efficient¶
# Check buffer size before submission
buffer = processor.create_batch_from_messages(messages_list, file_path=None)
print(f"Buffer size: {len(buffer.getvalue())} bytes")
print(f"Buffer type: {type(buffer)}")
# Buffer content is accessible
buffer.seek(0)
content_preview = buffer.read(200).decode("utf-8")
print(f"Preview: {content_preview}...")
# Reset for submission
buffer.seek(0)
batch_id = processor.submit_batch(buffer)
✅ Security Benefits¶
# No temporary files on disk
# No file permissions to manage
# No cleanup required
# Buffer is automatically garbage collected
processor = BatchProcessor("openai/gpt-4o-mini", User)
# This approach leaves no trace on the file system
buffer = processor.create_batch_from_messages(
sensitive_messages,
file_path=None, # Keeps everything in memory
)
batch_id = processor.submit_batch(buffer)
# When buffer goes out of scope, it's automatically cleaned up
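If you prefer not to wait for garbage collection, you can release the buffer's contents explicitly once the batch is submitted; io.BytesIO.close() discards the internal buffer immediately. A minimal sketch, continuing the example above:

buffer = processor.create_batch_from_messages(
    sensitive_messages,
    file_path=None,
)
try:
    batch_id = processor.submit_batch(buffer)
finally:
    buffer.close()  # Discards the in-memory payload as soon as submission is done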
Error Handling¶
try:
    # Create batch buffer
    buffer = processor.create_batch_from_messages(
        messages_list,
        file_path=None,
    )

    # Submit batch
    batch_id = processor.submit_batch(buffer)

    # Process results once the batch has completed
    results = processor.get_results(batch_id)

except Exception as e:
    print(f"Error during batch processing: {e}")
    # No file cleanup needed with in-memory approach
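Transient submission failures (rate limits, network blips) can also be retried without rebuilding the buffer, as long as you rewind it first. The sketch below is one way to do that with plain time.sleep backoff; the retry budget and delays are arbitrary choices, not Instructor defaults.

import time

buffer = processor.create_batch_from_messages(messages_list, file_path=None)

batch_id = None
for attempt in range(3):  # Arbitrary retry budget
    try:
        buffer.seek(0)  # Rewind so each attempt submits the full payload
        batch_id = processor.submit_batch(buffer)
        break
    except Exception as e:
        print(f"Submit attempt {attempt + 1} failed: {e}")
        time.sleep(2**attempt)  # Simple exponential backoff

if batch_id is None:
    raise RuntimeError("Batch submission failed after retries")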
Provider Support¶
All providers support in-memory batch processing:
OpenAI¶
processor = BatchProcessor("openai/gpt-4o-mini", User)
buffer = processor.create_batch_from_messages(messages_list, file_path=None)
batch_id = processor.submit_batch(buffer)
Anthropic¶
processor = BatchProcessor("anthropic/claude-3-5-sonnet-20241022", User)
buffer = processor.create_batch_from_messages(messages_list, file_path=None)
batch_id = processor.submit_batch(buffer)
Google GenAI¶
processor = BatchProcessor("google/gemini-2.5-flash", User)
buffer = processor.create_batch_from_messages(messages_list, file_path=None)
batch_id = processor.submit_batch(buffer)
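Because the provider is encoded in the "provider/model" string, switching providers is just a configuration change. A small sketch, assuming you keep the model string in an environment variable (INSTRUCTOR_BATCH_MODEL is an arbitrary name, not something Instructor reads itself):

import os

# "provider/model" string, e.g. "openai/gpt-4o-mini" or "anthropic/claude-3-5-sonnet-20241022"
model = os.environ.get("INSTRUCTOR_BATCH_MODEL", "openai/gpt-4o-mini")

processor = BatchProcessor(model, User)
buffer = processor.create_batch_from_messages(messages_list, file_path=None)
batch_id = processor.submit_batch(buffer)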
Best Practices¶
- Always set file_path=None to enable in-memory mode
- Monitor buffer size for large batches to avoid memory issues
- Use appropriate models that support JSON schema (e.g., gpt-4o-mini)
- Handle errors gracefully - no file cleanup needed
- Consider memory limits in serverless environments
Limitations¶
- Memory usage: Large batches may consume significant memory
- No debugging files: Can't inspect batch files for troubleshooting
- Temporary storage: Buffer contents are lost if not submitted immediately
Troubleshooting¶
Buffer Size Issues¶
# Check buffer size before submission
buffer = processor.create_batch_from_messages(messages_list, file_path=None)
size_mb = len(buffer.getvalue()) / (1024 * 1024)
print(f"Buffer size: {size_mb:.2f} MB")
if size_mb > 100:  # Adjust threshold as needed
    print("Warning: Large buffer size, consider splitting batch")
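When the buffer does grow too large, one option is to split the work into several smaller batches and track their IDs separately. A minimal sketch of that idea, reusing the processor and messages_list from above (the chunk size of 1000 is arbitrary):

chunk_size = 1000  # Arbitrary; tune to your payload size and memory limits

batch_ids = []
for i in range(0, len(messages_list), chunk_size):
    chunk = messages_list[i : i + chunk_size]
    chunk_buffer = processor.create_batch_from_messages(chunk, file_path=None)
    batch_ids.append(processor.submit_batch(chunk_buffer))

print(f"Submitted {len(batch_ids)} batches")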
Memory Monitoring¶
import psutil
import os
# Check memory usage
process = psutil.Process(os.getpid())
memory_before = process.memory_info().rss / 1024 / 1024 # MB
buffer = processor.create_batch_from_messages(messages_list, file_path=None)
memory_after = process.memory_info().rss / 1024 / 1024 # MB
print(f"Memory increase: {memory_after - memory_before:.2f} MB")
This in-memory approach makes Instructor's batch processing perfect for modern serverless and containerized applications while maintaining the same powerful API and provider support.