1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 Inktank, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
16 #include "include/types.h"
17 #include "include/buffer.h"
21 #include <boost/scoped_ptr.hpp>
23 #include "os/KeyValueDB.h"
24 #include "os/LevelDBStore.h"
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/errno.h"
// Handle to the LevelDB-backed store. The methods in this header reach the
// database through it (db->get_transaction(), db->get(), db->open(), ...).
32 boost::scoped_ptr<LevelDBStore> db;
// Build an op that carries only a prefix and a key (no value buffer, no end
// key).
//   t  operation code stored in `type`
//   p  prefix the key lives under
//   k  key the op refers to
// Both strings are taken by const reference, like the prefix of the sibling
// constructors, so a call makes one copy per string (into the member) rather
// than two.
46 Op(int t, const string& p, const string& k)
47 : type(t), prefix(p), key(k) { }
// Build an op that carries a prefix, a key and a value buffer.
//   t  operation code stored in `type`
//   p  prefix the key lives under
//   k  key the op refers to
//   b  value; copied into the op's own `bl`
// The key is taken by const reference, like the prefix, so it is copied once
// (into the member) instead of twice.
48 Op(int t, const string& p, const string& k, bufferlist& b)
49 : type(t), prefix(p), key(k), bl(b) { }
// Build an op that refers to a key range under a prefix.
//   t      operation code stored in `type`
//   p      prefix the range lives under
//   start  first key of the range; stored in `key`
//   end    last key of the range; stored in `endkey`
// The range keys are taken by const reference, like the prefix, so each is
// copied once (into the member) instead of twice.
50 Op(int t, const string& p, const string& start, const string& end)
51 : type(t), prefix(p), key(start), endkey(end) { }
// Serialize this op into encode_bl. The fields are written in the order
// type, prefix, key, bl, endkey; decode() reads them back in that order.
53 void encode(bufferlist& encode_bl) const {
// NOTE(review): the arguments are presumably (struct version, compat version,
// target buffer) -- confirm against the ENCODE_START definition.
54 ENCODE_START(2, 1, encode_bl);
55 ::encode(type, encode_bl);
56 ::encode(prefix, encode_bl);
57 ::encode(key, encode_bl);
58 ::encode(bl, encode_bl);
59 ::encode(endkey, encode_bl);
60 ENCODE_FINISH(encode_bl);
// Populate this op from decode_bl, reading type, prefix, key, bl and endkey in
// the same order encode() writes them.
63 void decode(bufferlist::iterator& decode_bl) {
64 DECODE_START(2, decode_bl);
65 ::decode(type, decode_bl);
66 ::decode(prefix, decode_bl);
67 ::decode(key, decode_bl);
68 ::decode(bl, decode_bl);
// NOTE(review): a statement preceding this line is not visible here; endkey
// may be decoded only for newer encodings -- confirm.
70 ::decode(endkey, decode_bl);
71 DECODE_FINISH(decode_bl);
// Report this op through the given Formatter: the numeric type plus the
// prefix, key and endkey strings.
74 void dump(Formatter *f) const {
75 f->dump_int("type", type);
76 f->dump_string("prefix", prefix);
77 f->dump_string("key", key);
79 f->dump_string("endkey", endkey);
// Test-instance hook for Op. Per the note below, Op encoding is exercised
// through the instances produced for Transaction instead.
82 static void generate_test_instances(list<Op*>& ls) {
84 // we get coverage here from the Transaction instances
// A new transaction starts with its byte and key counters at zero.
92 Transaction() : bytes(0), keys(0) {}
// Queue an OP_PUT that stores bl under (prefix, key), and grow the byte
// counter by the combined length of prefix, key and value.
100 void put(string prefix, string key, bufferlist& bl) {
101 ops.push_back(Op(OP_PUT, prefix, key, bl));
103 bytes += prefix.length() + key.length() + bl.length();
// Version-keyed overload: forwards to the string-keyed put() with os.str() as
// the key.
// NOTE(review): os presumably holds the textual form of ver; the lines that
// fill it are not visible here -- confirm.
106 void put(string prefix, version_t ver, bufferlist& bl) {
109 put(prefix, os.str(), bl);
// Overload whose value is a version number: forwards to the buffer-valued
// put() with a local buffer bl.
// NOTE(review): bl presumably holds the encoded ver; the lines that build it
// are not visible here -- confirm.
112 void put(string prefix, string key, version_t ver) {
115 put(prefix, key, bl);
// Queue an OP_ERASE for (prefix, key), and grow the byte counter by the
// combined length of prefix and key.
118 void erase(string prefix, string key) {
119 ops.push_back(Op(OP_ERASE, prefix, key));
121 bytes += prefix.length() + key.length();
// Version-keyed overload: forwards to the string-keyed erase() with os.str()
// as the key.
// NOTE(review): os presumably holds the textual form of ver; the lines that
// fill it are not visible here -- confirm.
124 void erase(string prefix, version_t ver) {
127 erase(prefix, os.str());
// Queue an OP_COMPACT covering a whole prefix. The op is built with an empty
// start key and no end key; apply_transaction() treats an OP_COMPACT whose
// start and end keys are both empty as a whole-prefix compaction.
130 void compact_prefix(string prefix) {
131 ops.push_back(Op(OP_COMPACT, prefix, string()));
// Queue an OP_COMPACT for the key range [start, end] under prefix; start is
// stored in the op's key and end in its endkey.
134 void compact_range(string prefix, string start, string end) {
135 ops.push_back(Op(OP_COMPACT, prefix, start, end));
138 void encode(bufferlist& bl) const {
139 ENCODE_START(2, 1, bl);
146 void decode(bufferlist::iterator& bl) {
// Produce sample Transactions: two instances are pushed onto ls, and the last
// one is given a put, an erase, a prefix compaction and a range compaction so
// that each kind of op is represented.
156 static void generate_test_instances(list<Transaction*>& ls) {
157 ls.push_back(new Transaction);
158 ls.push_back(new Transaction);
161 ls.back()->put("prefix", "key", bl);
162 ls.back()->erase("prefix2", "key2");
163 ls.back()->compact_prefix("prefix3");
164 ls.back()->compact_range("prefix4", "from", "to");
// Move every op of other onto the end of this transaction and add other's
// byte count to ours. The ops are spliced, not copied, so other.ops is left
// empty afterwards.
167 void append(Transaction& other) {
168 ops.splice(ops.end(), other.ops);
170 bytes += other.bytes;
173 void append_from_encoded(bufferlist& bl) {
175 bufferlist::iterator it = bl.begin();
181 return (size() == 0);
187 uint64_t get_keys() const {
190 uint64_t get_bytes() const {
// Write a structured description of this transaction to f: an object section
// "transaction" holding an "ops" array with one "op" object per queued op
// (numbered through "op_num"), plus the "num_keys" and "num_bytes" counters.
// NOTE(review): the statements that consult dump_val are not visible here;
// presumably it gates the "bl" field emitted for PUT ops -- confirm.
194 void dump(ceph::Formatter *f, bool dump_val=false) const {
195 f->open_object_section("transaction");
196 f->open_array_section("ops");
197 list<Op>::const_iterator it;
199 for (it = ops.begin(); it != ops.end(); ++it) {
201 f->open_object_section("op");
202 f->dump_int("op_num", op_num++);
// PUT: prefix, key and the length of the value buffer.
206 f->dump_string("type", "PUT");
207 f->dump_string("prefix", op.prefix);
208 f->dump_string("key", op.key);
209 f->dump_unsigned("length", op.bl.length());
213 f->dump_string("bl", os.str());
// ERASE: prefix and key.
219 f->dump_string("type", "ERASE");
220 f->dump_string("prefix", op.prefix);
221 f->dump_string("key", op.key);
// COMPACT: prefix plus the start and end keys of the range.
226 f->dump_string("type", "COMPACT");
227 f->dump_string("prefix", op.prefix);
228 f->dump_string("start", op.key);
229 f->dump_string("end", op.endkey);
// Any other op code is reported as "unknown" together with its numeric value.
234 f->dump_string("type", "unknown");
235 f->dump_unsigned("op_code", op.type);
242 f->dump_unsigned("num_keys", keys);
243 f->dump_unsigned("num_bytes", bytes);
// Apply t to the backing store.
// PUT and ERASE ops are staged on a KeyValueDB transaction (set / rmkey) that
// is then submitted synchronously. COMPACT ops are only collected while
// staging and are issued as asynchronous compactions afterwards. An op with an
// unrecognized type is reported through derr.
// NOTE(review): the return statement is not visible here; presumably r, the
// result of submit_transaction_sync(), is returned -- confirm.
248 int apply_transaction(const MonitorDBStore::Transaction& t) {
249 KeyValueDB::Transaction dbt = db->get_transaction();
// NOTE(review): presumably writes the encoded transaction to the dump file,
// and only when dumping is enabled; the guard is not visible here -- confirm.
254 bl.write_fd(dump_fd);
// Pending compactions, each as (prefix, (start key, end key)).
257 list<pair<string, pair<string,string> > > compact;
258 for (list<Op>::const_iterator it = t.ops.begin(); it != t.ops.end(); ++it) {
261 case Transaction::OP_PUT:
262 dbt->set(op.prefix, op.key, op.bl);
264 case Transaction::OP_ERASE:
265 dbt->rmkey(op.prefix, op.key);
267 case Transaction::OP_COMPACT:
268 compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
271 derr << __func__ << " unknown op type " << op.type << dendl;
276 int r = db->submit_transaction_sync(dbt);
278 while (!compact.empty()) {
// Empty start and end keys mean "compact the whole prefix" (this is what
// Transaction::compact_prefix() queues); anything else is a key range.
279 if (compact.front().second.first == string() &&
280 compact.front().second.second == string())
281 db->compact_prefix_async(compact.front().first);
283 db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
// Abort path: `0 == "literal"` is always false, so reaching this line aborts.
// NOTE(review): presumably only reached when the submit failed; the enclosing
// condition is not visible here -- confirm.
287 assert(0 == "failed to write to db");
// Base class for iterators that hand out store contents chunk by chunk:
// add_chunk_entry() does the bookkeeping, and subclasses supply
// get_chunk_tx(), get_next_key() and _is_valid().
292 class StoreIteratorImpl {
// (prefix, key) of the entry most recently recorded by add_chunk_entry().
295 pair<string,string> last_key;
// A fresh iterator is not done.
298 StoreIteratorImpl() : done(false) { }
// Virtual so that instances can be destroyed through a base-class pointer
// (see the Synchronizer typedef).
299 virtual ~StoreIteratorImpl() { }
// Try to add one (prefix, key, value) entry to the chunk transaction tx.
// The entry is first staged as a put on a scratch transaction (tmp), and the
// size check below compares a combined length against max. When the entry is
// kept, last_key records it, and if mon_sync_debug is set the triple is also
// appended to crc_bl.
// NOTE(review): tx_bl / tmp_bl are presumably the encoded forms of tx and tmp,
// and the size check presumably returns false; those lines are not visible
// here -- confirm.
301 bool add_chunk_entry(Transaction &tx,
308 tmp.put(prefix, key, value);
314 size_t len = tx_bl.length() + tmp_bl.length();
// An empty tx never trips the limit, so a single entry larger than max is
// still accepted.
316 if (!tx.empty() && (len > max)) {
321 last_key.first = prefix;
322 last_key.second = key;
324 if (g_conf->mon_sync_debug) {
325 ::encode(prefix, crc_bl);
326 ::encode(key, crc_bl);
327 ::encode(value, crc_bl);
// Subclass hook consulted by has_next_chunk(); WholeStoreIteratorImpl answers
// it with iter->valid().
333 virtual bool _is_valid() = 0;
337 if (g_conf->mon_sync_debug)
338 return crc_bl.crc32c(0);
341 pair<string,string> get_last_key() {
// True while the iterator has not been marked done and the subclass reports
// that it is still valid.
344 virtual bool has_next_chunk() {
345 return !done && _is_valid();
// Fill tx with the next chunk of entries; max is the size limit for the chunk.
347 virtual void get_chunk_tx(Transaction &tx, uint64_t max) = 0;
// Report the (prefix, key) pair at which the iteration currently stands.
348 virtual pair<string,string> get_next_key() = 0;
// Shared-ownership handle to a chunk iterator; get_synchronizer() returns one.
350 typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
// Chunk iterator that walks the whole key space and keeps only entries whose
// prefix is in sync_prefixes.
352 class WholeStoreIteratorImpl : public StoreIteratorImpl {
// Cursor over the whole key space.
353 KeyValueDB::WholeSpaceIterator iter;
// Prefixes whose entries are included in chunks; copied from the constructor
// argument.
354 set<string> sync_prefixes;
// Takes the iterator to walk and the set of prefixes to include.
357 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
358 set<string> &prefixes)
359 : StoreIteratorImpl(),
361 sync_prefixes(prefixes)
364 virtual ~WholeStoreIteratorImpl() { }
367 * Obtain a chunk of the store
369 * @param tx  [out] Transaction that receives the entries of the chunk
370 * @param max Size limit handed to add_chunk_entry() for every entry; an
371 *            entry that would push the encoded transaction past this
372 *            limit is refused, unless the transaction is still empty
373 *            (so a single oversized entry is still accepted)
// Walk the iterator and offer every entry whose prefix is in sync_prefixes to
// add_chunk_entry(), which decides whether it still fits under max.
// Precondition (asserted): the iterator is not done and is currently valid.
// NOTE(review): the statements that advance the iterator, react to a refused
// entry and mark the iterator done are not visible here.
375 virtual void get_chunk_tx(Transaction &tx, uint64_t max) {
376 assert(done == false);
377 assert(iter->valid() == true);
379 while (iter->valid()) {
380 string prefix(iter->raw_key().first);
381 string key(iter->raw_key().second);
// Entries under prefixes that are not being synchronized are skipped.
382 if (sync_prefixes.count(prefix)) {
383 bufferlist value = iter->value();
384 if (!add_chunk_entry(tx, prefix, key, value, max))
// Reaching this point means the loop ran the iterator to its end.
389 assert(iter->valid() == false);
// Advance iter until it stands on an entry whose prefix is in sync_prefixes.
// If the iterator runs out first, an empty (prefix, key) pair is returned.
// Precondition (asserted): the iterator is valid on entry.
// NOTE(review): the body of the match branch is not visible here; presumably
// it returns the matching key -- confirm.
393 virtual pair<string,string> get_next_key() {
394 assert(iter->valid());
396 for (; iter->valid(); iter->next()) {
397 pair<string,string> r = iter->raw_key();
398 if (sync_prefixes.count(r.first) > 0) {
403 return pair<string,string>();
// Valid exactly when the underlying whole-space iterator is valid.
406 virtual bool _is_valid() {
407 return iter->valid();
// Create a Synchronizer (a WholeStoreIteratorImpl over a snapshot iterator)
// restricted to the given prefixes.
// When both parts of key are non-empty the iterator is positioned with
// upper_bound(key.first, key.second).
// NOTE(review): the `else` in front of seek_to_first() is not visible here;
// presumably the iterator starts from the first entry only when key is not
// fully specified -- confirm.
411 Synchronizer get_synchronizer(pair<string,string> &key,
412 set<string> &prefixes) {
413 KeyValueDB::WholeSpaceIterator iter;
414 iter = db->get_snapshot_iterator();
416 if (!key.first.empty() && !key.second.empty())
417 iter->upper_bound(key.first, key.second);
419 iter->seek_to_first();
421 return ceph::shared_ptr<StoreIteratorImpl>(
422 new WholeStoreIteratorImpl(iter, prefixes)
// Snapshot iterator over a single prefix, positioned at its first entry.
// The prefix must not be empty (asserted).
426 KeyValueDB::Iterator get_iterator(const string &prefix) {
427 assert(!prefix.empty());
428 KeyValueDB::Iterator iter = db->get_snapshot_iterator(prefix);
429 iter->seek_to_first();
// Snapshot iterator over the whole key space, positioned at the first entry.
433 KeyValueDB::WholeSpaceIterator get_iterator() {
434 KeyValueDB::WholeSpaceIterator iter;
435 iter = db->get_snapshot_iterator();
436 iter->seek_to_first();
// Look up (prefix, key) in the store and hand the value back through bl.
// The lookup goes through db->get(), which fills a key -> value map (out).
// NOTE(review): the handling of a missing key and the return value are not
// visible here; the version_t overload of get() expects -ENOENT for a missing
// key -- confirm.
440 int get(const string& prefix, const string& key, bufferlist& bl) {
443 map<string,bufferlist> out;
445 db->get(prefix, k, &out);
// Version-keyed overload: forwards to the string-keyed get() with os.str() as
// the key and returns its result.
// NOTE(review): os presumably holds the textual form of ver -- confirm.
453 int get(const string& prefix, const version_t ver, bufferlist& bl) {
456 return get(prefix, os.str(), bl);
// Read the value stored under (prefix, key) as a version_t.
// A missing key (-ENOENT) is treated as version 0, per the inline note below.
// Any other failure is logged and aborts, because a negative error code cannot
// be returned through a version_t.
// NOTE(review): the decoding of the value is not visible here apart from the
// iterator set up on the last line.
459 version_t get(const string& prefix, const string& key) {
461 int err = get(prefix, key, bl);
463 if (err == -ENOENT) // if key doesn't exist, assume its value is 0
465 // we're not expecting any other negative return value, and we can't
466 // just return a negative value if we're returning a version_t
467 generic_dout(0) << "MonitorDBStore::get() error obtaining"
468 << " (" << prefix << ":" << key << "): "
469 << cpp_strerror(err) << dendl;
470 assert(0 == "error obtaining key");
475 bufferlist::iterator p = bl.begin();
// Check whether (prefix, key) is present: seek to the first entry not below
// key and report whether the iterator is valid and stands exactly on key.
// NOTE(review): the handling of err is not visible here.
480 bool exists(const string& prefix, const string& key) {
481 KeyValueDB::Iterator it = db->get_iterator(prefix);
482 int err = it->lower_bound(key);
486 return (it->valid() && it->key() == key);
// Version-keyed overload: forwards to the string-keyed exists() with os.str()
// as the key.
// NOTE(review): os presumably holds the textual form of ver -- confirm.
489 bool exists(const string& prefix, version_t ver) {
492 return exists(prefix, os.str());
495 string combine_strings(const string& prefix, const string& value) {
// Version overload: forwards to the string overload with os.str() as the
// value.
// NOTE(review): os presumably holds the textual form of ver -- confirm.
502 string combine_strings(const string& prefix, const version_t ver) {
505 return combine_strings(prefix, os.str());
// Remove every key under each of the given prefixes. All removals are staged
// on one KeyValueDB transaction, which is then submitted synchronously.
// NOTE(review): the use of r after the submit is not visible here.
508 void clear(set<string>& prefixes) {
509 set<string>::iterator iter;
510 KeyValueDB::Transaction dbt = db->get_transaction();
512 for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
513 dbt->rmkeys_by_prefix((*iter));
515 int r = db->submit_transaction_sync(dbt);
// Copy the mon_leveldb_* configuration settings into db->options.
// Each option is overridden only when its setting is non-zero (non-empty for
// the log file), so unset settings leave the existing option values untouched.
519 void init_options() {
521 if (g_conf->mon_leveldb_write_buffer_size)
522 db->options.write_buffer_size = g_conf->mon_leveldb_write_buffer_size;
523 if (g_conf->mon_leveldb_cache_size)
524 db->options.cache_size = g_conf->mon_leveldb_cache_size;
525 if (g_conf->mon_leveldb_block_size)
526 db->options.block_size = g_conf->mon_leveldb_block_size;
527 if (g_conf->mon_leveldb_bloom_size)
528 db->options.bloom_size = g_conf->mon_leveldb_bloom_size;
// NOTE(review): because of the guard, a false mon_leveldb_compression (and
// likewise mon_leveldb_paranoid below) never reaches the options; these two
// settings can only switch the feature on, never off.
529 if (g_conf->mon_leveldb_compression)
530 db->options.compression_enabled = g_conf->mon_leveldb_compression;
531 if (g_conf->mon_leveldb_max_open_files)
532 db->options.max_open_files = g_conf->mon_leveldb_max_open_files;
533 if (g_conf->mon_leveldb_paranoid)
534 db->options.paranoid_checks = g_conf->mon_leveldb_paranoid;
535 if (g_conf->mon_leveldb_log.length())
536 db->options.log_file = g_conf->mon_leveldb_log;
// Open the store; forwards to db->open(out) and returns its result.
539 int open(ostream &out) {
541 return db->open(out);
// Create and open the store; forwards to db->create_and_open(out) and returns
// its result.
544 int create_and_open(ostream &out) {
546 return db->create_and_open(out);
// Compact one prefix right away through db->compact_prefix(), as opposed to
// the asynchronous compactions that apply_transaction() issues.
553 void compact_prefix(const string& prefix) {
554 db->compact_prefix(prefix);
// Forward to db->get_estimated_size(); extras is passed through for the
// backend to fill in.
557 uint64_t get_estimated_size(map<string, uint64_t> &extras) {
558 return db->get_estimated_size(extras);
// Construct a store from a directory path.
// The on-disk location is `path` with `pos` trailing characters removed,
// followed by "/store.db"; a LevelDBStore is created for that location.
// When mon_debug_dump_transactions is set, mon_debug_dump_location is also
// opened (create / append / write-only, mode 0644) as the dump file that
// apply_transaction() writes to.
// NOTE(review): pos is counted by the reverse scan of path below, whose loop
// body is not visible here; presumably it counts trailing '/' -- confirm.
561 MonitorDBStore(const string& path) :
562 db(0), do_dump(false), dump_fd(-1) {
563 string::const_reverse_iterator rit;
565 for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
570 os << path.substr(0, path.size() - pos) << "/store.db";
571 string full_path = os.str();
573 LevelDBStore *db_ptr = new LevelDBStore(g_ceph_context, full_path);
575 derr << __func__ << " error initializing level db back storage in "
576 << full_path << dendl;
// `0 == "literal"` is always false, so this assert aborts with the message
// visible in the failure text -- the same idiom as the other abort asserts in
// this header. Written as `0 != "literal"` the condition is always true and
// the assert can never fire.
577 assert(0 == "MonitorDBStore: error initializing level db back storage");
581 if (g_conf->mon_debug_dump_transactions) {
584 g_conf->mon_debug_dump_location.c_str(),
585 O_CREAT|O_APPEND|O_WRONLY, 0644);
588 derr << "Could not open log file, got "
589 << cpp_strerror(dump_fd) << dendl;
// Construct a store around an existing LevelDBStore. As in the path-based
// constructor, dumping starts out disabled (do_dump false, dump_fd -1).
// NOTE(review): the body is not visible here; presumably db takes ownership of
// db_ptr -- confirm.
593 MonitorDBStore(LevelDBStore *db_ptr) :
594 db(0), do_dump(false), dump_fd(-1) {
604 WRITE_CLASS_ENCODER(MonitorDBStore::Op);
605 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction);
607 #endif /* CEPH_MONITOR_DB_STORE_H */