ferret_server.rb
8.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
require 'drb'
require 'thread'
require 'yaml'
require 'erb'
################################################################################
module ActsAsFerret
module Remote
################################################################################
class Config
################################################################################
DEFAULTS = {
'host' => 'localhost',
'port' => '9009',
'cf' => "#{RAILS_ROOT}/config/ferret_server.yml",
'pid_file' => "#{RAILS_ROOT}/log/ferret_server.pid",
'log_file' => "#{RAILS_ROOT}/log/ferret_server.log",
'log_level' => 'debug',
'socket' => nil,
'script' => nil
}
################################################################################
# load the configuration file and apply default settings
def initialize (file=DEFAULTS['cf'])
@everything = YAML.load(ERB.new(IO.read(file)).result)
raise "malformed ferret server config" unless @everything.is_a?(Hash)
@config = DEFAULTS.merge(@everything[RAILS_ENV] || {})
if @everything[RAILS_ENV]
@config['uri'] = socket.nil? ? "druby://#{host}:#{port}" : "drbunix:#{socket}"
end
end
################################################################################
# treat the keys of the config data as methods
def method_missing (name, *args)
@config.has_key?(name.to_s) ? @config[name.to_s] : super
end
end
#################################################################################
# This class acts as a drb server listening for indexing and
# search requests from models declared to 'acts_as_ferret :remote => true'
#
# Usage:
# - modify RAILS_ROOT/config/ferret_server.yml to suit your needs.
# - environments for which no section in the config file exists will use
# the index locally (good for unit tests/development mode)
# - run script/ferret_server to start the server:
# script/ferret_server -e production start
# - to stop the server run
# script/ferret_server -e production stop
#
class Server
#################################################################################
# FIXME include detection of OS and include the correct file
require 'unix_daemon'
include(ActsAsFerret::Remote::UnixDaemon)
################################################################################
cattr_accessor :running
################################################################################
def initialize
ActiveRecord::Base.allow_concurrency = true
require 'ar_mysql_auto_reconnect_patch'
@cfg = ActsAsFerret::Remote::Config.new
ActiveRecord::Base.logger = @logger = Logger.new(@cfg.log_file)
ActiveRecord::Base.logger.level = Logger.const_get(@cfg.log_level.upcase) rescue Logger::DEBUG
if @cfg.script
path = File.join(RAILS_ROOT, @cfg.script)
load path
@logger.info "loaded custom startup script from #{path}"
end
end
################################################################################
# start the server as a daemon process
def start
raise "ferret_server not configured for #{RAILS_ENV}" unless (@cfg.uri rescue nil)
platform_daemon { run_drb_service }
end
################################################################################
# run the server and block until it exits
def run
raise "ferret_server not configured for #{RAILS_ENV}" unless (@cfg.uri rescue nil)
run_drb_service
end
def run_drb_service
$stdout.puts("starting ferret server...")
self.class.running = true
DRb.start_service(@cfg.uri, self)
DRb.thread.join
rescue Exception => e
@logger.error(e.to_s)
raise
end
#################################################################################
# handles all incoming method calls, and sends them on to the correct local index
# instance.
#
# Calls are not queued, so this will block until the call returned.
#
def method_missing(name, *args)
@logger.debug "\#method_missing(#{name.inspect}, #{args.inspect})"
index_name = args.shift
index = if name.to_s =~ /^multi_(.+)/
name = $1
ActsAsFerret::multi_index(index_name)
else
ActsAsFerret::get_index(index_name)
end
if index.nil?
@logger.error "\#index with name #{index_name} not found in call to #{name} with args #{args.inspect}"
raise ActsAsFerret::IndexNotDefined.new(index_name)
end
# TODO find another way to implement the reconnection logic (maybe in
# local_index or class_methods)
# reconnect_when_needed(clazz) do
# using respond_to? here so we not have to catch NoMethodError
# which would silently catch those from deep inside the indexing
# code, too...
if index.respond_to?(name)
index.send name, *args
# TODO check where we need this:
#elsif clazz.respond_to?(name)
# @logger.debug "no luck, trying to call class method instead"
# clazz.send name, *args
else
raise NoMethodError.new("method #{name} not supported by DRb server")
end
rescue => e
@logger.error "ferret server error #{$!}\n#{$!.backtrace.join "\n"}"
raise e
end
def register_class(class_name)
@logger.debug "############ registerclass #{class_name}"
class_name.constantize
@logger.debug "index for class #{class_name}: #{ActsAsFerret::ferret_indexes[class_name.underscore.to_sym]}"
end
# make sure we have a versioned index in place, building one if necessary
def ensure_index_exists(index_name)
@logger.debug "DRb server: ensure_index_exists for index #{index_name}"
definition = ActsAsFerret::get_index(index_name).index_definition
dir = definition[:index_dir]
unless File.directory?(dir) && File.file?(File.join(dir, 'segments')) && dir =~ %r{/\d+(_\d+)?$}
rebuild_index(index_name)
end
end
# disconnects the db connection for the class specified by class_name
# used only in unit tests to check the automatic reconnection feature
def db_disconnect!(class_name)
with_class class_name do |clazz|
clazz.connection.disconnect!
end
end
# hides LocalIndex#rebuild_index to implement index versioning
def rebuild_index(index_name)
definition = ActsAsFerret::get_index(index_name).index_definition.dup
models = definition[:registered_models]
index = new_index_for(definition)
# TODO fix reconnection stuff
# reconnect_when_needed(clazz) do
# @logger.debug "DRb server: rebuild index for class(es) #{models.inspect} in #{index.options[:path]}"
index.index_models models
# end
new_version = File.join definition[:index_base_dir], Time.now.utc.strftime('%Y%m%d%H%M%S')
# create a unique directory name (needed for unit tests where
# multiple rebuilds per second may occur)
if File.exists?(new_version)
i = 0
i+=1 while File.exists?("#{new_version}_#{i}")
new_version << "_#{i}"
end
File.rename index.options[:path], new_version
ActsAsFerret::change_index_dir index_name, new_version
end
protected
def reconnect_when_needed(clazz)
retried = false
begin
yield
rescue ActiveRecord::StatementInvalid => e
if e.message =~ /MySQL server has gone away/
if retried
raise e
else
@logger.info "StatementInvalid caught, trying to reconnect..."
clazz.connection.reconnect!
retried = true
retry
end
else
@logger.error "StatementInvalid caught, but unsure what to do with it: #{e}"
raise e
end
end
end
def new_index_for(index_definition)
ferret_cfg = index_definition[:ferret].dup
ferret_cfg.update :auto_flush => false,
:create => true,
:field_infos => ActsAsFerret::field_infos(index_definition),
:path => File.join(index_definition[:index_base_dir], 'rebuild')
returning Ferret::Index::Index.new(ferret_cfg) do |i|
i.batch_size = index_definition[:reindex_batch_size]
i.logger = @logger
end
end
end
end
end