Skip to content

Python SDK

The official Python SDK is under active development. In the meantime, use the REST API directly with requests or httpx.

import requests
import os
API_KEY = os.environ['ASYNCQUEUE_API_KEY']
BASE_URL = 'https://api.asyncqueue.io'
headers = {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json'
}
def create_task(task_data):
response = requests.post(
f'{BASE_URL}/v1/tasks',
headers=headers,
json=task_data
)
return response.json()
def get_task(task_id):
response = requests.get(
f'{BASE_URL}/v1/tasks/{task_id}',
headers=headers
)
return response.json()
# Create a task
result = create_task({
'targetUrl': 'https://api.example.com/process',
'body': '{"orderId": 12345}',
'maxRetries': 5
})
print(f"Task created: {result['task']['id']}")
# Check status
task = get_task(result['task']['id'])
print(f"Status: {task['task']['status']}")

The SDK will provide a typed client with async support:

from asyncqueue import AsyncQueue
aq = AsyncQueue(api_key=os.environ['ASYNCQUEUE_API_KEY'])
task = aq.tasks.create(
callback_url='https://api.example.com/process',
body={'orderId': 12345},
max_retries=5,
on_complete_url='https://api.example.com/done'
)
result = aq.tasks.get(task.id)

Want early access? Contact us.