Skip to content

Commit 2dc412b

Browse files
author
henry.hj
committed
fusedev: support splice to handle FUSE requests.
1. Support enable splice_read/splice_write on FuseChannel. 2. Add splice interface for ZeroCopyReader and ZeroCopyWriter. 3. Add unit-test cases for splice interface. Signed-off-by: henry.hj <henry.hj@antgroup.com>
1 parent f18d4be commit 2dc412b

File tree

9 files changed

+1502
-249
lines changed

9 files changed

+1502
-249
lines changed

src/api/filesystem/mod.rs

+89
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use crate::abi::fuse_abi::{ino64_t, stat64};
2626
mod async_io;
2727
#[cfg(feature = "async-io")]
2828
pub use async_io::{AsyncFileSystem, AsyncZeroCopyReader, AsyncZeroCopyWriter};
29+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
30+
use std::os::unix::io::{AsRawFd, FromRawFd};
2931

3032
mod sync_io;
3133
pub use sync_io::FileSystem;
@@ -208,6 +210,16 @@ pub trait ZeroCopyReader: io::Read {
208210
off: u64,
209211
) -> io::Result<usize>;
210212

213+
/// Copies at most `count` bytes from `self` directly into `f` at offset `off` with less data copy
214+
/// `f` could be local file description or tcp socket
215+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
216+
fn splice_to(&mut self, f: &dyn AsRawFd, count: usize, off: u64) -> io::Result<usize> {
217+
let mut file = unsafe { std::fs::File::from_raw_fd(f.as_raw_fd()) };
218+
let res = self.read_to(&mut file, count, off);
219+
std::mem::forget(file);
220+
res
221+
}
222+
211223
/// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count`
212224
/// must be less than `u64::MAX`.
213225
///
@@ -251,6 +263,46 @@ pub trait ZeroCopyReader: io::Read {
251263
Ok(())
252264
}
253265

266+
/// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count`
267+
/// must be less than `u64::MAX`.
268+
/// `f` could be local file description or tcp socket
269+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
270+
fn splice_exact_to(
271+
&mut self,
272+
f: &mut dyn AsRawFd,
273+
mut count: usize,
274+
mut off: u64,
275+
) -> io::Result<()> {
276+
let c = count
277+
.try_into()
278+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
279+
if off.checked_add(c).is_none() {
280+
return Err(io::Error::new(
281+
io::ErrorKind::InvalidInput,
282+
"`off` + `count` must be less than u64::MAX",
283+
));
284+
}
285+
286+
while count > 0 {
287+
match self.splice_to(f, count, off) {
288+
Ok(0) => {
289+
return Err(io::Error::new(
290+
io::ErrorKind::WriteZero,
291+
"failed to fill whole buffer",
292+
))
293+
}
294+
Ok(n) => {
295+
count -= n;
296+
off += n as u64;
297+
}
298+
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
299+
Err(e) => return Err(e),
300+
}
301+
}
302+
303+
Ok(())
304+
}
305+
254306
/// Copies all remaining bytes from `self` into `f` at offset `off`. Equivalent to repeatedly
255307
/// calling `read_to` until it returns either `Ok(0)` or a non-`ErrorKind::Interrupted` error.
256308
///
@@ -275,6 +327,24 @@ pub trait ZeroCopyReader: io::Read {
275327
}
276328
}
277329
}
330+
331+
/// Copies all remaining bytes from `self` into `f` at offset `off`.
332+
/// `f` could be local file description or tcp socket
333+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
334+
fn splice_to_end(&mut self, f: &mut dyn AsRawFd, mut off: u64) -> io::Result<usize> {
335+
let mut out = 0;
336+
loop {
337+
match self.splice_to(f, ::std::usize::MAX, off) {
338+
Ok(0) => return Ok(out),
339+
Ok(n) => {
340+
off = off.saturating_add(n as u64);
341+
out += n;
342+
}
343+
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
344+
Err(e) => return Err(e),
345+
}
346+
}
347+
}
278348
}
279349

280350
/// A trait for directly copying data from a `File` into the fuse transport without first storing
@@ -300,6 +370,25 @@ pub trait ZeroCopyWriter: io::Write {
300370
off: u64,
301371
) -> io::Result<usize>;
302372

373+
/// Append `count` bytes data from fd `f` at offset `off`
374+
/// `f` should be file description or socket
375+
/// This data always at the end of all data, only available for bufferd writer.
376+
/// For example:
377+
/// We already write "aaa to writer. Then we append fd buf witch contains "bbb".
378+
/// Finally we write "ccc" to writer. The final data is "aaacccbbb".
379+
///
380+
/// # Errors
381+
/// EINVAL: writer doesn't support this operation, should fallback to use `write_from`.
382+
#[cfg(all(target_os = "linux", feature = "fusedev"))]
383+
fn append_fd_buf(
384+
&mut self,
385+
_f: &dyn AsRawFd,
386+
_count: usize,
387+
_off: Option<u64>,
388+
) -> io::Result<usize> {
389+
Err(io::Error::from_raw_os_error(libc::EINVAL))
390+
}
391+
303392
/// Copies exactly `count` bytes of data from `f` at offset `off` into `self`. `off + count`
304393
/// must be less than `u64::MAX`.
305394
///

src/api/server/async_io.rs

+40-19
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ impl<'a, S: BitmapSlice> io::Read for AsyncZcReader<'a, S> {
6060
}
6161
}
6262

63-
struct AsyncZcWriter<'a, S: BitmapSlice = ()>(Writer<'a, S>);
63+
struct AsyncZcWriter<'a, 'b, S: BitmapSlice = ()>(Writer<'a, 'b, S>);
6464

6565
// The underlying VolatileSlice contains "*mut u8", which is just a pointer to a u8 array.
6666
// Actually we rely on the AsyncExecutor is a single-threaded worker, and we do not really send
6767
// 'Reader' to other threads.
68-
unsafe impl<'a, S: BitmapSlice> Send for AsyncZcWriter<'a, S> {}
68+
unsafe impl<'a, 'b, S: BitmapSlice> Send for AsyncZcWriter<'a, 'b, S> {}
6969

7070
#[async_trait(?Send)]
71-
impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
71+
impl<'a, 'b, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, 'b, S> {
7272
async fn async_write_from(
7373
&mut self,
7474
f: Arc<dyn AsyncFileReadWriteVolatile>,
@@ -79,7 +79,7 @@ impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
7979
}
8080
}
8181

82-
impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
82+
impl<'a, 'b, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, 'b, S> {
8383
fn write_from(
8484
&mut self,
8585
f: &mut dyn FileReadWriteVolatile,
@@ -90,7 +90,7 @@ impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
9090
}
9191
}
9292

93-
impl<'a, S: BitmapSlice> io::Write for AsyncZcWriter<'a, S> {
93+
impl<'a, 'b, S: BitmapSlice> io::Write for AsyncZcWriter<'a, 'b, S> {
9494
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
9595
self.0.write(buf)
9696
}
@@ -116,7 +116,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
116116
pub async unsafe fn async_handle_message<S: BitmapSlice>(
117117
&self,
118118
mut r: Reader<'_, S>,
119-
w: Writer<'_, S>,
119+
w: Writer<'_, '_, S>,
120120
vu_req: Option<&mut dyn FsCacheReqHandler>,
121121
hook: Option<&dyn MetricsHook>,
122122
) -> Result<usize> {
@@ -210,10 +210,13 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
210210
res
211211
}
212212

213-
async fn async_lookup<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
213+
async fn async_lookup<S: BitmapSlice>(
214+
&self,
215+
mut ctx: SrvContext<'_, '_, F, S>,
216+
) -> Result<usize> {
214217
let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?;
215218
let name = bytes_to_cstr(buf.as_ref())?;
216-
let version = self.vers.load();
219+
let version = &self.meta.load().version;
217220
let result = self
218221
.fs
219222
.async_lookup(ctx.context(), ctx.nodeid(), name)
@@ -236,7 +239,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
236239
}
237240
}
238241

239-
async fn async_getattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
242+
async fn async_getattr<S: BitmapSlice>(
243+
&self,
244+
mut ctx: SrvContext<'_, '_, F, S>,
245+
) -> Result<usize> {
240246
let GetattrIn { flags, fh, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
241247
let handle = if (flags & GETATTR_FH) != 0 {
242248
Some(fh.into())
@@ -251,7 +257,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
251257
ctx.async_handle_attr_result(result).await
252258
}
253259

254-
async fn async_setattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
260+
async fn async_setattr<S: BitmapSlice>(
261+
&self,
262+
mut ctx: SrvContext<'_, '_, F, S>,
263+
) -> Result<usize> {
255264
let setattr_in: SetattrIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
256265
let handle = if setattr_in.valid & FATTR_FH != 0 {
257266
Some(setattr_in.fh.into())
@@ -268,7 +277,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
268277
ctx.async_handle_attr_result(result).await
269278
}
270279

271-
async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
280+
async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result<usize> {
272281
let OpenIn { flags, fuse_flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
273282
let result = self
274283
.fs
@@ -289,7 +298,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
289298
}
290299
}
291300

292-
async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
301+
async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result<usize> {
293302
let ReadIn {
294303
fh,
295304
offset,
@@ -347,7 +356,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
347356
.await
348357
.map_err(Error::EncodeMessage)?;
349358
ctx.w
350-
.async_commit(Some(&data_writer.0))
359+
.async_commit(Some(&mut data_writer.0))
351360
.await
352361
.map_err(Error::EncodeMessage)?;
353362
Ok(out.len as usize)
@@ -356,7 +365,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
356365
}
357366
}
358367

359-
async fn async_write<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
368+
async fn async_write<S: BitmapSlice>(
369+
&self,
370+
mut ctx: SrvContext<'_, '_, F, S>,
371+
) -> Result<usize> {
360372
let WriteIn {
361373
fh,
362374
offset,
@@ -408,7 +420,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
408420
}
409421
}
410422

411-
async fn async_fsync<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
423+
async fn async_fsync<S: BitmapSlice>(
424+
&self,
425+
mut ctx: SrvContext<'_, '_, F, S>,
426+
) -> Result<usize> {
412427
let FsyncIn {
413428
fh, fsync_flags, ..
414429
} = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
@@ -424,7 +439,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
424439
}
425440
}
426441

427-
async fn async_fsyncdir<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
442+
async fn async_fsyncdir<S: BitmapSlice>(
443+
&self,
444+
mut ctx: SrvContext<'_, '_, F, S>,
445+
) -> Result<usize> {
428446
let FsyncIn {
429447
fh, fsync_flags, ..
430448
} = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
@@ -440,7 +458,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
440458
}
441459
}
442460

443-
async fn async_create<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
461+
async fn async_create<S: BitmapSlice>(
462+
&self,
463+
mut ctx: SrvContext<'_, '_, F, S>,
464+
) -> Result<usize> {
444465
let args: CreateIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
445466
let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::<CreateIn>())?;
446467
let name = bytes_to_cstr(&buf)?;
@@ -476,7 +497,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
476497

477498
async fn async_fallocate<S: BitmapSlice>(
478499
&self,
479-
mut ctx: SrvContext<'_, F, S>,
500+
mut ctx: SrvContext<'_, '_, F, S>,
480501
) -> Result<usize> {
481502
let FallocateIn {
482503
fh,
@@ -497,7 +518,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
497518
}
498519
}
499520

500-
impl<'a, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, F, S> {
521+
impl<'a, 'b, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, 'b, F, S> {
501522
async fn async_reply_ok<T: ByteValued>(
502523
&mut self,
503524
out: Option<T>,

0 commit comments

Comments
 (0)