fix: harden webhook/link/OAuth-avatar SSRF (advisory clusters A/B/C/E) (#9163)

* fix(api): harden webhook & link-unfurl SSRF (advisory clusters A/B/C)

Resolves three overlapping SSRF advisory clusters around webhook delivery
and work-item link unfurling:

- Cluster A (private-IP validation + PATCH bypass): the webhook PATCH
  handler passed context={request: request} (the request object as the
  dict key) so the loopback/disallowed-domain guard silently no-op'd —
  now context={"request": request}. Hardened IP classification
  (is_blocked_ip) to also block multicast, unspecified, CGNAT
  (100.64.0.0/10), and IPv4 embedded in IPv6 transition addresses
  (IPv4-mapped, NAT64, 6to4, Teredo), robust across Python versions.

- Cluster B (DNS-rebinding TOCTOU): validators resolved DNS, then
  requests resolved it again at connect time. New pinned-IP client
  (plane/utils/url_security.py) resolves+validates once and connects to
  the validated IP literal so urllib3 performs no second lookup, while
  preserving Host header, TLS SNI and certificate verification against
  the real hostname.

- Cluster C (redirect SSRF): webhook delivery never follows redirects;
  the link crawler follows them manually, re-resolving + re-validating +
  re-pinning every hop.

Also: pin requests==2.33.0 in base.txt (imported directly; the pinning
adapter needs the >=2.32 get_connection_with_tls_context hook), and log
webhook URL-validation rejections to WebhookLog instead of swallowing
them.

Tests: new test_url_security.py (pinning, rebinding, redirect
re-validation, IP edge cases, TLS SNI) + updated link-task tests.
Full unit suite: 178 passed.

* fix(api): block OAuth avatar SSRF + add per-advisory SSRF regression tests

Verified every SSRF-class advisory against the current code. The webhook /
link / favicon reports — including the published CVE-2026-30242 and
CVE-2026-39843 and the newer "still bypassable" reports (DNS rebinding
GHSA-3856/-fgcv/-9292/-whh3/-4mjx/-6p39/-fv24/-8wvv, IP-classification gaps
GHSA-75fg, redirect GHSA-6v37/-jw6g/-mq87) — are resolved by the pinned-IP
client + hardened classifier in this branch.

The one SSRF family still unresolved was the OAuth avatar path:
download_and_upload_avatar() fetched the provider-supplied avatar_url with a
raw requests.get (no IP validation, default redirect following), so an
attacker-controlled avatar could reach internal addresses and be exfiltrated
via the static-asset endpoint (GHSA-cv9p-325g-wmv5, and the avatar hop of the
Gitea SSRF GHSA-hx79-5pj5-qh42). It now uses pinned_fetch_following_redirects,
which validates + pins every hop and blocks internal targets.

Adds test_ssrf_advisories.py: a per-advisory regression map covering webhook
IP validation, the PATCH context-key guard, webhook DNS rebinding, webhook
redirect, favicon redirect + rebinding, and OAuth avatar SSRF.

docker compose test: 199 unit tests pass.

* fix(api): address PR review feedback on the SSRF pinned client

- url_security: preserve URL-embedded credentials (user:pass@host) as Basic
  Auth instead of silently dropping them when rewriting to the IP literal
  (Copilot); bracket IPv6-literal hostnames in the Host header (Copilot);
  add stream=True support that keeps the session open until the response is
  closed, and release intermediate redirect hops.
- ip_address / work_item_link_task: treat UnicodeError (IDNA failures) from
  getaddrinfo as a resolution failure, not an uncaught exception (CodeRabbit).
- authentication/adapter/base: stream the avatar download so the size cap
  actually bounds memory, upload the size-bounded buffer (not response.content),
  and always close the response (CodeRabbit, major).
- tests: cover auth preservation, IPv6 Host bracketing, IDNA handling, and
  streamed session lifetime; drop an unused import.

docker compose test: 204 unit tests pass.
This commit is contained in:
sriram veeraghanta
2026-05-31 00:12:23 +05:30
committed by GitHub
parent 248f5d66e6
commit 04622ce118
11 changed files with 1285 additions and 154 deletions
+1 -1
View File
@@ -81,7 +81,7 @@ class WebhookEndpoint(BaseAPIView):
serializer = WebhookSerializer(
webhook,
data=request.data,
context={request: request},
context={"request": request},
partial=True,
fields=(
"id",
+49 -35
View File
@@ -8,10 +8,10 @@ import os
import uuid
from io import BytesIO
import requests
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from plane.utils.url_security import pinned_fetch_following_redirects
# Django imports
from django.utils import timezone
@@ -146,48 +146,62 @@ class Adapter:
try:
headers = self.get_avatar_download_headers()
# Download the avatar image
response = requests.get(avatar_url, timeout=10, headers=headers)
response.raise_for_status()
# Download the avatar image over an SSRF-safe client: the avatar URL
# comes from the OAuth provider's (attacker-influenceable) profile
# data, so it must not be allowed to reach internal addresses. The
# connection is pinned to the validated IP (defeats DNS rebinding)
# and every redirect hop is re-validated, so a public URL cannot
# bounce the fetch to an internal target — GHSA-cv9p-325g-wmv5 /
# GHSA-hx79-5pj5-qh42 (avatar hop).
# stream=True so the body is read incrementally and the size cap
# below actually bounds memory (without it, requests buffers the
# whole body before any check runs).
response, _ = pinned_fetch_following_redirects(
"GET", avatar_url, headers=headers, timeout=10, max_redirects=5, stream=True
)
try:
response.raise_for_status()
# Check content length before downloading
content_length = response.headers.get("Content-Length")
max_size = settings.DATA_UPLOAD_MAX_MEMORY_SIZE
if content_length and int(content_length) > max_size:
return None
# Get content type and determine file extension
content_type = response.headers.get("Content-Type", "image/jpeg")
extension_map = {
"image/jpeg": "jpg",
"image/jpg": "jpg",
"image/png": "png",
"image/gif": "gif",
"image/webp": "webp",
}
extension = extension_map.get(content_type)
if not extension:
return None
# Download with size limit
chunks = []
total_size = 0
for chunk in response.iter_content(chunk_size=8192):
total_size += len(chunk)
if total_size > max_size:
# Check content length before downloading
content_length = response.headers.get("Content-Length")
max_size = settings.DATA_UPLOAD_MAX_MEMORY_SIZE
if content_length and int(content_length) > max_size:
return None
chunks.append(chunk)
content = b"".join(chunks)
file_size = len(content)
# Get content type and determine file extension
content_type = response.headers.get("Content-Type", "image/jpeg")
extension_map = {
"image/jpeg": "jpg",
"image/jpg": "jpg",
"image/png": "png",
"image/gif": "gif",
"image/webp": "webp",
}
extension = extension_map.get(content_type)
if not extension:
return None
# Download with size limit
chunks = []
total_size = 0
for chunk in response.iter_content(chunk_size=8192):
total_size += len(chunk)
if total_size > max_size:
return None
chunks.append(chunk)
content = b"".join(chunks)
file_size = len(content)
finally:
response.close()
# Generate unique filename
filename = f"{uuid.uuid4().hex}-user-avatar.{extension}"
storage = S3Storage(request=self.request)
# Create file-like object
file_obj = BytesIO(response.content)
# Create file-like object from the size-bounded buffer
file_obj = BytesIO(content)
file_obj.seek(0)
# Upload using boto3 directly
+28 -11
View File
@@ -52,7 +52,7 @@ from plane.db.models import (
from plane.license.utils.instance_value import get_email_configuration
from plane.utils.email import generate_plain_text_from_html
from plane.utils.exception_logger import log_exception
from plane.utils.ip_address import validate_url
from plane.utils.url_security import pinned_fetch
SERIALIZER_MAPPER = {
@@ -307,22 +307,20 @@ def webhook_send_task(
return
try:
# Re-validate the webhook URL at send time to prevent DNS-rebinding attacks
validate_url(
# Resolve + validate the webhook URL and pin the connection to the
# validated IP. Pinning closes the DNS-rebinding TOCTOU (validating the
# name then letting requests re-resolve it lets an attacker swap in an
# internal IP between the two lookups). Redirects are never followed, so
# a 3xx Location cannot bounce the request to an internal address
# (GHSA-mq87-52pf-hm3h / cluster C).
response = pinned_fetch(
"POST",
webhook.url,
allowed_ips=settings.WEBHOOK_ALLOWED_IPS,
allowed_hosts=settings.WEBHOOK_ALLOWED_HOSTS,
)
# Send the webhook event
# allow_redirects=False prevents SSRF via 3xx hops to internal addresses
# bypassing the validate_url() check above (GHSA-mq87-52pf-hm3h).
response = requests.post(
webhook.url,
headers=headers,
json=payload,
timeout=30,
allow_redirects=False,
)
# Log the webhook request
@@ -366,6 +364,25 @@ def webhook_send_task(
return
raise requests.RequestException()
except ValueError as e:
# SSRF validation failure (blocked/internal target or unresolvable host).
# Not retryable — record it so the failure is visible to the admin, but
# do not raise (no Celery retry) and do not auto-deactivate (the cause
# may be transient DNS).
save_webhook_log(
webhook=webhook,
request_method=action,
request_headers=headers,
request_body=payload,
response_status=400,
response_headers="",
response_body=f"Webhook URL rejected: {e}",
retry_count=self.request.retries,
event_type=event,
)
logger.warning(f"Webhook {webhook.id} URL rejected: {e}")
return
except Exception as e:
log_exception(e)
return
+22 -33
View File
@@ -17,6 +17,8 @@ from typing import Dict, Any, Tuple
from typing import Optional
from plane.db.models import IssueLink
from plane.utils.exception_logger import log_exception
from plane.utils.ip_address import is_blocked_ip
from plane.utils.url_security import pinned_fetch, pinned_fetch_following_redirects
logger = logging.getLogger("plane.worker")
@@ -50,16 +52,19 @@ def validate_url_ip(url: str) -> None:
try:
addr_info = socket.getaddrinfo(hostname, None)
except socket.gaierror:
except (socket.gaierror, UnicodeError):
# UnicodeError covers IDNA failures raised before the address lookup.
raise ValueError("Hostname could not be resolved")
if not addr_info:
raise ValueError("No IP addresses found for the hostname")
# Check every resolved IP against blocked ranges to prevent SSRF
# Check every resolved IP against blocked ranges to prevent SSRF. The
# actual fetch is pinned to the validated IP (see safe_get), so this acts
# as an early, fail-closed pre-filter.
for addr in addr_info:
ip = ipaddress.ip_address(addr[4][0])
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
ip = ipaddress.ip_address(addr[4][0].split("%")[0])
if is_blocked_ip(ip):
raise ValueError("Access to private/internal networks is not allowed")
@@ -72,8 +77,9 @@ def safe_get(
timeout: int = 1,
) -> Tuple[requests.Response, str]:
"""
Perform a GET request that validates every redirect hop against private IPs.
Prevents SSRF by ensuring no redirect lands on a private/internal address.
Perform a GET request that resolves, validates and pins every hop to its
validated IP. Prevents SSRF via private/internal targets, DNS rebinding
(TOCTOU) and redirects that bounce to internal addresses.
Args:
url: The URL to fetch
@@ -85,32 +91,16 @@ def safe_get(
Raises:
ValueError: If any URL in the redirect chain points to a private IP
requests.RequestException: On network errors
RuntimeError: If max redirects exceeded
requests.RequestException: On network errors (incl. TooManyRedirects)
"""
validate_url_ip(url)
current_url = url
response = requests.get(
current_url, headers=headers, timeout=timeout, allow_redirects=False
return pinned_fetch_following_redirects(
"GET",
url,
headers=headers,
timeout=timeout,
max_redirects=MAX_REDIRECTS,
)
redirect_count = 0
while response.is_redirect:
if redirect_count >= MAX_REDIRECTS:
raise RuntimeError(f"Too many redirects for URL: {url}")
redirect_url = response.headers.get("Location")
if not redirect_url:
break
current_url = urljoin(current_url, redirect_url)
validate_url_ip(current_url)
redirect_count += 1
response = requests.get(
current_url, headers=headers, timeout=timeout, allow_redirects=False
)
return response, current_url
def crawl_work_item_link_title_and_favicon(url: str) -> Dict[str, Any]:
"""
@@ -199,14 +189,13 @@ def find_favicon_url(soup: Optional[BeautifulSoup], base_url: str) -> Optional[s
parsed_url = urlparse(base_url)
fallback_url = f"{parsed_url.scheme}://{parsed_url.netloc}/favicon.ico"
# Check if fallback exists
# Check if fallback exists (pinned to the validated IP).
try:
validate_url_ip(fallback_url)
response = requests.head(fallback_url, timeout=2, allow_redirects=False)
response = pinned_fetch("HEAD", fallback_url, timeout=2)
if response.status_code == 200:
return fallback_url
except requests.RequestException as e:
except (requests.RequestException, ValueError) as e:
log_exception(e, warning=True)
return None
@@ -0,0 +1,289 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
"""
Per-advisory SSRF regression tests.
Each test reproduces a published / reported SSRF advisory scenario and asserts
the current code blocks it. This file is the auditable map of "which advisory is
covered where"; the lower-level mechanics (IP classification, pinning, redirect
re-validation) are exercised in detail in ``test_url_security.py`` and
``test_work_item_link_task.py``.
Advisory coverage
-----------------
Webhook delivery
* GHSA-m3f8-q4wj-9grv / CVE-2026-30242 / GHSA-75vf-hh93-h7mx
webhook URL resolves to a private/metadata/loopback IP -> TestWebhookUrlValidation
* GHSA-75fg-f8qg-23wv CGNAT(100.64/10), 6to4, multicast missed -> TestWebhookUrlValidation
* GHSA-6485-m23r-fx8q PATCH serializer context-key bypass -> TestWebhookPatchContextGuard
* GHSA-whh3-5g95-4qhc / -4mjx-q738-87cf / -6p39-x6q9-h3g5 /
-9292-pvg4-7hvm / -fgcv-6h3f-xcx9 webhook DNS-rebinding TOCTOU -> TestWebhookRebinding
* GHSA-6v37-328w-j2wv / -jw6g-h7h5-rfc6 / -mq87-52pf-hm3h
webhook SSRF via HTTP redirect following -> TestWebhookRedirect
Work-item link unfurling / favicon
* GHSA-8wvv-p676-hcw4 / -fv24-3845-646g / -9292-pvg4-7hvm link rebinding
* GHSA-9fr2-pprw-pp9j / CVE-2026-39843 favicon redirect SSRF -> TestFaviconRedirect
* GHSA-3856-6mgg-rx84 favicon DNS-rebinding -> TestFaviconRebinding
OAuth avatar (the still-unresolved family this change adds)
* GHSA-cv9p-325g-wmv5 OAuth avatar redirect SSRF -> static-asset exfil
* GHSA-hx79-5pj5-qh42 Gitea OAuth SSRF (avatar hop) -> TestOAuthAvatarSSRF
"""
import pytest
import requests
from unittest.mock import MagicMock, patch
from bs4 import BeautifulSoup
from plane.utils.ip_address import validate_url
from plane.bgtasks.work_item_link_task import fetch_and_encode_favicon, DEFAULT_FAVICON
from plane.authentication.adapter.base import Adapter
def _addr(ip):
family = 6 if ":" in ip else 2
return (family, None, None, None, (ip, 0))
def _resp(status_code=200, headers=None, content=b"OK"):
resp = MagicMock(spec=requests.Response)
resp.status_code = status_code
resp.headers = headers or {}
resp.content = content
return resp
_BLOCKED = "Access to private/internal networks is not allowed"
# ---------------------------------------------------------------------------
# Webhook URL validation (creation/update-time defense in depth)
# GHSA-m3f8-q4wj-9grv / CVE-2026-30242 / GHSA-75vf-hh93-h7mx / GHSA-75fg-f8qg-23wv
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestWebhookUrlValidation:
@pytest.mark.parametrize(
"ip",
[
"169.254.169.254", # AWS/GCP metadata (CVE-2026-30242 PoC)
"127.0.0.1", # loopback
"10.0.0.1", # private
"172.16.0.1", # private
"192.168.0.1", # private
"::1", # IPv6 loopback
"100.64.0.1", # CGNAT / RFC 6598 (GHSA-75fg)
"2002:7f00:1::", # 6to4 -> 127.0.0.1 (GHSA-75fg)
"224.0.0.1", # multicast (GHSA-75fg)
"::ffff:169.254.169.254", # IPv4-mapped metadata
],
)
def test_webhook_url_to_internal_is_rejected(self, ip):
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr(ip)]
with pytest.raises(ValueError, match="private/internal"):
validate_url(
"https://attacker.example.com/hook",
allowed_ips=[],
allowed_hosts=[],
)
def test_legitimate_public_webhook_url_passes(self):
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr("93.184.216.34")]
# Should not raise
validate_url("https://hooks.example.com/x", allowed_ips=[], allowed_hosts=[])
# ---------------------------------------------------------------------------
# GHSA-6485-m23r-fx8q — PATCH serializer context-key bypass
# The PATCH view now passes context={"request": request}; with the request in
# context the disallowed-domain / request-host loop-back guard runs on update.
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestWebhookPatchContextGuard:
def _serializer_with_request(self, host):
from plane.app.serializers import WebhookSerializer
request = MagicMock()
request.get_host.return_value = host
return WebhookSerializer(context={"request": request})
def test_request_host_is_blocked_when_context_present(self):
# A webhook pointed at the instance's own host must be rejected — this
# is the guard the PATCH endpoint silently skipped with the wrong key.
ser = self._serializer_with_request("myplane.example.com")
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr("93.184.216.34")] # public, so only the host guard can block
with pytest.raises(Exception, match="not allowed"):
ser._validate_webhook_url("https://myplane.example.com/hook")
def test_unrelated_public_host_passes_with_context(self):
ser = self._serializer_with_request("myplane.example.com")
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr("93.184.216.34")]
ser._validate_webhook_url("https://hooks.partner.com/x") # should not raise
# ---------------------------------------------------------------------------
# Webhook DNS-rebinding TOCTOU
# GHSA-whh3-5g95-4qhc / -4mjx-q738-87cf / -6p39-x6q9-h3g5 / -9292 / -fgcv
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestWebhookRebinding:
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_connection_pinned_to_validated_ip(self, mock_resolve, mock_session_cls):
from plane.utils.url_security import pinned_fetch
# The validator resolves to a public IP; the connection must go to THAT
# IP literal, so a rebind to an internal IP after validation is moot.
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
pinned_fetch("POST", "https://rebinder.example.com/hook", json={})
_, url = session.request.call_args.args
assert url == "https://93.184.216.34:443/hook" # IP literal -> no 2nd DNS lookup
@patch("plane.utils.url_security.resolve_and_validate")
def test_rebind_to_internal_is_blocked(self, mock_resolve):
from plane.utils.url_security import pinned_fetch
mock_resolve.side_effect = ValueError(_BLOCKED)
with pytest.raises(ValueError, match="private/internal"):
pinned_fetch("POST", "https://rebinder.example.com/hook", json={})
# ---------------------------------------------------------------------------
# Webhook SSRF via HTTP redirect following
# GHSA-6v37-328w-j2wv / GHSA-jw6g-h7h5-rfc6 / GHSA-mq87-52pf-hm3h
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestWebhookRedirect:
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_webhook_does_not_follow_redirects(self, mock_resolve, mock_session_cls):
from plane.utils.url_security import pinned_fetch
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
# The endpoint replies 302 -> internal; the webhook client must NOT follow.
session.request.return_value = _resp(
302, headers={"Location": "http://169.254.169.254/latest/meta-data/"}
)
resp = pinned_fetch("POST", "https://hooks.example.com/x", json={})
# The 3xx is returned as-is and only ONE request was made (no follow).
assert resp.status_code == 302
assert session.request.call_count == 1
assert session.request.call_args.kwargs["allow_redirects"] is False
# ---------------------------------------------------------------------------
# Favicon redirect SSRF — GHSA-9fr2-pprw-pp9j / CVE-2026-39843
# A <link rel=icon> whose href is public but 30x-redirects to a private IP must
# NOT exfiltrate internal content; the favicon falls back to the default icon.
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestFaviconRedirect:
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
@patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo")
def test_favicon_redirect_to_private_returns_default(
self, mock_pre_dns, mock_resolve, mock_session_cls
):
# validate_url_ip pre-check (work_item_link_task.socket) sees a public IP.
mock_pre_dns.return_value = [_addr("93.184.216.34")]
# safe_get: hop0 public, hop1 (redirect target) blocked.
mock_resolve.side_effect = [["93.184.216.34"], ValueError(_BLOCKED)]
session = mock_session_cls.return_value
session.request.return_value = _resp(
302, headers={"Location": "http://192.168.8.14:8081/"}
)
soup = BeautifulSoup(
'<link rel="icon" href="https://redirector.example.com/x">',
"html.parser",
)
result = fetch_and_encode_favicon({}, soup, "https://attacker.example.com")
# Blocked -> default icon, NOT the internal response body.
assert result["favicon_base64"] == f"data:image/svg+xml;base64,{DEFAULT_FAVICON}"
# ---------------------------------------------------------------------------
# Favicon DNS rebinding — GHSA-3856-6mgg-rx84
# The favicon host passes the pre-check (public) but resolves to a private IP at
# fetch time; the pinned client re-resolves+validates and blocks it.
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestFaviconRebinding:
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
@patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo")
def test_favicon_rebind_to_private_returns_default(
self, mock_pre_dns, mock_resolve, mock_session_cls
):
mock_pre_dns.return_value = [_addr("93.184.216.34")] # pre-check: public
mock_resolve.side_effect = ValueError(_BLOCKED) # fetch-time: rebound -> blocked
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
soup = BeautifulSoup(
'<link rel="icon" href="http://rebind.example.com:8443/">',
"html.parser",
)
result = fetch_and_encode_favicon({}, soup, "https://attacker.example.com")
assert result["favicon_base64"] == f"data:image/svg+xml;base64,{DEFAULT_FAVICON}"
# ---------------------------------------------------------------------------
# OAuth avatar SSRF — GHSA-cv9p-325g-wmv5 / GHSA-hx79-5pj5-qh42 (avatar hop)
# download_and_upload_avatar must reject avatar URLs that point at, or redirect
# to, internal addresses, returning None (no fetch stored as an asset).
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestOAuthAvatarSSRF:
def _adapter(self):
return Adapter(request=MagicMock(), provider="gitea")
@patch("plane.utils.url_security.resolve_and_validate")
def test_avatar_to_internal_ip_is_blocked(self, mock_resolve):
mock_resolve.side_effect = ValueError(_BLOCKED)
result = self._adapter().download_and_upload_avatar(
"http://169.254.169.254/latest/meta-data/", user=MagicMock()
)
assert result is None
mock_resolve.assert_called() # SSRF validation was actually attempted
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_avatar_redirect_to_internal_is_blocked(self, mock_resolve, mock_session_cls):
# Public avatar URL that 302-redirects to the metadata service.
mock_resolve.side_effect = [["93.184.216.34"], ValueError(_BLOCKED)]
session = mock_session_cls.return_value
session.request.return_value = _resp(
302, headers={"Location": "http://169.254.169.254/imds"}
)
result = self._adapter().download_and_upload_avatar(
"https://evil.example.com/avatar", user=MagicMock()
)
assert result is None
@patch("plane.authentication.adapter.base.pinned_fetch_following_redirects")
def test_avatar_uses_ssrf_safe_client(self, mock_fetch):
# Wiring guard: the avatar path must go through the pinned client, never
# a raw requests.get (which would re-resolve + follow redirects freely).
mock_fetch.side_effect = ValueError(_BLOCKED)
result = self._adapter().download_and_upload_avatar(
"https://cdn.example.com/a.png", user=MagicMock()
)
assert result is None
assert mock_fetch.call_args.args[0] == "GET"
assert mock_fetch.call_args.args[1] == "https://cdn.example.com/a.png"
@@ -0,0 +1,395 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
"""
SSRF-protection tests for the webhook + link-unfurling clusters (advisories A/B/C):
A — incomplete private-IP validation -> is_blocked_ip hardening
B — DNS-rebinding TOCTOU -> connection pinned to the validated IP
C — SSRF via HTTP redirect following -> redirects re-resolved/re-validated/re-pinned
"""
import ipaddress
import pytest
import requests
from unittest.mock import MagicMock, patch
from plane.utils.ip_address import is_blocked_ip, resolve_and_validate, validate_url
from plane.utils.url_security import (
PinnedIPAdapter,
pinned_fetch,
pinned_fetch_following_redirects,
)
def _addr(ip):
"""Build a single getaddrinfo-style result tuple for an IP string."""
family = 6 if ":" in ip else 2
return (family, None, None, None, (ip, 0))
def _resp(status_code=200, headers=None, content=b"OK"):
resp = MagicMock(spec=requests.Response)
resp.status_code = status_code
resp.headers = headers or {}
resp.content = content
return resp
# ---------------------------------------------------------------------------
# Cluster A — robust IP classification (verified on Python 3.12 semantics)
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestIsBlockedIp:
@pytest.mark.parametrize(
"ip",
[
"127.0.0.1", # loopback
"10.0.0.1", # private
"192.168.1.1", # private
"172.16.0.1", # private
"169.254.169.254", # link-local / cloud metadata
"0.0.0.0", # unspecified
"100.64.0.1", # CGNAT / shared (NOT is_private on py3.12!)
"224.0.0.1", # multicast
"239.255.255.250", # SSDP multicast
"255.255.255.255", # limited broadcast
"::1", # IPv6 loopback
"fe80::1", # IPv6 link-local
"fc00::1", # IPv6 unique-local
"ff02::1", # IPv6 multicast
"::ffff:127.0.0.1", # IPv4-mapped loopback
"::ffff:169.254.169.254", # IPv4-mapped metadata
"::ffff:10.0.0.1", # IPv4-mapped private
"64:ff9b::7f00:1", # NAT64 well-known prefix embedding 127.0.0.1
"64:ff9b::a9fe:a9fe", # NAT64 well-known prefix embedding 169.254.169.254
"64:ff9b:1::7f00:1", # NAT64 local-use prefix (RFC 8215, /48)
"64:ff9b:1:0100::1", # NAT64 local-use prefix, outside the /96 subset
"2002:7f00:1::", # 6to4 embedding 127.0.0.1
"2002:a00:1::", # 6to4 embedding 10.0.0.1
],
)
def test_blocks_internal(self, ip):
assert is_blocked_ip(ipaddress.ip_address(ip)) is True
@pytest.mark.parametrize(
"ip",
[
"8.8.8.8",
"93.184.216.34",
"1.1.1.1",
"2606:4700:4700::1111", # public IPv6 (Cloudflare)
"2001:4860:4860::8888", # public IPv6 (Google)
],
)
def test_allows_public(self, ip):
assert is_blocked_ip(ipaddress.ip_address(ip)) is False
# ---------------------------------------------------------------------------
# resolve_and_validate — resolution + validation, returns IPs to pin
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestResolveAndValidate:
def test_returns_public_ips(self):
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr("93.184.216.34")]
assert resolve_and_validate("example.com") == ["93.184.216.34"]
def test_raises_on_private(self):
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr("10.0.0.1")]
with pytest.raises(ValueError, match="private/internal"):
resolve_and_validate("internal.example.com")
def test_raises_if_any_resolved_ip_is_private(self):
# A hostname that resolves to BOTH a public and a private IP must fail
# closed — an attacker could otherwise steer the connection to the
# private one.
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr("93.184.216.34"), _addr("127.0.0.1")]
with pytest.raises(ValueError, match="private/internal"):
resolve_and_validate("rebinder.example.com")
def test_allowlist_permits_private(self):
allowed = [ipaddress.ip_network("10.0.0.0/8")]
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr("10.0.0.5")]
assert resolve_and_validate("internal", allowed_ips=allowed) == ["10.0.0.5"]
def test_unresolvable_raises(self):
import socket as _socket
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.side_effect = _socket.gaierror()
with pytest.raises(ValueError, match="could not be resolved"):
resolve_and_validate("nope.invalid")
# ---------------------------------------------------------------------------
# Cluster B — connection pinned to the validated IP (DNS-rebinding TOCTOU)
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestPinnedFetch:
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_connects_to_validated_ip_not_hostname(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
pinned_fetch("POST", "https://example.com/hook", json={"a": 1})
# The socket target is the validated IP literal — there is no second
# DNS lookup, so a rebind between validation and connection is
# impossible.
method, url = session.request.call_args.args
kwargs = session.request.call_args.kwargs
assert method == "POST"
assert url == "https://93.184.216.34:443/hook"
# Host header + TLS SNI still target the real hostname.
assert kwargs["headers"]["Host"] == "example.com"
assert kwargs["allow_redirects"] is False
assert kwargs["verify"] is True
assert kwargs["json"] == {"a": 1}
# Ambient proxy/env must not be honoured (would bypass pinning).
assert session.trust_env is False
assert kwargs["proxies"] == {"http": None, "https": None}
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_non_default_port_in_host_header(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
pinned_fetch("GET", "http://example.com:8080/x")
_, url = session.request.call_args.args
kwargs = session.request.call_args.kwargs
assert url == "http://93.184.216.34:8080/x"
assert kwargs["headers"]["Host"] == "example.com:8080"
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_ipv6_validated_ip_is_bracketed(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["2606:4700:4700::1111"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
pinned_fetch("GET", "https://example.com/x")
_, url = session.request.call_args.args
assert url == "https://[2606:4700:4700::1111]:443/x"
@patch("plane.utils.url_security.resolve_and_validate")
def test_blocked_target_raises_before_any_request(self, mock_resolve):
mock_resolve.side_effect = ValueError(
"Access to private/internal networks is not allowed"
)
with pytest.raises(ValueError, match="private/internal"):
pinned_fetch("POST", "https://attacker.com/hook")
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_tries_next_ip_on_connection_error(self, mock_resolve, mock_session_cls):
# Dual-stack host: first validated IP is unreachable, second works.
mock_resolve.return_value = ["93.184.216.34", "93.184.216.35"]
session = mock_session_cls.return_value
session.request.side_effect = [
requests.ConnectionError("down"),
_resp(200),
]
resp = pinned_fetch("GET", "https://example.com/x")
assert resp.status_code == 200
assert session.request.call_count == 2
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_allowed_host_skips_block_check_but_still_pins(self, mock_resolve, mock_session_cls):
# Trusted host (e.g. internal docker service) whose IP is private: the
# block check is skipped, but the connection is STILL pinned to the
# resolved IP so it cannot be rebound to a different internal target.
mock_resolve.return_value = ["172.18.0.5"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
pinned_fetch(
"POST",
"http://silo:3000/hook",
allowed_hosts=["silo"],
json={"x": 1},
)
# Resolution happens with require_safe=False (trusted, skip block check).
assert mock_resolve.call_args.kwargs.get("require_safe") is False
# ...but the connection is pinned to the resolved IP literal, Host=silo.
_, url = session.request.call_args.args
assert url == "http://172.18.0.5:3000/hook"
assert session.request.call_args.kwargs["headers"]["Host"] == "silo:3000"
assert session.request.call_args.kwargs["allow_redirects"] is False
# ---------------------------------------------------------------------------
# Cluster C — redirects re-resolved / re-validated / re-pinned each hop
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestPinnedFetchRedirects:
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_no_redirect_returns_response(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
resp, final = pinned_fetch_following_redirects("GET", "https://example.com/a")
assert resp.status_code == 200
assert final == "https://example.com/a"
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_follows_and_revalidates_each_hop(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.side_effect = [
_resp(301, headers={"Location": "https://other.com/page"}),
_resp(200),
]
resp, final = pinned_fetch_following_redirects("GET", "https://example.com/a")
assert resp.status_code == 200
assert final == "https://other.com/page"
# Re-resolved (and thus re-validated + re-pinned) on each hop.
assert mock_resolve.call_count == 2
assert mock_resolve.call_args_list[0].args[0] == "example.com"
assert mock_resolve.call_args_list[1].args[0] == "other.com"
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_blocks_redirect_to_private_ip(self, mock_resolve, mock_session_cls):
# First hop resolves public; redirect target resolves private -> blocked
mock_resolve.side_effect = [
["93.184.216.34"],
ValueError("Access to private/internal networks is not allowed"),
]
session = mock_session_cls.return_value
session.request.return_value = _resp(
302, headers={"Location": "http://169.254.169.254/latest/meta-data/"}
)
with pytest.raises(ValueError, match="private/internal"):
pinned_fetch_following_redirects("GET", "https://evil.com/r")
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_too_many_redirects(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _resp(
302, headers={"Location": "https://example.com/loop"}
)
with pytest.raises(requests.TooManyRedirects):
pinned_fetch_following_redirects(
"GET", "https://example.com/start", max_redirects=3
)
# ---------------------------------------------------------------------------
# PinnedIPAdapter — TLS server_hostname injection (cert verified vs hostname)
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestPinnedIPAdapter:
def test_injects_server_hostname_into_pool(self):
adapter = PinnedIPAdapter(server_hostname="example.com")
adapter.build_connection_pool_key_attributes = MagicMock(
return_value=({"scheme": "https", "host": "93.184.216.34", "port": 443}, {})
)
adapter.poolmanager = MagicMock()
request = MagicMock()
adapter.get_connection_with_tls_context(request, verify=True)
_, kwargs = adapter.poolmanager.connection_from_host.call_args
assert kwargs["pool_kwargs"]["server_hostname"] == "example.com"
# ---------------------------------------------------------------------------
# validate_url — create/update-time defense in depth still rejects bypasses
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestValidateUrlHardening:
@pytest.mark.parametrize("ip", ["100.64.0.1", "224.0.0.1", "0.0.0.0"])
def test_rejects_newly_covered_ranges(self, ip):
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.return_value = [_addr(ip)]
with pytest.raises(ValueError, match="private/internal"):
validate_url("http://attacker.example.com")
# ---------------------------------------------------------------------------
# Review-feedback fixes (PR #9163)
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestReviewFixes:
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_url_embedded_credentials_become_basic_auth(self, mock_resolve, mock_session_cls):
# user:pass@host -> Basic Auth preserved as auth=, userinfo stripped from URL
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
pinned_fetch("GET", "https://user:p%40ss@example.com/hook")
_, url = session.request.call_args.args
kwargs = session.request.call_args.kwargs
assert url == "https://93.184.216.34:443/hook" # no userinfo in the IP URL
assert kwargs["auth"] == ("user", "p@ss") # percent-decoded
assert kwargs["headers"]["Host"] == "example.com"
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_no_credentials_passes_auth_none(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
pinned_fetch("GET", "https://example.com/x")
assert session.request.call_args.kwargs["auth"] is None
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_ipv6_literal_host_header_is_bracketed(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["2606:4700:4700::1111"]
session = mock_session_cls.return_value
session.request.return_value = _resp(200)
pinned_fetch("GET", "https://[2606:4700:4700::1111]/x")
kwargs = session.request.call_args.kwargs
assert kwargs["headers"]["Host"] == "[2606:4700:4700::1111]"
def test_idna_unicode_error_is_treated_as_unresolvable(self):
# getaddrinfo can raise UnicodeError (IDNA) before any lookup; it must
# surface as ValueError so webhook_send_task records a URL rejection.
with patch("plane.utils.ip_address.socket.getaddrinfo") as dns:
dns.side_effect = UnicodeError("label empty or too long")
with pytest.raises(ValueError, match="could not be resolved"):
resolve_and_validate("xn--bad-name")
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_stream_defers_session_close_until_response_close(self, mock_resolve, mock_session_cls):
# With stream=True the size cap can bound memory only if the session
# stays open until the body is read; closing the response closes it.
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
resp = _resp(200)
session.request.return_value = resp
out = pinned_fetch("GET", "https://cdn.example.com/a.png", stream=True)
assert session.request.call_args.kwargs["stream"] is True
session.close.assert_not_called() # deferred
out.close()
session.close.assert_called_once()
@@ -5,6 +5,7 @@
import ipaddress
import pytest
import requests
from unittest.mock import patch, MagicMock
from plane.bgtasks.work_item_link_task import safe_get, validate_url_ip
from plane.utils.ip_address import validate_url
@@ -45,6 +46,22 @@ class TestValidateUrlIp:
mock_dns.return_value = [(None, None, None, None, ("93.184.216.34", 0))]
validate_url_ip("https://example.com") # Should not raise
@pytest.mark.parametrize(
"ip",
[
"100.64.0.1", # CGNAT / shared address space (not is_private on 3.12)
"224.0.0.1", # multicast
"0.0.0.0", # unspecified
"::ffff:169.254.169.254", # IPv4-mapped cloud metadata
"64:ff9b::a9fe:a9fe", # NAT64 embedding 169.254.169.254
],
)
def test_rejects_hardened_bypass_ranges(self, ip):
with patch("plane.bgtasks.work_item_link_task.socket.getaddrinfo") as mock_dns:
mock_dns.return_value = [(None, None, None, None, (ip, 0))]
with pytest.raises(ValueError, match="private/internal"):
validate_url_ip("http://attacker.example.com")
@pytest.mark.unit
class TestValidateUrlAllowlist:
@@ -133,82 +150,81 @@ class TestValidateUrlAllowlist:
@pytest.mark.unit
class TestSafeGet:
"""Test safe_get follows redirects safely and blocks SSRF."""
"""safe_get now delegates to the pinned SSRF-safe client; assert it resolves,
validates, pins to the validated IP and follows redirects safely. Network is
mocked at the requests.Session boundary inside plane.utils.url_security."""
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_returns_response_for_non_redirect(self, mock_validate, mock_get):
final_resp = _make_response(status_code=200, content=b"OK")
mock_get.return_value = final_resp
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_returns_response_for_non_redirect(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _make_response(status_code=200, content=b"OK")
response, final_url = safe_get("https://example.com")
assert response is final_resp
assert response.status_code == 200
assert final_url == "https://example.com"
mock_validate.assert_called_once_with("https://example.com")
# Pinned to the validated IP literal, not the hostname.
_, url = session.request.call_args.args
assert url == "https://93.184.216.34:443/"
assert session.request.call_args.kwargs["headers"]["Host"] == "example.com"
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_follows_redirect_and_validates_each_hop(self, mock_validate, mock_get):
redirect_resp = _make_response(
status_code=301,
is_redirect=True,
headers={"Location": "https://other.com/page"},
)
final_resp = _make_response(status_code=200, content=b"OK")
mock_get.side_effect = [redirect_resp, final_resp]
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_follows_redirect_and_validates_each_hop(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.side_effect = [
_make_response(status_code=301, headers={"Location": "https://other.com/page"}),
_make_response(status_code=200, content=b"OK"),
]
response, final_url = safe_get("https://example.com")
assert response is final_resp
assert response.status_code == 200
assert final_url == "https://other.com/page"
# Should validate both the initial URL and the redirect target
assert mock_validate.call_count == 2
mock_validate.assert_any_call("https://example.com")
mock_validate.assert_any_call("https://other.com/page")
assert mock_resolve.call_count == 2
assert mock_resolve.call_args_list[0].args[0] == "example.com"
assert mock_resolve.call_args_list[1].args[0] == "other.com"
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_blocks_redirect_to_private_ip(self, mock_validate, mock_get):
redirect_resp = _make_response(
status_code=302,
is_redirect=True,
headers={"Location": "http://192.168.1.1:8080"},
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_blocks_redirect_to_private_ip(self, mock_resolve, mock_session_cls):
mock_resolve.side_effect = [
["93.184.216.34"],
ValueError("Access to private/internal networks is not allowed"),
]
session = mock_session_cls.return_value
session.request.return_value = _make_response(
status_code=302, headers={"Location": "http://192.168.1.1:8080"}
)
mock_get.return_value = redirect_resp
# First call (initial URL) succeeds, second call (redirect target) fails
mock_validate.side_effect = [None, ValueError("Access to private/internal networks is not allowed")]
with pytest.raises(ValueError, match="private/internal"):
safe_get("https://evil.com/redirect")
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_raises_on_too_many_redirects(self, mock_validate, mock_get):
redirect_resp = _make_response(
status_code=302,
is_redirect=True,
headers={"Location": "https://example.com/loop"},
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_raises_on_too_many_redirects(self, mock_resolve, mock_session_cls):
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.return_value = _make_response(
status_code=302, headers={"Location": "https://example.com/loop"}
)
mock_get.return_value = redirect_resp
with pytest.raises(RuntimeError, match="Too many redirects"):
with pytest.raises(requests.TooManyRedirects):
safe_get("https://example.com/start")
@patch("plane.bgtasks.work_item_link_task.requests.get")
@patch("plane.bgtasks.work_item_link_task.validate_url_ip")
def test_succeeds_at_exact_max_redirects(self, mock_validate, mock_get):
"""After exactly MAX_REDIRECTS hops, if the final response is 200, it should succeed."""
redirect_resp = _make_response(
status_code=302,
is_redirect=True,
headers={"Location": "https://example.com/next"},
)
final_resp = _make_response(status_code=200, content=b"OK")
# 5 redirects then a 200
mock_get.side_effect = [redirect_resp] * 5 + [final_resp]
@patch("plane.utils.url_security.requests.Session")
@patch("plane.utils.url_security.resolve_and_validate")
def test_succeeds_at_exact_max_redirects(self, mock_resolve, mock_session_cls):
"""5 redirects then a 200 must succeed (MAX_REDIRECTS == 5)."""
mock_resolve.return_value = ["93.184.216.34"]
session = mock_session_cls.return_value
session.request.side_effect = [
_make_response(status_code=302, headers={"Location": "https://example.com/next"})
] * 5 + [_make_response(status_code=200, content=b"OK")]
response, final_url = safe_get("https://example.com/start")
assert response is final_resp
assert not response.is_redirect
assert response.status_code == 200
+153 -16
View File
@@ -8,10 +8,162 @@ import socket
from urllib.parse import urlparse
# Networks that must never be reachable as an outbound request target but which
# the stdlib ``ipaddress`` flags (is_private/is_loopback/...) do NOT reliably
# classify on every Python version. Listed explicitly so the verdict is
# identical and fail-closed across Python 3.9 3.14 (Plane ships on 3.12,
# where e.g. 100.64.0.0/10 is neither is_private nor is_global).
_BLOCKED_NETWORKS = [
ipaddress.ip_network(cidr)
for cidr in (
"0.0.0.0/8", # "this host on this network" (RFC 1122) / unspecified block
"100.64.0.0/10", # carrier-grade NAT / shared address space (RFC 6598)
"169.254.0.0/16", # link-local (incl. cloud metadata 169.254.169.254)
"255.255.255.255/32", # limited broadcast
"::ffff:0:0/96", # IPv4-mapped IPv6
"64:ff9b::/96", # NAT64 well-known prefix (RFC 6052)
"64:ff9b:1::/48", # NAT64 local-use prefix (RFC 8215)
"2002::/16", # 6to4
"2001::/32", # Teredo
"fec0::/10", # deprecated IPv6 site-local
)
]
def _embedded_ipv4(ip):
"""
Yield any IPv4 address embedded inside an IPv6 transition address.
An attacker who controls a hostname's AAAA record can point it at an IPv6
address that the network transparently translates to an internal IPv4
target (e.g. ``::ffff:169.254.169.254``, ``64:ff9b::7f00:1`` → 127.0.0.1,
6to4, Teredo). The embedded IPv4 is what the packet ultimately reaches, so
it must be validated too — we cannot trust the interpreter to classify the
outer IPv6 address consistently across versions.
"""
if ip.version != 6:
return
if ip.ipv4_mapped is not None:
yield ip.ipv4_mapped
if ip.sixtofour is not None:
yield ip.sixtofour
teredo = ip.teredo
if teredo is not None:
# (server_ipv4, client_ipv4)
yield teredo[0]
yield teredo[1]
# NAT64 well-known prefix (64:ff9b::/96): the low 32 bits embed the IPv4.
# The local-use prefix 64:ff9b:1::/48 uses a different (length-dependent)
# embedding per RFC 6052, so it is not decoded here — it is blocked wholesale
# via _BLOCKED_NETWORKS instead.
if ip in ipaddress.ip_network("64:ff9b::/96"):
yield ipaddress.ip_address(int(ip) & 0xFFFFFFFF)
def is_blocked_ip(ip):
"""
Return ``True`` if ``ip`` (an ``ipaddress`` address object) should never be
used as an outbound request target (SSRF protection).
Blocks private, loopback, reserved, link-local, multicast and unspecified
ranges; an explicit deny-list of networks the stdlib misclassifies on some
Python versions; and recurses into IPv4 addresses embedded in IPv6
transition formats. Fails closed: anything it cannot positively clear is
treated as blocked.
"""
if (
ip.is_private
or ip.is_loopback
or ip.is_reserved
or ip.is_link_local
or ip.is_multicast
or ip.is_unspecified
):
return True
if any(ip.version == net.version and ip in net for net in _BLOCKED_NETWORKS):
return True
for embedded in _embedded_ipv4(ip):
if is_blocked_ip(embedded):
return True
return False
def _is_allowed_ip(ip, allowed_ips):
"""Return True if ``ip`` falls inside an operator-trusted allowlist network."""
return bool(allowed_ips) and any(
net.version == ip.version and ip in net for net in allowed_ips
)
def resolve_and_validate(hostname, allowed_ips=None, require_safe=True):
"""
Resolve ``hostname`` and (when ``require_safe``) ensure every resolved
address is a safe outbound target, returning the list of resolved IP
strings (in resolver order, de-duplicated).
The returned list is intended to be *pinned* for the actual connection
(connect to the IP literal so no second DNS lookup occurs), which is what
closes the DNS-rebinding TOCTOU.
Args:
hostname: The hostname (or IP literal) to resolve.
allowed_ips: Optional list of ``ipaddress.ip_network`` objects. IPs
inside these networks are permitted even if otherwise
blocked (operator-trusted internal targets).
require_safe: When ``True`` (default) every resolved IP is checked and a
blocked/internal address raises. When ``False`` the host is
already operator-trusted (e.g. a WEBHOOK_ALLOWED_HOSTS
entry) so the block check is skipped — but resolution still
happens so the connection can be pinned (pinning prevents
rebinding even for trusted hosts).
Returns:
list[str]: The resolved IP addresses to which a connection may be
pinned.
Raises:
ValueError: If the hostname cannot be resolved or (when
``require_safe``) any resolved address is a blocked/internal target not
covered by ``allowed_ips``.
"""
try:
addr_info = socket.getaddrinfo(hostname, None)
except (socket.gaierror, UnicodeError):
# UnicodeError covers IDNA encoding/normalisation failures, which
# getaddrinfo raises before the address lookup for malformed hostnames.
raise ValueError("Hostname could not be resolved")
if not addr_info:
raise ValueError("No IP addresses found for the hostname")
validated = []
for addr in addr_info:
# Strip any IPv6 zone id (e.g. ``fe80::1%eth0``) before parsing.
ip_str = addr[4][0].split("%")[0]
ip = ipaddress.ip_address(ip_str)
if require_safe and not _is_allowed_ip(ip, allowed_ips) and is_blocked_ip(ip):
raise ValueError("Access to private/internal networks is not allowed")
if ip_str not in validated:
validated.append(ip_str)
return validated
def validate_url(url, allowed_ips=None, allowed_hosts=None):
"""
Validate that a URL doesn't resolve to a private/internal IP address (SSRF protection).
Note: this validates at a point in time. To defeat DNS-rebinding (TOCTOU),
the actual request must be pinned to the validated IP — see
``plane.utils.url_security.pinned_fetch``.
Args:
url: The URL to validate.
allowed_ips: Optional list of ipaddress.ip_network objects. IPs falling within
@@ -41,22 +193,7 @@ def validate_url(url, allowed_ips=None, allowed_hosts=None):
}:
return
try:
addr_info = socket.getaddrinfo(hostname, None)
except socket.gaierror:
raise ValueError("Hostname could not be resolved")
if not addr_info:
raise ValueError("No IP addresses found for the hostname")
for addr in addr_info:
ip = ipaddress.ip_address(addr[4][0])
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
if allowed_ips and any(
network.version == ip.version and ip in network for network in allowed_ips
):
continue
raise ValueError("Access to private/internal networks is not allowed")
resolve_and_validate(hostname, allowed_ips=allowed_ips)
def get_client_ip(request):
+272
View File
@@ -0,0 +1,272 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
"""
SSRF-safe outbound HTTP client.
The validators in :mod:`plane.utils.ip_address` resolve a hostname and confirm
that none of its addresses point at internal infrastructure. On their own they
are vulnerable to DNS rebinding (TOCTOU): the validator resolves the name, but
``requests`` resolves it a *second* time when it actually connects, and an
attacker who controls DNS can return a public IP to the validator and an
internal IP to the connection.
``pinned_fetch`` closes that window by resolving + validating once and then
connecting to the *validated IP literal* — urllib3 performs no second DNS
lookup, so the address that was checked is exactly the address that is reached.
The original hostname is still used for the ``Host`` header, TLS SNI and
certificate verification, so virtual-hosting and HTTPS continue to work.
Redirects are never auto-followed (``requests`` would re-resolve each hop and
reopen the rebinding window, and a ``Location`` can point at a new internal
host). ``pinned_fetch_following_redirects`` follows them manually, re-resolving,
re-validating and re-pinning every hop.
"""
# Python imports
import ipaddress
from urllib.parse import unquote, urljoin, urlsplit
# Third party imports
import requests
from requests.adapters import HTTPAdapter
# Module imports
from plane.utils.ip_address import resolve_and_validate
# 3xx status codes that carry a Location we may follow.
_REDIRECT_STATUSES = {301, 302, 303, 307, 308}
# Never route through an ambient proxy — a CONNECT to a proxy would tunnel to
# the original hostname and bypass the IP pinning entirely.
_NO_PROXIES = {"http": None, "https": None}
class PinnedIPAdapter(HTTPAdapter):
"""
A ``requests`` transport adapter that connects to whatever IP literal is in
the request URL while presenting ``server_hostname`` for TLS SNI and
certificate verification.
The IP literal in the URL means urllib3 opens the socket to that exact IP
with no DNS resolution. Injecting ``server_hostname`` (and leaving
``assert_hostname`` at its ``None`` default so ``SSLContext.check_hostname``
stays ``True``) makes the standard library verify the presented certificate
against the real hostname rather than the IP.
Instances hold no global state — one is mounted on a throwaway
:class:`requests.Session` per request, so this is safe under any Celery pool
(prefork / threads / gevent).
"""
def __init__(self, server_hostname, *args, **kwargs):
self._server_hostname = server_hostname
super().__init__(*args, **kwargs)
def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None):
# requests >= 2.32 calls this (it replaced get_connection() as part of
# the CVE-2024-35195 fix). requests is pinned to 2.33 in base.txt.
host_params, pool_kwargs = self.build_connection_pool_key_attributes(
request, verify, cert
)
# server_hostname is a recognised urllib3 SSL pool-key field, so pools
# for different hostnames don't collide.
pool_kwargs["server_hostname"] = self._server_hostname
return self.poolmanager.connection_from_host(**host_params, pool_kwargs=pool_kwargs)
def _split_target(url):
"""Parse a URL into the pieces needed to build a pinned request.
Returns ``(scheme, hostname, port, path, auth)`` where ``auth`` carries any
URL-embedded credentials (``user:pass@host``) as a ``(user, pass)`` tuple so
HTTP Basic Auth still works once the URL is rewritten to an IP literal.
"""
parts = urlsplit(url)
scheme = parts.scheme
if scheme not in ("http", "https"):
raise ValueError("Invalid URL scheme. Only HTTP and HTTPS are allowed")
hostname = parts.hostname
if not hostname:
raise ValueError("Invalid URL: No hostname found")
port = parts.port or (443 if scheme == "https" else 80)
path = parts.path or "/"
if parts.query:
path = f"{path}?{parts.query}"
auth = None
if parts.username is not None or parts.password is not None:
auth = (unquote(parts.username or ""), unquote(parts.password or ""))
return scheme, hostname, port, path, auth
def _request_to_ip(method, scheme, hostname, ip, port, path, *, headers, timeout, auth=None, **kwargs):
"""Issue a single request whose socket is pinned to ``ip``.
With ``stream=True`` the session is kept open until the caller closes the
response (closing the response also closes the session), so a streamed body
can be read with a real size cap; otherwise the session is closed eagerly.
"""
ip_obj = ipaddress.ip_address(ip)
host_for_url = f"[{ip}]" if ip_obj.version == 6 else ip
url = f"{scheme}://{host_for_url}:{port}{path}"
request_headers = dict(headers or {})
default_port = 443 if scheme == "https" else 80
# Host header (and TLS) carry the ORIGINAL hostname, not the IP literal.
# An IPv6-literal hostname must be bracketed in the Host header.
host_label = f"[{hostname}]" if ":" in hostname else hostname
request_headers["Host"] = host_label if port == default_port else f"{host_label}:{port}"
session = requests.Session()
session.trust_env = False # ignore ambient proxy / netrc / env (see _NO_PROXIES)
if scheme == "https":
session.mount("https://", PinnedIPAdapter(server_hostname=hostname))
try:
response = session.request(
method,
url,
headers=request_headers,
timeout=timeout,
allow_redirects=False,
verify=True,
proxies=_NO_PROXIES,
auth=auth,
**kwargs,
)
except BaseException:
session.close()
raise
if kwargs.get("stream"):
# Defer closing the session until the response is closed, so the
# streamed body remains readable. response.close() now also closes
# the session.
_orig_close = response.close
def _close_all(_orig=_orig_close, _sess=session):
try:
_orig()
finally:
_sess.close()
response.close = _close_all
else:
session.close()
return response
def _fetch_validated_hop(method, url, *, allowed_ips, allowed_hosts, headers, timeout, **kwargs):
"""
Resolve ``url``'s host, validate it, then issue a single (non-redirecting)
request pinned to a resolved IP. Returns ``(response, normalized_host)``.
Hosts in ``allowed_hosts`` are operator-trusted (e.g. internal service DNS
whose IPs are dynamic): they skip the private-IP *block* check, but the
connection is STILL pinned to the resolved IP so a trusted hostname cannot
be rebound to a different internal target between validation and connect.
"""
scheme, hostname, port, path, auth = _split_target(url)
normalized_host = hostname.rstrip(".").lower()
trusted = bool(allowed_hosts) and normalized_host in {
(h or "").rstrip(".").lower() for h in allowed_hosts if h
}
# Resolve once (and validate unless the host is operator-trusted), then pin
# the connection to a resolved IP literal — urllib3 performs no second DNS
# lookup, so the address validated here is exactly the one reached.
ips = resolve_and_validate(hostname, allowed_ips=allowed_ips, require_safe=not trusted)
last_exc = None
for ip in ips:
try:
response = _request_to_ip(
method, scheme, hostname, ip, port, path,
headers=headers, timeout=timeout, auth=auth, **kwargs,
)
return response, normalized_host
except requests.RequestException as exc:
# Try the next resolved address (dual-stack / round-robin hosts).
last_exc = exc
if last_exc is not None:
raise last_exc
raise requests.ConnectionError(f"No reachable address for host: {hostname}")
def pinned_fetch(
method,
url,
*,
allowed_ips=None,
allowed_hosts=None,
headers=None,
timeout=30,
**kwargs,
):
"""
SSRF-safe single request. Resolves + validates the target host and pins the
connection to a validated IP (defeating DNS rebinding). Does NOT follow
redirects.
Raises:
ValueError: if the URL is invalid or resolves to a blocked address.
requests.RequestException: on network/transport errors.
"""
response, _ = _fetch_validated_hop(
method, url,
allowed_ips=allowed_ips, allowed_hosts=allowed_hosts,
headers=headers, timeout=timeout, **kwargs,
)
return response
def pinned_fetch_following_redirects(
method,
url,
*,
allowed_ips=None,
allowed_hosts=None,
headers=None,
timeout=30,
max_redirects=5,
**kwargs,
):
"""
SSRF-safe request that follows redirects manually, re-resolving,
re-validating and re-pinning every hop. Returns ``(response, final_url)``.
Raises:
ValueError: if any URL in the chain is invalid or resolves to a blocked
address.
requests.TooManyRedirects: if the hop limit is exceeded.
requests.RequestException: on network/transport errors.
"""
current_url = url
redirects = 0
while True:
response, _ = _fetch_validated_hop(
method, current_url,
allowed_ips=allowed_ips, allowed_hosts=allowed_hosts,
headers=headers, timeout=timeout, **kwargs,
)
if response.status_code not in _REDIRECT_STATUSES:
return response, current_url
location = response.headers.get("Location")
if not location:
return response, current_url
if redirects >= max_redirects:
response.close()
raise requests.TooManyRedirects(
f"Exceeded {max_redirects} redirects for URL: {url}"
)
redirects += 1
# Release the intermediate hop's connection/session before following.
response.close()
# Resolve the redirect target against the current URL; the next loop
# iteration re-validates and re-pins it.
current_url = urljoin(current_url, location)
+3
View File
@@ -56,6 +56,9 @@ lxml==6.1.0
boto3==1.34.96
# http client (pinned to address CVE-2026-44431 and CVE-2026-44432)
urllib3>=2.7.0
# requests — used directly for webhook delivery & link unfurling; pinned to
# >=2.32 for the get_connection_with_tls_context adapter hook (SSRF IP pinning)
requests==2.33.0
# password validator
zxcvbn==4.4.28
# timezone
+1 -2
View File
@@ -8,5 +8,4 @@ pytest-mock==3.11.1
factory-boy==3.3.0
freezegun==1.2.2
coverage==7.2.7
httpx==0.24.1
requests==2.33.0
httpx==0.24.1