Skip to main content

Shield Authentication

SDD Classification: L3-Technical | Authority: Engineering Team | Review Cycle: Quarterly
This document covers Shield’s authentication mechanisms including email/password login, JWT token management, OAuth 2.0 flows, and session handling.

Authentication Overview


Email/Password Authentication

Login Endpoint

POST /auth/login
Content-Type: application/json

{
  "email": "user@example.com",
  "password": "secure_password"
}

Success Response

{
  "access_token": "eyJhbGciOiJSUzI1NiIs...",
  "refresh_token": "eyJhbGciOiJSUzI1NiIs...",
  "token_type": "Bearer",
  "expires_in": 900,
  "user": {
    "id": "2441f8c8-0e14-4a71-8f32-8cbbf80382ae",
    "email": "user@example.com",
    "name": "John Doe",
    "is_verified": true
  }
}

Login Flow Implementation

class AuthService:
    """Authenticate users by email/password and issue JWT token pairs."""

    def authenticate(self, email: str, password: str) -> AuthResult:
        """Verify credentials and return a fresh access/refresh token pair.

        Steps, in order: rate-limit check, lookup of an active user,
        password verification, email-verification check, token issuance,
        permission caching and audit logging.

        Args:
            email: Address the user is logging in with; also used as the
                rate-limit identifier.
            password: Plain-text password to verify against the stored hash.

        Returns:
            AuthResult carrying the access token, refresh token and user.

        Raises:
            RateLimitExceeded: The rate limiter reports too many attempts
                for this email.
            InvalidCredentials: No active user has this email, or the
                password does not match. Both cases raise the same
                exception type.
            EmailNotVerified: Credentials are correct but the account's
                email has not been verified.
        """
        # 1. Rate limit check
        if self.rate_limiter.is_exceeded(email):
            raise RateLimitExceeded('Too many login attempts')

        # 2. Fetch user (inactive accounts are treated like unknown ones)
        user = User.objects.filter(email=email, is_active=True).first()

        # 3. Verify password
        # Unknown email and wrong password are rejected through the same
        # path so that both are rate-limited AND audited.
        # NOTE(review): the unknown-email case skips password hashing and
        # may therefore answer faster than a wrong password -- confirm
        # whether timing equalisation is required here.
        if not user or not check_password(password, user.password):
            self._record_failed_attempt(email)
            raise InvalidCredentials()

        # 4. Check email verification
        if not user.is_verified:
            raise EmailNotVerified()

        # 5. Generate tokens
        access_token = self.token_service.create_access_token(user)
        refresh_token = self.token_service.create_refresh_token(user)

        # 6. Cache user context
        self.cache_user_permissions(user)

        # 7. Audit log
        self.audit_log.record_successful_login(user)

        return AuthResult(
            access_token=access_token,
            refresh_token=refresh_token,
            user=user
        )

    def _record_failed_attempt(self, email: str) -> None:
        """Count a failed login against the rate limit and audit it."""
        self.rate_limiter.record_failure(email)
        self.audit_log.record_failed_login(email)

JWT Token Structure

Access Token Claims

{
  "user_id": "2441f8c8-0e14-4a71-8f32-8cbbf80382ae",
  "email": "user@example.com",
  "name": "John Doe",
  "workspace_ids": ["ws_123", "ws_456"],
  "roles": {
    "ws_123": "admin",
    "ws_456": "member"
  },
  "permissions": {
    "documents": ["read", "write"],
    "workspaces": ["read"]
  },
  "iat": 1704067200,
  "exp": 1704068100,
  "jti": "token_unique_id",
  "token_type": "access"
}

Token Properties

| Property | Access Token | Refresh Token |
| --- | --- | --- |
| Algorithm | RS256 | RS256 |
| Key Size | 2048-bit RSA | 2048-bit RSA |
| Expiry | 15 minutes | 30 days |
| Contains | Full user context | User ID only |
| Storage | Client memory | Secure storage |

Token Generation

class TokenService:
    """Create RS256-signed JWT access and refresh tokens."""

    # Token lifetimes, kept in one place so the claims and the stored
    # refresh-token record cannot disagree.
    ACCESS_TOKEN_LIFETIME = timedelta(minutes=15)
    REFRESH_TOKEN_LIFETIME = timedelta(days=30)

    def create_access_token(self, user: User) -> str:
        """Return a signed access token carrying the user's context.

        The payload holds identity fields, the ids of the workspaces the
        user belongs to, the role per workspace, calculated permissions,
        and the ``iat``/``exp``/``jti``/``token_type`` claims.
        """
        memberships = user.get_workspace_memberships()
        # One timestamp for both claims so exp is exactly iat + lifetime.
        issued_at = datetime.utcnow()

        payload = {
            'user_id': str(user.id),
            'email': user.email,
            'name': user.name,
            # Use the membership's workspace_id (not the membership's own
            # id) so this list matches the keys of 'roles' below.
            'workspace_ids': [str(m.workspace_id) for m in memberships],
            'roles': {str(m.workspace_id): m.role for m in memberships},
            'permissions': self.calculate_permissions(user),
            'iat': issued_at,
            'exp': issued_at + self.ACCESS_TOKEN_LIFETIME,
            'jti': str(uuid4()),
            'token_type': 'access'
        }

        return jwt.encode(
            payload,
            self.private_key,
            algorithm='RS256'
        )

    def create_refresh_token(self, user: User) -> str:
        """Persist a refresh-token record and return the signed token.

        The database row and the token share the same id (``jti``) and the
        same expiry instant.
        """
        token_id = uuid4()
        issued_at = datetime.utcnow()
        expires_at = issued_at + self.REFRESH_TOKEN_LIFETIME

        # Store refresh token in database
        RefreshToken.objects.create(
            id=token_id,
            user=user,
            token_hash=self.hash_token(str(token_id)),
            expires_at=expires_at
        )

        payload = {
            'user_id': str(user.id),
            'jti': str(token_id),
            'iat': issued_at,
            'exp': expires_at,
            'token_type': 'refresh'
        }

        return jwt.encode(
            payload,
            self.private_key,
            algorithm='RS256'
        )

Token Refresh

Refresh Endpoint

POST /auth/refresh
Content-Type: application/json

{
  "refresh_token": "eyJhbGciOiJSUzI1NiIs..."
}

Refresh Flow

Refresh Implementation

def refresh_tokens(self, refresh_token: str) -> AuthResult:
    """Exchange a refresh token for a new access/refresh token pair.

    The presented token is single-use: it is marked revoked before the new
    pair is issued.

    Args:
        refresh_token: Encoded JWT previously issued as a refresh token.

    Returns:
        AuthResult with the new access token, new refresh token and user.

    Raises:
        TokenExpired: The token's expiry has passed.
        InvalidToken: The token cannot be decoded, is not a refresh token,
            or lacks the ``jti``/``user_id`` claims.
        TokenRevoked: No usable record exists for the token (unknown,
            revoked, expired in the database, owned by an inactive user)
            or a concurrent request redeemed it first. All of the user's
            refresh tokens are revoked in that case.
    """
    # 1. Decode token
    try:
        payload = jwt.decode(
            refresh_token,
            self.public_key,
            algorithms=['RS256']
        )
    except jwt.ExpiredSignatureError as err:
        raise TokenExpired() from err
    except jwt.InvalidTokenError as err:
        raise InvalidToken() from err

    # 2. Verify token type and required claims
    if payload.get('token_type') != 'refresh':
        raise InvalidToken()

    token_id = payload.get('jti')
    user_id = payload.get('user_id')
    if not token_id or not user_id:
        # A signed token without these claims cannot be matched to a record.
        raise InvalidToken()

    # 3. Check database
    now = datetime.utcnow()
    token_record = RefreshToken.objects.filter(
        id=token_id,
        user_id=user_id,
        user__is_active=True,  # login requires an active user; so does refresh
        revoked_at__isnull=True,
        expires_at__gt=now
    ).first()

    # 4. Revoke old token
    # Conditional UPDATE instead of read-modify-save: when two requests
    # present the same token concurrently, only one of them matches the
    # still-unrevoked row, so the token can be redeemed at most once.
    claimed = 0
    if token_record:
        claimed = RefreshToken.objects.filter(
            id=token_record.id,
            revoked_at__isnull=True
        ).update(revoked_at=now)

    if not claimed:
        # Possible token theft - revoke all user tokens
        self.revoke_all_user_tokens(user_id)
        raise TokenRevoked()

    # 5. Issue new tokens
    user = token_record.user
    new_access_token = self.create_access_token(user)
    new_refresh_token = self.create_refresh_token(user)

    return AuthResult(
        access_token=new_access_token,
        refresh_token=new_refresh_token,
        user=user
    )

Token Revocation

Revoke Endpoint

POST /auth/revoke
Authorization: Bearer <access_token>
Content-Type: application/json

{
  "refresh_token": "eyJhbGciOiJSUzI1NiIs..."
}

Revoke All Sessions

POST /auth/revoke-all
Authorization: Bearer <access_token>

Implementation

def revoke_token(self, refresh_token: str) -> None:
    """Revoke a single refresh token.

    The token's signature is verified but its expiry is not, so an already
    expired token can still be revoked. Revoking a token that is unknown
    or already revoked is a no-op.

    Args:
        refresh_token: Encoded refresh-token JWT to revoke.

    Raises:
        jwt.InvalidTokenError: The token cannot be decoded or its
            signature is invalid (propagated from ``jwt.decode``).
    """
    payload = jwt.decode(
        refresh_token,
        self.public_key,
        algorithms=['RS256'],
        options={'verify_exp': False}  # Allow revoking expired tokens
    )

    # Match only the owner's still-active record: this keeps the original
    # revocation timestamp of an already-revoked token intact.
    revoked_count = RefreshToken.objects.filter(
        id=payload['jti'],
        user_id=payload['user_id'],
        revoked_at__isnull=True
    ).update(
        revoked_at=datetime.utcnow()
    )

    # Only audit revocations that actually happened.
    if revoked_count:
        self.audit_log.record_token_revocation(payload['user_id'])

def revoke_all_user_tokens(self, user_id: str) -> None:
    """Revoke every still-active refresh token of one user.

    Besides stamping the token records, this clears the user's cached
    permission entries and cached user context, then writes an audit entry.

    Args:
        user_id: Id of the user whose refresh tokens are revoked.
    """
    active_tokens = RefreshToken.objects.filter(
        revoked_at__isnull=True,
        user_id=user_id,
    )
    active_tokens.update(revoked_at=datetime.utcnow())

    # Cached permissions and context must not outlive the revoked tokens.
    permission_pattern = f'permission:{user_id}:*'
    context_key = f'user_context:{user_id}'
    self.cache.delete_pattern(permission_pattern)
    self.cache.delete(context_key)

    self.audit_log.record_all_tokens_revoked(user_id)

Public Key Distribution

JWKS Endpoint

GET /.well-known/jwks.json

Response

{
  "keys": [
    {
      "kty": "RSA",
      "use": "sig",
      "kid": "key_2024_01",
      "alg": "RS256",
      "n": "base64url_encoded_modulus...",
      "e": "AQAB"
    },
    {
      "kty": "RSA",
      "use": "sig",
      "kid": "key_2024_02",
      "alg": "RS256",
      "n": "base64url_encoded_modulus...",
      "e": "AQAB"
    }
  ]
}

Key Rotation

| Property | Value |
| --- | --- |
| Rotation Interval | 90 days |
| Key Overlap | 7 days |
| Active Keys | 2 (current + previous) |
| Cache TTL | 5 minutes |

Rate Limiting

Limits by Endpoint

| Endpoint | Limit | Window |
| --- | --- | --- |
| /auth/login | 5 attempts | 15 minutes per IP |
| /auth/register | 3 accounts | 1 hour per IP |
| /auth/password-reset | 3 requests | 1 hour per IP |
| /auth/refresh | 10 requests | 1 minute |

Implementation

class RateLimiter:
    """Redis-backed attempt counter keyed by an arbitrary identifier.

    Each identifier gets a counter under ``rate_limit:<identifier>``. The
    counter is given an expiry of ``window`` seconds when it is created and
    starts over once Redis drops the key.
    """

    def _key(self, identifier: str) -> str:
        """Return the Redis key that holds the counter for ``identifier``."""
        return f'rate_limit:{identifier}'

    def is_exceeded(self, identifier: str, limit: int = 5, window: int = 900) -> bool:
        """Count one attempt and report whether the limit is now exceeded.

        Note that every call increments the counter, including the call
        that returns ``False``.

        Args:
            identifier: Value the limit is tracked for.
            limit: Number of attempts allowed per window.
            window: Counter lifetime in seconds.

        Returns:
            True when the counter, after this attempt, is above ``limit``.
        """
        key = self._key(identifier)
        current = self.redis.incr(key)

        # INCR and EXPIRE are two separate commands. If an earlier EXPIRE
        # never ran (crash or connection loss in between), the key has no
        # expiry (TTL == -1) and the identifier would stay blocked forever;
        # re-arm the expiry in that case.
        if current == 1 or self.redis.ttl(key) == -1:
            self.redis.expire(key, window)

        return current > limit

    def get_retry_after(self, identifier: str) -> int:
        """Return the seconds until the counter expires, never negative.

        Redis reports negative TTLs for keys that are missing or have no
        expiry; those are clamped to 0.
        """
        ttl = self.redis.ttl(self._key(identifier))
        return max(0, ttl)

Session Security

# Session cookie policy.

# Lifetime: the cookie lasts two weeks and is kept across browser restarts.
SESSION_COOKIE_AGE = 14 * 24 * 60 * 60  # seconds (= 1209600)
SESSION_EXPIRE_AT_BROWSER_CLOSE = False

# Exposure: HTTPS only, hidden from JavaScript, SameSite as CSRF protection.
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'

Session Storage

# Sessions are stored through the cache framework, in the cache registered
# under the alias named here.
SESSION_CACHE_ALIAS = 'sessions'
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'

# Redis cache definition used for session data.
_SESSION_CACHE = {
    'BACKEND': 'django_redis.cache.RedisCache',
    'LOCATION': 'redis://localhost:6379/1',
    'OPTIONS': {
        'CLIENT_CLASS': 'django_redis.client.DefaultClient',
    },
}

CACHES = {SESSION_CACHE_ALIAS: _SESSION_CACHE}

Error Codes

| Code | HTTP Status | Description |
| --- | --- | --- |
| INVALID_CREDENTIALS | 401 | Email or password incorrect |
| EMAIL_NOT_VERIFIED | 403 | Email verification required |
| ACCOUNT_LOCKED | 403 | Too many failed attempts |
| ACCOUNT_SUSPENDED | 403 | Account disabled by admin |
| TOKEN_EXPIRED | 401 | Access/refresh token expired |
| TOKEN_REVOKED | 401 | Token has been revoked |
| RATE_LIMIT_EXCEEDED | 429 | Too many requests |


Document Status: Complete | Version: 2.0