libjansson replaced to rapidjson.

Sync changes with proxy.
This commit is contained in:
XMRig
2017-10-04 23:33:30 +03:00
parent 4cf3bb9930
commit af51513614
78 changed files with 15550 additions and 6420 deletions

View File

@@ -23,13 +23,19 @@
#include <inttypes.h>
#include <iterator>
#include <stdio.h>
#include <string.h>
#include <utility>
#include "log/Log.h"
#include "interfaces/IClientListener.h"
#include "log/Log.h"
#include "net/Client.h"
#include "net/Url.h"
#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#ifdef XMRIG_PROXY_PROJECT
@@ -69,8 +75,8 @@ Client::Client(int id, const char *agent, IClientListener *listener) :
m_hints.ai_socktype = SOCK_STREAM;
m_hints.ai_protocol = IPPROTO_TCP;
m_recvBuf.base = static_cast<char*>(malloc(kRecvBufSize));
m_recvBuf.len = kRecvBufSize;
m_recvBuf.base = m_buf;
m_recvBuf.len = sizeof(m_buf);
# ifndef XMRIG_PROXY_PROJECT
m_keepAliveTimer.data = this;
@@ -81,36 +87,7 @@ Client::Client(int id, const char *agent, IClientListener *listener) :
Client::~Client()
{
free(m_recvBuf.base);
free(m_socket);
}
/**
* @brief Send raw data to server.
*
* @param data
*/
int64_t Client::send(char *data, size_t size)
{
LOG_DEBUG("[%s:%u] send (%d bytes): \"%s\"", m_url.host(), m_url.port(), size ? size : strlen(data), data);
if (state() != ConnectedState || !uv_is_writable(m_stream)) {
LOG_DEBUG_ERR("[%s:%u] send failed, invalid state: %d", m_url.host(), m_url.port(), m_state);
return -1;
}
uv_buf_t buf = uv_buf_init(data, (unsigned int) (size ? size : strlen(data)));
uv_write_t *req = new uv_write_t;
req->data = buf.base;
uv_write(req, m_stream, &buf, 1, [](uv_write_t *req, int status) {
free(req->data);
delete req;
});
m_expire = uv_now(uv_default_loop()) + kResponseTimeout;
return m_sequence++;
delete m_socket;
}
@@ -175,8 +152,6 @@ void Client::tick(uint64_t now)
int64_t Client::submit(const JobResult &result)
{
char *req = static_cast<char*>(malloc(345));
# ifdef XMRIG_PROXY_PROJECT
const char *nonce = result.nonce;
const char *data = result.result;
@@ -191,11 +166,11 @@ int64_t Client::submit(const JobResult &result)
data[64] = '\0';
# endif
snprintf(req, 345, "{\"id\":%" PRIu64 ",\"jsonrpc\":\"2.0\",\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"}}\n",
m_sequence, m_rpcId, result.jobId, nonce, data);
const size_t size = snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"id\":%" PRIu64 ",\"jsonrpc\":\"2.0\",\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"}}\n",
m_sequence, m_rpcId, result.jobId.data(), nonce, data);
m_results[m_sequence] = SubmitResult(m_sequence, result.diff, result.actualDiff());
return send(req);
return send(size);
}
@@ -221,25 +196,25 @@ bool Client::isCriticalError(const char *message)
}
bool Client::parseJob(const json_t *params, int *code)
bool Client::parseJob(const rapidjson::Value &params, int *code)
{
if (!json_is_object(params)) {
if (!params.IsObject()) {
*code = 2;
return false;
}
Job job(m_id, m_url.isNicehash());
if (!job.setId(json_string_value(json_object_get(params, "job_id")))) {
if (!job.setId(params["job_id"].GetString())) {
*code = 3;
return false;
}
if (!job.setBlob(json_string_value(json_object_get(params, "blob")))) {
if (!job.setBlob(params["blob"].GetString())) {
*code = 4;
return false;
}
if (!job.setTarget(json_string_value(json_object_get(params, "target")))) {
if (!job.setTarget(params["target"].GetString())) {
*code = 5;
return false;
}
@@ -254,9 +229,9 @@ bool Client::parseJob(const json_t *params, int *code)
}
bool Client::parseLogin(const json_t *result, int *code)
bool Client::parseLogin(const rapidjson::Value &result, int *code)
{
const char *id = json_string_value(json_object_get(result, "id"));
const char *id = result["id"].GetString();
if (!id || strlen(id) >= sizeof(m_rpcId)) {
*code = 1;
return false;
@@ -265,7 +240,7 @@ bool Client::parseLogin(const json_t *result, int *code)
memset(m_rpcId, 0, sizeof(m_rpcId));
memcpy(m_rpcId, id, strlen(id));
return parseJob(json_object_get(result, "job"), code);
return parseJob(result["job"], code);
}
@@ -292,6 +267,26 @@ int Client::resolve(const char *host)
}
int64_t Client::send(size_t size)
{
LOG_DEBUG("[%s:%u] send (%d bytes): \"%s\"", m_url.host(), m_url.port(), size, m_sendBuf);
if (state() != ConnectedState || !uv_is_writable(m_stream)) {
LOG_DEBUG_ERR("[%s:%u] send failed, invalid state: %d", m_url.host(), m_url.port(), m_state);
return -1;
}
uv_buf_t buf = uv_buf_init(m_sendBuf, (unsigned int) size);
if (uv_try_write(m_stream, &buf, 1) < 0) {
close();
return -1;
}
m_expire = uv_now(uv_default_loop()) + kResponseTimeout;
return m_sequence++;
}
void Client::close()
{
if (m_state == UnconnectedState || m_state == ClosingState || !m_socket) {
@@ -311,12 +306,12 @@ void Client::connect(struct sockaddr *addr)
setState(ConnectingState);
reinterpret_cast<struct sockaddr_in*>(addr)->sin_port = htons(m_url.port());
free(m_socket);
delete m_socket;
uv_connect_t *req = (uv_connect_t*) malloc(sizeof(uv_connect_t));
uv_connect_t *req = new uv_connect_t;
req->data = this;
m_socket = static_cast<uv_tcp_t*>(malloc(sizeof(uv_tcp_t)));
m_socket = new uv_tcp_t;
m_socket->data = this;
uv_tcp_init(uv_default_loop(), m_socket);
@@ -334,26 +329,36 @@ void Client::login()
{
m_results.clear();
json_t *req = json_object();
json_object_set(req, "id", json_integer(1));
json_object_set(req, "jsonrpc", json_string("2.0"));
json_object_set(req, "method", json_string("login"));
rapidjson::Document doc;
doc.SetObject();
json_t *params = json_object();
json_object_set(params, "login", json_string(m_url.user()));
json_object_set(params, "pass", json_string(m_url.password()));
json_object_set(params, "agent", json_string(m_agent));
auto &allocator = doc.GetAllocator();
json_object_set(req, "params", params);
doc.AddMember("id", 1, allocator);
doc.AddMember("jsonrpc", "2.0", allocator);
doc.AddMember("method", "login", allocator);
char *buf = json_dumps(req, JSON_COMPACT);
const size_t size = strlen(buf);
rapidjson::Value params(rapidjson::kObjectType);
params.AddMember("login", rapidjson::StringRef(m_url.user()), allocator);
params.AddMember("pass", rapidjson::StringRef(m_url.password()), allocator);
params.AddMember("agent", rapidjson::StringRef(m_agent), allocator);
buf[size] = '\n';
doc.AddMember("params", params, allocator);
json_decref(req);
rapidjson::StringBuffer buffer(0, 512);
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
doc.Accept(writer);
send(buf, size + 1);
const size_t size = buffer.GetSize();
if (size > (sizeof(m_buf) - 2)) {
return;
}
memcpy(m_sendBuf, buffer.GetString(), size);
m_sendBuf[size] = '\n';
m_sendBuf[size + 1] = '\0';
send(size + 1);
}
@@ -365,33 +370,34 @@ void Client::parse(char *line, size_t len)
LOG_DEBUG("[%s:%u] received (%d bytes): \"%s\"", m_url.host(), m_url.port(), len, line);
json_error_t err;
json_t *val = json_loads(line, 0, &err);
if (!val) {
rapidjson::Document doc;
if (doc.ParseInsitu(line).HasParseError()) {
if (!m_quiet) {
LOG_ERR("[%s:%u] JSON decode failed: \"%s\"", m_url.host(), m_url.port(), err.text);
LOG_ERR("[%s:%u] JSON decode failed: \"%s\"", m_url.host(), m_url.port(), rapidjson::GetParseError_En(doc.GetParseError()));
}
return;
}
const json_t *id = json_object_get(val, "id");
if (json_is_integer(id)) {
parseResponse(json_integer_value(id), json_object_get(val, "result"), json_object_get(val, "error"));
}
else {
parseNotification(json_string_value(json_object_get(val, "method")), json_object_get(val, "params"), json_object_get(val, "error"));
if (!doc.IsObject()) {
return;
}
json_decref(val);
const rapidjson::Value &id = doc["id"];
if (id.IsInt64()) {
parseResponse(id.GetInt64(), doc["result"], doc["error"]);
}
else {
parseNotification(doc["method"].GetString(), doc["params"], doc["error"]);
}
}
void Client::parseNotification(const char *method, const json_t *params, const json_t *error)
void Client::parseNotification(const char *method, const rapidjson::Value &params, const rapidjson::Value &error)
{
if (json_is_object(error)) {
if (error.IsObject()) {
if (!m_quiet) {
LOG_ERR("[%s:%u] error: \"%s\", code: %" PRId64, m_url.host(), m_url.port(), json_string_value(json_object_get(error, "message")), json_integer_value(json_object_get(error, "code")));
LOG_ERR("[%s:%u] error: \"%s\", code: %d", m_url.host(), m_url.port(), error["message"].GetString(), error["code"].GetInt());
}
return;
}
@@ -413,10 +419,10 @@ void Client::parseNotification(const char *method, const json_t *params, const j
}
void Client::parseResponse(int64_t id, const json_t *result, const json_t *error)
void Client::parseResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error)
{
if (json_is_object(error)) {
const char *message = json_string_value(json_object_get(error, "message"));
if (error.IsObject()) {
const char *message = error["message"].GetString();
auto it = m_results.find(id);
if (it != m_results.end()) {
@@ -425,7 +431,7 @@ void Client::parseResponse(int64_t id, const json_t *result, const json_t *error
m_results.erase(it);
}
else if (!m_quiet) {
LOG_ERR("[%s:%u] error: \"%s\", code: %" PRId64, m_url.host(), m_url.port(), message, json_integer_value(json_object_get(error, "code")));
LOG_ERR("[%s:%u] error: \"%s\", code: %d", m_url.host(), m_url.port(), message, error["code"].GetInt());
}
if (id == 1 || isCriticalError(message)) {
@@ -435,7 +441,7 @@ void Client::parseResponse(int64_t id, const json_t *result, const json_t *error
return;
}
if (!json_is_object(result)) {
if (!result.IsObject()) {
return;
}
@@ -466,10 +472,7 @@ void Client::parseResponse(int64_t id, const json_t *result, const json_t *error
void Client::ping()
{
char *req = static_cast<char*>(malloc(160));
snprintf(req, 160, "{\"id\":%" PRId64 ",\"jsonrpc\":\"2.0\",\"method\":\"keepalived\",\"params\":{\"id\":\"%s\"}}\n", m_sequence, m_rpcId);
send(req);
send(snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"id\":%" PRId64 ",\"jsonrpc\":\"2.0\",\"method\":\"keepalived\",\"params\":{\"id\":\"%s\"}}\n", m_sequence, m_rpcId));
}
@@ -533,7 +536,7 @@ void Client::onClose(uv_handle_t *handle)
{
auto client = getClient(handle->data);
free(client->m_socket);
delete client->m_socket;
client->m_stream = nullptr;
client->m_socket = nullptr;
@@ -551,7 +554,7 @@ void Client::onConnect(uv_connect_t *req, int status)
LOG_ERR("[%s:%u] connect error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror(status));
}
free(req);
delete req;
client->close();
return;
}
@@ -561,7 +564,7 @@ void Client::onConnect(uv_connect_t *req, int status)
client->setState(ConnectedState);
uv_read_start(client->m_stream, Client::onAllocBuffer, Client::onRead);
free(req);
delete req;
client->login();
}
@@ -578,14 +581,14 @@ void Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
return client->close();;
}
if ((size_t) nread > (kRecvBufSize - 8 - client->m_recvBufPos)) {
if ((size_t) nread > (sizeof(m_buf) - 8 - client->m_recvBufPos)) {
return client->close();;
}
client->m_recvBufPos += nread;
char* end;
char* start = client->m_recvBuf.base;
char* start = buf->base;
size_t remaining = client->m_recvBufPos;
while ((end = static_cast<char*>(memchr(start, '\n', remaining))) != nullptr) {
@@ -602,11 +605,11 @@ void Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
return;
}
if (start == client->m_recvBuf.base) {
if (start == buf->base) {
return;
}
memcpy(client->m_recvBuf.base, start, remaining);
memcpy(buf->base, start, remaining);
client->m_recvBufPos = remaining;
}