Skip to main content

Overview

The @publish decorator allows you to automatically publish function return values to Contextbase without modifying your existing code.

Basic usage

Simple JSON publishing

from contextbase import publish

@publish("analytics", "user-actions")
def track_user_login(user_id, login_method):
    return {
        "user_id": user_id,
        "action": "login",
        "method": login_method,
        "timestamp": "2024-01-15T14:30:00Z",
        "success": True
    }

# Function runs normally and publishes result
result = track_user_login(123, "oauth")
print(result)  # Returns the original data

File publishing

Automatically convert function output to files:
@publish("reports", "daily-summary", as_file=True, file_name="summary.txt")
def generate_daily_report():
    return "Daily Report: All systems operational"

# Creates and uploads summary.txt automatically
report = generate_daily_report()

@publish("exports", "data-dumps", as_file=True, file_name="export.json")
def export_user_data(user_id):
    data = get_user_data(user_id)  # Your function
    return json.dumps(data, indent=2)

# Creates JSON file and uploads it
export_user_data(123)

Decorator parameters

Required parameters

@publish(context_name, **options)
  • context_name (str): Name of the context to publish to

Optional parameters

@publish(
    context_name="analytics",
    scopes={"environment": "prod"},          # Scoping data (static or callable)
    raise_on_error=False,                    # Error handling behavior
    as_file=False,                           # Publish as file instead of JSON
    file_name="output.txt"                   # File name (static or callable)
)

Scoping

Add scopes to filter and organize your published data:

Static scopes

@publish("monitoring", "metrics", scopes={"environment": "production"})
def collect_system_metrics():
    return {
        "cpu_usage": 75,
        "memory_usage": 60,
        "disk_usage": 45
    }

Dynamic scopes

Compute scopes dynamically based on the function’s return value:
@publish("user-data", "preferences", scopes=lambda result: {"user_id": result["user_id"]})
def update_user_preferences(user_id, preferences):
    return {
        "user_id": user_id,
        "preferences": preferences,
        "updated_at": datetime.now().isoformat()
    }

# Scopes will be: {"user_id": 123}
update_user_preferences(123, {"theme": "dark"})
Advanced dynamic scoping example:
@publish(
    "error-tracking", 
    "api-errors",
    scopes=lambda result: {
        "service": result["service"],
        "severity": result["severity"],
        "error_type": result["error_type"]
    }
)
def handle_api_error(service, error_message, severity="medium"):
    error_type = "timeout" if "timeout" in error_message.lower() else "unknown"
    return {
        "service": service,
        "error_message": error_message,
        "error_type": error_type,
        "severity": severity,
        "timestamp": datetime.now().isoformat()
    }

# Scopes computed from actual error details!
handle_api_error("payment-api", "Connection timeout", "high")

Error handling

Silent mode (Default)

By default, publishing errors don’t affect your function:
@publish("logs", "errors")  # raise_on_error=False by default
def process_data(data):
    # Your business logic
    result = expensive_computation(data)
    return result
    # Even if publishing fails, your function succeeds

Strict mode

Raise exceptions when publishing fails:
@publish("critical-data", "transactions", raise_on_error=True)
def process_payment(amount, user_id):
    # Critical data that must be logged
    transaction = {
        "amount": amount,
        "user_id": user_id,
        "status": "completed",
        "timestamp": datetime.now().isoformat()
    }
    return transaction
    # Will raise ContextbaseError if publishing fails

Custom error handling

from contextbase import ContextbaseError

@publish("analytics", "events", raise_on_error=True)
def track_event(event_type, data):
    try:
        return {"event": event_type, "data": data}
    except ContextbaseError as e:
        # Handle publishing error
        logger.error(f"Failed to publish event: {e.message}")
        # Could fallback to local storage, retry later, etc.
        raise

File publishing

Automatic file names

When file_name is not provided, the decorator generates one:
@publish("reports", "generated", as_file=True)
def create_user_report(user_id):
    return f"Report for user {user_id}: ..."

# Creates file: "create_user_report_output.txt"

Custom file names

@publish("exports", "csv-data", as_file=True, file_name="data.csv")
def export_to_csv():
    return "name,email,age\nJohn,[email protected],30"

# Creates file: "data.csv"

Dynamic file names

Generate file names dynamically:
from datetime import datetime

@publish("backups", "database", as_file=True, 
         file_name=lambda: f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql")
def backup_database():
    return generate_sql_dump()

# Creates file: "backup_20240115_143000.sql"
More dynamic file name examples:
# User-specific reports
@publish("user-reports", "monthly", as_file=True,
         file_name=lambda: f"user_report_{datetime.now().strftime('%Y%m')}.pdf")
def generate_monthly_report(user_id):
    return create_pdf_content(user_id)

# Content-based naming
@publish("analysis", "results", as_file=True,
         file_name=lambda: f"analysis_{int(time.time())}.json")
def analyze_data(dataset):
    return json.dumps(perform_analysis(dataset))

Advanced examples

ML experiment tracking

@publish(
    "ml-experiments", 
    "training-results",
    scopes=lambda result: {
        "model_type": result["model_config"]["type"],
        "accuracy_tier": "high" if result["accuracy"] > 0.9 else "medium"
    }
)
def train_model(model_config):
    model = create_model(model_config)
    metrics = train_and_evaluate(model)
    
    return {
        "model_config": model_config,
        "accuracy": metrics["accuracy"],
        "loss": metrics["loss"],
        "training_time": metrics["duration"],
        "experiment_id": generate_id()
    }

# Automatically categorizes experiments by type and performance!

Multi-tenant application logging

@publish(
    "multi-tenant-logs",
    "user-actions", 
    scopes=lambda result: {
        "tenant_id": result["tenant_id"],
        "user_role": result["user_role"],
        "action_category": result["action"].split("_")[0]
    }
)
def track_tenant_action(tenant_id, user_id, user_role, action):
    return {
        "tenant_id": tenant_id,
        "user_id": user_id, 
        "user_role": user_role,
        "action": action,
        "timestamp": datetime.now().isoformat()
    }

# Perfect scoping for multi-tenant analytics!

Combined dynamic features

@publish(
    "user-reports",
    "personalized",
    as_file=True,
    scopes=lambda result: {
        "user_id": extract_user_id(result),
        "report_type": "monthly"
    },
    file_name=lambda: f"user_report_{datetime.now().strftime('%Y%m')}.txt"
)
def generate_personalized_report(user_id):
    return f"User Report for {user_id}: ..."

# Both scopes AND filename computed dynamically! 🚀

Combining with other decorators

The @publish decorator works well with other decorators:
import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        print(f"{func.__name__} took {duration:.2f} seconds")
        return result
    return wrapper

@timing_decorator
@publish("performance", "function-timings")
def expensive_computation(data):
    # Simulate expensive work
    time.sleep(1)
    return {"result": len(data), "processed_at": datetime.now().isoformat()}

# Both decorators work together
expensive_computation([1, 2, 3, 4, 5])

Best practices

# Use a descriptive context name
@publish("user-login-events")
def track_login(user_id):
    return {"user_id": user_id, "timestamp": datetime.now().isoformat()}

# Use dynamic scopes for better organization
@publish("monitoring-errors", scopes=lambda result: {
    "service": result["service"],
    "severity": result["severity"]
})
def log_error(service, error):
    return {"service": service, "error": str(error), "severity": "high"}

# Use raise_on_error=True for critical data
@publish("financial-transactions", raise_on_error=True)
def process_transaction(amount):
    return {"amount": amount, "processed": True}

❌ Avoid these patterns

# Don't use vague names
@publish("stuff")  # ❌ Not descriptive

# Don't publish sensitive data
@publish("passwords")  # ❌ Security risk
def authenticate(password):
    return {"password": password}  # ❌ Never log passwords

# Don't ignore errors for critical data
@publish("charges", raise_on_error=False)  # ❌ Could lose important data
def process_charge(amount):
    return {"charge": amount}

## Troubleshooting

### Debugging dynamic scopes

```python
# Test your dynamic scopes logic separately
def compute_scopes(result):
    return {"user_id": result.get("user_id", "unknown")}

# Test it
test_result = {"user_id": 123, "action": "login"}
print(compute_scopes(test_result))  # {"user_id": 123}

# Use in decorator
@publish("events", scopes=compute_scopes)
def my_function():
    return test_result

Debugging publishing issues

from contextbase import ContextbaseError

@publish("test", raise_on_error=True)
def debug_function():
    return {"test": "data"}

try:
    result = debug_function()
    print("✅ Publishing successful")
except ContextbaseError as e:
    print(f"❌ Publishing failed: {e.message}")
    print(f"Status code: {e.status_code}")
    print(f"Errors: {e.errors}")