Class: Job
- Inherits:
-
Sequel::Model
- Object
- Sequel::Model
- Job
- Defined in:
- backend/app/model/job.rb,
frontend/app/models/job.rb
Defined Under Namespace
Classes: JobFileStore
Class Method Summary collapse
-
.active ⇒ Object
-
.any_running?(type) ⇒ Boolean
-
.archived(page) ⇒ Object
-
.available_import_types ⇒ Object
-
.available_types ⇒ Object
-
.cancel(id) ⇒ Object
-
.create_from_json(json, opts = {}) ⇒ Object
-
.log(id, offset = 0, &block) ⇒ Object
-
.queued_jobs ⇒ Object
-
.records(id, page) ⇒ Object
-
.running_jobs ⇒ Object
-
.running_jobs_untouched_since(time) ⇒ Object
-
.sequel_to_jsonmodel(objs, opts = {}) ⇒ Object
Instance Method Summary collapse
-
#add_file(io) ⇒ Object
-
#cancel! ⇒ Object
-
#file_store ⇒ Object
-
#finish!(status) ⇒ Object
-
#get_output_stream(offset = 0) ⇒ Object
-
#initialize(job_type, job_data, files_to_import, job_params = {}) ⇒ Job
constructor
A new instance of Job.
-
#job ⇒ Object
-
#queue_position ⇒ Object
-
#record_created_uris(uris) ⇒ Object
-
#record_modified_uris(uris) ⇒ Object
-
#running? ⇒ Boolean
-
#start! ⇒ Object
-
#success? ⇒ Boolean
-
#type ⇒ Object
-
#update_mtime ⇒ Object
-
#upload ⇒ Object
-
#write_output(s) ⇒ Object
Constructor Details
#initialize(job_type, job_data, files_to_import, job_params = {}) ⇒ Job
Returns a new instance of Job.
3 4 5 6 7 8 9 10 11 12 13 |
# File 'frontend/app/models/job.rb', line 3 def initialize(job_type, job_data, files_to_import, job_params = {}) if job_type == 'import_job' job_data[:filenames] = files_to_import.keys end @job = JSONModel(:job).from_hash(:job_type => job_type, :job => job_data, :job_params => ASUtils.to_json(job_params) ) @files = files_to_import end |
Class Method Details
.active ⇒ Object
38 39 40 |
# File 'frontend/app/models/job.rb', line 38 def self.active JSONModel::HTTP::get_json(JSONModel(:job).uri_for("active"), "resolve[]" => "repository") || {'results' => []} end |
.any_running?(type) ⇒ Boolean
128 129 130 |
# File 'backend/app/model/job.rb', line 128 def self.any_running?(type) !self.any_repo.filter(:status => 'running').where(:job_type => type).empty? end |
.archived(page) ⇒ Object
43 44 45 |
# File 'frontend/app/models/job.rb', line 43 def self.archived(page) JSONModel::HTTP::get_json(JSONModel(:job).uri_for("archived"), :page => page, "resolve[]" => "repository") || {'results' => []} end |
.available_import_types ⇒ Object
70 71 72 |
# File 'frontend/app/models/job.rb', line 70 def self.available_import_types JSONModel::HTTP.get_json(JSONModel(:job).uri_for("import_types")) end |
.available_types ⇒ Object
65 66 67 |
# File 'frontend/app/models/job.rb', line 65 def self.available_types JSONModel::HTTP.get_json(JSONModel(:job).uri_for("types")) end |
.cancel(id) ⇒ Object
58 59 60 61 62 |
# File 'frontend/app/models/job.rb', line 58 def self.cancel(id) response = JSONModel::HTTP.post_form("#{JSONModel(:job).uri_for(id)}/cancel") ASUtils.json_parse(response.body) end |
.create_from_json(json, opts = {}) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'backend/app/model/job.rb', line 78 def self.create_from_json(json, opts = {}) if json.job_params == "null" json.job_params = "" end # force a validation on the job job = JSONModel(json.job['jsonmodel_type'].intern).from_hash(json.job) opts = opts.merge(:job_params => ASUtils.to_json(json.job_params)) unless json.job_params.nil? super(json, opts.merge(:time_submitted => Time.now, :owner_id => opts.fetch(:user).id, :job_type => json.job['jsonmodel_type'], :job_blob => ASUtils.to_json(json.job) )) end |
.log(id, offset = 0, &block) ⇒ Object
48 49 50 |
# File 'frontend/app/models/job.rb', line 48 def self.log(id, offset = 0, &block) JSONModel::HTTP::stream("#{JSONModel(:job).uri_for(id)}/log", {:offset => offset}, &block) end |
.queued_jobs ⇒ Object
113 114 115 |
# File 'backend/app/model/job.rb', line 113 def self.queued_jobs self.any_repo.filter(:status => 'queued').order(:time_submitted) end |
.records(id, page) ⇒ Object
53 54 55 |
# File 'frontend/app/models/job.rb', line 53 def self.records(id, page) JSONModel::HTTP::get_json("#{JSONModel(:job).uri_for(id)}/records", :page => page, "resolve[]" => "record") end |
.running_jobs ⇒ Object
118 119 120 |
# File 'backend/app/model/job.rb', line 118 def self.running_jobs self.any_repo.filter(:status => 'running').order(:time_submitted) end |
.running_jobs_untouched_since(time) ⇒ Object
123 124 125 |
# File 'backend/app/model/job.rb', line 123 def self.running_jobs_untouched_since(time) self.any_repo.filter(:status => "running").where { system_mtime < time } end |
.sequel_to_jsonmodel(objs, opts = {}) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'backend/app/model/job.rb', line 95 def self.sequel_to_jsonmodel(objs, opts = {}) jsons = super jsons.zip(objs).each do |json, obj| begin json.job = JSONModel(obj.type.intern).from_hash(obj.job) rescue JSONModel::ModelNotFound => e json.job = obj.job_type json.inactive_record = true end json.owner = obj.owner.username json.has_modified_records = obj.modified_records.first || obj.created_records.first json.queue_position = obj.queue_position if obj.status === 'queued' end jsons end |
Instance Method Details
#add_file(io) ⇒ Object
148 149 150 |
# File 'backend/app/model/job.rb', line 148 def add_file(io) add_job_file(JobFile.new(:file_path => file_store.store(io))) end |
#cancel! ⇒ Object
218 219 220 221 222 223 |
# File 'backend/app/model/job.rb', line 218 def cancel! if ["queued", "running"].include? self.status self.status = "canceled" self.save end end |
#file_store ⇒ Object
143 144 145 |
# File 'backend/app/model/job.rb', line 143 def file_store @file_store ||= JobFileStore.new("#{type}_#{id}") end |
#finish!(status) ⇒ Object
196 197 198 199 200 201 202 203 |
# File 'backend/app/model/job.rb', line 196 def finish!(status) file_store.close_output self.reload self.status = [:canceled, :failed].include?(status) ? status.to_s : 'completed' self.time_finished = Time.now self.save end |
#get_output_stream(offset = 0) ⇒ Object
158 159 160 161 162 163 164 |
# File 'backend/app/model/job.rb', line 158 def get_output_stream(offset = 0) begin file_store.get_output_stream(offset) rescue [StringIO.new(""), 0] end end |
#job ⇒ Object
133 134 135 |
# File 'backend/app/model/job.rb', line 133 def job @job ||= ASUtils.json_parse(job_blob) end |
#queue_position ⇒ Object
181 182 183 184 185 186 |
# File 'backend/app/model/job.rb', line 181 def queue_position DB.open do |db| job_id = self.id db[:job].where { id < job_id }.where(:status => "queued").count end end |
#record_created_uris(uris) ⇒ Object
167 168 169 170 171 |
# File 'backend/app/model/job.rb', line 167 def record_created_uris(uris) uris.each do |uri| add_created_record(:record_uri => uri) end end |
#record_modified_uris(uris) ⇒ Object
174 175 176 177 178 |
# File 'backend/app/model/job.rb', line 174 def record_modified_uris(uris) uris.each do |uri| add_modified_record(:record_uri => uri) end end |
#running? ⇒ Boolean
206 207 208 209 |
# File 'backend/app/model/job.rb', line 206 def running? self.reload self.status == 'running' end |
#start! ⇒ Object
189 190 191 192 193 |
# File 'backend/app/model/job.rb', line 189 def start! self.status = 'running' self.time_started = Time.now self.save end |
#success? ⇒ Boolean
212 213 214 215 |
# File 'backend/app/model/job.rb', line 212 def success? self.reload self.status == 'completed' end |
#type ⇒ Object
138 139 140 |
# File 'backend/app/model/job.rb', line 138 def type self.job_type end |
#update_mtime ⇒ Object
226 227 228 |
# File 'backend/app/model/job.rb', line 226 def update_mtime Job.update_mtime_for_ids([self.id]) end |
#upload ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'frontend/app/models/job.rb', line 16 def upload unless @files.empty? upload_files = @files.each_with_index.map {|file, i| (original_filename, stream) = file ["files[#{i}]", UploadIO.new(stream, "text/plain", original_filename)] } response = JSONModel::HTTP.post_form("#{JSONModel(:job).uri_for(nil)}_with_files", Hash[upload_files].merge('job' => @job.to_json), :multipart_form_data) ASUtils.json_parse(response.body) else @job.save {'uri' => @job.uri} end end |
#write_output(s) ⇒ Object
153 154 155 |
# File 'backend/app/model/job.rb', line 153 def write_output(s) file_store.write_output(s) end |