玉兔远程控制 0.1.0-bate1
载入中...
搜索中...
未找到
ChannelSFTP.cpp
1// Copyright Copyright (c) Kang Lin studio, All Rights Reserved
2// Author Kang Lin <kl222@126.com>
3
4#include <QLoggingCategory>
5#include "ChannelSFTP.h"
6#include "BackendFileTransfer.h"
7#include "RemoteFileSystemModel.h"
8#include "ListFileModel.h"
9
10#include <fcntl.h>
11#include <sys/stat.h>
12
13#ifdef _MSC_VER
14 // See: [_topen](https://learn.microsoft.com/zh-cn/cpp/c-runtime-library/reference/open-wopen?view=msvc-170)
15 #include <io.h>
16 #define S_IRUSR 0400 // Owner read permission
17 #define S_IWUSR 0200 // Owner write permission
18 #define S_IXUSR 0100 // Owner execute permission
19 #define S_IRWXU (S_IREAD | S_IWRITE | S_IEXEC) // Owner read, write, execute
20#endif
21
22static Q_LOGGING_CATEGORY(log, "Channel.SFTP")
23
25 bool bWakeUp, QObject *parent)
26 : CChannelSSH(pBackend, pPara, bWakeUp, parent)
27 , m_SessionSftp(nullptr)
28{
29 bool check = false;
30 CBackendFileTransfer* pB = qobject_cast<CBackendFileTransfer*>(pBackend);
31 if(pB) {
32 check = connect(this, SIGNAL(sigGetDir(CRemoteFileSystem*, QVector<QSharedPointer<CRemoteFileSystem> >, bool)),
33 pB, SIGNAL(sigGetDir(CRemoteFileSystem*, QVector<QSharedPointer<CRemoteFileSystem> >, bool)));
34 Q_ASSERT(check);
35 check = connect(this, SIGNAL(sigFileTransferUpdate(QSharedPointer<CFileTransfer>)),
36 pB, SIGNAL(sigFileTransferUpdate(QSharedPointer<CFileTransfer>)));
37 Q_ASSERT(check);
38 check = connect(this, SIGNAL(sigError(int,QString)),
39 pB, SIGNAL(sigError(int,QString)));
40 Q_ASSERT(check);
41 }
42}
43
44int CChannelSFTP::Process()
45{
46 int nRet = 0;
47
48 struct timeval timeout = {0, DEFAULT_TIMEOUT};
49 ssh_channel channels[2], channel_out[2];
50 channels[0] = nullptr; //m_SessionSftp->channel;
51 channels[1] = nullptr;
52
53 fd_set set;
54 FD_ZERO(&set);
55 socket_t fdEvent = SSH_INVALID_SOCKET;
56 if(m_pEvent)
57 fdEvent = m_pEvent->GetFd();
58 if(SSH_INVALID_SOCKET != fdEvent)
59 FD_SET(fdEvent, &set);
60
61 socket_t sshFd = ssh_get_fd(m_Session);
62 if(SSH_INVALID_SOCKET != sshFd)
63 FD_SET(sshFd, &set);
64
65 socket_t f = qMax(sshFd, fdEvent);
66 //qDebug(log) << "ssh_select:" << fdEvent;
67 nRet = ssh_select(channels, channel_out, f + 1, &set, &timeout);
68 //qDebug(log) << "ssh_select end:" << nRet;
69 if(EINTR == nRet)
70 return 0;
71
72 if(SSH_OK != nRet) {
73 QString szErr;
74 szErr = "ssh_channel_select failed: " + QString::number(nRet);
75 szErr += ssh_get_error(m_Session);
76 qCritical(log) << szErr;
77 setErrorString(szErr);
78 return -3;
79 }
80
81 if(SSH_INVALID_SOCKET != fdEvent && FD_ISSET(fdEvent, &set)) {
82 //qDebug(log) << "fires event";
83 if(m_pEvent) {
84 nRet = m_pEvent->Reset();
85 if(nRet) {
86 QString szErr = "Reset event fail";
87 qCritical(log) << szErr;
88 setErrorString(szErr);
89 return -4;
90 }
91 }
92 }
93
94 AsyncReadDir();
95
96 AsyncFile();
97
98 return 0;
99}
100
101QSharedPointer<CRemoteFileSystem> CChannelSFTP::GetFileNode(
102 const QString& szPath, sftp_attributes attributes)
103{
104 if(!attributes) return nullptr;
105 /*
106 qDebug(log) << szPath << "name:" << attributes->name
107 << "size:" << attributes->size << "type:" << attributes->type;//*/
108 QString szName = QString::fromUtf8(attributes->name);
109 if("." == szName || ".." == szName)
110 return nullptr;
111 CRemoteFileSystem::TYPE type = CRemoteFileSystem::TYPE::NO;
112 switch(attributes->type) {
113 case SSH_FILEXFER_TYPE_DIRECTORY:
114 type = CRemoteFileSystem::TYPE::DIR;
115 break;
116 case SSH_FILEXFER_TYPE_SYMLINK:
117 type = CRemoteFileSystem::TYPE::SYMLINK;
118 break;
119 case SSH_FILEXFER_TYPE_REGULAR:
120 type = CRemoteFileSystem::TYPE::FILE;
121 break;
122 case SSH_FILEXFER_TYPE_SPECIAL:
123 type = CRemoteFileSystem::TYPE::SPECIAL;
124 break;
125 default:
126 qWarning(log) << "Unsupported type:" << attributes->type;
127 return nullptr;
128 break;
129 }
130 if(szPath.right(1) == '/')
131 szName = szPath + szName;
132 else
133 szName = szPath + "/" + szName;
134 QSharedPointer<CRemoteFileSystem> p(new CRemoteFileSystem(szName, type));
135 p->SetSize(attributes->size);
136 p->SetPermissions((QFileDevice::Permissions)attributes->permissions);
137 p->SetLastModified(QDateTime::fromSecsSinceEpoch(attributes->mtime));
138 p->SetCreateTime(QDateTime::fromSecsSinceEpoch(attributes->createtime));
139 return p;
140}
141
143{
144 int nRet = SSH_OK;
145 if(!m_SessionSftp || !p)
146 return -1;
147 sftp_dir dir = nullptr;
148 sftp_attributes attributes = nullptr;
149
150 QString szPath = p->GetPath();
151 QVector<QSharedPointer<CRemoteFileSystem> > vFileNode;
152 dir = sftp_opendir(m_SessionSftp, szPath.toStdString().c_str());
153 if (!dir)
154 {
155 QString szErr = "Directory not opened:"
156 + QString::number(sftp_get_error(m_SessionSftp))
157 + ssh_get_error(m_Session);
158 qCritical(log) << szErr;
159 emit sigError(-1, szErr);
160 return SSH_ERROR;
161 }
162
163 while ((attributes = sftp_readdir(m_SessionSftp, dir)) != NULL)
164 {
165 qDebug(log) << "name:" << attributes->name << "size:" << attributes->size;
166 auto p = GetFileNode(szPath, attributes);
167 if(p)
168 vFileNode.append(p);
169 sftp_attributes_free(attributes);
170 }
171
172 if (!sftp_dir_eof(dir))
173 {
174 qCritical(log) << "Can't list directory:"
175 << sftp_get_error(m_SessionSftp)
176 << ssh_get_error(m_Session);
177 sftp_closedir(dir);
178 return SSH_ERROR;
179 }
180
181 nRet = sftp_closedir(dir);
182 if (nRet != SSH_OK)
183 {
184 qCritical(log) << "Can't close directory:"
185 << sftp_get_error(m_SessionSftp)
186 << ssh_get_error(m_Session);
187 return nRet;
188 }
189
190 emit sigGetDir(p, vFileNode, true);
191
192 return nRet;
193}
194
195int CChannelSFTP::MakeDir(const QString &dir)
196{
197 int nRet = SSH_FX_OK;
198 if(!m_SessionSftp)
199 return SSH_FX_NO_CONNECTION;
200 nRet = sftp_mkdir(m_SessionSftp, dir.toStdString().c_str(), S_IRWXU);
201 if (nRet != SSH_OK)
202 {
203 if (sftp_get_error(m_SessionSftp) != SSH_FX_FILE_ALREADY_EXISTS)
204 {
205 QString szErr = "Can't create directory: "
206 + dir + QString::number(nRet)
207 + ssh_get_error(m_Session);
208 qCritical(log) << szErr;
209 emit sigError(nRet, szErr);
210 } else
211 qDebug(log) << "Create directory:" << dir;
212 }
213 return nRet;
214}
215
216int CChannelSFTP::RemoveDir(const QString& dir)
217{
218 if (!m_SessionSftp)
219 return SSH_FX_NO_CONNECTION;
220
221 // 1. 打开目录
222 sftp_dir sftpDir = sftp_opendir(m_SessionSftp, dir.toStdString().c_str());
223 if (!sftpDir) {
224 qCritical(log) << "Failed to open directory for remove:" << dir;
225 // 如果打不开目录,尝试直接删除(可能是空目录或者文件)
226 int ret = sftp_rmdir(m_SessionSftp, dir.toStdString().c_str());
227 if (ret == SSH_OK) {
228 qDebug(log) << "Removed directory (directly):" << dir;
229 return SSH_OK;
230 } else {
231 QString szErr = "Can't remove directory:" + dir + ssh_get_error(m_Session);
232 qCritical(log) << szErr;
233 emit sigError(ret, szErr);
234 return ret;
235 }
236 }
237
238 // 2. 遍历目录内容
239 sftp_attributes attr;
240 while ((attr = sftp_readdir(m_SessionSftp, sftpDir)) != NULL) {
241 QString name = QString::fromUtf8(attr->name);
242 if (name == "." || name == "..") {
243 sftp_attributes_free(attr);
244 continue;
245 }
246 QString fullPath = dir + "/" + name;
247 if (attr->type == SSH_FILEXFER_TYPE_DIRECTORY) {
248 // 递归删除子目录
249 RemoveDir(fullPath);
250 } else {
251 // 删除文件
252 int ret = sftp_unlink(m_SessionSftp, fullPath.toStdString().c_str());
253 if (ret != SSH_OK) {
254 QString szErr = "Can't remove file:" + fullPath + ssh_get_error(m_Session);
255 qCritical(log) << szErr;
256 emit sigError(ret, szErr);
257 } else {
258 qDebug(log) << "Removed file:" << fullPath;
259 }
260 }
261 sftp_attributes_free(attr);
262 }
263
264 sftp_closedir(sftpDir);
265
266 // 3. 删除空目录
267 int ret = sftp_rmdir(m_SessionSftp, dir.toStdString().c_str());
268 if (ret != SSH_OK) {
269 QString szErr = "Can't remove directory:" + dir + ssh_get_error(m_Session);
270 qCritical(log) << szErr;
271 emit sigError(ret, szErr);
272 } else {
273 qDebug(log) << "Removed directory:" << dir;
274 }
275 return ret;
276}
277
278int CChannelSFTP::RemoveFile(const QString &file)
279{
280 // 删除文件
281 int ret = sftp_unlink(m_SessionSftp, file.toStdString().c_str());
282 if (ret != SSH_OK) {
283 QString szErr = "Can't remove file:" + file + ssh_get_error(m_Session);
284 qCritical(log) << szErr;
285 emit sigError(ret, szErr);
286 } else {
287 qDebug(log) << "Removed file:" << file;
288 }
289 return ret;
290}
291
292int CChannelSFTP::Rename(const QString &oldPath, const QString &newPath)
293{
294 int nRet = SSH_FX_OK;
295 if(!m_SessionSftp)
296 return SSH_FX_NO_CONNECTION;
297 nRet = sftp_rename(m_SessionSftp,
298 oldPath.toStdString().c_str(),
299 newPath.toStdString().c_str());
300 if (nRet != SSH_OK)
301 {
302 QString szErr = "Fail: Can't rename: " + QString::number(nRet)
303 + ssh_get_error(m_Session)
304 + " " + oldPath + " to " + newPath;
305 qCritical(log) << szErr;
306 emit sigError(nRet, szErr);
307 } else
308 qDebug(log) << "Rename:" << oldPath << "to" << newPath;
309 return nRet;
310}
311
312qint64 CChannelSFTP::readData(char *data, qint64 maxlen)
313{
314 qint64 nRet = 0;
315
316 return nRet;
317}
318
319qint64 CChannelSFTP::writeData(const char *data, qint64 len)
320{
321 qint64 nRet = 0;
322
323 return nRet;
324}
325
326int CChannelSFTP::OnOpen(ssh_session session)
327{
328 int nRet = SSH_OK;
329
330 m_SessionSftp = sftp_new(session);
331 if (!m_SessionSftp)
332 {
333 QString szErr = "Error allocating SFTP session: ";
334 szErr += ssh_get_error(session);
335 qCritical(log) << szErr;
336 emit sigError(nRet, szErr);
337 return SSH_ERROR;
338 }
339
340 nRet = sftp_init(m_SessionSftp);
341 if (SSH_OK != nRet)
342 {
343 QString szErr = "Error initializing SFTP session:" + sftp_get_error(m_SessionSftp);
344 qCritical(log) << szErr;
345 emit sigError(nRet, szErr);
346 sftp_free(m_SessionSftp);
347 m_SessionSftp = nullptr;
348 return nRet;
349 }
350
351 return nRet;
352}
353
354void CChannelSFTP::OnClose()
355{
356 foreach (auto d, m_vDirs) {
357 if(d->sftp) {
358 sftp_closedir(d->sftp);
359 d->sftp = nullptr;
360 }
361 }
362 m_vDirs.clear();
363 foreach(auto f, m_vFiles) {
364 if(f->remote) {
365 sftp_close(f->remote);
366 f->remote = nullptr;
367 }
368 if(-1 != f->local) {
369 ::close(f->local);
370 f->local = -1;
371 }
372 }
373 m_vFiles.clear();
374 if(m_SessionSftp) {
375 sftp_free(m_SessionSftp);
376 m_SessionSftp = nullptr;
377 }
378}
379
380void CChannelSFTP::slotGetDir(const QString &szPath, CRemoteFileSystem *p)
381{
382 if(szPath.isEmpty() || !p) {
383 qCritical(log) << "The path is empty";
384 return;
385 }
386 foreach(auto d, m_vDirs) {
387 if(d->remoteFileSystem == p) {
388 qDebug(log) << szPath << "already exists";
389 return;
390 }
391 }
392 QSharedPointer<DIR_READER> dir(new DIR_READER());
393 if(!dir)
394 return;
395 dir->sftp = nullptr;
396 dir->szPath = szPath;
397 dir->state = STATE::OPEN;
398 dir->remoteFileSystem = p;
399 dir->Error = SSH_FX_OK;
400 m_vDirs.append(dir);
401 WakeUp();
402}
403
404int CChannelSFTP::AsyncReadDir()
405{
406 //qDebug(log) << Q_FUNC_INFO;
407 for(auto it = m_vDirs.begin(); it != m_vDirs.end();) {
408 auto d = *it;
409 switch (d->state) {
410 case STATE::OPEN: {
411 d->sftp = sftp_opendir(m_SessionSftp, d->szPath.toStdString().c_str());
412 if(d->sftp) {
413 d->state = STATE::READ;
414 break;
415 }
416 int err = sftp_get_error(m_SessionSftp);
417 if (err == SSH_FX_OK || err == SSH_FX_EOF) {
418 d->state = STATE::FINISH;
419 break;
420 }
421 if (err == SSH_FX_FAILURE && ssh_get_error_code(m_Session) == SSH_REQUEST_DENIED) {
422 qDebug(log) << "Request denied:" << d->szPath;
423 break;
424 }
425 qCritical(log) << "Error opening directory:" << d->szPath
426 << "Error:"
427 << sftp_get_error(m_SessionSftp)
428 << ssh_get_error(m_Session);
429 d->state = STATE::ERR;
430 d->Error = err;
431 break;
432 }
433 case STATE::READ: {
434 sftp_attributes attributes = nullptr;
435 while ((attributes = sftp_readdir(m_SessionSftp, d->sftp)) != NULL) {
436 auto p = GetFileNode(d->szPath, attributes);
437 if(p)
438 d->vFileNode.append(p);
439 sftp_attributes_free(attributes);
440 }
441 // 检查是否结束
442 if (sftp_dir_eof(d->sftp)) {
443 d->state = STATE::CLOSE;
444 break;
445 }
446 int err = sftp_get_error(m_SessionSftp);
447 if (err == SSH_FX_OK)
448 break;
449 if (err == SSH_FX_EOF) {
450 break;
451 d->state = STATE::CLOSE;
452 }
453 qCritical(log) << "Error reading directory:" << d->szPath
454 << "Error:" << err << ssh_get_error(m_Session);
455 d->state = STATE::ERR;
456 d->Error = err;
457 break;
458 }
459 case STATE::CLOSE: {
460 if(!d->sftp) {
461 d->state = STATE::FINISH;
462 break;
463 }
464 int rc = sftp_closedir(d->sftp);
465 if(SSH_NO_ERROR == rc) {
466 d->state = STATE::FINISH;
467 d->sftp = nullptr;
468 break;
469 }
470 int err = ssh_get_error_code(m_Session);
471 qCritical(log) << "Error close directory:" << d->szPath
472 << "Error:" << err << ssh_get_error(m_Session);
473 d->state = STATE::FINISH;
474 d->Error = err;
475 break;
476 }
477 case STATE::FINISH:
478 if(d && STATE::FINISH == d->state) {
479 qDebug(log) << "Remote get path finish:" << d->szPath << d->vFileNode.size();
480 emit sigGetDir(d->remoteFileSystem, d->vFileNode, true);
481 it = m_vDirs.erase(it);
482 }
483 continue;
484 case STATE::ERR:
485 d->state = STATE::CLOSE;
486 break;
487 default:
488 break;
489 }
490
491 it++;
492 }
493
494 return 0;
495}
496
497void CChannelSFTP::slotStartFileTransfer(QSharedPointer<CFileTransfer> f)
498{
499 f->slotSetstate(CFileTransfer::State::Opening);
500 QSharedPointer<_AFILE> file(new _AFILE);
501 memset(file.data(), 0, sizeof(_AFILE));
502 file->local = -1;
503 file->remote = nullptr;
504 file->state = STATE::OPEN;
505 file->fileTransfer = f;
506#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0, 11, 0)
507 file->nChunkSize = BUF_SIZE;
508 file->nConcurrentCount = 5;
509#endif
510 m_vFiles.append(file);
511 emit sigFileTransferUpdate(f);
512 WakeUp();
513}
514
515void CChannelSFTP::slotStopFileTransfer(QSharedPointer<CFileTransfer> f)
516{
517 foreach(auto file, m_vFiles) {
518 if(file->fileTransfer == f) {
519 m_vFiles.removeAll(file);
520 if(-1 != file->local) {
521 ::close(file->local);
522 file->local = -1;
523 }
524 if(file->remote) {
525 sftp_close(file->remote);
526 file->remote = nullptr;
527 }
528 f->slotSetstate(CFileTransfer::State::Stop);
529 emit sigFileTransferUpdate(f);
530 break;
531 }
532 }
533}
534
535int CChannelSFTP::AsyncFile()
536{
537 for(auto it = m_vFiles.begin(); it != m_vFiles.end();) {
538 auto file = *it;
539 switch(file->state) {
540 case STATE::OPEN: {
541 int remoteFlag = O_WRONLY | O_CREAT | O_TRUNC;
542 int localFlag = O_RDONLY;
543 quint32 permission = file->fileTransfer->GetRemotePermission();
544
545 if(file->fileTransfer->GetDirection() == CFileTransfer::Direction::Download) {
546 remoteFlag = O_RDONLY;
547 localFlag = O_WRONLY | O_CREAT | O_TRUNC;
548 }
549
550 file->remote = sftp_open(
551 m_SessionSftp,
552 file->fileTransfer->GetRemoteFile().toStdString().c_str(),
553 remoteFlag, permission); // S_IRWXU);
554 if (!file->remote)
555 {
556 file->state = STATE::ERR;
557 QString szErr = "Can't open remote file: " + file->fileTransfer->GetRemoteFile()
558 + ssh_get_error(m_Session);
559 file->fileTransfer->slotSetExplanation(szErr);
560 qCritical(log) << szErr;
561 break;
562 }
563 qDebug(log) << "Open remote file:" << file->fileTransfer->GetRemoteFile();
564
565 sftp_file_set_nonblocking(file->remote);
566
567 file->local = ::open(
568 file->fileTransfer->GetLocalFile().toStdString().c_str(),
569 localFlag, permission);
570 if(-1 == file->local) {
571 file->state = STATE::ERR;
572 QString szErr = "Can't open local file: " + file->fileTransfer->GetLocalFile() + " " + strerror(errno);
573 file->fileTransfer->slotSetExplanation(szErr);
574 qCritical(log) << szErr;
575 break;
576 }
577 qDebug(log) << "Open local file:" << file->fileTransfer->GetLocalFile();
578
579 if(file->fileTransfer->GetDirection() == CFileTransfer::Direction::Download) {
580#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0, 11, 0)
581 if(ssh_version(SSH_VERSION_INT(0, 11,0))) {
582 sftp_limits_t lim = sftp_limits(m_SessionSftp);
583 if(lim) {
584 file->nChunkSize = lim->max_read_length;
585 qDebug(log) << "limits: max_open_handles:" << lim->max_open_handles
586 << "max_packet_length" << lim->max_packet_length
587 << "max_read_length" << lim->max_read_length
588 << "max_write_length:" << lim->max_write_length;
589 sftp_limits_free(lim);
590 }
591
592 quint64 nRequestBytes = 0;
593 for(int i = 0;
594 i < file->nConcurrentCount
595 && nRequestBytes < file->fileTransfer->GetFileSize();
596 i++) {
597 sftp_aio aio = nullptr;
598 quint64 nRequest = file->fileTransfer->GetFileSize() - nRequestBytes;
599 if(nRequest > file->nChunkSize)
600 nRequest = file->nChunkSize;
601 ssize_t nRet = sftp_aio_begin_read(file->remote, nRequest, &aio);
602 if(0 > nRet) {
603 int rc = ssh_get_error_code(m_Session);
604 if (rc != SSH_NO_ERROR) {
605 QString szErr = "Error during sftp aio download: " + QString::number(rc);
606 szErr += ssh_get_error(m_Session);
607 qCritical(log) << szErr;
608 break;
609 }
610 }
611 Q_ASSERT(nRequest == nRet);
612 nRequestBytes += nRet;
613 file->aio.append(aio);
614 }
615 }
616#else
617 file->asyncReadId = sftp_async_read_begin(file->remote, BUF_SIZE);
618 if(file->asyncReadId < 0) {
619 file->state = STATE::ERR;
620 QString szErr = "sftp_async_read_begin failed." + sftp_get_error(m_SessionSftp);
621 file->fileTransfer->slotSetExplanation(szErr);
622 qCritical(log) << szErr;
623 break;
624 }
625#endif
626 } else { // Upload
627#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0, 11, 0)
628 if(ssh_version(SSH_VERSION_INT(0, 11,0))) {
629 sftp_limits_t lim = sftp_limits(m_SessionSftp);
630 if(lim) {
631 file->nChunkSize = lim->max_write_length;
632 qDebug(log) << "limits: max_open_handles:" << lim->max_open_handles
633 << "max_packet_length" << lim->max_packet_length
634 << "max_read_length" << lim->max_read_length
635 << "max_write_length:" << lim->max_write_length;
636 sftp_limits_free(lim);
637 }
638
639 if(!file->buffer)
640 file->buffer = new char[file->nChunkSize];
641 if(!file->buffer) {
642 file->state = STATE::ERR;
643 break;
644 }
645
646 for(int i = 0;
647 i < file->nConcurrentCount
648 && file->nRequests < file->fileTransfer->GetFileSize();
649 i++) {
650 sftp_aio aio = nullptr;
651 quint64 nRequest = file->fileTransfer->GetFileSize() - file->nRequests;
652 if(nRequest > file->nChunkSize)
653 nRequest = file->nChunkSize;
654 ssize_t nLen = ::read(file->local, file->buffer, nRequest);
655 Q_ASSERT(nLen == nRequest);
656 ssize_t nRet = sftp_aio_begin_write(file->remote, file->buffer, nLen, &aio);
657 if(0 > nRet) {
658 int rc = ssh_get_error_code(m_Session);
659 if (rc != SSH_NO_ERROR) {
660 QString szErr = "Error during sftp aio download: " + QString::number(rc);
661 szErr += ssh_get_error(m_Session);
662 qCritical(log) << szErr;
663 break;
664 }
665 }
666 Q_ASSERT(nRequest == nRet);
667 file->nRequests += nRet;
668 file->aio.append(aio);
669 }
670 }
671#endif
672 }
673
674 file->state = STATE::READ;
675 file->fileTransfer->slotSetstate(CFileTransfer::State::Transferring);
676 emit sigFileTransferUpdate(file->fileTransfer);
677 }
678 case STATE::READ: {
679 if(file->fileTransfer->GetDirection() == CFileTransfer::Direction::Download) {
680#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0, 11, 0)
681 if(ssh_version(SSH_VERSION_INT(0, 11,0))) {
682 for(auto it = file->aio.begin(); it != file->aio.end();) {
683 auto aio = *it;
684 if(nullptr == file->buffer)
685 file->buffer = new char[file->nChunkSize];
686 if(!(aio && file->buffer))
687 break;
688 ssize_t nRet = sftp_aio_wait_read(&aio, file->buffer, file->nChunkSize);
689 if (nRet < 0) {
690 if(SSH_AGAIN == nRet)
691 break;
692 int rc = ssh_get_error_code(m_Session);
693 if (rc != SSH_NO_ERROR) {
694 file->state = STATE::ERR;
695 QString szErr = "Error during sftp aio download: " + QString::number(rc);
696 szErr += ssh_get_error(m_Session);
697 qCritical(log) << szErr;
698 break;
699 }
700 }
701 file->nTransfers += nRet;
702 if(file->fileTransfer->GetFileSize() == file->nTransfers) {
703 file->state = STATE::CLOSE;
704 }
705 it = file->aio.erase(it);
706
707 int nLen = ::write(file->local, file->buffer, nRet);
708 if(nLen < 0) {
709 if(EAGAIN != nLen)
710 file->state = STATE::ERR;
711 break;
712 }
713 if(nLen != nRet) {
714 qCritical(log) << "IO is buse, Write file error:" << file->fileTransfer->GetLocalFile();
715 Q_ASSERT(false);
716 }
717 file->fileTransfer->slotTransferSize(nRet);
718 emit sigFileTransferUpdate(file->fileTransfer);
719 }
720 if(file->fileTransfer->GetFileSize() == file->nTransfers) {
721 file->state = STATE::CLOSE;
722 break;
723 }
724 if(file->aio.size() > 0)
725 break;
726
727 quint64 nRequestBytes = file->nTransfers;
728 for(int i = 0;
729 i < file->nConcurrentCount
730 && nRequestBytes < file->fileTransfer->GetFileSize();
731 i++) {
732 sftp_aio aio = nullptr;
733 quint64 nRequest = file->fileTransfer->GetFileSize() - nRequestBytes;
734 if(nRequest > file->nChunkSize)
735 nRequest = file->nChunkSize;
736 ssize_t nRet = sftp_aio_begin_read(file->remote, nRequest, &aio);
737 if(0 > nRet) {
738 int rc = ssh_get_error_code(m_Session);
739 if (rc != SSH_NO_ERROR) {
740 QString szErr = "Error during sftp aio download: " + QString::number(rc);
741 szErr += ssh_get_error(m_Session);
742 qCritical(log) << szErr;
743 break;
744 }
745 }
746 if (nRequest != nRet) {
747 QString szErr =
748 "Error during sftp aio download: sftp_aio_begin_read() "
749 "requesting less bytes even when the number of bytes "
750 "asked to read are within the max limit";
751 qWarning(log) << szErr << nRequest << nRet;
752 }
753
754 nRequestBytes += nRet;
755 file->aio.append(aio);
756 }
757 } else {
758 file->state = STATE::ERR;
759 QString szErr = tr("Error: Asynchronous uploads are not supported");
760 file->fileTransfer->slotSetExplanation(szErr);
761 qCritical(log) << szErr;
762 break;
763 }
764#else
765 int nbytes = sftp_async_read(file->remote, file->buffer + file->offset, BUF_SIZE, file->asyncReadId);
766 if (nbytes > 0 || SSH_AGAIN == nbytes) {
767 if(nbytes > 0 ) {
768 int nLen = ::write(file->local, file->buffer + file->offset, nbytes);
769 if(nLen > 0) {
770 file->fileTransfer->slotTransferSize(nLen);
771 //TODO: add nLen < nbytes
772 emit sigFileTransferUpdate(file->fileTransfer);
773 } else {
774 file->state = STATE::ERR;
775 QString szErr = "Write local file fail:" + sftp_get_error(m_SessionSftp);
776 file->fileTransfer->slotSetExplanation(szErr);
777 }
778 }
779 // Start next async read
780 if (file->asyncReadId = sftp_async_read_begin(file->remote, BUF_SIZE) < 0) {
781 file->state = STATE::ERR;
782 QString szErr = "sftp_async_read_begin failed." + sftp_get_error(m_SessionSftp);
783 file->fileTransfer->slotSetExplanation(szErr);
784 qCritical(log) << szErr;
785 }
786 } else if (nbytes == 0) {
787 file->state = STATE::CLOSE;
788 file->asyncReadId = -1;
789 } else {
790 file->state = STATE::ERR;
791 QString szErr = "sftp_async_read failed." + sftp_get_error(m_SessionSftp);
792 file->fileTransfer->slotSetExplanation(szErr);
793 qCritical(log) << szErr;
794 }
795#endif
796 } else { // Upload
797 if(ssh_version(SSH_VERSION_INT(0, 11,0))) {
798 for(auto it = file->aio.begin(); it != file->aio.end();) {
799 auto aio = *it;
800 ssize_t nRet = sftp_aio_wait_write(&aio);
801 if (nRet < 0) {
802 if(SSH_AGAIN == nRet)
803 break;
804 int rc = ssh_get_error_code(m_Session);
805 if (rc != SSH_NO_ERROR) {
806 file->state = STATE::ERR;
807 QString szErr = "Error during sftp aio upload: " + QString::number(rc);
808 szErr += ssh_get_error(m_Session);
809 qCritical(log) << szErr;
810 break;
811 }
812 }
813 file->nTransfers += nRet;
814 if(file->fileTransfer->GetFileSize() == file->nTransfers) {
815 file->state = STATE::CLOSE;
816 }
817 it = file->aio.erase(it);
818 }
819 if(file->aio.size() >= file->nConcurrentCount)
820 break;
821 int size = file->nConcurrentCount - file->aio.size();
822 for(int i = 0;
823 i < size
824 && file->nRequests < file->fileTransfer->GetFileSize();
825 i++) {
826 sftp_aio aio = nullptr;
827 quint64 nRequest = file->fileTransfer->GetFileSize() - file->nRequests;
828 if(nRequest > file->nChunkSize)
829 nRequest = file->nChunkSize;
830 ssize_t nLen = ::read(file->local, file->buffer, nRequest);
831 Q_ASSERT(nLen == nRequest);
832 ssize_t nRet = sftp_aio_begin_write(file->remote, file->buffer, nLen, &aio);
833 if(0 > nRet) {
834 int rc = ssh_get_error_code(m_Session);
835 if (rc != SSH_NO_ERROR) {
836 QString szErr = "Error during sftp aio download: " + QString::number(rc);
837 szErr += ssh_get_error(m_Session);
838 qCritical(log) << szErr;
839 break;
840 }
841 }
842 Q_ASSERT(nRequest == nRet);
843 file->nRequests += nRet;
844 file->aio.append(aio);
845 }
846 }
847 }
848 break;
849 }
850 case STATE::CLOSE: {
851 file->state = STATE::FINISH;
852 file->fileTransfer->slotSetstate(CFileTransfer::State::Closing);
853 emit sigFileTransferUpdate(file->fileTransfer);
854 break;
855 }
856 case STATE::FINISH: {
857#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0, 11, 0)
858 CleanFileAIO(file);
859#endif
860 it = m_vFiles.erase(it);
861 file->fileTransfer->slotSetstate(CFileTransfer::State::Finish);
862 emit sigFileTransferUpdate(file->fileTransfer);
863 continue;
864 }
865 case STATE::ERR: {
866#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0, 11, 0)
867 foreach (auto aio, file->aio) {
868 sftp_aio_free(aio);
869 }
870#endif
871 CleanFileAIO(file);
872
873 it = m_vFiles.erase(it);
874 file->fileTransfer->slotSetstate(CFileTransfer::State::Fail);
875 emit sigFileTransferUpdate(file->fileTransfer);
876 continue;
877 }
878 }
879
880 it++;
881 }
882 return 0;
883}
884
885int CChannelSFTP::CleanFileAIO(QSharedPointer<_AFILE> file)
886{
887 if(file->buffer) {
888 delete []file->buffer;
889 file->buffer = nullptr;
890 }
891 if(file->remote) {
892 sftp_close(file->remote);
893 file->remote = nullptr;
894 }
895 if(-1 != file->local) {
896 ::close(file->local);
897 file->local = -1;
898 }
899 return 0;
900}
后端接口。它由协议插件实现。 它默认启动一个定时器来开启一个非 Qt 事件循环(就是普通的循环处理)。 详见: Start()、 slotTimeOut()、 OnProcess() 。 当然,它仍然支...
Definition Backend.h:42
int GetDir(CRemoteFileSystem *p)
Synchronize to get the directory
void slotGetDir(const QString &szPath, CRemoteFileSystem *p)
Get the directory asynchronously
virtual bool open(OpenMode mode) override
void sigError(int nErr, const QString &szErr)
emit when the channel is error