Skip to content

[FEATURE][SECURITY]: Generic IP-based access control (allowlist) #536

@crivetimihai

Description

@crivetimihai

🔐 SECURITY FEATURE: Generic IP-Based Access Control

Summary: Implement flexible IP address restrictions for any operation type (admin, API, regular users) with support for both allowlists and blocklists. Provides granular control over network access with per-path, per-user, and per-role configurations.

Background: Organizations need flexible IP control beyond just admin access:

  • Public APIs restricted to partner IP ranges
  • Internal services limited to corporate networks
  • Geographic restrictions for compliance
  • DDoS protection via IP blocklisting
  • Customer-specific API access restrictions
  • Temporary blocks for suspicious activity

Scope:

  • Generic IP control system for any endpoint
  • Support both allowlist and blocklist modes
  • Path-based rules with regex support
  • Role-based and user-specific rules
  • Geographic IP restrictions (country-level)
  • Temporary IP blocks with expiration
  • Dynamic rule updates via API
  • IP reputation integration (optional)
  • Detailed metrics and logging

Out of Scope

  • This feature does not replace existing ingress / API gateway rules and security best practices, and adds to the existing layered security approach. It should NOT be the only mechanism you use to prevent access to specific API endpoints.

Implementation Details

1. Enhanced Configuration Structure

mcpgateway/config.py:

from typing import Dict, List, Optional, Set
from pydantic import BaseModel
from enum import Enum

class IPControlMode(str, Enum):
    ALLOWLIST = "allowlist"
    BLOCKLIST = "blocklist"
    DISABLED = "disabled"

class IPRule(BaseModel):
    """Individual IP access rule."""
    name: str
    description: Optional[str] = None
    mode: IPControlMode
    ips: List[str] = []  # IPs and CIDR ranges
    countries: List[str] = []  # ISO country codes
    applies_to_paths: List[str] = ["*"]  # Path patterns
    applies_to_roles: List[str] = ["*"]  # User roles
    applies_to_users: List[str] = []  # Specific user IDs
    priority: int = 0  # Higher priority rules evaluated first
    enabled: bool = True
    expires_at: Optional[datetime] = None

class IPControlConfig(BaseModel):
    """IP access control configuration."""
    # Global settings
    enabled: bool = True
    default_mode: IPControlMode = IPControlMode.DISABLED
    check_behind_proxy: bool = True
    trusted_proxies: List[str] = []
    enforcement_level: str = "enforce"  # enforce, monitor, off
    
    # Rules
    rules: List[IPRule] = []
    
    # Performance settings
    cache_ttl: int = 300  # Cache IP checks for 5 minutes
    use_geoip: bool = False
    geoip_database: Optional[str] = None
    
    # Emergency access
    bypass_enabled: bool = False
    bypass_code_ttl: int = 300
    
    # Integration
    use_ip_reputation: bool = False
    ip_reputation_threshold: float = 0.5

# Add to main config
ip_control: IPControlConfig = IPControlConfig()

2. Database Schema for Dynamic Rules

alembic/versions/xxx_add_ip_rules.py:

def upgrade():
    op.create_table(
        'ip_rules',
        sa.Column('id', sa.String, primary_key=True),
        sa.Column('name', sa.String, nullable=False, unique=True),
        sa.Column('description', sa.Text),
        sa.Column('mode', sa.String, nullable=False),  # allowlist/blocklist
        sa.Column('rule_data', sa.JSON, nullable=False),  # Full rule config
        sa.Column('enabled', sa.Boolean, default=True),
        sa.Column('priority', sa.Integer, default=0),
        sa.Column('expires_at', sa.DateTime(timezone=True)),
        sa.Column('created_at', sa.DateTime(timezone=True), default=func.now()),
        sa.Column('created_by', sa.String, sa.ForeignKey('users.id')),
        sa.Column('updated_at', sa.DateTime(timezone=True), onupdate=func.now()),
        sa.Column('updated_by', sa.String, sa.ForeignKey('users.id'))
    )
    
    op.create_index('idx_ip_rules_enabled_priority', 'ip_rules', 
                    ['enabled', 'priority'])
    
    # Temporary IP blocks table
    op.create_table(
        'ip_blocks',
        sa.Column('id', sa.String, primary_key=True),
        sa.Column('ip_address', sa.String, nullable=False),
        sa.Column('reason', sa.Text),
        sa.Column('blocked_until', sa.DateTime(timezone=True), nullable=False),
        sa.Column('blocked_by', sa.String, sa.ForeignKey('users.id')),
        sa.Column('created_at', sa.DateTime(timezone=True), default=func.now())
    )
    
    op.create_index('idx_ip_blocks_ip_until', 'ip_blocks', 
                    ['ip_address', 'blocked_until'])

3. Generic IP Control Middleware

mcpgateway/middleware/ip_control.py:

import re
import ipaddress
from datetime import datetime, timezone
from typing import Optional, Tuple, List
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from fastapi import HTTPException

from mcpgateway.config import settings
from mcpgateway.services.ip_control_service import IPControlService
from mcpgateway.services.audit_logger import audit_logger
from mcpgateway.auth import get_current_user_optional

class IPControlMiddleware(BaseHTTPMiddleware):
    """Generic IP-based access control middleware."""
    
    def __init__(self, app):
        super().__init__(app)
        self.ip_service = IPControlService()
        self._path_cache = {}  # Cache compiled regex patterns
    
    async def dispatch(self, request: Request, call_next):
        if not settings.ip_control.enabled:
            return await call_next(request)
        
        # Extract client IP
        client_ip = self._get_client_ip(request)
        request.state.client_ip = client_ip
        
        # Get current user context
        user = await get_current_user_optional(request)
        user_id = user.id if user else None
        user_roles = user.roles if user else []
        
        # Check IP access
        allowed, rule_name, reason = await self.ip_service.check_access(
            ip=client_ip,
            path=request.url.path,
            user_id=user_id,
            user_roles=user_roles
        )
        
        # Log the access attempt
        await self._log_access(
            request=request,
            client_ip=client_ip,
            allowed=allowed,
            rule_name=rule_name,
            reason=reason,
            user_id=user_id
        )
        
        # Store result in request state
        request.state.ip_check = {
            "allowed": allowed,
            "rule": rule_name,
            "reason": reason
        }
        
        # Enforce based on configuration
        if not allowed and settings.ip_control.enforcement_level == "enforce":
            raise HTTPException(
                status_code=403,
                detail="Access denied from your IP address"
            )
        
        response = await call_next(request)
        
        # Add security headers
        if not allowed and settings.ip_control.enforcement_level == "monitor":
            response.headers["X-IP-Access-Warning"] = "monitored"
        
        return response
    
    def _get_client_ip(self, request: Request) -> str:
        """Extract real client IP considering proxies."""
        if settings.ip_control.check_behind_proxy:
            forwarded_for = request.headers.get("X-Forwarded-For")
            real_ip = request.headers.get("X-Real-IP")
            
            if forwarded_for and self._is_trusted_proxy(request.client.host):
                # Take the first IP from X-Forwarded-For chain
                return forwarded_for.split(",")[0].strip()
            elif real_ip and self._is_trusted_proxy(request.client.host):
                return real_ip.strip()
        
        return request.client.host
    
    def _is_trusted_proxy(self, proxy_ip: str) -> bool:
        """Check if proxy IP is trusted."""
        return any(
            self._ip_matches(proxy_ip, trusted)
            for trusted in settings.ip_control.trusted_proxies
        )

4. IP Control Service

mcpgateway/services/ip_control_service.py:

class IPControlService:
    """Service for IP access control logic."""
    
    def __init__(self):
        self.redis = get_redis_client()
        self.geoip = GeoIPService() if settings.ip_control.use_geoip else None
        self.reputation = IPReputationService() if settings.ip_control.use_ip_reputation else None
    
    async def check_access(
        self,
        ip: str,
        path: str,
        user_id: Optional[str] = None,
        user_roles: List[str] = []
    ) -> Tuple[bool, Optional[str], str]:
        """
        Check if IP has access to the given path.
        Returns: (allowed, rule_name, reason)
        """
        # Check cache first
        cache_key = f"ip_access:{ip}:{path}:{user_id}:{':'.join(sorted(user_roles))}"
        cached = await self.redis.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # Check temporary blocks
        if await self._is_temporarily_blocked(ip):
            result = (False, "temporary_block", "IP is temporarily blocked")
            await self._cache_result(cache_key, result)
            return result
        
        # Check IP reputation if enabled
        if self.reputation:
            reputation_score = await self.reputation.get_score(ip)
            if reputation_score < settings.ip_control.ip_reputation_threshold:
                result = (False, "reputation", f"IP reputation too low: {reputation_score}")
                await self._cache_result(cache_key, result)
                return result
        
        # Get applicable rules
        rules = await self._get_applicable_rules(path, user_id, user_roles)
        
        # Evaluate rules in priority order
        for rule in sorted(rules, key=lambda r: r.priority, reverse=True):
            matches = await self._evaluate_rule(rule, ip, path, user_id, user_roles)
            
            if matches:
                if rule.mode == IPControlMode.ALLOWLIST:
                    result = (True, rule.name, "Allowed by allowlist rule")
                else:  # BLOCKLIST
                    result = (False, rule.name, "Blocked by blocklist rule")
                
                await self._cache_result(cache_key, result)
                return result
        
        # No matching rules - use default mode
        if settings.ip_control.default_mode == IPControlMode.ALLOWLIST:
            result = (False, "default", "Not in any allowlist")
        elif settings.ip_control.default_mode == IPControlMode.BLOCKLIST:
            result = (True, "default", "Not in any blocklist")
        else:  # DISABLED
            result = (True, "default", "IP control disabled")
        
        await self._cache_result(cache_key, result)
        return result
    
    async def _evaluate_rule(
        self,
        rule: IPRule,
        ip: str,
        path: str,
        user_id: Optional[str],
        user_roles: List[str]
    ) -> bool:
        """Check if a rule matches the current request."""
        # Check if rule applies to this path
        if not self._path_matches(path, rule.applies_to_paths):
            return False
        
        # Check if rule applies to this user/role
        if rule.applies_to_users and user_id not in rule.applies_to_users:
            return False
        
        if rule.applies_to_roles != ["*"]:
            if not any(role in rule.applies_to_roles for role in user_roles):
                return False
        
        # Check IP matches
        if rule.ips:
            if any(self._ip_matches(ip, allowed_ip) for allowed_ip in rule.ips):
                return True
        
        # Check country matches
        if rule.countries and self.geoip:
            country = await self.geoip.get_country(ip)
            if country in rule.countries:
                return True
        
        return False

5. API Endpoints for Management

mcpgateway/api/ip_control.py:

@router.get("/api/ip-control/rules", response_model=List[IPRuleResponse])
async def list_ip_rules(
    enabled_only: bool = True,
    current_user: User = Depends(require_admin)
):
    """List all IP control rules."""
    return await ip_control_service.list_rules(enabled_only)

@router.post("/api/ip-control/rules", response_model=IPRuleResponse)
async def create_ip_rule(
    rule: IPRuleCreate,
    current_user: User = Depends(require_admin)
):
    """Create a new IP control rule."""
    # Validate rule
    await validate_ip_rule(rule)
    
    # Create rule
    created_rule = await ip_control_service.create_rule(
        rule=rule,
        created_by=current_user.id
    )
    
    # Audit log
    await audit_logger.log(
        event_type="security",
        event_action="ip_rule_created",
        event_result="success",
        severity="info",
        actor_id=current_user.id,
        resource_type="ip_rule",
        resource_id=created_rule.id,
        details={"rule_name": rule.name, "mode": rule.mode}
    )
    
    return created_rule

@router.post("/api/ip-control/test")
async def test_ip_access(
    request: IPTestRequest,
    current_user: User = Depends(require_admin)
):
    """Test if an IP would have access to a path."""
    allowed, rule, reason = await ip_control_service.check_access(
        ip=request.ip_address,
        path=request.path,
        user_id=request.test_user_id,
        user_roles=request.test_roles
    )
    
    return {
        "ip_address": request.ip_address,
        "path": request.path,
        "allowed": allowed,
        "matching_rule": rule,
        "reason": reason,
        "country": await geoip_service.get_country(request.ip_address) if geoip_service else None
    }

@router.post("/api/ip-control/block")
async def block_ip_temporarily(
    request: BlockIPRequest,
    current_user: User = Depends(require_admin)
):
    """Temporarily block an IP address."""
    await ip_control_service.block_ip(
        ip=request.ip_address,
        duration_minutes=request.duration_minutes,
        reason=request.reason,
        blocked_by=current_user.id
    )
    
    return {"status": "blocked", "until": request.blocked_until}

@router.get("/api/ip-control/analytics")
async def get_ip_analytics(
    time_range: str = "24h",
    current_user: User = Depends(require_admin)
):
    """Get IP access analytics."""
    return await ip_control_service.get_analytics(time_range)

6. Admin UI Components

IP Control Dashboard:

<div id="ip-control-dashboard" class="space-y-6">
    <!-- Overview Cards -->
    <div class="grid grid-cols-1 md:grid-cols-4 gap-4">
        <div class="bg-white dark:bg-gray-800 p-6 rounded-lg shadow">
            <h3 class="text-sm font-medium text-gray-500 dark:text-gray-400">Active Rules</h3>
            <p class="mt-2 text-3xl font-bold text-gray-900 dark:text-white">{{ stats.active_rules }}</p>
        </div>
        <div class="bg-white dark:bg-gray-800 p-6 rounded-lg shadow">
            <h3 class="text-sm font-medium text-gray-500 dark:text-gray-400">Blocked IPs (24h)</h3>
            <p class="mt-2 text-3xl font-bold text-red-600">{{ stats.blocked_24h }}</p>
        </div>
        <div class="bg-white dark:bg-gray-800 p-6 rounded-lg shadow">
            <h3 class="text-sm font-medium text-gray-500 dark:text-gray-400">Countries Blocked</h3>
            <p class="mt-2 text-3xl font-bold text-gray-900 dark:text-white">{{ stats.blocked_countries }}</p>
        </div>
        <div class="bg-white dark:bg-gray-800 p-6 rounded-lg shadow">
            <h3 class="text-sm font-medium text-gray-500 dark:text-gray-400">Mode</h3>
            <p class="mt-2 text-lg font-bold">
                <span class="px-3 py-1 rounded-full text-sm
                    {{ 'bg-green-100 text-green-800' if enforcement_level == 'enforce' else 'bg-yellow-100 text-yellow-800' }}">
                    {{ enforcement_level|upper }}
                </span>
            </p>
        </div>
    </div>

    <!-- Rule Management -->
    <div class="bg-white dark:bg-gray-800 shadow rounded-lg">
        <div class="px-6 py-4 border-b border-gray-200 dark:border-gray-700">
            <div class="flex justify-between items-center">
                <h2 class="text-lg font-bold text-gray-900 dark:text-white">IP Control Rules</h2>
                <button onclick="showCreateRuleModal()" 
                    class="px-4 py-2 bg-blue-600 text-white rounded-md hover:bg-blue-700">
                    Add Rule
                </button>
            </div>
        </div>
        
        <div class="overflow-x-auto">
            <table class="min-w-full divide-y divide-gray-200 dark:divide-gray-700">
                <thead class="bg-gray-50 dark:bg-gray-900">
                    <tr>
                        <th class="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase">Name</th>
                        <th class="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase">Mode</th>
                        <th class="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase">Applies To</th>
                        <th class="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase">IPs/Countries</th>
                        <th class="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase">Priority</th>
                        <th class="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase">Status</th>
                        <th class="px-6 py-3 text-left text-xs font-medium text-gray-500 uppercase">Actions</th>
                    </tr>
                </thead>
                <tbody class="divide-y divide-gray-200 dark:divide-gray-700">
                    {% for rule in rules %}
                    <tr class="hover:bg-gray-50 dark:hover:bg-gray-800">
                        <td class="px-6 py-4 whitespace-nowrap">
                            <div>
                                <div class="text-sm font-medium text-gray-900 dark:text-white">
                                    {{ rule.name }}
                                </div>
                                <div class="text-sm text-gray-500 dark:text-gray-400">
                                    {{ rule.description|truncate(50) }}
                                </div>
                            </div>
                        </td>
                        <td class="px-6 py-4 whitespace-nowrap">
                            <span class="px-2 py-1 text-xs rounded-full
                                {{ 'bg-green-100 text-green-800' if rule.mode == 'allowlist' else 'bg-red-100 text-red-800' }}">
                                {{ rule.mode|upper }}
                            </span>
                        </td>
                        <td class="px-6 py-4 text-sm">
                            <div class="space-y-1">
                                <div>Paths: {{ rule.applies_to_paths|join(', ')|truncate(30) }}</div>
                                <div>Roles: {{ rule.applies_to_roles|join(', ')|truncate(30) }}</div>
                            </div>
                        </td>
                        <td class="px-6 py-4 text-sm">
                            <div class="space-y-1">
                                {% if rule.ips %}
                                <div>IPs: {{ rule.ips|length }}</div>
                                {% endif %}
                                {% if rule.countries %}
                                <div>Countries: {{ rule.countries|join(', ') }}</div>
                                {% endif %}
                            </div>
                        </td>
                        <td class="px-6 py-4 whitespace-nowrap text-sm">
                            {{ rule.priority }}
                        </td>
                        <td class="px-6 py-4 whitespace-nowrap">
                            <label class="switch">
                                <input type="checkbox" 
                                    {{ 'checked' if rule.enabled }}
                                    onchange="toggleRule('{{ rule.id }}', this.checked)">
                                <span class="slider round"></span>
                            </label>
                        </td>
                        <td class="px-6 py-4 whitespace-nowrap text-sm">
                            <button onclick="editRule('{{ rule.id }}')" 
                                class="text-blue-600 hover:text-blue-900 mr-2">Edit</button>
                            <button onclick="deleteRule('{{ rule.id }}')" 
                                class="text-red-600 hover:text-red-900">Delete</button>
                        </td>
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
        </div>
    </div>

    <!-- IP Testing Tool -->
    <div class="bg-white dark:bg-gray-800 shadow rounded-lg p-6">
        <h3 class="text-lg font-bold mb-4 text-gray-900 dark:text-white">Test IP Access</h3>
        <div class="grid grid-cols-1 md:grid-cols-2 gap-4">
            <input type="text" id="test-ip" placeholder="IP Address (e.g., 192.168.1.1)" 
                class="form-input rounded-md">
            <input type="text" id="test-path" placeholder="Path (e.g., /api/admin)" 
                class="form-input rounded-md">
            <input type="text" id="test-user" placeholder="User ID (optional)" 
                class="form-input rounded-md">
            <input type="text" id="test-roles" placeholder="Roles (comma-separated)" 
                class="form-input rounded-md">
        </div>
        <button onclick="testIPAccess()" 
            class="mt-4 px-4 py-2 bg-blue-600 text-white rounded-md hover:bg-blue-700">
            Test Access
        </button>
        
        <div id="test-results" class="mt-4 hidden">
            <!-- Test results will be displayed here -->
        </div>
    </div>
</div>

7. JavaScript for UI

// IP Control Management
async function createRule(ruleData) {
    try {
        const response = await fetch('/api/ip-control/rules', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'Authorization': `Bearer ${getAuthToken()}`
            },
            body: JSON.stringify(ruleData)
        });
        
        if (response.ok) {
            showToast('Rule created successfully', 'success');
            await refreshRuleList();
        } else {
            throw new Error(await response.text());
        }
    } catch (error) {
        showToast(`Error: ${error.message}`, 'error');
    }
}

async function testIPAccess() {
    const ip = document.getElementById('test-ip').value;
    const path = document.getElementById('test-path').value;
    const userId = document.getElementById('test-user').value;
    const roles = document.getElementById('test-roles').value.split(',').filter(r => r.trim());
    
    try {
        const response = await fetch('/api/ip-control/test', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'Authorization': `Bearer ${getAuthToken()}`
            },
            body: JSON.stringify({
                ip_address: ip,
                path: path,
                test_user_id: userId || null,
                test_roles: roles
            })
        });
        
        const result = await response.json();
        displayTestResults(result);
    } catch (error) {
        showToast(`Error: ${error.message}`, 'error');
    }
}

function displayTestResults(result) {
    const resultsDiv = document.getElementById('test-results');
    resultsDiv.innerHTML = `
        <div class="p-4 rounded-lg ${result.allowed ? 'bg-green-50' : 'bg-red-50'}">
            <h4 class="font-bold ${result.allowed ? 'text-green-800' : 'text-red-800'}">
                Access ${result.allowed ? 'ALLOWED' : 'DENIED'}
            </h4>
            <dl class="mt-2 space-y-1 text-sm">
                <div class="flex">
                    <dt class="font-medium text-gray-500 w-24">Rule:</dt>
                    <dd class="text-gray-900">${result.matching_rule || 'None'}</dd>
                </div>
                <div class="flex">
                    <dt class="font-medium text-gray-500 w-24">Reason:</dt>
                    <dd class="text-gray-900">${result.reason}</dd>
                </div>
                ${result.country ? `
                <div class="flex">
                    <dt class="font-medium text-gray-500 w-24">Country:</dt>
                    <dd class="text-gray-900">${result.country}</dd>
                </div>
                ` : ''}
            </dl>
        </div>
    `;
    resultsDiv.classList.remove('hidden');
}

8. Example Environment Configuration

#####################################
# IP Access Control Configuration
#####################################

# Enable IP control system
IP_CONTROL_ENABLED=true

# Default mode when no rules match: allowlist, blocklist, disabled
IP_CONTROL_DEFAULT_MODE=disabled

# Check X-Forwarded-For header
IP_CONTROL_CHECK_BEHIND_PROXY=true

# Trusted proxy IPs (JSON array)
IP_CONTROL_TRUSTED_PROXIES=["10.0.0.1", "10.0.0.2"]

# Enforcement level: enforce, monitor, off
IP_CONTROL_ENFORCEMENT_LEVEL=enforce

# Cache TTL in seconds
IP_CONTROL_CACHE_TTL=300

# GeoIP database support
IP_CONTROL_USE_GEOIP=true
IP_CONTROL_GEOIP_DATABASE=/path/to/GeoLite2-Country.mmdb

# IP reputation checking
IP_CONTROL_USE_IP_REPUTATION=false
IP_CONTROL_IP_REPUTATION_THRESHOLD=0.5

# Emergency bypass codes
IP_CONTROL_BYPASS_ENABLED=true
IP_CONTROL_BYPASS_CODE_TTL=300

#####################################
# Example Rules (configured via API/UI)
#####################################
# Rule 1: Admin endpoints - localhost only
# - Name: "admin_localhost_only"
# - Mode: "allowlist"
# - IPs: ["127.0.0.1", "::1"]
# - Paths: ["/admin/*", "/api/admin/*"]
# - Priority: 100

# Rule 2: Block specific countries from API
# - Name: "geo_restrictions"
# - Mode: "blocklist"
# - Countries: ["XX", "YY"]
# - Paths: ["/api/*"]
# - Priority: 90

# Rule 3: Partner API access
# - Name: "partner_api_access"
# - Mode: "allowlist"
# - IPs: ["203.0.113.0/24", "198.51.100.0/24"]
# - Paths: ["/api/v1/partner/*"]
# - Roles: ["partner"]
# - Priority: 80

# Rule 4: Rate limit violators
# - Name: "rate_limit_blocklist"
# - Mode: "blocklist"
# - IPs: [] # Dynamically updated
# - Paths: ["*"]
# - Priority: 95

9. Testing Scenarios

# Test cases for IP control system
@pytest.mark.asyncio
async def test_ip_control_scenarios():
    # Test 1: Admin access from localhost
    assert await ip_service.check_access(
        ip="127.0.0.1",
        path="/admin/dashboard",
        user_roles=["admin"]
    ) == (True, "admin_localhost_only", "Allowed by allowlist rule")
    
    # Test 2: Admin access from external IP
    assert await ip_service.check_access(
        ip="203.0.113.5",
        path="/admin/dashboard",
        user_roles=["admin"]
    ) == (False, "admin_localhost_only", "Not in allowlist")
    
    # Test 3: Partner API access
    assert await ip_service.check_access(
        ip="203.0.113.10",
        path="/api/v1/partner/data",
        user_roles=["partner"]
    ) == (True, "partner_api_access", "Allowed by allowlist rule")
    
    # Test 4: Geo-blocking
    assert await ip_service.check_access(
        ip="1.2.3.4",  # Assume this resolves to blocked country
        path="/api/public/data",
        user_roles=["user"]
    ) == (False, "geo_restrictions", "Blocked by blocklist rule")
    
    # Test 5: Temporary block
    await ip_service.block_ip("192.168.1.100", 60, "Suspicious activity")
    assert await ip_service.check_access(
        ip="192.168.1.100",
        path="/api/any",
        user_roles=["user"]
    ) == (False, "temporary_block", "IP is temporarily blocked")

10. Security Considerations

Key Features:

  • No IP addresses exposed in user-facing error messages
  • All access attempts logged with full context
  • Rate limiting on rule creation/modification APIs
  • Validation of CIDR notation and IP formats
  • Protection against IP spoofing via proxy validation
  • Emergency bypass codes are single-use and time-limited
  • Rule changes trigger immediate cache invalidation
  • Webhook notifications for critical events

Performance Optimizations:

  • Redis caching of IP check results
  • Compiled regex patterns for path matching
  • Priority-based rule evaluation
  • Bloom filter for quick IP lookups (optional)
  • Background geo-location updates

Monitoring & Alerts:

  • Alert on sudden spike in blocked IPs
  • Monitor for rules blocking legitimate traffic
  • Track bypass code usage
  • Geographic anomaly detection
  • Performance metrics for rule evaluation

This generic implementation provides maximum flexibility while maintaining security. Organizations can use it for various scenarios from simple admin restrictions to complex partner API access controls.

This feature does not replace existing security measures, ingress filtering, API gateways, etc. but enhances them by providing a robust, flexible IP-based access control system that can adapt to diverse organizational needs.

Metadata

Metadata

Assignees

No one assigned

    Labels

    SHOULDP2: Important but not vital; high-value items that are not crucial for the immediate releaseenhancementNew feature or requestpythonPython / backend development (FastAPI)securityImproves securitysweng-group-5Group 5 - Policy-as-Code Security & Compliance AutomationtcdSwEng Projects

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions