Introduction
Serverless architecture lets you build and run applications without managing servers. AWS Lambda executes your code in response to events, and API Gateway provides a fully managed HTTP endpoint. Combined with DynamoDB, you get a scalable, cost-effective API stack.
In this tutorial, we'll build a complete CRUD REST API for a task management system.
Prerequisites
- AWS Account (Free Tier eligible)
- AWS CLI configured (
aws configure) - SAM CLI installed
- Python 3.12+
- Basic REST API knowledge
Project Setup
First, create a new SAM project:
# Install SAM CLI
pip install aws-sam-cli
# Initialize project
sam init --runtime python3.12 --name task-api --app-template hello-world
cd task-api
Project Structure
task-api/
├── template.yaml # SAM/CloudFormation template
├── src/
│ ├── handlers/
│ │ ├── __init__.py
│ │ ├── create_task.py
│ │ ├── get_task.py
│ │ ├── list_tasks.py
│ │ ├── update_task.py
│ │ └── delete_task.py
│ └── utils/
│ ├── __init__.py
│ ├── dynamodb.py
│ └── response.py
├── tests/
├── requirements.txt
└── samconfig.toml
Step 1: Define Infrastructure with SAM
Edit template.yaml:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless Task Management API
Globals:
Function:
Timeout: 10
Runtime: python3.12
MemorySize: 256
Environment:
Variables:
TABLE_NAME: !Ref TasksTable
Architectures:
- arm64 # Graviton2 — cheaper and faster
Resources:
# DynamoDB Table
TasksTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: tasks
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: task_id
AttributeType: S
- AttributeName: user_id
AttributeType: S
KeySchema:
- AttributeName: user_id
KeyType: HASH
- AttributeName: task_id
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: task-id-index
KeySchema:
- AttributeName: task_id
KeyType: HASH
Projection:
ProjectionType: ALL
# API Gateway
TaskApi:
Type: AWS::Serverless::Api
Properties:
StageName: prod
Cors:
AllowMethods: "'GET,POST,PUT,DELETE,OPTIONS'"
AllowHeaders: "'Content-Type,Authorization'"
AllowOrigin: "'*'"
# Lambda Functions
CreateTaskFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: handlers.create_task.handler
Events:
CreateTask:
Type: Api
Properties:
RestApiId: !Ref TaskApi
Path: /tasks
Method: POST
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref TasksTable
ListTasksFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: handlers.list_tasks.handler
Events:
ListTasks:
Type: Api
Properties:
RestApiId: !Ref TaskApi
Path: /tasks
Method: GET
Policies:
- DynamoDBReadPolicy:
TableName: !Ref TasksTable
GetTaskFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: handlers.get_task.handler
Events:
GetTask:
Type: Api
Properties:
RestApiId: !Ref TaskApi
Path: /tasks/{task_id}
Method: GET
Policies:
- DynamoDBReadPolicy:
TableName: !Ref TasksTable
UpdateTaskFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: handlers.update_task.handler
Events:
UpdateTask:
Type: Api
Properties:
RestApiId: !Ref TaskApi
Path: /tasks/{task_id}
Method: PUT
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref TasksTable
DeleteTaskFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: handlers.delete_task.handler
Events:
DeleteTask:
Type: Api
Properties:
RestApiId: !Ref TaskApi
Path: /tasks/{task_id}
Method: DELETE
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref TasksTable
Outputs:
ApiUrl:
Description: API Gateway endpoint URL
Value: !Sub 'https://${TaskApi}.execute-api.${AWS::Region}.amazonaws.com/prod/'
Step 2: Build Utility Modules
DynamoDB Helper (src/utils/dynamodb.py)
import os
import boto3
from boto3.dynamodb.conditions import Key
_table = None
def get_table():
"""Reuse DynamoDB table connection across invocations."""
global _table
if _table is None:
dynamodb = boto3.resource('dynamodb')
_table = dynamodb.Table(os.environ['TABLE_NAME'])
return _table
def put_item(item: dict) -> dict:
table = get_table()
table.put_item(Item=item)
return item
def get_item(user_id: str, task_id: str) -> dict | None:
table = get_table()
response = table.get_item(Key={'user_id': user_id, 'task_id': task_id})
return response.get('Item')
def query_by_user(user_id: str, limit: int = 50) -> list[dict]:
table = get_table()
response = table.query(
KeyConditionExpression=Key('user_id').eq(user_id),
Limit=limit,
ScanIndexForward=False # newest first
)
return response.get('Items', [])
def update_item(user_id: str, task_id: str, updates: dict) -> dict:
table = get_table()
expr_names = {}
expr_values = {}
update_parts = []
for key, value in updates.items():
safe_key = f'#{key}'
expr_names[safe_key] = key
expr_values[f':{key}'] = value
update_parts.append(f'{safe_key} = :{key}')
response = table.update_item(
Key={'user_id': user_id, 'task_id': task_id},
UpdateExpression='SET ' + ', '.join(update_parts),
ExpressionAttributeNames=expr_names,
ExpressionAttributeValues=expr_values,
ReturnValues='ALL_NEW'
)
return response['Attributes']
def delete_item(user_id: str, task_id: str):
table = get_table()
table.delete_item(Key={'user_id': user_id, 'task_id': task_id})
Response Helper (src/utils/response.py)
import json
from decimal import Decimal
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
return int(obj) if obj % 1 == 0 else float(obj)
return super().default(obj)
def success(body: dict | list, status_code: int = 200) -> dict:
return {
'statusCode': status_code,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
'body': json.dumps(body, cls=DecimalEncoder)
}
def error(message: str, status_code: int = 400) -> dict:
return success({'error': message}, status_code)
Step 3: Implement CRUD Handlers
Create Task (src/handlers/create_task.py)
import json
import uuid
from datetime import datetime, timezone
from utils.dynamodb import put_item
from utils.response import success, error
def handler(event, context):
try:
body = json.loads(event.get('body', '{}'))
except json.JSONDecodeError:
return error('Invalid JSON body')
# Validate required fields
title = body.get('title', '').strip()
if not title:
return error('title is required')
user_id = body.get('user_id', 'default')
now = datetime.now(timezone.utc).isoformat()
task = {
'task_id': str(uuid.uuid4()),
'user_id': user_id,
'title': title,
'description': body.get('description', ''),
'status': 'pending',
'priority': body.get('priority', 'medium'),
'created_at': now,
'updated_at': now,
}
put_item(task)
return success(task, 201)
List Tasks (src/handlers/list_tasks.py)
from utils.dynamodb import query_by_user
from utils.response import success
def handler(event, context):
params = event.get('queryStringParameters') or {}
user_id = params.get('user_id', 'default')
limit = min(int(params.get('limit', '50')), 100)
tasks = query_by_user(user_id, limit)
return success({'tasks': tasks, 'count': len(tasks)})
Get Task (src/handlers/get_task.py)
from utils.dynamodb import get_item
from utils.response import success, error
def handler(event, context):
task_id = event['pathParameters']['task_id']
params = event.get('queryStringParameters') or {}
user_id = params.get('user_id', 'default')
task = get_item(user_id, task_id)
if not task:
return error('Task not found', 404)
return success(task)
Update Task (src/handlers/update_task.py)
import json
from datetime import datetime, timezone
from utils.dynamodb import get_item, update_item
from utils.response import success, error
ALLOWED_FIELDS = {'title', 'description', 'status', 'priority'}
VALID_STATUSES = {'pending', 'in_progress', 'done'}
VALID_PRIORITIES = {'low', 'medium', 'high'}
def handler(event, context):
task_id = event['pathParameters']['task_id']
try:
body = json.loads(event.get('body', '{}'))
except json.JSONDecodeError:
return error('Invalid JSON body')
user_id = body.pop('user_id', 'default')
# Check task exists
if not get_item(user_id, task_id):
return error('Task not found', 404)
# Filter and validate
updates = {k: v for k, v in body.items() if k in ALLOWED_FIELDS}
if not updates:
return error('No valid fields to update')
if 'status' in updates and updates['status'] not in VALID_STATUSES:
return error(f'status must be one of: {VALID_STATUSES}')
if 'priority' in updates and updates['priority'] not in VALID_PRIORITIES:
return error(f'priority must be one of: {VALID_PRIORITIES}')
updates['updated_at'] = datetime.now(timezone.utc).isoformat()
task = update_item(user_id, task_id, updates)
return success(task)
Delete Task (src/handlers/delete_task.py)
from utils.dynamodb import get_item, delete_item
from utils.response import success, error
def handler(event, context):
task_id = event['pathParameters']['task_id']
params = event.get('queryStringParameters') or {}
user_id = params.get('user_id', 'default')
if not get_item(user_id, task_id):
return error('Task not found', 404)
delete_item(user_id, task_id)
return success({'message': 'Task deleted', 'task_id': task_id})
Step 4: Local Testing with SAM
# Build the project
sam build
# Start local API (requires Docker)
sam local start-api --port 3000
# Test endpoints
curl -X POST http://localhost:3000/tasks \
-H 'Content-Type: application/json' \
-d '{"title": "Learn Lambda", "priority": "high"}'
curl http://localhost:3000/tasks
curl -X PUT http://localhost:3000/tasks/{task_id} \
-H 'Content-Type: application/json' \
-d '{"status": "done"}'
curl -X DELETE http://localhost:3000/tasks/{task_id}
Step 5: Add Request Validation with Middleware
Create a lightweight middleware pattern:
# src/utils/middleware.py
import json
import logging
import traceback
from functools import wraps
from utils.response import error
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def api_handler(func):
"""Decorator for consistent error handling and logging."""
@wraps(func)
def wrapper(event, context):
logger.info(json.dumps({
'method': event.get('httpMethod'),
'path': event.get('path'),
'request_id': context.aws_request_id,
}))
try:
return func(event, context)
except json.JSONDecodeError:
return error('Invalid JSON in request body', 400)
except KeyError as e:
return error(f'Missing required field: {e}', 400)
except Exception as e:
logger.error(traceback.format_exc())
return error('Internal server error', 500)
return wrapper
Apply to handlers:
from utils.middleware import api_handler
@api_handler
def handler(event, context):
# Your handler code — exceptions are caught automatically
...
Step 6: Deploy to AWS
# First deployment (guided)
sam deploy --guided
# Subsequent deployments
sam deploy
# Check the output for your API URL
# https://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/prod/
Step 7: Add API Key Authentication
Update template.yaml to require API keys:
TaskApi:
Type: AWS::Serverless::Api
Properties:
StageName: prod
Auth:
ApiKeyRequired: true
UsagePlan:
CreateUsagePlan: PER_API
UsagePlanName: TaskApiUsagePlan
Throttle:
BurstLimit: 50
RateLimit: 20
Quota:
Limit: 10000
Period: MONTH
Call with API key:
curl -H 'x-api-key: YOUR_API_KEY' \
https://your-api.execute-api.region.amazonaws.com/prod/tasks
Step 8: Add CloudWatch Monitoring
# Add to template.yaml Resources
ApiErrorAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: TaskApi-5xx-Errors
MetricName: 5XXError
Namespace: AWS/ApiGateway
Dimensions:
- Name: ApiName
Value: !Ref TaskApi
Statistic: Sum
Period: 300
EvaluationPeriods: 1
Threshold: 5
ComparisonOperator: GreaterThanThreshold
AlarmActions:
- !Ref AlertTopic
AlertTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: task-api-alerts
Cost Estimation
| Service | Free Tier | Beyond Free Tier |
|---------|-----------|------------------|
| Lambda | 1M requests/month | $0.20 per 1M requests |
| API Gateway | 1M calls/month | $3.50 per 1M calls |
| DynamoDB | 25 GB storage | $0.25 per GB/month |
For a low-traffic API (~10K requests/day), the monthly cost is approximately $0 under Free Tier or ~$3 after.
Summary
You've built a complete serverless REST API with: