Class: Job
- Inherits:
-
Sequel::Model
show all
- Includes:
- ASModel
- Defined in:
- backend/app/model/job.rb,
frontend/app/models/job.rb
Defined Under Namespace
Classes: JobFileStore
Constant Summary
Constants included
from JSONModel
JSONModel::REFERENCE_KEY_REGEX
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from ASModel
all_models, included, update_publish_flag, update_suppressed_flag
Methods included from JSONModel
JSONModel, #JSONModel, add_error_handler, all, allow_unmapped_enum_value, backend_url, check_valid_refs, client_mode?, custom_validations, destroy_model, enum_default_value, enum_values, handle_error, init, load_schema, #models, models, parse_jsonmodel_ref, parse_reference, repository, repository_for, schema_src, set_publish_flags!, set_repository, strict_mode, strict_mode?, validate_schema, with_repository
Constructor Details
#initialize(job_type, job_data, files_to_import, job_params = {}) ⇒ Job
Returns a new instance of Job.
3
4
5
6
7
8
9
10
11
12
13
|
# File 'frontend/app/models/job.rb', line 3
def initialize(job_type, job_data, files_to_import, job_params = {})
if job_type == 'import_job'
job_data[:filenames] = files_to_import.keys
end
@job = JSONModel(:job).from_hash(:job_type => job_type,
:job => job_data,
:job_params => ASUtils.to_json(job_params) )
@files = files_to_import
end
|
Class Method Details
.active ⇒ Object
38
39
40
|
# File 'frontend/app/models/job.rb', line 38
def self.active
JSONModel::HTTP::get_json(JSONModel(:job).uri_for("active"), "resolve[]" => "repository") || {'results' => []}
end
|
.any_running?(type) ⇒ Boolean
128
129
130
|
# File 'backend/app/model/job.rb', line 128
def self.any_running?(type)
!self.any_repo.filter(:status => 'running').where(:job_type => type).empty?
end
|
.archived(page) ⇒ Object
43
44
45
|
# File 'frontend/app/models/job.rb', line 43
def self.archived(page)
JSONModel::HTTP::get_json(JSONModel(:job).uri_for("archived"), :page => page, "resolve[]" => "repository") || {'results' => []}
end
|
.available_import_types ⇒ Object
70
71
72
|
# File 'frontend/app/models/job.rb', line 70
def self.available_import_types
JSONModel::HTTP.get_json(JSONModel(:job).uri_for("import_types"))
end
|
.available_types ⇒ Object
65
66
67
|
# File 'frontend/app/models/job.rb', line 65
def self.available_types
JSONModel::HTTP.get_json(JSONModel(:job).uri_for("types"))
end
|
.cancel(id) ⇒ Object
58
59
60
61
62
|
# File 'frontend/app/models/job.rb', line 58
def self.cancel(id)
response = JSONModel::HTTP.post_form("#{JSONModel(:job).uri_for(id)}/cancel")
ASUtils.json_parse(response.body)
end
|
.create_from_json(json, opts = {}) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# File 'backend/app/model/job.rb', line 78
def self.create_from_json(json, opts = {})
if json.job_params == "null"
json.job_params = ""
end
job = JSONModel(json.job['jsonmodel_type'].intern).from_hash(json.job)
opts = opts.merge(:job_params => ASUtils.to_json(json.job_params)) unless json.job_params.nil?
super(json, opts.merge(:time_submitted => Time.now,
:owner_id => opts.fetch(:user).id,
:job_type => json.job['jsonmodel_type'],
:job_blob => ASUtils.to_json(json.job)
))
end
|
.log(id, offset = 0, &block) ⇒ Object
48
49
50
|
# File 'frontend/app/models/job.rb', line 48
def self.log(id, offset = 0, &block)
JSONModel::HTTP::stream("#{JSONModel(:job).uri_for(id)}/log", {:offset => offset}, &block)
end
|
.queued_jobs ⇒ Object
113
114
115
|
# File 'backend/app/model/job.rb', line 113
def self.queued_jobs
self.any_repo.filter(:status => 'queued').order(:time_submitted)
end
|
.records(id, page) ⇒ Object
53
54
55
|
# File 'frontend/app/models/job.rb', line 53
def self.records(id, page)
JSONModel::HTTP::get_json("#{JSONModel(:job).uri_for(id)}/records", :page => page, "resolve[]" => "record")
end
|
.running_jobs ⇒ Object
118
119
120
|
# File 'backend/app/model/job.rb', line 118
def self.running_jobs
self.any_repo.filter(:status => 'running').order(:time_submitted)
end
|
.running_jobs_untouched_since(time) ⇒ Object
123
124
125
|
# File 'backend/app/model/job.rb', line 123
def self.running_jobs_untouched_since(time)
self.any_repo.filter(:status => "running").where { system_mtime < time }
end
|
.sequel_to_jsonmodel(objs, opts = {}) ⇒ Object
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'backend/app/model/job.rb', line 95
def self.sequel_to_jsonmodel(objs, opts = {})
jsons = super
jsons.zip(objs).each do |json, obj|
begin
json.job = JSONModel(obj.type.intern).from_hash(obj.job)
rescue JSONModel::ModelNotFound => e
json.job = obj.job_type
json.inactive_record = true
end
json.owner = obj.owner.username
json.has_modified_records = obj.modified_records.first || obj.created_records.first
json.queue_position = obj.queue_position if obj.status === 'queued'
end
jsons
end
|
Instance Method Details
#add_file(io) ⇒ Object
148
149
150
|
# File 'backend/app/model/job.rb', line 148
def add_file(io)
add_job_file(JobFile.new(:file_path => file_store.store(io)))
end
|
#cancel! ⇒ Object
218
219
220
221
222
223
|
# File 'backend/app/model/job.rb', line 218
def cancel!
if ["queued", "running"].include? self.status
self.status = "canceled"
self.save
end
end
|
#file_store ⇒ Object
143
144
145
|
# File 'backend/app/model/job.rb', line 143
def file_store
@file_store ||= JobFileStore.new("#{type}_#{id}")
end
|
#finish!(status) ⇒ Object
196
197
198
199
200
201
202
203
|
# File 'backend/app/model/job.rb', line 196
def finish!(status)
file_store.close_output
self.reload
self.status = [:canceled, :failed].include?(status) ? status.to_s : 'completed'
self.time_finished = Time.now
self.save
end
|
#get_output_stream(offset = 0) ⇒ Object
158
159
160
161
162
163
164
|
# File 'backend/app/model/job.rb', line 158
def get_output_stream(offset = 0)
begin
file_store.get_output_stream(offset)
rescue
[StringIO.new(""), 0]
end
end
|
#job ⇒ Object
133
134
135
|
# File 'backend/app/model/job.rb', line 133
def job
@job ||= ASUtils.json_parse(job_blob)
end
|
#queue_position ⇒ Object
181
182
183
184
185
186
|
# File 'backend/app/model/job.rb', line 181
def queue_position
DB.open do |db|
job_id = self.id
db[:job].where { id < job_id }.where(:status => "queued").count
end
end
|
#record_created_uris(uris) ⇒ Object
167
168
169
170
171
|
# File 'backend/app/model/job.rb', line 167
def record_created_uris(uris)
uris.each do |uri|
add_created_record(:record_uri => uri)
end
end
|
#record_modified_uris(uris) ⇒ Object
174
175
176
177
178
|
# File 'backend/app/model/job.rb', line 174
def record_modified_uris(uris)
uris.each do |uri|
add_modified_record(:record_uri => uri)
end
end
|
#running? ⇒ Boolean
206
207
208
209
|
# File 'backend/app/model/job.rb', line 206
def running?
self.reload
self.status == 'running'
end
|
#start! ⇒ Object
189
190
191
192
193
|
# File 'backend/app/model/job.rb', line 189
def start!
self.status = 'running'
self.time_started = Time.now
self.save
end
|
#success? ⇒ Boolean
212
213
214
215
|
# File 'backend/app/model/job.rb', line 212
def success?
self.reload
self.status == 'completed'
end
|
#type ⇒ Object
138
139
140
|
# File 'backend/app/model/job.rb', line 138
def type
self.job_type
end
|
#update_mtime ⇒ Object
226
227
228
|
# File 'backend/app/model/job.rb', line 226
def update_mtime
Job.update_mtime_for_ids([self.id])
end
|
#upload ⇒ Object
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'frontend/app/models/job.rb', line 16
def upload
unless @files.empty?
upload_files = @files.each_with_index.map {|file, i|
(original_filename, stream) = file
["files[#{i}]", UploadIO.new(stream, "text/plain", original_filename)]
}
response = JSONModel::HTTP.post_form("#{JSONModel(:job).uri_for(nil)}_with_files",
Hash[upload_files].merge('job' => @job.to_json),
:multipart_form_data)
ASUtils.json_parse(response.body)
else
@job.save
{'uri' => @job.uri}
end
end
|
#write_output(s) ⇒ Object
153
154
155
|
# File 'backend/app/model/job.rb', line 153
def write_output(s)
file_store.write_output(s)
end
|