Skip to main content

Tools Architecture

OWL's tool system provides controlled file and shell operations.

Registry Pattern

# tools/registry.py
class ToolRegistry:
def __init__(self):
self._tools: Dict[str, ToolSpec] = {}
self._profile: ToolProfile = ToolProfile.CODING
self._disabled_tools: Set[str] = set()

def register(self, name: str, ...):
"""Register a tool."""
self._tools[name] = ToolSpec(...)

def is_tool_available(self, name: str) -> bool:
"""Check tool availability under current profile."""
if name not in self._tools:
return False
if name in self._disabled_tools:
return False

tool = self._tools[name]
allowed_groups = PROFILE_GROUPS[self._profile]

if tool.group not in allowed_groups:
return False

# Write tools disabled in MINIMAL
if self._profile == ToolProfile.MINIMAL and tool.can_write:
return False

return True

def execute(self, name: str, args: dict) -> dict:
"""Execute a tool."""
if not self.is_tool_available(name):
return {"error": f"Tool not available: {name}"}

tool = self._tools[name]
return tool.function(**args)

Tool Specification

@dataclass
class ToolSpec:
name: str
description: str
group: ToolGroup
function: Callable
parameters: Dict[str, Any]
can_write: bool = False

def to_definition(self) -> dict:
"""Convert to LLM tool definition format."""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
}
}

Groups and Profiles

Tool Groups

class ToolGroup(Enum):
FS = "fs" # File system
SEARCH = "search" # Search operations
EXEC = "exec" # Shell execution
INTERACT = "interact" # User interaction

Tool Profiles

class ToolProfile(Enum):
MINIMAL = "minimal" # Read-only
CODING = "coding" # Read + write
FULL = "full" # All including shell

PROFILE_GROUPS = {
ToolProfile.MINIMAL: {ToolGroup.FS, ToolGroup.SEARCH, ToolGroup.INTERACT},
ToolProfile.CODING: {ToolGroup.FS, ToolGroup.SEARCH, ToolGroup.INTERACT},
ToolProfile.FULL: {ToolGroup.FS, ToolGroup.SEARCH, ToolGroup.EXEC, ToolGroup.INTERACT},
}

# Write tools restricted in MINIMAL
WRITE_TOOLS = {"write_file", "edit_file", "delete_file"}

Built-in Tools

list_files

def list_files(path: str, pattern: str = "*", recursive: bool = False) -> dict:
"""List files in directory."""
p = Path(path)
if not p.exists():
return {"error": f"Path not found: {path}"}

if recursive:
files = list(p.rglob(pattern))
else:
files = list(p.glob(pattern))

return {
"path": str(p),
"files": [str(f.relative_to(p)) for f in files],
"count": len(files)
}

read_file

def read_file(path: str, start_line: int = None, end_line: int = None) -> dict:
"""Read file contents."""
p = Path(path)
if not p.exists():
return {"error": f"File not found: {path}"}

# Size check (1MB limit)
if p.stat().st_size > 1_000_000:
return {"error": "File too large (>1MB)"}

content = p.read_text()
lines = content.splitlines()

# Line range
if start_line or end_line:
start = (start_line or 1) - 1
end = end_line or len(lines)
lines = lines[start:end]
content = "\n".join(lines)

return {
"path": str(p),
"content": content,
"lines": len(lines)
}

write_file

def write_file(path: str, content: str) -> dict:
"""Write content to file."""
p = Path(path).resolve()

# Safety: only write to home or tmp
home = Path.home()
if not (str(p).startswith(str(home)) or str(p).startswith("/tmp")):
return {"error": "Can only write to home directory or /tmp"}

# Create parent directories
p.parent.mkdir(parents=True, exist_ok=True)

p.write_text(content)
return {
"path": str(p),
"lines": len(content.splitlines()),
"success": True
}

edit_file

def edit_file(path: str, old_text: str, new_text: str, occurrence: int = 1) -> dict:
"""Replace text in file."""
p = Path(path)
if not p.exists():
return {"error": f"File not found: {path}"}

content = p.read_text()

if old_text not in content:
return {"error": "Text not found in file"}

if occurrence == 0:
# Replace all
new_content = content.replace(old_text, new_text)
count = content.count(old_text)
else:
# Replace nth occurrence
parts = content.split(old_text)
if len(parts) <= occurrence:
return {"error": f"Occurrence {occurrence} not found"}

new_content = old_text.join(parts[:occurrence]) + new_text + old_text.join(parts[occurrence:])
count = 1

p.write_text(new_content)
return {
"path": str(p),
"replacements_made": count,
"success": True
}

delete_file

def delete_file(path: str) -> dict:
"""Delete a file or empty directory."""
p = Path(path).resolve()

# Safety: only delete in home or tmp
home = Path.home()
if not (str(p).startswith(str(home)) or str(p).startswith("/tmp")):
return {"error": "Can only delete in home directory or /tmp"}

# Block critical paths
critical = [str(home), str(home / ".ssh"), str(home / ".config")]
if str(p) in critical:
return {"error": "Cannot delete critical path"}

if not p.exists():
return {"error": f"Path not found: {path}"}

if p.is_file():
p.unlink()
return {"path": str(p), "deleted": True, "type": "file"}
elif p.is_dir():
if any(p.iterdir()):
return {"error": "Directory not empty"}
p.rmdir()
return {"path": str(p), "deleted": True, "type": "directory"}

search_files

def search_files(path: str, pattern: str, file_pattern: str = "*.py") -> dict:
"""Search for pattern in files."""
p = Path(path)
if not p.exists():
return {"error": f"Path not found: {path}"}

matches = []
for file in p.rglob(file_pattern):
if file.is_file():
try:
content = file.read_text()
for i, line in enumerate(content.splitlines(), 1):
if pattern in line:
matches.append({
"file": str(file.relative_to(p)),
"line": i,
"content": line.strip()
})
except Exception:
pass

return {
"path": str(p),
"pattern": pattern,
"matches": matches[:50], # Limit results
"count": len(matches)
}

run_command

def run_command(command: str, working_dir: str = None) -> dict:
"""Execute shell command."""
try:
result = subprocess.run(
command,
shell=True,
cwd=working_dir,
capture_output=True,
text=True,
timeout=30
)

return {
"command": command,
"stdout": result.stdout,
"stderr": result.stderr,
"exit_code": result.returncode
}
except subprocess.TimeoutExpired:
return {"error": "Command timed out (30s)"}
except Exception as e:
return {"error": str(e)}

ask_user

# Placeholder - handled specially by server
def ask_user(question: str, options: list = None) -> dict:
"""Ask user a clarifying question."""
return {"error": "ask_user should be handled by server"}

LLM Integration

Tool Definitions

def get_definitions(self) -> List[dict]:
"""Get LLM-compatible tool definitions."""
return [
tool.to_definition()
for tool in self.get_available_tools()
]

Example Definition

{
"type": "function",
"function": {
"name": "read_file",
"description": "Read the contents of a file",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "File path to read"
},
"start_line": {
"type": "integer",
"description": "Start line (1-indexed)"
},
"end_line": {
"type": "integer",
"description": "End line"
}
},
"required": ["path"]
}
}
}

Permission Modes

Separate from tool profiles, permission modes control when to ask the user before executing tools.

# permissions/modes.py
class PermissionMode(Enum):
DEFAULT = "default" # Ask for edits and commands
AUTO_EDIT = "auto-edit" # Allow edits, ask for commands
PLAN = "plan" # Read-only only
YOLO = "yolo" # Allow everything

# Tool categories
READ_TOOLS = {"list_files", "read_file", "search_files"}
WRITE_TOOLS = {"write_file", "edit_file", "delete_file"}
EXEC_TOOLS = {"run_command"}

class ModePermissionManager:
def is_allowed(self, tool_name: str, args: dict) -> bool:
"""Check if tool runs without asking."""
...

def is_blocked(self, tool_name: str, args: dict) -> bool:
"""Check if tool is blocked (plan mode)."""
...

def needs_approval(self, tool_name: str, args: dict) -> bool:
"""Check if we need to ask user first."""
...

Execution Flow

1. LLM response includes tool_calls
2. Server iterates through calls
3. For each call:
a. Check if blocked (plan mode) → skip with message
b. Check if needs approval → ask user
c. Check availability (profile)
d. If ask_user: handle specially
e. Otherwise: execute via registry
f. Send notification chunks to CLI
g. Add result to LLM context
4. Continue LLM conversation with results

Singleton Pattern

_registry: Optional[ToolRegistry] = None

def get_tool_registry() -> ToolRegistry:
"""Get global tool registry."""
global _registry
if _registry is None:
_registry = ToolRegistry()
_register_default_tools(_registry)
return _registry