import asyncio

from subscribeflow import SubscribeFlowClient


async def main():
    """Create one subscriber, then print each tag with its subscriber count."""
    # Use the client as a context manager (recommended)
    async with SubscribeFlowClient(api_key="sf_live_xxx") as client:
        # Create a subscriber
        subscriber = await client.subscribers.create(
            email="user@example.com",
            tags=["newsletter"],
            metadata={"source": "website"},
        )
        print(f"Subscriber created: {subscriber.id}")

        # List tags
        tags = await client.tags.list()
        for tag in tags:
            print(f"- {tag.name}: {tag.subscriber_count} Subscribers")


asyncio.run(main())
from subscribeflow import SubscribeFlowClient

# Client configuration: only api_key is required; the other options are
# shown here with their documented default values.
client = SubscribeFlowClient(
    # Required: Your API key
    api_key="sf_live_xxx",
    # Optional: API base URL (default: https://api.subscribeflow.net)
    base_url="https://api.subscribeflow.net",
    # Optional: Request timeout in seconds (default: 30)
    timeout=30.0,
)
# Createsubscriber=awaitclient.subscribers.create(email="user@example.com",tags=["product-updates","newsletter"],metadata={"plan":"premium"},)# List with paginationresult=awaitclient.subscribers.list(limit=50,status="active",tag="newsletter",)# Iterate through all pageswhileTrue:forsubscriberinresult:print(subscriber.email)ifnotresult.next_cursor:breakresult=awaitclient.subscribers.list(cursor=result.next_cursor)# Get a single subscribersubscriber=awaitclient.subscribers.get("subscriber-id")# Updateupdated=awaitclient.subscribers.update("subscriber-id",metadata={"plan":"enterprise"},status="active",)# Deleteawaitclient.subscribers.delete("subscriber-id")# Generate preference center tokentoken=awaitclient.subscribers.generate_token("subscriber-id")preference_url=f"https://your-app.com/preferences?token={token.token}"
# Create a templatetemplate=awaitclient.templates.create(name="Welcome Email",subject="Welcome to {{company}}!",mjml_content="<mjml><mj-body>...</mj-body></mjml>",category="transactional",)# List templatestemplates=awaitclient.templates.list(category="transactional")# Get a template by slugtemplate=awaitclient.templates.get_by_slug("welcome-email")# Render a preview with variablespreview=awaitclient.templates.preview("template-id",variables={"company":"Acme AG"},)print(preview.html)# Updateawaitclient.templates.update("template-id",subject="New subject")# Deleteawaitclient.templates.delete("template-id")
fromsubscribeflow.modelsimportTagFilter# Create a campaigncampaign=awaitclient.campaigns.create(name="February Newsletter",template_id="template-uuid",tag_filter=TagFilter(include_tags=["newsletter"],match="any"),)# List campaignscampaigns=awaitclient.campaigns.list(status="draft")# Check recipient count in advancecount=awaitclient.campaigns.count_recipients("campaign-id")print(f"Will be sent to {count.count} subscribers")# Send campaignresult=awaitclient.campaigns.send("campaign-id")print(f"Sending to {result.total_recipients} recipients started")# Cancel a running campaignawaitclient.campaigns.cancel("campaign-id")
# Create an event-based triggertrigger=awaitclient.triggers.create(event_type="subscriber.created",template_id="welcome-template-uuid",description="Send welcome email on sign-up",)# List triggerstriggers=awaitclient.triggers.list()# Deactivate a triggerawaitclient.triggers.update("trigger-id",is_active=False)# Delete a triggerawaitclient.triggers.delete("trigger-id")
# Get current subscription infosubscription=awaitclient.billing.get_subscription()print(f"Tier: {subscription.tier}")print(f"Subscriber limit: {subscription.subscriber_limit}")print(f"Email limit: {subscription.email_limit}")# Create a Stripe checkout session for upgradingcheckout=awaitclient.billing.create_checkout_session(price_id="price_...",success_url="https://app.example.com/billing?success=true",cancel_url="https://app.example.com/billing?cancel=true",)print(f"Checkout URL: {checkout.url}")# Get usage statisticsusage=awaitclient.billing.get_usage()print(f"Subscribers: {usage.subscriber_count}/{usage.subscriber_limit}")print(f"Emails this month: {usage.emails_sent}/{usage.email_limit}")
# Generate a preference center token for a subscribertoken_response=awaitclient.subscribers.generate_preference_token("subscriber-id")# Access the preference center with the tokenpref_center=client.preference_center(token_response.token)# Retrieve preferences and available tagsinfo=awaitpref_center.get_info()print(f"Subscribed: {len(info.subscribed_tags)} tags")print(f"Available: {len(info.available_tags)} tags")# Subscribe to / unsubscribe from a tagawaitpref_center.subscribe_tag("tag-id")awaitpref_center.unsubscribe_tag("tag-id")# Export data (GDPR)export=awaitpref_center.export_data()# Delete account (GDPR)awaitpref_center.delete_account()
import { SubscribeFlowClient } from '@subscribeflow/sdk';

// The API key is read from the SUBSCRIBEFLOW_API_KEY environment variable.
const client = new SubscribeFlowClient({
  apiKey: process.env.SUBSCRIBEFLOW_API_KEY!,
});

// Create a subscriber
const subscriber = await client.subscribers.create({
  email: 'user@example.com',
  tags: ['newsletter'],
  metadata: { source: 'website' },
});

console.log('Subscriber created:', subscriber.id);
// Client configuration: only apiKey is required; the other options are
// shown here with their documented default values.
const client = new SubscribeFlowClient({
  // Required: Your API key
  apiKey: 'sf_live_xxx',
  // Optional: API base URL (default: https://api.subscribeflow.net)
  baseUrl: 'https://api.subscribeflow.net',
  // Optional: Request timeout in ms (default: 30000)
  timeout: 30000,
});
// Create
const subscriber = await client.subscribers.create({
  email: 'user@example.com',
  tags: ['product-updates'],
  metadata: { plan: 'premium' },
});

// List
const { items, total, next_cursor } = await client.subscribers.list({
  limit: 50,
  status: 'active',
  tag: 'newsletter',
});

// Pagination: keep requesting pages until no further cursor is returned.
let cursor: string | undefined;
do {
  const result = await client.subscribers.list({ cursor });
  for (const item of result.items) {
    console.log(item.email);
  }
  // Normalise a null cursor to undefined so it matches the declared type.
  cursor = result.next_cursor ?? undefined;
} while (cursor);

// Get a single subscriber
// Named `fetched` because `subscriber` is already declared above; a second
// `const subscriber` in the same scope is a redeclaration error.
const fetched = await client.subscribers.get('subscriber-id');

// Update
const updated = await client.subscribers.update('subscriber-id', {
  metadata: { plan: 'enterprise' },
});

// Delete
await client.subscribers.delete('subscriber-id');

// Generate token
// NOTE(review): the preference-center example calls
// generatePreferenceToken() instead; confirm which method name is current.
const { token, expires_at } = await client.subscribers.generateToken('subscriber-id');
// Create a template
const template = await client.templates.create({
  name: 'Welcome Email',
  subject: 'Welcome to {{company}}!',
  mjml_content: '<mjml><mj-body>...</mj-body></mjml>',
  category: 'transactional',
});

// List templates
const { items } = await client.templates.list({ category: 'transactional' });

// Get a template by slug
const tmpl = await client.templates.getBySlug('welcome-email');

// Render a preview
const preview = await client.templates.preview('template-id', {
  company: 'Acme AG',
});
console.log(preview.html);

// Update
await client.templates.update('template-id', { subject: 'New subject' });

// Delete
await client.templates.delete('template-id');
// Create a campaign
const campaign = await client.campaigns.create({
  name: 'February Newsletter',
  template_id: 'template-uuid',
  tag_filter: { include_tags: ['newsletter'], match: 'any' },
});

// List campaigns
const campaigns = await client.campaigns.list({ status: 'draft' });

// Check recipient count in advance
const count = await client.campaigns.countRecipients('campaign-id');
console.log(`Will be sent to ${count.count} subscribers`);

// Send campaign
const sendResult = await client.campaigns.send('campaign-id');

// Cancel a running campaign
await client.campaigns.cancel('campaign-id');
// Create an event-based trigger
const trigger = await client.triggers.create({
  event_type: 'subscriber.created',
  template_id: 'welcome-template-uuid',
  description: 'Send welcome email on sign-up',
});

// List triggers
const triggers = await client.triggers.list();

// Deactivate a trigger
await client.triggers.update('trigger-id', { is_active: false });

// Delete a trigger
await client.triggers.delete('trigger-id');
// Get subscription info
const subscription = await client.billing.getSubscription();
console.log(`Tier: ${subscription.tier}`);

// Create checkout session
const checkout = await client.billing.createCheckoutSession({
  priceId: 'price_...',
  successUrl: 'https://app.example.com/billing?success=true',
  cancelUrl: 'https://app.example.com/billing?cancel=true',
});
console.log(`Redirect to: ${checkout.url}`);

// Get usage
const usage = await client.billing.getUsage();
console.log(`Subscribers: ${usage.subscriberCount}/${usage.subscriberLimit}`);
// Generate token
const tokenResponse = await client.subscribers.generatePreferenceToken('subscriber-id');

// Use the preference center with the token
const prefCenter = client.preferenceCenter(tokenResponse.token);

// Retrieve preferences
const info = await prefCenter.getInfo();

// Subscribe to / unsubscribe from a tag
await prefCenter.subscribeTag('tag-id');
await prefCenter.unsubscribeTag('tag-id');

// Export data (GDPR)
const exportData = await prefCenter.exportData();

// Delete account (GDPR)
await prefCenter.deleteAccount();
import {
  SubscribeFlowClient,
  SubscribeFlowError,
  LimitExceededError,
} from '@subscribeflow/sdk';

try {
  await client.subscribers.create({ email: 'user@example.com' });
} catch (error) {
  // LimitExceededError is tested before the general SubscribeFlowError so
  // the tier-limit branch is not pre-empted by the general one.
  if (error instanceof LimitExceededError) {
    console.error('Tier limit reached:', error.detail);
    console.error('Current tier:', error.tier);
    console.error('Consider upgrading your plan');
  } else if (error instanceof SubscribeFlowError) {
    console.error('API Error:', error.message);
    console.error('Status:', error.status);
    console.error('Type:', error.type);
    console.error('Detail:', error.detail);

    // Specific error handling
    switch (error.status) {
      case 401:
        console.error('Invalid API key');
        break;
      case 404:
        console.error('Resource not found');
        break;
      case 429:
        console.error('Rate limit reached');
        break;
    }
  } else {
    // Not an SDK error: rethrow so it is not silently swallowed here.
    throw error;
  }
}
import type { paths, components } from '@subscribeflow/sdk';

// Use component schemas
type Subscriber = components['schemas']['Subscriber'];
type Tag = components['schemas']['Tag'];

// Type-safe API operations
const subscriber: Subscriber = await client.subscribers.get('id');
import os

# Never put API keys in code!
# Read the key from the environment and fail early when it is unset or empty.
api_key = os.getenv("SUBSCRIBEFLOW_API_KEY")
if not api_key:
    raise ValueError("SUBSCRIBEFLOW_API_KEY not set")

client = SubscribeFlowClient(api_key=api_key)
importasyncioasyncdefcreate_subscribers_batch(emails:list[str]):"""Create multiple subscribers in parallel."""tasks=[client.subscribers.create(email=email)foremailinemails]returnawaitasyncio.gather(*tasks,return_exceptions=True)results=awaitcreate_subscribers_batch(["user1@example.com","user2@example.com","user3@example.com",])
import hashlib
import hmac


def verify_webhook_signature(
    payload: bytes,
    signature: str,
    secret: str,
) -> bool:
    """Verify the webhook signature.

    Computes the HMAC-SHA256 of ``payload`` keyed with ``secret`` and compares
    ``"sha256=<hex digest>"`` against ``signature`` in constant time.

    Args:
        payload: Raw request body bytes.
        signature: Signature value received with the request.
        secret: Webhook signing secret.

    Returns:
        True if the signature matches, False otherwise. A missing, non-string
        or non-ASCII signature is rejected with False instead of raising.
    """
    # hmac.compare_digest raises TypeError for str arguments containing
    # non-ASCII characters (and for None), so reject those inputs up front.
    if not isinstance(signature, str) or not signature.isascii():
        return False

    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)