Class: Job

Inherits:
Sequel::Model
  • Object
show all
Includes:
ASModel
Defined in:
backend/app/model/job.rb,
frontend/app/models/job.rb

Defined Under Namespace

Classes: JobFileStore

Constant Summary

Constants included from JSONModel

JSONModel::REFERENCE_KEY_REGEX

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ASModel

all_models, included, update_publish_flag, update_suppressed_flag

Methods included from JSONModel

JSONModel, #JSONModel, add_error_handler, all, allow_unmapped_enum_value, backend_url, check_valid_refs, client_mode?, custom_validations, destroy_model, enum_default_value, enum_values, handle_error, init, load_schema, #models, models, parse_jsonmodel_ref, parse_reference, repository, repository_for, schema_src, set_publish_flags!, set_repository, strict_mode, strict_mode?, validate_schema, with_repository

Constructor Details

#initialize(job_type, job_data, files_to_import, job_params = {}) ⇒ Job

Returns a new instance of Job.



3
4
5
6
7
8
9
10
11
12
13
# File 'frontend/app/models/job.rb', line 3

def initialize(job_type, job_data, files_to_import, job_params = {})
  if job_type == 'import_job'
    job_data[:filenames] = files_to_import.keys
  end

  @job = JSONModel(:job).from_hash(:job_type => job_type,
                                   :job => job_data,
                                   :job_params =>  ASUtils.to_json(job_params) )

  @files = files_to_import
end

Class Method Details

.activeObject



38
39
40
# File 'frontend/app/models/job.rb', line 38

def self.active
  JSONModel::HTTP::get_json(JSONModel(:job).uri_for("active"), "resolve[]" => "repository") || {'results' => []}
end

.any_running?(type) ⇒ Boolean

Returns:

  • (Boolean)


122
123
124
# File 'backend/app/model/job.rb', line 122

def self.any_running?(type)
  !self.any_repo.filter(:status => 'running').where(:job_type => type).empty?
end

.archived(page) ⇒ Object



43
44
45
# File 'frontend/app/models/job.rb', line 43

def self.archived(page)
  JSONModel::HTTP::get_json(JSONModel(:job).uri_for("archived"), :page => page, "resolve[]" => "repository") || {'results' => []}
end

.available_import_typesObject



70
71
72
# File 'frontend/app/models/job.rb', line 70

def self.available_import_types
  JSONModel::HTTP.get_json(JSONModel(:job).uri_for("import_types"))
end

.available_typesObject



65
66
67
# File 'frontend/app/models/job.rb', line 65

def self.available_types
  JSONModel::HTTP.get_json(JSONModel(:job).uri_for("types"))
end

.cancel(id) ⇒ Object



58
59
60
61
62
# File 'frontend/app/models/job.rb', line 58

def self.cancel(id)
  response = JSONModel::HTTP.post_form("#{JSONModel(:job).uri_for(id)}/cancel")

  ASUtils.json_parse(response.body)
end

.create_from_json(json, opts = {}) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'backend/app/model/job.rb', line 78

def self.create_from_json(json, opts = {})
  if json.job_params == "null"
    json.job_params = ""
  end

  # force a validation on the job
  job = JSONModel(json.job['jsonmodel_type'].intern).from_hash(json.job)

  super(json, opts.merge(:time_submitted => Time.now,
                         :owner_id => opts.fetch(:user).id,
                         :job_type => json.job['jsonmodel_type'],
                         :job_blob => ASUtils.to_json(json.job),
                         :job_params => ASUtils.to_json(json.job_params)
                        ))
end

.log(id, offset = 0, &block) ⇒ Object



48
49
50
# File 'frontend/app/models/job.rb', line 48

def self.log(id, offset = 0, &block)
  JSONModel::HTTP::stream("#{JSONModel(:job).uri_for(id)}/log", {:offset => offset}, &block)
end

.queued_jobsObject



107
108
109
# File 'backend/app/model/job.rb', line 107

def self.queued_jobs
  self.any_repo.filter(:status => 'queued').order(:time_submitted)
end

.records(id, page) ⇒ Object



53
54
55
# File 'frontend/app/models/job.rb', line 53

def self.records(id, page)
  JSONModel::HTTP::get_json("#{JSONModel(:job).uri_for(id)}/records", :page => page, "resolve[]" => "record")
end

.running_jobsObject



112
113
114
# File 'backend/app/model/job.rb', line 112

def self.running_jobs
  self.any_repo.filter(:status => 'running').order(:time_submitted)
end

.running_jobs_untouched_since(time) ⇒ Object



117
118
119
# File 'backend/app/model/job.rb', line 117

def self.running_jobs_untouched_since(time)
  self.any_repo.filter(:status => "running").where { system_mtime < time }
end

.sequel_to_jsonmodel(objs, opts = {}) ⇒ Object



95
96
97
98
99
100
101
102
103
104
# File 'backend/app/model/job.rb', line 95

def self.sequel_to_jsonmodel(objs, opts = {})
  jsons = super
  jsons.zip(objs).each do |json, obj|
    json.job = JSONModel(obj.type.intern).from_hash(obj.job)
    json.owner = obj.owner.username
    json.queue_position = obj.queue_position if obj.status === 'queued'
  end

  jsons
end

Instance Method Details

#add_file(io) ⇒ Object



142
143
144
# File 'backend/app/model/job.rb', line 142

def add_file(io)
  add_job_file(JobFile.new(:file_path => file_store.store(io)))
end

#cancel!Object



212
213
214
215
216
217
# File 'backend/app/model/job.rb', line 212

def cancel!
  if ["queued", "running"].include? self.status
    self.status = "canceled"
    self.save
  end
end

#file_storeObject



137
138
139
# File 'backend/app/model/job.rb', line 137

def file_store
  @file_store ||= JobFileStore.new("#{type}_#{id}")
end

#finish!(status) ⇒ Object



190
191
192
193
194
195
196
197
# File 'backend/app/model/job.rb', line 190

def finish!(status)
  file_store.close_output

  self.reload
  self.status = [:canceled, :failed].include?(status) ? status.to_s : 'completed'
  self.time_finished = Time.now
  self.save
end

#get_output_stream(offset = 0) ⇒ Object



152
153
154
155
156
157
158
# File 'backend/app/model/job.rb', line 152

def get_output_stream(offset = 0)
  begin
    file_store.get_output_stream(offset)
  rescue
    [StringIO.new(""), 0]
  end
end

#jobObject



127
128
129
# File 'backend/app/model/job.rb', line 127

def job
  @job ||= ASUtils.json_parse(job_blob)
end

#queue_positionObject



175
176
177
178
179
180
# File 'backend/app/model/job.rb', line 175

def queue_position
  DB.open do |db|
    job_id = self.id
    db[:job].where { id < job_id }.where(:status => "queued").count
  end
end

#record_created_uris(uris) ⇒ Object



161
162
163
164
165
# File 'backend/app/model/job.rb', line 161

def record_created_uris(uris)
  uris.each do |uri|
    add_created_record(:record_uri => uri)
  end
end

#record_modified_uris(uris) ⇒ Object



168
169
170
171
172
# File 'backend/app/model/job.rb', line 168

def record_modified_uris(uris)
  uris.each do |uri|
    add_modified_record(:record_uri => uri)
  end
end

#running?Boolean

Returns:

  • (Boolean)


200
201
202
203
# File 'backend/app/model/job.rb', line 200

def running?
  self.reload
  self.status == 'running'
end

#start!Object



183
184
185
186
187
# File 'backend/app/model/job.rb', line 183

def start!
  self.status = 'running'
  self.time_started = Time.now
  self.save
end

#success?Boolean

Returns:

  • (Boolean)


206
207
208
209
# File 'backend/app/model/job.rb', line 206

def success?
  self.reload
  self.status == 'completed'
end

#typeObject



132
133
134
# File 'backend/app/model/job.rb', line 132

def type
  self.job_type
end

#update_mtimeObject



220
221
222
# File 'backend/app/model/job.rb', line 220

def update_mtime
  Job.update_mtime_for_ids([self.id])
end

#uploadObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'frontend/app/models/job.rb', line 16

def upload
  unless @files.empty?

    upload_files = @files.each_with_index.map {|file, i|
      (original_filename, stream) = file
      ["files[#{i}]", UploadIO.new(stream, "text/plain", original_filename)]
    }

    response = JSONModel::HTTP.post_form("#{JSONModel(:job).uri_for(nil)}_with_files",
                                         Hash[upload_files].merge('job' => @job.to_json),
                                         :multipart_form_data)
    ASUtils.json_parse(response.body)

  else

    @job.save

    {'uri' => @job.uri}
  end
end

#write_output(s) ⇒ Object



147
148
149
# File 'backend/app/model/job.rb', line 147

def write_output(s)
  file_store.write_output(s)
end