// Paste snapshot taken 2007-09-23 17:05:28 UTC.
// (The paste site's "previous" / "next" navigation links were captured here.)
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <exception>
#include <iostream>
#include <algorithm>
#include <functional>
#include "network.h"
using boost::asio::ip::tcp;
namespace Network
{
// Shared Manager instance used by the Stream member functions below
// (Stream::connect_stream and Stream::send_data reach the socket, the stream
// table and the send mutex through it). No code visible in this file assigns
// it, so it is presumably reset() to a live Manager elsewhere before any
// Stream is used -- TODO confirm against the caller.
boost::scoped_ptr<Manager> manager;
// Sets up the manager: the socket is bound to the member io_service, a work
// object is installed on that service, and a thread is started that executes
// io_service::run() on it.
Manager::Manager()
    : _socket(_io_service)
{
    // The work object is installed before the thread starts calling run().
    boost::asio::io_service::work* keep_alive =
        new boost::asio::io_service::work(_io_service);
    _io_service_work.reset(keep_alive);

    // Thread that executes the io_service; the destructor joins it.
    boost::thread* runner = new boost::thread(
        boost::bind(&boost::asio::io_service::run, &_io_service));
    _thread.reset(runner);
}
// Tears the manager down: releases the work object installed by the
// constructor, then waits for the io_service thread to finish.
Manager::~Manager()
{
    _io_service_work.reset();

    // Thread may not have been created correctly, so check first
    if (!_thread)
        return;

    // NOTE(review): join() only returns once io_service::run() has returned.
    // Presumably an async_read still queued by connect()/receive_data() keeps
    // run() busy, in which case this would block -- confirm shutdown behavior.
    _thread->join();
}
// Connects to a nova server and performs the protocol handshake.
//
// Steps:
//   1. resolve server/port and try each endpoint until one connects
//   2. read the 4-byte magic "nova" from the server and answer with "nova"
//   3. read the server's version string (2-byte big-endian length + text)
//      and print it to std::cout
//   4. send our own version string in the same length-prefixed form
//   5. queue the first asynchronous header read, handled by receive_data()
//
// server - host name or address handed to the resolver
// port   - service name or port number handed to the resolver
//
// Returns true when the handshake completed. Returns false on any failure,
// including resolver or I/O errors reported as std::exception; the socket is
// closed on every failure path after a connection was established.
bool Manager::connect(std::string server, std::string port)
{
    try
    {
        tcp::resolver resolver(_io_service);
        tcp::resolver::query query(server, port);
        tcp::resolver::iterator endpoint_it = resolver.resolve(query);
        tcp::resolver::iterator end_it;

        // Try every resolved endpoint in turn until one accepts the connection.
        boost::asio::error error = boost::asio::error::host_not_found;
        while (error && endpoint_it != end_it)
        {
            _socket.close();
            _socket.connect(*endpoint_it, boost::asio::assign_error(error));
            ++endpoint_it;
        }
        if (error)
            return false;

        // The server speaks first and must identify itself with "nova".
        std::size_t bytes_read = boost::asio::read(_socket,
            boost::asio::buffer(_buffer, 4),
            boost::asio::transfer_all(),
            boost::asio::assign_error(error));
        if (error || bytes_read != 4 || _buffer[0] != 'n' || _buffer[1] != 'o'
            || _buffer[2] != 'v' || _buffer[3] != 'a')
        {
            _socket.close();
            return false;
        }

        boost::asio::write(_socket, boost::asio::buffer("nova", 4));
        boost::asio::read(_socket, boost::asio::buffer(_buffer, 2));

        // Convert length from network byte order. The bytes are forced to
        // unsigned first so a plain-char buffer cannot sign-extend.
        const std::size_t version_length =
            (static_cast<std::size_t>(static_cast<unsigned char>(_buffer[0])) << 8)
            | static_cast<unsigned char>(_buffer[1]);
        if (version_length > (_buffer.size() - 3))
        {
            // The version string would not fit in _buffer. (A bare `throw;`
            // here has no active exception and would call std::terminate.)
            _socket.close();
            return false;
        }
        boost::asio::read(_socket, boost::asio::buffer(_buffer, version_length));
        _buffer[version_length] = 0;
        std::cout << _buffer.c_array() << std::endl;

        // TODO: Better version string
        const std::string my_version = "nova-client SVN";
        _buffer[0] = static_cast<unsigned char>((my_version.length() >> 8) & 0xFF);
        _buffer[1] = static_cast<unsigned char>(my_version.length() & 0xFF);
        boost::asio::write(_socket, boost::asio::buffer(_buffer, 2));
        boost::asio::write(_socket, boost::asio::buffer(my_version));

        // Wait for exactly one 4-byte message header. Asking for the whole
        // _buffer would only complete once it is full, swallowing the payload
        // that receive_data() reads separately.
        boost::asio::async_read(_socket, boost::asio::buffer(_buffer, 4),
            boost::bind(&Manager::receive_data, this, _1, _2));
        return true;
    }
    catch (const std::exception&)
    {
        _socket.close();
        return false;
    }
}
// Completion handler for the asynchronous 4-byte message header read.
//
// Data header format (network byte order):
//   stream_id    2 bytes
//   data_length  2 bytes
//
// Reserved stream ids:
//   0xFFFF = new stream; the payload is a 2-byte stream id followed by
//            (data_length - 2) bytes of stream-type name
//   0xFFFE = response to a new stream request; data_length stores the new
//            stream id and no payload is read
// Any other id reads data_length payload bytes into that stream's buffer and
// then invokes the stream's callback.
//
// error             - result of the header read; on failure the handler
//                     returns without parsing _buffer or queuing another read
// bytes_transferred - number of header bytes that were read
//
// After a message has been processed (or rejected with its payload
// discarded) the next header read is queued. If a blocking payload read
// throws, the failure is reported on std::cerr and no further read is queued.
void Manager::receive_data(
    const boost::asio::error& error,
    std::size_t bytes_transferred)
{
    const std::size_t header_size = 4;

    // A failed or short header read leaves nothing valid to parse.
    if (error || bytes_transferred < header_size)
        return;

    // Bytes are forced to unsigned so a plain-char buffer cannot sign-extend.
    const unsigned short stream_id = static_cast<unsigned short>(
        (static_cast<unsigned char>(_buffer[0]) << 8)
        | static_cast<unsigned char>(_buffer[1]));
    const unsigned short data_length = static_cast<unsigned short>(
        (static_cast<unsigned char>(_buffer[2]) << 8)
        | static_cast<unsigned char>(_buffer[3]));

    try
    {
        switch (stream_id)
        {
        case 0xFFFE:
        {
            // data_length stores the new stream_id
            boost::mutex::scoped_lock lock(_new_streams_mutex);
            if (_new_streams.size() == 0)
            {
                std::cerr << "Network: new-stream response received with no "
                             "request pending" << std::endl;
                break;
            }
            const bool append = (data_length == _streams.size());
            if (!append
                && (data_length > _streams.size() || _streams[data_length].callback))
            {
                std::cerr << "Network: new-stream response names unusable "
                             "stream id " << data_length << std::endl;
                break;
            }
            if (append)
                _streams.push_back(_new_streams.front());
            else
                _streams[data_length] = _new_streams.front();
            _new_streams.pop();
            break;
        }

        case 0xFFFF:
        {
            if (data_length < 2)
            {
                // Too short to hold the stream id; discard what was announced
                // so the next header starts at the right byte.
                if (data_length > 0)
                    boost::asio::read(_socket, boost::asio::buffer(_buffer, data_length));
                std::cerr << "Network: new-stream request is too short"
                          << std::endl;
                break;
            }

            boost::asio::read(_socket, boost::asio::buffer(_buffer, 2));
            const unsigned short new_stream_id = static_cast<unsigned short>(
                (static_cast<unsigned char>(_buffer[0]) << 8)
                | static_cast<unsigned char>(_buffer[1]));

            // The remaining data_length - 2 bytes are the stream-type name.
            const std::size_t type_length = data_length - 2;
            std::string stream_type(type_length, '\0');
            if (type_length > 0)
                boost::asio::read(_socket,
                    boost::asio::buffer(&stream_type[0], type_length));

            boost::mutex::scoped_lock lock(_stream_types_mutex);
            if (new_stream_id > _streams.size() ||
                (new_stream_id < _streams.size() && _streams[new_stream_id].callback))
            {
                // The id is neither the next free index nor an unused slot.
                std::cerr << "Network: new-stream request names unusable "
                             "stream id " << new_stream_id << std::endl;
                break;
            }

            for (std::list<StreamTypeInfo>::iterator it = _stream_types.begin();
                 it != _stream_types.end(); ++it)
            {
                if (it->stream_type == stream_type)
                {
                    // The registered handler fills in the buffer and callback.
                    StreamInfo si;
                    it->callback(new_stream_id, si.buffer, si.callback);
                    if (new_stream_id == _streams.size())
                        _streams.push_back(si);
                    else
                        _streams[new_stream_id] = si;
                    break;
                }
            }
            // TODO: Handle case where stream_type is not found
            break;
        }

        default:
        {
            if (stream_id >= _streams.size() || !_streams[stream_id].callback
                || data_length > boost::asio::buffer_size(_streams[stream_id].buffer))
            {
                // Nobody can take this payload: discard it in chunks, using
                // _buffer as scratch space, so the byte stream stays framed.
                std::size_t remaining = data_length;
                while (remaining > 0)
                {
                    const std::size_t chunk =
                        std::min<std::size_t>(remaining, _buffer.size());
                    boost::asio::read(_socket, boost::asio::buffer(_buffer, chunk));
                    remaining -= chunk;
                }
                std::cerr << "Network: dropped " << data_length
                          << " bytes for unusable stream id " << stream_id
                          << std::endl;
                break;
            }

            // Read exactly the announced payload, not the whole stream buffer.
            boost::asio::read(_socket,
                boost::asio::buffer(_streams[stream_id].buffer, data_length));
            _streams[stream_id].callback();
            break;
        }
        }
    }
    catch (const std::exception& e)
    {
        // This runs on the io_service thread; do not let the exception escape.
        std::cerr << "Network: receive failed: " << e.what() << std::endl;
        return;
    }

    // Queue the read for the next 4-byte header, whatever kind of message
    // was just handled.
    boost::asio::async_read(_socket, boost::asio::buffer(_buffer, header_size),
        boost::bind(&Manager::receive_data, this, _1, _2));
}
/* void Manager::start_stream(
std::string stream_type,
boost::asio::mutable_buffer buffer,
boost::function<void ()> callback)
{
unsigned char header[4];
header[0] = 0xFF; // no stream_id
header[1] = 0xFF;
header[2] = (stream_type.size() >> 8) & 0xFF;
header[3] = stream_type.size() & 0xFF;
boost::mutex::scoped_lock lock1(_new_streams_mutex);
_new_streams.push(StreamInfo(buffer, callback));
boost::mutex::scoped_lock lock2(_send_data_mutex);
boost::asio::write(_socket, boost::asio::buffer(header, 4));
boost::asio::write(_socket, boost::asio::buffer(stream_type));
}*/
/* void Manager::send_data(
unsigned short stream_id,
boost::asio::const_buffer data,
unsigned short data_length)
{
if (stream_id >= _streams.size() || !_streams[stream_id].callback
|| data_length > boost::asio::buffer_size(data))
{
// TODO: Handle error
}
unsigned char header[4];
header[0] = (stream_id >> 8) & 0xFF;
header[1] = stream_id & 0xFF;
header[2] = (data_length >> 8) & 0xFF;
header[3] = data_length & 0xFF;
boost::mutex::scoped_lock lock(_send_data_mutex);
boost::asio::write(_socket, boost::asio::buffer(header, 4));
boost::asio::write(_socket, boost::asio::buffer(data, data_length));
}*/
void Manager::register_stream_type(
std::string stream_type,
boost::function<void
(unsigned short,
boost::asio::mutable_buffer&,
boost::function<void ()>&)> callback)
{
boost::mutex::scoped_lock lock(_stream_types_mutex);
_stream_types.push_back(StreamTypeInfo(stream_type, callback));
}
// A new Stream starts with the id 0xFFFF, the value connect_stream() tests
// for before it assigns a real stream id.
Stream::Stream()
    : _stream_id(0xFFFF)
{
}
// The destructor body is empty; in particular it does not touch the
// manager's stream table.
Stream::~Stream() {}
// Registers this stream with the global manager and sends a new-stream
// request to the peer.
//
// The first slot of the manager's stream table whose callback is empty is
// reused; when there is none a new slot is appended. The slot index becomes
// this stream's id.
//
// string_type - name of the stream type to request
// buffer      - buffer stored in the stream table for this stream
// callback    - callback stored in the stream table for this stream
//
// Nothing is registered or sent (and a message is written to std::cerr) when
// no manager exists, when this stream already has an id, when the type name
// does not fit the 2-byte length field, or when no stream id is left.
// Failures of boost::asio::write are not caught here.
void Stream::connect_stream(
    std::string string_type,
    boost::asio::mutable_buffer buffer,
    boost::function<void ()> callback)
{
    if (!manager)
    {
        std::cerr << "Network: connect_stream called without a manager"
                  << std::endl;
        return;
    }
    if (_stream_id != 0xFFFF)
    {
        // Already connected; registering again would claim a second slot.
        std::cerr << "Network: stream is already connected" << std::endl;
        return;
    }
    if (string_type.size() > 0xFFFF)
    {
        std::cerr << "Network: stream type name is too long" << std::endl;
        return;
    }

    // Look for the first slot that is not in use (empty callback).
    const std::size_t slot_count = manager->_streams.size();
    std::size_t slot = 0;
    while (slot < slot_count && manager->_streams[slot].callback)
        ++slot;

    // 0xFFFE and 0xFFFF are reserved ids and must never name a slot.
    if (slot >= 0xFFFE)
    {
        std::cerr << "Network: no free stream id available" << std::endl;
        return;
    }

    if (slot == slot_count)
        manager->_streams.push_back(Manager::StreamInfo(buffer, callback));
    else
        manager->_streams[slot] = Manager::StreamInfo(buffer, callback);
    _stream_id = static_cast<unsigned short>(slot);

    // NOTE(review): this sends [0xFFFF][stream id][type length][type name],
    // while Manager::receive_data() parses an incoming new-stream message as
    // [0xFFFF][data_length][stream id][type name] -- confirm which layout the
    // server expects.
    unsigned char header[6];
    header[0] = 0xFF;
    header[1] = 0xFF;
    header[2] = (_stream_id >> 8) & 0xFF;
    header[3] = _stream_id & 0xFF;
    header[4] = (string_type.size() >> 8) & 0xFF;
    header[5] = string_type.size() & 0xFF;

    // Hold the send mutex across both writes so the header and the name
    // cannot be interleaved with another sender's data.
    boost::mutex::scoped_lock lock(manager->_send_data_mutex);
    boost::asio::write(manager->_socket, boost::asio::buffer(header, 6));
    boost::asio::write(manager->_socket, boost::asio::buffer(string_type));
}
// Sends one data message on this stream through the global manager's socket.
//
// The message is a 4-byte header (stream id, then data_length, both in
// network byte order) followed by the first data_length bytes of data.
//
// data        - buffer holding the payload
// data_length - number of payload bytes to send; must not exceed the size
//               of data
//
// Nothing is sent (and a message is written to std::cerr) when no manager
// exists, when the stream has no id yet (0xFFFF), or when data_length is
// larger than the buffer. Failures of boost::asio::write are not caught here.
void Stream::send_data(
    boost::asio::const_buffer data,
    unsigned short data_length)
{
    if (!manager || _stream_id == 0xFFFF)
    {
        std::cerr << "Network: send_data called on an unconnected stream"
                  << std::endl;
        return;
    }
    if (data_length > boost::asio::buffer_size(data))
    {
        // The header would announce more bytes than the payload write can
        // deliver, which would break the framing for the receiver.
        std::cerr << "Network: send_data length exceeds the buffer size"
                  << std::endl;
        return;
    }

    unsigned char header[4];
    header[0] = (_stream_id >> 8) & 0xFF;
    header[1] = _stream_id & 0xFF;
    header[2] = (data_length >> 8) & 0xFF;
    header[3] = data_length & 0xFF;

    // Hold the send mutex across both writes so the header and the payload
    // cannot be interleaved with another sender's data.
    boost::mutex::scoped_lock lock(manager->_send_data_mutex);
    boost::asio::write(manager->_socket, boost::asio::buffer(header, 4));
    boost::asio::write(manager->_socket, boost::asio::buffer(data, data_length));
}
}