Class: Job

Inherits:
Sequel::Model
  • Object
show all
Includes:
ASModel
Defined in:
backend/app/model/job.rb,
frontend/app/models/job.rb

Defined Under Namespace

Classes: JobFileStore

Constant Summary

Constants included from JSONModel

JSONModel::REFERENCE_KEY_REGEX

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ASModel

all_models, included, update_publish_flag, update_suppressed_flag

Methods included from JSONModel

JSONModel, #JSONModel, add_error_handler, all, allow_unmapped_enum_value, backend_url, check_valid_refs, client_mode?, custom_validations, destroy_model, enum_default_value, enum_values, handle_error, init, load_schema, #models, models, parse_jsonmodel_ref, parse_reference, repository, repository_for, schema_src, set_publish_flags!, set_repository, strict_mode, strict_mode?, validate_schema, with_repository

Constructor Details

#initialize(job_type, job_data, files_to_import, job_params = {}) ⇒ Job

Returns a new instance of Job.



3
4
5
6
7
8
9
10
11
12
13
# File 'frontend/app/models/job.rb', line 3

# Builds the job JSONModel and keeps hold of the files that accompany it.
#
# When +job_type+ is 'import_job', the keys of +files_to_import+ are stored
# in +job_data+ under :filenames. This writes into the caller's hash.
#
# @param job_type [String] value stored as the job's :job_type
# @param job_data [Hash] job payload, stored under :job
# @param files_to_import [#keys] files to send with the job; kept in @files
# @param job_params [Hash] extra parameters, serialized via ASUtils.to_json
def initialize(job_type, job_data, files_to_import, job_params = {})
  job_data[:filenames] = files_to_import.keys if job_type == 'import_job'

  job_model = JSONModel(:job)
  serialized_params = ASUtils.to_json(job_params)

  @job = job_model.from_hash(:job_type => job_type,
                             :job => job_data,
                             :job_params => serialized_params)
  @files = files_to_import
end

Class Method Details

.active ⇒ Object



38
39
40
# File 'frontend/app/models/job.rb', line 38

# Requests the "active" job URI, asking for "repository" to be resolved.
#
# @return [Object] whatever get_json returns, or {'results' => []} when
#   that value is nil/false
def self.active
  active_uri = JSONModel(:job).uri_for("active")
  listing = JSONModel::HTTP.get_json(active_uri, "resolve[]" => "repository")

  listing || {'results' => []}
end

.any_running?(type) ⇒ Boolean

Returns:

  • (Boolean)


128
129
130
# File 'backend/app/model/job.rb', line 128

# Reports whether any job of the given type has status 'running'.
#
# @param type [String] value matched against the :job_type column
# @return [Boolean] true when the filtered dataset is not empty
def self.any_running?(type)
  running_of_type = self.any_repo.filter(:status => 'running').where(:job_type => type)

  !running_of_type.empty?
end

.archived(page) ⇒ Object



43
44
45
# File 'frontend/app/models/job.rb', line 43

# Requests one page of the "archived" job URI, asking for "repository" to
# be resolved.
#
# @param page [Object] sent as the :page request parameter
# @return [Object] whatever get_json returns, or {'results' => []} when
#   that value is nil/false
def self.archived(page)
  archived_uri = JSONModel(:job).uri_for("archived")
  listing = JSONModel::HTTP.get_json(archived_uri, :page => page, "resolve[]" => "repository")

  listing || {'results' => []}
end

.available_import_types ⇒ Object



70
71
72
# File 'frontend/app/models/job.rb', line 70

# Fetches the job model's "import_types" URI.
#
# @return [Object] the value returned by JSONModel::HTTP.get_json
def self.available_import_types
  import_types_uri = JSONModel(:job).uri_for("import_types")
  JSONModel::HTTP.get_json(import_types_uri)
end

.available_types ⇒ Object



65
66
67
# File 'frontend/app/models/job.rb', line 65

# Fetches the job model's "types" URI.
#
# @return [Object] the value returned by JSONModel::HTTP.get_json
def self.available_types
  types_uri = JSONModel(:job).uri_for("types")
  JSONModel::HTTP.get_json(types_uri)
end

.cancel(id) ⇒ Object



58
59
60
61
62
# File 'frontend/app/models/job.rb', line 58

# Posts to the job's "/cancel" URI and parses the response body.
#
# @param id [Object] job identifier handed to uri_for
# @return [Object] the result of ASUtils.json_parse on the response body
def self.cancel(id)
  cancel_uri = "#{JSONModel(:job).uri_for(id)}/cancel"
  reply = JSONModel::HTTP.post_form(cancel_uri)

  ASUtils.json_parse(reply.body)
end

.create_from_json(json, opts = {}) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'backend/app/model/job.rb', line 78

# Creates a job row from a job JSONModel, adding the bookkeeping columns.
#
# A job_params value of the string "null" is replaced with "" on +json+.
# The nested job is passed through its own JSONModel's from_hash before
# anything is stored; only that call's side effect is used.
#
# @param json [Object] job JSONModel exposing #job and #job_params
# @param opts [Hash] options; must contain :user (an object with #id)
# @return [Object] whatever the parent create_from_json returns
def self.create_from_json(json, opts = {})
  json.job_params = "" if json.job_params == "null"

  # force a validation on the job
  JSONModel(json.job['jsonmodel_type'].intern).from_hash(json.job)

  unless json.job_params.nil?
    opts = opts.merge(:job_params => ASUtils.to_json(json.job_params))
  end

  job_columns = {
    :time_submitted => Time.now,
    :owner_id => opts.fetch(:user).id,
    :job_type => json.job['jsonmodel_type'],
    :job_blob => ASUtils.to_json(json.job)
  }

  super(json, opts.merge(job_columns))
end

.log(id, offset = 0, &block) ⇒ Object



48
49
50
# File 'frontend/app/models/job.rb', line 48

# Streams the job's "/log" URI, forwarding the given block to the streamer.
#
# @param id [Object] job identifier handed to uri_for
# @param offset [Integer] sent as the :offset request parameter
# @return [Object] the value returned by JSONModel::HTTP.stream
def self.log(id, offset = 0, &block)
  log_uri = "#{JSONModel(:job).uri_for(id)}/log"
  stream_params = {:offset => offset}

  JSONModel::HTTP.stream(log_uri, stream_params, &block)
end

.queued_jobs ⇒ Object



113
114
115
# File 'backend/app/model/job.rb', line 113

# Jobs whose status is 'queued', ordered by :time_submitted.
#
# @return [Object] the filtered and ordered result of any_repo
def self.queued_jobs
  queued = self.any_repo.filter(:status => 'queued')
  queued.order(:time_submitted)
end

.records(id, page) ⇒ Object



53
54
55
# File 'frontend/app/models/job.rb', line 53

# Requests one page of the job's "/records" URI, asking for "record" to be
# resolved.
#
# @param id [Object] job identifier handed to uri_for
# @param page [Object] sent as the :page request parameter
# @return [Object] the value returned by JSONModel::HTTP.get_json
def self.records(id, page)
  records_uri = "#{JSONModel(:job).uri_for(id)}/records"
  JSONModel::HTTP.get_json(records_uri, :page => page, "resolve[]" => "record")
end

.running_jobs ⇒ Object



118
119
120
# File 'backend/app/model/job.rb', line 118

# Jobs whose status is 'running', ordered by :time_submitted.
#
# @return [Object] the filtered and ordered result of any_repo
def self.running_jobs
  running = self.any_repo.filter(:status => 'running')
  running.order(:time_submitted)
end

.running_jobs_untouched_since(time) ⇒ Object



123
124
125
# File 'backend/app/model/job.rb', line 123

# Running jobs whose system_mtime is earlier than +time+.
#
# @param time [Object] cut-off compared against system_mtime with <
# @return [Object] the filtered result of any_repo
def self.running_jobs_untouched_since(time)
  running = self.any_repo.filter(:status => "running")

  # +time+ is reached through the block's closure.
  running.where { system_mtime < time }
end

.sequel_to_jsonmodel(objs, opts = {}) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'backend/app/model/job.rb', line 95

# Decorates the parent conversion's output with job-specific fields.
#
# For each (json, obj) pair this sets:
# * job - the job payload rebuilt through the JSONModel named by obj.type;
#   when that lookup raises JSONModel::ModelNotFound, job becomes
#   obj.job_type and inactive_record is set to true
# * owner - the owner's username
# * has_modified_records - the first modified record, else the first
#   created record (a record or nil, not a strict boolean)
# * queue_position - only when the status is 'queued'
#
# @param objs [Enumerable] model objects, paired positionally with the
#   parent's results
# @param opts [Hash] passed through to the parent implementation
# @return [Array] the decorated results of the parent implementation
def self.sequel_to_jsonmodel(objs, opts = {})
  converted = super

  converted.zip(objs).each do |json, obj|
    begin
      job_model = JSONModel(obj.type.intern)
      json.job = job_model.from_hash(obj.job)
    rescue JSONModel::ModelNotFound
      json.job = obj.job_type
      json.inactive_record = true
    end

    json.owner = obj.owner.username
    json.has_modified_records = obj.modified_records.first || obj.created_records.first

    if obj.status == 'queued'
      json.queue_position = obj.queue_position
    end
  end

  converted
end

Instance Method Details

#add_file(io) ⇒ Object



148
149
150
# File 'backend/app/model/job.rb', line 148

# Stores +io+ in this job's file store and attaches a JobFile that points at
# the stored path.
#
# @param io [Object] handed to file_store.store
# @return [Object] the value returned by add_job_file
def add_file(io)
  stored_path = file_store.store(io)
  job_file = JobFile.new(:file_path => stored_path)

  add_job_file(job_file)
end

#cancel! ⇒ Object



218
219
220
221
222
223
# File 'backend/app/model/job.rb', line 218

# Marks the job "canceled" and saves it, but only while it is still
# "queued" or "running". Any other status leaves the job untouched.
#
# @return [Object, nil] the result of save, or nil when nothing was changed
def cancel!
  return unless %w[queued running].include?(self.status)

  self.status = "canceled"
  self.save
end

#file_store ⇒ Object



143
144
145
# File 'backend/app/model/job.rb', line 143

# Memoized JobFileStore for this job, named "<type>_<id>".
#
# @return [JobFileStore] the same instance on every call once created
def file_store
  return @file_store if @file_store

  store_name = "#{type}_#{id}"
  @file_store = JobFileStore.new(store_name)
end

#finish!(status) ⇒ Object



196
197
198
199
200
201
202
203
# File 'backend/app/model/job.rb', line 196

# Terminal statuses that finish! records as given; anything else is stored
# as 'completed'.
#
# Closes the job's output, reloads the row, then records the final status
# and finish time.
#
# The status is compared by its string form, so both :failed and 'failed'
# (likewise :canceled / 'canceled') are honoured. Previously a String
# argument fell through and was stored as 'completed'.
#
# @param status [Symbol, String, nil] outcome reported by the caller
# @return [Object] the result of save
def finish!(status)
  file_store.close_output

  # Reload before assigning so the write starts from the stored row.
  self.reload

  reported = status.to_s
  self.status = %w[canceled failed].include?(reported) ? reported : 'completed'
  self.time_finished = Time.now
  self.save
end

#get_output_stream(offset = 0) ⇒ Object



158
159
160
161
162
163
164
# File 'backend/app/model/job.rb', line 158

# Returns the file store's output stream starting at +offset+.
#
# Best effort: if the file store raises any StandardError, an empty
# StringIO paired with 0 is returned instead of propagating the error.
#
# @param offset [Integer] passed through to file_store.get_output_stream
# @return [Object] the file store's result, or [StringIO, 0] on failure
def get_output_stream(offset = 0)
  file_store.get_output_stream(offset)
rescue
  [StringIO.new(""), 0]
end

#job ⇒ Object



133
134
135
# File 'backend/app/model/job.rb', line 133

# The parsed form of job_blob, memoized in @job.
#
# @return [Object] the result of ASUtils.json_parse on job_blob
def job
  return @job if @job

  @job = ASUtils.json_parse(job_blob)
end

#queue_position ⇒ Object



181
182
183
184
185
186
# File 'backend/app/model/job.rb', line 181

# Counts the rows in the :job table that are "queued" and have an id lower
# than this job's id.
#
# @return [Integer] presumably Integer (the result of count on the
#   filtered dataset); confirm against DB.open's return value
def queue_position
  DB.open do |db|
    # Copy the id into a local: inside the where block a bare `id` refers
    # to the column, so this job's own id has to come in through the closure.
    own_id = self.id

    jobs_ahead = db[:job].where { id < own_id }
    jobs_ahead.where(:status => "queued").count
  end
end

#record_created_uris(uris) ⇒ Object



167
168
169
170
171
# File 'backend/app/model/job.rb', line 167

# Adds one created-record entry per URI.
#
# @param uris [#each] URIs, each passed as :record_uri to add_created_record
# @return [Object] the result of uris.each
def record_created_uris(uris)
  uris.each { |created_uri| add_created_record(:record_uri => created_uri) }
end

#record_modified_uris(uris) ⇒ Object



174
175
176
177
178
# File 'backend/app/model/job.rb', line 174

# Adds one modified-record entry per URI.
#
# @param uris [#each] URIs, each passed as :record_uri to add_modified_record
# @return [Object] the result of uris.each
def record_modified_uris(uris)
  uris.each { |modified_uri| add_modified_record(:record_uri => modified_uri) }
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


206
207
208
209
# File 'backend/app/model/job.rb', line 206

# Reloads the job, then reports whether its status is 'running'.
#
# @return [Boolean]
def running?
  self.reload

  current_status = self.status
  current_status == 'running'
end

#start! ⇒ Object



189
190
191
192
193
# File 'backend/app/model/job.rb', line 189

# Marks the job 'running', stamps time_started with the current time and
# saves it.
#
# @return [Object] the result of save
def start!
  self.status = 'running'

  started_at = Time.now
  self.time_started = started_at

  self.save
end

#success? ⇒ Boolean

Returns:

  • (Boolean)


212
213
214
215
# File 'backend/app/model/job.rb', line 212

# Reloads the job, then reports whether its status is 'completed'.
#
# @return [Boolean]
def success?
  self.reload

  current_status = self.status
  current_status == 'completed'
end

#type ⇒ Object



138
139
140
# File 'backend/app/model/job.rb', line 138

# Alias-style reader for the job_type attribute.
#
# @return [Object] the value of job_type
def type
  job_type
end

#update_mtime ⇒ Object



226
227
228
# File 'backend/app/model/job.rb', line 226

# Calls Job.update_mtime_for_ids with this job's id as the only element.
#
# @return [Object] the value returned by Job.update_mtime_for_ids
def update_mtime
  own_ids = [self.id]
  Job.update_mtime_for_ids(own_ids)
end

#upload ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'frontend/app/models/job.rb', line 16

# Submits the job built by the constructor.
#
# With no files the job is simply saved and its URI returned. Otherwise
# every file is wrapped in an UploadIO (content type "text/plain") under a
# "files[<index>]" key and posted, together with the job's JSON under
# 'job', as multipart form data to the job URI suffixed with "_with_files".
#
# @return [Hash, Object] {'uri' => ...} when there are no files, otherwise
#   the parsed response body
def upload
  if @files.empty?
    @job.save
    return {'uri' => @job.uri}
  end

  file_parts = @files.each_with_index.map do |(original_filename, stream), position|
    ["files[#{position}]", UploadIO.new(stream, "text/plain", original_filename)]
  end

  target_uri = "#{JSONModel(:job).uri_for(nil)}_with_files"
  form_fields = Hash[file_parts].merge('job' => @job.to_json)

  response = JSONModel::HTTP.post_form(target_uri, form_fields, :multipart_form_data)
  ASUtils.json_parse(response.body)
end

#write_output(s) ⇒ Object



153
154
155
# File 'backend/app/model/job.rb', line 153

# Hands +s+ to the job's file store to be written as output.
#
# @param s [Object] passed through to file_store.write_output
# @return [Object] the value returned by file_store.write_output
def write_output(s)
  store = file_store
  store.write_output(s)
end