Class: BatchImportRunner
- Defined in:
- backend/app/lib/job_runners/batch_import_runner.rb
Instance Method Summary collapse
Methods inherited from JobRunner
#add_success_hook, #cancelation_signaler, #canceled?, for, #initialize, #parse_job_params_string, register_for_job_type, registered_job_types, registered_runner_for, #success!, #symbol_keys
Constructor Details
This class inherits a constructor from JobRunner
Instance Method Details
#run ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'backend/app/lib/job_runners/batch_import_runner.rb', line 20

# Runs the batch import described by @job / @json.
#
# For every file attached to the job, a Converter is built for the job's
# 'import_type', run, and its output streamed through StreamingImport.  The
# URIs of the records each batch reports as created are collected and, once
# every file has been processed, handed to `log_created_uris`.
#
# The whole import runs inside DB.open; a transaction is requested only when
# DB.supports_mvcc? says so.  Failures are written to the job's Ticker log
# and then re-raised.
#
# @return [nil] when the import finishes (or is canceled) without an error
# @raise [StandardError] the error that aborted the import, after it has
#   been logged
def run
  ticker = Ticker.new(@job)
  last_error = nil

  filenames = @json.job['filenames'] || []
  import_maint_events = @json.job["import_events"] == "1"
  import_subjects = @json.job["import_subjects"] == "1"
  import_repository = @json.job["import_repository"] == "1"

  begin
    # Wrap the import in a transaction if the DB supports MVCC
    DB.open(DB.supports_mvcc?, :retry_on_optimistic_locking_fail => true) do
      created_uris = []

      begin
        @job.job_files.each_with_index do |input_file, i|
          ticker.log(("=" * 50) + "\n#{filenames[i]}\n" + ("=" * 50)) if filenames[i]

          # A fresh options hash per file, so converters never share one.
          converter = Converter.for(@json.job['import_type'],
                                    input_file.full_file_path,
                                    {:import_events => import_maint_events,
                                     :import_subjects => import_subjects,
                                     :import_repository => import_repository})

          begin
            RequestContext.open(:create_enums => true,
                                :current_username => @job.owner.username,
                                :repo_id => @job.repo_id) do
              converter.run

              File.open(converter.get_output_path, "r") do |fh|
                batch = StreamingImport.new(fh, ticker, @import_canceled)
                batch.process
                created_uris.concat(batch.created_records.values) if batch.created_records
              end
            end
          ensure
            # Clean up the converter's files whether or not the import worked.
            converter.remove_files
          end
        end

        # Note: it's important to call `success!` before attempting to store
        # the created URIs here.
        #
        # It turns out that the process of adding a new row to the
        # `job_created_record` table is enough to take a row-level lock on the
        # corresponding job entry in the `job` table (because of the foreign
        # key relationship).  If the import thread locks that row, the
        # watchdog thread ends up deadlocked, and we can't finish the import
        # job.
        #
        # Calling `success!` ensures that the watchdog thread gets shut down.
        # Then it's safe for this thread to do whatever it needs to do to the
        # job tables.
        success!
        log_created_uris(created_uris)
      rescue ImportCanceled
        raise Sequel::Rollback
      rescue JSONModel::ValidationException, ImportException, Converter::ConverterMappingError, Sequel::ValidationFailed, ReferenceError => e
        # Note: we deliberately don't catch Sequel::DatabaseError here.  The
        # outer call to DB.open will catch that exception and retry the
        # import for us.
        last_error = e

        # Roll back the transaction (if there is one)
        raise Sequel::Rollback
      end
    end
  rescue => unhandled
    # If we get to here, last_error will generally not have been set yet.  The
    # conditional set is here to deal with code that does something like this:
    #
    #   DB.open do            # Start a transaction (1)
    #     BatchImportRunner#run   # Runs this import, which does another DB.open (2)
    #     <run some other updates>
    #   end
    #
    # The intention is to run the import and some related updates in a single
    # transaction, but the result is surprising: if the BatchImportRunner
    # fails for some reason, everything gets rolled back and the only error
    # logged is a Sequel::Rollback.
    #
    # What's happening here is that DB.open (2) actually didn't establish a
    # new transaction (since it was already running in a transaction) and, as
    # a result, Sequel didn't set up the begin/rescue block needed to catch
    # Sequel::Rollback.  So the rollback exception keeps bubbling up until
    # it's caught by this rescue.  When this happens, last_error contains the
    # real cause of the strife, and Sequel::Rollback should be ignored.
    last_error ||= unhandled
  end

  return unless last_error

  report_import_failure(ticker, last_error)
  raise last_error
end

# Writes a description of a failed import to the job's ticker log.
#
# Errors that expose `errors` get a per-field listing plus whatever extra
# context they carry (`invalid_object`, `import_context`); anything else gets
# its backtrace and `inspect` output logged and is passed to Log.exception.
#
# @param ticker [#log] destination for the log lines
# @param error [Exception] the error that aborted the import
# @return [void]
def report_import_failure(ticker, error)
  ticker.log("\nIMPORT ERROR\n")

  unless error.respond_to?(:errors)
    ticker.log("Trace:" + error.backtrace.inspect)
    ticker.log("Errors: #{error.inspect}")
    Log.exception(error)
    return
  end

  # Just spit the error out if it carries no explicit per-field errors.
  ticker.log("#{error}") if error.errors.empty?

  ticker.log("The following errors were found:\n")

  error.errors.each_pair do |k, v|
    # Array() keeps a non-Array value from raising inside the reporter, which
    # would hide the import error that is being reported.
    ticker.log("\t#{k.to_s}: #{Array(v).join(' -- ')}")
  end

  if error.is_a?(Sequel::ValidationFailed)
    ticker.log("\n" )
    ticker.log("%" * 50 )
    ticker.log("\n Full Error Message:\n #{error.to_s}\n\n")
  end

  if error.respond_to?(:invalid_object) && error.invalid_object
    ticker.log("\n\n For #{ error.invalid_object.class }: \n #{ error.invalid_object.inspect }")
  end

  if error.respond_to?(:import_context) && error.import_context
    ticker.log("\n\nIn : \n #{ CGI.escapeHTML( error.import_context ) } ")
    ticker.log("\n\n")
  end
end
private :report_import_failure