Producer/consumer queues in C#
What is a producer/consumer queue
In a restaurant there are chefs in the kitchen who wait for orders from customers. As soon as a customer orders something, they (the chefs) make it and serve it to them (the customer). A producer/consumer queue is the same. There are workers who wait for a "job" to be entered into the queue and then process it when it is entered. The advantage of this method is that the workers consume negligible resources when they wait for a job to be added to the queue. So, "jobs" can be finished as soon as possible.
One of the advantages of this approach is that the class which adds the jobs to the queue (the customer) doesn't have to wait for the workers (the chefs) to finish what they are doing. It will keep inserting the jobs (orders) and the workers (chefs) can take their time processing it (cooking the order).
How do we accomplish this in C# you ask? Simple. The workers will check a job queue to see if anything has been inserted. If they find that there is a job waiting to be processed, then they will remove it from the queue and process it. This way, two workers won't end up working on the same job. If there are no jobs in the queue then the worker will go to sleep and wait for a signal to wake up when something is added to the queue. The "job manager" or the class that adds the jobs, will insert nulls if it is done adding jobs.
If that was hard to understand, let me explain it in another way. The restaurant will hire 20 chefs and 1 waiter. The customers will place their orders with the waiter. A chef will ask the waiter if he needs to cook anything for a customer. If the waiter says, "Yes, a customer ordered lamb curry" then the chef will start cooking lamb curry. If the orders of all customers have been cooked (or are being cooked by some other chef), then the chef will go to sleep and ask the waiter to wake him up if a customer orders something. However, the waiter will not wake up a specific chef. He will wake up all the chefs when an order comes in. The chef who wakes up first will start cooking and the rest of them will go back to sleep (and ask the waiter to wake them up if any other order comes in). At the end of the day, the waiter will give blank orders signaling to the chefs that they are done for the day.
Now that we know what a producer/consumer queue is, let's see how we can implement it in C#.
Frist we will take a look at the class which will "consume" the jobs - the resaurant containing the chefs and the waiter.
[csharp]
using System;
using System.Threading;
using System.Collections.Generic;
public class WorkerQueue : IDisposable
{
/* We will store our worker threads in this list. */
private List
/* The number of worker threads to use */
private int threadsToUse = 20;
/* synchronization lock */
private object locker = new object();
/* The queue in which jobs are stored */
private Queue
/* wait handle - used to wake up sleeping threads and to make them wait (sleep) */
EventWaitHandle wh = new AutoResetEvent(false);
/* public variable for accessing threadsToUseProperty */
public int ThreadsToUse { set {threadsToUse=value;} get { return threadsToUse; } }
/* The constructor of this class */
public void WorkerQueue()
{
/* In the constructor, we will start worker threads, which will
* wait for a job to be entered into the queue.
*/
for (int i; i
Thread t = new Thread(work);
threads.Add(t);
t.Start();
}
}
/* This is the method that other classes will use to add jobs to the queue
* All it does it, add the job to the queue and signal the worker threads
* that something has been added, so they can wake up and process it
* (if they are asleep). If the worker threads are already awake, then
* wh.Set() will have no effect (and won't error out)
*
* This is the waiter taking orders
*/
public void AddJob(string item)
{
lock (locker)
{
jobs.Enqueue(item);
}
wh.Set();
}
/* The method that will be used in worker threads. This will check our job queue
* to see if any jobs are available. If any job is available, then it will process that job
* and check for more jobs. If there are no jobs in the queue, then it will go to sleep
* (by using waitHandle.WaitOne method) until a new job is entered into the queue.
*
* In this method, we will end the worker queue by inserting nulls in the queue,
* so this worker thread will watch out for that and kill itself (by returning control)
* when it sees a null.
*
* This is our chef
*/
private void work()
{
while (true)
{
string item = null;
lock (locker)
{
/* chefs asking the waiter if anything needs to be cooked */
if (jobs.Count > 0)
{
item = jobs.Dequeue();
/* return if a null is found in the queue */
if (item == null) return;
}
}
if (item != null)
{
/* if a job was found then process it */
Console.WriteLine(item);
}
else
{
/* if a job was not found (meaning list is empty) then
* wait till something is added to it
*/
wh.WaitOne();
}
}
}
/* When the object of this class is about to be removed from memory
* we will signal all the worker threads to stop by inserting nulls in the queue
* and then we will wait for them to finish the jobs already in the queue
*
* Letting everyone know we are done for the day
*/
public void Dispose()
{
for (int i=0; i
AddJob(null);
}
for (int i=0; i
threads[i].Join();
}
}
}
[/csharp]
Now that we know what the restaurant looks like, let's take a look at the customer (the producer which will add the jobs)
[csharp]
using System;
public class Customer
{
public static void Main()
{
WorkerQueue q = new WorkerQueue();
q.AddJob("Lamb curry");
q.AddJob("Beef Steak");
}
}
[/csharp]
C# and SQL Server
In this article, I will discuss how to communicate with SQL Server using C#. You can use this code for communicating with other databases too but the connection string may vary. SQL Server supports SQL queries and stored procedure, and we will take a look at how to run both of them.
I will assume that you know what queries are, what SQL Server is, etc.
When communicating with a database usually you want to do 4 things:
- Connecting to the database
- Writing to the database (insert/update/delete)
- Reading from the database
- Closing the connection when you are done
Before we begin, let's assume that we already have a database which has a table called my_contacts. And, it looks like this:
[sql]
create table my_contacts (
name nvarchar(100),
phone nvarchar(20)
);
[/sql]
Our database server is localhost, the database's name is database_name. The username required to connect is myuser and the password for that user is mypass.
Running Queries
[csharp]
try {
/*************************************************************
* First we make a connection. For this, you will need to
* have the hostname (with which you connect to the database),
* database name, username, password. You can obtain this
* information from you DBA or your network administrator
* SqlConnection is the class you use to make the connection
*************************************************************/
string connectionString = "Server=localhost;Database=database_name;uid=myuser;pwd=mypass;MultipleActiveResultSets=True;";
SqlConnection con = new SqlConnection(connectionString);
/*************************************************************
* After a connection is made, you are free to read and write
* to your database using SQL queries. We will, first, take a
* look at how data is inserted into a table
*************************************************************/
string sql = "insert into my_contacts(name, phone) values ("
+ "'john doe', '123-123-1233')";
SqlCommand cmd = new SqlCommand(sql, con);
cmd.ExecuteNonQuery();
/*************************************************************
* Since insert,update and delete queries don't return anything
* we use ExecuteNonQuery() method to execute the query for them.
* We will use a different method for reading (selecting) the
* data
*************************************************************/
sql = "select * from my_contacts";
cmd.CommandText = sql; //This is another way of setting a query
SqlDataReader dr = cmd.ExecuteReader();
/*************************************************************
* Execute reader executes a query and returns a pointer to a
* reader. You can then traverse through the rows which are
* returned like this
*************************************************************/
while (dr != null && dr.Read())
{
Console.WriteLine(dr["name"].ToString()+" : "
+ dr["phone"].ToString());
}
/*************************************************************
* Remember to close the connection and the reader after you
* are done with it.
* This is very important. You don't want to fill your
* connection pool with unused connections.
*************************************************************/
dr.Close();
con.Close();
} catch (Exception e) {
System.Windows.Forms.MessageBox.Alert(e.ToString());
}
[/csharp]
You will have to include the namespace System.Data.SqlClient for the code above (and below) to work. You can do that by adding "using System.Data.SqlClient;" line at the top of the file.
Running Stored Procedures
A stored procedure is a query (or multiple queries) stored on the SQL Server which can be called from anywhere. An ideal situation where you want to use a stored procedure is when you want to have some of the data logic in your queries (like if statements). Running a stored procedure is very similar to running a SQL query except for a few minor differences.
[csharp]
try {
/*************************************************************
* We connect to the database just like we did for running a
* query. Or, if there is an open connection, we can use that.
* For this example, I will use a fresh connection
*************************************************************/
string connectionString = "Server=localhost;Database=database_name;uid=username;pwd=password;MultipleActiveResultSets=True;";
SqlConnection con = new SqlConnection(connectionString);
/*************************************************************
* After a connection is made, you run the stored procedure
* just like a query
*************************************************************/
string spName = "dbo.StoredProcedureName";
SqlCommand cmd = new SqlCommand(spName, con);
cmd.CommandType = SqlCommand.CommandTypes.StoredProcedure;
cmd.ExecuteNonQuery();
/*************************************************************
* In the above example, no parameters are passed to the
* stored procedure and it returns nothing back. What if you
* needed to pass parameters and read the records returned?
*************************************************************/
cmd.CommandText = "dbo.AddAContactAndGetContactList()";
cmd.Parameters.Add(new SqlParameter("name", "Joe Briefcase"));
cmd.Parameters.Add(new SqlParameter("phone", "123-321-1233"));
SqlDataReader dr = cmd.ExecuteReader();
/*************************************************************
* The constructor for SqlParameter takes 2 parameters -
* the parameter name, and the parameter value. In our case,
* we passed 2 parameters - name and phone.
*
* Execute reader executes a query and returns a pointer to a
* reader. You can then traverse through the rows which are
* returned like this
*************************************************************/
while (dr != null && dr.Read())
{
Console.WriteLine(dr["name"].ToString()+" : "
+ dr["phone"].ToString());
}
dr.Close();
con.Close();
} catch (Exception e) {
System.Windows.Forms.MessageBox.Alert(e.ToString());
}
[/csharp]
That's all for now, folks