Class: StreamingJsonReader

Inherits:
Object
Defined in:
backend/app/lib/streaming_json_reader.rb

Overview

Reads a large file of JSON records while keeping only one record in memory at a time.
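The one-record-at-a-time idea can be sketched in plain Ruby. This is not the class's own implementation (which uses a Jackson parser under JRuby); it is a minimal illustration that assumes the file holds a JSON array of objects. The method name `each_json_record` is hypothetical.

```ruby
require "json"
require "stringio"

# Hypothetical helper, not part of StreamingJsonReader: yields each top-level
# object of a JSON array read from `io`, buffering only the text of the
# record currently being read.
def each_json_record(io)
  buffer = String.new
  depth = 0          # nesting depth of "{" within the current record
  in_string = false  # true while inside a JSON string literal
  escaped = false    # true right after a backslash inside a string

  io.each_char do |ch|
    if depth.zero?
      # Between records: skip "[", "]", commas and whitespace.
      next unless ch == "{"
      depth = 1
      buffer << ch
      next
    end

    buffer << ch

    if in_string
      if escaped
        escaped = false
      elsif ch == "\\"
        escaped = true
      elsif ch == '"'
        in_string = false
      end
    elsif ch == '"'
      in_string = true
    elsif ch == "{"
      depth += 1
    elsif ch == "}"
      depth -= 1
      if depth.zero?
        # A complete record: parse it, hand it over, then drop the text.
        yield JSON.parse(buffer)
        buffer = String.new
      end
    end
  end
end

ids = []
source = StringIO.new('[{"id":1,"note":"a } brace"},{"id":2,"tags":["x","y"]}]')
each_json_record(source) { |record| ids << record["id"] }
# ids is now [1, 2]
```

Braces inside string values are ignored because the sketch tracks whether it is inside a string literal, so a record boundary is only detected at a real closing brace.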


Constructor Details

#initialize(filename) ⇒ StreamingJsonReader

Returns a new instance of StreamingJsonReader.



# File 'backend/app/lib/streaming_json_reader.rb', line 6

def initialize(filename)
  @filename = filename

  # The indexes of any records marked as deleted.  We'll skip those on
  # subsequent iterations.
  @deleted_entries = java.util.BitSet.new

  # The total number of records in the underlying file (set upon the first
  # iteration and constant after that)
  @count = nil

  # The record number we've just yielded to the caller's `.each` block
  @record_index = 0

  # Unfortunate to need this: we need a way of skipping the commas between
  # incoming records.
  #
  # Calling parser.nextToken does discard them, but requires catching an
  # exception, which adds a lot of overhead (about 30 seconds per import cycle
  # for 500,000 records instead of ~5 seconds using this method).
  #
  @skip_next_character = org.codehaus.jackson.impl.ReaderBasedParser.java_class.declared_method("_skipWSOrEnd")
  @skip_next_character.accessible = true
end

Instance Method Details

#count ⇒ Object

The number of non-deleted records available for reading.



# File 'backend/app/lib/streaming_json_reader.rb', line 100

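def count
  # Scan the file once if the total is not yet known.  determine_count sets
  # @count, so the deleted entries are always subtracted from a known total
  # (the original returned determine_count's value directly, which is nil
  # for an empty file and ignores deletions).
  determine_count unless @count
  @count - @deleted_entries.cardinality
end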

#delete_current ⇒ Object

Mark the record last yielded as deleted.



# File 'backend/app/lib/streaming_json_reader.rb', line 95

def delete_current
  @deleted_entries.set(@record_index)
end

#determine_count ⇒ Object

Fly through our file to work out how many records we have, skipping over each record's contents without building it in memory.



# File 'backend/app/lib/streaming_json_reader.rb', line 39

def determine_count
  if empty?
    @count = 0
    return
  end

  result = 0

  with_record_stream do |stream|
    mapper = org.codehaus.jackson.map.ObjectMapper.new
    parser = mapper.getJsonFactory.createJsonParser(stream)

    while parser.nextToken
      result += 1
      parser.skipChildren
      skip_comma(parser)
    end
  end

  @count = result
end

#each ⇒ Object

Parse and yield each record from our underlying JSON file. If you call delete_current we’ll mark the record we just handed you as deleted, and it will be skipped in subsequent iterations.



# File 'backend/app/lib/streaming_json_reader.rb', line 65

def each
  return if empty?

  @record_index = -1
  with_record_stream do |stream|
    mapper = org.codehaus.jackson.map.ObjectMapper.new
    parser = mapper.getJsonFactory.createJsonParser(stream)

    while parser.nextToken
      @record_index += 1

      if @deleted_entries.get(@record_index)
        # Skip this entry
        parser.skipChildren
      else
        result = parser.readValueAs(java.util.Map.java_class)
        yield result
      end

      skip_comma(parser)
    end

    unless @count
      @count = @record_index + 1
    end
  end
end
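The interaction between each, delete_current and count can be illustrated with an in-memory stand-in. This is only a sketch: the class name `SkippingList` is hypothetical, it holds its records in an array rather than streaming them from a file, and it uses Ruby's `Set` where the real class uses a `java.util.BitSet`.

```ruby
require "set"

# Hypothetical stand-in for StreamingJsonReader's bookkeeping.
class SkippingList
  def initialize(records)
    @records = records
    @deleted = Set.new  # indexes of records marked as deleted
    @index = 0          # index of the record most recently visited
  end

  # Yield every record that has not been marked as deleted.
  def each
    @records.each_with_index do |record, i|
      @index = i
      next if @deleted.include?(i)
      yield record
    end
  end

  # Mark the record last yielded as deleted.
  def delete_current
    @deleted << @index
  end

  # Number of records still visible to `each`.
  def count
    @records.length - @deleted.size
  end
end

list = SkippingList.new(%w[a b c])
list.each { |record| list.delete_current if record == "b" }

second_pass = []
list.each { |record| second_pass << record }
# second_pass is now ["a", "c"] and list.count is 2
```

Deletion only records an index, so the underlying data is never rewritten; later passes simply step over the marked positions.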

#empty? ⇒ Boolean

True if the underlying JSON file is empty, meaning it is at most two bytes long (for example, just `[]`).

Returns:

  • (Boolean)


# File 'backend/app/lib/streaming_json_reader.rb', line 33

def empty?
  File.size(@filename) <= 2
end
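The size check can be tried out on temporary files. The helper name `json_file_empty?` below is hypothetical; it mirrors the comparison shown above outside the class so that it can be run on its own.

```ruby
require "tempfile"

# Hypothetical free-standing copy of the documented check: a file of two
# bytes or fewer cannot hold a record, since "[]" alone is two bytes.
def json_file_empty?(path)
  File.size(path) <= 2
end

# Write `content` to a temporary file and report whether it counts as empty.
def empty_for_content?(content)
  Tempfile.create("records") do |file|
    file.write(content)
    file.flush
    json_file_empty?(file.path)
  end
end

empty_for_content?("[]")          # two bytes: treated as empty
empty_for_content?('[{"id":1}]')  # holds a record: not empty
empty_for_content?("[]\n")        # three bytes: not treated as empty
```

Because the check looks only at the byte count, an empty array followed by a trailing newline or other whitespace would not be reported as empty.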