Skip to main content

Daemon Architecture

The daemon (owld) is OWL's core service, handling all AI logic and persistent state.

Overview

owl/daemon/
├── main.py # Entry point, initialization
├── server.py # Unix socket server, request handling
├── protocol.py # Communication protocol definitions
└── project.py # Project detection

Server Lifecycle

Startup

# main.py
def main():
    """Daemon entry point: load config, verify the LLM backend, start the server."""
    config = get_config()  # NOTE(review): unused here; presumably read by components via get_config()

    # Check Ollama connectivity before starting the server — fail fast with a
    # clear message if the LLM backend is unreachable.
    llm = LLMClient()
    if not llm.health_check():
        print("Cannot connect to Ollama")
        sys.exit(1)

    # Start server (blocks until shutdown)
    server = DaemonServer()
    server.start()

Server Initialization

# server.py
class DaemonServer:
    """Unix-socket server owning the daemon's shared components and session state."""

    def __init__(self):
        # Core components — shared clients/stores obtained from their factories
        self.llm = LLMClient()
        self.soul = get_soul()
        self.memory = get_memory_store()
        self.knowledge = get_knowledge_store()
        self.tool_registry = get_tool_registry()

        # State
        self.session_id = self._get_or_create_session()  # resume existing or create new session
        self.current_project = None  # no project until a PROJECT request sets one

Request Handling

def _handle_client(self, client: socket.socket):
    """Read one length-prefixed request from *client* and dispatch it.

    Streaming requests write chunks directly to the socket; all other
    request types produce a single Response sent back on the same socket.
    """
    # Read request
    data = self._recv_message(client)
    request = Request.from_json(data)

    # Route to handler
    if request.type == RequestType.STREAM:
        self._handle_stream(request, client)
    else:
        handler = self.handlers[request.type]
        response = handler(request)
        self._send_message(client, response.to_json())

Request Types

| Type      | Handler             | Description            |
|-----------|---------------------|------------------------|
| CHAT      | `_handle_chat`      | Non-streaming chat     |
| STREAM    | `_handle_stream`    | Streaming chat         |
| ABORT     | `_handle_abort`     | Cancel current request |
| PROJECT   | `_handle_project`   | Set/get project        |
| SESSION   | `_handle_session`   | Session management     |
| TOOLS     | `_handle_tools`     | Tool profiles          |
| MEMORY    | `_handle_memory`    | Memory operations      |
| KNOWLEDGE | `_handle_knowledge` | Knowledge base         |
| SOUL      | `_handle_soul`      | View/modify soul       |
| EVOLVE    | `_handle_evolve`    | Trigger evolution      |
| STATUS    | `_handle_status`    | Daemon status          |
| SHUTDOWN  | `_handle_shutdown`  | Stop daemon            |

Streaming Handler

The streaming handler is the most complex, handling tool loops:

def _handle_stream(self, request: Request, client: socket.socket):
    """Handle a STREAM request: run the tool loop, then stream the final answer.

    NOTE(review): the original snippet referenced ``tools`` and
    ``final_content`` without assigning them — presumably elided for
    brevity.  ``final_content`` is now accumulated from the streamed
    chunks so the stored assistant message matches what the client saw;
    ``tools`` is still assumed to be in scope (tool definitions from
    self.tool_registry) — TODO confirm against the full source.
    """
    message = request.payload.get("message")

    # Build context: project info plus knowledge retrieved for this message
    builder = ContextBuilder()
    builder.with_project(self.current_project)
    builder.with_knowledge_query(message)

    # Get history and assemble the full prompt message list
    history = self.memory.get_conversation(self.session_id)
    messages = builder.build(message, history)

    final_content = ""

    # Tool loop: keep calling the LLM until it answers without tool calls
    while True:
        # Check abort/timeout before each (potentially slow) LLM call
        if self._should_stop():
            break

        # Call LLM
        response = self.llm.chat(messages, tools=tools)

        # No tool calls — stream the answer chunk by chunk and finish.
        # NOTE(review): this issues a second LLM call (chat_stream) rather
        # than streaming `response` itself — confirm this is intentional.
        if not response.tool_calls:
            for chunk in self.llm.chat_stream(messages):
                final_content += chunk
                self._send_chunk(client, StreamChunk(content=chunk))
            break

        # Execute tools requested by the model
        for tool_call in response.tool_calls:
            # Send tool_call notification so the client can show progress
            self._send_chunk(client, StreamChunk(
                tool_call={"name": tool_call.name, "args": tool_call.arguments}
            ))

            # Execute
            result = self.tool_registry.execute(tool_call.name, tool_call.arguments)

            # Send tool_result notification (an "error" key signals failure)
            self._send_chunk(client, StreamChunk(
                tool_result={"name": tool_call.name, "success": "error" not in result}
            ))

            # Feed the result back so the next LLM call can use it
            messages.append(Message(role="tool", content=json.dumps(result)))

    # Store both sides of the exchange in conversation memory
    self.memory.add_message(self.session_id, "user", message)
    self.memory.add_message(self.session_id, "assistant", final_content)

    # Async tasks — run in background threads, do not block the response
    self.reflector.reflect_async(self.session_id)
    self.summarizer.summarize_async(self.session_id)

Run State Management

class RunState(Enum):
    """Lifecycle states of a single LLM run tracked by the server."""
    QUEUED = "queued"        # accepted, waiting for the run lock
    RUNNING = "running"      # LLM call / tool loop in progress
    STREAMING = "streaming"  # final answer being streamed to the client
    DONE = "done"            # completed normally
    ABORTED = "aborted"      # client requested cancellation
    TIMEOUT = "timeout"      # exceeded the configured time budget
    ERROR = "error"          # failed with an exception

The server tracks run state for abort/timeout handling:

def _start_run(self) -> str:
    """Begin a new run: take the run lock and reset per-run state.

    Returns the short (8-char) run id.  NOTE(review): the lock acquired
    here is presumably released by a matching _end_run — confirm pairing.
    """
    self._run_lock.acquire()
    self._current_run_id = str(uuid.uuid4())[:8]
    self._run_state = RunState.RUNNING
    self._abort_requested.clear()  # discard any stale abort from a prior run
    self._run_start_time = time.time()
    return self._current_run_id

def _check_abort(self) -> bool:
    """Return True if a client has requested that the current run be aborted."""
    return self._abort_requested.is_set()

def _check_timeout(self) -> bool:
    """Return True once the current run has exceeded the configured timeout."""
    elapsed = time.time() - self._run_start_time
    return elapsed > self._timeout

Protocol

Message Format

All messages are length-prefixed JSON:

[4 bytes: length][JSON payload]

Request

@dataclass
class Request:
    """A single client-to-daemon message (length-prefixed JSON on the wire)."""
    type: RequestType        # which handler should process this request
    payload: Dict[str, Any]  # handler-specific arguments

Response

@dataclass
class Response:
    """A single daemon-to-client reply for non-streaming requests."""
    success: bool                # True unless the handler failed
    data: Dict[str, Any]         # handler-specific result payload
    error: Optional[str] = None  # human-readable error when success is False
    stream: bool = False         # True when stream chunks follow instead of data

Stream Chunk

@dataclass
class StreamChunk:
    """One unit of a streaming response.

    Fix: the original declaration placed non-defaulted fields
    (``tool_call`` etc.) after the defaulted ``content: str = ""``, which
    raises TypeError at class-creation time.  Every optional field now
    defaults to None, making the dataclass constructible and letting a
    chunk carry only the fields relevant to it.
    """
    content: str = ""                   # Text content
    tool_call: Optional[Dict] = None    # Tool being called
    tool_result: Optional[Dict] = None  # Tool completed
    stage: Optional[str] = None         # Current stage
    ask_user: Optional[Dict] = None     # Question for user
    done: bool = False                  # Stream complete
    error: Optional[str] = None         # Error message
    aborted: bool = False               # Was aborted

Project Detection

# project.py
def detect_project(path: str) -> ProjectContext:
    """Detect project type and frameworks for the directory at *path*.

    Fixes over the original snippet: *path* is normalized to a Path (the
    original applied the ``/`` operator to a str), and the ``markers``
    table is actually consulted to set ``project_type``, which was
    previously an undefined name.
    """
    path = Path(path)

    # Project markers: filename -> (language, build tool)
    markers = {
        "pyproject.toml": ("python", None),
        "setup.py": ("python", None),
        "package.json": ("javascript", None),
        "Cargo.toml": ("rust", None),
        "go.mod": ("go", None),
        "pom.xml": ("java", "maven"),
        "build.gradle": ("java", "gradle"),
    }

    # First marker present in the directory wins; None when nothing matches
    project_type = next(
        (lang for marker, (lang, _tool) in markers.items()
         if (path / marker).exists()),
        None,
    )

    # Detect frameworks by their well-known entry-point files
    frameworks = []
    if (path / "manage.py").exists():
        frameworks.append("django")
    if (path / "app.py").exists():
        frameworks.append("flask")
    # ... more detection

    return ProjectContext(
        path=str(path),
        type=project_type,
        frameworks=frameworks,
        key_dirs=find_key_directories(path)
    )

Concurrency

Thread Safety

  • _run_lock serializes LLM requests
  • One active run at a time
  • Multiple CLI connections allowed

Async Operations

Background tasks don't block responses:

# Reflection runs in background thread
self.reflector.reflect_async(session_id)

# Summarization runs in background
self.summarizer.summarize_async(session_id)

Error Handling

def _handle_client(self, client: socket.socket):
    """Handle one client connection; convert any failure into an error Response."""
    try:
        # Handle request
        ...
    except Exception as e:
        # Boundary handler: report the failure to the client rather than
        # dropping the connection silently.
        error_response = Response.err(str(e))
        self._send_message(client, error_response.to_json())
    finally:
        # Always release the client socket, success or failure
        client.close()

For streaming:

try:
    # Stream response
    ...
except Exception as e:
    # For streams the error travels in-band: a final chunk with done=True
    # carries the message, then the run is marked as ERROR.
    self._send_chunk(client, StreamChunk(error=str(e), done=True))
    self._end_run(RunState.ERROR)