Class: StreamingImport

Inherits:
Object
  • Object
show all
Defined in:
backend/app/lib/streaming_import.rb

Instance Method Summary collapse

Constructor Details

#initialize(stream, ticker, import_canceled = false, migration = false) ⇒ StreamingImport

Returns a new instance of StreamingImport.

Raises:

  • (StandardError)


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'backend/app/lib/streaming_import.rb', line 14

# Spools +stream+ to a temporary file, opens a StreamingJsonReader over that
# file, and precomputes the lookup tables used by the rest of the import.
#
# @param stream [#read] source of the import data; read in 4096-byte chunks
#   until +read+ returns nil
# @param ticker progress reporter; this method calls +tick+, +tick_estimate=+
#   and +log+ on it
# @param import_canceled cancellation flag, used as-is when truthy; otherwise
#   a fresh Atomic.new(false) is created
# @param migration any truthy value is stored as Atomic.new(true), anything
#   else as Atomic.new(false)
# @raise [StandardError] when +stream+ is nil or false
def initialize(stream, ticker, import_canceled = false, migration = false)
  @import_canceled = import_canceled || Atomic.new(false)
  @migration = Atomic.new(migration ? true : false)

  raise StandardError, "Nothing to stream" unless stream

  @ticker = ticker

  with_status("Reading JSON records") do
    # Placeholder estimate only, so that some progress is displayed.
    @ticker.tick_estimate = 1000

    @tempfile = ASUtils.tempfile('import_stream')

    begin
      # Copy the stream chunk by chunk, ticking once per chunk written.
      until (chunk = stream.read(4096)).nil?
        @tempfile.write(chunk)
        ticker.tick
      end
    ensure
      # Close the tempfile whether or not the copy succeeded.
      @tempfile.close
    end
  end

  @jstream = StreamingJsonReader.new(@tempfile.path)

  @ticker.log("No records were found in the input file!") if @jstream.empty?

  with_status("Validating records and checking links") do
    @logical_urls = load_logical_urls
  end

  with_status("Evaluating record relationships") do
    @dependencies, @position_offsets = load_dependencies
  end

  @limbs_for_reattaching = {}
end

Instance Method Details

#abort_if_import_canceled ⇒ Object



62
63
64
65
66
67
# File 'backend/app/lib/streaming_import.rb', line 62

# Stops the import when the cancellation flag has been set.
#
# Does nothing (and returns nil) while @import_canceled.value is falsy.
# Otherwise logs "Import canceled!" on the ticker and raises.
#
# @raise [ImportCanceled] when @import_canceled.value is truthy
def abort_if_import_canceled
  return unless @import_canceled.value

  @ticker.log("Import canceled!")
  raise ImportCanceled.new
end

#created_records ⇒ Object



57
58
59
# File 'backend/app/lib/streaming_import.rb', line 57

# Returns the entries of @logical_urls whose value is not nil, leaving
# @logical_urls itself untouched.
def created_records
  @logical_urls.select { |_logical_uri, created| !created.nil? }
end

#process ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'backend/app/lib/streaming_import.rb', line 70

# Runs the import: handles records involved in cyclic relationships, then
# repeatedly sweeps the remaining records, creating each one as soon as all
# of its dependencies have been created.
#
# The "Cleaning up" step always calls +cleanup+. The post-import steps
# (+reattach_severed_limbs+ and +touch_toplevel_records+) run only when every
# record was created; they are skipped when the import is aborted by an
# exception (including cancellation).
#
# @return the @logical_urls mapping
# @raise [RuntimeError] when a full sweep creates nothing while records are
#   still outstanding
def process
  round = 0
  # Set only when the loop below exits normally. The per-round +finished+
  # flag cannot be used for this: it starts each round as true, so an
  # exception raised early in a round would leave it true.
  completed = false

  begin
    with_status("Looking for cyclic relationships") do
      uris_causing_cycles = []

      CycleFinder.new(@dependencies, @ticker).each do |cycle_uri|
        uris_causing_cycles << cycle_uri unless uris_causing_cycles.include?(cycle_uri)
      end

      create_records_without_relationships(uris_causing_cycles)
    end

    # Now we know our data is acyclic, we can run rounds without thinking
    # about it.
    while true
      round += 1

      finished = true
      progressed = false

      with_status("Saving records: cycle #{round}") do
        @ticker.tick_estimate = @jstream.count
        @jstream.each do |rec|
          abort_if_import_canceled

          uri = rec['uri']
          dependencies = @dependencies[uri]

          if !@logical_urls[uri] && dependencies.all? {|d| @logical_urls[d]}
            # migrate it
            @logical_urls[uri] = do_create(rewrite(rec, @logical_urls))

            # Now that it's created, we don't need to see the JSON record for
            # this again either.  This will speed up subsequent cycles.
            @jstream.delete_current

            progressed = true
          end

          # Still not created: at least one more round is needed.
          finished = false unless @logical_urls[uri]

          @ticker.tick
        end
      end

      if finished
        completed = true
        break
      end

      unless progressed
        raise "Failed to make any progress on the current import cycle.  This shouldn't happen!"
      end
    end

  ensure
    with_status("Cleaning up") do
      # Post-import steps only make sense for an import that ran to
      # completion; running them after a failure could also raise and hide
      # the original exception.
      if completed
        reattach_severed_limbs
        touch_toplevel_records
      end

      cleanup
    end
  end

  @logical_urls
end