Browse Source

Remove PubSub

CLDC-1222-improve-case-log-import-performance
Mo Seedat 2 years ago
parent
commit
a25cdb73c5
  1. 4
      Gemfile
  2. 4
      Gemfile.lock
  3. 11
      app/jobs/lettings_log_import_job.rb
  4. 48
      app/listeners/lettings_log_import_listener.rb
  5. 236
      app/services/imports/lettings_logs_import_service.rb
  6. 7
      config/initializers/wisper.rb
  7. 21
      db/schema.rb
  8. BIN
      dump.rdb

4
Gemfile

@ -60,7 +60,6 @@ gem "possessive"
gem "auto_strip_attributes"
# Use sidekiq for background processing
gem "sidekiq"
gem 'wisper', '~> 2.0'
group :development, :test do
# Check gems for known vulnerabilities
@ -93,8 +92,7 @@ group :test do
gem "selenium-webdriver", require: false
gem "simplecov", require: false
gem "timecop", "~> 0.9.4"
gem "webmock", require: false
gem 'wisper-rspec', require: false
gem "webmock", require: false
end
group :development, :test do

4
Gemfile.lock

@ -422,8 +422,6 @@ GEM
websocket-driver (0.7.5)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
wisper (2.0.1)
wisper-rspec (1.1.0)
xpath (3.2.0)
nokogiri (~> 1.8)
zeitwerk (2.6.0)
@ -486,8 +484,6 @@ DEPENDENCIES
view_component
web-console (>= 4.1.0)
webmock
wisper (~> 2.0)
wisper-rspec
RUBY VERSION
ruby 3.1.2p20

11
app/jobs/lettings_log_import_job.rb

@ -1,16 +1,9 @@
class LettingsLogImportJob < ApplicationJob
include Wisper::Publisher
self.queue_name_prefix = '_lettings_logs'
self.queue_name_prefix = 'lettings_logs_import'
queue_as :default
def perform(run_id, xml_document)
puts "PERFORMING RUN: #{run_id} WITH XML DOC: #{xml_document}"
#Wisper.subscribe(LettingsLogImportListener.new, prefix: :on)
processor = Imports::LettingsLogsImportProcessor.new(xml_document)
broadcast(::Import::ITEM_PROCESSED, run_id, processor)
Imports::LettingsLogsImportProcessor.new(xml_document)
end
end

48
app/listeners/lettings_log_import_listener.rb

@ -1,48 +0,0 @@
class LettingsLogImportListener
# include Wisper::Publisher
def on_events_import_started(run_id)
puts "LettingsLogs::ImportListener STARTING RUN -> #{run_id}"
end
def on_events_import_finished(run_id)
puts "LettingsLogs::ImportListener FINISHED RUN -> #{run_id}"
end
def on_events_import_item_processed(run_id, processor)
puts "LettingsLogs::ImportListener ITEM PROCESSED -> #{run_id} old_id: #{processor.old_id}, discrepency?: #{processor.discrepancy?}"
redis = Redis.new
obj = redis.get(run_id)
logs_import = Marshal.load(obj)
puts "GOT FROM REDIS: total: #{logs_import.total}"
logs_import.num_saved += 1
if processor.discrepancy?
logs_import.discrepancies << processor.old_id
end
redis.set(run_id, Marshal.dump(logs_import))
if last_item?(logs_import)
collate_results_and_update_db(logs_import)
send_email_with_results(logs_import)
# broadcast(::Import::FINISHED, run_id)
end
end
def last_item?(logs_import)
logs_import.total == (logs_import.num_saved + logs_import.num_skipped)
end
def collate_results_and_update_db(logs_import)
logs_import.finished_at = Time.zone.now
logs_import.duration_seconds = (logs_import.finished_at - logs_import.started_at).seconds.to_i
logs_import.save!
end
def send_email_with_results(logs_import)
# TODO
end
end

236
app/services/imports/lettings_logs_import_service.rb

@ -3,60 +3,24 @@ require 'json'
module Imports
class LettingsLogsImportService < ImportService
include Wisper::Publisher
def initialize(storage_service, logger = Rails.logger)
super
Wisper.subscribe(LettingsLogImportListener.new, prefix: :on)
end
def create_logs(folder)
import_from(folder, :create_log)
if @logs_with_discrepancies.count.positive?
@logger.warn("The following lettings logs had status discrepancies: [#{@logs_with_discrepancies.join(', ')}]")
end
end
def local_load(folder)
filenames = Dir["#{folder}/**/*.xml"]
puts "FILENAMES (#{filenames.size}): #{filenames}"
@run_id = "LLRun-#{Time.zone.now.to_s}"
@logger.info("START: Importing Lettings Logs @ #{Time.zone.now.strftime('%d-%m-%Y %H:%M')}. RunId: #{@run_id}")
@run_id = SecureRandom.uuid.to_s
logs_import = LogsImport.create!(
run_id: @run_id,
started_at: Time.zone.now,
total: filenames.size,
discrepancies: [],
filenames: filenames
)
redis = Redis.new
redis.set(@run_id, Marshal.dump(logs_import))
import_from(folder, :enqueue_job)
broadcast(::Import::STARTED, @run_id)
filenames.each do |filename|
puts "Loading filename: #{filename}"
Rack::MiniProfiler.step("Start Processing file #{filename}") do
# Generate background job to process file completely
xml_document = Nokogiri::XML(File.open(filename))
LettingsLogImportJob.perform_later(@run_id, xml_document.to_s)
#send(:create_log, xml_document)
end
rescue StandardError => e
@logger.error "#{e.class} in #{filename}: #{e.message}. Caller: #{e.backtrace.first}"
end
if @logs_with_discrepancies.count.positive?
@logger.warn("The following lettings logs had status discrepancies: [#{@logs_with_discrepancies.join(', ')}]")
end
end
@logger.info("FINISH: Importing Lettings Logs @ #{Time.zone.now.strftime('%d-%m-%Y %H:%M')}. RunId: #{@run_id}")
end
def enqueue_job(xml_document)
LettingsLogImportJob.perform_later(@run_id, xml_document.to_s)
end
end
class LettingsLogsImportProcessor
FORM_NAME_INDEX = {
start_year: 0,
@ -97,16 +61,42 @@ module Imports
other_intermediate_rent_product: 5,
}.freeze
attr_reader :xml_doc, :logs_with_discrepancies, :logs_overridden, :discrepancy, :old_id
SEX = {
"Male" => "M",
"Female" => "F",
"Other" => "X",
"Non-binary" => "X",
"Refused" => "R",
}.freeze
RELATION = {
"Child" => "C",
"Partner" => "P",
"Other" => "X",
"Non-binary" => "X",
"Refused" => "R",
}.freeze
FIELDS_NOT_PRESENT_IN_SOFTWIRE_DATA = %w[
majorrepairs
illness_type_0
tshortfall_known
pregnancy_value_check
retirement_value_check
rent_value_check
net_income_value_check
major_repairs_date_value_check void_date_value_check
housingneeds_type
housingneeds_other
].freeze
attr_reader :xml_doc, :logs_overridden, :discrepancy, :old_id
def initialize(xml_doc)
@xml_doc = xml_doc
def initialize(xml_document)
@xml_doc = xml_document
@discrepancy = false
@old_id = ''
@logs_with_discrepancies = Set.new
@logs_overridden = Set.new
@logs_overridden = false
end
def create_log(xml_doc)
@ -114,7 +104,6 @@ module Imports
previous_status = field_value(xml_doc, "meta", "status")
Rack::MiniProfiler.step("Loading attributes") do
# Required fields for status complete or logic to work
# Note: order matters when we derive from previous values (attributes parameter)
attributes["startdate"] = compose_date(xml_doc, "DAY", "MONTH", "YEAR")
@ -300,16 +289,13 @@ module Imports
attributes["created_by"] = user
end
end # ENDPROFILER
Rack::MiniProfiler.step("Saving...") do
apply_date_consistency!(attributes)
apply_household_consistency!(attributes)
lettings_log = save_lettings_log(attributes)
compute_differences(lettings_log, attributes)
check_status_completed(lettings_log, previous_status) unless @logs_overridden.include?(lettings_log.old_id)
end # ENDPROFILER
check_status_completed(lettings_log, previous_status)
end
def save_lettings_log(attributes)
@ -331,15 +317,16 @@ module Imports
def rescue_validation_or_raise(lettings_log, attributes, exception)
if lettings_log.errors.of_kind?(:referral, :internal_transfer_non_social_housing)
@logger.warn("Log #{lettings_log.old_id}: Removing internal transfer referral since previous tenancy is a non social housing")
@logs_overridden << lettings_log.old_id
@logs_overridden = true
attributes.delete("referral")
save_lettings_log(attributes)
elsif lettings_log.errors.of_kind?(:referral, :internal_transfer_fixed_or_lifetime)
@logger.warn("Log #{lettings_log.old_id}: Removing internal transfer referral since previous tenancy is fixed terms or lifetime")
@logs_overridden << lettings_log.old_id
@logs_overridden = true
attributes.delete("referral")
save_lettings_log(attributes)
else
@logger.error("[rescue_validation_or_raise] No actionable error for exception: #{exception.message}")
raise exception
end
end
@ -348,7 +335,7 @@ module Imports
differences = []
attributes.each do |key, value|
lettings_log_value = lettings_log.send(key.to_sym)
next if fields_not_present_in_softwire_data.include?(key)
next if FIELDS_NOT_PRESENT_IN_SOFTWIRE_DATA.include?(key)
if value != lettings_log_value
differences.push("#{key} #{value.inspect} #{lettings_log_value.inspect}")
@ -357,15 +344,15 @@ module Imports
@logger.warn "Differences found when saving log #{lettings_log.old_id}: #{differences}" unless differences.empty?
end
def fields_not_present_in_softwire_data
%w[majorrepairs illness_type_0 tshortfall_known pregnancy_value_check retirement_value_check rent_value_check net_income_value_check major_repairs_date_value_check void_date_value_check housingneeds_type housingneeds_other]
end
# @logs_overridden can only include?(lettings_log.old_id) if there was a
# validation error raised therefore no need to do @logs_overridden.include? but rather
# enough to set a flag for logs_overriden
def check_status_completed(lettings_log, previous_status)
return if @logs_overridden
if previous_status.include?("submitted") && lettings_log.status != "completed"
@logger.warn "lettings log #{lettings_log.id} is not completed"
@logger.warn "lettings log with old id:#{lettings_log.old_id} is incomplete but status should be complete"
@logs_with_discrepancies << lettings_log.old_id
@logger.warn "DISCREPENCY lettings log #{lettings_log.id} is not completed"
@logger.warn "DISCREPENCY lettings log with old id:#{lettings_log.old_id} is incomplete but status should be complete"
@discrepancy = true
end
end
@ -379,32 +366,27 @@ module Imports
# Safe: A string that represents only a decimal (or empty/nil)
def safe_string_as_decimal(xml_doc, attribute)
str = string_or_nil(xml_doc, attribute)
if str.nil?
nil
else
BigDecimal(str, exception: false)
end
return if str.nil?
BigDecimal(str, exception: false)
end
# Unsafe: A string that has more than just the integer value
def unsafe_string_as_integer(xml_doc, attribute)
str = string_or_nil(xml_doc, attribute)
if str.nil?
nil
else
str.to_i
end
return if str.nil?
str.to_i
end
def compose_date(xml_doc, day_str, month_str, year_str)
day = Integer(field_value(xml_doc, "xmlns", day_str), exception: false)
month = Integer(field_value(xml_doc, "xmlns", month_str), exception: false)
year = Integer(field_value(xml_doc, "xmlns", year_str), exception: false)
if day.nil? || month.nil? || year.nil?
nil
else
Time.zone.local(year, month, day)
end
return if [day, month, year].any?(&:nil?)
Time.zone.local(year, month, day)
end
def get_form_name_component(xml_doc, index)
@ -415,6 +397,7 @@ module Imports
def needs_type(xml_doc)
gn_sh = get_form_name_component(xml_doc, FORM_NAME_INDEX[:needs_type])
case gn_sh
when "GN"
GN_SH[:general_needs]
@ -460,45 +443,22 @@ module Imports
end
def sex(xml_doc, index)
sex = string_or_nil(xml_doc, "P#{index}Sex")
case sex
when "Male"
"M"
when "Female"
"F"
when "Other", "Non-binary"
"X"
when "Refused"
"R"
end
unmapped_sex = string_or_nil(xml_doc, "P#{index}Sex")
SEX[unmapped_sex]
end
def relat(xml_doc, index)
relat = string_or_nil(xml_doc, "P#{index}Rel")
case relat
when "Child"
"C"
when "Partner"
"P"
when "Other", "Non-binary"
"X"
when "Refused"
"R"
end
unmapped_relation = string_or_nil(xml_doc, "P#{index}Rel")
RELATION[unmapped_relation]
end
def age_known(xml_doc, index, hhmemb)
return nil if hhmemb.present? && index > hhmemb
age_refused = string_or_nil(xml_doc, "P#{index}AR")
if age_refused.present?
if age_refused.casecmp?("AGE_REFUSED") || age_refused.casecmp?("No")
return 1 # No
else
return 0 # Yes
end
end
0
return 0 unless age_refused.present?
(age_refused.casecmp?("AGE_REFUSED") || age_refused.casecmp?("No")) ? 1 : 0
end
def details_known(index, attributes)
@ -528,30 +488,22 @@ module Imports
def compose_postcode(xml_doc, outcode, incode)
outcode_value = string_or_nil(xml_doc, outcode)
incode_value = string_or_nil(xml_doc, incode)
if outcode_value.nil? || incode_value.nil? || !"#{outcode_value} #{incode_value}".match(POSTCODE_REGEXP)
nil
else
"#{outcode_value} #{incode_value}"
end
end
def london_affordable_rent(xml_doc)
lar = unsafe_string_as_integer(xml_doc, "LAR")
if lar == 1
1
else
# We default to No for any other values (nil, not known)
2
end
# Default to No (2) for any other values (nil, not known)
def london_affordable_rent(xml_doc)
unsafe_string_as_integer(xml_doc, "LAR") == 1 ? 1 : 2
end
def renewal(rsnvac)
# Relet – renewal of fixed-term tenancy
if rsnvac == 14
1
else
0
end
# Relet – renewal of fixed-term tenancy
def renewal(rsnvac)
rsnvac == 14 ? 1 : 0
end
def string_or_nil(xml_doc, attribute)
@ -584,12 +536,7 @@ module Imports
# Letters should be lowercase to match case
def housing_needs(xml_doc, letter)
housing_need = string_or_nil(xml_doc, "Q10-#{letter}")
if housing_need == "Yes"
1
else
0
end
string_or_nil(xml_doc, "Q10-#{letter}") == "Yes" ? 1 : 0
end
def net_income_known(xml_doc, earnings)
@ -607,12 +554,7 @@ module Imports
end
def illness_type(xml_doc, index, illness)
illness_type = string_or_nil(xml_doc, "Q10ib-#{index}")
if illness_type == "Yes" && illness == 1
1
elsif illness == 1
0
end
string_or_nil(xml_doc, "Q10ib-#{index}") == "Yes" && illness == 1 ? 1 : 0
end
def first_time_let(rsnvac)
@ -642,10 +584,12 @@ module Imports
def household_members(xml_doc, previous_status)
hhmemb = safe_string_as_integer(xml_doc, "HHMEMB")
if previous_status.include?("submitted") && hhmemb.nil?
hhmemb = people_with_details(xml_doc).count
return nil if hhmemb.zero?
end
hhmemb
end
@ -661,17 +605,13 @@ module Imports
end
end
def allocation_system(value)
case value
when 1
1
when 2
0
end
def allocation_system(value)
value == 1 ? 1 : 0
end
def allocation_system_unknown(cbl, chr, cap)
allocation_values = [cbl, chr, cap]
if allocation_values.all?(&:nil?)
nil
elsif allocation_values.all? { |att| att&.zero? }

7
config/initializers/wisper.rb

@ -1,7 +0,0 @@
# Wisper global subscribers
# https://github.com/krisleech/wisper
#require_relative '../../app/listeners/lettings_logs/import_listener'
#require_relative '../../app/services/imports/lettings_logs_import_service'
# Wisper.subscribe(LettingsLogsImportListener.new, scope: [Imports::LettingsLogsImportService])

21
db/schema.rb

@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[7.0].define(version: 2022_09_28_002015) do
ActiveRecord::Schema[7.0].define(version: 2022_09_27_133123) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
@ -264,11 +264,6 @@ ActiveRecord::Schema[7.0].define(version: 2022_09_28_002015) do
t.index ["scheme_id"], name: "index_locations_on_scheme_id"
end
create_table "log_imports", force: :cascade do |t|
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
end
create_table "logs_exports", force: :cascade do |t|
t.datetime "created_at", default: -> { "CURRENT_TIMESTAMP" }
t.datetime "started_at", null: false
@ -277,20 +272,6 @@ ActiveRecord::Schema[7.0].define(version: 2022_09_28_002015) do
t.boolean "empty_export", default: false, null: false
end
create_table "logs_imports", force: :cascade do |t|
t.string "run_id", null: false
t.datetime "started_at"
t.datetime "finished_at"
t.integer "duration_seconds", default: 0
t.integer "total", default: 0
t.integer "num_saved", default: 0
t.integer "num_skipped", default: 0
t.jsonb "discrepancies"
t.jsonb "filenames"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
end
create_table "organisation_relationships", force: :cascade do |t|
t.integer "child_organisation_id"
t.integer "parent_organisation_id"

BIN
dump.rdb

Binary file not shown.
Loading…
Cancel
Save