Archive / / / network.cpp
2007-09-23 17:05:28 UTC
previous next
#include <boost/asio.hpp> #include <boost/thread.hpp> #include <boost/bind.hpp> #include <boost/format.hpp> #include <exception> #include <iostream> #include <algorithm> #include <functional> #include "network.h" using boost::asio::ip::tcp; namespace Network { boost::scoped_ptr<Manager> manager; Manager::Manager() : _socket(_io_service) { _io_service_work.reset(new boost::asio::io_service::work(_io_service)); _thread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &_io_service))); } Manager::~Manager() { _io_service_work.reset(); // Thread may not have been created correctly, so check first if (_thread) _thread->join(); } bool Manager::connect(std::string server, std::string port) { tcp::resolver resolver(_io_service); tcp::resolver::query query(server, port); tcp::resolver::iterator endpoint_it = resolver.resolve(query); tcp::resolver::iterator end_it; boost::asio::error error = boost::asio::error::host_not_found; while (error && endpoint_it != end_it) { _socket.close(); _socket.connect(*endpoint_it, boost::asio::assign_error(error)); endpoint_it++; } if (error) return false; std::size_t bytes_read = boost::asio::read(_socket, boost::asio::buffer(_buffer, 4), boost::asio::transfer_all(), boost::asio::assign_error(error)); if (error || bytes_read != 4 || _buffer[0] != 'n' || _buffer[1] != 'o' || _buffer[2] != 'v' || _buffer[3] != 'a') { _socket.close(); return false; } try { boost::asio::write(_socket, boost::asio::buffer("nova", 4)); boost::asio::read(_socket, boost::asio::buffer(_buffer, 2)); // Convert length from network byte order unsigned short version_length = _buffer[0]; version_length = (version_length << 8) + _buffer[1]; if (version_length > (_buffer.size() - 3)) throw; boost::asio::read(_socket, boost::asio::buffer(_buffer, version_length)); _buffer[version_length] = 0; std::cout << _buffer.c_array() << std::endl; // TODO: Better version string std::string my_version = "nova-client SVN"; _buffer[0] = 0; _buffer[1] = my_version.length(); boost::asio::write(_socket, boost::asio::buffer(_buffer, 2)); boost::asio::write(_socket, boost::asio::buffer(my_version)); boost::asio::async_read(_socket, boost::asio::buffer(_buffer), boost::bind(&Manager::receive_data, this, _1, _2)); return true; } catch(std::exception &e) { _socket.close(); return false; } } void Manager::receive_data( const boost::asio::error& error, std::size_t bytes_transferred) { // Data header format: (network byte order) // stream_id 2 bytes // data_length 2 bytes unsigned short stream_id = _buffer[0]; stream_id = (stream_id << 8) + _buffer[1]; unsigned short data_length = _buffer[2]; data_length = (data_length << 8) + _buffer[3]; // 0xFFFF = new stream // 0xFFFE = reponse to new stream request switch (stream_id) { case 0xFFFE: // data_length stores the new stream_id if (_new_streams.size() == 0) { // TODO: Do error message return; } if (data_length == _streams.size()) { boost::mutex::scoped_lock lock(_new_streams_mutex); _streams.push_back(_new_streams.front()); _new_streams.pop(); } else { if (data_length >= _streams.size() || _streams[data_length].callback) { // TODO: Do error message return; } boost::mutex::scoped_lock lock(_new_streams_mutex); _streams[data_length] = _new_streams.front(); _new_streams.pop(); } break; case 0xFFFF: boost::asio::read(_socket, boost::asio::buffer(_buffer, 2)); unsigned short new_stream_id = _buffer[0]; new_stream_id = (new_stream_id << 8) + _buffer[1]; // We allocate data_length-2+1 because we remove 2 bytes for // the stream id that we just got, and we add a byte so we // can add a null to make a string boost::scoped_array<unsigned char> buffer(new unsigned char[data_length-2+1]); boost::asio::read(_socket, boost::asio::buffer(buffer, data_length-2)); buffer[data_length-2] = 0; boost::mutex::scoped_lock lock(_stream_types_mutex); if (new_stream_id > _streams.size() || (new_stream_id < _streams.size() && _streams[new_stream_id].callback)) { // TODO: Handle error } for (std::list<StreamTypeInfo>::iterator it = _stream_types.begin(); it != _stream_types.end(); it++) { if (it->stream_type == buffer) { StreamInfo si; it->callback(new_stream_id, si.buffer, si.callback); if (new_stream_id == _streams.size()) _streams.push_back(si); else _streams[new_stream_id] = si; break; } // TODO: Handle case where stream_type is not found } break; default: if (stream_id >= _streams.size() || !_streams[stream_id].callback || data_length > boost::asio::buffer_size(_streams[stream_id].buffer)) { // TODO: Handle error } boost::asio::read(_socket, _streams[stream_id].buffer); _streams[stream_id].callback(); boost::asio::async_read(_socket, boost::asio::buffer(_buffer), boost::bind(&Manager::receive_data, this, _1, _2)); } } /* void Manager::start_stream( std::string stream_type, boost::asio::mutable_buffer buffer, boost::function<void ()> callback) { unsigned char header[4]; header[0] = 0xFF; // no stream_id header[1] = 0xFF; header[2] = (stream_type.size() >> 8) & 0xFF; header[3] = stream_type.size() & 0xFF; boost::mutex::scoped_lock lock1(_new_streams_mutex); _new_streams.push(StreamInfo(buffer, callback)); boost::mutex::scoped_lock lock2(_send_data_mutex); boost::asio::write(_socket, boost::asio::buffer(header, 4)); boost::asio::write(_socket, boost::asio::buffer(stream_type)); }*/ /* void Manager::send_data( unsigned short stream_id, boost::asio::const_buffer data, unsigned short data_length) { if (stream_id >= _streams.size() || !_streams[stream_id].callback || data_length > boost::asio::buffer_size(data)) { // TODO: Handle error } unsigned char header[4]; header[0] = (stream_id >> 8) & 0xFF; header[1] = stream_id & 0xFF; header[2] = (data_length >> 8) & 0xFF; header[3] = data_length & 0xFF; boost::mutex::scoped_lock lock(_send_data_mutex); boost::asio::write(_socket, boost::asio::buffer(header, 4)); boost::asio::write(_socket, boost::asio::buffer(data, data_length)); }*/ void Manager::register_stream_type( std::string stream_type, boost::function<void (unsigned short, boost::asio::mutable_buffer&, boost::function<void ()>&)> callback) { boost::mutex::scoped_lock lock(_stream_types_mutex); _stream_types.push_back(StreamTypeInfo(stream_type, callback)); } Stream::Stream() : _stream_id(0xFFFF) { } Stream::~Stream() { } void Stream::connect_stream( std::string string_type, boost::asio::mutable_buffer buffer, boost::function<void ()> callback) { if (_stream_id != 0xFFFF) { // TODO: Handle error } for (unsigned short i = 0; i < manager._streams.size(); i++) if (!manager._streams[i].callback) { _stream_id = i; break; } if (_stream_id == 0xFFFF) { _stream_id = manager._streams.size(); manager._streams.push_back(Manager::StreamInfo(buffer, callback)) } else manager._streams[_stream_id] = Manager::StreamInfo(buffer, callback)); unsigned char header[6]; header[0] = 0xFF; header[1] = 0xFF; header[2] = (_stream_id >> 8) & 0xFF; header[3] = _stream_id & 0xFF; header[4] = (stream_type.size() >> 8) & 0xFF; header[5] = stream_type.size() & 0xFF; boost::mutex::scoped_lock lock(manager._send_data_mutex); boost::asio::write(_socket, boost::asio::buffer(header, 6)); boost::asio::write(_socket, boost::asio::buffer(stream_type)); } void Stream::send_data( boost::asio::const_buffer data, unsigned short data_length) { unsigned char header[4]; header[0] = (_stream_id >> 8) & 0xFF; header[1] = _stream_id & 0xFF; header[2] = (data_length >> 8) & 0xFF; header[3] = data_length & 0xFF; boost::mutex::scoped_lock lock(manager._send_data_mutex); boost::asio::write(manager._socket, boost::asio::buffer(header, 4)); boost::asio::write(manager._socket, boost::asio::buffer(data, data_length)); } }