Skip to content

SDK Setup Guide

Install and configure the official Subscribe Flow SDKs for Python and TypeScript.

Overview

| SDK | Package | Install | Language | Features |
| --- | --- | --- | --- | --- |
| Python | subscribeflow | pip install subscribeflow | Python 3.11+ | Async, Type Hints, Pydantic |
| TypeScript | @subscribeflow/sdk | npm install @subscribeflow/sdk | TS/JS | Types from OpenAPI, ESM |

Python SDK

Installation

Bash
pip install subscribeflow

Or with uv:

Bash
uv add subscribeflow

Basic Usage

Python
import asyncio
from subscribeflow import SubscribeFlowClient

async def main():
    """Create one subscriber, then print each tag with its subscriber count."""
    # Entering the client with `async with` is the recommended way to use it.
    async with SubscribeFlowClient(api_key="sf_live_xxx") as client:
        # Register a new subscriber and report the id the API assigned.
        subscriber = await client.subscribers.create(
            email="user@example.com", tags=["newsletter"], metadata={"source": "website"}
        )
        print(f"Subscriber created: {subscriber.id}")

        # Iterate the awaited tag listing directly instead of naming it first.
        for tag in await client.tags.list():
            print(f"- {tag.name}: {tag.subscriber_count} Subscribers")

asyncio.run(main())

Configuration

Python
from subscribeflow import SubscribeFlowClient

client = SubscribeFlowClient(
    # Required: Your API key
    api_key="sf_live_xxx",

    # Optional: API base URL (default: https://api.subscribeflow.net)
    base_url="https://api.subscribeflow.net",

    # Optional: Request timeout in seconds (default: 30)
    timeout=30.0,
)

Subscriber Operations

Python
# Create
subscriber = await client.subscribers.create(
    email="user@example.com",
    tags=["product-updates", "newsletter"],
    metadata={"plan": "premium"},
)

# List with pagination
result = await client.subscribers.list(
    limit=50,
    status="active",
    tag="newsletter",
)

# Iterate through all pages
while True:
    for subscriber in result:
        print(subscriber.email)
    if not result.next_cursor:
        break
    result = await client.subscribers.list(cursor=result.next_cursor)

# Get a single subscriber
subscriber = await client.subscribers.get("subscriber-id")

# Update
updated = await client.subscribers.update(
    "subscriber-id",
    metadata={"plan": "enterprise"},
    status="active",
)

# Delete
await client.subscribers.delete("subscriber-id")

# Generate preference center token
token = await client.subscribers.generate_token("subscriber-id")
preference_url = f"https://your-app.com/preferences?token={token.token}"

Tag Operations

Python
# Create
tag = await client.tags.create(
    name="Product Updates",
    slug="product-updates",
    description="Receive notifications about new features",
    metadata={"category": "product"},
)

# List
tags = await client.tags.list()

# Update
updated = await client.tags.update(
    "tag-id",
    description="New description",
)

# Delete
await client.tags.delete("tag-id")

Template Operations

Python
# Create a template
template = await client.templates.create(
    name="Welcome Email",
    subject="Welcome to {{company}}!",
    mjml_content="<mjml><mj-body>...</mj-body></mjml>",
    category="transactional",
)

# List templates
templates = await client.templates.list(category="transactional")

# Get a template by slug
template = await client.templates.get_by_slug("welcome-email")

# Render a preview with variables
preview = await client.templates.preview(
    "template-id",
    variables={"company": "Acme AG"},
)
print(preview.html)

# Update
await client.templates.update("template-id", subject="New subject")

# Delete
await client.templates.delete("template-id")

Sending Emails

Python
# Send a transactional email
result = await client.emails.send(
    template_slug="welcome-email",
    to="user@example.com",
    variables={"company": "Acme AG"},
    idempotency_key="unique-key-123",
)
print(f"Email queued: {result.id} (Status: {result.status})")

Campaign Operations

Python
from subscribeflow.models import TagFilter

# Create a campaign
campaign = await client.campaigns.create(
    name="February Newsletter",
    template_id="template-uuid",
    tag_filter=TagFilter(include_tags=["newsletter"], match="any"),
)

# List campaigns
campaigns = await client.campaigns.list(status="draft")

# Check recipient count in advance
count = await client.campaigns.count_recipients("campaign-id")
print(f"Will be sent to {count.count} subscribers")

# Send campaign
result = await client.campaigns.send("campaign-id")
print(f"Sending to {result.total_recipients} recipients started")

# Cancel a running campaign
await client.campaigns.cancel("campaign-id")

Trigger Operations

Python
# Create an event-based trigger
trigger = await client.triggers.create(
    event_type="subscriber.created",
    template_id="welcome-template-uuid",
    description="Send welcome email on sign-up",
)

# List triggers
triggers = await client.triggers.list()

# Deactivate a trigger
await client.triggers.update("trigger-id", is_active=False)

# Delete a trigger
await client.triggers.delete("trigger-id")

Billing Operations

Python
# Get current subscription info
subscription = await client.billing.get_subscription()
print(f"Tier: {subscription.tier}")
print(f"Subscriber limit: {subscription.subscriber_limit}")
print(f"Email limit: {subscription.email_limit}")

# Create a Stripe checkout session for upgrading
checkout = await client.billing.create_checkout_session(
    price_id="price_...",
    success_url="https://app.example.com/billing?success=true",
    cancel_url="https://app.example.com/billing?cancel=true",
)
print(f"Checkout URL: {checkout.url}")

# Get usage statistics
usage = await client.billing.get_usage()
print(f"Subscribers: {usage.subscriber_count}/{usage.subscriber_limit}")
print(f"Emails this month: {usage.emails_sent}/{usage.email_limit}")

Webhook Operations

Python
# Create a webhook endpoint
webhook = await client.webhooks.create(
    url="https://your-app.com/webhooks/subscribeflow",
    events=["subscriber.created", "subscriber.tag.subscribed"],
    description="Main webhook",
)

# Important: Store the signing secret securely!
print(f"Signing Secret: {webhook.signing_secret}")

# List
webhooks = await client.webhooks.list()

# Update
await client.webhooks.update(
    "webhook-id",
    events=["subscriber.created", "subscriber.deleted"],
)

# Test
result = await client.webhooks.test("webhook-id")
if result.success:
    print(f"Webhook works! Response time: {result.response_time_ms}ms")

# Rotate signing secret
new_webhook = await client.webhooks.rotate_secret("webhook-id")
print(f"New secret: {new_webhook.signing_secret}")

# View delivery history
deliveries = await client.webhooks.list_deliveries("webhook-id")
for d in deliveries:
    print(f"{d.event_type}: {d.status}")

# Get delivery statistics
stats = await client.webhooks.get_delivery_stats("webhook-id")
print(f"Success rate: {stats.success_rate}%")

# Retry a failed delivery
await client.webhooks.retry_delivery("webhook-id", "delivery-id")

Preference Center

Python
# Generate a preference center token for a subscriber
token_response = await client.subscribers.generate_preference_token("subscriber-id")

# Access the preference center with the token
pref_center = client.preference_center(token_response.token)

# Retrieve preferences and available tags
info = await pref_center.get_info()
print(f"Subscribed: {len(info.subscribed_tags)} tags")
print(f"Available: {len(info.available_tags)} tags")

# Subscribe to / unsubscribe from a tag
await pref_center.subscribe_tag("tag-id")
await pref_center.unsubscribe_tag("tag-id")

# Export data (GDPR)
export = await pref_center.export_data()

# Delete account (GDPR)
await pref_center.delete_account()

Error Handling

Python
from subscribeflow import (
    SubscribeFlowClient,
    SubscribeFlowError,
    NotFoundError,
    ValidationError,
    ConflictError,
    RateLimitError,
    AuthenticationError,
    LimitExceededError,
)

try:
    subscriber = await client.subscribers.create(email="user@example.com")
except LimitExceededError as e:
    print(f"Tier limit reached: {e.detail}")
    print(f"Current tier: {e.tier}")
    print(f"Limit: {e.limit}")
    print("Consider upgrading your plan")
except NotFoundError as e:
    print(f"Subscriber not found: {e.detail}")
except ValidationError as e:
    print(f"Validation failed: {e.detail}")
    for error in e.errors:
        print(f"  - {error['loc']}: {error['msg']}")
except ConflictError as e:
    print(f"Conflict: {e.detail}")
except RateLimitError as e:
    print("Rate limit reached, please try again later")
except AuthenticationError as e:
    print("Invalid API key")
except SubscribeFlowError as e:
    print(f"API error ({e.status}): {e.detail}")

Without Context Manager

If you prefer not to use the context manager:

Python
client = SubscribeFlowClient(api_key="sf_live_xxx")
try:
    subscriber = await client.subscribers.get("id")
finally:
    await client.close()

TypeScript SDK

Installation

Bash
# npm
npm install @subscribeflow/sdk

# yarn
yarn add @subscribeflow/sdk

# pnpm
pnpm add @subscribeflow/sdk

# bun
bun add @subscribeflow/sdk

Basic Usage

TypeScript
import { SubscribeFlowClient } from '@subscribeflow/sdk';

const client = new SubscribeFlowClient({
  apiKey: process.env.SUBSCRIBEFLOW_API_KEY!,
});

// Create a subscriber
const subscriber = await client.subscribers.create({
  email: 'user@example.com',
  tags: ['newsletter'],
  metadata: { source: 'website' },
});

console.log('Subscriber created:', subscriber.id);

Configuration

TypeScript
const client = new SubscribeFlowClient({
  // Required: Your API key
  apiKey: 'sf_live_xxx',

  // Optional: API base URL (default: https://api.subscribeflow.net)
  baseUrl: 'https://api.subscribeflow.net',

  // Optional: Request timeout in ms (default: 30000)
  timeout: 30000,
});

Subscriber Operations

TypeScript
// Create
const subscriber = await client.subscribers.create({
  email: 'user@example.com',
  tags: ['product-updates'],
  metadata: { plan: 'premium' },
});

// List
const { items, total, next_cursor } = await client.subscribers.list({
  limit: 50,
  status: 'active',
  tag: 'newsletter',
});

// Pagination: keep requesting pages until no next cursor is returned
let cursor: string | undefined;
do {
  const result = await client.subscribers.list({ cursor });
  for (const subscriber of result.items) {
    console.log(subscriber.email);
  }
  cursor = result.next_cursor;
} while (cursor);

// Get a single subscriber
// (bound to a new name: `subscriber` is already declared above and a
// `const` cannot be redeclared in the same scope)
const fetched = await client.subscribers.get('subscriber-id');

// Update
const updated = await client.subscribers.update('subscriber-id', {
  metadata: { plan: 'enterprise' },
});

// Delete
await client.subscribers.delete('subscriber-id');

// Generate token
const { token, expires_at } = await client.subscribers.generateToken('subscriber-id');

Tag Operations

TypeScript
// Create
const tag = await client.tags.create({
  name: 'Product Updates',
  slug: 'product-updates',
  description: 'Get notified about new features',
});

// List
const { items } = await client.tags.list();

// Update
await client.tags.update('tag-id', {
  description: 'Updated description',
});

// Delete
await client.tags.delete('tag-id');

Template Operations

TypeScript
// Create a template
const template = await client.templates.create({
  name: 'Welcome Email',
  subject: 'Welcome to {{company}}!',
  mjml_content: '<mjml><mj-body>...</mj-body></mjml>',
  category: 'transactional',
});

// List templates
const { items } = await client.templates.list({ category: 'transactional' });

// Get a template by slug
const tmpl = await client.templates.getBySlug('welcome-email');

// Render a preview
const preview = await client.templates.preview('template-id', {
  company: 'Acme AG',
});
console.log(preview.html);

// Update
await client.templates.update('template-id', { subject: 'New subject' });

// Delete
await client.templates.delete('template-id');

Sending Emails

TypeScript
// Send a transactional email
const result = await client.emails.send({
  template_slug: 'welcome-email',
  to: 'user@example.com',
  variables: { company: 'Acme AG' },
  idempotency_key: 'unique-key-123',
});
console.log('Email queued:', result.id);

Campaign Operations

TypeScript
// Create a campaign
const campaign = await client.campaigns.create({
  name: 'February Newsletter',
  template_id: 'template-uuid',
  tag_filter: { include_tags: ['newsletter'], match: 'any' },
});

// List campaigns
const campaigns = await client.campaigns.list({ status: 'draft' });

// Check recipient count in advance
const count = await client.campaigns.countRecipients('campaign-id');
console.log(`Will be sent to ${count.count} subscribers`);

// Send campaign
const sendResult = await client.campaigns.send('campaign-id');

// Cancel a running campaign
await client.campaigns.cancel('campaign-id');

Trigger Operations

TypeScript
// Create an event-based trigger
const trigger = await client.triggers.create({
  event_type: 'subscriber.created',
  template_id: 'welcome-template-uuid',
  description: 'Send welcome email on sign-up',
});

// List triggers
const triggers = await client.triggers.list();

// Deactivate a trigger
await client.triggers.update('trigger-id', { is_active: false });

// Delete a trigger
await client.triggers.delete('trigger-id');

Billing Operations

TypeScript
// Get subscription info
const subscription = await client.billing.getSubscription();
console.log(`Tier: ${subscription.tier}`);

// Create checkout session
const checkout = await client.billing.createCheckoutSession({
  priceId: 'price_...',
  successUrl: 'https://app.example.com/billing?success=true',
  cancelUrl: 'https://app.example.com/billing?cancel=true',
});
console.log(`Redirect to: ${checkout.url}`);

// Get usage
const usage = await client.billing.getUsage();
console.log(`Subscribers: ${usage.subscriberCount}/${usage.subscriberLimit}`);

Webhook Operations

TypeScript
// Create
const webhook = await client.webhooks.create({
  url: 'https://your-app.com/webhooks',
  events: ['subscriber.created', 'subscriber.tag.subscribed'],
  description: 'Main webhook',
});

// Store the signing secret!
console.log('Secret:', webhook.signing_secret);

// Update
await client.webhooks.update('webhook-id', {
  events: ['subscriber.created', 'subscriber.deleted'],
});

// Test
const result = await client.webhooks.test('webhook-id');
if (result.success) {
  console.log(`OK! Response time: ${result.response_time_ms}ms`);
}


// Rotate secret
const rotated = await client.webhooks.rotateSecret('webhook-id');
console.log('New secret:', rotated.signing_secret);

// Delivery history
const deliveries = await client.webhooks.listDeliveries('webhook-id');

// Statistics
const stats = await client.webhooks.getDeliveryStats('webhook-id');
console.log(`Success rate: ${stats.success_rate}%`);

// Retry a failed delivery
await client.webhooks.retryDelivery('webhook-id', 'delivery-id');

Preference Center

TypeScript
// Generate token
const tokenResponse = await client.subscribers.generatePreferenceToken('subscriber-id');

// Use the preference center with the token
const prefCenter = client.preferenceCenter(tokenResponse.token);

// Retrieve preferences
const info = await prefCenter.getInfo();

// Subscribe to / unsubscribe from a tag
await prefCenter.subscribeTag('tag-id');
await prefCenter.unsubscribeTag('tag-id');

// Export data (GDPR)
const exportData = await prefCenter.exportData();

// Delete account (GDPR)
await prefCenter.deleteAccount();

Error Handling

TypeScript
import {
  SubscribeFlowClient,
  SubscribeFlowError,
  LimitExceededError,
} from '@subscribeflow/sdk';

try {
  await client.subscribers.create({ email: 'user@example.com' });
} catch (error) {
  if (error instanceof LimitExceededError) {
    console.error('Tier limit reached:', error.detail);
    console.error('Current tier:', error.tier);
    console.error('Consider upgrading your plan');
  } else if (error instanceof SubscribeFlowError) {
    console.error('API Error:', error.message);
    console.error('Status:', error.status);
    console.error('Type:', error.type);
    console.error('Detail:', error.detail);

    // Specific error handling
    switch (error.status) {
      case 401:
        console.error('Invalid API key');
        break;
      case 404:
        console.error('Resource not found');
        break;
      case 429:
        console.error('Rate limit reached');
        break;
    }
  }
}

TypeScript Types

The SDK is fully typed:

TypeScript
import type { paths, components } from '@subscribeflow/sdk';

// Use component schemas
type Subscriber = components['schemas']['Subscriber'];
type Tag = components['schemas']['Tag'];

// Type-safe API operations
const subscriber: Subscriber = await client.subscribers.get('id');

Best Practices

API Key Management

Python
import os

# Never put API keys in code!
api_key = os.environ.get("SUBSCRIBEFLOW_API_KEY")
if not api_key:
    raise ValueError("SUBSCRIBEFLOW_API_KEY not set")

client = SubscribeFlowClient(api_key=api_key)
TypeScript
// TypeScript
const client = new SubscribeFlowClient({
  apiKey: process.env.SUBSCRIBEFLOW_API_KEY!,
});

Retry Logic

Python
import asyncio
from subscribeflow import SubscribeFlowClient, RateLimitError

async def with_retry(operation, max_retries=3):
    """Await ``operation()``, retrying with exponential backoff on rate limits.

    Args:
        operation: Zero-argument callable that returns a fresh awaitable on
            every call; it is invoked once per attempt.
        max_retries: Total number of attempts, including the first one.
            Must be at least 1.

    Returns:
        The result of the first ``operation()`` call that succeeds.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        RateLimitError: If the last attempt is still rate limited.
    """
    if max_retries < 1:
        # Without this guard the loop body never runs and the function would
        # silently return None without ever calling the operation.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return await operation()
        except RateLimitError:
            if attempt == max_retries - 1:
                # Out of attempts: let the caller see the rate-limit error.
                raise
            # Exponential backoff: 1s, 2s, 4s, ...
            await asyncio.sleep(2 ** attempt)

# Usage
subscriber = await with_retry(
    lambda: client.subscribers.create(email="user@example.com")
)

Batch Operations

Python
import asyncio

async def create_subscribers_batch(emails: list[str]):
    """Issue one create call per address concurrently and collect every outcome.

    ``return_exceptions=True`` is passed to ``asyncio.gather``, so a failed
    call appears as an exception object in the returned list instead of
    being raised.
    """
    pending = (client.subscribers.create(email=address) for address in emails)
    return await asyncio.gather(*pending, return_exceptions=True)

results = await create_subscribers_batch([
    "user1@example.com",
    "user2@example.com",
    "user3@example.com",
])

Webhook Signature Verification

Python
import hmac
import hashlib

def verify_webhook_signature(
    payload: bytes,
    signature: str,
    secret: str,
) -> bool:
    """Check a webhook signature against the raw request body.

    Args:
        payload: Raw request body bytes, exactly as received.
        signature: Signature to verify, in the form ``sha256=<hex digest>``.
        secret: Signing secret of the webhook endpoint.

    Returns:
        True if ``signature`` equals ``sha256=`` followed by the hex
        HMAC-SHA256 of ``payload`` keyed with ``secret``; False otherwise,
        including for malformed or non-ASCII signatures.
    """
    digest = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    expected = f"sha256={digest}".encode("ascii")
    # Compare bytes, not str: compare_digest raises TypeError for str
    # arguments containing non-ASCII characters, and the signature is supplied
    # by the caller. Unencodable characters are replaced and therefore can
    # never match the hex digest.
    provided = signature.encode("utf-8", errors="replace")
    return hmac.compare_digest(expected, provided)
TypeScript
import { createHmac, timingSafeEqual } from 'crypto';

/**
 * Check a webhook signature against the request body.
 *
 * @param payload   Request body as received.
 * @param signature Signature to verify, in the form `sha256=<hex digest>`.
 * @param secret    Signing secret of the webhook endpoint.
 * @returns true if the signature matches the HMAC-SHA256 of the payload,
 *          false otherwise (including malformed or truncated signatures).
 */
function verifyWebhookSignature(
  payload: string,
  signature: string,
  secret: string
): boolean {
  const digest = createHmac('sha256', secret).update(payload).digest('hex');
  const expected = Buffer.from(`sha256=${digest}`);
  const received = Buffer.from(signature);

  // timingSafeEqual throws when the buffers differ in length, so a malformed
  // signature must be rejected here rather than crash the handler.
  if (received.length !== expected.length) {
    return false;
  }
  return timingSafeEqual(received, expected);
}

Regenerating the SDK

If the API changes, the TypeScript types can be regenerated:

Bash
# Start the backend
make backend

# Generate TypeScript types
cd sdk/typescript
npm run generate

Further Reading