Skip to content

Commit 7aa0491

Browse files
author
henry.hj
committed
fusedev: support splice to handle FUSE requests.
1. Support enable splice_read/splice_write on FuseChannel. 2. Add splice interface for ZeroCopyReader and ZeroCopyWriter. 3. Add unit-test cases for splice interface. Signed-off-by: henry.hj <henry.hj@antgroup.com>
1 parent 5ae92b3 commit 7aa0491

File tree

8 files changed

+1493
-241
lines changed

8 files changed

+1493
-241
lines changed

src/api/filesystem/mod.rs

+95
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use crate::abi::fuse_abi::{ino64_t, stat64};
2626
mod async_io;
2727
#[cfg(feature = "async-io")]
2828
pub use async_io::{AsyncFileSystem, AsyncZeroCopyReader, AsyncZeroCopyWriter};
29+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
30+
use std::os::unix::io::AsRawFd;
2931

3032
mod sync_io;
3133
pub use sync_io::FileSystem;
@@ -208,6 +210,18 @@ pub trait ZeroCopyReader: io::Read {
208210
off: u64,
209211
) -> io::Result<usize>;
210212

213+
/// Copies at most `count` bytes from `self` directly into `f` at offset `off` with less data copy
214+
/// `f` could be local file description or tcp socket
215+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
216+
fn splice_to(
217+
&mut self,
218+
_f: &dyn AsRawFd,
219+
_count: usize,
220+
_off: Option<u64>,
221+
) -> io::Result<usize> {
222+
Err(io::Error::from_raw_os_error(libc::ENOSYS))
223+
}
224+
211225
/// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count`
212226
/// must be less than `u64::MAX`.
213227
///
@@ -251,6 +265,50 @@ pub trait ZeroCopyReader: io::Read {
251265
Ok(())
252266
}
253267

268+
/// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count`
269+
/// must be less than `u64::MAX`.
270+
/// `f` could be local file description or tcp socket
271+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
272+
fn splice_exact_to(
273+
&mut self,
274+
f: &mut dyn AsRawFd,
275+
mut count: usize,
276+
mut off: Option<u64>,
277+
) -> io::Result<()> {
278+
let c = count
279+
.try_into()
280+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
281+
if let Some(v) = off.as_ref() {
282+
if v.checked_add(c).is_none() {
283+
return Err(io::Error::new(
284+
io::ErrorKind::InvalidInput,
285+
"`off` + `count` must be less than u64::MAX",
286+
));
287+
}
288+
}
289+
290+
while count > 0 {
291+
match self.splice_to(f, count, off) {
292+
Ok(0) => {
293+
return Err(io::Error::new(
294+
io::ErrorKind::WriteZero,
295+
"failed to fill whole buffer",
296+
))
297+
}
298+
Ok(n) => {
299+
count -= n;
300+
if let Some(v) = off.as_mut() {
301+
*v += n as u64;
302+
}
303+
}
304+
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
305+
Err(e) => return Err(e),
306+
}
307+
}
308+
309+
Ok(())
310+
}
311+
254312
/// Copies all remaining bytes from `self` into `f` at offset `off`. Equivalent to repeatedly
255313
/// calling `read_to` until it returns either `Ok(0)` or a non-`ErrorKind::Interrupted` error.
256314
///
@@ -275,6 +333,24 @@ pub trait ZeroCopyReader: io::Read {
275333
}
276334
}
277335
}
336+
337+
/// Copies all remaining bytes from `self` into `f` at offset `off`.
338+
/// `f` could be local file description or tcp socket
339+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
340+
fn splice_to_end(&mut self, f: &mut dyn AsRawFd, mut off: Option<u64>) -> io::Result<usize> {
341+
let mut out = 0;
342+
loop {
343+
match self.splice_to(f, ::std::usize::MAX, off) {
344+
Ok(0) => return Ok(out),
345+
Ok(n) => {
346+
off.as_mut().map(|v| v.saturating_add(n as u64));
347+
out += n;
348+
}
349+
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
350+
Err(e) => return Err(e),
351+
}
352+
}
353+
}
278354
}
279355

280356
/// A trait for directly copying data from a `File` into the fuse transport without first storing
@@ -300,6 +376,25 @@ pub trait ZeroCopyWriter: io::Write {
300376
off: u64,
301377
) -> io::Result<usize>;
302378

379+
/// Append `count` bytes data from fd `f` at offset `off`
380+
/// `f` should be file description or socket
381+
/// This data is always append at the end of all data, only available for bufferd writer.
382+
/// For example:
383+
/// We already write "aaa" to writer. Then we append fd buf witch contains "bbb".
384+
/// Finally we write "ccc" to writer. The final data is "aaacccbbb".
385+
///
386+
/// # Errors
387+
/// ENOSYS: writer doesn't support this operation, should fallback to use `write_from`.
388+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
389+
fn append_fd_buf(
390+
&mut self,
391+
_f: &dyn AsRawFd,
392+
_count: usize,
393+
_off: Option<u64>,
394+
) -> io::Result<usize> {
395+
Err(io::Error::from_raw_os_error(libc::ENOSYS))
396+
}
397+
303398
/// Copies exactly `count` bytes of data from `f` at offset `off` into `self`. `off + count`
304399
/// must be less than `u64::MAX`.
305400
///

src/api/server/async_io.rs

+40-20
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ impl<'a, S: BitmapSlice> io::Read for AsyncZcReader<'a, S> {
6060
}
6161
}
6262

63-
struct AsyncZcWriter<'a, S: BitmapSlice = ()>(Writer<'a, S>);
63+
struct AsyncZcWriter<'a, 'b, S: BitmapSlice = ()>(Writer<'a, 'b, S>);
6464

6565
// The underlying VolatileSlice contains "*mut u8", which is just a pointer to a u8 array.
6666
// Actually we rely on the AsyncExecutor is a single-threaded worker, and we do not really send
6767
// 'Reader' to other threads.
68-
unsafe impl<'a, S: BitmapSlice> Send for AsyncZcWriter<'a, S> {}
68+
unsafe impl<'a, 'b, S: BitmapSlice> Send for AsyncZcWriter<'a, 'b, S> {}
6969

7070
#[async_trait(?Send)]
71-
impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
71+
impl<'a, 'b, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, 'b, S> {
7272
async fn async_write_from(
7373
&mut self,
7474
f: Arc<dyn AsyncFileReadWriteVolatile>,
@@ -79,7 +79,7 @@ impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
7979
}
8080
}
8181

82-
impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
82+
impl<'a, 'b, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, 'b, S> {
8383
fn write_from(
8484
&mut self,
8585
f: &mut dyn FileReadWriteVolatile,
@@ -94,7 +94,7 @@ impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
9494
}
9595
}
9696

97-
impl<'a, S: BitmapSlice> io::Write for AsyncZcWriter<'a, S> {
97+
impl<'a, 'b, S: BitmapSlice> io::Write for AsyncZcWriter<'a, 'b, S> {
9898
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
9999
self.0.write(buf)
100100
}
@@ -120,7 +120,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
120120
pub async unsafe fn async_handle_message<S: BitmapSlice>(
121121
&self,
122122
mut r: Reader<'_, S>,
123-
w: Writer<'_, S>,
123+
w: Writer<'_, '_, S>,
124124
vu_req: Option<&mut dyn FsCacheReqHandler>,
125125
hook: Option<&dyn MetricsHook>,
126126
) -> Result<usize> {
@@ -222,7 +222,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
222222
res
223223
}
224224

225-
async fn async_lookup<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
225+
async fn async_lookup<S: BitmapSlice>(
226+
&self,
227+
mut ctx: SrvContext<'_, '_, F, S>,
228+
) -> Result<usize> {
226229
let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?;
227230
let name = match bytes_to_cstr(buf.as_ref()) {
228231
Ok(name) => name,
@@ -234,8 +237,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
234237
return Err(e);
235238
}
236239
};
237-
238-
let version = self.vers.load();
240+
let version = &self.meta.load().version;
239241
let result = self
240242
.fs
241243
.async_lookup(ctx.context(), ctx.nodeid(), name)
@@ -258,7 +260,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
258260
}
259261
}
260262

261-
async fn async_getattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
263+
async fn async_getattr<S: BitmapSlice>(
264+
&self,
265+
mut ctx: SrvContext<'_, '_, F, S>,
266+
) -> Result<usize> {
262267
let GetattrIn { flags, fh, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
263268
let handle = if (flags & GETATTR_FH) != 0 {
264269
Some(fh.into())
@@ -273,7 +278,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
273278
ctx.async_handle_attr_result(result).await
274279
}
275280

276-
async fn async_setattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
281+
async fn async_setattr<S: BitmapSlice>(
282+
&self,
283+
mut ctx: SrvContext<'_, '_, F, S>,
284+
) -> Result<usize> {
277285
let setattr_in: SetattrIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
278286
let handle = if setattr_in.valid & FATTR_FH != 0 {
279287
Some(setattr_in.fh.into())
@@ -290,7 +298,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
290298
ctx.async_handle_attr_result(result).await
291299
}
292300

293-
async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
301+
async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result<usize> {
294302
let OpenIn { flags, fuse_flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
295303
let result = self
296304
.fs
@@ -311,7 +319,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
311319
}
312320
}
313321

314-
async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
322+
async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result<usize> {
315323
let ReadIn {
316324
fh,
317325
offset,
@@ -369,7 +377,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
369377
.await
370378
.map_err(Error::EncodeMessage)?;
371379
ctx.w
372-
.async_commit(Some(&data_writer.0))
380+
.async_commit(Some(&mut data_writer.0))
373381
.await
374382
.map_err(Error::EncodeMessage)?;
375383
Ok(out.len as usize)
@@ -378,7 +386,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
378386
}
379387
}
380388

381-
async fn async_write<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
389+
async fn async_write<S: BitmapSlice>(
390+
&self,
391+
mut ctx: SrvContext<'_, '_, F, S>,
392+
) -> Result<usize> {
382393
let WriteIn {
383394
fh,
384395
offset,
@@ -430,7 +441,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
430441
}
431442
}
432443

433-
async fn async_fsync<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
444+
async fn async_fsync<S: BitmapSlice>(
445+
&self,
446+
mut ctx: SrvContext<'_, '_, F, S>,
447+
) -> Result<usize> {
434448
let FsyncIn {
435449
fh, fsync_flags, ..
436450
} = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
@@ -446,7 +460,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
446460
}
447461
}
448462

449-
async fn async_fsyncdir<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
463+
async fn async_fsyncdir<S: BitmapSlice>(
464+
&self,
465+
mut ctx: SrvContext<'_, '_, F, S>,
466+
) -> Result<usize> {
450467
let FsyncIn {
451468
fh, fsync_flags, ..
452469
} = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
@@ -462,7 +479,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
462479
}
463480
}
464481

465-
async fn async_create<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
482+
async fn async_create<S: BitmapSlice>(
483+
&self,
484+
mut ctx: SrvContext<'_, '_, F, S>,
485+
) -> Result<usize> {
466486
let args: CreateIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
467487
let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::<CreateIn>())?;
468488
let name = match bytes_to_cstr(buf.as_ref()) {
@@ -508,7 +528,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
508528

509529
async fn async_fallocate<S: BitmapSlice>(
510530
&self,
511-
mut ctx: SrvContext<'_, F, S>,
531+
mut ctx: SrvContext<'_, '_, F, S>,
512532
) -> Result<usize> {
513533
let FallocateIn {
514534
fh,
@@ -529,7 +549,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
529549
}
530550
}
531551

532-
impl<'a, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, F, S> {
552+
impl<'a, 'b, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, 'b, F, S> {
533553
async fn async_reply_ok<T: ByteValued>(
534554
&mut self,
535555
out: Option<T>,

0 commit comments

Comments
 (0)