]> review.fuel-infra Code Review - packages/trusty/ceph.git/blob - ceph/src/mon/MonitorDBStore.h
Upgrade to ceph 0.80.11
[packages/trusty/ceph.git] / ceph / src / mon / MonitorDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Inktank, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
15
16 #include "include/types.h"
17 #include "include/buffer.h"
18 #include <set>
19 #include <map>
20 #include <string>
21 #include <boost/scoped_ptr.hpp>
22 #include <sstream>
23 #include "os/KeyValueDB.h"
24 #include "os/LevelDBStore.h"
25
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/errno.h"
29
30 class MonitorDBStore
31 {
32   boost::scoped_ptr<LevelDBStore> db;
33   bool do_dump;
34   int dump_fd;
35
36  public:
37
38   struct Op {
39     uint8_t type;
40     string prefix;
41     string key, endkey;
42     bufferlist bl;
43
44     Op()
45       : type(0) { }
46     Op(int t, string p, string k)
47       : type(t), prefix(p), key(k) { }
48     Op(int t, const string& p, string k, bufferlist& b)
49       : type(t), prefix(p), key(k), bl(b) { }
50     Op(int t, const string& p, string start, string end)
51       : type(t), prefix(p), key(start), endkey(end) { }
52
53     void encode(bufferlist& encode_bl) const {
54       ENCODE_START(2, 1, encode_bl);
55       ::encode(type, encode_bl);
56       ::encode(prefix, encode_bl);
57       ::encode(key, encode_bl);
58       ::encode(bl, encode_bl);
59       ::encode(endkey, encode_bl);
60       ENCODE_FINISH(encode_bl);
61     }
62
63     void decode(bufferlist::iterator& decode_bl) {
64       DECODE_START(2, decode_bl);
65       ::decode(type, decode_bl);
66       ::decode(prefix, decode_bl);
67       ::decode(key, decode_bl);
68       ::decode(bl, decode_bl);
69       if (struct_v >= 2)
70         ::decode(endkey, decode_bl);
71       DECODE_FINISH(decode_bl);
72     }
73
74     void dump(Formatter *f) const {
75       f->dump_int("type", type);
76       f->dump_string("prefix", prefix);
77       f->dump_string("key", key);
78       if (endkey.length())
79         f->dump_string("endkey", endkey);
80     }
81
82     static void generate_test_instances(list<Op*>& ls) {
83       ls.push_back(new Op);
84       // we get coverage here from the Transaction instances
85     }
86   };
87
88   struct Transaction {
89     list<Op> ops;
90     uint64_t bytes, keys;
91
92     Transaction() : bytes(0), keys(0) {}
93
94     enum {
95       OP_PUT    = 1,
96       OP_ERASE  = 2,
97       OP_COMPACT = 3,
98     };
99
100     void put(string prefix, string key, bufferlist& bl) {
101       ops.push_back(Op(OP_PUT, prefix, key, bl));
102       ++keys;
103       bytes += prefix.length() + key.length() + bl.length();
104     }
105
106     void put(string prefix, version_t ver, bufferlist& bl) {
107       ostringstream os;
108       os << ver;
109       put(prefix, os.str(), bl);
110     }
111
112     void put(string prefix, string key, version_t ver) {
113       bufferlist bl;
114       ::encode(ver, bl);
115       put(prefix, key, bl);
116     }
117
118     void erase(string prefix, string key) {
119       ops.push_back(Op(OP_ERASE, prefix, key));
120       ++keys;
121       bytes += prefix.length() + key.length();
122     }
123
124     void erase(string prefix, version_t ver) {
125       ostringstream os;
126       os << ver;
127       erase(prefix, os.str());
128     }
129
130     void compact_prefix(string prefix) {
131       ops.push_back(Op(OP_COMPACT, prefix, string()));
132     }
133
134     void compact_range(string prefix, string start, string end) {
135       ops.push_back(Op(OP_COMPACT, prefix, start, end));
136     }
137
138     void encode(bufferlist& bl) const {
139       ENCODE_START(2, 1, bl);
140       ::encode(ops, bl);
141       ::encode(bytes, bl);
142       ::encode(keys, bl);
143       ENCODE_FINISH(bl);
144     }
145
146     void decode(bufferlist::iterator& bl) {
147       DECODE_START(2, bl);
148       ::decode(ops, bl);
149       if (struct_v >= 2) {
150         ::decode(bytes, bl);
151         ::decode(keys, bl);
152       }
153       DECODE_FINISH(bl);
154     }
155
156     static void generate_test_instances(list<Transaction*>& ls) {
157       ls.push_back(new Transaction);
158       ls.push_back(new Transaction);
159       bufferlist bl;
160       bl.append("value");
161       ls.back()->put("prefix", "key", bl);
162       ls.back()->erase("prefix2", "key2");
163       ls.back()->compact_prefix("prefix3");
164       ls.back()->compact_range("prefix4", "from", "to");
165     }
166
167     void append(Transaction& other) {
168       ops.splice(ops.end(), other.ops);
169       keys += other.keys;
170       bytes += other.bytes;
171     }
172
173     void append_from_encoded(bufferlist& bl) {
174       Transaction other;
175       bufferlist::iterator it = bl.begin();
176       other.decode(it);
177       append(other);
178     }
179
180     bool empty() {
181       return (size() == 0);
182     }
183
184     bool size() {
185       return ops.size();
186     }
187     uint64_t get_keys() const {
188       return keys;
189     }
190     uint64_t get_bytes() const {
191       return bytes;
192     }
193
194     void dump(ceph::Formatter *f, bool dump_val=false) const {
195       f->open_object_section("transaction");
196       f->open_array_section("ops");
197       list<Op>::const_iterator it;
198       int op_num = 0;
199       for (it = ops.begin(); it != ops.end(); ++it) {
200         const Op& op = *it;
201         f->open_object_section("op");
202         f->dump_int("op_num", op_num++);
203         switch (op.type) {
204         case OP_PUT:
205           {
206             f->dump_string("type", "PUT");
207             f->dump_string("prefix", op.prefix);
208             f->dump_string("key", op.key);
209             f->dump_unsigned("length", op.bl.length());
210             if (dump_val) {
211               ostringstream os;
212               op.bl.hexdump(os);
213               f->dump_string("bl", os.str());
214             }
215           }
216           break;
217         case OP_ERASE:
218           {
219             f->dump_string("type", "ERASE");
220             f->dump_string("prefix", op.prefix);
221             f->dump_string("key", op.key);
222           }
223           break;
224         case OP_COMPACT:
225           {
226             f->dump_string("type", "COMPACT");
227             f->dump_string("prefix", op.prefix);
228             f->dump_string("start", op.key);
229             f->dump_string("end", op.endkey);
230           }
231           break;
232         default:
233           {
234             f->dump_string("type", "unknown");
235             f->dump_unsigned("op_code", op.type);
236             break;
237           }
238         }
239         f->close_section();
240       }
241       f->close_section();
242       f->dump_unsigned("num_keys", keys);
243       f->dump_unsigned("num_bytes", bytes);
244       f->close_section();
245     }
246   };
247
248   int apply_transaction(const MonitorDBStore::Transaction& t) {
249     KeyValueDB::Transaction dbt = db->get_transaction();
250
251     if (do_dump) {
252       bufferlist bl;
253       t.encode(bl);
254       bl.write_fd(dump_fd);
255     }
256
257     list<pair<string, pair<string,string> > > compact;
258     for (list<Op>::const_iterator it = t.ops.begin(); it != t.ops.end(); ++it) {
259       const Op& op = *it;
260       switch (op.type) {
261       case Transaction::OP_PUT:
262         dbt->set(op.prefix, op.key, op.bl);
263         break;
264       case Transaction::OP_ERASE:
265         dbt->rmkey(op.prefix, op.key);
266         break;
267       case Transaction::OP_COMPACT:
268         compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
269         break;
270       default:
271         derr << __func__ << " unknown op type " << op.type << dendl;
272         ceph_assert(0);
273         break;
274       }
275     }
276     int r = db->submit_transaction_sync(dbt);
277     if (r >= 0) {
278       while (!compact.empty()) {
279         if (compact.front().second.first == string() &&
280             compact.front().second.second == string())
281           db->compact_prefix_async(compact.front().first);
282         else
283           db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
284         compact.pop_front();
285       }
286     } else {
287       assert(0 == "failed to write to db");
288     }
289     return r;
290   }
291
292   class StoreIteratorImpl {
293   protected:
294     bool done;
295     pair<string,string> last_key;
296     bufferlist crc_bl;
297
298     StoreIteratorImpl() : done(false) { }
299     virtual ~StoreIteratorImpl() { }
300
301     bool add_chunk_entry(Transaction &tx,
302                          string &prefix,
303                          string &key,
304                          bufferlist &value,
305                          uint64_t max) {
306       Transaction tmp;
307       bufferlist tmp_bl;
308       tmp.put(prefix, key, value);
309       tmp.encode(tmp_bl);
310
311       bufferlist tx_bl;
312       tx.encode(tx_bl);
313
314       size_t len = tx_bl.length() + tmp_bl.length();
315
316       if (!tx.empty() && (len > max)) {
317         return false;
318       }
319
320       tx.append(tmp);
321       last_key.first = prefix;
322       last_key.second = key;
323
324       if (g_conf->mon_sync_debug) {
325         ::encode(prefix, crc_bl);
326         ::encode(key, crc_bl);
327         ::encode(value, crc_bl);
328       }
329
330       return true;
331     }
332
333     virtual bool _is_valid() = 0;
334
335   public:
336     __u32 crc() {
337       if (g_conf->mon_sync_debug)
338         return crc_bl.crc32c(0);
339       return 0;
340     }
341     pair<string,string> get_last_key() {
342       return last_key;
343     };
344     virtual bool has_next_chunk() {
345       return !done && _is_valid();
346     }
347     virtual void get_chunk_tx(Transaction &tx, uint64_t max) = 0;
348     virtual pair<string,string> get_next_key() = 0;
349   };
350   typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
351
352   class WholeStoreIteratorImpl : public StoreIteratorImpl {
353     KeyValueDB::WholeSpaceIterator iter;
354     set<string> sync_prefixes;
355
356   public:
357     WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
358                            set<string> &prefixes)
359       : StoreIteratorImpl(),
360         iter(iter),
361         sync_prefixes(prefixes)
362     { }
363
364     virtual ~WholeStoreIteratorImpl() { }
365
366     /**
367      * Obtain a chunk of the store
368      *
369      * @param bl            Encoded transaction that will recreate the chunk
370      * @param first_key     Pair containing the first key to obtain, and that
371      *                      will contain the first key in the chunk (that may
372      *                      differ from the one passed on to the function)
373      * @param last_key[out] Last key in the chunk
374      */
375     virtual void get_chunk_tx(Transaction &tx, uint64_t max) {
376       assert(done == false);
377       assert(iter->valid() == true);
378
379       while (iter->valid()) {
380         string prefix(iter->raw_key().first);
381         string key(iter->raw_key().second);
382         if (sync_prefixes.count(prefix)) {
383           bufferlist value = iter->value();
384           if (!add_chunk_entry(tx, prefix, key, value, max))
385             return;
386         }
387         iter->next();
388       }
389       assert(iter->valid() == false);
390       done = true;
391     }
392
393     virtual pair<string,string> get_next_key() {
394       assert(iter->valid());
395
396       for (; iter->valid(); iter->next()) {
397         pair<string,string> r = iter->raw_key();
398         if (sync_prefixes.count(r.first) > 0) {
399           iter->next();
400           return r;
401         }
402       }
403       return pair<string,string>();
404     }
405
406     virtual bool _is_valid() {
407       return iter->valid();
408     }
409   };
410
411   Synchronizer get_synchronizer(pair<string,string> &key,
412                                 set<string> &prefixes) {
413     KeyValueDB::WholeSpaceIterator iter;
414     iter = db->get_snapshot_iterator();
415
416     if (!key.first.empty() && !key.second.empty())
417       iter->upper_bound(key.first, key.second);
418     else
419       iter->seek_to_first();
420
421     return ceph::shared_ptr<StoreIteratorImpl>(
422         new WholeStoreIteratorImpl(iter, prefixes)
423     );
424   }
425
426   KeyValueDB::Iterator get_iterator(const string &prefix) {
427     assert(!prefix.empty());
428     KeyValueDB::Iterator iter = db->get_snapshot_iterator(prefix);
429     iter->seek_to_first();
430     return iter;
431   }
432
433   KeyValueDB::WholeSpaceIterator get_iterator() {
434     KeyValueDB::WholeSpaceIterator iter;
435     iter = db->get_snapshot_iterator();
436     iter->seek_to_first();
437     return iter;
438   }
439
440   int get(const string& prefix, const string& key, bufferlist& bl) {
441     set<string> k;
442     k.insert(key);
443     map<string,bufferlist> out;
444
445     db->get(prefix, k, &out);
446     if (out.empty())
447       return -ENOENT;
448     bl.append(out[key]);
449
450     return 0;
451   }
452
453   int get(const string& prefix, const version_t ver, bufferlist& bl) {
454     ostringstream os;
455     os << ver;
456     return get(prefix, os.str(), bl);
457   }
458
459   version_t get(const string& prefix, const string& key) {
460     bufferlist bl;
461     int err = get(prefix, key, bl);
462     if (err < 0) {
463       if (err == -ENOENT) // if key doesn't exist, assume its value is 0
464         return 0;
465       // we're not expecting any other negative return value, and we can't
466       // just return a negative value if we're returning a version_t
467       generic_dout(0) << "MonitorDBStore::get() error obtaining"
468                       << " (" << prefix << ":" << key << "): "
469                       << cpp_strerror(err) << dendl;
470       assert(0 == "error obtaining key");
471     }
472
473     assert(bl.length());
474     version_t ver;
475     bufferlist::iterator p = bl.begin();
476     ::decode(ver, p);
477     return ver;
478   }
479
480   bool exists(const string& prefix, const string& key) {
481     KeyValueDB::Iterator it = db->get_iterator(prefix);
482     int err = it->lower_bound(key);
483     if (err < 0)
484       return false;
485
486     return (it->valid() && it->key() == key);
487   }
488
489   bool exists(const string& prefix, version_t ver) {
490     ostringstream os;
491     os << ver;
492     return exists(prefix, os.str());
493   }
494
495   string combine_strings(const string& prefix, const string& value) {
496     string out = prefix;
497     out.push_back('_');
498     out.append(value);
499     return out;
500   }
501
502   string combine_strings(const string& prefix, const version_t ver) {
503     ostringstream os;
504     os << ver;
505     return combine_strings(prefix, os.str());
506   }
507
508   void clear(set<string>& prefixes) {
509     set<string>::iterator iter;
510     KeyValueDB::Transaction dbt = db->get_transaction();
511
512     for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
513       dbt->rmkeys_by_prefix((*iter));
514     }
515     int r = db->submit_transaction_sync(dbt);
516     assert(r >= 0);
517   }
518
519   void init_options() {
520     db->init();
521     if (g_conf->mon_leveldb_write_buffer_size)
522       db->options.write_buffer_size = g_conf->mon_leveldb_write_buffer_size;
523     if (g_conf->mon_leveldb_cache_size)
524       db->options.cache_size = g_conf->mon_leveldb_cache_size;
525     if (g_conf->mon_leveldb_block_size)
526       db->options.block_size = g_conf->mon_leveldb_block_size;
527     if (g_conf->mon_leveldb_bloom_size)
528       db->options.bloom_size = g_conf->mon_leveldb_bloom_size;
529     if (g_conf->mon_leveldb_compression)
530       db->options.compression_enabled = g_conf->mon_leveldb_compression;
531     if (g_conf->mon_leveldb_max_open_files)
532       db->options.max_open_files = g_conf->mon_leveldb_max_open_files;
533     if (g_conf->mon_leveldb_paranoid)
534       db->options.paranoid_checks = g_conf->mon_leveldb_paranoid;
535     if (g_conf->mon_leveldb_log.length())
536       db->options.log_file = g_conf->mon_leveldb_log;
537   }
538
539   int open(ostream &out) {
540     init_options();
541     return db->open(out);
542   }
543
544   int create_and_open(ostream &out) {
545     init_options();
546     return db->create_and_open(out);
547   }
548
549   void compact() {
550     db->compact();
551   }
552
553   void compact_prefix(const string& prefix) {
554     db->compact_prefix(prefix);
555   }
556
557   uint64_t get_estimated_size(map<string, uint64_t> &extras) {
558     return db->get_estimated_size(extras);
559   }
560
561   MonitorDBStore(const string& path) :
562     db(0), do_dump(false), dump_fd(-1) {
563     string::const_reverse_iterator rit;
564     int pos = 0;
565     for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
566       if (*rit != '/')
567         break;
568     }
569     ostringstream os;
570     os << path.substr(0, path.size() - pos) << "/store.db";
571     string full_path = os.str();
572
573     LevelDBStore *db_ptr = new LevelDBStore(g_ceph_context, full_path);
574     if (!db_ptr) {
575       derr << __func__ << " error initializing level db back storage in "
576                 << full_path << dendl;
577       assert(0 != "MonitorDBStore: error initializing level db back storage");
578     }
579     db.reset(db_ptr);
580
581     if (g_conf->mon_debug_dump_transactions) {
582       do_dump = true;
583       dump_fd = ::open(
584         g_conf->mon_debug_dump_location.c_str(),
585         O_CREAT|O_APPEND|O_WRONLY, 0644);
586       if (!dump_fd) {
587         dump_fd = -errno;
588         derr << "Could not open log file, got "
589              << cpp_strerror(dump_fd) << dendl;
590       }
591     }
592   }
593   MonitorDBStore(LevelDBStore *db_ptr) :
594     db(0), do_dump(false), dump_fd(-1) {
595     db.reset(db_ptr);
596   }
597   ~MonitorDBStore() {
598     if (do_dump)
599       ::close(dump_fd);
600   }
601
602 };
603
604 WRITE_CLASS_ENCODER(MonitorDBStore::Op);
605 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction);
606
607 #endif /* CEPH_MONITOR_DB_STORE_H */