summaryrefslogtreecommitdiffstats
path: root/src/plugins/tracing/qctfserver.cpp
diff options
context:
space:
mode:
authorAntti Määttä <antti.maatta@qt.io>2023-03-02 13:21:01 +0200
committerAntti Määttä <antti.maatta@qt.io>2023-08-03 08:23:36 +0300
commit74dd646e8202ce395223dc7768e238ca68e305ce (patch)
tree6f354f95cb400d32c3394a519050101c87ec6306 /src/plugins/tracing/qctfserver.cpp
parentc6ab516f71efeeaaf2a10dbc0368fb437f8b3a6c (diff)
CTF: Add streaming support
Implement TCP server to stream trace data. Use the QTRACE_LOCATION environment variable to configure the server address and port. The rest of the configuration is done in the client TraceRequest. The user can configure the compression level, buffer size and whether the server is buffering when client doesn't have connection. The server responds with basic response and then begins sending packets until the client disconnects or the app closes. Each packet contains stream name, stream data and whether to overwrite the stream, which is used with metadata streams. Task-number: QTBUG-105976 Change-Id: I41c708a4c7988666d59f0c6093dd41e8ccd88441 Reviewed-by: Janne Koskinen <janne.p.koskinen@qt.io>
Diffstat (limited to 'src/plugins/tracing/qctfserver.cpp')
-rw-r--r--src/plugins/tracing/qctfserver.cpp338
1 files changed, 338 insertions, 0 deletions
diff --git a/src/plugins/tracing/qctfserver.cpp b/src/plugins/tracing/qctfserver.cpp
new file mode 100644
index 00000000000..0dc01508987
--- /dev/null
+++ b/src/plugins/tracing/qctfserver.cpp
@@ -0,0 +1,338 @@
+// Copyright (C) 2023 The Qt Company Ltd.
+// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
+
+#include <qloggingcategory.h>
+#include "qctfserver_p.h"
+
+using namespace Qt::Literals::StringLiterals;
+
+Q_LOGGING_CATEGORY(lcCtfInfoTrace, "qt.core.ctfserver", QtWarningMsg)
+
+TracePacket &TracePacket::writePacket(TracePacket &packet, QCborStreamWriter &cbor, int compression)
+{
+ cbor.startMap(4);
+ cbor.append("magic"_L1);
+ cbor.append(packet.PacketMagicNumber);
+ cbor.append("name"_L1);
+ cbor.append(QString::fromUtf8(packet.stream_name));
+ cbor.append("flags"_L1);
+ cbor.append(packet.flags);
+
+ cbor.append("data"_L1);
+ if (compression > 0) {
+ QByteArray compressed = qCompress(packet.stream_data, compression);
+ cbor.append(compressed);
+ } else {
+ cbor.append(packet.stream_data);
+ }
+
+ cbor.endMap();
+ return packet;
+}
+
+QCtfServer::QCtfServer(QObject *parent)
+ : QThread(parent)
+{
+ m_keySet << "cliendId"_L1
+ << "clientVersion"_L1
+ << "sessionName"_L1
+ << "sessionTracepoints"_L1
+ << "flags"_L1
+ << "bufferSize"_L1;
+}
+
+void QCtfServer::setHost(const QString &address)
+{
+ m_address = address;
+}
+
+void QCtfServer::setPort(int port)
+{
+ m_port = port;
+}
+
+void QCtfServer::setCallback(ServerCallback *cb)
+{
+ m_cb = cb;
+}
+
+QString QCtfServer::sessionName() const
+{
+ return m_req.sessionName;
+}
+
+QString QCtfServer::sessionTracepoints() const
+{
+ return m_req.sessionTracepoints;
+}
+
+bool QCtfServer::bufferOnIdle() const
+{
+ return m_bufferOnIdle;
+}
+
+QCtfServer::ServerStatus QCtfServer::status() const
+{
+ return m_status;
+}
+
+void QCtfServer::setStatusAndNotify(ServerStatus status)
+{
+ m_status = status;
+ m_cb->handleStatusChange(status);
+}
+
+void QCtfServer::bytesWritten(qint64 size)
+{
+ m_writtenSize += size;
+ if (m_writtenSize >= m_waitWriteSize && m_eventLoop)
+ m_eventLoop->exit();
+}
+
+void QCtfServer::initWrite()
+{
+ m_waitWriteSize = 0;
+ m_writtenSize = 0;
+}
+
+bool QCtfServer::waitSocket()
+{
+ if (m_eventLoop)
+ m_eventLoop->exec();
+ return m_socket->state() == QTcpSocket::ConnectedState;
+}
+
+void QCtfServer::handleString(QCborStreamReader &cbor)
+{
+ const auto readString = [](QCborStreamReader &cbor) -> QString {
+ QString result;
+ auto r = cbor.readString();
+ while (r.status == QCborStreamReader::Ok) {
+ result += r.data;
+ r = cbor.readString();
+ }
+
+ if (r.status == QCborStreamReader::Error) {
+ // handle error condition
+ result.clear();
+ }
+ return result;
+ };
+ do {
+ if (m_currentKey.isEmpty()) {
+ m_currentKey = readString(cbor);
+ } else {
+ switch (m_keySet.indexOf(m_currentKey)) {
+ case RequestSessionName:
+ m_req.sessionName = readString(cbor);
+ break;
+ case RequestSessionTracepoints:
+ m_req.sessionTracepoints = readString(cbor);
+ break;
+ default:
+ // handle error
+ break;
+ }
+ m_currentKey.clear();
+ }
+ if (cbor.lastError() == QCborError::EndOfFile) {
+ if (!waitSocket())
+ return;
+ cbor.reparse();
+ }
+ } while (cbor.lastError() == QCborError::EndOfFile);
+}
+
+void QCtfServer::handleFixedWidth(QCborStreamReader &cbor)
+{
+ switch (m_keySet.indexOf(m_currentKey)) {
+ case RequestClientId:
+ if (!cbor.isUnsignedInteger())
+ return;
+ m_req.clientId = cbor.toUnsignedInteger();
+ break;
+ case RequestClientVersion:
+ if (!cbor.isUnsignedInteger())
+ return;
+ m_req.clientVersion = cbor.toUnsignedInteger();
+ break;
+ case RequestFlags:
+ if (!cbor.isUnsignedInteger())
+ return;
+ m_req.flags = cbor.toUnsignedInteger();
+ break;
+ case RequestBufferSize:
+ if (!cbor.isUnsignedInteger())
+ return;
+ m_req.bufferSize = cbor.toUnsignedInteger();
+ break;
+ default:
+ // handle error
+ break;
+ }
+ m_currentKey.clear();
+}
+
+void QCtfServer::readCbor(QCborStreamReader &cbor)
+{
+ switch (cbor.type()) {
+ case QCborStreamReader::UnsignedInteger:
+ case QCborStreamReader::NegativeInteger:
+ case QCborStreamReader::SimpleType:
+ case QCborStreamReader::Float16:
+ case QCborStreamReader::Float:
+ case QCborStreamReader::Double:
+ handleFixedWidth(cbor);
+ cbor.next();
+ break;
+ case QCborStreamReader::ByteArray:
+ case QCborStreamReader::String:
+ handleString(cbor);
+ break;
+ case QCborStreamReader::Array:
+ case QCborStreamReader::Map:
+ cbor.enterContainer();
+ while (cbor.lastError() == QCborError::NoError && cbor.hasNext())
+ readCbor(cbor);
+ if (cbor.lastError() == QCborError::NoError)
+ cbor.leaveContainer();
+ default:
+ break;
+ }
+}
+
+void QCtfServer::run()
+{
+ m_server = new QTcpServer();
+ QHostAddress addr;
+ if (m_address.isEmpty())
+ addr = QHostAddress(QHostAddress::Any);
+ else
+ addr = QHostAddress(m_address);
+
+ qCInfo(lcCtfInfoTrace) << "Starting CTF server: " << m_address << ", port: " << m_port;
+
+ while (m_stopping == 0) {
+ if (!m_server->isListening()) {
+ if (!m_server->listen(addr, m_port)) {
+ qCInfo(lcCtfInfoTrace) << "Unable to start server";
+ m_stopping = 1;
+ setStatusAndNotify(Error);
+ }
+ }
+ setStatusAndNotify(Idle);
+ if (m_server->waitForNewConnection(-1)) {
+ qCInfo(lcCtfInfoTrace) << "client connection";
+ m_eventLoop = new QEventLoop();
+ m_socket = m_server->nextPendingConnection();
+
+ QObject::connect(m_socket, &QTcpSocket::readyRead, [&](){
+ if (m_eventLoop) m_eventLoop->exit();
+ });
+ QObject::connect(m_socket, &QTcpSocket::bytesWritten, this, &QCtfServer::bytesWritten);
+ QObject::connect(m_socket, &QTcpSocket::disconnected, [&](){
+ if (m_eventLoop) m_eventLoop->exit();
+ });
+
+ m_server->close(); // Do not wait for more connections
+ setStatusAndNotify(Connected);
+
+ if (waitSocket())
+ {
+ QCborStreamReader cbor(m_socket);
+
+ m_req = {};
+ while (cbor.hasNext() && cbor.lastError() == QCborError::NoError)
+ readCbor(cbor);
+
+ if (!m_req.isValid()) {
+ qCInfo(lcCtfInfoTrace) << "Invalid trace request.";
+ m_socket->close();
+ } else {
+ m_compression = m_req.flags & CompressionMask;
+ m_bufferOnIdle = !(m_req.flags & DontBufferOnIdle);
+
+ m_maxPackets = qMax(m_req.bufferSize / TracePacket::PacketSize, 16u);
+
+ qCInfo(lcCtfInfoTrace) << "request received: " << m_req.sessionName << ", " << m_req.sessionTracepoints;
+
+ m_cb->handleSessionChange();
+ {
+ TraceResponse resp;
+ resp.serverId = ServerId;
+ resp.serverVersion = 1;
+ resp.serverName = QStringLiteral("Ctf Server");
+
+ QCborStreamWriter cbor(m_socket);
+ cbor.startMap(m_compression ? 4 : 3);
+ cbor.append("serverId"_L1);
+ cbor.append(resp.serverId);
+ cbor.append("serverVersion"_L1);
+ cbor.append(resp.serverVersion);
+ cbor.append("serverName"_L1);
+ cbor.append(resp.serverName);
+ if (m_compression) {
+ cbor.append("compressionScheme"_L1);
+ cbor.append("zlib"_L1);
+ }
+ cbor.endMap();
+ }
+
+ qCInfo(lcCtfInfoTrace) << "response sent, sending data";
+ if (waitSocket()) {
+ while (m_socket->state() == QTcpSocket::ConnectedState) {
+ QList<TracePacket> packets;
+ {
+ QMutexLocker lock(&m_mutex);
+ while (m_packets.size() == 0)
+ m_bufferHasData.wait(&m_mutex);
+ packets = std::exchange(m_packets, {});
+ }
+
+ {
+ QCborStreamWriter cbor(m_socket);
+ for (TracePacket &packet : packets) {
+ TracePacket::writePacket(packet, cbor, m_compression);
+ if (!waitSocket())
+ break;
+ }
+ }
+ qCInfo(lcCtfInfoTrace) << packets.size() << " packets written";
+ }
+ }
+
+ qCInfo(lcCtfInfoTrace) << "client connection closed";
+ }
+ }
+ delete m_eventLoop;
+ m_eventLoop = nullptr;
+ } else {
+ qCInfo(lcCtfInfoTrace) << "error: " << m_server->errorString();
+ m_stopping = 1;
+ setStatusAndNotify(Error);
+ }
+ }
+}
+
+void QCtfServer::startServer()
+{
+ start();
+}
+void QCtfServer::stopServer()
+{
+ this->m_stopping = 1;
+ wait();
+}
+
+void QCtfServer::bufferData(const QString &stream, const QByteArray &data, quint32 flags)
+{
+ QMutexLocker lock(&m_mutex);
+ TracePacket packet;
+ packet.stream_name = stream.toUtf8();
+ packet.stream_data = data;
+ packet.flags = flags;
+ m_packets.append(packet);
+ if (m_packets.size() > m_maxPackets)
+ m_packets.pop_front();
+ m_bufferHasData.wakeOne();
+}