1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00
rails--rails/actionpack/lib/action_dispatch/http/content_security_policy.rb
Petrik b588ceb44f Improve Content Security Policy documentation [ci-skip]
- Use "HTTP Content-Security-Policy response header" instead of "Content
  Security Policy", to make it clear the header will be set.
- Instead of having a long list of examples in the guide, add a
  description to each example.

Co-authored-by: Jonathan Hefner <jonathan@hefner.pro>
2022-02-18 10:39:11 +01:00

359 lines
11 KiB
Ruby

# frozen_string_literal: true
require "active_support/core_ext/object/deep_dup"
module ActionDispatch # :nodoc:
# Configures the HTTP
# {Content-Security-Policy}[https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy]
# response header to help protect against XSS and injection attacks.
#
# Example global policy:
#
# Rails.application.config.content_security_policy do |policy|
# policy.default_src :self, :https
# policy.font_src :self, :https, :data
# policy.img_src :self, :https, :data
# policy.object_src :none
# policy.script_src :self, :https
# policy.style_src :self, :https
#
# # Specify URI for violation reports
# policy.report_uri "/csp-violation-report-endpoint"
# end
class ContentSecurityPolicy
class Middleware
CONTENT_TYPE = "Content-Type"
POLICY = "Content-Security-Policy"
POLICY_REPORT_ONLY = "Content-Security-Policy-Report-Only"
def initialize(app)
@app = app
end
def call(env)
request = ActionDispatch::Request.new env
_, headers, _ = response = @app.call(env)
return response unless html_response?(headers)
return response if policy_present?(headers)
if policy = request.content_security_policy
nonce = request.content_security_policy_nonce
nonce_directives = request.content_security_policy_nonce_directives
context = request.controller_instance || request
headers[header_name(request)] = policy.build(context, nonce, nonce_directives)
end
response
end
private
def html_response?(headers)
if content_type = headers[CONTENT_TYPE]
/html/.match?(content_type)
end
end
def header_name(request)
if request.content_security_policy_report_only
POLICY_REPORT_ONLY
else
POLICY
end
end
def policy_present?(headers)
headers[POLICY] || headers[POLICY_REPORT_ONLY]
end
end
module Request
POLICY = "action_dispatch.content_security_policy"
POLICY_REPORT_ONLY = "action_dispatch.content_security_policy_report_only"
NONCE_GENERATOR = "action_dispatch.content_security_policy_nonce_generator"
NONCE = "action_dispatch.content_security_policy_nonce"
NONCE_DIRECTIVES = "action_dispatch.content_security_policy_nonce_directives"
def content_security_policy
get_header(POLICY)
end
def content_security_policy=(policy)
set_header(POLICY, policy)
end
def content_security_policy_report_only
get_header(POLICY_REPORT_ONLY)
end
def content_security_policy_report_only=(value)
set_header(POLICY_REPORT_ONLY, value)
end
def content_security_policy_nonce_generator
get_header(NONCE_GENERATOR)
end
def content_security_policy_nonce_generator=(generator)
set_header(NONCE_GENERATOR, generator)
end
def content_security_policy_nonce_directives
get_header(NONCE_DIRECTIVES)
end
def content_security_policy_nonce_directives=(generator)
set_header(NONCE_DIRECTIVES, generator)
end
def content_security_policy_nonce
if content_security_policy_nonce_generator
if nonce = get_header(NONCE)
nonce
else
set_header(NONCE, generate_content_security_policy_nonce)
end
end
end
private
def generate_content_security_policy_nonce
content_security_policy_nonce_generator.call(self)
end
end
MAPPINGS = {
self: "'self'",
unsafe_eval: "'unsafe-eval'",
unsafe_inline: "'unsafe-inline'",
none: "'none'",
http: "http:",
https: "https:",
data: "data:",
mediastream: "mediastream:",
allow_duplicates: "'allow-duplicates'",
blob: "blob:",
filesystem: "filesystem:",
report_sample: "'report-sample'",
script: "'script'",
strict_dynamic: "'strict-dynamic'",
ws: "ws:",
wss: "wss:"
}.freeze
DIRECTIVES = {
base_uri: "base-uri",
child_src: "child-src",
connect_src: "connect-src",
default_src: "default-src",
font_src: "font-src",
form_action: "form-action",
frame_ancestors: "frame-ancestors",
frame_src: "frame-src",
img_src: "img-src",
manifest_src: "manifest-src",
media_src: "media-src",
object_src: "object-src",
prefetch_src: "prefetch-src",
require_trusted_types_for: "require-trusted-types-for",
script_src: "script-src",
script_src_attr: "script-src-attr",
script_src_elem: "script-src-elem",
style_src: "style-src",
style_src_attr: "style-src-attr",
style_src_elem: "style-src-elem",
trusted_types: "trusted-types",
worker_src: "worker-src"
}.freeze
DEFAULT_NONCE_DIRECTIVES = %w[script-src style-src].freeze
private_constant :MAPPINGS, :DIRECTIVES, :DEFAULT_NONCE_DIRECTIVES
attr_reader :directives
def initialize
@directives = {}
yield self if block_given?
end
def initialize_copy(other)
@directives = other.directives.deep_dup
end
DIRECTIVES.each do |name, directive|
define_method(name) do |*sources|
if sources.first
@directives[directive] = apply_mappings(sources)
else
@directives.delete(directive)
end
end
end
# Specify whether to prevent the user agent from loading any assets over
# HTTP when the page uses HTTPS:
#
# policy.block_all_mixed_content
#
# Pass +false+ to allow it again:
#
# policy.block_all_mixed_content false
#
def block_all_mixed_content(enabled = true)
if enabled
@directives["block-all-mixed-content"] = true
else
@directives.delete("block-all-mixed-content")
end
end
# Restricts the set of plugins that can be embedded:
#
# policy.plugin_types "application/x-shockwave-flash"
#
# Leave empty to allow all plugins:
#
# policy.plugin_types
#
def plugin_types(*types)
if types.first
@directives["plugin-types"] = types
else
@directives.delete("plugin-types")
end
end
# Enable the {report-uri}[https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy/report-uri]
# directive. Violation reports will be sent to the specified URI:
#
# policy.report_uri "/csp-violation-report-endpoint"
#
def report_uri(uri)
@directives["report-uri"] = [uri]
end
# Specify asset types for which {Subresource Integrity}[https://developer.mozilla.org/en-US/docs/Web/Security/Subresource_Integrity]
# is required:
#
# policy.require_sri_for :script, :style
#
# Leave empty to not require Subresource Integrity:
#
# policy.require_sri_for
#
def require_sri_for(*types)
if types.first
@directives["require-sri-for"] = types
else
@directives.delete("require-sri-for")
end
end
# Specify whether a {sandbox}[https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy/sandbox]
# should be enabled for the requested resource:
#
# policy.sandbox
#
# Values can be passed as arguments:
#
# policy.sandbox "allow-scripts", "allow-modals"
#
# Pass +false+ to disable the sandbox:
#
# policy.sandbox false
#
def sandbox(*values)
if values.empty?
@directives["sandbox"] = true
elsif values.first
@directives["sandbox"] = values
else
@directives.delete("sandbox")
end
end
# Specify whether user agents should treat any assets over HTTP as HTTPS:
#
# policy.upgrade_insecure_requests
#
# Pass +false+ to disable it:
#
# policy.upgrade_insecure_requests false
#
def upgrade_insecure_requests(enabled = true)
if enabled
@directives["upgrade-insecure-requests"] = true
else
@directives.delete("upgrade-insecure-requests")
end
end
def build(context = nil, nonce = nil, nonce_directives = nil)
nonce_directives = DEFAULT_NONCE_DIRECTIVES if nonce_directives.nil?
build_directives(context, nonce, nonce_directives).compact.join("; ")
end
private
def apply_mappings(sources)
sources.map do |source|
case source
when Symbol
apply_mapping(source)
when String, Proc
source
else
raise ArgumentError, "Invalid content security policy source: #{source.inspect}"
end
end
end
def apply_mapping(source)
MAPPINGS.fetch(source) do
raise ArgumentError, "Unknown content security policy source mapping: #{source.inspect}"
end
end
def build_directives(context, nonce, nonce_directives)
@directives.map do |directive, sources|
if sources.is_a?(Array)
if nonce && nonce_directive?(directive, nonce_directives)
"#{directive} #{build_directive(sources, context).join(' ')} 'nonce-#{nonce}'"
else
"#{directive} #{build_directive(sources, context).join(' ')}"
end
elsif sources
directive
else
nil
end
end
end
def build_directive(sources, context)
sources.map { |source| resolve_source(source, context) }
end
def resolve_source(source, context)
case source
when String
source
when Symbol
source.to_s
when Proc
if context.nil?
raise RuntimeError, "Missing context for the dynamic content security policy source: #{source.inspect}"
else
resolved = context.instance_exec(&source)
resolved.is_a?(Symbol) ? apply_mapping(resolved) : resolved
end
else
raise RuntimeError, "Unexpected content security policy source: #{source.inspect}"
end
end
def nonce_directive?(directive, nonce_directives)
nonce_directives.include?(directive)
end
end
end