Class: PeriodicIndexer

Inherits:
IndexerCommon show all
Defined in:
indexer/app/lib/periodic_indexer.rb

Direct Known Subclasses

PUIIndexer

Constant Summary collapse

WORKER_STATUS_NOTHING_INDEXED =
0
WORKER_STATUS_INDEX_SUCCESS =
1
WORKER_STATUS_INDEX_ERROR =
2

Constants inherited from IndexerCommon

IndexerCommon::EXCLUDED_STRING_VALUE_PROPERTIES

Constants included from JSONModel

JSONModel::REFERENCE_KEY_REGEX

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from IndexerCommon

#add_agents, add_attribute_to_resolve, #add_audit_info, #add_batch_hook, #add_delete_hook, #add_document_prepare_hook, #add_extents, #add_extra_documents_hook, add_indexer_initialize_hook, #add_level, #add_notes, #add_subjects, #add_subjects_subrecord, #add_summary, #add_years, #apply_pui_fields, build_fullrecord, #clean_for_sort, #clean_whitespace, #configure_doc_rules, #dedupe_by_uri, #delete_records, #do_http_request, #enum_fields, extract_string_values, generate_permutations_for_identifier, generate_sort_string_for_identifier, generate_years_for_date_range, #get_record_scope, #index_batch, #index_records, #is_repository_unpublished?, #login, pause, paused?, #paused?, #record_has_children, #record_types, #records_with_children, #reset_session, #resolved_attributes, #sanitize_json, #send_commit, #skip_index_doc?, #skip_index_record?, #solr_url, #t

Methods included from JSONModel

JSONModel, #JSONModel, add_error_handler, all, allow_unmapped_enum_value, backend_url, check_valid_refs, client_mode?, custom_validations, destroy_model, enum_default_value, enum_values, handle_error, init, load_schema, #models, models, parse_jsonmodel_ref, parse_reference, repository, repository_for, schema_src, set_publish_flags!, set_repository, strict_mode, strict_mode?, validate_schema, with_repository

Constructor Details

#initialize(backend_url = nil, state = nil, indexer_name = nil, verbose = true) ⇒ PeriodicIndexer

Returns a new instance of PeriodicIndexer.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'indexer/app/lib/periodic_indexer.rb', line 16

# Set up a periodic indexer.
#
# backend_url  - backend base URL; falls back to AppConfig[:backend_url]
# state        - object used to read/write last-indexed times; when not
#                given, a new instance of the class named by
#                AppConfig[:index_state_class] is created
# indexer_name - label used in log output; defaults to 'PeriodicIndexer'
# verbose      - stored in @verbose
def initialize(backend_url = nil, state = nil, indexer_name = nil, verbose = true)
  super(backend_url || AppConfig[:backend_url])

  @indexer_name = indexer_name || 'PeriodicIndexer'

  # The configured state class is resolved even when a state object was
  # supplied by the caller.
  default_state_class = AppConfig[:index_state_class].constantize
  @state = state || default_state_class.new
  @verbose = verbose

  # Transactions can be committed after we have checked for updates while
  # still carrying timestamps from before the check, so every
  # "modified since" query is widened by this many seconds.
  @window_seconds = 30

  @time_to_sleep = AppConfig[:solr_indexing_frequency_seconds].to_i
  @thread_count = AppConfig[:indexer_thread_count].to_i
  @records_per_thread = AppConfig[:indexer_records_per_thread].to_i

  # Stagger backend requests: with more than two workers, only (roughly)
  # half of them may be fetching from the backend at once, leaving the
  # others free to map documents and index.
  fetch_permits =
    if @thread_count > 2
      (@thread_count / 2.0).ceil
    else
      @thread_count
    end
  @backend_fetch_sem = java.util.concurrent.Semaphore.new(fetch_permits)

  @timing = IndexerTiming.new
end

Class Method Details

.get_indexer(state = nil, name = "Staff Indexer") ⇒ Object



299
300
301
# File 'indexer/app/lib/periodic_indexer.rb', line 299

# Factory: build an indexer pointed at AppConfig[:backend_url].
#
# state - optional state object handed to the constructor
# name  - indexer name handed to the constructor
#
# Returns the newly constructed indexer.
def self.get_indexer(state = nil, name = "Staff Indexer")
  new(AppConfig[:backend_url], state, name)
end

Instance Method Details

#fetch_records(type, ids, resolve) ⇒ Object



303
304
305
# File 'indexer/app/lib/periodic_indexer.rb', line 303

# Fetch a set of records of one type from the backend in a single request.
#
# type    - record type passed to JSONModel()
# ids     - list of record IDs; sent as a comma-separated :id_set
# resolve - value sent as the 'resolve[]' parameter
#
# Returns whatever the model's .all call returns.
def fetch_records(type, ids, resolve)
  model = JSONModel(type)
  id_list = ids.join(",")

  model.all(:id_set => id_list, 'resolve[]' => resolve)
end

#handle_deletes(opts = {}) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'indexer/app/lib/periodic_indexer.rb', line 253

# Apply the backend's delete feed to the index.
#
# Pages through "/delete-feed" for everything modified since the stored
# '_deletes' time (less the safety window, floored at zero), hands each page
# of results to delete_records, sends a commit if any page was non-empty,
# and finally stores the time this run began as the new '_deletes' time.
#
# opts - passed through unchanged to delete_records
def handle_deletes(opts = {})
  started_at = Time.now
  since = [@state.get_last_mtime('_deletes', 'deletes') - @window_seconds, 0].max
  found_any = false

  1.step do |page|
    feed = JSONModel::HTTP.get_json("/delete-feed",
                                    :modified_since => since,
                                    :page => page,
                                    :page_size => @records_per_thread)
    results = feed['results']

    found_any = true unless results.empty?
    delete_records(results, opts)

    break if feed['last_page'] <= page
  end

  send_commit if found_any

  @state.set_last_mtime('_deletes', 'deletes', started_at)
end

#index_round_complete(repository) ⇒ Object



249
250
251
# File 'indexer/app/lib/periodic_indexer.rb', line 249

# Extension hook, intentionally empty in this class.
#
# run_index_round calls this once per repository, after it has gone through
# every record type for that repository.
#
# repository - the repository object that was just processed
def index_round_complete(repository)
  # Give subclasses a place to hang custom behavior.
end

#log(line) ⇒ Object

Used only for info-level log lines.



295
296
297
# File 'indexer/app/lib/periodic_indexer.rb', line 295

# Write +line+ to the info log, prefixed with this indexer's name and the
# current time.
def log(line)
  stamp = Time.now
  Log.info("#{@indexer_name} [#{stamp}] #{line}")
end

#run ⇒ Object



280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'indexer/app/lib/periodic_indexer.rb', line 280

# Main loop: never returns.
#
# Each pass performs an index round (unless the indexer is paused) and then
# sleeps for @time_to_sleep seconds.  A failed round does not stop the loop:
# the session is reset and the error is logged before sleeping.
def run
  loop do
    begin
      run_index_round unless paused?
    rescue => e
      reset_session
      Log.error(e.backtrace.join("\n"))
      Log.error(e.inspect)
    end

    sleep @time_to_sleep
  end
end

#run_index_round ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'indexer/app/lib/periodic_indexer.rb', line 147

# Perform one full indexing pass.
#
# Steps, in order:
#   1. Index (and commit) any repository records whose system_mtime is not
#      older than the stored 'repositories' time minus the safety window.
#   2. For every repository and every record type, ask the backend for the
#      IDs modified since the stored time for that (repository, type) pair
#      and hand them, in slices of @records_per_thread, to @thread_count
#      worker threads via a bounded queue.
#   3. Commit if any worker reported indexing something, and advance the
#      stored time for that (repository, type) pair unless a worker
#      reported an error.
#   4. Call index_round_complete for each repository, then handle_deletes.
def run_index_round
  log("Running index round")

  

  # Index any repositories that were changed
  start = Time.now
  repositories = JSONModel(:repository).all('resolve[]' => resolved_attributes)

  modified_since = [@state.get_last_mtime('repositories', 'repositories') - @window_seconds, 0].max
  updated_repositories = repositories.reject {|repository| Time.parse(repository['system_mtime']).to_i < modified_since}.
  map {|repository| {
      'record' => repository.to_hash(:raw),
      'uri' => repository.uri
    }
  }

  # indexing repos is usually easy, since its unlikely there will be lots of
  # them.
  if !updated_repositories.empty?
    index_records(updated_repositories)
    send_commit
  end

  @state.set_last_mtime('repositories', 'repositories', start)

  # And any records in any repositories
  repositories.each_with_index do |repository, i|
    JSONModel.set_repository(repository.id)

    # NOTE(review): assigned but never read within this method.
    checkpoints = []

    record_types.each do |type|
      # Types listed in @@global_types are only processed for the first
      # repository in the list.
      next if @@global_types.include?(type) && i > 0
      # Reuses (overwrites) the `start` local from above; from here on it
      # marks the beginning of work for this record type.
      start = Time.now

      modified_since = [@state.get_last_mtime(repository.id, type) - @window_seconds, 0].max

      # we get all the ids of this record type out of the repo
      id_set = JSONModel::HTTP.get_json(JSONModel(type).uri_for, :all_ids => true, :modified_since => modified_since) || ''

      next if id_set.empty?

      indexed_count = 0

      # Bounded queue: at most @thread_count slices can be waiting at once.
      work_queue = java.util.concurrent.LinkedBlockingQueue.new(@thread_count)

      workers = (0...@thread_count).map {|thread_idx|
        start_worker_thread(work_queue, type)
      }

      begin
        # Feed our worker threads subsets of IDs to process
        id_set.each_slice(@records_per_thread) do |id_subset|
          # This will block if all threads are currently busy indexing.
          while !work_queue.offer(id_subset, 5000, java.util.concurrent.TimeUnit::MILLISECONDS)
            # The work queue is full.  Threads might just be busy, but check
            # for failed workers too.

            # If any of the workers have caught an exception, rethrow it immediately
            # (Thread#status is nil for a thread that died with an exception,
            # and Thread#value re-raises that exception here.)
            workers.each do |thread|
              thread.value if thread.status.nil?
            end
          end

          # The count is bumped as soon as the slice has been queued, so the
          # message below reports slices handed to workers so far.
          indexed_count += id_subset.length
          log("~~~ Indexed #{indexed_count} of #{id_set.length} #{type} records in repository #{repository.repo_code}")
        end
      ensure
        # Once we're done, instruct the workers to finish up.
        # One :finished token is offered per worker, each with a 5 second
        # timeout.
        @thread_count.times { work_queue.offer(:finished, 5000, java.util.concurrent.TimeUnit::MILLISECONDS) }
      end

      # If any worker reports that they indexed some records, we'll send a
      # commit.
      worker_statuses = workers.map {|thread|
          thread.join
          thread.value
      }

      # Commit if anything was added to Solr
      unless worker_statuses.all? {|status| status == WORKER_STATUS_NOTHING_INDEXED}
        send_commit
      end

      log("Indexed #{id_set.length} records in #{Time.now.to_i - start.to_i} seconds")

      # Leaving the stored time untouched after an error means the same
      # modified-since query is issued again on the next round.
      if worker_statuses.include?(WORKER_STATUS_INDEX_ERROR)
        Log.info("Skipping update of indexer state for record type #{type} in repository #{repository.id} due to previous failures")          
      else
        @state.set_last_mtime(repository.id, type, start)
      end
    end

    index_round_complete(repository)
  end

  handle_deletes

  log("Index round complete")
end

#start_worker_thread(queue, record_type) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'indexer/app/lib/periodic_indexer.rb', line 46

# Start a worker thread that pulls slices of record IDs from +queue+,
# fetches those records from the backend and indexes them.
#
# The worker copies the calling thread's current repository and backend
# session.  It keeps polling +queue+ (10 second timeout per poll) until it
# receives the :finished token or a poll times out.
#
# queue       - queue of ID arrays, terminated by :finished
# record_type - record type handed to fetch_records
#
# Returns the Thread.  Its #value is one of WORKER_STATUS_NOTHING_INDEXED,
# WORKER_STATUS_INDEX_SUCCESS or WORKER_STATUS_INDEX_ERROR; failures inside
# the worker are logged and reported through that value rather than raised.
def start_worker_thread(queue, record_type)
  repo_id = JSONModel.repository
  session = JSONModel::HTTP.current_backend_session

  # Shape of the documents handed to index_records.
  to_index_doc = lambda {|record|
    {
      'record' => record.to_hash(:trusted),
      'uri' => record.uri
    }
  }

  Thread.new do
    # Each worker thread will yield a value (Thread.value) indicating either
    # "did nothing", "complete success" or "errors encountered".
    worker_status = WORKER_STATUS_NOTHING_INDEXED

    begin
      # Inherit the repo_id and user session from the parent thread
      JSONModel.set_repository(repo_id)
      JSONModel::HTTP.current_backend_session = session

      while true
        id_subset = queue.poll(10000, java.util.concurrent.TimeUnit::MILLISECONDS)

        # If the parent thread has finished, it should have pushed a :finished
        # token.  But if we time out after a reasonable amount of time, assume
        # it isn't coming back.
        break if (id_subset == :finished || id_subset.nil?)

        records = @timing.time_block(:record_fetch_ms) do
          # Limit how many workers talk to the backend at the same time.
          @backend_fetch_sem.acquire
          begin
            # Happy path: we request all of our records in one shot and
            # everything goes to plan.
            begin
              fetch_records(record_type, id_subset, resolved_attributes)
            rescue
              worker_status = WORKER_STATUS_INDEX_ERROR

              # Sad path: the fetch failed for some reason, possibly because
              # one or more records are malformed and triggering a bug
              # somewhere.  Recover as best we can by fetching records
              # individually.
              salvaged_records = []

              id_subset.each do |id|
                begin
                  record = fetch_records(record_type, [id], resolved_attributes)[0]

                  # A fetch that succeeds but returns nothing must not put a
                  # nil into the batch: the indexing code below calls methods
                  # on every entry.
                  salvaged_records << record unless record.nil?
                rescue
                  Log.error("Failed fetching #{record_type} id=#{id}: #{$!}")
                end
              end

              salvaged_records
            end
          ensure
            @backend_fetch_sem.release
          end
        end

        if !records.empty?
          # Only upgrade from "nothing" to "success"; an earlier error sticks.
          if worker_status == WORKER_STATUS_NOTHING_INDEXED
            worker_status = WORKER_STATUS_INDEX_SUCCESS
          end

          begin
            # Happy path: index all of our records in one shot
            index_records(records.map(&to_index_doc))
          rescue
            worker_status = WORKER_STATUS_INDEX_ERROR

            # Sad path: indexing of one or more records failed, possibly due
            # to weird data or bugs in mapping rules.  Index as much as we
            # can before reporting the error.
            records.each do |record|
              begin
                index_records([to_index_doc.call(record)])
              rescue
                Log.error("Failure while indexing record: #{record.uri}: #{$!}")
                Log.exception($!)
              end
            end
          end
        end
      end

      worker_status
    rescue
      Log.error("Failure in #{@indexer_name} worker thread: #{$!}")
      Log.error($@.join("\n"))

      # This must be a plain expression, not `return`: a `return` inside the
      # thread's block raises LocalJumpError, which would surface from
      # Thread#value instead of the error status.
      WORKER_STATUS_INDEX_ERROR
    end
  end
end