Browse Source
			
			
			
			
				
		Introduces wisper, resque jobs to process long-runing tasksCLDC-122-experimental-background-job-lettings-log-import-with-report
				 25 changed files with 921 additions and 580 deletions
			
			
		| @ -1,2 +1,3 @@ | |||||||
|  | QUEUE=* rake resque:work | ||||||
| web: bin/rails server -p 3000 | web: bin/rails server -p 3000 | ||||||
| js: yarn build --watch | js: yarn build --watch | ||||||
|  | |||||||
| @ -0,0 +1,41 @@ | |||||||
|  | require 'nokogiri' | ||||||
|  | require 'securerandom' | ||||||
|  | require 'fileutils' | ||||||
|  | 
 | ||||||
|  | class LettingsTestController < ApplicationController | ||||||
|  |   def index | ||||||
|  |     folder = '/Users/mohseeadmin/development-meta/CORE/CLDC-1222' | ||||||
|  |     stash_folder = Time.now.to_i.to_s | ||||||
|  |     generate_fixtures(folder, stash_folder, 50) | ||||||
|  | 
 | ||||||
|  |      | ||||||
|  |       LettingsLog.connection.truncate(LettingsLog.table_name) | ||||||
|  |      | ||||||
|  | 
 | ||||||
|  |      | ||||||
|  |       | ||||||
|  |     Imports::LettingsLogsImportService.new(Storage::S3Service).local_load("#{folder}/#{stash_folder}") | ||||||
|  | 
 | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   def node(xml_document, namespace, field) | ||||||
|  |     xml_document.at_xpath("//#{namespace}:#{field}") | ||||||
|  |   end   | ||||||
|  | 
 | ||||||
|  |   def generate_fixtures(folder, stash_folder, num_files) | ||||||
|  |     #folder = '/Users/mohseeadmin/development-meta/CORE/CLDC-1222/' | ||||||
|  |     canonical_logfiles = %w[canonical_logfile1.xml canonical_logfile2.xml canonical_logfile3.xml] | ||||||
|  |      | ||||||
|  |      | ||||||
|  |     FileUtils.mkdir_p("#{folder}/#{stash_folder}") | ||||||
|  |      | ||||||
|  |     (1..num_files).each do |i| | ||||||
|  |       xml_document = Nokogiri::XML(File.read("#{folder.chomp('/')}/#{canonical_logfiles.sample}")) | ||||||
|  |       document_id = node(xml_document, 'meta', 'document-id') | ||||||
|  |       new_guid = SecureRandom.uuid.to_s | ||||||
|  |       document_id.content = new_guid | ||||||
|  |      | ||||||
|  |       File.open("#{folder.chomp('/')}/#{stash_folder}/#{new_guid}.xml", 'w') { |f| f.puts xml_document } | ||||||
|  |     end | ||||||
|  |   end   | ||||||
|  | end | ||||||
| @ -0,0 +1,6 @@ | |||||||
|  | 
 | ||||||
|  |   module Import | ||||||
|  |     STARTED = :events_import_started | ||||||
|  |     ITEM_PROCESSED = :events_import_item_processed | ||||||
|  |     FINISHED = :events_import_finished | ||||||
|  |   end | ||||||
| @ -0,0 +1,2 @@ | |||||||
|  | module LettingsTestHelper | ||||||
|  | end | ||||||
| @ -0,0 +1,7 @@ | |||||||
|  | class CleanupJob < ApplicationJob | ||||||
|  |   queue_as :default | ||||||
|  | 
 | ||||||
|  |   def perform(*args) | ||||||
|  |     # Do something later | ||||||
|  |   end | ||||||
|  | end | ||||||
| @ -0,0 +1,16 @@ | |||||||
|  | 
 | ||||||
|  | class LettingsLogImportJob < ApplicationJob | ||||||
|  |   include Wisper::Publisher | ||||||
|  | 
 | ||||||
|  |   self.queue_name_prefix = '_lettings_logs' | ||||||
|  |   queue_as :default | ||||||
|  | 
 | ||||||
|  |   def perform(run_id, xml_document) | ||||||
|  |     puts "PERFORMING RUN: #{run_id} WITH XML DOC: #{xml_document}" | ||||||
|  |     #Wisper.subscribe(LettingsLogImportListener.new, prefix: :on) | ||||||
|  | 
 | ||||||
|  |     processor = Imports::LettingsLogsImportProcessor.new(xml_document) | ||||||
|  | 
 | ||||||
|  |     broadcast(::Import::ITEM_PROCESSED, run_id, processor) | ||||||
|  |   end     | ||||||
|  | end | ||||||
| @ -0,0 +1,48 @@ | |||||||
|  | class LettingsLogImportListener | ||||||
|  |   # include Wisper::Publisher | ||||||
|  | 
 | ||||||
|  |   def on_events_import_started(run_id) | ||||||
|  |     puts "LettingsLogs::ImportListener STARTING RUN -> #{run_id}" | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   def on_events_import_finished(run_id) | ||||||
|  |     puts "LettingsLogs::ImportListener FINISHED RUN -> #{run_id}" | ||||||
|  |   end   | ||||||
|  | 
 | ||||||
|  |   def on_events_import_item_processed(run_id, processor) | ||||||
|  |     puts "LettingsLogs::ImportListener ITEM PROCESSED -> #{run_id} old_id: #{processor.old_id}, discrepency?: #{processor.discrepancy?}" | ||||||
|  |    | ||||||
|  |     redis = Redis.new | ||||||
|  |     obj = redis.get(run_id) | ||||||
|  |     logs_import = Marshal.load(obj) | ||||||
|  |     puts "GOT FROM REDIS: total: #{logs_import.total}" | ||||||
|  | 
 | ||||||
|  |     logs_import.num_saved += 1 | ||||||
|  | 
 | ||||||
|  |     if processor.discrepancy? | ||||||
|  |       logs_import.discrepancies << processor.old_id | ||||||
|  |     end | ||||||
|  | 
 | ||||||
|  |     redis.set(run_id, Marshal.dump(logs_import)) | ||||||
|  |      | ||||||
|  |     if last_item?(logs_import) | ||||||
|  |       collate_results_and_update_db(logs_import) | ||||||
|  |       send_email_with_results(logs_import) | ||||||
|  |       # broadcast(::Import::FINISHED, run_id) | ||||||
|  |     end | ||||||
|  |   end   | ||||||
|  | 
 | ||||||
|  |   def last_item?(logs_import) | ||||||
|  |     logs_import.total == (logs_import.num_saved + logs_import.num_skipped) | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   def collate_results_and_update_db(logs_import) | ||||||
|  |     logs_import.finished_at = Time.zone.now | ||||||
|  |     logs_import.duration_seconds = (logs_import.finished_at - logs_import.started_at).seconds.to_i | ||||||
|  |     logs_import.save! | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   def send_email_with_results(logs_import) | ||||||
|  |     # TODO | ||||||
|  |   end | ||||||
|  | end | ||||||
| @ -0,0 +1,2 @@ | |||||||
|  | class LogsImport < ApplicationRecord | ||||||
|  | end | ||||||
									
										
											File diff suppressed because it is too large
											Load Diff
										
									
								
							
						| @ -0,0 +1,3 @@ | |||||||
|  | if Rails.env == 'development' || Rails.env == 'test' | ||||||
|  |   # Rack::MiniProfilerRails.initialize!(Rails.application) | ||||||
|  | end | ||||||
| @ -0,0 +1,7 @@ | |||||||
|  | # Wisper global subscribers | ||||||
|  | # https://github.com/krisleech/wisper | ||||||
|  | 
 | ||||||
|  | #require_relative '../../app/listeners/lettings_logs/import_listener' | ||||||
|  | #require_relative '../../app/services/imports/lettings_logs_import_service' | ||||||
|  | 
 | ||||||
|  | # Wisper.subscribe(LettingsLogsImportListener.new, scope: [Imports::LettingsLogsImportService]) | ||||||
| @ -0,0 +1,18 @@ | |||||||
|  | class LogsImports < ActiveRecord::Migration[7.0] | ||||||
|  |   def change | ||||||
|  |     create_table :logs_imports do |t| | ||||||
|  |       t.string :run_id, null: false | ||||||
|  |       t.datetime :started_at | ||||||
|  |       t.datetime :finished_at | ||||||
|  |       t.integer :duration_seconds, default: 0 | ||||||
|  |       t.integer :total, default: 0 # Total logs to import in batch | ||||||
|  |       t.integer :num_saved, default: 0 | ||||||
|  |       t.integer :num_skipped, default: 0 | ||||||
|  | 
 | ||||||
|  |       t.jsonb :discrepancies # List of files with errors, etc | ||||||
|  |       t.jsonb :filenames # List of filenames processed | ||||||
|  | 
 | ||||||
|  |       t.timestamps | ||||||
|  |     end     | ||||||
|  |   end | ||||||
|  | end | ||||||
									
										Binary file not shown.
									
								
							
						
					Loading…
					
					
				
		Reference in new issue