Skip to content

Commit b6e1b5c

Browse files
committed
Changes after PR review
1 parent d111594 commit b6e1b5c

File tree

5 files changed

+70
-14
lines changed

5 files changed

+70
-14
lines changed

src/server/db_slice.cc

+5-3
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
356356
class DbSlice::PrimeBumpPolicy {
357357
public:
358358
bool CanBump(const CompactObj& obj) const {
359-
return obj.IsSticky() ? false : true;
359+
return !obj.IsSticky();
360360
}
361361
};
362362

@@ -1691,9 +1691,11 @@ void DbSlice::PerformDeletion(Iterator del_it, DbTable* table) {
16911691
PerformDeletionAtomic(del_it, exp_it, table);
16921692
}
16931693

1694-
void DbSlice::OnCbFinish() {
1694+
void DbSlice::OnCbFinishBlocking() {
16951695
if (IsCacheMode()) {
1696-
for (const auto& [key_hash, db_index, key] : fetched_items_) {
1696+
// move fetched items to local variable
1697+
auto moved_fetched_items_ = std::move(fetched_items_);
1698+
for (const auto& [key_hash, db_index, key] : moved_fetched_items_) {
16971699
auto& db = *db_arr_[db_index];
16981700

16991701
auto predicate = [&key](const PrimeKey& key_) { return key_ == key; };

src/server/db_slice.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ class DbSlice {
357357
return shard_id_;
358358
}
359359

360-
void OnCbFinish();
360+
void OnCbFinishBlocking();
361361

362362
bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args);
363363
void Release(IntentLock::Mode m, const KeyLockArgs& lock_args);

src/server/debugcmd.cc

+5-5
Original file line numberDiff line numberDiff line change
@@ -868,11 +868,11 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
868868

869869
ess.AwaitRunningOnShardQueue([&](EngineShard* shard) {
870870
DoPopulateBatch(options, ps[shard->shard_id()]);
871-
// Debug populate does not use transaction framework therefore we call OnCbFinish manually
872-
// after running the callback
873-
// Note that running debug populate while running flushall/db can cause dcheck fail because the
874-
// finish cb is executed just when we finish populating the database.
875-
cntx_->ns->GetDbSlice(shard->shard_id()).OnCbFinish();
871+
// Debug populate does not use transaction framework therefore we call OnCbFinishBlocking
872+
// manually after running the callback Note that running debug populate while running
873+
// flushall/db can cause dcheck fail because the finish cb is executed just when we finish
874+
// populating the database.
875+
cntx_->ns->GetDbSlice(shard->shard_id()).OnCbFinishBlocking();
876876
});
877877
}
878878

src/server/transaction.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ void Transaction::RunCallback(EngineShard* shard) {
695695
}
696696

697697
auto& db_slice = GetDbSlice(shard->shard_id());
698-
db_slice.OnCbFinish();
698+
db_slice.OnCbFinishBlocking();
699699

700700
// Handle result flags to alter behaviour.
701701
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
@@ -1364,7 +1364,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
13641364
auto& db_slice = GetDbSlice(shard->shard_id());
13651365

13661366
auto result = cb(this, shard);
1367-
db_slice.OnCbFinish();
1367+
db_slice.OnCbFinishBlocking();
13681368

13691369
LogAutoJournalOnShard(shard, result);
13701370
MaybeInvokeTrackingCb();

tests/dragonfly/generic_test.py

+57-3
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,19 @@ async def block(id):
114114
tasks.append(block(i))
115115
await asyncio.gather(*tasks)
116116

117-
118117
# produce is constantly waking up consumers. It is used to trigger the
119118
# flow that creates wake ups on a differrent database in the
120119
# middle of continuation transaction.
121120
async def tasks_produce(num, iters):
122121
LPUSH_SCRIPT = """
123122
redis.call('LPUSH', KEYS[1], "val")
124123
"""
124+
125125
async def produce(id):
126126
c = df_server.client(db=1) # important to be on a different db
127127
for i in range(iters):
128128
# Must be a lua script and not multi-exec for some reason.
129-
await c.eval(LPUSH_SCRIPT, 1, f"list{{{id}}}")
129+
await c.eval(LPUSH_SCRIPT, 1, f"list{{{id}}}")
130130

131131
tasks = []
132132
for i in range(num):
@@ -151,7 +151,6 @@ async def drain(id, iters):
151151
await asyncio.gather(*tasks)
152152
logging.info("Finished consuming")
153153

154-
155154
num_keys = 32
156155
num_iters = 200
157156
async_task1 = asyncio.create_task(blmove_task_loose(num_keys))
@@ -288,3 +287,58 @@ async def test_rename_huge_values(df_factory, type):
288287
target_data = await StaticSeeder.capture(client)
289288

290289
assert source_data == target_data
290+
291+
292+
@pytest.mark.asyncio
293+
async def test_key_bump_ups(df_factory):
294+
master = df_factory.create(
295+
proactor_threads=2,
296+
cache_mode="true",
297+
)
298+
df_factory.start_all([master])
299+
c_master = master.client()
300+
301+
await c_master.execute_command("DEBUG POPULATE 18000 KEY 32 RAND")
302+
303+
info = await c_master.info("stats")
304+
assert info["bump_ups"] == 0
305+
306+
keys = await c_master.execute_command("SCAN 0")
307+
keys = keys[1][0:10]
308+
309+
# Bump keys
310+
for key in keys:
311+
await c_master.execute_command("GET " + key)
312+
info = await c_master.info("stats")
313+
assert info["bump_ups"] <= 10
314+
315+
# Multi get bump
316+
await c_master.execute_command("MGET " + " ".join(keys))
317+
info = await c_master.info("stats")
318+
assert info["bump_ups"] >= 10 and info["bump_ups"] <= 20
319+
last_bump_ups = info["bump_ups"]
320+
321+
for key in keys:
322+
await c_master.execute_command("DEL " + key)
323+
324+
# DEL should not bump up any key
325+
info = await c_master.info("stats")
326+
assert last_bump_ups == info["bump_ups"]
327+
328+
# Find key that has slot > 0 and bump it
329+
while True:
330+
keys = await c_master.execute_command("SCAN 0")
331+
key = keys[1][0]
332+
333+
debug_key_info = await c_master.execute_command("DEBUG OBJECT " + key)
334+
slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"])
335+
if slot_id == 0:
336+
# delete the key and continue
337+
await c_master.execute_command("DEL " + key)
338+
continue
339+
340+
await c_master.execute_command("GET " + key)
341+
debug_key_info = await c_master.execute_command("DEBUG OBJECT " + key)
342+
new_slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"])
343+
assert new_slot_id + 1 == slot_id
344+
break

0 commit comments

Comments
 (0)