loading
Generated 2026-02-03T22:16:23+00:00

All Files ( 96.78% covered at 220.11 hits/line )

19 files in total.
901 relevant lines, 872 lines covered and 29 lines missed. ( 96.78% )
File % covered Lines Relevant Lines Lines covered Lines missed Avg. Hits / Line
lib/tobox.rb 100.00 % 68 27 27 0 98.59
lib/tobox/application.rb 100.00 % 40 22 22 0 107.95
lib/tobox/configuration.rb 100.00 % 200 107 107 0 629.26
lib/tobox/fetcher.rb 100.00 % 228 120 120 0 719.83
lib/tobox/plugins/datadog.rb 95.31 % 125 64 61 3 9.84
lib/tobox/plugins/datadog/configuration.rb 67.44 % 72 43 29 14 3.37
lib/tobox/plugins/datadog/integration.rb 100.00 % 39 19 19 0 16.05
lib/tobox/plugins/datadog/patcher.rb 100.00 % 26 11 11 0 5.45
lib/tobox/plugins/event_grouping.rb 100.00 % 53 25 25 0 37.72
lib/tobox/plugins/inbox.rb 86.11 % 69 36 31 5 43.75
lib/tobox/plugins/pg_notify.rb 100.00 % 85 45 45 0 8.69
lib/tobox/plugins/progress.rb 100.00 % 77 37 37 0 161.51
lib/tobox/plugins/sentry.rb 100.00 % 170 82 82 0 20.61
lib/tobox/plugins/stats.rb 96.10 % 149 77 74 3 22.73
lib/tobox/plugins/zeitwerk.rb 96.55 % 52 29 28 1 7.93
lib/tobox/pool.rb 100.00 % 52 27 27 0 160.67
lib/tobox/pool/fiber_pool.rb 94.23 % 98 52 49 3 30.25
lib/tobox/pool/threaded_pool.rb 100.00 % 79 44 44 0 172.36
lib/tobox/worker.rb 100.00 % 65 34 34 0 364.06

lib/tobox.rb

100.0% lines covered

27 relevant lines. 27 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 38 require "sequel"
  3. 38 require_relative "tobox/version"
  4. 38 module Tobox
  5. 38 class Error < StandardError; end
  6. 38 EMPTY = [].freeze
  7. 38 module Plugins
  8. 38 PLUGINS_MUTEX = Thread::Mutex.new
  9. 38 @plugins = {}
  10. # Loads a plugin based on a name. If the plugin hasn't been loaded, tries to load
  11. # it from the load path under "httpx/plugins/" directory.
  12. #
  13. 38 def self.load_plugin(name)
  14. 238 h = @plugins
  15. 476 unless (plugin = PLUGINS_MUTEX.synchronize { h[name] })
  16. 108 require "tobox/plugins/#{name}"
  17. 216 raise "Plugin #{name} hasn't been registered" unless (plugin = PLUGINS_MUTEX.synchronize { h[name] })
  18. end
  19. 238 plugin
  20. end
  21. # Registers a plugin (+mod+) in the central store indexed by +name+.
  22. #
  23. 38 def self.register_plugin(name, mod)
  24. 108 h = @plugins
  25. 208 PLUGINS_MUTEX.synchronize { h[name] = mod }
  26. end
  27. end
  28. # when using batch sizes higher than 1, this method can be used to signal multiple errors
  29. # for a subset of the events which may have failed processing; these events are identified
  30. # by the index inside the batch.
  31. #
  32. # on(:event_type) do |*events|
  33. # successful, failed = handle_event_batch(events)
  34. #
  35. # deal_with_success(successful)
  36. #
  37. # batch_errors = failed.to_h do |failed_event|
  38. # [
  39. # events.index(failed_event),
  40. # MyException.new("failed handling process batch")
  41. # ]
  42. # end
  43. #
  44. # Tobox.raise_batch_error(batch_errors)
  45. # end
  46. 38 def self.raise_batch_errors(batch_errors)
  47. 132 batch_errors = Hash.try_convert(batch_errors)
  48. 198 unless batch_errors && batch_errors.all? { |k, v| k.is_a?(Integer) && v.is_a?(Exception) }
  49. 66 raise Error, "batch errors must be an array of index-to-exception tuples"
  50. end
  51. 66 throw(:tobox_batch_errors, batch_errors)
  52. end
  53. end
  54. 38 require_relative "tobox/fetcher"
  55. 38 require_relative "tobox/worker"
  56. 38 require_relative "tobox/pool"
  57. 38 require_relative "tobox/application"
  58. 38 require_relative "tobox/configuration"

lib/tobox/application.rb

100.0% lines covered

22 relevant lines. 22 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 38 module Tobox
  3. 38 class Application
  4. 38 def initialize(configuration)
  5. 265 @configuration = configuration
  6. 265 @running = false
  7. 265 @on_start_handlers = Array(configuration.lifecycle_events[:on_start])
  8. 265 @on_stop_handlers = Array(configuration.lifecycle_events[:on_stop])
  9. 265 worker = configuration[:worker]
  10. 265 @pool = case worker
  11. 165 when :thread then ThreadedPool
  12. 67 when :fiber then FiberPool
  13. 33 else worker
  14. end.new(configuration)
  15. end
  16. 38 def start
  17. 66 return if @running
  18. 33 @on_start_handlers.each(&:call)
  19. 33 @pool.start
  20. 33 @running = true
  21. end
  22. 38 def stop
  23. 66 return unless @running
  24. 33 @on_stop_handlers.each(&:call)
  25. 33 @pool.stop
  26. 33 @running = false
  27. end
  28. end
  29. end

lib/tobox/configuration.rb

100.0% lines covered

107 relevant lines. 107 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 38 require "logger"
  3. 38 require "forwardable"
  4. 38 module Tobox
  5. 38 class Configuration
  6. 38 extend Forwardable
  7. 38 attr_reader :plugins, :handlers, :lifecycle_events, :arguments_handler, :default_logger, :database,
  8. :fetcher_class, :worker_class,
  9. :config
  10. 38 def_delegator :@config, :[]
  11. 7 DEFAULT_CONFIGURATION = {
  12. 31 environment: ENV.fetch("APP_ENV", "development"),
  13. logger: nil,
  14. log_level: nil,
  15. database_uri: nil,
  16. database_options: nil,
  17. table: :outbox,
  18. visibility_column: :run_at,
  19. attempts_column: :attempts,
  20. created_at_column: nil,
  21. batch_size: 1,
  22. max_attempts: 10,
  23. exponential_retry_factor: 2,
  24. wait_for_events_delay: 5,
  25. shutdown_timeout: 10,
  26. grace_shutdown_timeout: 5,
  27. concurrency: 4, # TODO: CPU count
  28. worker: :thread
  29. }.freeze
  30. 38 LOG_FORMAT_PATTERN = "%s, [%s #%d (th: %s)] %5s -- %s: %s\n"
  31. 38 DEFAULT_LOG_FORMATTER = Class.new(Logger::Formatter) do
  32. 38 def call(severity, time, progname, msg)
  33. 1651 format(LOG_FORMAT_PATTERN, severity[0, 1], format_datetime(time), Process.pid,
  34. Thread.current.name || Thread.current.object_id, severity, progname, msg2str(msg))
  35. end
  36. end.new
  37. 38 def initialize(name = nil, &block)
  38. 1183 @name = name
  39. 1183 @config = DEFAULT_CONFIGURATION.dup
  40. 1183 @lifecycle_events = {}
  41. 1183 @handlers = {}
  42. 1183 @message_to_arguments = nil
  43. 1183 @plugins = []
  44. 1183 @fetcher_class = Class.new(Fetcher)
  45. 1183 @worker_class = Class.new(Worker)
  46. 1183 if block
  47. 1084 case block.arity
  48. when 0
  49. 190 instance_exec(&block)
  50. when 1
  51. 960 yield(self)
  52. else
  53. 33 raise Error, "configuration does not support blocks with more than one variable"
  54. end
  55. end
  56. 1117 env = @config[:environment]
  57. 1117 @default_logger = @config[:logger] || Logger.new(STDERR, formatter: DEFAULT_LOG_FORMATTER) # rubocop:disable Style/GlobalStdStream
  58. 1117 @default_logger.level = @config[:log_level] || (env == "production" ? Logger::INFO : Logger::DEBUG)
  59. 1117 @database = if @config[:database_uri]
  60. 58 database_opts = @config[:database_options] || {}
  61. 58 database_opts[:max_connections] ||= (@config[:concurrency] if @config[:worker] == :thread)
  62. 58 db = Sequel.connect(@config[:database_uri].to_s, database_opts)
  63. 87 Array(@lifecycle_events[:database_connect]).each { |cb| cb.call(db) }
  64. 58 db
  65. else
  66. 1059 Sequel::DATABASES.first
  67. end
  68. 1117 raise Error, "no database found" unless @database
  69. 1117 if @database.frozen?
  70. 1021 raise "#{@database} must have the :date_arithmetic extension loaded" unless Sequel.respond_to?(:date_add)
  71. else
  72. 96 @database.extension :date_arithmetic
  73. 96 @database.loggers << @default_logger unless @config[:environment] == "production"
  74. end
  75. 1117 freeze
  76. end
  77. 38 def on(*event_types, &callback)
  78. 330 callback_events = (@handlers[callback] ||= [])
  79. 330 event_types.each do |event_type|
  80. 396 callback_events << event_type.to_sym
  81. end
  82. 330 self
  83. end
  84. 38 def on_start(&callback)
  85. 58 (@lifecycle_events[:on_start] ||= []) << callback
  86. 58 self
  87. end
  88. 38 def on_stop(&callback)
  89. 33 (@lifecycle_events[:on_stop] ||= []) << callback
  90. 33 self
  91. end
  92. 38 def on_before_event(&callback)
  93. 81 (@lifecycle_events[:before_event] ||= []) << callback
  94. 81 self
  95. end
  96. 38 def on_after_event(&callback)
  97. 89 (@lifecycle_events[:after_event] ||= []) << callback
  98. 89 self
  99. end
  100. 38 def on_error_event(&callback)
  101. 205 (@lifecycle_events[:error_event] ||= []) << callback
  102. 205 self
  103. end
  104. 38 def on_start_worker(&callback)
  105. 8 (@lifecycle_events[:start_worker] ||= []) << callback
  106. 8 self
  107. end
  108. 38 def on_error_worker(&callback)
  109. 30 (@lifecycle_events[:error_worker] ||= []) << callback
  110. 30 self
  111. end
  112. 38 def on_database_connect(&callback)
  113. 29 (@lifecycle_events[:database_connect] ||= []) << callback
  114. 29 self
  115. end
  116. 38 def message_to_arguments(&callback)
  117. 33 @arguments_handler = callback
  118. 33 self
  119. end
  120. 38 def visibility_type_bool?
  121. 2157 _, visibility_info = @database.schema(@config[:table]).find do |column, _|
  122. 17541 column == @config[:visibility_column]
  123. end
  124. 2157 raise Error, "a visibility column is required" unless visibility_info
  125. 2157 visibility_info[:type] == :boolean
  126. end
  127. 38 def plugin(plugin, **options, &block)
  128. 238 raise Error, "Cannot add a plugin to a frozen config" if frozen?
  129. 238 plugin = Plugins.load_plugin(plugin) if plugin.is_a?(Symbol)
  130. 238 return if @plugins.include?(plugin)
  131. 238 @plugins << plugin
  132. 238 plugin.load_dependencies(self, **options, &block) if plugin.respond_to?(:load_dependencies)
  133. 238 extend(plugin::ConfigurationMethods) if defined?(plugin::ConfigurationMethods)
  134. 238 @fetcher_class.__send__(:include, plugin::FetcherMethods) if defined?(plugin::FetcherMethods)
  135. 238 @worker_class.__send__(:include, plugin::WorkerMethods) if defined?(plugin::WorkerMethods)
  136. 238 plugin.configure(self, **options, &block) if plugin.respond_to?(:configure)
  137. end
  138. 38 def freeze
  139. 1117 @name.freeze
  140. 1117 @config.each_value(&:freeze).freeze
  141. 1117 @handlers.each_value(&:freeze).freeze
  142. 1117 @lifecycle_events.each_value(&:freeze).freeze
  143. 1117 @plugins.freeze
  144. 1117 @database.freeze
  145. 1117 super
  146. end
  147. 38 private
  148. 38 def method_missing(meth, *args, &block)
  149. 2422 if @config.key?(meth) && args.size == 1
  150. 2127 @config[meth] = args.first
  151. 95 elsif /\Aon_(.*)\z/.match(meth) && args.empty?
  152. 66 on(Regexp.last_match(1).to_sym, &block)
  153. else
  154. 33 super
  155. end
  156. end
  157. 38 def respond_to_missing?(meth, *args)
  158. 58 super ||
  159. @config.key?(meth) ||
  160. /\Aon_(.*)\z/.match(meth)
  161. end
  162. end
  163. end

lib/tobox/fetcher.rb

100.0% lines covered

120 relevant lines. 120 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 38 require "json"
  3. 38 module Tobox
  4. 38 class Fetcher
  5. 38 def initialize(label, configuration)
  6. 1357 @label = label
  7. 1357 @configuration = configuration
  8. 1357 @logger = @configuration.default_logger
  9. 1357 @db = configuration.database
  10. 1357 @table = configuration[:table]
  11. 1357 @exponential_retry_factor = configuration[:exponential_retry_factor]
  12. 1357 max_attempts = configuration[:max_attempts]
  13. 1357 @ds = @db[@table]
  14. 1357 @visibility_column = configuration[:visibility_column]
  15. 1357 @attempts_column = configuration[:attempts_column]
  16. 1357 @pick_next_sql = @ds
  17. 1357 if @attempts_column
  18. # filter out exhausted attempts
  19. 1282 @pick_next_sql = @pick_next_sql.where(Sequel[@table][@attempts_column] < max_attempts)
  20. end
  21. 1357 if configuration.visibility_type_bool?
  22. 75 @pick_next_sql = @pick_next_sql.where(@visibility_column => false).order(:id)
  23. else
  24. 225 visibility_conds = [
  25. 1057 { Sequel[@table][@visibility_column] => nil },
  26. 1282 (Sequel.expr(Sequel[@table][@visibility_column]) < Sequel::CURRENT_TIMESTAMP)
  27. 1282 ].reduce { |agg, cond| Sequel.expr(agg) | Sequel.expr(cond) }
  28. 1282 @pick_next_sql = @pick_next_sql.where(visibility_conds)
  29. .order(Sequel.desc(@visibility_column, nulls: :first), :id)
  30. end
  31. 1357 @batch_size = configuration[:batch_size]
  32. 1357 @before_event_handlers = Array(@configuration.lifecycle_events[:before_event])
  33. 1357 @after_event_handlers = Array(@configuration.lifecycle_events[:after_event])
  34. 1357 @error_event_handlers = Array(@configuration.lifecycle_events[:error_event])
  35. end
  36. 38 def fetch_events(&blk)
  37. 1960 num_events = 0
  38. 1960 events_tr do
  39. 1960 events = nil
  40. # @type var events: Array[event]?
  41. 1960 event_id_tr do
  42. 1960 events = do_fetch_events
  43. end
  44. 1959 if events && !events.empty?
  45. 961 with_events(events) do |events|
  46. 961 num_events = events.size
  47. 961 prepare_events(events, &blk)
  48. end
  49. end
  50. end
  51. 1940 num_events
  52. end
  53. 38 private
  54. 38 def prepare_events(events)
  55. 928 prepared_events = events.map do |event|
  56. 1027 event[:metadata] = try_json_parse(event[:metadata]) if event[:metadata]
  57. 1027 handle_before_event(event)
  58. 1027 to_message(event)
  59. end
  60. 928 yield(prepared_events)
  61. end
  62. 38 def fetch_event_ids
  63. 1888 ds = @pick_next_sql
  64. 1888 ds = ds.for_update.skip_locked if ds.supports_skip_locked?
  65. 1888 ds.limit(@batch_size).select(:id) # lock starts here
  66. end
  67. 38 def do_fetch_events
  68. 1498 @ds.where(id: fetch_event_ids).all
  69. end
  70. 38 def events_tr(&block)
  71. 1498 @db.transaction(savepoint: false, &block)
  72. end
  73. 38 def event_id_tr
  74. 1498 yield
  75. end
  76. 38 def with_events(events, &blk)
  77. 961 yield_events(events, &blk)
  78. 961 events.each do |event|
  79. 1060 event_error = event[:error]
  80. 1060 if event_error
  81. 261 event.merge!(mark_as_error(event, event_error))
  82. 261 handle_error_event(event, event_error)
  83. else
  84. 799 handle_after_event(event)
  85. end
  86. end
  87. end
  88. 38 def yield_events(events)
  89. 961 unless events.empty?
  90. 961 errors_by_id = catch(:tobox_batch_errors) do
  91. 961 yield events
  92. 700 nil
  93. end
  94. # some events from batch errored
  95. 733 if errors_by_id
  96. 33 failed = events.values_at(*errors_by_id.keys)
  97. 33 successful = events - failed
  98. # fill in with batch error
  99. 33 failed.each do |ev|
  100. 30 ev[:error] = errors_by_id[events.index(ev)]
  101. end
  102. # delete successful
  103. 66 @ds.where(id: successful.map { |ev| ev[:id] }).delete unless successful.empty?
  104. else
  105. 1466 @ds.where(id: events.map { |ev| ev[:id] }).delete
  106. end
  107. end
  108. rescue StandardError => e
  109. 228 events.each do |event|
  110. 210 event[:error] = e
  111. end
  112. end
  113. 38 def log_message(msg, event)
  114. 120 tags = { type: event[:type], attempts: event[@attempts_column] }.compact
  115. 120 "(worker: #{@label}) -> outbox event " \
  116. 240 "(#{tags.map { |*pair| pair.join(": ") }.join(", ")}) #{msg}"
  117. end
  118. 38 def mark_as_error(event, error)
  119. # @type var update_params: Hash[Symbol, untyped]
  120. 49 update_params = {
  121. 212 last_error: error.full_message(highlight: false)
  122. }
  123. 261 update_params[@attempts_column] = Sequel[@table][@attempts_column] + 1 if @attempts_column
  124. 240 update_params[@visibility_column] = if @configuration.visibility_type_bool?
  125. 66 false
  126. else
  127. 195 calculate_event_retry_interval(event[@attempts_column])
  128. end
  129. 261 set_event_retry_attempts(event, update_params)
  130. end
  131. 38 def set_event_retry_attempts(event, update_params)
  132. 261 ds = @ds.where(id: event[:id])
  133. 261 if @ds.supports_returning?(:update)
  134. 198 ds.returning.update(update_params).first
  135. else
  136. 63 ds.update(update_params)
  137. 63 ds.first
  138. end
  139. end
  140. 38 def calculate_event_retry_interval(attempts)
  141. # Sequel.date_add(Sequel::CURRENT_TIMESTAMP,
  142. # seconds: Sequel.function(:POWER, Sequel[@table][:attempts] + 1, 4)
  143. 195 Sequel.date_add(Sequel::CURRENT_TIMESTAMP, seconds: @exponential_retry_factor**attempts)
  144. end
  145. 38 def to_message(event)
  146. 86 {
  147. 937 id: event[:id],
  148. type: event[:type],
  149. 1027 before: (try_json_parse(event[:data_before]) if event[:data_before]),
  150. 1027 after: (try_json_parse(event[:data_after]) if event[:data_after]),
  151. at: event[:created_at]
  152. }
  153. end
  154. 38 def try_json_parse(data)
  155. 1070 data = JSON.parse(data.to_s) unless data.respond_to?(:to_hash)
  156. 1070 data
  157. end
  158. 38 def handle_before_event(event)
  159. 1027 @logger.debug do
  160. 60 log_message("starting...", event)
  161. end
  162. 1027 @before_event_handlers.each do |hd|
  163. 48 hd.call(event)
  164. end
  165. end
  166. 38 def handle_after_event(event)
  167. 829 @logger.debug { log_message("completed", event) }
  168. 799 @after_event_handlers.each do |hd|
  169. 52 hd.call(event)
  170. end
  171. end
  172. 38 def handle_error_event(event, error)
  173. 261 @logger.error do
  174. 30 log_message("failed with error\n" \
  175. "#{error.class}: #{error.message}\n" \
  176. "#{error.backtrace.join("\n")}", event)
  177. end
  178. 261 @error_event_handlers.each do |hd|
  179. 119 hd.call(event, error)
  180. end
  181. end
  182. end
  183. end

lib/tobox/plugins/datadog.rb

95.31% lines covered

64 relevant lines. 61 lines covered and 3 lines missed.
    
  1. # frozen_string_literal: true
  2. 5 require_relative "datadog/configuration"
  3. 5 require_relative "datadog/integration"
  4. 5 require_relative "datadog/patcher"
  5. 5 module Tobox
  6. 5 module Plugins
  7. 5 module Datadog
  8. 5 class EventHandler
  9. 5 DATADOG_VERSION = defined?(DDTrace) ? DDTrace::VERSION : ::Datadog::VERSION
  10. 5 def initialize(config)
  11. 15 @config = config
  12. 15 @db_table = @config[:table]
  13. end
  14. 5 def on_start(event)
  15. 15 datadog_config = ::Datadog.configuration.tracing[:tobox]
  16. 15 service = datadog_config[:service_name]
  17. 15 error_handler = datadog_config[:error_handler]
  18. 15 analytics_enabled = datadog_config[:analytics_enabled]
  19. 15 analytics_sample_rate = datadog_config[:analytics_sample_rate]
  20. 15 distributed_tracing = datadog_config[:distributed_tracing]
  21. 15 resource = event[:type]
  22. 15 if (metadata = event[:metadata])
  23. 5 previous_span = metadata["datadog-parent-id"]
  24. 5 if distributed_tracing && previous_span
  25. 5 trace_digest = ::Datadog::Tracing::TraceDigest.new(
  26. span_id: previous_span,
  27. trace_id: event[:metadata]["datadog-trace-id"],
  28. trace_sampling_priority: event[:metadata]["datadog-sampling-priority"],
  29. trace_origin: event[:metadata]["datadog-origin"]
  30. )
  31. 5 ::Datadog::Tracing.continue_trace!(trace_digest)
  32. end
  33. end
  34. 15 span = create_span(service, error_handler)
  35. 15 span.resource = resource
  36. 15 span.set_tag(::Datadog::Tracing::Metadata::Ext::TAG_COMPONENT, "tobox")
  37. 15 span.set_tag(::Datadog::Tracing::Metadata::Ext::TAG_OPERATION, "event")
  38. 15 if ::Datadog::Tracing::Contrib::Analytics.enabled?(analytics_enabled)
  39. ::Datadog::Tracing::Contrib::Analytics.set_sample_rate(span, analytics_sample_rate)
  40. end
  41. # Measure service stats
  42. 15 ::Datadog::Tracing::Contrib::Analytics.set_measured(span)
  43. 15 span.set_tag("tobox.event.id", event[:id])
  44. 15 span.set_tag("tobox.event.type", event[:type])
  45. 15 span.set_tag("tobox.event.retry", event[@attempts_column]) if @attempts_column
  46. 15 span.set_tag("tobox.event.table", @db_table)
  47. 15 span.set_tag("tobox.event.delay", (Time.now.utc - event[:created_at]).to_f)
  48. 15 event[:__tobox_event_span] = span
  49. end
  50. 5 def on_finish(event)
  51. 10 span = event[:__tobox_event_span]
  52. 10 return unless span
  53. 10 span.finish
  54. end
  55. 5 def on_error(event, error)
  56. 5 span = event[:__tobox_event_span]
  57. 5 return unless span
  58. 5 span.set_error(error)
  59. 5 span.finish
  60. end
  61. 5 private
  62. 5 if Gem::Version.new(DATADOG_VERSION::STRING) >= Gem::Version.new("2.0.0")
  63. 5 def create_span(service, error_handler)
  64. 15 ::Datadog::Tracing.trace(
  65. "tobox.event",
  66. service: service,
  67. type: ::Datadog::Tracing::Metadata::Ext::AppTypes::TYPE_WORKER,
  68. on_error: error_handler
  69. )
  70. end
  71. else
  72. def create_span(service, error_handler)
  73. ::Datadog::Tracing.trace(
  74. "tobox.event",
  75. service: service,
  76. span_type: ::Datadog::Tracing::Metadata::Ext::AppTypes::TYPE_WORKER,
  77. on_error: error_handler
  78. )
  79. end
  80. end
  81. end
  82. 5 class << self
  83. 5 def load_dependencies(*)
  84. 15 require "uri"
  85. end
  86. 5 def configure(config, **datadog_options, &blk)
  87. 15 event_handler = EventHandler.new(config)
  88. 15 config.on_before_event(&event_handler.method(:on_start))
  89. 15 config.on_after_event(&event_handler.method(:on_finish))
  90. 15 config.on_error_event(&event_handler.method(:on_error))
  91. 15 ::Datadog.configure do |c|
  92. 15 c.tracing.instrument :tobox, datadog_options
  93. 15 yield(c) if blk
  94. end
  95. end
  96. end
  97. end
  98. 5 register_plugin :datadog, Datadog
  99. end
  100. end

lib/tobox/plugins/datadog/configuration.rb

67.44% lines covered

43 relevant lines. 29 lines covered and 14 lines missed.
    
  1. # frozen_string_literal: true
  2. 5 require "datadog/tracing/contrib"
  3. 5 require "datadog/tracing/contrib/configuration/settings"
  4. 5 require "datadog/tracing/span_operation"
  5. 5 module Datadog
  6. 5 module Tracing
  7. 5 module Contrib
  8. 5 module Tobox
  9. 5 module Configuration
  10. 5 class Settings < Contrib::Configuration::Settings
  11. 5 DATADOG_VERSION = defined?(DDTrace) ? DDTrace::VERSION : Datadog::VERSION
  12. 5 if Gem::Version.new(DATADOG_VERSION::STRING) >= Gem::Version.new("1.13.0")
  13. 5 option :enabled do |o|
  14. 5 o.type :bool
  15. 5 o.env "DD_TOBOX_ENABLED"
  16. 5 o.default true
  17. end
  18. 5 option :analytics_enabled do |o|
  19. 5 o.type :bool
  20. 5 o.env "DD_TOBOX_ANALYTICS_ENABLED"
  21. 5 o.default false
  22. end
  23. 5 option :analytics_sample_rate do |o|
  24. 5 o.type :float
  25. 5 o.env "DD_TRACE_TOBOX_ANALYTICS_SAMPLE_RATE"
  26. 5 o.default 1.0
  27. end
  28. else
  29. option :enabled do |o|
  30. o.default { env_to_bool("DD_TOBOX_ENABLED", true) }
  31. o.lazy
  32. end
  33. option :analytics_enabled do |o|
  34. o.default { env_to_bool("DD_TOBOX_ANALYTICS_ENABLED", false) }
  35. o.lazy
  36. end
  37. option :analytics_sample_rate do |o|
  38. o.default { env_to_float("DD_TRACE_TOBOX_ANALYTICS_SAMPLE_RATE", 1.0) }
  39. o.lazy
  40. end
  41. end
  42. 5 option :service_name
  43. 5 if DATADOG_VERSION::STRING >= "1.15.0"
  44. 5 option :error_handler do |o|
  45. 5 o.type :proc
  46. 5 o.default_proc(&Tracing::SpanOperation::Events::DEFAULT_ON_ERROR)
  47. end
  48. elsif DATADOG_VERSION::STRING >= "1.13.0"
  49. option :error_handler do |o|
  50. o.type :proc
  51. o.experimental_default_proc(&Tracing::SpanOperation::Events::DEFAULT_ON_ERROR)
  52. end
  53. else
  54. option :error_handler, default: Tracing::SpanOperation::Events::DEFAULT_ON_ERROR
  55. end
  56. 5 option :distributed_tracing, default: false
  57. end
  58. end
  59. end
  60. end
  61. end
  62. end

lib/tobox/plugins/datadog/integration.rb

100.0% lines covered

19 relevant lines. 19 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 5 require "datadog/tracing/contrib/integration"
  3. 5 module Datadog
  4. 5 module Tracing
  5. 5 module Contrib
  6. 5 module Tobox
  7. 5 class Integration
  8. 5 include Contrib::Integration
  9. 5 MINIMUM_VERSION = Gem::Version.new("0.1.0")
  10. 5 register_as :tobox
  11. 5 def self.version
  12. 100 Gem.loaded_specs["tobox"] && Gem.loaded_specs["tobox"].version
  13. end
  14. 5 def self.loaded?
  15. 30 !defined?(::Tobox).nil?
  16. end
  17. 5 def self.compatible?
  18. 30 super && version >= MINIMUM_VERSION
  19. end
  20. 5 def new_configuration
  21. 15 Configuration::Settings.new
  22. end
  23. 5 def patcher
  24. 60 Patcher
  25. end
  26. end
  27. end
  28. end
  29. end
  30. end

lib/tobox/plugins/datadog/patcher.rb

100.0% lines covered

11 relevant lines. 11 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 5 require "datadog/tracing/contrib/patcher"
  3. 5 module Datadog
  4. 5 module Tracing
  5. 5 module Contrib
  6. 5 module Tobox
  7. 5 module Patcher
  8. 5 include Contrib::Patcher
  9. 5 module_function
  10. 5 def target_version
  11. 10 Integration.version
  12. end
  13. 5 def patch
  14. # server-patches provided by plugin(:sidekiq)
  15. # TODO: use this once we have a producer side
  16. end
  17. end
  18. end
  19. end
  20. end
  21. end

lib/tobox/plugins/event_grouping.rb

100.0% lines covered

25 relevant lines. 25 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 9 module Tobox
  3. 9 module Plugins
  4. 9 module EventGrouping
  5. 9 def self.configure(conf)
  6. 16 conf.config[:group_column] = :group_id
  7. end
  8. 9 module FetcherMethods
  9. 9 def initialize(_, configuration)
  10. 18 super
  11. 18 @group_column = configuration[:group_column]
  12. end
  13. 9 private
  14. 9 def fetch_event_ids
  15. 72 group = @pick_next_sql
  16. 72 group = group.for_update.skip_locked if group.supports_skip_locked?
  17. 72 group = group.limit(1).select(@group_column)
  18. # get total from a group, to compare to the number of future locked rows.
  19. 72 total_from_group = @ds.where(@group_column => group).count
  20. 72 event_ids = @ds.where(@group_column => group)
  21. 72 event_ids = if @configuration.visibility_type_bool?
  22. 36 event_ids.order(:id)
  23. else
  24. 36 event_ids.order(Sequel.desc(@visibility_column, nulls: :first), :id)
  25. end
  26. 72 event_ids = event_ids.for_update.skip_locked if event_ids.supports_skip_locked?
  27. 72 event_ids = event_ids.select_map(:id)
  28. 72 if event_ids.size != total_from_group
  29. # this happens if concurrent workers locked different rows from the same group,
  30. # or when new rows from a given group have been inserted after the lock has been
  31. # acquired
  32. 18 event_ids = []
  33. end
  34. # lock all
  35. 72 event_ids.first(@batch_size)
  36. end
  37. end
  38. end
  39. 9 register_plugin :event_grouping, EventGrouping
  40. end
  41. end

lib/tobox/plugins/inbox.rb

86.11% lines covered

36 relevant lines. 31 lines covered and 5 lines missed.
    
  1. # frozen_string_literal: true
  2. 33 module Tobox
  3. 33 module Plugins
  4. 33 module Inbox
  5. 33 def self.configure(conf)
  6. 30 conf.config[:inbox_table] = :inbox
  7. 30 conf.config[:inbox_column] = :unique_id
  8. end
  9. 33 module FetcherMethods
  10. 33 def initialize(_, configuration)
  11. 33 super
  12. 33 inbox_table = configuration[:inbox_table]
  13. 33 @inbox_ds = @db[inbox_table]
  14. 33 @inbox_column = configuration[:inbox_column]
  15. end
  16. 33 private
  17. 33 def prepare_events(events)
  18. 99 try_insert_inbox(events) do |deduped_events|
  19. 66 super(deduped_events)
  20. end
  21. end
  22. 33 def try_insert_inbox(events)
  23. 99 inboxed = nil
  24. 99 if @inbox_ds.respond_to?(:supports_insert_conflict?) && @inbox_ds.supports_insert_conflict?
  25. 27 if @inbox_ds.supports_returning?(:insert)
  26. 27 inboxed = @inbox_ds.insert_conflict
  27. .returning(@inbox_column)
  28. 27 .multi_insert(events.map { |event| { @inbox_column => event[@inbox_column] } })
  29. else
  30. ret = @inbox_ds.insert_conflict.multi_insert(events.map do |event|
  31. { @inbox_column => event[@inbox_column] }
  32. end)
  33. if ret
  34. inboxed = @inbox_ds.where(@inbox_column => events.map do |ev|
  35. ev[@inbox_column]
  36. end).select_map(@inbox_column)
  37. end
  38. end
  39. else
  40. 72 inboxed = []
  41. 72 events.each do |event|
  42. 72 @inbox_ds.insert(@inbox_column => event[@inbox_column])
  43. 48 inboxed << event[@inbox_column]
  44. rescue Sequel::UniqueConstraintViolation
  45. # ignore
  46. end
  47. 72 return events if inboxed.empty?
  48. end
  49. 75 return events if inboxed && inboxed.empty?
  50. 132 yield events.select { |ev| inboxed.include?(ev[@inbox_column]) }
  51. 66 events
  52. end
  53. end
  54. end
  55. 33 register_plugin :inbox, Inbox
  56. end
  57. end

lib/tobox/plugins/pg_notify.rb

100.0% lines covered

45 relevant lines. 45 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 8 module Tobox
  3. 8 module Plugins
  4. 8 module PgNotify
  5. 8 class Notifier
  6. 8 def initialize(config)
  7. 8 @config = config
  8. 8 @running = false
  9. 8 @notifier_mutex = Thread::Mutex.new
  10. 8 @notifier_semaphore = Thread::ConditionVariable.new
  11. end
  12. 8 def start
  13. 8 return if @running
  14. 8 config = @config
  15. 8 @db = Sequel.connect(config.database.opts.merge(max_connections: 1))
  16. 8 raise Error, "this plugin only works with postgresql" unless @db.database_type == :postgres
  17. 8 @db.loggers = config.database.loggers
  18. 8 Array(config.lifecycle_events[:database_connect]).each { |cb| cb.call(@db) }
  19. 8 channel = config[:notifier_channel]
  20. 8 @th = Thread.start do
  21. 8 Thread.current.name = "outbox-notifier"
  22. 8 @db.listen(channel, loop: true) do
  23. 8 signal
  24. end
  25. end
  26. 8 @running = true
  27. end
  28. 8 def stop
  29. 8 return unless @running
  30. 8 @th.terminate
  31. 8 @db.disconnect
  32. 8 @running = false
  33. end
  34. 8 def wait
  35. 16 @notifier_mutex.synchronize do
  36. 16 @notifier_semaphore.wait(@notifier_mutex)
  37. end
  38. end
  39. 8 def signal
  40. 8 @notifier_mutex.synchronize do
  41. 8 @notifier_semaphore.signal
  42. end
  43. end
  44. end
  45. 8 module WorkerMethods
  46. 8 attr_writer :notifier
  47. 8 def wait_for_work
  48. 16 @notifier.wait
  49. end
  50. end
  51. 8 class << self
  52. 8 def configure(config)
  53. 7 config.config[:notifier_channel] = :outbox_notifications
  54. 8 notifier = Notifier.new(config)
  55. 16 config.on_start_worker { |wk| wk.notifier = notifier }
  56. 8 config.on_start(&notifier.method(:start))
  57. 8 config.on_stop(&notifier.method(:stop))
  58. end
  59. end
  60. end
  61. 8 register_plugin :pg_notify, PgNotify
  62. end
  63. end

lib/tobox/plugins/progress.rb

100.0% lines covered

37 relevant lines. 37 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 38 module Tobox
  3. 38 module Plugins
  4. 38 module Progress
  5. 38 def self.configure(conf)
  6. 95 conf.config[:cte_table] = :outbox_cte
  7. 95 conf.config[:visibility_timeout] = 30
  8. end
  9. 38 module FetcherMethods
  10. 38 def initialize(_, configuration)
  11. 99 super
  12. 99 @cte_table = configuration[:cte_table]
  13. end
  14. 38 private
  15. 38 def do_fetch_events
  16. # mark events as invisible
  17. # @type var mark_as_fetched_params: Hash[Symbol, untyped]
  18. 462 mark_as_fetched_params = {
  19. last_error: nil
  20. }
  21. 462 mark_as_fetched_params[@attempts_column] = Sequel[@table][@attempts_column] + 1 if @attempts_column
  22. 420 mark_as_fetched_params[@visibility_column] = (@configuration.visibility_type_bool? || Sequel.date_add(
  23. Sequel::CURRENT_TIMESTAMP,
  24. seconds: @configuration[:visibility_timeout]
  25. ))
  26. 462 if @ds.supports_returning?(:update)
  27. 336 ids = fetch_event_ids
  28. 336 ids = if @db.database_type == :postgres && @ds.supports_cte?(:update)
  29. # The Postgres planner can ignore a LIMIT in the subquery, causing more UPDATEs than LIMIT.
  30. # A know solution or workaround is to use a CTE as an "optimization fence".
  31. # https://dba.stackexchange.com/questions/69471/postgres-update-limit-1/69497#69497
  32. 126 ds = @ds.with(@cte_table, ids.select(:id))
  33. .from(@table, @cte_table)
  34. .where(Sequel[@cte_table][:id] => Sequel[@table][:id])
  35. 126 ds = ds.where(Sequel[@table][@attempts_column] < @configuration[:max_attempts]) if @attempts_column
  36. 126 ds
  37. else
  38. 210 @ds.where(id: ids)
  39. end
  40. 336 ids.returning.update(mark_as_fetched_params)
  41. else
  42. 126 event_ids = fetch_event_ids.select_map(:id)
  43. 126 events_ds = @ds.where(id: event_ids)
  44. 126 events_ds.update(mark_as_fetched_params)
  45. 126 events_ds.first(@batch_size)
  46. end
  47. end
  48. 38 def calculate_event_retry_interval(attempts)
  49. 66 super(attempts - 1)
  50. end
  51. 38 def set_event_retry_attempts(event, update_params)
  52. 99 update_params.delete(@attempts_column)
  53. 99 super
  54. end
  55. 38 def events_tr
  56. 462 yield
  57. end
  58. 38 def event_id_tr(&block)
  59. 462 @db.transaction(savepoint: false, &block)
  60. end
  61. end
  62. end
  63. 38 register_plugin :progress, Progress
  64. end
  65. end

lib/tobox/plugins/sentry.rb

100.0% lines covered

82 relevant lines. 82 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 5 module Tobox
  3. 5 module Plugins
  4. 5 module Sentry
  5. 5 module ConfigurationMethods
  6. 5 def on_sentry_init(&callback)
  7. 25 (@lifecycle_events[:sentry_init] ||= []) << callback
  8. 25 self
  9. end
  10. end
  11. 5 class Configuration
  12. # Set this option to true if you want Sentry to only capture the last job
  13. # retry if it fails.
  14. 5 attr_accessor :report_after_retries
  15. 5 def initialize
  16. 150 @report_after_retries = false
  17. end
  18. end
  19. 5 class EventHandler
  20. 5 TOBOX_NAME = "tobox"
  21. 5 def initialize(config)
  22. 25 @config = config
  23. 25 @db_table = @config[:table]
  24. 25 @db_scheme = URI(@config[:database_uri]).scheme if @config[:database_uri]
  25. 25 @max_attempts = @config[:max_attempts]
  26. end
  27. 5 def on_start(event)
  28. 25 return unless ::Sentry.initialized?
  29. 25 ::Sentry.clone_hub_to_current_thread
  30. 25 scope = ::Sentry.get_current_scope
  31. 25 scope.set_contexts(tobox: {
  32. id: event[:id],
  33. type: event[:type],
  34. @attempts_column => event[@config[:attempts_column]],
  35. created_at: event[:created_at],
  36. @visibility_column => event[@config[:visibility_column]],
  37. last_error: event[:last_error]&.byteslice(0..1000),
  38. version: Tobox::VERSION,
  39. db_adapter: @db_scheme
  40. }.compact)
  41. 25 scope.set_tags(
  42. outbox: @db_table,
  43. event_id: event[:id],
  44. event_type: event[:type]
  45. )
  46. 25 scope.set_transaction_name("#{TOBOX_NAME}/#{event[:type]}") unless scope.transaction_name
  47. 25 transaction = start_transaction(scope.transaction_name, event[:metadata].to_h)
  48. 25 return unless transaction
  49. 25 scope.set_span(transaction)
  50. # good for thread pool, good for fiber pool
  51. 25 store_transaction(event, transaction)
  52. end
  53. 5 def on_finish(event)
  54. 10 return unless ::Sentry.initialized?
  55. 10 transaction = retrieve_transaction(event)
  56. 10 return unless transaction
  57. 10 finish_transaction(transaction, 200)
  58. 10 scope = ::Sentry.get_current_scope
  59. 10 scope.clear
  60. end
  61. 5 def on_error(event, error)
  62. 15 return unless ::Sentry.initialized?
  63. 15 capture_exception(event, error)
  64. 15 transaction = retrieve_transaction(event)
  65. 15 return unless transaction
  66. 15 finish_transaction(transaction, 500)
  67. end
  68. 5 private
  69. 5 def start_transaction(transaction_name, sentry_trace)
  70. 25 options = { name: transaction_name, op: "tobox" }
  71. 25 transaction = ::Sentry.continue_trace(sentry_trace, **options) unless sentry_trace.empty?
  72. 25 ::Sentry.start_transaction(transaction: transaction, **options)
  73. end
  74. 5 def finish_transaction(transaction, status)
  75. 25 transaction.set_http_status(status)
  76. 25 transaction.finish
  77. end
  78. 5 def store_transaction(event, transaction)
  79. 25 store = (Thread.current[:tobox_sentry_transactions] ||= {})
  80. 25 store[event[:id]] = transaction
  81. end
  82. 5 def retrieve_transaction(event)
  83. 25 return unless (store = Thread.current[:tobox_sentry_transactions])
  84. 25 store.delete(event[:id])
  85. end
  86. 5 def capture_exception(event, error)
  87. 15 if ::Sentry.configuration.tobox.report_after_retries &&
  88. event[@config[:attempts_column]] &&
  89. event[@config[:attempts_column]] < @max_attempts
  90. 5 return
  91. end
  92. 10 ::Sentry.capture_exception(
  93. error,
  94. hint: { background: false }
  95. )
  96. end
  97. end
  98. 5 class << self
  99. 5 def load_dependencies(*)
  100. 25 require "uri"
  101. 25 require "sentry-ruby"
  102. 25 require "sentry/integrable"
  103. 25 extend ::Sentry::Integrable
  104. end
  105. 5 def configure(config)
  106. 25 event_handler = EventHandler.new(config)
  107. 25 config.on_before_event(&event_handler.method(:on_start))
  108. 25 config.on_after_event(&event_handler.method(:on_finish))
  109. 25 config.on_error_event(&event_handler.method(:on_error))
  110. 25 config.on_error_worker do |error|
  111. 5 ::Sentry.capture_exception(error, hint: { background: false })
  112. end
  113. 25 ::Sentry::Configuration.attr_reader(:tobox)
  114. 25 ::Sentry::Configuration.add_post_initialization_callback do
  115. 150 @tobox = Plugins::Sentry::Configuration.new
  116. end
  117. 25 register_integration name: "tobox", version: Tobox::VERSION
  118. 25 config.on_start do
  119. 50 ::Sentry.init do |sentry_cfg|
  120. 50 Array(config.lifecycle_events[:sentry_init]).each do |cb|
  121. 50 cb[sentry_cfg]
  122. end
  123. end
  124. end
  125. end
  126. end
  127. end
  128. 5 register_plugin :sentry, Sentry
  129. end
  130. end

lib/tobox/plugins/stats.rb

96.1% lines covered

77 relevant lines. 74 lines covered and 3 lines missed.
    
  1. # frozen_string_literal: true
  2. 5 module Tobox
  3. 5 module Plugins
  4. 5 module Stats
  5. 5 module ConfigurationMethods
  6. 5 attr_reader :stats_interval_seconds
  7. 5 def on_stats(stats_interval_seconds, &callback)
  8. 25 @stats_interval_seconds = stats_interval_seconds
  9. 25 (@lifecycle_events[:stats] ||= []) << callback
  10. 25 self
  11. end
  12. end
  13. 5 class StatsEmitter
  14. 5 def initialize(config)
  15. 25 @config = config
  16. 25 @running = false
  17. end
  18. 5 def start
  19. 25 return if @running
  20. 25 config = @config
  21. 25 plugins = config.plugins.map(&:name)
  22. 25 interval = config.stats_interval_seconds
  23. 25 @stats_handlers = Array(config.lifecycle_events[:stats])
  24. 25 return if @stats_handlers.empty?
  25. 25 @error_handlers = Array(config.lifecycle_events[:error_worker])
  26. 25 @max_attempts = config[:max_attempts]
  27. 25 @created_at_column = config[:created_at_column]
  28. 25 @db = Sequel.connect(config.database.opts.merge(max_connections: 1))
  29. 25 @db.loggers = config.database.loggers
  30. 25 Array(config.lifecycle_events[:database_connect]).each { |cb| cb.call(@db) }
  31. 25 outbox_table = config[:table]
  32. 25 @outbox_ds = @db[outbox_table]
  33. 25 if plugins.include?("Tobox::Plugins::Inbox")
  34. inbox_table = config[:inbox_table]
  35. @inbox_ds = @db[inbox_table]
  36. end
  37. 25 if @created_at_column
  38. # discard already handled events
  39. #
  40. 5 @oldest_event_age_ds = @outbox_ds.where(last_error: nil)
  41. 5 @oldest_event_age_ds = if config.visibility_type_bool?
  42. @oldest_event_age_ds.where(config[:visibility_column] => false)
  43. else
  44. 5 @oldest_event_age_ds.where(config[:visibility_column] => nil)
  45. end
  46. 5 @oldest_event_age_ds = @oldest_event_age_ds.order(Sequel.asc(:id))
  47. end
  48. 25 logger = config.default_logger
  49. 25 stats = method(:collect_event_stats)
  50. 25 stats.instance_eval do
  51. 25 alias collect call
  52. end
  53. 25 @th = Thread.start do
  54. 25 Thread.current.name = "outbox-stats"
  55. 25 loop do
  56. 150 logger.debug { "stats worker: sleep for #{interval}s..." }
  57. 75 sleep interval
  58. begin
  59. 50 emit_event_stats(stats)
  60. rescue RuntimeError => e
  61. 10 @error_handlers.each { |hd| hd.call(e) }
  62. end
  63. 50 break unless @running
  64. end
  65. end
  66. 25 @running = true
  67. end
  68. 5 def stop
  69. 20 return unless @running
  70. 20 @th.terminate
  71. 20 @db.disconnect
  72. 20 @running = false
  73. end
  74. 5 private
  75. 5 def emit_event_stats(stats)
  76. 50 @stats_handlers.each do |hd|
  77. 50 hd.call(stats, @db)
  78. end
  79. end
  80. 5 def collect_event_stats
  81. 40 stats = @outbox_ds.group_and_count(
  82. Sequel.case([
  83. [{ last_error: nil }, "pending_count"],
  84. [Sequel.expr(@config[:attempts_column]) < @max_attempts, "failing_count"]
  85. ],
  86. "failed_count").as(:status)
  87. )
  88. 40 stats = stats.as_hash(:status, :count).transform_keys(&:to_sym)
  89. # fill it in
  90. 40 stats[:pending_count] ||= 0
  91. 40 stats[:failing_count] ||= 0
  92. 40 stats[:failed_count] ||= 0
  93. 40 stats[:inbox_count] = @inbox_ds.count if @inbox_ds
  94. 40 if @oldest_event_age_ds
  95. 5 created_at = @oldest_event_age_ds.get(@created_at_column)
  96. 5 age = created_at ? (Time.now - created_at).to_i : 0
  97. 5 stats[:oldest_event_age_in_seconds] = age
  98. end
  99. 40 stats
  100. end
  101. end
  102. 5 class << self
  103. 5 def configure(config)
  104. 25 emitter = StatsEmitter.new(config)
  105. 25 config.on_start(&emitter.method(:start))
  106. 25 config.on_stop(&emitter.method(:stop))
  107. end
  108. end
  109. end
  110. 5 register_plugin :stats, Stats
  111. end
  112. end

lib/tobox/plugins/zeitwerk.rb

96.55% lines covered

29 relevant lines. 28 lines covered and 1 lines missed.
    
  1. # frozen_string_literal: true
  2. 5 module Tobox
  3. 5 module Plugins
  4. 5 module Zeitwerk
  5. 5 module ConfigurationMethods
  6. 5 def zeitwerk_loader(loader = nil, &blk)
  7. 25 if loader
  8. 5 @zeitwerk_loader = loader
  9. 20 elsif blk
  10. 5 @zeitwerk_loader ||= ::Zeitwerk::Loader.new
  11. 5 yield(@zeitwerk_loader)
  12. 15 elsif !(loader || blk)
  13. 15 @zeitwerk_loader
  14. end
  15. end
  16. 5 def freeze
  17. 10 loader = @zeitwerk_loader
  18. 10 return super unless loader
  19. 10 if @config[:environment] == "production"
  20. 5 loader.setup
  21. 5 ::Zeitwerk::Loader.eager_load_all
  22. else
  23. 5 loader.enable_reloading
  24. 5 loader.setup
  25. end
  26. 10 super
  27. end
  28. end
  29. 5 class << self
  30. 5 def load_dependencies(*)
  31. 10 require "zeitwerk"
  32. end
  33. 5 def configure(config)
  34. 10 loader = config.zeitwerk_loader
  35. 10 return unless loader
  36. config.on_before_event { |*| loader.reload }
  37. end
  38. end
  39. end
  40. 5 register_plugin :zeitwerk, Zeitwerk
  41. end
  42. end

lib/tobox/pool.rb

100.0% lines covered

27 relevant lines. 27 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 38 module Tobox
  3. 38 class KillError < Interrupt; end
  4. 38 class Pool
  5. 38 def initialize(configuration)
  6. 303 @configuration = configuration
  7. 303 @logger = @configuration.default_logger
  8. 303 @num_workers = configuration[:concurrency]
  9. 303 @workers = Array.new(@num_workers) do |idx|
  10. 712 @configuration.worker_class.new("tobox-worker-#{idx}", configuration)
  11. end
  12. 303 @worker_error_handlers = Array(@configuration.lifecycle_events[:error_worker])
  13. 303 @running = true
  14. end
  15. 38 def stop
  16. 144 return unless @running
  17. 144 @workers.each(&:finish!)
  18. 144 @running = false
  19. end
  20. 38 private
  21. 38 def do_work(wrk)
  22. 423 wrk.work
  23. rescue Exception => e # rubocop:disable Lint/RescueException
  24. 161 handle_exception(wrk, e)
  25. end
  26. 38 def handle_exception(wrk, exc)
  27. 141 case exc
  28. when KillError
  29. # noop
  30. when Exception
  31. 87 wrk.finish!
  32. 87 @logger.error do
  33. 5 "(worker: #{wrk.label}) -> " \
  34. "crashed with error\n" \
  35. "#{exc.class}: #{exc.message}\n" \
  36. "#{exc.backtrace.join("\n")}"
  37. end
  38. 92 @worker_error_handlers.each { |hd| hd.call(exc) }
  39. end
  40. end
  41. end
  42. 38 autoload :ThreadedPool, File.join(__dir__, "pool", "threaded_pool")
  43. 38 autoload :FiberPool, File.join(__dir__, "pool", "fiber_pool")
  44. end

lib/tobox/pool/fiber_pool.rb

94.23% lines covered

52 relevant lines. 49 lines covered and 3 lines missed.
    
  1. # frozen_string_literal: true
  2. 23 require "timeout"
  3. 23 require "async/scheduler"
  4. 23 module Tobox
  5. 23 class FiberPool < Pool
  6. 23 def initialize(_)
  7. 67 Sequel.extension(:fiber_concurrency)
  8. 67 super
  9. 67 @fibers = []
  10. 67 @fiber_mtx = Mutex.new
  11. 67 @fiber_cond = ConditionVariable.new
  12. 67 @fiber_thread = nil
  13. end
  14. 23 def start
  15. 44 @fiber_thread = Thread.start do
  16. 44 Thread.current.name = "tobox-fibers-thread"
  17. begin
  18. 44 Fiber.set_scheduler(Async::Scheduler.new)
  19. 44 @fiber_mtx.synchronize do
  20. 44 @workers.each do |worker|
  21. 72 @fibers << start_fiber_worker(worker)
  22. end
  23. 44 @fiber_cond.signal
  24. end
  25. rescue KillError
  26. @fibers.each { |f| f.raise(KillError) }
  27. end
  28. end
  29. 44 @fiber_mtx.synchronize do
  30. 44 @fiber_cond.wait(@fiber_mtx)
  31. end
  32. end
  33. 23 def stop
  34. 12 shutdown_timeout = @configuration[:shutdown_timeout]
  35. 12 grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]
  36. 12 super
  37. 12 th = @fiber_thread
  38. 12 return unless th
  39. 12 th.join(shutdown_timeout)
  40. 12 return unless th.alive?
  41. 4 th.report_on_exception = false
  42. 4 th.abort_on_exception = false
  43. begin
  44. 4 th.raise(KillError)
  45. 4 th.join(grace_shutdown_timeout)
  46. th.kill
  47. th.join(1)
  48. 1 rescue KillError
  49. # async re-raises the interrupt error
  50. end
  51. end
  52. 23 private
  53. 23 def handle_exception(wrk, exc)
  54. # noop
  55. 24 return if exc.is_a?(::Async::Stop)
  56. 16 super
  57. end
  58. 23 def start_fiber_worker(worker)
  59. 88 Fiber.schedule do
  60. 88 do_work(worker)
  61. 40 @fiber_mtx.synchronize do
  62. 40 @fibers.delete(Fiber.current)
  63. 40 if worker.finished? && @running
  64. 16 idx = @workers.index(worker)
  65. 16 raise Error, "worker not found" unless idx
  66. 16 subst_worker = @configuration.worker_class.new(worker.label, @configuration)
  67. 16 @workers[idx] = subst_worker
  68. 16 @fibers << start_fiber_worker(subst_worker)
  69. end
  70. end
  71. rescue KillError
  72. # noop
  73. end
  74. end
  75. end
  76. end

lib/tobox/pool/threaded_pool.rb

100.0% lines covered

44 relevant lines. 44 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 33 module Tobox
  3. 33 class ThreadedPool < Pool
  4. 33 def initialize(_configuration)
  5. 165 @parent_thread = Thread.main
  6. 165 @threads = []
  7. 165 @threads_mutex = Thread::Mutex.new
  8. 165 super
  9. end
  10. 33 def start
  11. 132 @workers.each do |wk|
  12. 264 th = start_thread_worker(wk)
  13. 264 @threads_mutex.synchronize do
  14. 264 @threads << th
  15. end
  16. end
  17. end
  18. 33 def stop
  19. 99 shutdown_timeout = @configuration[:shutdown_timeout]
  20. 99 grace_shutdown_timeout = @configuration[:grace_shutdown_timeout]
  21. 99 super
  22. 99 Thread.pass # let workers finish
  23. # soft exit
  24. 99 join = lambda do |timeout|
  25. 297 start = Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  26. 297 loop do
  27. 858 terminating_th = @threads_mutex.synchronize { @threads.first }
  28. 429 return unless terminating_th
  29. 165 elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
  30. 165 break if elapsed > timeout
  31. 132 terminating_th.join(timeout - elapsed)
  32. end
  33. end
  34. 99 join.call(shutdown_timeout)
  35. # hard exit
  36. 264 @threads_mutex.synchronize { @threads.each { |th| th.raise(KillError) } }
  37. 99 join.call(grace_shutdown_timeout)
  38. 198 @threads_mutex.synchronize { @threads.each(&:kill) }
  39. 99 join.call(1)
  40. end
  41. 33 private
  42. 33 def start_thread_worker(wrk)
  43. 330 Thread.start(wrk) do |worker|
  44. 330 Thread.current.name = worker.label
  45. 330 do_work(worker)
  46. 264 @threads_mutex.synchronize do
  47. 264 @threads.delete(Thread.current)
  48. 264 if worker.finished? && @running
  49. 66 idx = @workers.index(worker)
  50. 66 raise Error, "worker not found" unless idx
  51. 66 subst_worker = @configuration.worker_class.new(worker.label, @configuration)
  52. 60 @workers[idx] = subst_worker
  53. 66 subst_thread = start_thread_worker(subst_worker)
  54. 66 @threads << subst_thread
  55. end
  56. end
  57. end
  58. end
  59. end
  60. end

lib/tobox/worker.rb

100.0% lines covered

34 relevant lines. 34 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 38 module Tobox
  3. 38 class Worker
  4. 38 attr_reader :label
  5. 38 def initialize(label, configuration)
  6. 972 @label = label
  7. 972 @wait_for_events_delay = configuration[:wait_for_events_delay]
  8. 972 @handlers = configuration.handlers || {}
  9. 972 @fetcher = configuration.fetcher_class.new(label, configuration)
  10. 972 @finished = false
  11. 972 if (message_to_arguments = configuration.arguments_handler)
  12. 33 define_singleton_method(:message_to_arguments, &message_to_arguments)
  13. end
  14. 972 Array(configuration.lifecycle_events[:start_worker]).each do |hd|
  15. 8 hd.call(self)
  16. end
  17. end
  18. 38 def finished?
  19. 304 @finished
  20. end
  21. 38 def finish!
  22. 416 @finished = true
  23. end
  24. 38 def work
  25. 344 do_work until @finished
  26. end
  27. 38 private
  28. 38 def do_work
  29. 640 return if @finished
  30. 640 sum_fetched_events = @fetcher.fetch_events do |events|
  31. 238 cs = @handlers.each_with_object({}) do |(callback, event_types), bucket|
  32. 231 events.each do |event|
  33. 264 (bucket[callback] ||= []) << message_to_arguments(event) if event_types.include?(event[:type].to_sym)
  34. end
  35. end
  36. 238 cs.each do |callback, evs|
  37. 132 callback.call(*evs)
  38. end
  39. end
  40. 620 return if @finished
  41. 577 wait_for_work if sum_fetched_events.zero?
  42. end
  43. 38 def wait_for_work
  44. 339 sleep(@wait_for_events_delay)
  45. end
  46. 38 def message_to_arguments(event)
  47. 132 event
  48. end
  49. end
  50. end