POC: Tempting to introduce peer-to-peer chunk data exchange #728

Draft · wants to merge 22 commits into main

Conversation

@jqdai commented Sep 28, 2023

What do these changes do?

This is a proof-of-concept draft pull request.

In Xorbits, storage is used to hold intermediate data and final results during computation, supporting various types of storage such as GPU memory, main memory, and disk. Currently in Xorbits, data produced by workers is stored and managed by a centralized storage_api.

This project introduces peer-to-peer data storage and communication, where each Xorbits worker holds its own data locally. A meta_api maintains the keys of data together with the address of the worker that produced them. Each subtask runner holds an independent RunnerStorage that maintains all data created in that runner and responds to requests for data it holds. When a runner needs non-local data, it looks up the meta_api to find the address of the runner holding that data, then fetches the data directly. A centralized data store is thus no longer necessary, which may yield a speedup.
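For illustration, here is a minimal sketch of the fetch path described above. `mo.actor_ref` and `RunnerStorageActor.gen_uid` appear later in this PR's diff; the import paths, the `"slot_ids"` meta field, and the `get_data` call are assumptions about the draft's interfaces, not code copied from the diff:

```python
from xorbits._mars import oscar as mo
# Assumed location of the actor introduced by this PR; the real path may differ.
from xorbits._mars.services.subtask.worker.runner import RunnerStorageActor


async def fetch_chunk(meta_api, data_key):
    # 1. Ask the meta service which band/slot produced the chunk.
    #    "bands" is an existing chunk-meta field; "slot_ids" is assumed here.
    meta = await meta_api.get_chunk_meta(data_key, fields=["bands", "slot_ids"])
    address, band_name = meta["bands"][0]
    slot_id = meta["slot_ids"][0]
    # 2. Resolve a ref to the RunnerStorage of the producing runner.
    runner_storage = await mo.actor_ref(
        uid=RunnerStorageActor.gen_uid(band_name, slot_id),
        address=address,
    )
    # 3. Fetch the data directly from the peer; no centralized storage involved.
    return await runner_storage.get_data(data_key)  # get_data name assumed
```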

Check code requirements

  • tests added / passed (if needed)
  • Ensure all linting tests pass

@XprobeBot XprobeBot added the gpu label Sep 28, 2023
@XprobeBot XprobeBot added this to the v0.7.0 milestone Sep 28, 2023
@jqdai jqdai changed the title POC: temping to introduce peer-to-peer chunk data exchange POC: Tempting to introduce peer-to-peer chunk data exchange Sep 29, 2023
# )
# self.result.status = SubtaskStatus.errored
# raise
runner_storage: RunnerStorageActor = await mo.create_actor(
Contributor:

Why is this creation needed? The RunnerStorageActor is already created by SubtaskRunnerManagerActor.

Author:

There used to be a 'RunnerStorage not found' error during debugging. The implementation has now been changed to raise an exception when the runner storage is not found. Thank you.
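For concreteness, a minimal sketch of the "look up, don't create" behavior this exchange settles on, with `RunnerStorageActor` assumed to be importable as in the earlier sketch; the exception handling is an assumption, since the exact error oscar raises for a missing actor is not shown in this thread:

```python
from xorbits._mars import oscar as mo


async def _get_runner_storage(address: str, band_name: str, slot_id: int):
    # SubtaskRunnerManagerActor already creates the RunnerStorageActor, so we
    # only look up a ref here and surface an error if the actor is missing,
    # rather than calling mo.create_actor a second time.
    try:
        return await mo.actor_ref(
            uid=RunnerStorageActor.gen_uid(band_name, slot_id),
            address=address,
        )
    except Exception as exc:  # assumption: oscar raises when the actor is absent
        raise RuntimeError(
            f"RunnerStorage not found for band {band_name!r}, slot {slot_id}"
        ) from exc
```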

@@ -341,13 +388,40 @@ async def _store_data(self, chunk_graph: ChunkGraph):
storage_level,
)
if puts:
try:
runner_storage: RunnerStorageActor = await mo.actor_ref(
uid=RunnerStorageActor.gen_uid(self._band[1], self._slot_id),
Contributor:

Just store the data in the current RunnerStorageActor. It's fine to get the actor ref and call the put_data API; in-process actor communication is automatically optimized into a function call.

Author:

Here we use mo.actor_ref to get the runner storage of the current subtask runner because RunnerStorage is currently not a property of the SubtaskProcessor or SubtaskRunnerActor class; the processors of a runner therefore may not have direct access to that runner's storage. I wonder if there is a better or more direct implementation.
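One possible shape for the "more direct implementation" asked about above, sketched as an assumption rather than as this PR's actual design: the owning SubtaskRunnerActor resolves the ref once and injects it into each processor. The constructor parameters and the `put_data` call below are hypothetical:

```python
class SubtaskProcessor:
    def __init__(self, subtask, band, slot_id, runner_storage, **kwargs):
        # Hypothetical extra argument: the owning SubtaskRunnerActor resolves
        # the RunnerStorageActor ref once and injects it here, so processors
        # never need to call mo.actor_ref (or guess an address) themselves.
        self._subtask = subtask
        self._band = band
        self._slot_id = slot_id
        self._runner_storage = runner_storage

    async def _store_data(self, chunk_graph):
        ...
        # Store locally through the injected ref; in-process actor calls are
        # optimized to plain function calls, as the reviewer notes above.
        await self._runner_storage.put_data(...)  # put_data signature assumed
```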

Author:

Additionally, I am sometimes unsure whether the address of the runner storage should be set to the processor's supervisor_address or to the address stored in the band (band[0], specifically). The current implementation uses the former, but I am not entirely sure about it. SubtaskRunnerActor uses self.address, which is a property not visible in its constructor.

put_infos = asyncio.create_task(self._storage_api.put.batch(*puts))
try:
store_infos = await put_infos
for store_key, store_info in zip(stored_keys, store_infos):
data_key_to_store_size[store_key] = store_info.store_size
data_key_to_memory_size[store_key] = store_info.memory_size
data_key_to_object_id[store_key] = store_info.object_id
data_key_to_band[store_key] = self._band
data_key_to_slot_id[store_key] = self._slot_id
Contributor:

Is the slot id not stored in the meta?

Author:

In line 562 of python/xorbits/_mars/services/subtask/worker/processor.py, in _store_meta(), we directly add the current band and slot_id (self._band and self._slot_id, specifically). For that reason we did not modify the original _store_mapper_data of SubtaskProcessor to store mappings from data_key to band or slot_id. Is this still necessary?
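A sketch of what this reply describes, assuming Mars' set_chunk_meta accepts a bands field and treating slot_ids as this PR's addition (the field name is assumed, not taken from the diff):

```python
async def _store_meta(self, chunk, memory_size: int, store_size: int):
    # Alongside the usual size fields, record which band and slot produced
    # the chunk, so peers can later locate its RunnerStorage via the meta.
    await self._meta_api.set_chunk_meta(
        chunk,
        memory_size=memory_size,
        store_size=store_size,
        bands=[self._band],        # existing chunk-meta field
        slot_ids=[self._slot_id],  # field added by this PR; name assumed
    )
```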

Author:

Meanwhile, in python/xorbits/_mars/services/storage/handler.py#L639, StorageHandler calls the modified meta_api.add_chunk_bands, which needs [slot_id] as an additional input that StorageHandlerActor itself does not provide. This is still unsolved.
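To make the unresolved mismatch concrete, a rough sketch; the signatures below paraphrase this comment and are not copied from the diff:

```python
class MetaAPI:
    # Modified signature as described in the comment: slot ids now travel
    # alongside bands.
    async def add_chunk_bands(self, object_id, bands, slot_ids):
        ...


async def _handler_call_site(meta_api, object_id, bands):
    # StorageHandlerActor (handler.py#L639, paraphrased) knows which bands it
    # wrote to, but holds no slot id of its own, so the new required argument
    # cannot be filled in; this is the open issue.
    await meta_api.add_chunk_bands(object_id, bands, slot_ids=None)  # unresolved
```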

@XprobeBot XprobeBot modified the milestones: v0.7.0, v0.7.1 Oct 23, 2023
@XprobeBot XprobeBot modified the milestones: v0.7.1, v0.7.2 Nov 21, 2023
@XprobeBot XprobeBot modified the milestones: v0.7.2, v0.7.3 Jan 5, 2024