Skip to main content

Python SDK

Official Python SDK for building applications and automation with the Workspace Platform.

Overview

The Workspace Python SDK provides a comprehensive, Pythonic interface for integrating workspace functionality into your Python applications. With 23 modules spanning 18,284+ lines of code, it offers async/await support, enhanced RBAC features, and production-ready error handling.

Repository: /workspaces-sdk-python

Key Features

Comprehensive API Coverage

  • 23 Modules covering all platform features
  • 18,284+ lines of production code
  • GraphQL and REST API integration
  • Async/await for high-performance operations
  • Synchronous API also available

Enhanced Features

  • Advanced RBAC implementation
  • Security-focused design
  • Comprehensive error handling
  • Request retry and backoff
  • Connection pooling
  • Webhook support

Python-Native

  • Type hints throughout
  • Context managers for resources
  • Pythonic API design
  • Exception hierarchy
  • pytest-based testing

Technology Stack

  • Language: Python 3.9+
  • HTTP Client: httpx
  • Async: asyncio
  • Testing: pytest
  • Type Checking: mypy
  • Code Quality: black, isort, flake8

Installation

Using pip

pip install workspace-sdk

Using poetry

poetry add workspace-sdk

From Source

# Navigate to your local checkout of the SDK repository
cd workspaces-sdk-python

# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate

# Install dependencies
pip install --upgrade pip
pip install -r requirements.txt

# Install SDK
pip install -e .

Quick Start

Basic Setup

from workspace_sdk import WorkspaceClient

# Initialize client
client = WorkspaceClient(
api_key='your_api_key_here',
endpoint='https://api.workspace-platform.ai' # optional
)

# Create workspace
workspace = client.workspaces.create(
name='My Workspace',
description='A new workspace'
)

print(f'Created workspace: {workspace.id}')

Async Setup

import asyncio
from workspace_sdk import AsyncWorkspaceClient

async def main():
    # Initialize async client
    async with AsyncWorkspaceClient(api_key='your_key') as client:
        # Create workspace
        workspace = await client.workspaces.create(
            name='My Workspace',
            description='A new workspace'
        )
        print(f'Created workspace: {workspace.id}')

# Run
asyncio.run(main())

Core Modules

Workspace Management

from workspace_sdk import WorkspaceClient

client = WorkspaceClient(api_key='your_key')

# Create workspace
workspace = client.workspaces.create(
name='Engineering Team',
description='Engineering department workspace',
settings={
'visibility': 'private',
'member_limit': 50
}
)

# Get workspace
ws = client.workspaces.get('ws_abc123')

# Update workspace
client.workspaces.update(
'ws_abc123',
name='Engineering Department',
settings={'visibility': 'internal'}
)

# List workspaces
workspaces = client.workspaces.list(
limit=20,
status='active'
)

# Delete workspace
client.workspaces.delete('ws_abc123')

Member Management

# Add member
member = client.members.add(
workspace_id='ws_abc123',
email='user@example.com',
role='admin',
team_ids=['team_1', 'team_2']
)

# Get member
mem = client.members.get('mem_abc123')

# Update member
client.members.update(
'mem_abc123',
role='editor',
permissions=['read', 'write', 'comment']
)

# List members
members = client.members.list(
workspace_id='ws_abc123',
role='admin',
limit=50
)

# Remove member
client.members.remove('mem_abc123')

# Bulk invite
invited = client.members.bulk_invite(
workspace_id='ws_abc123',
emails=['user1@example.com', 'user2@example.com'],
role='viewer'
)

Team Operations

# Create team
team = client.teams.create(
workspace_id='ws_abc123',
name='Frontend Team',
description='Frontend developers',
member_ids=['mem_1', 'mem_2']
)

# Get team
tm = client.teams.get('team_abc123')

# Update team
client.teams.update(
'team_abc123',
name='Frontend Engineering',
settings={'max_members': 20}
)

# Add members to team
client.teams.add_members(
'team_abc123',
member_ids=['mem_3', 'mem_4']
)

# Remove members from team
client.teams.remove_members(
'team_abc123',
member_ids=['mem_1']
)

# List teams
teams = client.teams.list(
workspace_id='ws_abc123',
limit=20
)

# Delete team
client.teams.delete('team_abc123')

Project Management

# Create project
project = client.projects.create(
workspace_id='ws_abc123',
name='Mobile App Redesign',
description='Q4 mobile app redesign project',
team_id='team_abc123',
status='active',
settings={
'visibility': 'team',
'due_date': '2025-12-31'
}
)

# Get project
proj = client.projects.get('proj_abc123')

# Update project
client.projects.update(
'proj_abc123',
status='in_progress',
progress=45
)

# List projects
projects = client.projects.list(
workspace_id='ws_abc123',
status='active',
team_id='team_abc123'
)

# Assign project to team
client.projects.assign_team('proj_abc123', 'team_xyz789')

# Delete project
client.projects.delete('proj_abc123')

RBAC (Role-Based Access Control)

# List roles
roles = client.rbac.roles.list()

# Get role
role = client.rbac.roles.get('role_abc123')

# Create custom role
custom_role = client.rbac.roles.create(
name='Contractor',
description='External contractor access',
permissions=['read', 'comment']
)

# Assign role to member
client.rbac.assign(
member_id='mem_abc123',
role_id='role_abc123',
resource_id='ws_abc123',
resource_type='workspace'
)

# Check permission
has_permission = client.rbac.check_permission(
member_id='mem_abc123',
action='edit',
resource_id='proj_abc123',
resource_type='project'
)

print(f'Has edit permission: {has_permission}')

# Get member permissions
permissions = client.rbac.get_permissions('mem_abc123')

Async Operations

Async Client

import asyncio
from workspace_sdk import AsyncWorkspaceClient

async def main():
    async with AsyncWorkspaceClient(api_key='your_key') as client:
        # All operations are async
        workspace = await client.workspaces.create(name='Test')
        members = await client.members.list(workspace_id=workspace.id)

        # Concurrent operations
        results = await asyncio.gather(
            client.workspaces.get('ws_1'),
            client.workspaces.get('ws_2'),
            client.workspaces.get('ws_3')
        )

        print(f'Fetched {len(results)} workspaces')

asyncio.run(main())

High-Performance Batch Operations

async def batch_create_workspaces(names):
    async with AsyncWorkspaceClient(api_key='your_key') as client:
        tasks = [
            client.workspaces.create(name=name)
            for name in names
        ]
        workspaces = await asyncio.gather(*tasks)
        return workspaces

# Create 100 workspaces concurrently
workspace_names = [f'Workspace {i}' for i in range(100)]
workspaces = asyncio.run(batch_create_workspaces(workspace_names))
print(f'Created {len(workspaces)} workspaces')

Advanced Features

Pagination

# Manual pagination
page1 = client.workspaces.list(limit=10)
page2 = client.workspaces.list(limit=10, cursor=page1.next_cursor)

# Iterator (auto-pagination)
for workspace in client.workspaces.iterate(limit=10):
    print(workspace.name)

# Get all (auto-pagination)
all_workspaces = client.workspaces.list_all()

Error Handling

from workspace_sdk import (
WorkspaceClient,
WorkspaceError,
NotFoundError,
ValidationError,
AuthenticationError,
RateLimitError
)

client = WorkspaceClient(api_key='your_key')

try:
    workspace = client.workspaces.get('ws_invalid')
except NotFoundError:
    print('Workspace not found')
except ValidationError as e:
    print(f'Validation failed: {e.errors}')
except AuthenticationError:
    print('Invalid API key')
except RateLimitError as e:
    print(f'Rate limit exceeded, retry after: {e.retry_after}')
except WorkspaceError as e:
    print(f'Workspace error: {e.message}')
except Exception as e:
    print(f'Unexpected error: {e}')

Context Managers

# Automatic resource cleanup
with WorkspaceClient(api_key='your_key') as client:
    workspace = client.workspaces.create(name='Test')
    # Client automatically closed after context

# Async context manager
async with AsyncWorkspaceClient(api_key='your_key') as client:
    workspace = await client.workspaces.create(name='Test')
    # Connection pool automatically closed

Webhooks

from workspace_sdk import WebhookClient

webhooks = WebhookClient(api_key='your_key')

# Create webhook
webhook = webhooks.create(
url='https://myapp.com/webhooks/workspace',
events=[
'workspace.created',
'workspace.updated',
'member.added',
'team.created'
],
secret='webhook_secret_key'
)

# List webhooks
all_webhooks = webhooks.list()

# Update webhook
webhooks.update(
'webhook_abc123',
events=['workspace.created', 'workspace.deleted']
)

# Delete webhook
webhooks.delete('webhook_abc123')

# Verify webhook signature (in your server)
from workspace_sdk.webhooks import verify_signature

def handle_webhook(request):
    signature = request.headers.get('X-Workspace-Signature')
    is_valid = verify_signature(
        request.body,
        signature,
        'webhook_secret_key'
    )

    if not is_valid:
        return {'error': 'Invalid signature'}, 401

    # Process webhook
    event = request.json()
    print(f'Received event: {event["type"]}')

    return {'status': 'ok'}, 200

File Operations

from workspace_sdk import FileClient

files = FileClient(api_key='your_key')

# Upload file
with open('/path/to/file.pdf', 'rb') as f:
    file = files.upload(
        workspace_id='ws_abc123',
        file=f,
        filename='document.pdf',
        metadata={
            'project_id': 'proj_abc123',
            'tags': ['important', 'q4']
        }
    )

# Download file
with files.download('file_abc123') as response:
    with open('/path/to/download.pdf', 'wb') as f:
        for chunk in response.iter_bytes():
            f.write(chunk)

# List files
file_list = files.list(
workspace_id='ws_abc123',
project_id='proj_abc123'
)

# Delete file
files.delete('file_abc123')

Activity and Audit

from datetime import datetime, timedelta

# Get activity log
start_date = datetime.now() - timedelta(days=30)
end_date = datetime.now()

activities = client.activity.list(
workspace_id='ws_abc123',
start_date=start_date,
end_date=end_date,
actor_id='mem_abc123',
limit=100
)

# Get workspace statistics
stats = client.analytics.get_workspace_stats('ws_abc123')
print(f'Total members: {stats.total_members}')
print(f'Active projects: {stats.active_projects}')

# Export audit trail
audit_data = client.audit.export(
workspace_id='ws_abc123',
start_date=start_date,
end_date=end_date,
format='json'
)

Configuration

Client Options

from workspace_sdk import WorkspaceClient

client = WorkspaceClient(
# Required
api_key='your_api_key',

# Optional
endpoint='https://api.workspace-platform.ai',
timeout=30.0, # 30 seconds
max_retries=3,
retry_delay=1.0, # 1 second

# Custom headers
headers={
'X-Custom-Header': 'value'
},

# Proxy
proxies={
'http': 'http://proxy.example.com:8080',
'https': 'https://proxy.example.com:8080'
},

# Logging
log_level='DEBUG' # 'ERROR' | 'WARNING' | 'INFO' | 'DEBUG'
)

Environment Variables

WORKSPACE_API_KEY=your_api_key
WORKSPACE_API_ENDPOINT=https://api.workspace-platform.ai
WORKSPACE_TIMEOUT=30
WORKSPACE_LOG_LEVEL=INFO

The client loads these automatically:

from workspace_sdk import WorkspaceClient

# Automatically uses WORKSPACE_API_KEY env var
client = WorkspaceClient()

Testing

Unit Tests

# test_workspace.py
import pytest
from workspace_sdk import WorkspaceClient

@pytest.fixture
def client():
    return WorkspaceClient(api_key='test_key')

def test_create_workspace(client):
    workspace = client.workspaces.create(name='Test Workspace')
    assert workspace.id is not None
    assert workspace.name == 'Test Workspace'

def test_list_workspaces(client):
    workspaces = client.workspaces.list()
    assert isinstance(workspaces.data, list)

@pytest.mark.asyncio
async def test_async_create_workspace():
    from workspace_sdk import AsyncWorkspaceClient

    async with AsyncWorkspaceClient(api_key='test_key') as client:
        workspace = await client.workspaces.create(name='Test')
        assert workspace.id is not None

Mock Client

from unittest.mock import Mock
from workspace_sdk import WorkspaceClient

def test_with_mock():
    # Create mock client
    mock_client = Mock(spec=WorkspaceClient)

    # Mock responses
    mock_client.workspaces.create.return_value = Mock(
        id='ws_mock',
        name='Mock Workspace'
    )

    # Use in tests
    workspace = mock_client.workspaces.create(name='Test')
    assert workspace.id == 'ws_mock'
    mock_client.workspaces.create.assert_called_once()

Running Tests

# Run all tests
pytest

# Run specific test file
pytest tests/test_workspace.py

# Run with coverage
pytest --cov=workspace_sdk --cov-report=html

# Run async tests
pytest -v tests/test_async.py

# Run with markers
pytest -m "not slow"

Type Hints

Full Type Support

from typing import List, Optional
from workspace_sdk import (
WorkspaceClient,
Workspace,
Member,
Team,
Project
)

def create_workspace_with_team(
    client: WorkspaceClient,
    workspace_name: str,
    team_name: str,
    member_emails: List[str]
) -> tuple[Workspace, Team, List[Member]]:
    """Create workspace with team and members."""

    # Create workspace (typed as Workspace)
    workspace: Workspace = client.workspaces.create(name=workspace_name)

    # Create team (typed as Team)
    team: Team = client.teams.create(
        workspace_id=workspace.id,
        name=team_name
    )

    # Add members (typed as List[Member])
    members: List[Member] = []
    for email in member_emails:
        member = client.members.add(
            workspace_id=workspace.id,
            email=email,
            team_ids=[team.id]
        )
        members.append(member)

    return workspace, team, members

# Type checking with mypy
# $ mypy your_script.py

Best Practices

  ✅ Use environment variables for API keys
  ✅ Implement proper error handling
  ✅ Use async client for high-performance operations
  ✅ Enable request retries for production
  ✅ Implement pagination for large datasets
  ✅ Use context managers for resource cleanup
  ✅ Add type hints for better IDE support
  ✅ Cache responses when appropriate
  ✅ Monitor API usage and rate limits
  ✅ Use webhooks for real-time updates
  ✅ Validate input before API calls
  ✅ Log API errors for debugging

Performance

  • Sync Client: 50-100 requests/second
  • Async Client: 500+ requests/second
  • Memory Usage: ~30MB base
  • Connection Pooling: Automatic (httpx)
  • Request Latency: 100-300ms (API dependent)

Troubleshooting

Common Issues

Issue: Authentication errors

# Solution: Verify API key
import os
from workspace_sdk import WorkspaceClient

client = WorkspaceClient(
api_key=os.getenv('WORKSPACE_API_KEY'),
log_level='DEBUG' # Enable debug logging
)

Issue: Rate limiting

# Solution: Implement retry logic with backoff
from workspace_sdk import WorkspaceClient, RateLimitError
import time

client = WorkspaceClient(
api_key='your_key',
max_retries=5,
retry_delay=2.0
)

try:
    workspace = client.workspaces.create(name='Test')
except RateLimitError as e:
    time.sleep(e.retry_after)
    workspace = client.workspaces.create(name='Test')

Issue: Type checking errors

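# Solution: Install mypy, plus type stubs for any untyped third-party
# packages your own script imports (types-requests is shown as an example)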
pip install types-requests
pip install mypy

# Run type checker
mypy your_script.py

Framework Integration

Django

# workspace/client.py
from django.conf import settings
from workspace_sdk import WorkspaceClient

def get_workspace_client() -> WorkspaceClient:
    return WorkspaceClient(api_key=settings.WORKSPACE_API_KEY)

# views.py
from django.http import JsonResponse
from .client import get_workspace_client

def list_workspaces(request):
    client = get_workspace_client()
    workspaces = client.workspaces.list()
    return JsonResponse({
        'workspaces': [ws.dict() for ws in workspaces.data]
    })

Flask

from flask import Flask, jsonify
from workspace_sdk import WorkspaceClient
import os

app = Flask(__name__)
client = WorkspaceClient(api_key=os.getenv('WORKSPACE_API_KEY'))

@app.route('/workspaces')
def list_workspaces():
    workspaces = client.workspaces.list()
    return jsonify({
        'workspaces': [ws.dict() for ws in workspaces.data]
    })

if __name__ == '__main__':
    app.run()

FastAPI

from fastapi import FastAPI, Depends
from workspace_sdk import AsyncWorkspaceClient
import os

app = FastAPI()

async def get_client():
    async with AsyncWorkspaceClient(
        api_key=os.getenv('WORKSPACE_API_KEY')
    ) as client:
        yield client

@app.get('/workspaces')
async def list_workspaces(client: AsyncWorkspaceClient = Depends(get_client)):
    workspaces = await client.workspaces.list()
    return {'workspaces': [ws.dict() for ws in workspaces.data]}

Celery (Task Queue)

from celery import Celery
from workspace_sdk import WorkspaceClient
import os

app = Celery('tasks', broker='redis://localhost:6379')
client = WorkspaceClient(api_key=os.getenv('WORKSPACE_API_KEY'))

@app.task
def create_workspace_task(name: str, description: str):
    workspace = client.workspaces.create(
        name=name,
        description=description
    )
    return workspace.id

# Usage
result = create_workspace_task.delay('New Workspace', 'Description')
workspace_id = result.get()

Support