using System; using System.IO; using System.Collections.Generic; using System.Text; using System.Net.Sockets; using ProtoBuf; using Thousandto.Core.Base; using Thousandto.Core.Framework; using System.Threading; using Thousandto.Core.Support; using Thousandto.Code.Logic.Network; using PathUtils = UnityEngine.Gonbest.MagicCube.PathUtils; using CoroutinePool = UnityEngine.Gonbest.MagicCube.CoroutinePool; using StringUtils = UnityEngine.Gonbest.MagicCube.StringUtils; namespace Thousandto.Plugins.Common { public delegate void DoResMessageDelegate(uint msgid, byte[] bytes); /// /// socket管理类,主要处理发送数据的打包以及对收到的数据调用处理函数 /// 重要方法SendMessage 、 update /// /// public class Networker : BaseSystem { public MyAction OnUpdateFunc; public float RecordMsgTagTime = 0; private MyAction _onDisconnnect; //回调函数队列 private SyncQueue, bool>> _calbackQueue = new SyncQueue, bool>>(); //lua消息响应事件 public DoResMessageDelegate ResLuaMessageEvent = (msgid,bytes) => { }; public static List GetAllProtoPath() { return Thousandto.Code.Logic.LuaProtoBuff.GetAllProtoPath(); } public MyAction OnSocketDisconnect { get { return _onDisconnnect; } set { _onDisconnnect = value; if (_sockeClient != null) { _sockeClient.OnSocketDisconnect = x => { if (_onDisconnnect != null) _onDisconnnect(x); }; } } } #region 成员变量 // default vars in different platform #if (UNITY_IPHONE || UNITY_ANDROID) && !UNITY_EDITOR const float MaxMsgBlockingTime = 0.1f; const int MaxMsgBlockingFrame = 5; #else //每隔一定时间来处理一条消息 private const float MaxMsgBlockingTime = 5.0f; private const int MaxMsgBlockingFrame = 60; #endif //发消息需要记录发送消息的个数,与服务器做一一对应 private static Object lockSendCounter = new Object(); private static int msgSendCounter = 0; private static System.Random random = new Random(); //发送缓存最大为100k private static int BYTE_BUFFER_SIZE = 1024 * 100; // connecting / connected -- IP, Port public string IP = "124.220.47.41"; public int Port = 44801; //用于间隔MaxMsgBlockingTime这个时间来处理一条消息 private float _msgBlockingTime = 0; private int _msgBlockingFrame = 0; //跳帧处理消息 private int _msgLoopSkipCount = 0; private float _networkDelayCount = 0; private Byte[] _serializeBuffer = null; //发送消息的内存流,可以复用 private MemoryStream _serializeStream = null; //系列化消息的内存流,可以复用 private static MemoryStream _serializeMsgStream = null; private BinaryWriter _binaryWriter = null; private SocketClient _sockeClient = null; private Thread _thread; private bool _enableThread; #endregion #region get set方法 static Networker _sharedInstance = null; public static Networker Instance { get { return SharedInstance; } } public static Networker SharedInstance { get { if (_sharedInstance == null) { _sharedInstance = new Networker(); } return _sharedInstance; } } public bool IsConnected { get { return _sockeClient != null && _sockeClient.IsConnected; } } #endregion #region socket连接和断开 public void Connect(MyAction requestCallback = null) { _msgLoopSkipCount = 0; if (_sockeClient != null) { System.Diagnostics.Trace.Write(string.Format("begin connect ip:[{0}] port[{1}]", IP, Port)); _sockeClient.Connect((result) => { System.Diagnostics.Trace.Write(string.Format("connect ip:[{0}] port[{1}] result[{2}]", IP, Port, result)); if (result) { ReInitSocketCounter(); _sockeClient.EnableReceive(true); } else { _sockeClient.EnableReceive(false); } _calbackQueue.Enqueue(new KeyValuePair, bool>(requestCallback, result)); //if (requestCallback != null) //{ // requestCallback(result); //} }, IP, Port); } } public void Disconnect(bool notify = true) { System.Diagnostics.Trace.Write("Disconnect"); if (_sockeClient != null) { _sockeClient.Close(notify); } } public void TextCloseForReconnect() { if (_sockeClient != null) _sockeClient.TextCloseForReconnect(); } #endregion public void Initialize() { _msgLoopSkipCount = 0; _serializeBuffer = new Byte[BYTE_BUFFER_SIZE]; _serializeStream = new MemoryStream(_serializeBuffer); _binaryWriter = new FastBinaryWriter(_serializeStream); _sockeClient = new SocketClient(); _sockeClient.Initialize(); _enableThread = false; //心跳消息和断线重连消息需要在子线程做处理 _sockeClient.SetThreadHandelMsgID((int)MSG_heart.ResHeart.MsgID); _sockeClient.SetThreadHandelMsgID((int)MSG_heart.ResReconnectSign.MsgID); //优先处理的消息,比如伤害包 //_sockeClient.SetHighPriorityHandleMsgID((int)MSG_Fight.ResAttackResult.MsgID.eMsgID); //startThread(); } public void Uninitialize() { if (_sockeClient != null) { _sockeClient.Uninitialize(); //_sockeClient = null; } StopThread(); if (_serializeMsgStream != null) { _serializeMsgStream.Dispose(); _serializeMsgStream = null; } } public void SkipMessageLoop(int count = 1) { _msgLoopSkipCount += count; } public int GetSkipCount() { return _msgLoopSkipCount; } public void Send(byte[] mm, uint msgID) { //UnityEngine.Debug.LogWarning("[new发送消息:" + msgID + "] " + GetStr(mm)); if (GameGlobalData.IsReconnecting && msgID != MSG_heart.ReqReconnect.MsgID) { System.Diagnostics.Trace.Fail("当前是断线重连, 普通消息抛弃掉: " + msgID); return; } if (_thread == null || _thread.IsAlive == false) { System.Diagnostics.Trace.Fail(string.Format("msg id[{0}] 消息线程挂了", msgID)); } if (_sockeClient == null || _sockeClient.IsConnected == false) { System.Diagnostics.Trace.Fail(string.Format("msg id[{0}] 网络已经断开", msgID)); } if (msgID > 0) { try { NetworkpackageProfiler.PushTimestampData((uint)msgID, NetworkPackageTimestamp.MsgType.Send); int curMsgSendCounter = 0; // SendMessage可能多线程调用,需要加锁 lock (lockSendCounter) { curMsgSendCounter = msgSendCounter; msgSendCounter++; MemoryStream m = _serializeStream; m.SetLength(0); m.Position = 0; BinaryWriter s = _binaryWriter; UInt32 size = 0; UInt32 tempId = 0; UInt32 timeNow = (UInt32)TimeUtils.GetNow(); UInt32 id = (UInt32)msgID; size = (UInt32)mm.Length; tempId = (UInt32)curMsgSendCounter ^ (0x6B << 10); tempId = tempId ^ (16 + size); s.Write(MessageData.convertToBigInt(size + 16)); s.Write(MessageData.convertToBigInt(tempId)); uint ext = tempId % 100; byte[] timeBytes = MessageData.convertToBigInt(timeNow); byte[] idBytes = MessageData.convertToBigInt(id); int total = sumTotal(timeBytes) + sumTotal(idBytes) + sumTotal(mm); s.Write(MessageData.convertToBigInt((uint)total)); s.Write(xor(timeBytes, ext)); s.Write(xor(idBytes, ext)); s.Write(xor(mm, ext)); byte[] dataByte = new byte[size + 4 * 5]; Buffer.BlockCopy(_serializeBuffer, 0, dataByte, 0, dataByte.Length); _sockeClient.PostMessage(dataByte); } } catch (Exception e) { System.Diagnostics.Trace.Fail(e.Message + "\n" + e.StackTrace); } } ProtoBufUtils.Free(mm); } /// /// 结构--> |消息大小+12|消息计数器|时间戳|消息id|消息体| /// 消息发送有改动,新增一个消息ascii值的和作为一位,时间戳、消息id、消息体与计数器余数异或 /// 新结构--> |消息大小+16|消息计数器|ascii和|时间戳每位与余数异或|消息id每位与余数异或|消息体每位与余数异或| /// /// /// public void SendMessage(ProtoBuf.IExtensible msg, int msgID) { //UnityEngine.Debug.LogWarning("[old发送消息:" + msgID + "] " + GetStr(Serialize(msg))); } //异或 private byte[] xor(byte[] value, uint arg) { for (int i = 0; i < value.Length; ++i) { value[i] = (byte)(value[i] ^ arg); } return value; } //求和 private int sumTotal(byte[] value) { int total = 0; for (int i = 0; i < value.Length; ++i) { total += value[i]; } return total; } public void ReflushSendQueue() { _sockeClient.ReflushSendQueue(); } public void StopThread() { _enableThread = false; } /// /// 启动一个子线程,用来发送消息和处理需要在子线程里面处理的消息,比如心跳消息 /// public void StartThread() { if (_enableThread == true) { return; } _enableThread = true; _thread = new Thread(new ThreadStart(threadFunc)); _thread.IsBackground = true; _thread.Start(); } //多线程协议处理 private void threadFunc() { DateTime nowTime = DateTime.Now; int nCurThreadID = Thread.CurrentThread.ManagedThreadId; System.Diagnostics.Trace.Write(string.Format("Networker threadFunc start nowTime[{0}] nCurThreadID[{1}]", nowTime, nCurThreadID)); while (_enableThread && _sockeClient != null) { nowTime = System.DateTime.Now; Thread.Sleep(1); //peek需要在子线程处理的消息,比如心跳和断线重连消息 MessageData data = _sockeClient.PopMessageHandelInThread(); if (data != null) { try { //通过消息ID找到反序列化改消息的方法,主要是不同消息的类不同,需要不同的反序列化方法,统一不了 //IExtensible msg = HandleMsg.DeserializeMsgById((int)data.MsgID, data.Data, (int)data.DataSize); IExtensible msg = ProtoBuf.Serializers.CustomSetting.TryGetDeserializer(data.MsgID); if (msg != null) { IResMessage res = msg as IResMessage; res.ReadMessage(data.Data); msg.Excute(); } else { System.Diagnostics.Trace.Fail(string.Format("消息解析有问题,msg=null, id={0}, dataLen={1}", data.MsgID, data.Data == null ? 0 : data.Data.Length)); } } catch (Exception ex) { System.Diagnostics.Trace.Fail(string.Format("Excute message DataSize[{0}] id={1},throw exception:{2}", data.DataSize, data.MsgID, ex.Message)); System.Diagnostics.Trace.Fail(ex.Message + "\n" + ex.StackTrace); StringBuilder sb = new StringBuilder(); for (int i = 0; i < data.Data.Length; ++i) { sb.Append(data.Data[i]); sb.Append(" "); } System.Diagnostics.Trace.Fail(sb.ToString()); } finally { MessageData.FreeMsgData(data); } } //消息发送放到了后台线程来做 _sockeClient.ReflushSendQueue(); //心跳发送倒计时 if (OnUpdateFunc != null) { OnUpdateFunc((int)(DateTime.Now - nowTime).TotalMilliseconds); } } nowTime = DateTime.Now; System.Diagnostics.Trace.Write(string.Format("Networker threadFunc end nowTime[{0}] nCurThreadID[{1}]", nowTime, nCurThreadID)); } /// /// 这里主要是添加心跳处理函数,发送心跳消息用 /// /// public void AddThreadHandleFunc(MyAction func) { OnUpdateFunc = func; } protected override bool OnUpdate(float deltaTime) { //处理回调方法 if (_calbackQueue.Count > 0) { var callback = _calbackQueue.Dequeue(); if (callback.Key != null) callback.Key(callback.Value); } return true; } /// /// 处理消息队列 /// /// public void FixedUpdate(float deltaTime) { // 已处理完成的消息池需要定时回收内存 //MessageData.Update(deltaTime); if (_sockeClient == null) { return; } if (_sockeClient.IsConnected) { // 跳帧 if (_msgLoopSkipCount > 0) { --_msgLoopSkipCount; return; } #if UNITY_EDITOR bool debugMsg = false; int unHandleMsgCount = _sockeClient.ReceiveNormalMsgCount(); //待处理的消息超过60个,会造成明显卡顿,这里要给条日志,告知服务器消息太多了 if (unHandleMsgCount > 60) { //UnityEngine.Debug.LogError("Too many message receive in one frame, cout=" + unHandleMsgCount); debugMsg = true; } #endif //如果接收到的消息超过60个还没有被处理,说明游戏从后台切到前台,积累的很多消息没处理 //这里就在1帧里集中处理 do { MessageData data = _sockeClient.PopNormalMessage(); if (data != null) { try { //if(data.DataSize > 512) // UnityEngine.Debug.LogError("MsgID = " + data.MsgID + ", Size = " + data.DataSize); //lock (_lockObj) { if (MsgExtend.SharedInstance.IsGoLuaMsg(data.MsgID)) { //执行纯lua消息 //如果Client\Main\Assets\GameAssets\Resources\Lua\Network\ResMsgCMD.Lua 中有该CMD,或者ID大于500000,走Lua端消息 UnityEngine.Profiling.Profiler.BeginSample("Lua Process Message:" + data.MsgID); ResLuaMessageEvent(data.MsgID, data.Data); UnityEngine.Profiling.Profiler.EndSample(); continue; } if (MsgExtend.SharedInstance.IsGoLuaExtendMsg(data.MsgID)) { //执行lua和C#同时处理的消息 UnityEngine.Profiling.Profiler.BeginSample("Lua Process Extend Message:" + data.MsgID); ResLuaMessageEvent(data.MsgID, data.Data); UnityEngine.Profiling.Profiler.EndSample(); } //UnityEngine.Profiling.Profiler.BeginSample("====================== Networker cs: "+ data.MsgID); //通过消息ID找到反序列化改消息的方法,主要是不同消息的类不同,需要不同的反序列化方法,统一不了 //IExtensible msg = HandleMsg.DeserializeMsgById((int)data.MsgID, data.Data, (int)data.DataSize); IExtensible msg = ProtoBuf.Serializers.CustomSetting.TryGetDeserializer(data.MsgID); if (msg != null) { IResMessage res = msg as IResMessage; res.ReadMessage(data.Data); #if UNITY_EDITOR //if (UnityEngine.Time.realtimeSinceStartup - RecordMsgTagTime < 10) //{ // UnityEngine.Debug.Log("Reconnect: receive msg - " + "id=" + data.MsgID + "--" + msg); //} var startTime = DateTime.Now; NetworkpackageProfiler.PushTimestampData(data.MsgID, NetworkPackageTimestamp.MsgType.Receive); if (debugMsg) { //UnityEngine.Debug.LogError("HandMsg =" + msg.ToString()); } #endif if (TestMessage) { JsonSerialize.WriteTestLog(msg); } else JsonSerialize.Stop(); #if ENABLE_PROFILER //UnityEngine.Profiler.BeginSample("msg.Excute:" + msg.GetType().Name); #endif msg.Excute(); #if ENABLE_PROFILER //UnityEngine.Profiler.EndSample(); #endif MessageData.FreeMsgData(data); #if UNITY_EDITOR var endTime = DateTime.Now; NetworkpackageProfiler.PushTimeElapsed(data.MsgID, (endTime - startTime).TotalMilliseconds); #endif //UnityEngine.Profiling.Profiler.EndSample(); } else { System.Diagnostics.Trace.Fail(string.Format("消息解析有问题,msg=null, id={0}, dataLen={1}", data.MsgID, data.Data == null ? 0 : data.Data.Length)); break; } } } catch (Exception ex) { System.Diagnostics.Trace.Fail(string.Format("Excute message DataSize[{0}] id={1},throw exception:{2}", data.DataSize, data.MsgID, ex.Message)); System.Diagnostics.Trace.Fail(ex.Message + "\n" + ex.StackTrace); StringBuilder sb = new StringBuilder(); for (int i = 0; i < data.Data.Length; ++i) { sb.Append(data.Data[i]); sb.Append(" "); } System.Diagnostics.Trace.Fail(sb.ToString()); } } } while (_sockeClient.ReceiveNormalMsgCount() > 0); } } private string GetStr(byte[] bytes) { if (bytes.Length > 0) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < bytes.Length; i++) { sb.Append(bytes[i]); sb.Append(" "); } return sb.ToString(); } return ""; } public int GetSocketErrorCode() { return _sockeClient.GetSocketErrorCode(); } /// /// 重置消息数,当socket断开或者重连都需要调用 /// public static void ReInitSocketCounter() { msgSendCounter = 0; } //从连接开始计数,总共发送消息的个数 public static int GetMsgCount() { return msgSendCounter; } ////请优化@喻强 ///// ///// 消息序列化 ///// ///// ///// //public static byte[] Serialize(IExtensible msg) //{ // byte[] result; // //这里new不能优化,不能清空stream的内存 // // using (var stream = new MemoryStream()) // // { // // Serializer.Serialize(stream, msg); // // result = stream.ToArray(); // // } // if (_serializeMsgStream == null) // { // _serializeMsgStream = new MemoryStream(); // } // //设置为0后就可以清空内存,该对象就可以复用 // _serializeMsgStream.SetLength(0); // Serializer.Serialize(_serializeMsgStream, msg); // result = _serializeMsgStream.ToArray(); // return result; //} /// /// 反序列化 /// /// /// /// //public static T Deserialize(byte[] message) //{ // T result; // using (var stream = new MemoryStream(message)) // { // result = Serializer.Deserialize(stream); // } // return result; //} #region 测试用的,QA用来白盒测试消息数据是否正确 //开关 public static bool TestMessage; private class JsonSerialize { private static string _logPath = "Log/"; private static StreamWriter _sw; private static SyncQueue _logQueue; private static bool _init; private static Thread _thread; public static void WriteTestLog(object msg) { if (_sw == null) { _init = true; string dirName = _logPath; if (!Directory.Exists(dirName)) { Directory.CreateDirectory(dirName); } _sw = new StreamWriter(dirName + "/" + DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss") + "_MsgLog.txt"); _logQueue = new SyncQueue(); _thread = new Thread(writeThread); _thread.IsBackground = true; _thread.Start(); } string json = ToJson(msg); _logQueue.Enqueue(json); UnityEngine.Debug.LogError(json); } public static void Stop() { if (!_init) { return; } _init = false; if (_logQueue != null) { _logQueue.Clear(); _logQueue = new SyncQueue(); } if (_thread != null) { _thread.Join(1000); } _sw = null; } private static void writeThread() { while (_init) { Thread.Sleep(1); if (_logQueue.Count > 0) { string line = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss -> ") + _logQueue.Dequeue(); _sw.WriteLine(line); } } _sw.Close(); } public static void Test() { MSG_Register.ResLoginGameSuccess info = new MSG_Register.ResLoginGameSuccess(); info.infoList.Add(new MSG_Register.RoleBaseInfo()); var value = ToJson(info); UnityEngine.Debug.Log(value); } /// /// List转成json /// /// /// /// /// public static string ListToJson(IList list, string jsonName) { StringBuilder Json = new StringBuilder(); if (string.IsNullOrEmpty(jsonName)) jsonName = list[0].GetType().Name; Json.Append("{\"" + jsonName + "\":["); if (list.Count > 0) { for (int i = 0; i < list.Count; i++) { T obj = Activator.CreateInstance(); System.Reflection.PropertyInfo[] pi = obj.GetType().GetProperties(); Json.Append("{"); for (int j = 0; j < pi.Length; j++) { Type type = pi[j].GetValue(list[i], null).GetType(); Json.Append("\"" + pi[j].Name.ToString() + "\":" + StringFormat(pi[j].GetValue(list[i], null).ToString(), type)); if (j < pi.Length - 1) { Json.Append(","); } } Json.Append("}"); if (i < list.Count - 1) { Json.Append(","); } } } Json.Append("]}"); return Json.ToString(); } /// /// List转成json /// /// /// /// public static string ListToJson(IList list) { object obj = list[0]; return ListToJson(list, obj.GetType().Name); } /// /// 对象转换为Json字符串 /// /// 对象 /// Json字符串 public static string ToJson(object jsonObject) { string jsonString = "{"; System.Reflection.PropertyInfo[] propertyInfo = jsonObject.GetType().GetProperties(); var method = jsonObject.GetType().GetMethod("GetMsgID"); if (method != null) { var className = jsonObject.GetType().Name; jsonString += "\"Class\":" + "'" + className + "'" + ","; var msgID = (int)method.Invoke(jsonObject, null); if (msgID != 0) { jsonString += "\"MsgID\":" + msgID + ","; } } for (int i = 0; i < propertyInfo.Length; i++) { object objectValue = null; try { objectValue = propertyInfo[i].GetGetMethod().Invoke(jsonObject, null); } catch (System.Exception ex) { UnityEngine.Debug.LogException(ex); try { objectValue = propertyInfo[i].GetValue(jsonObject, null); } catch (System.Exception e2) { UnityEngine.Debug.LogException(e2); objectValue = null; } } string value = string.Empty; if (objectValue == null) { value = "'null'"; } else if (objectValue is string) { value = "'" + (objectValue.ToString()) + "'"; } else if (objectValue.GetType().IsValueType) { value = (objectValue.ToString()); } else if (objectValue is System.Collections.IEnumerable) { value = ToJson((System.Collections.IEnumerable)objectValue); } else { value = ToJson(objectValue); } jsonString += "\"" + (propertyInfo[i].Name) + "\":" + value + ","; } jsonString.Remove(jsonString.Length - 1, 1); jsonString = jsonString.TrimEnd(','); return jsonString + "}"; } /// /// 对象集合转换Json /// /// 集合对象 /// Json字符串 public static string ToJson(System.Collections.IEnumerable array) { string jsonString = "["; foreach (object item in array) { jsonString += ToJson(item) + ","; } if (jsonString.Length == 0) { return ""; } jsonString.Remove(jsonString.Length - 1, 1); return jsonString + "]"; } /// /// 普通集合转换Json /// /// 集合对象 /// Json字符串 public static string ToArrayString(System.Collections.IEnumerable array) { string jsonString = "["; foreach (object item in array) { jsonString = ToJson(item.ToString()) + ","; } jsonString.Remove(jsonString.Length - 1, jsonString.Length); return jsonString + "]"; } /// /// 过滤特殊字符 /// /// /// private static string String2Json(String s) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < s.Length; i++) { char c = s.ToCharArray()[i]; switch (c) { case '\"': sb.Append("\\\""); break; case '\\': sb.Append("\\\\"); break; case '/': sb.Append("\\/"); break; case '\b': sb.Append("\\b"); break; case '\f': sb.Append("\\f"); break; case '\n': sb.Append("\\n"); break; case '\r': sb.Append("\\r"); break; case '\t': sb.Append("\\t"); break; default: sb.Append(c); break; } } return sb.ToString(); } /// /// 格式化字符型、日期型、布尔型 /// /// /// /// private static string StringFormat(string str, Type type) { if (type == typeof(string)) { str = String2Json(str); str = "\"" + str + "\""; } else if (type == typeof(DateTime)) { str = "\"" + str + "\""; } else if (type == typeof(bool)) { str = str.ToLower(); } return str; } } #endregion } }