Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] min_spilling_size cleanup #51998

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ RAY_CONFIG(int, max_io_workers, 4)

/// Ray's object spilling fuses small objects into a single file before flushing them
/// to optimize the performance.
/// The minimum object size that can be spilled by each spill operation. 100 MB by
/// Ray will try to spill at least this size or up to max_fused_object_count. 100 MB by
/// default. This value is not recommended to set beyond --object-store-memory.
RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)

Expand Down
32 changes: 16 additions & 16 deletions src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {

RAY_LOG(DEBUG) << "Unpinning object " << object_id;
// The object should be in one of these states: pinned, spilling, or spilled.
RAY_CHECK(pinned_objects_.contains(object_id) ||
auto pinned_objects_it = pinned_objects_.find(object_id);
RAY_CHECK(pinned_objects_it != pinned_objects_.end() ||
spilled_objects_url_.contains(object_id) ||
objects_pending_spill_.contains(object_id));
if (pinned_objects_.contains(object_id)) {
pinned_objects_size_ -= pinned_objects_[object_id]->GetSize();
pinned_objects_.erase(object_id);
if (pinned_objects_it != pinned_objects_.end()) {
pinned_objects_size_ -= pinned_objects_it->second->GetSize();
pinned_objects_.erase(pinned_objects_it);
local_objects_.erase(it);
} else {
// If the object is being spilled or is already spilled, then we will clean
Expand Down Expand Up @@ -164,7 +165,7 @@ void LocalObjectManager::SpillObjectUptoMaxThroughput() {
// Spill as fast as we can using all our spill workers.
bool can_spill_more = true;
while (can_spill_more) {
if (!SpillObjectsOfSize(min_spilling_size_)) {
if (!TryToSpillObjects()) {
break;
}
can_spill_more = num_active_workers_ < max_active_workers_;
Expand All @@ -173,12 +174,14 @@ void LocalObjectManager::SpillObjectUptoMaxThroughput() {

bool LocalObjectManager::IsSpillingInProgress() { return num_active_workers_ > 0; }

bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
bool LocalObjectManager::TryToSpillObjects() {
if (RayConfig::instance().object_spilling_config().empty()) {
return false;
}

RAY_LOG(DEBUG) << "Choosing objects to spill of total size " << num_bytes_to_spill;
RAY_LOG(DEBUG) << "Choosing objects to spill with minimum total size "
<< min_spilling_size_
<< " or with total # of objects = " << max_fused_object_count_;
int64_t bytes_to_spill = 0;
std::vector<ObjectID> objects_to_spill;
int64_t num_to_spill = 0;
Expand All @@ -198,10 +201,11 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
return false;
}

if (idx == objects_pending_spill_.size() && bytes_to_spill < num_bytes_to_spill &&
if (idx == objects_pending_spill_.size() && bytes_to_spill < min_spilling_size_ &&
!objects_pending_spill_.empty()) {
// We have gone through all spillable objects but we have not yet reached
// the minimum bytes to spill and we are already spilling other objects.
// 1. We've gone through all objects and it didn't hit max_fused_object_count_.
// 2. The total size of the current objects is less than min_spilling_size.
// 3. There are other objects already being spilled.
// Let those spill requests finish before we try to spill the current
// objects. This gives us some time to decide whether we really need to
// spill the current objects or if we can afford to wait for additional
Expand Down Expand Up @@ -264,7 +268,7 @@ void LocalObjectManager::SpillObjectsInternal(
std::function<void(const ray::Status &)> callback) {
std::vector<ObjectID> objects_to_spill;
// Filter for the objects that can be spilled.
// TODO(dayshah): The logic in this loop should be moved to SpillObjectsOfSize. We can
// TODO(dayshah): The logic in this loop should be moved to TryToSpillObjects. We can
// do this logic while creating what we pass into object_ids here and don't need
// to recreate objects_to_spill. The error status is also thrown away in the callback
// here as a debug log, so we wouldn't know if we failed to spill because of the check.
Expand Down Expand Up @@ -392,11 +396,7 @@ void LocalObjectManager::OnObjectSpilled(const std::vector<ObjectID> &object_ids
auto parsed_url = ParseURL(object_url);
const auto base_url_it = parsed_url->find("url");
RAY_CHECK(base_url_it != parsed_url->end());
if (!url_ref_count_.contains(base_url_it->second)) {
url_ref_count_[base_url_it->second] = 1;
} else {
url_ref_count_[base_url_it->second] += 1;
}
url_ref_count_[base_url_it->second] += 1;

// Mark that the object is spilled and unpin the pending requests.
spilled_objects_url_.emplace(object_id, object_url);
Expand Down
21 changes: 8 additions & 13 deletions src/ray/raylet/local_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class LocalObjectManager {
IOWorkerPoolInterface &io_worker_pool,
rpc::CoreWorkerClientPool &owner_client_pool,
int max_io_workers,
int64_t min_spilling_size,
bool is_external_storage_type_fs,
int64_t max_fused_object_count,
std::function<void(const std::vector<ObjectID> &)> on_objects_freed,
Expand All @@ -68,7 +67,7 @@ class LocalObjectManager {
owner_client_pool_(owner_client_pool),
on_objects_freed_(on_objects_freed),
last_free_objects_at_ms_(current_time_ms()),
min_spilling_size_(min_spilling_size),
min_spilling_size_(RayConfig::instance().min_spilling_size()),
num_active_workers_(0),
max_active_workers_(max_io_workers),
is_plasma_object_spillable_(is_plasma_object_spillable),
Expand Down Expand Up @@ -189,24 +188,22 @@ class LocalObjectManager {
size_t object_size;
};

FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectsOfSizeZero);
FRIEND_TEST(LocalObjectManagerTest, TestTryToSpillObjectsZero);
FRIEND_TEST(LocalObjectManagerTest, TestSpillUptoMaxFuseCount);
FRIEND_TEST(LocalObjectManagerTest,
TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill);
TestTryToSpillObjectsNumBytesToSpillHigherThanMinBytesToSpill);
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectNotEvictable);
FRIEND_TEST(LocalObjectManagerTest, TestRetryDeleteSpilledObjects);

/// Asynchronously spill objects when space is needed. The callback tries to
/// spill at least num_bytes_to_spill and returns true if we found objects to
/// spill.
/// If num_bytes_to_spill many objects cannot be found and there are other
/// objects already being spilled, this will return false to give the
/// spill at least min_spilling_size_ or max_fused_object_count_ and returns true if we
/// found objects to spill. If neither are satisifed and there
/// are other objects already being spilled, this will return false to give the
/// currently spilling objects time to finish.
/// NOTE(sang): If 0 is given, this method spills a single object.
///
/// \param num_bytes_to_spill The total number of bytes to spill.
/// \return True if it can spill num_bytes_to_spill. False otherwise.
bool SpillObjectsOfSize(int64_t num_bytes_to_spill);
/// \return True if it decides to spill more objects. False otherwise.
bool TryToSpillObjects();

/// Internal helper method for spilling objects.
void SpillObjectsInternal(const std::vector<ObjectID> &objects_ids,
Expand Down Expand Up @@ -387,8 +384,6 @@ class LocalObjectManager {
std::atomic<int64_t> num_failed_deletion_requests_ = 0;

friend class LocalObjectManagerTestWithMinSpillingSize;
friend class LocalObjectManagerTest;
friend class LocalObjectManagerFusedTest;
};

}; // namespace raylet
Expand Down
1 change: 0 additions & 1 deletion src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,6 @@ int main(int argc, char *argv[]) {
node_manager_config.resource_dir = resource_dir;
node_manager_config.ray_debugger_external = ray_debugger_external;
node_manager_config.max_io_workers = RayConfig::instance().max_io_workers();
node_manager_config.min_spilling_size = RayConfig::instance().min_spilling_size();

// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;
Expand Down
1 change: 0 additions & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ NodeManager::NodeManager(
worker_pool_,
worker_rpc_pool_,
/*max_io_workers*/ config.max_io_workers,
/*min_spilling_size*/ config.min_spilling_size,
/*is_external_storage_type_fs*/
RayConfig::instance().is_external_storage_type_fs(),
/*max_fused_object_count*/ RayConfig::instance().max_fused_object_count(),
Expand Down
2 changes: 0 additions & 2 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ struct NodeManagerConfig {
uint64_t record_metrics_period_ms;
// The number if max io workers.
int max_io_workers;
// The minimum object size that can be spilled by each spill operation.
int64_t min_spilling_size;
// The key-value labels of this node.
absl::flat_hash_map<std::string, std::string> labels;
// If true, core worker enables resource isolation by adding itself into appropriate
Expand Down
18 changes: 10 additions & 8 deletions src/ray/raylet/test/local_object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ class LocalObjectManagerTestWithMinSpillingSize {
worker_pool,
client_pool,
/*max_io_workers=*/2,
/*min_spilling_size=*/min_spilling_size,
/*is_external_storage_type_fs=*/true,
/*max_fused_object_count*/ max_fused_object_count_,
/*on_objects_freed=*/
Expand All @@ -365,6 +364,7 @@ class LocalObjectManagerTestWithMinSpillingSize {
object_directory_.get()),
unpins(std::make_shared<absl::flat_hash_map<ObjectID, int>>()) {
RayConfig::instance().initialize(R"({"object_spilling_config": "dummy"})");
manager.min_spilling_size_ = min_spilling_size;
}

int64_t NumBytesPendingSpill() { return manager.num_bytes_pending_spill_; }
Expand Down Expand Up @@ -636,7 +636,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
ASSERT_EQ(GetCurrentSpilledBytes(), object_ids.size() * object_size);
}

TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSizeZero) {
TEST_F(LocalObjectManagerTest, TestTryToSpillObjectsZero) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());

Expand All @@ -653,9 +653,9 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSizeZero) {
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// Make sure providing 0 bytes to SpillObjectsOfSize will spill one object.
// This is important to cover min_spilling_size_== 0.
ASSERT_TRUE(manager.SpillObjectsOfSize(0));
// Make sure providing 0 bytes as min_spilling_size_ will spill one object.
manager.min_spilling_size_ = 0;
ASSERT_TRUE(manager.TryToSpillObjects());
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
EXPECT_CALL(worker_pool, PushSpillWorker(_));
const std::string url = BuildURL("url" + std::to_string(object_ids.size()));
Expand Down Expand Up @@ -689,7 +689,8 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) {
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
ASSERT_TRUE(manager.SpillObjectsOfSize(total_size));
manager.min_spilling_size_ = total_size;
ASSERT_TRUE(manager.TryToSpillObjects());
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
Expand Down Expand Up @@ -734,14 +735,15 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) {
objects.push_back(std::move(object));

manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
ASSERT_FALSE(manager.SpillObjectsOfSize(1000));
manager.min_spilling_size_ = 1000;
ASSERT_FALSE(manager.TryToSpillObjects());
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}

// Now object is evictable. Spill should succeed.
unevictable_objects_.erase(object_id);
ASSERT_TRUE(manager.SpillObjectsOfSize(1000));
ASSERT_TRUE(manager.TryToSpillObjects());

AssertIOWorkersDoSpill(/*num_objects*/ 1, /*num_batches*/ 1);
ASSERT_EQ(GetCurrentSpilledCount(), 1);
Expand Down