Tools Architecture
OWL's tool system provides controlled file and shell operations.
Registry Pattern
# tools/registry.py
class ToolRegistry:
def __init__(self):
self._tools: Dict[str, ToolSpec] = {}
self._profile: ToolProfile = ToolProfile.CODING
self._disabled_tools: Set[str] = set()
def register(self, name: str, ...):
"""Register a tool."""
self._tools[name] = ToolSpec(...)
def is_tool_available(self, name: str) -> bool:
"""Check tool availability under current profile."""
if name not in self._tools:
return False
if name in self._disabled_tools:
return False
tool = self._tools[name]
allowed_groups = PROFILE_GROUPS[self._profile]
if tool.group not in allowed_groups:
return False
# Write tools disabled in MINIMAL
if self._profile == ToolProfile.MINIMAL and tool.can_write:
return False
return True
def execute(self, name: str, args: dict) -> dict:
"""Execute a tool."""
if not self.is_tool_available(name):
return {"error": f"Tool not available: {name}"}
tool = self._tools[name]
return tool.function(**args)
Tool Specification
@dataclass
class ToolSpec:
name: str
description: str
group: ToolGroup
function: Callable
parameters: Dict[str, Any]
can_write: bool = False
def to_definition(self) -> dict:
"""Convert to LLM tool definition format."""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
}
}
Groups and Profiles
Tool Groups
class ToolGroup(Enum):
FS = "fs" # File system
SEARCH = "search" # Search operations
EXEC = "exec" # Shell execution
INTERACT = "interact" # User interaction
Tool Profiles
class ToolProfile(Enum):
MINIMAL = "minimal" # Read-only
CODING = "coding" # Read + write
FULL = "full" # All including shell
PROFILE_GROUPS = {
ToolProfile.MINIMAL: {ToolGroup.FS, ToolGroup.SEARCH, ToolGroup.INTERACT},
ToolProfile.CODING: {ToolGroup.FS, ToolGroup.SEARCH, ToolGroup.INTERACT},
ToolProfile.FULL: {ToolGroup.FS, ToolGroup.SEARCH, ToolGroup.EXEC, ToolGroup.INTERACT},
}
# Write tools restricted in MINIMAL
WRITE_TOOLS = {"write_file", "edit_file", "delete_file"}
Built-in Tools
list_files
def list_files(path: str, pattern: str = "*", recursive: bool = False) -> dict:
"""List files in directory."""
p = Path(path)
if not p.exists():
return {"error": f"Path not found: {path}"}
if recursive:
files = list(p.rglob(pattern))
else:
files = list(p.glob(pattern))
return {
"path": str(p),
"files": [str(f.relative_to(p)) for f in files],
"count": len(files)
}
read_file
def read_file(path: str, start_line: int = None, end_line: int = None) -> dict:
"""Read file contents."""
p = Path(path)
if not p.exists():
return {"error": f"File not found: {path}"}
# Size check (1MB limit)
if p.stat().st_size > 1_000_000:
return {"error": "File too large (>1MB)"}
content = p.read_text()
lines = content.splitlines()
# Line range
if start_line or end_line:
start = (start_line or 1) - 1
end = end_line or len(lines)
lines = lines[start:end]
content = "\n".join(lines)
return {
"path": str(p),
"content": content,
"lines": len(lines)
}
write_file
def write_file(path: str, content: str) -> dict:
"""Write content to file."""
p = Path(path).resolve()
# Safety: only write to home or tmp
home = Path.home()
if not (str(p).startswith(str(home)) or str(p).startswith("/tmp")):
return {"error": "Can only write to home directory or /tmp"}
# Create parent directories
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content)
return {
"path": str(p),
"lines": len(content.splitlines()),
"success": True
}
edit_file
def edit_file(path: str, old_text: str, new_text: str, occurrence: int = 1) -> dict:
"""Replace text in file."""
p = Path(path)
if not p.exists():
return {"error": f"File not found: {path}"}
content = p.read_text()
if old_text not in content:
return {"error": "Text not found in file"}
if occurrence == 0:
# Replace all
new_content = content.replace(old_text, new_text)
count = content.count(old_text)
else:
# Replace nth occurrence
parts = content.split(old_text)
if len(parts) <= occurrence:
return {"error": f"Occurrence {occurrence} not found"}
new_content = old_text.join(parts[:occurrence]) + new_text + old_text.join(parts[occurrence:])
count = 1
p.write_text(new_content)
return {
"path": str(p),
"replacements_made": count,
"success": True
}
delete_file
def delete_file(path: str) -> dict:
"""Delete a file or empty directory."""
p = Path(path).resolve()
# Safety: only delete in home or tmp
home = Path.home()
if not (str(p).startswith(str(home)) or str(p).startswith("/tmp")):
return {"error": "Can only delete in home directory or /tmp"}
# Block critical paths
critical = [str(home), str(home / ".ssh"), str(home / ".config")]
if str(p) in critical:
return {"error": "Cannot delete critical path"}
if not p.exists():
return {"error": f"Path not found: {path}"}
if p.is_file():
p.unlink()
return {"path": str(p), "deleted": True, "type": "file"}
elif p.is_dir():
if any(p.iterdir()):
return {"error": "Directory not empty"}
p.rmdir()
return {"path": str(p), "deleted": True, "type": "directory"}
search_files
def search_files(path: str, pattern: str, file_pattern: str = "*.py") -> dict:
"""Search for pattern in files."""
p = Path(path)
if not p.exists():
return {"error": f"Path not found: {path}"}
matches = []
for file in p.rglob(file_pattern):
if file.is_file():
try:
content = file.read_text()
for i, line in enumerate(content.splitlines(), 1):
if pattern in line:
matches.append({
"file": str(file.relative_to(p)),
"line": i,
"content": line.strip()
})
except Exception:
pass
return {
"path": str(p),
"pattern": pattern,
"matches": matches[:50], # Limit results
"count": len(matches)
}
run_command
def run_command(command: str, working_dir: str = None) -> dict:
"""Execute shell command."""
try:
result = subprocess.run(
command,
shell=True,
cwd=working_dir,
capture_output=True,
text=True,
timeout=30
)
return {
"command": command,
"stdout": result.stdout,
"stderr": result.stderr,
"exit_code": result.returncode
}
except subprocess.TimeoutExpired:
return {"error": "Command timed out (30s)"}
except Exception as e:
return {"error": str(e)}
ask_user
# Placeholder - handled specially by server
def ask_user(question: str, options: list = None) -> dict:
"""Ask user a clarifying question."""
return {"error": "ask_user should be handled by server"}
LLM Integration
Tool Definitions
def get_definitions(self) -> List[dict]:
"""Get LLM-compatible tool definitions."""
return [
tool.to_definition()
for tool in self.get_available_tools()
]
Example Definition
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read the contents of a file",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "File path to read"
},
"start_line": {
"type": "integer",
"description": "Start line (1-indexed)"
},
"end_line": {
"type": "integer",
"description": "End line"
}
},
"required": ["path"]
}
}
}
Permission Modes
Separate from tool profiles, permission modes control when to ask the user before executing tools.
# permissions/modes.py
class PermissionMode(Enum):
DEFAULT = "default" # Ask for edits and commands
AUTO_EDIT = "auto-edit" # Allow edits, ask for commands
PLAN = "plan" # Read-only only
YOLO = "yolo" # Allow everything
# Tool categories
READ_TOOLS = {"list_files", "read_file", "search_files"}
WRITE_TOOLS = {"write_file", "edit_file", "delete_file"}
EXEC_TOOLS = {"run_command"}
class ModePermissionManager:
def is_allowed(self, tool_name: str, args: dict) -> bool:
"""Check if tool runs without asking."""
...
def is_blocked(self, tool_name: str, args: dict) -> bool:
"""Check if tool is blocked (plan mode)."""
...
def needs_approval(self, tool_name: str, args: dict) -> bool:
"""Check if we need to ask user first."""
...
Execution Flow
1. LLM response includes tool_calls
2. Server iterates through calls
3. For each call:
a. Check if blocked (plan mode) → skip with message
b. Check if needs approval → ask user
c. Check availability (profile)
d. If ask_user: handle specially
e. Otherwise: execute via registry
f. Send notification chunks to CLI
g. Add result to LLM context
4. Continue LLM conversation with results
Singleton Pattern
_registry: Optional[ToolRegistry] = None
def get_tool_registry() -> ToolRegistry:
"""Get global tool registry."""
global _registry
if _registry is None:
_registry = ToolRegistry()
_register_default_tools(_registry)
return _registry