feat(server): Move bumpup logic out of FindInternal #4877

Open
wants to merge 4 commits into main
64 changes: 34 additions & 30 deletions src/server/db_slice.cc
@@ -355,20 +355,9 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {

class DbSlice::PrimeBumpPolicy {
 public:
-  PrimeBumpPolicy(absl::flat_hash_set<uint64_t, FpHasher>* items) : fetched_items_(items) {
-  }
-
  // returns true if we can change the object location in dash table.
  bool CanBump(const CompactObj& obj) const {
-    if (obj.IsSticky()) {
-      return false;
-    }
-    auto hc = obj.HashCode();
-    return fetched_items_->insert(hc).second;
+    return !obj.IsSticky();
  }
-
- private:
-  mutable absl::flat_hash_set<uint64_t, FpHasher>* fetched_items_;
};

DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner)
@@ -565,21 +554,9 @@ auto DbSlice::FindInternal(const Context& cntx, string_view key, optional<unsign
}

  DCHECK(IsValid(res.it));
-  if (IsCacheMode()) {
-    if (!change_cb_.empty()) {
-      auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
-        CallChangeCallbacks(cntx.db_index, key, bit);
-      };
-      db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
-    }
-
-    // We must not change the bucket's internal order during serialization
-    serialization_latch_.Wait();
-    auto bump_it = db.prime.BumpUp(res.it, PrimeBumpPolicy{&fetched_items_});
-    if (bump_it != res.it) {  // the item was bumped
-      res.it = bump_it;
-      ++events_.bumpups;
-    }
+  if (IsCacheMode()) {
+    fetched_items_.insert({res.it->first.HashCode(), cntx.db_index, key});
  }

switch (stats_mode) {
@@ -1714,10 +1691,37 @@ void DbSlice::PerformDeletion(Iterator del_it, DbTable* table) {
PerformDeletionAtomic(del_it, exp_it, table);
}

-void DbSlice::OnCbFinish() {
-  // TBD update bumpups logic we can not clear now after cb finish as cb can preempt
-  // btw what do we do with inline?
-  fetched_items_.clear();
+void DbSlice::OnCbFinishBlocking() {
+  if (IsCacheMode()) {
+    // move fetched items to local variable
+    auto moved_fetched_items_ = std::move(fetched_items_);
+    for (const auto& [key_hash, db_index, key] : moved_fetched_items_) {
+      auto& db = *db_arr_[db_index];
+
+      auto predicate = [&key](const PrimeKey& key_) { return key_ == key; };
Collaborator:
You are not comparing to the key here, so you can just return true. The point is that it's not critical if, once in a billion, you have a collision and bump up something else, so you can remove the key from the fetched_items_ tuple.

@mkaruza (Contributor, Author) on Apr 4, 2025:
I'll remove it and add an extra comment explaining this decision.
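A minimal sketch of the lookup after applying this suggestion (a hypothetical follow-up, assuming the key string is dropped from the tuple; matching purely by the 64-bit fingerprint means a rare collision may bump an unrelated item, which is acceptable here):

// Sketch only, not the committed code: accept any entry with this fingerprint.
auto predicate = [](const PrimeKey&) { return true; };
PrimeIterator it = db.prime.FindFirst(key_hash, predicate);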


+      PrimeIterator it = db.prime.FindFirst(key_hash, predicate);
+
+      if (!IsValid(it)) {
+        continue;
+      }
+
+      if (!change_cb_.empty()) {
+        auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
+          CallChangeCallbacks(db_index, key, bit);
+        };
+        db.prime.CVCUponBump(change_cb_.back().first, it, bump_cb);
+      }
+
+      // We must not change the bucket's internal order during serialization
+      serialization_latch_.Wait();
+      auto bump_it = db.prime.BumpUp(it, PrimeBumpPolicy{});
+      if (bump_it != it) {  // the item was bumped
+        ++events_.bumpups;
Collaborator:
Let's add a check of the bumpup counter to the replication/snapshot pytests that we have for cache mode, and make sure that it is bigger than 0, so we know we are still bumping items.

Contributor Author:
I have added a pytest that checks a few conditions for bumpups (see test_key_bump_ups in tests/dragonfly/generic_test.py below).
+      }
+    }
+    fetched_items_.clear();
+  }

if (!pending_send_map_.empty()) {
SendQueuedInvalidationMessages();
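Taken together, the change splits cache-mode bump-ups into two phases: the hot read path in FindInternal only records what was fetched, and the actual bucket reordering happens in OnCbFinishBlocking once the transaction callback has finished. A condensed sketch of the resulting flow (simplified from the diff above; change callbacks, stats and edge cases elided):

// Phase 1, in FindInternal: record the access, do not touch bucket order.
if (IsCacheMode()) {
  fetched_items_.insert({res.it->first.HashCode(), cntx.db_index, key});
}

// Phase 2, in OnCbFinishBlocking: re-find each recorded item by fingerprint
// and bump it now, when no callback can observe the bucket mid-change.
auto moved_items = std::move(fetched_items_);
for (const auto& [key_hash, db_index, key] : moved_items) {
  auto& db = *db_arr_[db_index];
  auto predicate = [&key](const PrimeKey& key_) { return key_ == key; };
  PrimeIterator it = db.prime.FindFirst(key_hash, predicate);
  if (IsValid(it)) {
    serialization_latch_.Wait();  // bucket order must stay stable during serialization
    db.prime.BumpUp(it, PrimeBumpPolicy{});
  }
}
fetched_items_.clear();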
10 changes: 8 additions & 2 deletions src/server/db_slice.h
@@ -357,7 +357,7 @@ class DbSlice {
return shard_id_;
}

-  void OnCbFinish();
+  void OnCbFinishBlocking();

bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args);
void Release(IntentLock::Mode m, const KeyLockArgs& lock_args);
@@ -615,10 +615,16 @@

DbTableArray db_arr_;

+  // Key for bump-up items: the tuple contains <key hash, db_index, key>.
+  using FetchedItemKey = std::tuple<uint64_t, DbIndex, std::string_view>;

struct FpHasher {
size_t operator()(uint64_t val) const {
return val;
}
+    size_t operator()(const FetchedItemKey& val) const {
+      return std::get<0>(val);
+    }
};

// Used in temporary computations in Acquire/Release.
@@ -635,7 +641,7 @@
// for operations that preempt in the middle we have another mechanism -
// auto laundering iterators, so in case of preemption we do not mind that fetched_items are
// cleared or changed.
-  mutable absl::flat_hash_set<uint64_t, FpHasher> fetched_items_;
+  mutable absl::flat_hash_set<FetchedItemKey, FpHasher> fetched_items_;

// Registered by shard indices on when first document index is created.
DocDeletionCallback doc_del_cb_;
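A note on the hasher above: the first tuple element is already the dash table's 64-bit fingerprint (res.it->first.HashCode() in FindInternal), so FpHasher can return it as-is instead of rehashing, and duplicate fetches of the same key collapse into a single entry, mirroring the deduplication the old insert().second check provided. A small self-contained illustration (a sketch; the DbIndex alias is assumed here to be a narrow integer type, as elsewhere in the codebase):

#include <absl/container/flat_hash_set.h>

#include <cstdint>
#include <string_view>
#include <tuple>

using DbIndex = uint16_t;  // assumption made for this sketch
using FetchedItemKey = std::tuple<uint64_t, DbIndex, std::string_view>;

struct FpHasher {
  size_t operator()(const FetchedItemKey& val) const {
    return std::get<0>(val);  // the fingerprint is already a hash; identity suffices
  }
};

int main() {
  absl::flat_hash_set<FetchedItemKey, FpHasher> fetched;
  fetched.insert({0xDEADBEEFull, 0, "foo"});
  fetched.insert({0xDEADBEEFull, 0, "foo"});  // duplicate fetch collapses into one entry
  return fetched.size() == 1 ? 0 : 1;  // exits 0: the set holds a single entry
}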
10 changes: 5 additions & 5 deletions src/server/debugcmd.cc
@@ -868,11 +868,11 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,

  ess.AwaitRunningOnShardQueue([&](EngineShard* shard) {
    DoPopulateBatch(options, ps[shard->shard_id()]);
-    // Debug populate does not use transaction framework therefore we call OnCbFinish manually
-    // after running the callback
-    // Note that running debug populate while running flushall/db can cause dcheck fail because the
-    // finish cb is executed just when we finish populating the database.
-    cntx_->ns->GetDbSlice(shard->shard_id()).OnCbFinish();
+    // Debug populate does not use the transaction framework, therefore we call OnCbFinishBlocking
+    // manually after running the callback. Note that running debug populate while running
+    // flushall/db can cause a dcheck fail, because the finish cb is executed just when we finish
+    // populating the database.
+    cntx_->ns->GetDbSlice(shard->shard_id()).OnCbFinishBlocking();
});
}

8 changes: 4 additions & 4 deletions src/server/string_family_test.cc
@@ -283,9 +283,9 @@ TEST_F(StringFamilyTest, MGetCachingModeBug2276) {

  resp = Run({"info", "stats"});
  size_t bumps1 = get_bump_ups(resp.GetString());
-  EXPECT_GT(bumps1, 0);
-  EXPECT_LT(bumps1, 10);  // we assume that some bumps are blocked because items reside next to each
-                          // other in the slot.
+  EXPECT_GE(bumps1, 0);
+  EXPECT_LE(bumps1, 10);

for (int i = 0; i < 10; ++i) {
auto get_resp = Run({"get", vec[i]});
@@ -332,7 +332,7 @@ TEST_F(StringFamilyTest, MGetCachingModeBug2465) {

  resp = Run({"info", "stats"});
  size_t bumps = get_bump_ups(resp.GetString());
-  EXPECT_EQ(bumps, 3);  // one bump for del and one for get and one for mget
+  EXPECT_EQ(bumps, 2);  // one bump for get and one for mget
}

TEST_F(StringFamilyTest, MSetGet) {
4 changes: 2 additions & 2 deletions src/server/transaction.cc
@@ -695,7 +695,7 @@ void Transaction::RunCallback(EngineShard* shard) {
}

  auto& db_slice = GetDbSlice(shard->shard_id());
-  db_slice.OnCbFinish();
+  db_slice.OnCbFinishBlocking();

// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
@@ -1364,7 +1364,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
  auto& db_slice = GetDbSlice(shard->shard_id());

  auto result = cb(this, shard);
-  db_slice.OnCbFinish();
+  db_slice.OnCbFinishBlocking();

LogAutoJournalOnShard(shard, result);
MaybeInvokeTrackingCb();
60 changes: 57 additions & 3 deletions tests/dragonfly/generic_test.py
@@ -114,19 +114,19 @@ async def block(id):
tasks.append(block(i))
await asyncio.gather(*tasks)


    # produce is constantly waking up consumers. It is used to trigger the
    # flow that creates wake-ups on a different database in the
    # middle of a continuation transaction.
async def tasks_produce(num, iters):
LPUSH_SCRIPT = """
redis.call('LPUSH', KEYS[1], "val")
"""

async def produce(id):
c = df_server.client(db=1) # important to be on a different db
for i in range(iters):
# Must be a lua script and not multi-exec for some reason.
-            await c.eval(LPUSH_SCRIPT, 1, f"list{{{id}}}")
+            await c.eval(LPUSH_SCRIPT, 1, f"list{{{id}}}")

tasks = []
for i in range(num):
@@ -151,7 +151,6 @@ async def drain(id, iters):
await asyncio.gather(*tasks)
logging.info("Finished consuming")


num_keys = 32
num_iters = 200
async_task1 = asyncio.create_task(blmove_task_loose(num_keys))
@@ -288,3 +287,58 @@ async def test_rename_huge_values(df_factory, type):
target_data = await StaticSeeder.capture(client)

assert source_data == target_data


+@pytest.mark.asyncio
+async def test_key_bump_ups(df_factory):
+    master = df_factory.create(
+        proactor_threads=2,
+        cache_mode="true",
+    )
+    df_factory.start_all([master])
+    c_master = master.client()
+
+    await c_master.execute_command("DEBUG POPULATE 18000 KEY 32 RAND")
+
+    info = await c_master.info("stats")
+    assert info["bump_ups"] == 0
+
+    keys = await c_master.execute_command("SCAN 0")
+    keys = keys[1][0:10]
+
+    # Bump keys
+    for key in keys:
+        await c_master.execute_command("GET " + key)
+    info = await c_master.info("stats")
+    assert info["bump_ups"] <= 10
+
+    # Multi get bump
+    await c_master.execute_command("MGET " + " ".join(keys))
+    info = await c_master.info("stats")
+    assert info["bump_ups"] >= 10 and info["bump_ups"] <= 20
+    last_bump_ups = info["bump_ups"]
+
+    for key in keys:
+        await c_master.execute_command("DEL " + key)
+
+    # DEL should not bump up any key
+    info = await c_master.info("stats")
+    assert last_bump_ups == info["bump_ups"]
+
+    # Find key that has slot > 0 and bump it
+    while True:
+        keys = await c_master.execute_command("SCAN 0")
+        key = keys[1][0]
+
+        debug_key_info = await c_master.execute_command("DEBUG OBJECT " + key)
+        slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"])
+        if slot_id == 0:
+            # delete the key and continue
+            await c_master.execute_command("DEL " + key)
+            continue
+
+        await c_master.execute_command("GET " + key)
+        debug_key_info = await c_master.execute_command("DEBUG OBJECT " + key)
+        new_slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"])
+        assert new_slot_id + 1 == slot_id
+        break