Overview
The Buffered Flush pattern captures events locally in memory, then flushes them to Elephantasm in a controlled sequence. This solves the interleaving problem when multiple agents run concurrently.
When to Use
- Concurrent agents produce events simultaneously
- Event ordering matters for synthesis quality
- You want maximum control over what the Dreamer sees
Architecture
┌────────────┐ ┌────────────┐
│ Researcher │ │ Analyzer │
│ (buffer) │ │ (buffer) │
└─────┬──────┘ └─────┬──────┘
│ flush() │ flush()
│ (sequential) │
▼ ▼
Events arrive in controlled order
Code
from elephantasm import Elephantasm, EventType
from datetime import datetime, timezone
class BufferedExtractor:
def __init__(self, client, session_id, author):
self.client = client
self.session_id = session_id
self.author = author
self.buffer = []
def capture(self, event_type, content, role="assistant"):
self.buffer.append({
"event_type": event_type, "content": content,
"role": role, "occurred_at": datetime.now(timezone.utc)
})
def flush(self):
for event in self.buffer:
self.client.extract(
event["event_type"], event["content"],
session_id=self.session_id, author=self.author,
role=event["role"], occurred_at=event["occurred_at"]
)
self.buffer.clear()
# Usage: agents buffer concurrently, then flush sequentially
researcher.flush() # All researcher events first
analyzer.flush() # Then analyzer eventsTrade-offs
| Pros | Cons |
|---|---|
| Full control over event ordering | More code complexity |
| Clean synthesis input | Events delayed until flush |
| Works with concurrent agents | Must manage buffer lifecycle |
| Preserves real timestamps | Memory overhead for large buffers |
Ensure you call flush() before the process exits. Unflushed events are lost. Consider wrapping in try/finally.